This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new bd608f1  Add javadoc to MutableReactiveMessageConsumerSpec (#52)
bd608f1 is described below

commit bd608f197e25c2929a9b5ba729620ae98c5cb9f8
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Nov 29 10:54:04 2022 +0100

    Add javadoc to MutableReactiveMessageConsumerSpec (#52)
---
 .../api/MutableReactiveMessageConsumerSpec.java    | 161 +++++++++++++++++++++
 .../client/api/ReactiveMessageConsumerSpec.java    |   6 +-
 2 files changed, 164 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
index 3deb7fe..51e3c91 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
@@ -37,6 +37,7 @@ import reactor.core.scheduler.Scheduler;
  * Mutable spec for a {@link ReactiveMessageConsumer}.
  *
  * @author Lari Hotari
+ * @author Christophe Bornet
  */
 public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsumerSpec {
 
@@ -106,10 +107,18 @@ public class MutableReactiveMessageConsumerSpec 
implements ReactiveMessageConsum
 
        private Duration expireTimeOfIncompleteChunkedMessage;
 
+       /**
+        * Constructs a default MutableReactiveMessageConsumerSpec.
+        */
        public MutableReactiveMessageConsumerSpec() {
 
        }
 
+       /**
+        * Constructs a MutableReactiveMessageConsumerSpec from another
+        * {@link ReactiveMessageConsumerSpec}.
+        * @param consumerSpec the spec to construct from
+        */
        public MutableReactiveMessageConsumerSpec(ReactiveMessageConsumerSpec 
consumerSpec) {
                this.topicNames = (consumerSpec.getTopicNames() != null && 
!consumerSpec.getTopicNames().isEmpty())
                                ? new ArrayList<>(consumerSpec.getTopicNames()) 
: new ArrayList<>();
@@ -185,6 +194,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.topicNames;
        }
 
+       /**
+        * Sets the topics to subscribe to.
+        * @param topicNames the topics to subscribe to.
+        */
        public void setTopicNames(List<String> topicNames) {
                this.topicNames = topicNames;
        }
@@ -194,6 +207,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.topicsPattern;
        }
 
+       /**
+        * Sets the topics pattern of the topics to subscribe to.
+        * @param topicsPattern the topics pattern of the topics to subscribe 
to.
+        */
        public void setTopicsPattern(Pattern topicsPattern) {
                this.topicsPattern = topicsPattern;
        }
@@ -203,6 +220,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.topicsPatternSubscriptionMode;
        }
 
+       /**
+        * Sets the type of topics to subscribe to when using a topic pattern - 
Persistent,
+        * Non-Persistent, or both.
+        * @param topicsPatternSubscriptionMode type of topics to subscribe to
+        */
        public void setTopicsPatternSubscriptionMode(RegexSubscriptionMode 
topicsPatternSubscriptionMode) {
                this.topicsPatternSubscriptionMode = 
topicsPatternSubscriptionMode;
        }
@@ -212,6 +234,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.topicsPatternAutoDiscoveryPeriod;
        }
 
+       /**
+        * Sets the topics auto discovery period when using a topic pattern.
+        * @param topicsPatternAutoDiscoveryPeriod the topics auto discovery 
period
+        */
        public void setTopicsPatternAutoDiscoveryPeriod(Duration 
topicsPatternAutoDiscoveryPeriod) {
                this.topicsPatternAutoDiscoveryPeriod = 
topicsPatternAutoDiscoveryPeriod;
        }
@@ -221,6 +247,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.subscriptionName;
        }
 
+       /**
+        * Sets the subscription name.
+        * @param subscriptionName the subscription name
+        */
        public void setSubscriptionName(String subscriptionName) {
                this.subscriptionName = subscriptionName;
        }
@@ -230,6 +260,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.subscriptionMode;
        }
 
+       /**
+        * Sets the subscription mode.
+        * @param subscriptionMode the subscription mode
+        */
        public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
                this.subscriptionMode = subscriptionMode;
        }
@@ -239,6 +273,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.subscriptionType;
        }
 
