This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c79fd72 Improved Javadocs on Java client API (#3592)
c79fd72 is described below
commit c79fd728cf27417ca117ca220dd07dc4319d4c46
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Feb 13 17:55:59 2019 -0800
Improved Javadocs on Java client API (#3592)
---
.../pulsar/client/api/AuthenticationFactory.java | 12 +-
.../apache/pulsar/client/api/ClientBuilder.java | 161 ++++++++------
.../apache/pulsar/client/api/CompressionType.java | 3 +
.../org/apache/pulsar/client/api/Consumer.java | 17 +-
.../apache/pulsar/client/api/ConsumerBuilder.java | 151 ++++++++++---
.../client/api/ConsumerCryptoFailureAction.java | 23 +-
.../pulsar/client/api/ConsumerEventListener.java | 10 +
.../pulsar/client/api/ConsumerInterceptor.java | 2 -
.../apache/pulsar/client/api/CryptoKeyReader.java | 3 +
.../apache/pulsar/client/api/DeadLetterPolicy.java | 11 +
.../pulsar/client/api/EncryptionKeyInfo.java | 16 +-
.../apache/pulsar/client/api/HashingScheme.java | 2 +-
.../java/org/apache/pulsar/client/api/Message.java | 19 +-
.../org/apache/pulsar/client/api/MessageId.java | 31 ++-
.../apache/pulsar/client/api/MessageRouter.java | 7 +
.../org/apache/pulsar/client/api/Producer.java | 14 +-
.../apache/pulsar/client/api/ProducerBuilder.java | 239 ++++++++++++++-------
.../client/api/ProducerCryptoFailureAction.java | 14 +-
.../org/apache/pulsar/client/api/PulsarClient.java | 155 ++++++++++---
.../java/org/apache/pulsar/client/api/Reader.java | 50 ++++-
.../apache/pulsar/client/api/ReaderBuilder.java | 78 ++++---
.../apache/pulsar/client/api/ReaderListener.java | 6 +-
.../pulsar/client/api/RegexSubscriptionMode.java | 9 +
.../java/org/apache/pulsar/client/api/Schema.java | 39 ++++
.../pulsar/client/api/ServiceUrlProvider.java | 6 +-
.../client/api/SubscriptionInitialPosition.java | 5 +-
.../apache/pulsar/client/api/SubscriptionType.java | 12 +-
.../pulsar/client/api/TypedMessageBuilder.java | 30 ++-
28 files changed, 829 insertions(+), 296 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
index 7613c42..a25afdd 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
@@ -24,6 +24,10 @@ import java.util.function.Supplier;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.internal.DefaultImplementation;
+/**
+ * Factory class that allows creating {@link Authentication} instances
+ * for all the supported authentication methods.
+ */
public final class AuthenticationFactory {
/**
@@ -31,6 +35,7 @@ public final class AuthenticationFactory {
*
* @param token
* the client auth token
+ * @return the Authentication object initialized with the token credentials
*/
public static Authentication token(String token) {
return DefaultImplementation.newAuthenticationToken(token);
@@ -41,6 +46,7 @@ public final class AuthenticationFactory {
*
* @param tokenSupplier
* a supplier of the client auth token
+ * @return the Authentication object initialized with the token credentials
*/
public static Authentication token(Supplier<String> tokenSupplier) {
return DefaultImplementation.newAuthenticationToken(tokenSupplier);
@@ -53,19 +59,21 @@ public final class AuthenticationFactory {
* the path to the TLS client public key
* @param keyFilePath
* the path to the TLS client private key
+ * @return the Authentication object initialized with the TLS credentials
*/
public static Authentication TLS(String certFilePath, String keyFilePath) {
return DefaultImplementation.newAuthenticationTLS(certFilePath,
keyFilePath);
}
/**
- * Create an instance of the Authentication-Plugin
+ * Create an instance of the {@link Authentication} object by using
+ * the plugin class name.
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the
Authentication-Plugin, e.g., "key1:val1,key2:val2"
- * @return instance of the Authentication-Plugin
+ * @return instance of the Authentication object
* @throws UnsupportedAuthenticationException
*/
public static Authentication create(String authPluginClassName, String
authParamsString)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 416fc6c..9d30aba 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -24,13 +24,15 @@ import java.util.concurrent.TimeUnit;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
/**
- * Builder interface that is used to construct a {@link PulsarClient} instance.
+ * Builder interface that is used to configure and construct a {@link
PulsarClient} instance.
*
* @since 2.0.0
*/
public interface ClientBuilder extends Cloneable {
/**
+ * Construct the final {@link PulsarClient} instance
+ *
* @return the new {@link PulsarClient} instance
*/
PulsarClient build() throws PulsarClientException;
@@ -38,20 +40,25 @@ public interface ClientBuilder extends Cloneable {
/**
* Load the configuration from provided <tt>config</tt> map.
*
- * <p>Example:
+ * <p>
+ * Example:
+ *
* <pre>
- * Map<String, Object> config = new HashMap<>();
- * config.put("serviceUrl", "pulsar://localhost:5550");
+ * {@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put("serviceUrl", "pulsar://localhost:6650");
* config.put("numIoThreads", 20);
*
* ClientBuilder builder = ...;
* builder = builder.loadConf(config);
*
* PulsarClient client = builder.build();
+ * }
* </pre>
*
- * @param config configuration to load
- * @return client builder instance
+ * @param config
+ * configuration to load
+ * @return the client builder instance
*/
ClientBuilder loadConf(Map<String, Object> config);
@@ -61,29 +68,47 @@ public interface ClientBuilder extends Cloneable {
* Cloning the builder can be used to share an incomplete configuration
and specialize it multiple times. For
* example:
*
- * <pre>
- * ClientBuilder builder =
PulsarClient.builder().ioThreads(8).listenerThreads(4);
+ * <pre>{@code
+ * ClientBuilder builder = PulsarClient.builder()
+ * .ioThreads(8)
+ * .listenerThreads(4);
*
- * PulsarClient client1 = builder.clone().serviceUrl(URL_1).build();
- * PulsarClient client2 = builder.clone().serviceUrl(URL_2).build();
- * </pre>
+ * PulsarClient client1 = builder.clone()
+ * .serviceUrl("pulsar://localhost:6650").build();
+ * PulsarClient client2 = builder.clone()
+ * .serviceUrl("pulsar://other-host:6650").build();
+ * }</pre>
+ *
+ * @return a clone of the client builder instance
*/
ClientBuilder clone();
/**
* Configure the service URL for the Pulsar service.
* <p>
- * This parameter is required
+ * This parameter is required.
+ * <p>
+ * Examples:
+ * <ul>
+ * <li>{@code pulsar://my-broker:6650} for regular endpoint</li>
+ * <li>{@code pulsar+ssl://my-broker:6651} for TLS encrypted endpoint</li>
+ * </ul>
*
* @param serviceUrl
- * @return
+ * the URL of the Pulsar service that the client should connect
to
+ * @return the client builder instance
*/
ClientBuilder serviceUrl(String serviceUrl);
/**
* Configure the service URL provider for Pulsar service
+ * <p>
+ * Instead of specifying a static service URL string (with {@link
#serviceUrl(String)}), an application can pass a
+ * {@link ServiceUrlProvider} instance that dynamically provides a service
URL.
+ *
* @param serviceUrlProvider
- * @return
+ * the provider instance
+ * @return the client builder instance
*/
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
@@ -91,47 +116,38 @@ public interface ClientBuilder extends Cloneable {
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
- * <p>
- *
- * <pre>
- * <code>
- * String AUTH_CLASS =
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
- *
- * Map<String, String> conf = new TreeMap<>();
- * conf.put("tlsCertFile", "/my/cert/file");
- * conf.put("tlsKeyFile", "/my/key/file");
- *
- * Authentication auth = AuthenticationFactor.create(AUTH_CLASS, conf);
- *
+ * <pre>{@code
* PulsarClient client = PulsarClient.builder()
- * .serviceUrl(SERVICE_URL)
- * .authentication(auth)
- * .build();
- * ....
- * </code>
- * </pre>
+ * .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+ * .authentication(
+ * AuthenticationFactory.TLS("/my/cert/file", "/my/key/file"))
+ * .build();
+ * }</pre>
+ *
+ * For token based authentication, this will look like:
+ * <pre>{@code
+ *
AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY")
+ * }</pre>
*
* @param authentication
* an instance of the {@link Authentication} provider already
constructed
+ * @return the client builder instance
*/
ClientBuilder authentication(Authentication authentication);
/**
- * Set the authentication provider to use in the Pulsar client instance.
+ * Configure the authentication provider to use in the Pulsar client
instance.
* <p>
* Example:
- * <p>
*
* <pre>
* <code>
- * String AUTH_CLASS =
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
- * String AUTH_PARAMS =
"tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file";
- *
* PulsarClient client = PulsarClient.builder()
- * .serviceUrl(SERVICE_URL)
- * .authentication(AUTH_CLASS, AUTH_PARAMS)
+ * .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+ * .authentication(
+ * "org.apache.pulsar.client.impl.auth.AuthenticationTls",
+ * "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file")
* .build();
- * ....
* </code>
* </pre>
*
@@ -139,6 +155,7 @@ public interface ClientBuilder extends Cloneable {
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the
Authentication-Plugin, e.g., "key1:val1,key2:val2"
+ * @return the client builder instance
* @throws UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
*/
@@ -146,30 +163,28 @@ public interface ClientBuilder extends Cloneable {
throws UnsupportedAuthenticationException;
/**
- * Set the authentication provider to use in the Pulsar client instance.
+ * Configure the authentication provider to use in the Pulsar client
instance
+ * using a config map.
* <p>
* Example:
- * <p>
- *
- * <pre>
- * <code>
- * String AUTH_CLASS =
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
*
+ * <pre>{@code
* Map<String, String> conf = new TreeMap<>();
* conf.put("tlsCertFile", "/my/cert/file");
* conf.put("tlsKeyFile", "/my/key/file");
*
* PulsarClient client = PulsarClient.builder()
- * .serviceUrl(SERVICE_URL)
- * .authentication(AUTH_CLASS, conf)
+ * .serviceUrl("pulsar+ssl://broker.example.com:6651/")
+ * .authentication(
+ * "org.apache.pulsar.client.impl.auth.AuthenticationTls",
conf)
* .build();
- * ....
- * </code>
+ * }</pre>
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParams
* map which represents parameters for the Authentication-Plugin
+ * @return the client builder instance
* @throws UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
*/
@@ -186,20 +201,27 @@ public interface ClientBuilder extends Cloneable {
* operation timeout
* @param unit
* time unit for {@code operationTimeout}
+ * @return the client builder instance
*/
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
/**
* Set the number of threads to be used for handling connections to
brokers <i>(default: 1 thread)</i>
*
- * @param numIoThreads
+ * @param numIoThreads the number of IO threads
+ * @return the client builder instance
*/
ClientBuilder ioThreads(int numIoThreads);
/**
- * Set the number of threads to be used for message listeners <i>(default:
1 thread)</i>
+ * Set the number of threads to be used for message listeners <i>(default:
1 thread)</i>.
+ * <p>
+ * The listener thread pool is shared across all the consumers and readers
that are
+ * using a "listener" model to get messages. For a given consumer, the
listener will be
+ * always invoked from the same thread, to ensure ordering.
*
- * @param numListenerThreads
+ * @param numListenerThreads the number of listener threads
+ * @return the client builder instance
*/
ClientBuilder listenerThreads(int numListenerThreads);
@@ -208,10 +230,10 @@ public interface ClientBuilder extends Cloneable {
* <p>
* By default, the connection pool will use a single connection for all
the producers and consumers. Increasing this
* parameter may improve throughput when using many producers over a high
latency connection.
- * <p>
*
* @param connectionsPerBroker
* max number of connections per broker (needs to be greater
than 0)
+ * @return the client builder instance
*/
ClientBuilder connectionsPerBroker(int connectionsPerBroker);
@@ -222,9 +244,10 @@ public interface ClientBuilder extends Cloneable {
* low latency publishes. On the other hand, sending out a huge number of
small packets might limit the overall
* throughput, so if latency is not a concern, it's advisable to set the
<code>useTcpNoDelay</code> flag to false.
* <p>
- * Default value is true
+ * Default value is true.
*
- * @param enableTcpNoDelay
+ * @param enableTcpNoDelay whether to enable TCP no-delay feature
+ * @return the client builder instance
*/
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);
@@ -234,6 +257,7 @@ public interface ClientBuilder extends Cloneable {
*
* @param enableTls
* @deprecated use "pulsar+ssl://" in serviceUrl to enable
+ * @return the client builder instance
*/
@Deprecated
ClientBuilder enableTls(boolean enableTls);
@@ -242,13 +266,15 @@ public interface ClientBuilder extends Cloneable {
* Set the path to the trusted TLS certificate file
*
* @param tlsTrustCertsFilePath
+ * @return the client builder instance
*/
ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);
/**
* Configure whether the Pulsar client accept untrusted TLS certificate
from broker <i>(default: false)</i>
*
- * @param allowTlsInsecureConnection
+ * @param allowTlsInsecureConnection whether to accept an untrusted TLS
certificate
+ * @return the client builder instance
*/
ClientBuilder allowTlsInsecureConnection(boolean
allowTlsInsecureConnection);
@@ -257,9 +283,10 @@ public interface ClientBuilder extends Cloneable {
* certificate and matches provided hostname(CN/SAN) with expected
broker's host name. It follows RFC 2818, 3.1.
* Server Identity hostname verification.
*
- * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+ * @see <a href="https://tools.ietf.org/html/rfc2818">RFC 2818</a>
*
- * @param enableTlsHostnameVerification
+ * @param enableTlsHostnameVerification whether to enable TLS hostname
verification
+ * @return the client builder instance
*/
ClientBuilder enableTlsHostnameVerification(boolean
enableTlsHostnameVerification);
@@ -271,6 +298,7 @@ public interface ClientBuilder extends Cloneable {
* the interval between each stat info
* @param unit
* time unit for {@code statsInterval}
+ * @return the client builder instance
*/
ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
@@ -280,6 +308,7 @@ public interface ClientBuilder extends Cloneable {
* on thousands of topic using created {@link PulsarClient}
*
* @param maxConcurrentLookupRequests
+ * @return the client builder instance
*/
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
@@ -290,6 +319,7 @@ public interface ClientBuilder extends Cloneable {
* maxConcurrentLookupRequests and under maxLookupRequests will wait in
each client cnx.
*
* @param maxLookupRequests
+ * @return the client builder instance
*/
ClientBuilder maxLookupRequests(int maxLookupRequests);
@@ -299,6 +329,7 @@ public interface ClientBuilder extends Cloneable {
* 50)</i>
*
* @param maxNumberOfRejectedRequestPerConnection
+ * @return the client builder instance
*/
ClientBuilder maxNumberOfRejectedRequestPerConnection(int
maxNumberOfRejectedRequestPerConnection);
@@ -307,16 +338,20 @@ public interface ClientBuilder extends Cloneable {
*
* @param keepAliveIntervalSeconds
* @param unit time unit for {@code keepAliveIntervalSeconds}
+ * @return the client builder instance
*/
ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit
unit);
/**
- * Set the duration of time to wait for a connection to a broker to be
established. If the duration
- * passes without a response from the broker, the connection attempt is
dropped.
+ * Set the duration of time to wait for a connection to a broker to be
established. If the duration passes without a
+ * response from the broker, the connection attempt is dropped.
*
* @since 2.3.0
- * @param duration the duration to wait
- * @param unit the time unit in which the duration is defined
+ * @param duration
+ * the duration to wait
+ * @param unit
+ * the time unit in which the duration is defined
+ * @return the client builder instance
*/
ClientBuilder connectionTimeout(int duration, TimeUnit unit);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
index 8ad0f3d..49eb925 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.client.api;
+/**
+ * The compression type that can be specified on a {@link Producer}.
+ */
public enum CompressionType {
/** No compression */
NONE,
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 12de077..3f5b9ae 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
/**
* An interface that abstracts behavior of Pulsar's consumer.
+ * <p>
+ * All the operations on the consumer instance are thread safe.
*/
public interface Consumer<T> extends Closeable {
@@ -45,15 +47,22 @@ public interface Consumer<T> extends Closeable {
* Unsubscribe the consumer
* <p>
* This call blocks until the consumer is unsubscribed.
+ * <p>
+ * Unsubscribing will cause the subscription to be deleted and all the
+ * data retained can potentially be deleted as well.
+ * <p>
+ * The operation will fail when performed on a shared subscription
+ * where multiple consumers are currently connected.
*
- * @throws PulsarClientException
+ * @throws PulsarClientException if the operation fails
*/
void unsubscribe() throws PulsarClientException;
/**
* Asynchronously unsubscribe the consumer
*
- * @return {@link CompletableFuture} for this operation
+ * @see Consumer#unsubscribe()
+ * @return {@link CompletableFuture} to track the operation
*/
CompletableFuture<Void> unsubscribeAsync();
@@ -111,10 +120,10 @@ public interface Consumer<T> extends Closeable {
void acknowledge(Message<?> message) throws PulsarClientException;
/**
- * Acknowledge the consumption of a single message, identified by its
MessageId
+ * Acknowledge the consumption of a single message, identified by its
{@link MessageId}.
*
* @param messageId
- * The {@code MessageId} to be acknowledged
+ * The {@link MessageId} to be acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index f940640..504dd50 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -39,15 +39,17 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Cloning the builder can be used to share an incomplete configuration
and specialize it multiple times. For
* example:
*
- * <pre>
- * ConsumerBuilder builder = client.newConsumer() //
- * .subscriptionName("my-subscription-name") //
- * .subscriptionType(SubscriptionType.Shared) //
+ * <pre>{@code
+ * ConsumerBuilder<String> builder = client.newConsumer(Schema.STRING)
+ * .subscriptionName("my-subscription-name")
+ * .subscriptionType(SubscriptionType.Shared)
* .receiverQueueSize(10);
*
- * Consumer consumer1 = builder.clone().topic(TOPIC_1).subscribe();
- * Consumer consumer2 = builder.clone().topic(TOPIC_2).subscribe();
- * </pre>
+ * Consumer<String> consumer1 =
builder.clone().topic("my-topic-1").subscribe();
+ * Consumer<String> consumer2 =
builder.clone().topic("my-topic-2").subscribe();
+ * }</pre>
+ *
+ * @return a cloned consumer builder object
*/
ConsumerBuilder<T> clone();
@@ -55,19 +57,20 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Load the configuration from provided <tt>config</tt> map.
*
* <p>Example:
- * <pre>
- * Map<String, Object> config = new HashMap<>();
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
* config.put("ackTimeoutMillis", 1000);
* config.put("receiverQueueSize", 2000);
*
- * ConsumerBuilder<byte[]> builder = ...;
- * builder = builder.loadConf(config);
+ * ConsumerBuilder<byte[]> builder =
+ * client.newConsumer()
+ * .loadConf(config);
*
- * Consumer<byte[]> consumer = builder.subscribe();
- * </pre>
+ * Consumer<byte[]> consumer = builder.subscribe();
+ * }</pre>
*
* @param config configuration to load
- * @return consumer builder instance
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> loadConf(Map<String, Object> config);
@@ -75,10 +78,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Finalize the {@link Consumer} creation by subscribing to the topic.
*
* <p>
- * If the subscription does not exist, a new subscription will be created
and all messages published after the
- * creation will be retained until acknowledged, even if the consumer is
not connected.
+ * If the subscription does not exist, a new subscription will be created.
By default the subscription
+ * will be created at the end of the topic. See {@link
#subscriptionInitialPosition(SubscriptionInitialPosition)}
+ * to configure the initial position behavior.
+ * <p>
+ * Once a subscription is created, it will retain the data and the
subscription cursor even if the consumer is not
+ * connected.
*
- * @return the {@link Consumer} instance
+ * @return the {@link Consumer} instance
* @throws PulsarClientException
* if the subscribe operation fails
*/
@@ -88,8 +95,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Finalize the {@link Consumer} creation by subscribing to the topic in
asynchronous mode.
*
* <p>
- * If the subscription does not exist, a new subscription will be created
and all messages published after the
- * creation will be retained until acknowledged, even if the consumer is
not connected.
+ * If the subscription does not exist, a new subscription will be created.
By default the subscription
+ * will be created at the end of the topic. See {@link
#subscriptionInitialPosition(SubscriptionInitialPosition)}
+ * to configure the initial position behavior.
+ * <p>
+ * Once a subscription is created, it will retain the data and the
subscription cursor even if the consumer is not
+ * connected.
*
* @return a future that will yield a {@link Consumer} instance
* @throws PulsarClientException
@@ -101,7 +112,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Specify the topics this consumer will subscribe on.
* <p>
*
- * @param topicNames
+ * @param topicNames a set of topics that the consumer will subscribe on
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> topic(String... topicNames);
@@ -109,26 +121,39 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Specify a list of topics that this consumer will subscribe on.
* <p>
*
- * @param topicNames
+ * @param topicNames a list of topics that the consumer will subscribe on
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> topics(List<String> topicNames);
/**
* Specify a pattern for topics that this consumer will subscribe on.
* <p>
+ * The pattern will be applied to subscribe to all the topics, within a
single namespace, that will match the
+ * pattern.
+ * <p>
+ * The consumer will automatically subscribe to topics created after
itself.
*
* @param topicsPattern
+ * a regular expression to select a list of topics to subscribe
to
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
/**
* Specify a pattern for topics that this consumer will subscribe on.
- * It accepts regular expression and will be compiled into a pattern
internally.
- * Eg. "persistent://prop/use/ns-abc/pattern-topic-.*"
* <p>
+ * It accepts regular expression and will be compiled into a pattern
internally. Eg.
+ * "persistent://public/default/pattern-topic-.*"
+ * <p>
+ * The pattern will be applied to subscribe to all the topics, within a
single namespace, that will match the
+ * pattern.
+ * <p>
+ * The consumer will automatically subscribe to topics created after
itself.
*
* @param topicsPattern
* given regular expression for topics pattern
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(String topicsPattern);
@@ -137,28 +162,45 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>
* This argument is required when constructing the consumer.
*
- * @param subscriptionName
+ * @param subscriptionName the name of the subscription that this consumer
should attach to
+ *
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> subscriptionName(String subscriptionName);
/**
* Set the timeout for unacked messages, truncated to the nearest
millisecond. The timeout needs to be greater than
* 10 seconds.
+ * <p>
+ * By default, the acknowledge timeout is disabled and that means that
messages delivered to a
+ * consumer will not be re-delivered unless the consumer crashes.
+ * <p>
+ * When enabling ack timeout, if a message is not acknowledged within the
specified timeout
+ * it will be re-delivered to the consumer (possibly to a different
consumer in case of
+ * a shared subscription).
*
* @param ackTimeout
* for unacked messages.
* @param timeUnit
* unit in which the timeout is provided.
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/**
* Select the subscription type to be used when subscribing to the topic.
* <p>
- * Default is {@link SubscriptionType#Exclusive}
+ * Options are:
+ *
+ * <ul>
+ * <li>{@link SubscriptionType#Exclusive} (Default)</li>
+ * <li>{@link SubscriptionType#Failover}</li>
+ * <li>{@link SubscriptionType#Shared}</li>
+ * </ul>
*
* @param subscriptionType
* the subscription type value
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
@@ -170,14 +212,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
*
* @param messageListener
* the listener object
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
/**
- * Sets a {@link CryptoKeyReader}
+ * Sets a {@link CryptoKeyReader}.
+ * <p>
+ * Configure the key reader to be used to decrypt the message payloads.
*
* @param cryptoKeyReader
* CryptoKeyReader object
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
@@ -185,7 +231,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Sets the ConsumerCryptoFailureAction to the value specified
*
* @param action
- * The consumer action
+ * the action the consumer will take in case of decryption
failures
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
@@ -214,6 +261,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
*
* @param receiverQueueSize
* the new receiver queue size value
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
@@ -222,12 +270,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>
* By default, the consumer will use a 100 ms grouping time to send out
the acknowledgments to the broker.
* <p>
- * Setting a group time of 0, will send out the acknowledgments
immediately.
+ * Setting a group time of 0, will send out the acknowledgments
immediately. A longer ack group time
+ * will be more efficient at the expense of a slight increase in message
re-deliveries after a failure.
*
* @param delay
* the max amount of time an acknowledgment can be delayed
* @param unit
* the time unit for the delay
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);
@@ -236,15 +286,24 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>
* This setting will be used to reduce the receiver queue size for
individual partitions
* {@link #receiverQueueSize(int)} if the total exceeds this value
(default: 50000).
+ * The purpose of this setting is to have an upper-limit on the number
+ * of messages that a consumer can be pushed at once from a broker, across
all
+ * the partitions.
*
* @param maxTotalReceiverQueueSizeAcrossPartitions
+ * max pending messages across all the partitions
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int
maxTotalReceiverQueueSizeAcrossPartitions);
/**
* Set the consumer name.
+ * <p>
+ * Consumer name is informative and it can be used to identify a
particular consumer
+ * instance from the topic stats.
*
* @param consumerName
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> consumerName(String consumerName);
@@ -257,6 +316,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
*
* @param consumerEventListener
* the consumer group listener object
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> consumerEventListener(ConsumerEventListener
consumerEventListener);
@@ -265,13 +325,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* of the topic. This means that, if the topic has been compacted, the
consumer will only see the latest value for
* each key in the topic, up until the point in the topic message backlog
that has been compacted. Beyond that
* point, the messages will be sent as normal.
- *
+ * <p>
* readCompacted can only be enabled on subscriptions to persistent topics,
which have a single active consumer (i.e.
* failover or exclusive subscriptions). Attempting to enable it on
subscriptions to non-persistent topics or on a
* shared subscription, will lead to the subscription call throwing a
PulsarClientException.
*
* @param readCompacted
* whether to read from the compacted topic
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> readCompacted(boolean readCompacted);
@@ -282,6 +343,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* @param periodInMinutes
* number of minutes between checks for
* new topics matching pattern set with {@link
#topicsPattern(String)}
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes);
@@ -303,28 +365,49 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1,
C4, C5, C4
* </pre>
*
- * @param priorityLevel
+ * @param priorityLevel the priority of this consumer
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> priorityLevel(int priorityLevel);
/**
* Set a name/value property with this consumer.
+ * <p>
+ * Properties are application defined metadata that can be attached to the
consumer. When getting the topic stats,
+ * this metadata will be associated to the consumer stats for easier
identification.
*
* @param key
+ * the property key
+ * @param value
+ * the property value
+ * @param key
* @param value
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> property(String key, String value);
/**
- * Add all the properties in the provided map
+ * Add all the properties in the provided map to the consumer.
+ * <p>
+ * Properties are application defined metadata that can be attached to the
consumer. When getting the topic stats,
+ * this metadata will be associated to the consumer stats for easier
identification.
*
- * @param properties
+ * @param properties
+ * the map of properties to add; each entry
+ * is a property key with its associated
+ * property value
+ * @return the consumer builder instance
*/
ConsumerBuilder<T> properties(Map<String, String> properties);
/**
- * Set subscriptionInitialPosition for the consumer
- */
+ * Set the {@link SubscriptionInitialPosition} for the consumer.
+ * <p>
+ *
+ * @param subscriptionInitialPosition
+ * the position where to initialize a newly created subscription
+ * @return the consumer builder instance
+ */
ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition
subscriptionInitialPosition);
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index ad59994..e9a2ace 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,20 +19,29 @@
package org.apache.pulsar.client.api;
+/**
+ * The action a consumer should take when a consumer receives a
+ * message that it cannot decrypt.
+ */
public enum ConsumerCryptoFailureAction {
- FAIL, // This is the default option to fail consume until crypto succeeds
- DISCARD, // Message is silently acknowledged and not delivered to the
application
/**
- *
- * <pre>
+ * This is the default option: consuming messages fails until crypto
succeeds
+ */
+ FAIL,
+
+ /**
+ * Message is silently acknowledged and not delivered to the application
+ */
+ DISCARD,
+
+ /**
* Deliver the encrypted message to the application. It's the
application's responsibility to decrypt the message.
+ * <p>
* If message is also compressed, decompression will fail. If message
contain batch messages, client will not be
* able to retrieve individual messages in the batch.
- * </pre>
- *
+ * <p>
* Delivered encrypted message contains {@link EncryptionContext} which
contains encryption and compression
* information in it using which application can decrypt consumed message
payload.
- *
*/
CONSUME;
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
index af0bd50..8d38084 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
@@ -27,11 +27,21 @@ public interface ConsumerEventListener extends Serializable
{
/**
* Notified when the consumer group is changed, and the consumer becomes
the active consumer.
+ *
+ * @param consumer
+ * the consumer that originated the event
+ * @param partitionId
+ * the id of the partition that became active
*/
void becameActive(Consumer<?> consumer, int partitionId);
/**
* Notified when the consumer group is changed, and the consumer is still
inactive or becomes inactive.
+ *
+ * @param consumer
+ * the consumer that originated the event
+ * @param partitionId
+ * the id of the partition that became inactive
*/
void becameInactive(Consumer<?> consumer, int partitionId);
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
index 1134d8a..ce99df0 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.client.api;
-import java.util.List;
-
/**
* A plugin interface that allows you to intercept (and possibly mutate)
* messages received by the consumer.
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
index 274191f..8213ed6 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.client.api;
import java.io.Serializable;
import java.util.Map;
+/**
+ * Interface that abstracts the access to a key store.
+ */
public interface CryptoKeyReader extends Serializable {
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
index 52a2a23..bf69d9c 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
@@ -21,12 +21,23 @@ package org.apache.pulsar.client.api;
import lombok.Builder;
import lombok.Data;
+/**
+ * Configuration for the "dead letter queue" feature in consumer.
+ *
+ * @see ConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)
+ */
@Builder
@Data
public class DeadLetterPolicy {
+ /**
+ * Maximum number of times that a message will be redelivered before being
sent to the dead letter queue.
+ */
private int maxRedeliverCount;
+ /**
+ * Name of the topic where the failing messages will be sent.
+ */
private String deadLetterTopic;
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncryptionKeyInfo.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncryptionKeyInfo.java
index cce175d..f6b3849 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncryptionKeyInfo.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncryptionKeyInfo.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.client.api;
import java.util.Map;
+/**
+ * EncryptionKeyInfo contains the encryption key and corresponding metadata
which contains additional information about
+ * the key such as version, timestamp.
+ */
public class EncryptionKeyInfo {
- /*
- * This object contains the encryption key and corresponding metadata
which contains
- * additional information about the key such as version, timestammp
- */
- private Map<String,String> metadata = null;
+ private Map<String, String> metadata = null;
private byte[] key = null;
public EncryptionKeyInfo() {
@@ -42,15 +42,15 @@ public class EncryptionKeyInfo {
public byte[] getKey() {
return key;
}
-
+
public void setKey(byte[] key) {
this.key = key;
}
-
+
public Map<String, String> getMetadata() {
return metadata;
}
-
+
public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
index a451c7e..3af6a66 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
@@ -24,7 +24,7 @@ package org.apache.pulsar.client.api;
public enum HashingScheme {
/**
- * Use regural <code>String.hashCode()</code>
+ * Use regular <code>String.hashCode()</code>
*/
JavaStringHash,
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index 28b1a1d..58e6162 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -33,7 +33,7 @@ public interface Message<T> {
/**
* Return the properties attached to the message.
- *
+ * <p>
* Properties are application defined key/value pairs that will be
attached to the message
*
* @return an unmodifiable view of the properties map
@@ -60,12 +60,20 @@ public interface Message<T> {
String getProperty(String name);
/**
- * Get the content of the message
+ * Get the raw payload of the message.
+ * <p>
+ * Even when using the Schema and type-safe API, an application
+ * has access to the underlying raw message payload.
*
* @return the byte array with the message payload
*/
byte[] getData();
+ /**
+ * Get the de-serialized value of the message, according to the configured
{@link Schema}.
+ *
+ * @return the deserialized value of the message
+ */
T getValue();
/**
@@ -95,6 +103,7 @@ public interface Message<T> {
*
* @see MessageBuilder#setEventTime(long)
* @since 1.20.0
+ * @return the message event time or 0 if event time wasn't set
*/
long getEventTime();
@@ -156,16 +165,16 @@ public interface Message<T> {
* {@link EncryptionContext} contains encryption and compression
information in it using which application can
* decrypt consumed message with encrypted-payload.
*
- * @return
+ * @return the optional encryption context
*/
Optional<EncryptionContext> getEncryptionCtx();
/**
* Get message redelivery count, redelivery count maintain in pulsar
broker. When client acknowledge message
* timeout, broker will dispatch message again with message redelivery
count in CommandMessage defined.
- *
+ * <p>
* Message redelivery increases monotonically in a broker, when topic
switch ownership to a another broker
- * redelivery count will be recalculate.
+ * redelivery count will be recalculated.
*
* @since 2.3.0
* @return message redelivery count
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageId.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageId.java
index d74ce2f..d6282f4 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -25,16 +25,20 @@ import
org.apache.pulsar.client.internal.DefaultImplementation;
/**
* Opaque unique identifier of a single message
- *
+ * <p>
* The MessageId can be used to reference a specific message, for example when
acknowledging, without having to retain
* the message content in memory for an extended period of time.
- *
- *
+ * <p>
+ * Message ids are {@link Comparable} and a bigger message id will imply that
a message was published "after" the other
+ * one.
*/
public interface MessageId extends Comparable<MessageId>, Serializable {
/**
- * Serialize the message ID into a byte array
+ * Serialize the message ID into a byte array.
+ * <p>
+ * The serialized message id can be stored away and later get deserialized
by
+ * using {@link #fromByteArray(byte[])}.
*/
byte[] toByteArray();
@@ -44,16 +48,35 @@ public interface MessageId extends Comparable<MessageId>,
Serializable {
* @param data
* byte array containing the serialized message id
* @return the de-serialized messageId object
+ * @throws IOException if the de-serialization fails
*/
public static MessageId fromByteArray(byte[] data) throws IOException {
return DefaultImplementation.newMessageIdFromByteArray(data);
}
+ /**
+ * De-serialize a message id from a byte array with its topic
+ * information attached.
+ * <p>
+ * The topic information is needed when acknowledging a {@link MessageId}
on
+ * a consumer that is consuming from multiple topics.
+ *
+ * @param data the byte array with the serialized message id
+ * @param topicName the topic name
+ * @return a {@link MessageId} instance
+ * @throws IOException if the de-serialization fails
+ */
public static MessageId fromByteArrayWithTopic(byte[] data, String
topicName) throws IOException {
return DefaultImplementation.newMessageIdFromByteArrayWithTopic(data,
topicName);
}
+ /**
+ * MessageId that represents the oldest message available in the topic
+ */
public static final MessageId earliest =
DefaultImplementation.newMessageId(-1, -1, -1);
+ /**
+ * MessageId that represents the next message published in the topic
+ */
public static final MessageId latest =
DefaultImplementation.newMessageId(Long.MAX_VALUE, Long.MAX_VALUE, -1);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
index bc2b915..6a60f7e 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageRouter.java
@@ -20,6 +20,13 @@ package org.apache.pulsar.client.api;
import java.io.Serializable;
+/**
+ * Interface for custom message router that can be passed
+ * to a producer to select the partition that a particular
+ * message should be published on.
+ *
+ * @see ProducerBuilder#messageRouter(MessageRouter)
+ */
public interface MessageRouter extends Serializable {
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
index 36b150b..62b58d5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -22,11 +22,9 @@ import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
/**
- * Producer object.
- *
- * The producer is used to publish messages on a topic
- *
+ * Producer is used to publish messages on a topic.
*
+ * A single producer instance can be used across multiple threads.
*/
public interface Producer<T> extends Closeable {
@@ -97,15 +95,13 @@ public interface Producer<T> extends Closeable {
*
* This message builder allows to specify additional properties on the
message. For example:
*
- * <pre>
- * <code>
+ * <pre>{@code
* producer.newMessage()
* .key(messageKey)
* .value(myValue)
* .property("user-defined-property", "value")
* .send();
- * </code>
- * </pre>
+ * }</pre>
*
* @return a typed message builder that can be used to construct the
message to be sent through this producer
*/
@@ -165,7 +161,7 @@ public interface Producer<T> extends Closeable {
CompletableFuture<Void> closeAsync();
/**
- * @return Whether the producer is connected to the broker
+ * @return Whether the producer is currently connected to the broker
*/
boolean isConnected();
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 3b01314..2224e51 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -22,12 +22,15 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MaximizeAction;
+
import
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
/**
* {@link ProducerBuilder} is used to configure and create instances of {@link
Producer}.
*
* @see PulsarClient#newProducer()
+ * @see PulsarClient#newProducer(Schema)
*/
public interface ProducerBuilder<T> extends Cloneable {
@@ -61,19 +64,19 @@ public interface ProducerBuilder<T> extends Cloneable {
* Load the configuration from provided <tt>config</tt> map.
*
* <p>Example:
- * <pre>
- * Map<String, Object> config = new HashMap<>();
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
* config.put("producerName", "test-producer");
* config.put("sendTimeoutMs", 2000);
*
- * ProducerBuilder<byte[]> builder = ...;
- * builder = builder.loadConf(config);
+ * ProducerBuilder<byte[]> builder = client.newProducer()
+ * .loadConf(config);
*
- * Producer<byte[]> producer = builder.create();
- * </pre>
+ * Producer<byte[]> producer = builder.create();
+ * }</pre>
*
- * @param config configuration to load
- * @return producer builder instance
+ * @param config configuration map to load
+ * @return the producer builder instance
*/
ProducerBuilder<T> loadConf(Map<String, Object> config);
@@ -83,12 +86,16 @@ public interface ProducerBuilder<T> extends Cloneable {
* Cloning the builder can be used to share an incomplete configuration
and specialize it multiple times. For
* example:
*
- * <pre>
- * ProducerBuilder builder = client.newProducer().sendTimeout(10,
TimeUnit.SECONDS).blockIfQueueFull(true);
+ * <pre>{@code
+ * ProducerBuilder<String> builder = client.newProducer(Schema.STRING)
+ * .sendTimeout(10, TimeUnit.SECONDS)
+ * .blockIfQueueFull(true);
+ *
+ * Producer<String> producer1 = builder.clone().topic("topic-1").create();
+ * Producer<String> producer2 = builder.clone().topic("topic-2").create();
+ * }</pre>
*
- * Producer producer1 = builder.clone().topic(TOPIC_1).create();
- * Producer producer2 = builder.clone().topic(TOPIC_2).create();
- * </pre>
+ * @return a clone of the producer builder instance
*/
ProducerBuilder<T> clone();
@@ -97,22 +104,24 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>
* This argument is required when constructing the produce.
*
- * @param topicName
+ * @param topicName the name of the topic
+ * @return the producer builder instance
*/
ProducerBuilder<T> topic(String topicName);
/**
* Specify a name for the producer
* <p>
- * If not assigned, the system will generate a globally unique name which
can be access with
+ * If not assigned, the system will generate a globally unique name which
can be accessed with
* {@link Producer#getProducerName()}.
* <p>
- * When specifying a name, it is up to the user to ensure that, for a
given topic, the producer name is unique
+ * <b>Warning</b>: When specifying a name, it is up to the user to ensure
that, for a given topic, the producer name is unique
* across all Pulsar's clusters. Brokers will enforce that only a single
producer a given name can be publishing on
* a topic.
*
* @param producerName
* the custom name to use for the producer
+ * @return the producer builder instance
*/
ProducerBuilder<T> producerName(String producerName);
@@ -120,13 +129,16 @@ public interface ProducerBuilder<T> extends Cloneable {
* Set the send timeout <i>(default: 30 seconds)</i>
* <p>
* If a message is not acknowledged by the server before the sendTimeout
expires, an error will be reported.
- * Setting the timeout to zero, for example <code>setTimeout(0,
TimeUnit.SECONDS)</code> will set the timeout
- * to infinity, which can be useful when using Pulsar's message
deduplication feature.
+ * <p>
+ * Setting the timeout to zero, for example {@code sendTimeout(0,
TimeUnit.SECONDS)} will set the timeout
+ * to infinity, which can be useful when using Pulsar's message
deduplication feature, since the client
+ * library will retry forever to publish a message. No errors will be
propagated back to the application.
*
* @param sendTimeout
* the send timeout
* @param unit
* the time unit of the {@code sendTimeout}
+ * @return the producer builder instance
*/
ProducerBuilder<T> sendTimeout(int sendTimeout, TimeUnit unit);
@@ -134,10 +146,18 @@ public interface ProducerBuilder<T> extends Cloneable {
* Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
* <p>
* When the queue is full, by default, all calls to {@link Producer#send}
and {@link Producer#sendAsync} will fail
- * unless blockIfQueueFull is set to true. Use {@link
#blockIfQueueFull(boolean)} to change the blocking behavior.
+ * unless {@code blockIfQueueFull=true}. Use {@link
#blockIfQueueFull(boolean)} to change the blocking behavior.
+ * <p>
+ * The producer queue size also determines the max amount of memory that
will be required by the client application.
+ * Until the producer gets a successful acknowledgment back from the
broker, it will keep in memory (direct memory
+ * pool) all the messages in the pending queue.
+ *
+ * <p>
+ * Default is 1000.
*
* @param maxPendingMessages
- * @return
+ * the max size of the pending messages queue for the producer
+ * @return the producer builder instance
*/
ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);
@@ -146,8 +166,18 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>
* This setting will be used to lower the max pending messages for each
partition
* ({@link #maxPendingMessages(int)}), if the total exceeds the configured
value.
+ * The purpose of this setting is to have an upper-limit on the number
+ * of pending messages when publishing on a partitioned topic.
+ * <p>
+ * Default is 50000.
+ * <p>
+ * If publishing at high rate over a topic with many partitions
(especially when publishing messages without a
+ * partitioning key), it might be beneficial to increase this parameter to
allow for more pipelining within the
+ * individual partitions producers.
*
* @param maxPendingMessagesAcrossPartitions
+ * max pending messages across all the partitions
+ * @return the producer builder instance
*/
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int
maxPendingMessagesAcrossPartitions);
@@ -155,26 +185,51 @@ public interface ProducerBuilder<T> extends Cloneable {
* Set whether the {@link Producer#send} and {@link Producer#sendAsync}
operations should block when the outgoing
* message queue is full.
* <p>
- * Default is <code>false</code>. If set to <code>false</code>, send
operations will immediately fail with
- * {@link ProducerQueueIsFullError} when there is no space left in pending
queue.
+ * Default is {@code false}. If set to {@code false}, send operations will
immediately fail with
+ * {@link ProducerQueueIsFullError} when there is no space left in pending
queue. If set to
+ * {@code true}, the {@link Producer#sendAsync} operation will instead
block.
+ * <p>
+ * Setting {@code blockIfQueueFull=true} simplifies the task of an
application that
+ * just wants to publish messages as fast as possible, without having to
worry
+ * about overflowing the producer send queue.
+ * <p>
+ * For example:
+ * <pre>{@code
+ * Producer<String> producer = client.newProducer(Schema.STRING)
+ * .topic("my-topic")
+ * .blockIfQueueFull(true)
+ * .create();
+ *
+ * while (true) {
+ * producer.sendAsync("my-message")
+ * .thenAccept(messageId -> {
+ * System.out.println("Published message: " + messageId);
+ * })
+ * .exceptionally(ex -> {
+ * System.err.println("Failed to publish: " + ex);
+ * return null;
+ * });
+ * }
+ * }</pre>
*
* @param blockIfQueueFull
* whether to block {@link Producer#send} and {@link
Producer#sendAsync} operations on queue full
- * @return
+ * @return the producer builder instance
*/
ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull);
/**
- * Set the message routing mode for the partitioned producer.
- *
- * Default routing mode is round-robin routing.
- *
- * This logic is applied when the application is not setting a key {@link
MessageBuilder#setKey(String)} on a
- * particular message.
+ * Set the {@link MessageRoutingMode} for a partitioned producer.
+ * <p>
+ * Default routing mode is to round-robin across the available partitions.
+ * <p>
+ * This logic is applied when the application is not setting a key on a
+ * particular message. If the key is set with {@link
MessageBuilder#setKey(String)},
+ * then the hash of the key will be used to select a partition for the
message.
*
* @param messageRoutingMode
* the message routing mode
- * @return producer builder
+ * @return the producer builder instance
* @see MessageRoutingMode
*/
ProducerBuilder<T> messageRoutingMode(MessageRoutingMode
messageRoutingMode);
@@ -184,15 +239,14 @@ public interface ProducerBuilder<T> extends Cloneable {
*
* Standard hashing functions available are:
* <ul>
- * <li><code>JavaStringHash</code>: Java <code>String.hashCode()</code>
- * <li><code>Murmur3_32Hash</code>: Use Murmur3 hashing function.
+ * <li>{@link HashingScheme#JavaStringHash}: Java {@code
String.hashCode()} (Default)
+ * <li>{@link HashingScheme#Murmur3_32Hash}: Use Murmur3 hashing function.
* <a
href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
* </ul>
*
- * Default is <code>JavaStringHash</code>.
- *
* @param hashingScheme
* the chosen {@link HashingScheme}
+ * @return the producer builder instance
*/
ProducerBuilder<T> hashingScheme(HashingScheme hashingScheme);
@@ -201,59 +255,68 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>
* By default, message payloads are not compressed. Supported compression
types are:
* <ul>
- * <li>{@link CompressionType.LZ4}</li>
- * <li>{@link CompressionType.ZLIB}</li>
- * <li>{@link CompressionType.ZSTD} (Since Pulsar 2.3. Zstd
- * cannot be used if consumer applications are not in version >= 2.3
as well)</li>
+ * <li>{@link CompressionType#NONE}: No compression (Default)</li>
+ * <li>{@link CompressionType#LZ4}: Compress with LZ4 algorithm. Faster
but lower compression than ZLib</li>
+ * <li>{@link CompressionType#ZLIB}: Standard ZLib compression</li>
+ * <li>{@link CompressionType#ZSTD}: Compress with Zstandard codec. Since
Pulsar 2.3. Zstd cannot be used if consumer
+ * applications are not in version >= 2.3 as well</li>
* </ul>
*
* @param compressionType
- * @return
+ * the selected compression type
+ * @return the producer builder instance
*/
ProducerBuilder<T> compressionType(CompressionType compressionType);
/**
- * Set a custom message routing policy by passing an implementation of
MessageRouter
- *
+ * Set a custom message routing policy by passing an implementation of
MessageRouter.
*
* @param messageRouter
+ * @return the producer builder instance
*/
ProducerBuilder<T> messageRouter(MessageRouter messageRouter);
/**
- * Control whether automatic batching of messages is enabled for the
producer. <i>default: false [No batching]</i>
- *
- * When batching is enabled, multiple calls to Producer.sendAsync can
result in a single batch to be sent to the
+ * Control whether automatic batching of messages is enabled for the
producer. <i>default: enabled</i>
+ * <p>
+ * When batching is enabled, multiple calls to {@link Producer#sendAsync}
can result in a single batch to be sent to the
* broker, leading to better throughput, especially when publishing small
messages. If compression is enabled,
* messages will be compressed at the batch level, leading to a much
better compression ratio for similar headers or
* contents.
- *
+ * <p>
* When enabled default batch delay is set to 1 ms and default batch size
is 1000 messages
+ * <p>
+ * Batching is enabled by default since 2.0.0.
*
- * <p>Batching is enabled by default since 2.0.0.
- *
- * @return producer builder.
* @see #batchingMaxPublishDelay(long, TimeUnit)
* @see #batchingMaxMessages(int)
+ * @return the producer builder instance
*/
ProducerBuilder<T> enableBatching(boolean enableBatching);
/**
- * Sets a {@link CryptoKeyReader}
+ * Sets a {@link CryptoKeyReader}.
+ * <p>
+ * Configure the key reader to be used to encrypt the message payloads.
*
* @param cryptoKeyReader
* CryptoKeyReader object
+ * @return the producer builder instance
*/
ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
/**
* Add public encryption key, used by producer to encrypt the data key.
- *
+ * <p>
* At the time of producer creation, Pulsar client checks if there are
keys added to encryptionKeys. If keys are
- * found, a callback getKey(String keyName) is invoked against each key to
load the values of the key. Application
- * should implement this callback to return the key in pkcs8 format. If
compression is enabled, message is encrypted
- * after compression. If batch messaging is enabled, the batched message
is encrypted.
+ * found, a callback {@link CryptoKeyReader#getPrivateKey(String, Map)} and
+ * {@link CryptoKeyReader#getPublicKey(String, Map)} is invoked against
each key to load the values of the key.
+ * Application should implement this callback to return the key in pkcs8
format. If compression is enabled, message
+ * is encrypted after compression. If batch messaging is enabled, the
batched message is encrypted.
*
+ * @param key
+ * the name of the encryption key in the key store
+ * @return the producer builder instance
*/
ProducerBuilder<T> addEncryptionKey(String key);
@@ -261,80 +324,104 @@ public interface ProducerBuilder<T> extends Cloneable {
* Sets the ProducerCryptoFailureAction to the value specified
*
* @param action
- * producer action
+ * the action the producer will take in case of encryption
failures
+ * @return the producer builder instance
*/
ProducerBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction action);
/**
* Set the time period within which the messages sent will be batched
<i>default: 1 ms</i> if batch messages are
- * enabled. If set to a non zero value, messages will be queued until this
time interval or until
+ * enabled. If set to a non zero value, messages will be queued until
either:
+ * <ul>
+ * <li>this time interval expires</li>
+ * <li>the max number of messages in a batch is reached ({@link
#batchingMaxMessages(int)})</li>
+ * <li>the max size of batch is reached</li>
+ * </ul>
+ * <p>
+ * All messages will be published as a single batch message. The consumer
will be delivered individual messages in
+ * the batch in the same order they were enqueued.
*
- * @see ProducerConfiguration#getBatchingMaxMessages() threshold is
reached; all messages will be published as a single
- * batch message. The consumer will be delivered individual messages
in the batch in the same order they were
- * enqueued
* @param batchDelay
* the batch delay
* @param timeUnit
* the time unit of the {@code batchDelay}
- * @return
+ * @return the producer builder instance
*/
ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, TimeUnit
timeUnit);
/**
* Set the maximum number of messages permitted in a batch. <i>default:
1000</i> If set to a value greater than 1,
* messages will be queued until this threshold is reached or batch
interval has elapsed
+ * <p>
+ * All messages in batch will be published as a single batch message. The
consumer will be delivered individual
+ * messages in the batch in the same order they were enqueued
*
- * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
All messages in batch will be published as
- * a single batch message. The consumer will be delivered individual
messages in the batch in the same order
- * they were enqueued
+ * @see #batchingMaxPublishDelay(long, TimeUnit)
* @param batchMessagesMaxMessagesPerBatch
* maximum number of messages in a batch
- * @return
+ * @return the producer builder instance
*/
ProducerBuilder<T> batchingMaxMessages(int
batchMessagesMaxMessagesPerBatch);
/**
* Set the baseline for the sequence ids for messages published by the
producer.
* <p>
- * First message will be using (initialSequenceId + 1) as its sequence id
and subsequent messages will be assigned
+ * First message will be using {@code (initialSequenceId + 1)} as its
sequence id and subsequent messages will be assigned
* incremental sequence ids, if not otherwise specified.
*
- * @param initialSequenceId
- * @return
+ * @param initialSequenceId the initial sequence id for the producer
+ * @return the producer builder instance
*/
ProducerBuilder<T> initialSequenceId(long initialSequenceId);
/**
* Set a name/value property with this producer.
+ * <p>
+ * Properties are application defined metadata that can be attached to the
producer. When getting the topic stats,
+ * this metadata will be associated to the producer stats for easier
identification.
*
* @param key
+ * the property key
* @param value
- * @return
+ * the property value
+ * @return the producer builder instance
*/
ProducerBuilder<T> property(String key, String value);
/**
- * Add all the properties in the provided map
+ * Add all the properties in the provided map to the producer.
+ * <p>
+ * Properties are application defined metadata that can be attached to the
producer. When getting the topic stats,
+ * this metadata will be associated to the producer stats for easier
identification.
*
- * @param properties
- * @return
+ * @param properties
+ * the map of properties to add, where each key is a
+ * property name and each value is the corresponding
+ * property value
+ * @return the producer builder instance
*/
ProducerBuilder<T> properties(Map<String, String> properties);
/**
- * Intercept {@link Producer}.
+ * Add a set of {@link ProducerInterceptor} to the producer.
+ * <p>
+ * Interceptors can be used to trace the publish and acknowledgments
operation happening in a producer.
*
- * @param interceptors the list of interceptors to intercept the producer
created by this builder.
- * @return producer builder.
+ * @param interceptors
+ * the list of interceptors to intercept the producer created
by this builder.
+ * @return the producer builder instance
*/
ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);
/**
- * If enabled, partitioned producer will auto create new producers for new
partitions.
- * This is only for partitioned producer.
+ * If enabled, partitioned producer will automatically discover new
partitions at runtime. This is only applied on
+ * partitioned topics.
+ * <p>
+ * Default is true.
*
* @param autoUpdate
- * whether to auto update partition increasement
+ * whether to auto discover the partition configuration changes
+ * @return the producer builder instance
*/
ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerCryptoFailureAction.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerCryptoFailureAction.java
index 2ecdded..06cd8d6 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerCryptoFailureAction.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerCryptoFailureAction.java
@@ -19,7 +19,17 @@
package org.apache.pulsar.client.api;
+/**
+ * The action the producer will take in case of encryption failures
+ */
public enum ProducerCryptoFailureAction {
- FAIL, // This is the default option to fail send if crypto operation fails
- SEND // Ignore crypto failure and proceed with sending unencrypted
messages
+ /**
+ * This is the default option to fail send if crypto operation fails
+ */
+ FAIL,
+
+ /**
+ * Ignore crypto failure and proceed with sending unencrypted messages
+ */
+ SEND
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index cd36d27..a568f9b 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -29,6 +29,14 @@ import
org.apache.pulsar.client.internal.DefaultImplementation;
* <p>
* Client instances are thread-safe and can be reused for managing multiple
{@link Producer}, {@link Consumer} and
* {@link Reader} instances.
+ * <p>
+ * Example of constructing a client:
+ *
+ * <pre>{@code
+ * PulsarClient client = PulsarClient.builder()
+ * .serviceUrl("pulsar://broker:6650")
+ * .build();
+ * }</pre>
*/
public interface PulsarClient extends Closeable {
@@ -44,13 +52,17 @@ public interface PulsarClient extends Closeable {
}
/**
- * Create a producer with default for publishing on a specific topic
+ * Create a producer builder that can be used to configure
+ * and construct a producer with default {@link Schema#BYTES}.
* <p>
* Example:
*
- * <code>
- * Producer producer = client.newProducer().topic(myTopic).create();
- * </code>
+ * <pre>{@code
+ * Producer<byte[]> producer = client.newProducer()
+ * .topic("my-topic")
+ * .create();
+ * producer.send("test".getBytes());
+ * }</pre>
*
* @return a {@link ProducerBuilder} object to configure and construct the
{@link Producer} instance
*
@@ -59,13 +71,17 @@ public interface PulsarClient extends Closeable {
ProducerBuilder<byte[]> newProducer();
/**
- * Create a producer with default for publishing on a specific topic
+ * Create a producer builder that can be used to configure
+ * and construct a producer with the specified schema
* <p>
* Example:
*
- * <code>
- * Producer producer =
client.newProducer(mySchema).topic(myTopic).create();
- * </code>
+ * <pre>{@code
+ * Producer<String> producer = client.newProducer(Schema.STRING)
+ * .topic("my-topic")
+ * .create();
+ * producer.send("test");
+ * }</pre>
*
* @param schema
* provide a way to convert between serialized data and domain
objects
@@ -77,7 +93,21 @@ public interface PulsarClient extends Closeable {
<T> ProducerBuilder<T> newProducer(Schema<T> schema);
/**
- * Create a consumer with default for subscribing on a specific topic
+ * Create a consumer builder with no schema ({@link Schema#BYTES}) for
subscribing to
+ * one or more topics.
+ * <p>
+ *
+ * <pre>{@code
+ * Consumer<byte[]> consumer = client.newConsumer()
+ * .topic("my-topic")
+ * .subscriptionName("my-subscription-name")
+ * .subscribe();
+ *
+ * while (true) {
+ * Message<byte[]> message = consumer.receive();
+ * System.out.println("Got message: " + message.getValue());
+ * consumer.acknowledge(message);
+ * }
+ * }</pre>
*
* @return a {@link ConsumerBuilder} object to configure and construct the
{@link Consumer} instance
*
@@ -86,10 +116,24 @@ public interface PulsarClient extends Closeable {
ConsumerBuilder<byte[]> newConsumer();
/**
- * Create a consumer with default for subscribing on a specific topic
- *
+ * Create a consumer builder with a specific schema for subscribing on a
specific topic
+ * <p>
* Since 2.2, if you are creating a consumer with non-bytes schema on a
non-existence topic, it will
* automatically create the topic with the provided schema.
+ * <p>
+ *
+ * <pre>{@code
+ * Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ * .topic("my-topic")
+ * .subscriptionName("my-subscription-name")
+ * .subscribe();
+ *
+ * while (true) {
+ * Message<String> message = consumer.receive();
+ * System.out.println("Got message: " + message.getValue());
+ * consumer.acknowledge(message);
+ * }
+ * }</pre>
*
* @param schema
* provide a way to convert between serialized data and domain
objects
@@ -100,10 +144,37 @@ public interface PulsarClient extends Closeable {
<T> ConsumerBuilder<T> newConsumer(Schema<T> schema);
/**
- * Create a topic reader for reading messages from the specified topic.
+ * Create a topic reader builder with no schema ({@link Schema#BYTES}) to
read from the specified topic.
* <p>
* The Reader provides a low-level abstraction that allows for manual
positioning in the topic, without using a
- * subscription. Reader can only work on non-partitioned topics.
+ * subscription. A reader needs to be specified a {@link
ReaderBuilder#startMessageId(MessageId)} that can either
+ * be:
+ * <ul>
+ * <li>{@link MessageId#earliest}: Start reading from the earliest message
available in the topic</li>
+ * <li>{@link MessageId#latest}: Start reading from end of the topic. The
first message read will be the one
+ * published <b>*after*</b> the creation of the builder</li>
+ * <li>{@link MessageId}: Position the reader on a particular message. The
first message read will be the one
+ * immediately <b>*after*</b> the specified message</li>
+ * </ul>
+ * <p>
+ * A Reader can only read from non-partitioned topics. In case of partitioned
topics, one can create the readers
+ * directly on the individual partitions. See {@link
#getPartitionsForTopic(String)} for how to get the
+ * topic partitions names.
+ * <p>
+ * Example of usage of Reader:
+ * <pre>
+ * {@code
+ * Reader<byte[]> reader = client.newReader()
+ * .topic("my-topic")
+ * .startMessageId(MessageId.earliest)
+ * .create();
+ *
+ * while (true) {
+ * Message<byte[]> message = reader.readNext();
+ * System.out.println("Got message: " + message.getValue());
+ * // Reader doesn't need acknowledgments
+ * }
+ * }</pre>
*
* @return a {@link ReaderBuilder} that can be used to configure and
construct a {@link Reader} instance
*
@@ -112,13 +183,37 @@ public interface PulsarClient extends Closeable {
ReaderBuilder<byte[]> newReader();
/**
- * Create a topic reader for reading messages from the specified topic.
+ * Create a topic reader builder with a specific {@link Schema} to read
from the specified topic.
* <p>
* The Reader provides a low-level abstraction that allows for manual
positioning in the topic, without using a
- * subscription. Reader can only work on non-partitioned topics.
+ * subscription. A reader needs to be specified a {@link
ReaderBuilder#startMessageId(MessageId)} that can either
+ * be:
+ * <ul>
+ * <li>{@link MessageId#earliest}: Start reading from the earliest message
available in the topic</li>
+ * <li>{@link MessageId#latest}: Start reading from end of the topic. The
first message read will be the one
+ * published <b>*after*</b> the creation of the builder</li>
+ * <li>{@link MessageId}: Position the reader on a particular message. The
first message read will be the one
+ * immediately <b>*after*</b> the specified message</li>
+ * </ul>
+ * <p>
+ * A Reader can only read from non-partitioned topics. In case of partitioned
topics, one can create the readers
+ * directly on the individual partitions. See {@link
#getPartitionsForTopic(String)} for how to get the
+ * topic partitions names.
+ * <p>
+ * Example of usage of Reader:
+ * <pre>
+ * {@code
+ * Reader<String> reader = client.newReader(Schema.STRING)
+ * .topic("my-topic")
+ * .startMessageId(MessageId.earliest)
+ * .create();
*
- * @param schema
- * provide a way to convert between serialized data and domain
objects
+ * while (true) {
+ * Message<String> message = reader.readNext();
+ * System.out.println("Got message: " + message.getValue());
+ * // Reader doesn't need acknowledgments
+ * }
+ * }</pre>
*
* @return a {@link ReaderBuilder} that can be used to configure and
construct a {@link Reader} instance
*
@@ -128,7 +223,7 @@ public interface PulsarClient extends Closeable {
/**
* Update the service URL this client is using.
- *
+ * <p>
* This will force the client close all existing connections and to
restart service discovery to the new service
* endpoint.
*
@@ -141,24 +236,27 @@ public interface PulsarClient extends Closeable {
/**
* Get the list of partitions for a given topic.
- *
+ * <p>
* If the topic is partitioned, this will return a list of partition
names. If the topic is not partitioned, the
* returned list will contain the topic name itself.
- *
+ * <p>
* This can be used to discover the partitions and create {@link Reader},
{@link Consumer} or {@link Producer}
* instances directly on a particular partition.
*
* @param topic
* the topic name
- * @return a future that will yield a list of the topic partitions
+ * @return a future that will yield a list of the topic partitions or
{@link PulsarClientException} if there was any
+ * error in the operation.
* @since 2.3.0
*/
CompletableFuture<List<String>> getPartitionsForTopic(String topic);
/**
* Close the PulsarClient and release all the resources.
- *
- * All the producers and consumers will be orderly closed. Waits until all
pending write request are persisted.
+ * <p>
+ * This operation will trigger a graceful close of all producer, consumer
and reader instances that this client has
+ * currently active. That implies that close will block and wait until all
pending producer send requests are
+ * persisted.
*
* @throws PulsarClientException
* if the close operation fails
@@ -168,8 +266,10 @@ public interface PulsarClient extends Closeable {
/**
* Asynchronously close the PulsarClient and release all the resources.
- *
- * All the producers and consumers will be orderly closed. Waits until all
pending write request are persisted.
+ * <p>
+ * This operation will trigger a graceful close of all producer, consumer
and reader instances that this client has
+ * currently active. That implies that close will wait, asynchronously,
until all pending producer send requests are
+ * persisted.
*
* @throws PulsarClientException
* if the close operation fails
@@ -178,8 +278,9 @@ public interface PulsarClient extends Closeable {
/**
* Perform immediate shutdown of PulsarClient.
- *
- * Release all the resources and close all the producers without waiting
for ongoing operations to complete.
+ * <p>
+ * Release all the resources and close all the producer, consumer and
reader instances without waiting for ongoing
+ * operations to complete.
*
* @throws PulsarClientException
* if the forceful shutdown fails
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index cdf1132..aa22cb6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -34,22 +34,31 @@ public interface Reader<T> extends Closeable {
String getTopic();
/**
- * Read the next message in the topic
+ * Read the next message in the topic.
+ * <p>
+ * This method will block until a message is available.
*
- * @return the next messasge
+ * @return the next message
* @throws PulsarClientException
*/
Message<T> readNext() throws PulsarClientException;
/**
- * Read the next message in the topic waiting for a maximum of timeout
- * time units. Returns null if no message is recieved in that time.
+ * Read the next message in the topic waiting for a maximum time.
+ * <p>
+ * Returns null if no message is received before the timeout.
*
* @return the next message(Could be null if none received in time)
* @throws PulsarClientException
*/
Message<T> readNext(int timeout, TimeUnit unit) throws
PulsarClientException;
+ /**
+ * Read asynchronously the next message in the topic.
+ *
+ * @return a future that will yield a message (when it's available) or
{@link PulsarClientException} if the reader
+ * is already closed.
+ */
CompletableFuture<Message<T>> readNextAsync();
/**
@@ -60,17 +69,46 @@ public interface Reader<T> extends Closeable {
CompletableFuture<Void> closeAsync();
/**
- * Return true if the topic was terminated and this reader has reached the
end of the topic
+ * Return true if the topic was terminated and this reader has reached the
end of the topic.
+ * <p>
+ * Note that this only applies to a "terminated" topic (where the topic is
"sealed" and no
+ * more messages can be published) and not just that the reader is simply
caught up with
+ * the publishers. Use {@link #hasMessageAvailable()} to check for
that.
*/
boolean hasReachedEndOfTopic();
/**
* Check if there is any message available to read from the current
position.
+ * <p>
+ * This check can be used by an application to scan through a topic and
stop
+ * when the reader reaches the current last published message. For example:
+ *
+ * <pre>{@code
+ * while (reader.hasMessageAvailable()) {
+ * Message<String> msg = reader.readNext();
+ * // Do something
+ * }
+ *
+ * // Done reading
+ * }</pre>
+ *
+ * Note that this call might be blocking (see {@link
#hasMessageAvailableAsync()} for async version) and
+ * that even if this call returns true, that will not guarantee that a
subsequent call to {@link #readNext()}
+ * will not block.
+ *
+ * @return true if there are messages available to be read, false otherwise
+ * @throws PulsarClientException if there was any error in the operation
*/
boolean hasMessageAvailable() throws PulsarClientException;
/**
- * Asynchronously Check if there is message that has been published
successfully to the broker in the topic.
+ * Asynchronously check if there is any message available to read from the
current position.
+ * <p>
+ * This check can be used by an application to scan through a topic and
stop when the reader reaches the current
+ * last published message.
+ *
+ * @return a future that will yield true if there are messages available to
be read, false otherwise, or a
+ * {@link PulsarClientException} if there was any error in the
operation
*/
CompletableFuture<Boolean> hasMessageAvailableAsync();
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index bc162bd..792cf7b 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -32,9 +32,8 @@ public interface ReaderBuilder<T> extends Cloneable {
/**
* Finalize the creation of the {@link Reader} instance.
- *
* <p>
- * This method will block until the reader is created successfully.
+ * This method will block until the reader is created successfully or an
exception is thrown.
*
* @return the reader instance
* @throws PulsarClientException
@@ -57,20 +56,23 @@ public interface ReaderBuilder<T> extends Cloneable {
/**
* Load the configuration from provided <tt>config</tt> map.
*
- * <p>Example:
- * <pre>
- * Map<String, Object> config = new HashMap<>();
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
* config.put("topicName", "test-topic");
* config.put("receiverQueueSize", 2000);
*
- * ReaderBuilder<byte[]> builder = ...;
+ * ReaderBuilder<byte[]> builder = ...;
* builder = builder.loadConf(config);
*
- * Reader<byte[]> reader = builder.create();
- * </pre>
+ * Reader<byte[]> reader = builder.create();
+ * }</pre>
*
- * @param config configuration to load
- * @return reader builder instance
+ * @param config
+ * configuration to load
+ * @return the reader builder instance
*/
ReaderBuilder<T> loadConf(Map<String, Object> config);
@@ -80,33 +82,42 @@ public interface ReaderBuilder<T> extends Cloneable {
* Cloning the builder can be used to share an incomplete configuration
and specialize it multiple times. For
* example:
*
- * <pre>
- * ReaderBuilder builder =
client.newReader().readerName("my-reader").receiverQueueSize(10);
+ * <pre>{@code
+ * ReaderBuilder<String> builder = client.newReader(Schema.STRING)
+ * .readerName("my-reader")
+ * .receiverQueueSize(10);
*
- * Reader reader1 = builder.clone().topic(TOPIC_1).create();
- * Reader reader2 = builder.clone().topic(TOPIC_2).create();
- * </pre>
+ * Reader<String> reader1 = builder.clone().topic("topic-1").create();
+ * Reader<String> reader2 = builder.clone().topic("topic-2").create();
+ * }</pre>
+ *
+ * @return a clone of the reader builder instance
*/
ReaderBuilder<T> clone();
/**
- * Specify the topic this consumer will subscribe on.
+ * Specify the topic this reader will read from.
* <p>
- * This argument is required when constructing the consumer.
+ * This argument is required when constructing the reader.
*
* @param topicName
+ * the name of the topic
+ * @return the reader builder instance
*/
ReaderBuilder<T> topic(String topicName);
/**
* The initial reader positioning is done by specifying a message id. The
options are:
* <ul>
- * <li><code>MessageId.earliest</code> : Start reading from the earliest
message available in the topic
- * <li><code>MessageId.latest</code> : Start reading from the end topic,
only getting messages published after the
- * reader was created
- * <li><code>MessageId</code> : When passing a particular message id, the
reader will position itself on that
- * specific position. The first message to be read will be the message
next to the specified messageId.
+ * <li>{@link MessageId#earliest}: Start reading from the earliest message
available in the topic</li>
+ * <li>{@link MessageId#latest}: Start reading from end of the topic. The
first message read will be the one
+ * published <b>*after*</b> the creation of the builder</li>
+ * <li>{@link MessageId}: Position the reader on a particular message. The
first message read will be the one
+ * immediately <b>*after*</b> the specified message</li>
* </ul>
+ *
+ * @param startMessageId the message id where the reader will be initially
positioned on
+ * @return the reader builder instance
*/
ReaderBuilder<T> startMessageId(MessageId startMessageId);
@@ -118,22 +129,25 @@ public interface ReaderBuilder<T> extends Cloneable {
*
* @param readerListener
* the listener object
+ * @return the reader builder instance
*/
ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
/**
- * Sets a {@link CryptoKeyReader}
+ * Sets a {@link CryptoKeyReader} to decrypt the message payloads.
*
* @param cryptoKeyReader
* CryptoKeyReader object
+ * @return the reader builder instance
*/
ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
/**
- * Sets the ConsumerCryptoFailureAction to the value specified
+ * Sets the {@link ConsumerCryptoFailureAction} to the value specified.
*
* @param action
* The action to take when the decoding fails
+ * @return the reader builder instance
*/
ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
@@ -143,18 +157,24 @@ public interface ReaderBuilder<T> extends Cloneable {
* The consumer receive queue controls how many messages can be
accumulated by the {@link Consumer} before the
* application calls {@link Consumer#receive()}. Using a higher value
could potentially increase the consumer
* throughput at the expense of bigger memory utilization.
- * </p>
+ * <p>
* Default value is {@code 1000} messages and should be good for most use
cases.
*
* @param receiverQueueSize
* the new receiver queue size value
+ * @return the reader builder instance
*/
ReaderBuilder<T> receiverQueueSize(int receiverQueueSize);
/**
- * Set the reader name.
+ * Specify a reader name.
+ * <p>
+ * The reader name is purely informational and can be used to track a
particular reader in the reported stats. By
+ * default a randomly generated name is used.
*
* @param readerName
+ * the name to use for the reader
+ * @return the reader builder instance
*/
ReaderBuilder<T> readerName(String readerName);
@@ -162,6 +182,7 @@ public interface ReaderBuilder<T> extends Cloneable {
* Set the subscription role prefix. The default prefix is "reader".
*
* @param subscriptionRolePrefix
+ * @return the reader builder instance
*/
ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix);
@@ -170,12 +191,13 @@ public interface ReaderBuilder<T> extends Cloneable {
* of the topic. This means that, if the topic has been compacted, the
reader will only see the latest value for
* each key in the topic, up until the point in the topic message backlog
that has been compacted. Beyond that
* point, the messages will be sent as normal.
- *
+ * <p>
* readCompacted can only be enabled when reading from a persistent topic.
Attempting to enable it on non-persistent
- * topics will lead to the reader create call throwing a
PulsarClientException.
+ * topics will lead to the reader create call throwing a {@link
PulsarClientException}.
*
* @param readCompacted
* whether to read from the compacted topic
+ * @return the reader builder instance
*/
ReaderBuilder<T> readCompacted(boolean readCompacted);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
index 3d2af2a..d198bf5 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
@@ -26,11 +26,11 @@ import java.io.Serializable;
public interface ReaderListener<T> extends Serializable {
/**
* This method is called whenever a new message is received.
- *
+ * <p>
* Messages are guaranteed to be delivered in order and from the same
thread for a single consumer
- *
+ * <p>
* This method will only be called once for each message, unless either
application or broker crashes.
- *
+ * <p>
* Application is responsible of handling any exception that could be
thrown while processing the message.
*
* @param reader
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RegexSubscriptionMode.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RegexSubscriptionMode.java
index 23bde7a..d7b17fb 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RegexSubscriptionMode.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/RegexSubscriptionMode.java
@@ -23,9 +23,18 @@ package org.apache.pulsar.client.api;
* to only pick a certain type of topics.
*/
public enum RegexSubscriptionMode {
+ /**
+ * Only subscribe to persistent topics
+ */
PersistentOnly,
+ /**
+ * Only subscribe to non-persistent topics
+ */
NonPersistentOnly,
+ /**
+ * Subscribe to both persistent and non-persistent topics
+ */
AllTopics
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index b79c368..cd26fc4 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -80,14 +80,32 @@ public interface Schema<T> {
*/
Schema<String> STRING = DefaultImplementation.newStringSchema();
+ /**
+ * Create a Protobuf schema type by extracting the fields of the specified
class.
+ *
+ * @param clazz the Protobuf generated class to be used to extract the
schema
+ * @return a Schema instance
+ */
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
PROTOBUF(Class<T> clazz) {
return DefaultImplementation.newProtobufSchema(clazz);
}
+ /**
+ * Create an Avro schema type by extracting the fields of the specified
class.
+ *
+ * @param clazz the POJO class to be used to extract the Avro schema
+ * @return a Schema instance
+ */
static <T> Schema<T> AVRO(Class<T> clazz) {
return DefaultImplementation.newAvroSchema(clazz);
}
+ /**
+ * Create a JSON schema type by extracting the fields of the specified
class.
+ *
+ * @param clazz the POJO class to be used to extract the JSON schema
+ * @return a Schema instance
+ */
static <T> Schema<T> JSON(Class<T> clazz) {
return DefaultImplementation.newJSONSchema(clazz);
}
@@ -123,10 +141,31 @@ public interface Schema<T> {
return AUTO_CONSUME();
}
+ /**
+ * Create a schema instance that automatically deserializes messages
+ * based on the current topic schema.
+ * <p>
+ * The message values are deserialized into a {@link GenericRecord}
object.
+ * <p>
+ * Currently this is only supported with Avro and JSON schema types.
+ *
+ * @return the auto schema instance
+ */
static Schema<GenericRecord> AUTO_CONSUME() {
return DefaultImplementation.newAutoConsumeSchema();
}
+ /**
+ * Create a schema instance that accepts a serialized payload
+ * and validates it against the topic schema.
+ * <p>
+ * Currently this is only supported with Avro and JSON schema types.
+ * <p>
+ * This method can be used when publishing a raw JSON payload,
+ * for which the format is known and a POJO class is not available.
+ *
+ * @return the auto schema instance
+ */
static Schema<byte[]> AUTO_PRODUCE_BYTES() {
return DefaultImplementation.newAutoProduceSchema();
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index a668987..6bcc877 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -20,17 +20,17 @@ package org.apache.pulsar.client.api;
/**
* The provider to provide the service url.
- *
+ * <p>
* This allows applications to retrieve the service URL from an external
configuration provider and, more importantly,
* to force the Pulsar client to reconnect if the service URL has been changed.
- *
+ * <p>
* It can be passed with {@link
ClientBuilder#serviceUrlProvider(ServiceUrlProvider)}
*/
public interface ServiceUrlProvider {
/**
* Initialize the service url provider with Pulsar client instance.
- *
+ * <p>
* This can be used by the provider to force the Pulsar client to
reconnect whenever the service url might have
* changed. See {@link PulsarClient#updateServiceUrl(String)}.
*
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
index e28953f..94edc61 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
@@ -19,8 +19,9 @@
package org.apache.pulsar.client.api;
/**
- * intial position at which the cursor will be set when subscribe
- *
+ * When creating a consumer, if the subscription does not exist, a new
subscription will be created. By default the
+ * subscription will be created at the end of the topic. See
+ * {@link ConsumerBuilder#subscriptionInitialPosition(SubscriptionInitialPosition)} to
configure the initial position behavior.
*
*/
public enum SubscriptionInitialPosition {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
index 8a18375..9d38664 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionType.java
@@ -25,19 +25,27 @@ package org.apache.pulsar.client.api;
*/
public enum SubscriptionType {
/**
- * There can be only 1 consumer on the same topic with the same
subscription name
+ * There can be only 1 consumer on the same topic with the same
subscription name.
*/
Exclusive,
/**
* Multiple consumer will be able to use the same subscription name and
the messages will be dispatched according to
- * a round-robin rotation between the connected consumers
+ * a round-robin rotation between the connected consumers.
+ * <p>
+ * In this mode, the consumption order is not guaranteed.
*/
Shared,
/**
* Multiple consumer will be able to use the same subscription name but
only 1 consumer will receive the messages.
* If that consumer disconnects, one of the other connected consumers will
start receiving messages.
+ * <p>
+ * In failover mode, the consumption ordering is guaranteed.
+ * <p>
+ * In case of partitioned topics, the ordering is guaranteed on a
per-partition basis. The partition assignments will
+ * be split across the available consumers. On each partition, at most one
consumer will be active at a given point
+ * in time.
*/
Failover
}
\ No newline at end of file
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
index 019e50a..423c080 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
@@ -43,11 +43,13 @@ public interface TypedMessageBuilder<T> extends
Serializable {
* <p>
* Example:
*
- * <pre>
- * <code>MessageId msgId =
producer.newMessage().key(myKey).value(myValue).send();
+ * <pre>{@code
+ * MessageId msgId = producer.newMessage()
+ * .key(myKey)
+ * .value(myValue)
+ * .send();
* System.out.println("Published message: " + msgId);
- * </code>
- * </pre>
+ * }</pre>
*
* @return the {@link MessageId} assigned by the broker to the published
message.
*/
@@ -62,7 +64,9 @@ public interface TypedMessageBuilder<T> extends Serializable {
* Example:
*
* <pre>
- *
<code>producer.newMessage().value(myValue).sendAsync().thenAccept(messageId -> {
+ * <code>producer.newMessage()
+ * .value(myValue)
+ * .sendAsync().thenAccept(messageId -> {
* System.out.println("Published message: " + messageId);
* }).exceptionally(e -> {
* System.out.println("Failed to publish " + e);
@@ -85,7 +89,8 @@ public interface TypedMessageBuilder<T> extends Serializable {
/**
* Sets the key of the message for routing policy
*
- * @param key
+ * @param key the partitioning key for the message
+ * @return the message builder instance
*/
TypedMessageBuilder<T> key(String key);
@@ -94,6 +99,7 @@ public interface TypedMessageBuilder<T> extends Serializable {
* Internally the bytes will be base64 encoded.
*
* @param key routing key for message, in byte array form
+ * @return the message builder instance
*/
TypedMessageBuilder<T> keyBytes(byte[] key);
@@ -102,6 +108,7 @@ public interface TypedMessageBuilder<T> extends
Serializable {
*
* @param value
* the domain object
+ * @return the message builder instance
*/
TypedMessageBuilder<T> value(T value);
@@ -112,11 +119,13 @@ public interface TypedMessageBuilder<T> extends
Serializable {
* the name of the property
* @param value
* the associated value
+ * @return the message builder instance
*/
TypedMessageBuilder<T> property(String name, String value);
/**
* Add all the properties in the provided map
+ * @return the message builder instance
*/
TypedMessageBuilder<T> properties(Map<String, String> properties);
@@ -129,6 +138,7 @@ public interface TypedMessageBuilder<T> extends
Serializable {
* <p>
* Note: currently pulsar doesn't support event-time based index. so the
subscribers can't seek the messages by
* event time.
+ * @return the message builder instance
*/
TypedMessageBuilder<T> eventTime(long timestamp);
@@ -146,18 +156,22 @@ public interface TypedMessageBuilder<T> extends
Serializable {
*
* @param sequenceId
* the sequence id to assign to the current message
+ * @return the message builder instance
*/
TypedMessageBuilder<T> sequenceId(long sequenceId);
/**
- * Override the replication clusters for this message.
+ * Override the geo-replication clusters for this message.
*
* @param clusters
+ * @return the message builder instance
*/
TypedMessageBuilder<T> replicationClusters(List<String> clusters);
/**
- * Disable replication for this message.
+ * Disable geo-replication for this message.
+ *
+ * @return the message builder instance
*/
TypedMessageBuilder<T> disableReplication();
}