This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 9ab5475 Add javadoc to MutableReactiveMessageSenderSpec (#53)
9ab5475 is described below
commit 9ab5475d34705873df14289b7076a5c9eb4f95b3
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Nov 29 10:54:28 2022 +0100
Add javadoc to MutableReactiveMessageSenderSpec (#53)
---
.../api/MutableReactiveMessageSenderSpec.java | 127 +++++++++++++++++++++
.../client/api/ReactiveMessageSenderSpec.java | 6 +-
2 files changed, 130 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageSenderSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageSenderSpec.java
index 96507b3..0952971 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageSenderSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageSenderSpec.java
@@ -36,6 +36,7 @@ import
org.apache.pulsar.client.api.ProducerCryptoFailureAction;
* Mutable spec for a {@link ReactiveMessageSender}.
*
* @author Lari Hotari
+ * @author Christophe Bornet
*/
public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSpec {
@@ -91,10 +92,18 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
private Map<String, String> properties;
+ /**
+ * Constructs a default MutableReactiveMessageSenderSpec.
+ */
public MutableReactiveMessageSenderSpec() {
}
+ /**
+ * Constructs a MutableReactiveMessageSenderSpec from another
+ * {@link ReactiveMessageSenderSpec}.
+ * @param senderSpec the spec to construct from
+ */
public MutableReactiveMessageSenderSpec(ReactiveMessageSenderSpec
senderSpec) {
this.topicName = senderSpec.getTopicName();
this.producerName = senderSpec.getProducerName();
@@ -133,6 +142,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.topicName;
}
+ /**
+ * Sets the topic to publish on.
+ * @param topicName the topic
+ */
public void setTopicName(String topicName) {
this.topicName = topicName;
}
@@ -142,6 +155,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.producerName;
}
+ /**
+ * Sets the name of the producer.
+ * @param producerName the producer name
+ */
public void setProducerName(String producerName) {
this.producerName = producerName;
}
@@ -151,6 +168,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.sendTimeout;
}
+ /**
+ * Sets the send timeout.
+ * @param sendTimeout the send timeout
+ */
public void setSendTimeout(Duration sendTimeout) {
this.sendTimeout = sendTimeout;
}
@@ -160,6 +181,11 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.maxPendingMessages;
}
+ /**
+ * Sets the maximum size of the queue holding the messages pending to
receive an
+ * acknowledgment from the broker.
+ * @param maxPendingMessages the maximum pending messages
+ */
public void setMaxPendingMessages(Integer maxPendingMessages) {
this.maxPendingMessages = maxPendingMessages;
}
@@ -169,6 +195,11 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.maxPendingMessagesAcrossPartitions;
}
+ /**
+ * Sets the maximum number of pending messages across all the
partitions.
+ * @param maxPendingMessagesAcrossPartitions the maximum number of
pending messages
+ * across all the partitions
+ */
public void setMaxPendingMessagesAcrossPartitions(Integer
maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions =
maxPendingMessagesAcrossPartitions;
}
@@ -178,6 +209,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.messageRoutingMode;
}
+ /**
+ * Sets the message routing logic for producers on partitioned topics.
+ * @param messageRoutingMode the message routing mode
+ */
public void setMessageRoutingMode(MessageRoutingMode
messageRoutingMode) {
this.messageRoutingMode = messageRoutingMode;
}
@@ -187,6 +222,11 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.hashingScheme;
}
+ /**
+ * Sets the hashing function determining the partition where to publish
a particular
+ * message on partitioned topics.
+ * @param hashingScheme the hashing scheme
+ */
public void setHashingScheme(HashingScheme hashingScheme) {
this.hashingScheme = hashingScheme;
}
@@ -196,6 +236,11 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.cryptoFailureAction;
}
+ /**
+ * Sets the action the producer will take in case of encryption
failures.
+ * @param cryptoFailureAction the action the producer will take in case
of encryption
+ * failures
+ */
public void setCryptoFailureAction(ProducerCryptoFailureAction
cryptoFailureAction) {
this.cryptoFailureAction = cryptoFailureAction;
}
@@ -205,6 +250,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.messageRouter;
}
+ /**
+ * Sets a custom message router.
+ * @param messageRouter the message router
+ */
public void setMessageRouter(MessageRouter messageRouter) {
this.messageRouter = messageRouter;
}
@@ -214,6 +263,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.batchingMaxPublishDelay;
}
+ /**
+ * Sets the time period within which the messages sent will be batched.
+ * @param batchingMaxPublishDelay the batch delay
+ */
public void setBatchingMaxPublishDelay(Duration
batchingMaxPublishDelay) {
this.batchingMaxPublishDelay = batchingMaxPublishDelay;
}
@@ -223,6 +276,12 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.roundRobinRouterBatchingPartitionSwitchFrequency;
}
+ /**
+ * Sets the partition switch frequency while batching of messages is
enabled and using
+ * round-robin routing mode for non-keyed message.
+ * @param roundRobinRouterBatchingPartitionSwitchFrequency the
frequency of partition
+ * switch
+ */
public void setRoundRobinRouterBatchingPartitionSwitchFrequency(
Integer
roundRobinRouterBatchingPartitionSwitchFrequency) {
this.roundRobinRouterBatchingPartitionSwitchFrequency =
roundRobinRouterBatchingPartitionSwitchFrequency;
@@ -233,6 +292,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.batchingMaxMessages;
}
+ /**
+ * Sets the maximum number of messages permitted in a batch.
+ * @param batchingMaxMessages the maximum number of messages in a batch
+ */
public void setBatchingMaxMessages(Integer batchingMaxMessages) {
this.batchingMaxMessages = batchingMaxMessages;
}
@@ -242,6 +305,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.batchingMaxBytes;
}
+ /**
+ * Sets the maximum number of bytes permitted in a batch.
+ * @param batchingMaxBytes the maximum number of bytes in a batch
+ */
public void setBatchingMaxBytes(Integer batchingMaxBytes) {
this.batchingMaxBytes = batchingMaxBytes;
}
@@ -251,6 +318,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.batchingEnabled;
}
+ /**
+ * Sets whether automatic batching of messages is enabled for the
producer.
+ * @param batchingEnabled true to enable batching
+ */
public void setBatchingEnabled(Boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
}
@@ -260,6 +331,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.batcherBuilder;
}
+ /**
+ * Sets the batcher builder of the producer.
+ * @param batcherBuilder the batcher builder
+ */
public void setBatcherBuilder(BatcherBuilder batcherBuilder) {
this.batcherBuilder = batcherBuilder;
}
@@ -269,6 +344,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.chunkingEnabled;
}
+ /**
+ * Sets whether chunking of messages is enabled.
+ * @param chunkingEnabled true to enable message chunking
+ */
public void setChunkingEnabled(Boolean chunkingEnabled) {
this.chunkingEnabled = chunkingEnabled;
}
@@ -278,6 +357,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.cryptoKeyReader;
}
+ /**
+ * Sets the key reader to be used to encrypt the message payloads.
+ * @param cryptoKeyReader the key reader to be used to encrypt the
message payloads
+ */
public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}
@@ -287,6 +370,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.encryptionKeys;
}
+ /**
+ * Sets the public encryption key names, used by producer to encrypt
the data key.
+ * @param encryptionKeys the public encryption key names
+ */
public void setEncryptionKeys(Set<String> encryptionKeys) {
this.encryptionKeys = encryptionKeys;
}
@@ -296,6 +383,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.compressionType;
}
+ /**
+ * Sets the compression type for the producer.
+ * @param compressionType the compression type
+ */
public void setCompressionType(CompressionType compressionType) {
this.compressionType = compressionType;
}
@@ -305,6 +396,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.initialSequenceId;
}
+ /**
+ * Sets the baseline for the sequence ids for messages published by the
producer.
+ * @param initialSequenceId the initial sequence id
+ */
public void setInitialSequenceId(Long initialSequenceId) {
this.initialSequenceId = initialSequenceId;
}
@@ -314,6 +409,12 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.autoUpdatePartitions;
}
+ /**
+ * Sets whether partitioned producer will automatically discover new
partitions at
+ * runtime.
+ * @param autoUpdatePartitions true to enable auto discovery of the
partition
+ * configuration changes
+ */
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}
@@ -323,6 +424,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.autoUpdatePartitionsInterval;
}
+ /**
+ * Sets the interval of partitions updates if autoUpdatePartitions is
enabled.
+ * @param autoUpdatePartitionsInterval the interval of partitions
updates
+ */
public void setAutoUpdatePartitionsInterval(Duration
autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval =
autoUpdatePartitionsInterval;
}
@@ -332,6 +437,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.multiSchema;
}
+ /**
+ * Sets whether the multiple schema mode is enabled.
+ * @param multiSchema true to enable the multiple schema mode
+ */
public void setMultiSchema(Boolean multiSchema) {
this.multiSchema = multiSchema;
}
@@ -341,6 +450,10 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.accessMode;
}
+ /**
+ * Sets the type of access mode that the producer requires on the topic.
+ * @param accessMode the access mode
+ */
public void setAccessMode(ProducerAccessMode accessMode) {
this.accessMode = accessMode;
}
@@ -350,6 +463,12 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.lazyStartPartitionedProducers;
}
+ /**
+ * Sets whether producers register and connect immediately to the owner
broker of each
+ * partition or start lazily on demand.
+ * @param lazyStartPartitionedProducers true to enable lazy starting of
partitioned
+ * producers
+ */
public void setLazyStartPartitionedProducers(Boolean
lazyStartPartitionedProducers) {
this.lazyStartPartitionedProducers =
lazyStartPartitionedProducers;
}
@@ -359,10 +478,18 @@ public class MutableReactiveMessageSenderSpec implements
ReactiveMessageSenderSp
return this.properties;
}
+ /**
+ * Sets the properties of the producer.
+ * @param properties the properties of the producer
+ */
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
+ /**
+ * Updates this spec from the defined values of another spec.
+ * @param senderSpec the spec from which to update
+ */
public void applySpec(ReactiveMessageSenderSpec senderSpec) {
if (senderSpec.getTopicName() != null) {
setTopicName(senderSpec.getTopicName());
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderSpec.java
index 0ee1715..41dcd1b 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderSpec.java
@@ -97,8 +97,8 @@ public interface ReactiveMessageSenderSpec {
ProducerCryptoFailureAction getCryptoFailureAction();
/**
- * Gets the action the producer will take in case of encryption
failures.
- * @return the action the producer will take in case of encryption
failures
+ * Gets the custom message router.
+ * @return the message router
* @see ProducerBuilder#messageRouter
*/
MessageRouter getMessageRouter();
@@ -127,7 +127,7 @@ public interface ReactiveMessageSenderSpec {
/**
* Gets the maximum number of bytes permitted in a batch.
- * @return the maximum number of messages in a batch
+ * @return the maximum bytes of messages in a batch
* @see ProducerBuilder#batchingMaxBytes
*/
Integer getBatchingMaxBytes();