+       /**
+        * Sets the subscription type.
+        * @param subscriptionType the subscription type
+        */
        public void setSubscriptionType(SubscriptionType subscriptionType) {
                this.subscriptionType = subscriptionType;
        }
@@ -248,6 +286,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.subscriptionInitialPosition;
        }
 
+       /**
+        * Sets the subscription initial position.
+        * @param subscriptionInitialPosition the subscription initial position
+        */
        public void setSubscriptionInitialPosition(SubscriptionInitialPosition 
subscriptionInitialPosition) {
                this.subscriptionInitialPosition = subscriptionInitialPosition;
        }
@@ -257,6 +299,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.keySharedPolicy;
        }
 
+       /**
+        * Sets the policy used for {@link SubscriptionType#Key_Shared} 
subscriptions.
+        * @param keySharedPolicy the key-shared policy to use
+        */
        public void setKeySharedPolicy(KeySharedPolicy keySharedPolicy) {
                this.keySharedPolicy = keySharedPolicy;
        }
@@ -266,6 +312,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.replicateSubscriptionState;
        }
 
+       /**
+        * Sets whether the subscription shall be replicated.
+        * @param replicateSubscriptionState true to replicate the subscription
+        */
        public void setReplicateSubscriptionState(Boolean 
replicateSubscriptionState) {
                this.replicateSubscriptionState = replicateSubscriptionState;
        }
@@ -275,6 +325,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.subscriptionProperties;
        }
 
+       /**
+        * Sets the properties for the subscription.
+        * @param subscriptionProperties the subscription properties
+        */
        public void setSubscriptionProperties(Map<String, String> 
subscriptionProperties) {
                this.subscriptionProperties = subscriptionProperties;
        }
@@ -284,6 +338,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.consumerName;
        }
 
+       /**
+        * Sets the consumer name.
+        * @param consumerName the consumer name
+        */
        public void setConsumerName(String consumerName) {
                this.consumerName = consumerName;
        }
@@ -293,6 +351,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.properties;
        }
 
+       /**
+        * Sets the consumer properties.
+        * @param properties the consumer properties
+        */
        public void setProperties(Map<String, String> properties) {
                this.properties = properties;
        }
@@ -302,6 +364,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.priorityLevel;
        }
 
+       /**
+        * Sets the priority level for the consumer to which a broker gives 
more priority
+        * while dispatching messages.
+        * @param priorityLevel the priority level of the consumer
+        */
        public void setPriorityLevel(Integer priorityLevel) {
                this.priorityLevel = priorityLevel;
        }
@@ -311,6 +378,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.readCompacted;
        }
 
+       /**
+        * Sets whether to read messages from the compacted topic rather than 
reading the full
+        * message backlog of the topic.
+        * @param readCompacted true to read messages from the compacted topic
+        */
        public void setReadCompacted(Boolean readCompacted) {
                this.readCompacted = readCompacted;
        }
@@ -320,6 +392,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.batchIndexAckEnabled;
        }
 
+       /**
+        * Sets whether batch index acknowledgement is enabled.
+        * @param batchIndexAckEnabled true to enable batch index 
acknowledgement
+        */
        public void setBatchIndexAckEnabled(Boolean batchIndexAckEnabled) {
                this.batchIndexAckEnabled = batchIndexAckEnabled;
        }
@@ -329,6 +405,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.ackTimeout;
        }
 
+       /**
+        * Sets the timeout duration for unacknowledged messages.
+        * @param ackTimeout the timeout duration for unacknowledged messages
+        */
        public void setAckTimeout(Duration ackTimeout) {
                this.ackTimeout = ackTimeout;
        }
@@ -338,6 +418,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.ackTimeoutTickTime;
        }
 
+       /**
+        * Sets the tick time of the ack-timeout redelivery.
+        * @param ackTimeoutTickTime the tick time of the ack-timeout redelivery
+        */
        public void setAckTimeoutTickTime(Duration ackTimeoutTickTime) {
                this.ackTimeoutTickTime = ackTimeoutTickTime;
        }
@@ -347,6 +431,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.acknowledgementsGroupTime;
        }
 
