This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new bd608f1 Add javadoc to MutableReactiveMessageConsumerSpec (#52)
bd608f1 is described below
commit bd608f197e25c2929a9b5ba729620ae98c5cb9f8
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue Nov 29 10:54:04 2022 +0100
Add javadoc to MutableReactiveMessageConsumerSpec (#52)
---
.../api/MutableReactiveMessageConsumerSpec.java | 161 +++++++++++++++++++++
.../client/api/ReactiveMessageConsumerSpec.java | 6 +-
2 files changed, 164 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
index 3deb7fe..51e3c91 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
@@ -37,6 +37,7 @@ import reactor.core.scheduler.Scheduler;
* Mutable spec for a {@link ReactiveMessageConsumer}.
*
* @author Lari Hotari
+ * @author Christophe Bornet
*/
public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsumerSpec {
@@ -106,10 +107,18 @@ public class MutableReactiveMessageConsumerSpec
implements ReactiveMessageConsum
private Duration expireTimeOfIncompleteChunkedMessage;
+ /**
+ * Constructs a default MutableReactiveMessageConsumerSpec.
+ */
public MutableReactiveMessageConsumerSpec() {
}
+ /**
+ * Constructs a MutableReactiveMessageConsumerSpec from another
+ * {@link ReactiveMessageConsumerSpec}.
+ * @param consumerSpec the spec to construct from
+ */
public MutableReactiveMessageConsumerSpec(ReactiveMessageConsumerSpec
consumerSpec) {
this.topicNames = (consumerSpec.getTopicNames() != null &&
!consumerSpec.getTopicNames().isEmpty())
? new ArrayList<>(consumerSpec.getTopicNames())
: new ArrayList<>();
@@ -185,6 +194,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.topicNames;
}
+ /**
+ * Sets the topics to subscribe to.
+ * @param topicNames the topics to subscribe to.
+ */
public void setTopicNames(List<String> topicNames) {
this.topicNames = topicNames;
}
@@ -194,6 +207,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.topicsPattern;
}
+ /**
+ * Sets the topics pattern of the topics to subscribe to.
+ * @param topicsPattern the topics pattern of the topics to subscribe to.
+ */
public void setTopicsPattern(Pattern topicsPattern) {
this.topicsPattern = topicsPattern;
}
@@ -203,6 +220,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.topicsPatternSubscriptionMode;
}
+ /**
+ * Sets the type of topics to subscribe to when using a topic pattern - Persistent,
+ * Non-Persistent, or both.
+ * @param topicsPatternSubscriptionMode type of topics to subscribe to
+ */
public void setTopicsPatternSubscriptionMode(RegexSubscriptionMode
topicsPatternSubscriptionMode) {
this.topicsPatternSubscriptionMode =
topicsPatternSubscriptionMode;
}
@@ -212,6 +234,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.topicsPatternAutoDiscoveryPeriod;
}
+ /**
+ * Sets the topics auto discovery period when using a topic pattern.
+ * @param topicsPatternAutoDiscoveryPeriod the topics auto discovery period
+ */
public void setTopicsPatternAutoDiscoveryPeriod(Duration
topicsPatternAutoDiscoveryPeriod) {
this.topicsPatternAutoDiscoveryPeriod =
topicsPatternAutoDiscoveryPeriod;
}
@@ -221,6 +247,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.subscriptionName;
}
+ /**
+ * Sets the subscription name.
+ * @param subscriptionName the subscription name
+ */
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
@@ -230,6 +260,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.subscriptionMode;
}
+ /**
+ * Sets the subscription mode.
+ * @param subscriptionMode the subscription mode
+ */
public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}
@@ -239,6 +273,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.subscriptionType;
}
+ /**
+ * Sets the subscription type.
+ * @param subscriptionType the subscription type
+ */
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
@@ -248,6 +286,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.subscriptionInitialPosition;
}
+ /**
+ * Sets the subscription initial position.
+ * @param subscriptionInitialPosition the subscription initial position
+ */
public void setSubscriptionInitialPosition(SubscriptionInitialPosition
subscriptionInitialPosition) {
this.subscriptionInitialPosition = subscriptionInitialPosition;
}
@@ -257,6 +299,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.keySharedPolicy;
}
+ /**
+ * Sets the policy used for {@link SubscriptionType#Key_Shared} subscriptions.
+ * @param keySharedPolicy the key-shared policy to use
+ */
public void setKeySharedPolicy(KeySharedPolicy keySharedPolicy) {
this.keySharedPolicy = keySharedPolicy;
}
@@ -266,6 +312,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.replicateSubscriptionState;
}
+ /**
+ * Sets whether the subscription shall be replicated.
+ * @param replicateSubscriptionState true to replicate the subscription
+ */
public void setReplicateSubscriptionState(Boolean
replicateSubscriptionState) {
this.replicateSubscriptionState = replicateSubscriptionState;
}
@@ -275,6 +325,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.subscriptionProperties;
}
+ /**
+ * Sets the properties for the subscription.
+ * @param subscriptionProperties the subscription properties
+ */
public void setSubscriptionProperties(Map<String, String>
subscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;
}
@@ -284,6 +338,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.consumerName;
}
+ /**
+ * Sets the consumer name.
+ * @param consumerName the consumer name
+ */
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
@@ -293,6 +351,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.properties;
}
+ /**
+ * Sets the consumer properties.
+ * @param properties the consumer properties
+ */
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
@@ -302,6 +364,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.priorityLevel;
}
+ /**
+ * Sets the priority level for the consumer to which a broker gives more priority
+ * while dispatching messages.
+ * @param priorityLevel the priority level of the consumer
+ */
public void setPriorityLevel(Integer priorityLevel) {
this.priorityLevel = priorityLevel;
}
@@ -311,6 +378,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.readCompacted;
}
+ /**
+ * Sets whether to read messages from the compacted topic rather than reading the full
+ * message backlog of the topic.
+ * @param readCompacted true to read messages from the compacted topic
+ */
public void setReadCompacted(Boolean readCompacted) {
this.readCompacted = readCompacted;
}
@@ -320,6 +392,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.batchIndexAckEnabled;
}
+ /**
+ * Sets whether batch index acknowledgement is enabled.
+ * @param batchIndexAckEnabled true to enable batch index acknowledgement
+ */
public void setBatchIndexAckEnabled(Boolean batchIndexAckEnabled) {
this.batchIndexAckEnabled = batchIndexAckEnabled;
}
@@ -329,6 +405,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.ackTimeout;
}
+ /**
+ * Sets the timeout duration for unacknowledged messages.
+ * @param ackTimeout the timeout duration for unacknowledged messages
+ */
public void setAckTimeout(Duration ackTimeout) {
this.ackTimeout = ackTimeout;
}
@@ -338,6 +418,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.ackTimeoutTickTime;
}
+ /**
+ * Sets the tick time of the ack-timeout redelivery.
+ * @param ackTimeoutTickTime the tick time of the ack-timeout redelivery
+ */
public void setAckTimeoutTickTime(Duration ackTimeoutTickTime) {
this.ackTimeoutTickTime = ackTimeoutTickTime;
}
@@ -347,6 +431,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.acknowledgementsGroupTime;
}
+ /**
+ * Sets the duration used to group acknowledgements.
+ * @param acknowledgementsGroupTime the duration used to group acknowledgements
+ */
public void setAcknowledgementsGroupTime(Duration
acknowledgementsGroupTime) {
this.acknowledgementsGroupTime = acknowledgementsGroupTime;
}
@@ -356,6 +444,13 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.acknowledgeAsynchronously;
}
+ /**
+ * Sets whether to ignore the acknowledge operation completion and make it
+ * asynchronous from the message consuming processing to improve performance by
+ * allowing the acknowledges and message processing to interleave.
+ * @param acknowledgeAsynchronously true to ignore the acknowledge operation
+ * completion
+ */
public void setAcknowledgeAsynchronously(Boolean
acknowledgeAsynchronously) {
this.acknowledgeAsynchronously = acknowledgeAsynchronously;
}
@@ -365,6 +460,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.acknowledgeScheduler;
}
+ /**
+ * Sets the scheduler to use to handle acknowledgements.
+ * @param acknowledgeScheduler the scheduler to use to handle acknowledgements
+ */
public void setAcknowledgeScheduler(Scheduler acknowledgeScheduler) {
this.acknowledgeScheduler = acknowledgeScheduler;
}
@@ -374,6 +473,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.negativeAckRedeliveryDelay;
}
+ /**
+ * Sets the delay to wait before re-delivering messages that have failed to be
+ * processed.
+ * @param negativeAckRedeliveryDelay the redelivery delay for failed messages
+ */
public void setNegativeAckRedeliveryDelay(Duration
negativeAckRedeliveryDelay) {
this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
}
@@ -383,6 +487,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.deadLetterPolicy;
}
+ /**
+ * Sets the dead letter policy for the consumer.
+ * @param deadLetterPolicy the dead letter policy
+ */
public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}
@@ -392,6 +500,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.retryLetterTopicEnable;
}
+ /**
+ * Sets whether the retries are enabled.
+ * @param retryLetterTopicEnable true to enable retries
+ */
public void setRetryLetterTopicEnable(Boolean retryLetterTopicEnable) {
this.retryLetterTopicEnable = retryLetterTopicEnable;
}
@@ -401,6 +513,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.receiverQueueSize;
}
+ /**
+ * Sets the size of the consumer receive queue.
+ * @param receiverQueueSize the size of the consumer receive queue
+ */
public void setReceiverQueueSize(Integer receiverQueueSize) {
this.receiverQueueSize = receiverQueueSize;
}
@@ -410,6 +526,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.maxTotalReceiverQueueSizeAcrossPartitions;
}
+ /**
+ * Sets the maximum total receiver queue size across partitions.
+ * @param maxTotalReceiverQueueSizeAcrossPartitions the maximum total receiver queue
+ * size across partitions
+ */
public void setMaxTotalReceiverQueueSizeAcrossPartitions(Integer
maxTotalReceiverQueueSizeAcrossPartitions) {
this.maxTotalReceiverQueueSizeAcrossPartitions =
maxTotalReceiverQueueSizeAcrossPartitions;
}
@@ -419,6 +540,12 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.autoUpdatePartitions;
}
+ /**
+ * Sets whether the consumer shall subscribe automatically to new partitions of
+ * partitioned topics.
+ * @param autoUpdatePartitions true for the consumer to subscribe automatically to new
+ * partitions
+ */
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}
@@ -428,6 +555,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.autoUpdatePartitionsInterval;
}
+ /**
+ * Sets the interval of updating partitions when autoUpdatePartitions is enabled.
+ * @param autoUpdatePartitionsInterval the interval between partitions updates
+ */
public void setAutoUpdatePartitionsInterval(Duration
autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval =
autoUpdatePartitionsInterval;
}
@@ -437,6 +568,10 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.cryptoKeyReader;
}
+ /**
+ * Sets the key reader to be used to decrypt the message payloads.
+ * @param cryptoKeyReader the key reader to be used to decrypt the message payloads
+ */
public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}
@@ -446,6 +581,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.cryptoFailureAction;
}
+ /**
+ * Sets the action the consumer will take in case of decryption failures.
+ * @param cryptoFailureAction the action the consumer will take in case of decryption
+ * failures
+ */
public void setCryptoFailureAction(ConsumerCryptoFailureAction
cryptoFailureAction) {
this.cryptoFailureAction = cryptoFailureAction;
}
@@ -455,6 +595,11 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.maxPendingChunkedMessage;
}
+ /**
+ * Sets the maximum number of messages in the queue holding pending chunked messages.
+ * @param maxPendingChunkedMessage the maximum number of messages in the queue holding
+ * pending chunked messages
+ */
public void setMaxPendingChunkedMessage(Integer
maxPendingChunkedMessage) {
this.maxPendingChunkedMessage = maxPendingChunkedMessage;
}
@@ -464,6 +609,12 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
return this.autoAckOldestChunkedMessageOnQueueFull;
}
+ /**
+ * Sets whether to automatically acknowledge pending chunked messages when
+ * maxPendingChunkedMessage is reached.
+ * @param autoAckOldestChunkedMessageOnQueueFull true to acknowledge the messages,
+ * false to have them redelivered
+ */
public void setAutoAckOldestChunkedMessageOnQueueFull(Boolean
autoAckOldestChunkedMessageOnQueueFull) {
this.autoAckOldestChunkedMessageOnQueueFull =
autoAckOldestChunkedMessageOnQueueFull;
}
@@ -473,10 +624,20 @@ public class MutableReactiveMessageConsumerSpec
implements ReactiveMessageConsum
return this.expireTimeOfIncompleteChunkedMessage;
}
+ /**
+ * Sets the time interval to expire incomplete chunks if a consumer fails to receive
+ * all the chunks.
+ * @param expireTimeOfIncompleteChunkedMessage the time interval to expire incomplete
+ * chunks.
+ */
public void setExpireTimeOfIncompleteChunkedMessage(Duration
expireTimeOfIncompleteChunkedMessage) {
this.expireTimeOfIncompleteChunkedMessage =
expireTimeOfIncompleteChunkedMessage;
}
+ /**
+ * Updates this spec from the defined values of another spec.
+ * @param consumerSpec the spec from which to update
+ */
public void applySpec(ReactiveMessageConsumerSpec consumerSpec) {
if (consumerSpec.getTopicNames() != null &&
!consumerSpec.getTopicNames().isEmpty()) {
setTopicNames(new
ArrayList<>(consumerSpec.getTopicNames()));
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
index 186649a..11fc799 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
@@ -220,8 +220,8 @@ public interface ReactiveMessageConsumerSpec {
Integer getReceiverQueueSize();
/**
- * Gets the max total receiver queue size across partitons.
- * @return the max total receiver queue size across partitons
+ * Gets the max total receiver queue size across partitions.
+ * @return the max total receiver queue size across partitions
* @see ConsumerBuilder#maxTotalReceiverQueueSizeAcrossPartitions
*/
Integer getMaxTotalReceiverQueueSizeAcrossPartitions();
@@ -236,7 +236,7 @@ public interface ReactiveMessageConsumerSpec {
/**
* Gets the interval of updating partitions when autoUpdatePartitions is enabled.
- * @return true if the consumer subscribes automatically to new partitions
+ * @return the interval between partitions updates
* @see ConsumerBuilder#autoUpdatePartitionsInterval
* @see ConsumerBuilder#autoUpdatePartitions
*/