+       /**
+        * Sets the duration used to group acknowledgements.
+        * @param acknowledgementsGroupTime the duration used to group 
acknowledgements
+        */
        public void setAcknowledgementsGroupTime(Duration 
acknowledgementsGroupTime) {
                this.acknowledgementsGroupTime = acknowledgementsGroupTime;
        }
@@ -356,6 +444,13 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.acknowledgeAsynchronously;
        }
 
+       /**
+        * Sets whether to ignore the acknowledge operation completion and make 
it
+        * asynchronous from the message consuming processing to improve 
performance by
+        * allowing the acknowledges and message processing to interleave.
+        * @param acknowledgeAsynchronously true to ignore the acknowledge 
operation
+        * completion
+        */
        public void setAcknowledgeAsynchronously(Boolean 
acknowledgeAsynchronously) {
                this.acknowledgeAsynchronously = acknowledgeAsynchronously;
        }
@@ -365,6 +460,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.acknowledgeScheduler;
        }
 
+       /**
+        * Sets the scheduler to use to handle acknowledgements.
+        * @param acknowledgeScheduler the scheduler to use to handle 
acknowledgements
+        */
        public void setAcknowledgeScheduler(Scheduler acknowledgeScheduler) {
                this.acknowledgeScheduler = acknowledgeScheduler;
        }
@@ -374,6 +473,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.negativeAckRedeliveryDelay;
        }
 
+       /**
+        * Sets the delay to wait before re-delivering messages that have 
failed to be
+        * processed.
+        * @param negativeAckRedeliveryDelay the redelivery delay for failed 
messages
+        */
        public void setNegativeAckRedeliveryDelay(Duration 
negativeAckRedeliveryDelay) {
                this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
        }
@@ -383,6 +487,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.deadLetterPolicy;
        }
 
+       /**
+        * Sets the dead letter policy for the consumer.
+        * @param deadLetterPolicy the dead letter policy
+        */
        public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
                this.deadLetterPolicy = deadLetterPolicy;
        }
@@ -392,6 +500,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.retryLetterTopicEnable;
        }
 
+       /**
+        * Sets whether the retries are enabled.
+        * @param retryLetterTopicEnable true to enable retries
+        */
        public void setRetryLetterTopicEnable(Boolean retryLetterTopicEnable) {
                this.retryLetterTopicEnable = retryLetterTopicEnable;
        }
@@ -401,6 +513,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.receiverQueueSize;
        }
 
+       /**
+        * Sets the size of the consumer receive queue.
+        * @param receiverQueueSize the size of the consumer receive queue
+        */
        public void setReceiverQueueSize(Integer receiverQueueSize) {
                this.receiverQueueSize = receiverQueueSize;
        }
@@ -410,6 +526,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.maxTotalReceiverQueueSizeAcrossPartitions;
        }
 
+       /**
+        * Sets the maximum total receiver queue size across partitions.
+        * @param maxTotalReceiverQueueSizeAcrossPartitions the maximum total 
receiver queue
+        * size across partitions
+        */
        public void setMaxTotalReceiverQueueSizeAcrossPartitions(Integer 
maxTotalReceiverQueueSizeAcrossPartitions) {
                this.maxTotalReceiverQueueSizeAcrossPartitions = 
maxTotalReceiverQueueSizeAcrossPartitions;
        }
@@ -419,6 +540,12 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.autoUpdatePartitions;
        }
 
+       /**
+        * Sets whether the consumer shall subscribe automatically to new 
partitions of
+        * partitioned topics.
+        * @param autoUpdatePartitions true for the consumer to subscribe 
automatically to new
+        * partitions
+        */
        public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
                this.autoUpdatePartitions = autoUpdatePartitions;
        }
@@ -428,6 +555,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.autoUpdatePartitionsInterval;
        }
 
+       /**
+        * Sets the interval of updating partitions when autoUpdatePartitions 
is enabled.
+        * @param autoUpdatePartitionsInterval the interval between partitions 
updates
+        */
        public void setAutoUpdatePartitionsInterval(Duration 
autoUpdatePartitionsInterval) {
                this.autoUpdatePartitionsInterval = 
autoUpdatePartitionsInterval;
        }
@@ -437,6 +568,10 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.cryptoKeyReader;
        }
 
+       /**
+        * Sets the key reader to be used to decrypt the message payloads.
+        * @param cryptoKeyReader the key reader to be used to decrypt the 
message payloads
+        */
        public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
                this.cryptoKeyReader = cryptoKeyReader;
        }
@@ -446,6 +581,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.cryptoFailureAction;
        }
 
+       /**
+        * Sets the action the consumer will take in case of decryption 
failures.
+        * @param cryptoFailureAction the action the consumer will take in case 
of decryption
+        * failures
+        */
        public void setCryptoFailureAction(ConsumerCryptoFailureAction 
cryptoFailureAction) {
                this.cryptoFailureAction = cryptoFailureAction;
        }
@@ -455,6 +595,11 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.maxPendingChunkedMessage;
        }
 
+       /**
+        * Sets the maximum number of messages in the queue holding pending 
chunked messages.
+        * @param maxPendingChunkedMessage the maximum number of messages in 
the queue holding
+        * pending chunked messages
+        */
        public void setMaxPendingChunkedMessage(Integer 
maxPendingChunkedMessage) {
                this.maxPendingChunkedMessage = maxPendingChunkedMessage;
        }
@@ -464,6 +609,12 @@ public class MutableReactiveMessageConsumerSpec implements 
ReactiveMessageConsum
                return this.autoAckOldestChunkedMessageOnQueueFull;
        }
 
+       /**
+        * Sets whether to automatically acknowledge pending chunked messages 
when
+        * maxPendingChunkedMessage is reached.
+        * @param autoAckOldestChunkedMessageOnQueueFull true to acknowledge 
the messages,
+        * false to have them redelivered
+        */
        public void setAutoAckOldestChunkedMessageOnQueueFull(Boolean 
autoAckOldestChunkedMessageOnQueueFull) {
                this.autoAckOldestChunkedMessageOnQueueFull = 
autoAckOldestChunkedMessageOnQueueFull;
        }
@@ -473,10 +624,20 @@ public class MutableReactiveMessageConsumerSpec 
implements ReactiveMessageConsum
                return this.expireTimeOfIncompleteChunkedMessage;
        }
 
+       /**
+        * Sets the time interval to expire incomplete chunks if a consumer 
fails to receive
+        * all the chunks.
+        * @param expireTimeOfIncompleteChunkedMessage the time interval to 
expire incomplete
+        * chunks.
+        */
        public void setExpireTimeOfIncompleteChunkedMessage(Duration 
expireTimeOfIncompleteChunkedMessage) {
                this.expireTimeOfIncompleteChunkedMessage = 
expireTimeOfIncompleteChunkedMessage;
        }
 
+       /**
+        * Updates this spec from the defined values of another spec.
+        * @param consumerSpec the spec from which to update
+        */
        public void applySpec(ReactiveMessageConsumerSpec consumerSpec) {
                if (consumerSpec.getTopicNames() != null && 
!consumerSpec.getTopicNames().isEmpty()) {
                        setTopicNames(new 
ArrayList<>(consumerSpec.getTopicNames()));
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
index 186649a..11fc799 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
@@ -220,8 +220,8 @@ public interface ReactiveMessageConsumerSpec {
        Integer getReceiverQueueSize();
 
        /**
-        * Gets the max total receiver queue size across partitons.
-        * @return the max total receiver queue size across partitons
+        * Gets the max total receiver queue size across partitions.
+        * @return the max total receiver queue size across partitions
         * @see ConsumerBuilder#maxTotalReceiverQueueSizeAcrossPartitions
         */
        Integer getMaxTotalReceiverQueueSizeAcrossPartitions();
@@ -236,7 +236,7 @@ public interface ReactiveMessageConsumerSpec {
 
        /**
         * Gets the interval of updating partitions when autoUpdatePartitions 
is enabled.
-        * @return true if the consumer subscribes automatically to new 
partitions
+        * @return the interval between partitions updates
         * @see ConsumerBuilder#autoUpdatePartitionsInterval
         * @see ConsumerBuilder#autoUpdatePartitions
         */

Reply via email to