This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 569d3d2 Add ReactiveMessageConsumerSpec::getSubscriptionInitialPosition (#19)
569d3d2 is described below
commit 569d3d236aec18a5973e25a4c8ba6a02dd54be45
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 9 10:50:06 2022 +0100
Add ReactiveMessageConsumerSpec::getSubscriptionInitialPosition (#19)
---
.../adapter/AdaptedReactiveMessageConsumer.java | 3 ++
.../api/ImmutableReactiveMessageConsumerSpec.java | 55 +++++++++++++++++++---
.../api/MutableReactiveMessageConsumerSpec.java | 17 +++++++
.../client/api/ReactiveMessageConsumerBuilder.java | 7 +++
.../client/api/ReactiveMessageConsumerSpec.java | 3 ++
.../ImmutableReactiveMessageConsumerSpecMixin.java | 2 +
.../jackson/PulsarReactiveClientModuleTest.java | 4 ++
7 files changed, 85 insertions(+), 6 deletions(-)
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
index 2a41ecd..587ed69 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java
@@ -129,6 +129,9 @@ class AdaptedReactiveMessageConsumer<T> implements ReactiveMessageConsumer<T> {
if (this.consumerSpec.getSubscriptionType() != null) {
consumerBuilder.subscriptionType(this.consumerSpec.getSubscriptionType());
}
+ if (this.consumerSpec.getSubscriptionInitialPosition() != null) {
+ consumerBuilder.subscriptionInitialPosition(this.consumerSpec.getSubscriptionInitialPosition());
+ }
if (this.consumerSpec.getKeySharedPolicy() != null) {
consumerBuilder.keySharedPolicy(this.consumerSpec.getKeySharedPolicy());
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
index 4f4878e..bdf681f 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.scheduler.Scheduler;
@@ -49,6 +50,8 @@ public class ImmutableReactiveMessageConsumerSpec implements
ReactiveMessageCons
private final SubscriptionType subscriptionType;
+ private final SubscriptionInitialPosition subscriptionInitialPosition;
+
private final KeySharedPolicy keySharedPolicy;
private final Boolean replicateSubscriptionState;
@@ -115,6 +118,8 @@ public class ImmutableReactiveMessageConsumerSpec
implements ReactiveMessageCons
this.subscriptionType = consumerSpec.getSubscriptionType();
+ this.subscriptionInitialPosition =
consumerSpec.getSubscriptionInitialPosition();
+
this.keySharedPolicy = consumerSpec.getKeySharedPolicy();
this.replicateSubscriptionState =
consumerSpec.getReplicateSubscriptionState();
@@ -171,12 +176,12 @@ public class ImmutableReactiveMessageConsumerSpec
implements ReactiveMessageCons
public ImmutableReactiveMessageConsumerSpec(List<String> topicNames,
Pattern topicsPattern,
RegexSubscriptionMode topicsPatternSubscriptionMode,
Duration topicsPatternAutoDiscoveryPeriod,
String subscriptionName, SubscriptionMode
subscriptionMode, SubscriptionType subscriptionType,
- KeySharedPolicy keySharedPolicy, Boolean
replicateSubscriptionState,
- Map<String, String> subscriptionProperties, String
consumerName, Map<String, String> properties,
- Integer priorityLevel, Boolean readCompacted, Boolean
batchIndexAckEnabled, Duration ackTimeout,
- Duration ackTimeoutTickTime, Duration
acknowledgementsGroupTime, Boolean acknowledgeAsynchronously,
- Scheduler acknowledgeScheduler, Duration
negativeAckRedeliveryDelay, DeadLetterPolicy deadLetterPolicy,
- Boolean retryLetterTopicEnable, Integer
receiverQueueSize,
+ SubscriptionInitialPosition
subscriptionInitialPosition, KeySharedPolicy keySharedPolicy,
+ Boolean replicateSubscriptionState, Map<String, String>
subscriptionProperties, String consumerName,
+ Map<String, String> properties, Integer priorityLevel,
Boolean readCompacted, Boolean batchIndexAckEnabled,
+ Duration ackTimeout, Duration ackTimeoutTickTime,
Duration acknowledgementsGroupTime,
+ Boolean acknowledgeAsynchronously, Scheduler
acknowledgeScheduler, Duration negativeAckRedeliveryDelay,
+ DeadLetterPolicy deadLetterPolicy, Boolean
retryLetterTopicEnable, Integer receiverQueueSize,
Integer maxTotalReceiverQueueSizeAcrossPartitions,
Boolean autoUpdatePartitions,
Duration autoUpdatePartitionsInterval, CryptoKeyReader
cryptoKeyReader,
ConsumerCryptoFailureAction cryptoFailureAction,
Integer maxPendingChunkedMessage,
@@ -188,6 +193,7 @@ public class ImmutableReactiveMessageConsumerSpec
implements ReactiveMessageCons
this.subscriptionName = subscriptionName;
this.subscriptionMode = subscriptionMode;
this.subscriptionType = subscriptionType;
+ this.subscriptionInitialPosition = subscriptionInitialPosition;
this.keySharedPolicy = keySharedPolicy;
this.replicateSubscriptionState = replicateSubscriptionState;
this.subscriptionProperties = subscriptionProperties;
@@ -215,130 +221,167 @@ public class ImmutableReactiveMessageConsumerSpec
implements ReactiveMessageCons
this.expireTimeOfIncompleteChunkedMessage =
expireTimeOfIncompleteChunkedMessage;
}
+ @Override
public List<String> getTopicNames() {
return this.topicNames;
}
+ @Override
public Pattern getTopicsPattern() {
return this.topicsPattern;
}
+ @Override
public RegexSubscriptionMode getTopicsPatternSubscriptionMode() {
return this.topicsPatternSubscriptionMode;
}
+ @Override
public Duration getTopicsPatternAutoDiscoveryPeriod() {
return this.topicsPatternAutoDiscoveryPeriod;
}
+ @Override
public String getSubscriptionName() {
return this.subscriptionName;
}
+ @Override
public SubscriptionMode getSubscriptionMode() {
return this.subscriptionMode;
}
+ @Override
public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
+ @Override
+ public SubscriptionInitialPosition getSubscriptionInitialPosition() {
+ return this.subscriptionInitialPosition;
+ }
+
+ @Override
public KeySharedPolicy getKeySharedPolicy() {
return this.keySharedPolicy;
}
+ @Override
public Boolean getReplicateSubscriptionState() {
return this.replicateSubscriptionState;
}
+ @Override
public Map<String, String> getSubscriptionProperties() {
return this.subscriptionProperties;
}
+ @Override
public String getConsumerName() {
return this.consumerName;
}
+ @Override
public Map<String, String> getProperties() {
return this.properties;
}
+ @Override
public Integer getPriorityLevel() {
return this.priorityLevel;
}
+ @Override
public Boolean getReadCompacted() {
return this.readCompacted;
}
+ @Override
public Boolean getBatchIndexAckEnabled() {
return this.batchIndexAckEnabled;
}
+ @Override
public Duration getAckTimeout() {
return this.ackTimeout;
}
+ @Override
public Duration getAckTimeoutTickTime() {
return this.ackTimeoutTickTime;
}
+ @Override
public Duration getAcknowledgementsGroupTime() {
return this.acknowledgementsGroupTime;
}
+ @Override
public Boolean getAcknowledgeAsynchronously() {
return this.acknowledgeAsynchronously;
}
+ @Override
public Scheduler getAcknowledgeScheduler() {
return this.acknowledgeScheduler;
}
+ @Override
public Duration getNegativeAckRedeliveryDelay() {
return this.negativeAckRedeliveryDelay;
}
+ @Override
public DeadLetterPolicy getDeadLetterPolicy() {
return this.deadLetterPolicy;
}
+ @Override
public Boolean getRetryLetterTopicEnable() {
return this.retryLetterTopicEnable;
}
+ @Override
public Integer getReceiverQueueSize() {
return this.receiverQueueSize;
}
+ @Override
public Integer getMaxTotalReceiverQueueSizeAcrossPartitions() {
return this.maxTotalReceiverQueueSizeAcrossPartitions;
}
+ @Override
public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}
+ @Override
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}
+ @Override
public CryptoKeyReader getCryptoKeyReader() {
return this.cryptoKeyReader;
}
+ @Override
public ConsumerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
}
+ @Override
public Integer getMaxPendingChunkedMessage() {
return this.maxPendingChunkedMessage;
}
+ @Override
public Boolean getAutoAckOldestChunkedMessageOnQueueFull() {
return this.autoAckOldestChunkedMessageOnQueueFull;
}
+ @Override
public Duration getExpireTimeOfIncompleteChunkedMessage() {
return this.expireTimeOfIncompleteChunkedMessage;
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
index c96d371..2b28eb1 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/MutableReactiveMessageConsumerSpec.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.scheduler.Scheduler;
@@ -48,6 +49,8 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
private SubscriptionType subscriptionType;
+ private SubscriptionInitialPosition subscriptionInitialPosition;
+
private KeySharedPolicy keySharedPolicy;
private Boolean replicateSubscriptionState;
@@ -118,6 +121,8 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
this.subscriptionType = consumerSpec.getSubscriptionType();
+ this.subscriptionInitialPosition =
consumerSpec.getSubscriptionInitialPosition();
+
this.keySharedPolicy = consumerSpec.getKeySharedPolicy();
this.replicateSubscriptionState =
consumerSpec.getReplicateSubscriptionState();
@@ -233,6 +238,15 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
this.subscriptionType = subscriptionType;
}
+ @Override
+ public SubscriptionInitialPosition getSubscriptionInitialPosition() {
+ return this.subscriptionInitialPosition;
+ }
+
+ public void setSubscriptionInitialPosition(SubscriptionInitialPosition
subscriptionInitialPosition) {
+ this.subscriptionInitialPosition = subscriptionInitialPosition;
+ }
+
@Override
public KeySharedPolicy getKeySharedPolicy() {
return this.keySharedPolicy;
@@ -480,6 +494,9 @@ public class MutableReactiveMessageConsumerSpec implements
ReactiveMessageConsum
if (consumerSpec.getSubscriptionType() != null) {
setSubscriptionType(consumerSpec.getSubscriptionType());
}
+ if (consumerSpec.getSubscriptionInitialPosition() != null) {
+ setSubscriptionInitialPosition(consumerSpec.getSubscriptionInitialPosition());
+ }
if (consumerSpec.getKeySharedPolicy() != null) {
setKeySharedPolicy(consumerSpec.getKeySharedPolicy());
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
index b3cb76b..e09179c 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerBuilder.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.scheduler.Scheduler;
@@ -91,6 +92,12 @@ public interface ReactiveMessageConsumerBuilder<T> {
return this;
}
+ default ReactiveMessageConsumerBuilder<T> subscriptionInitialPosition(
+ SubscriptionInitialPosition subscriptionInitialPosition) {
+ getMutableSpec().setSubscriptionInitialPosition(subscriptionInitialPosition);
+ return this;
+ }
+
default ReactiveMessageConsumerBuilder<T>
keySharedPolicy(KeySharedPolicy keySharedPolicy) {
getMutableSpec().setKeySharedPolicy(keySharedPolicy);
return this;
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
index 46a212b..ebc09b1 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageConsumerSpec.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.scheduler.Scheduler;
@@ -46,6 +47,8 @@ public interface ReactiveMessageConsumerSpec {
SubscriptionType getSubscriptionType();
+ SubscriptionInitialPosition getSubscriptionInitialPosition();
+
KeySharedPolicy getKeySharedPolicy();
Boolean getReplicateSubscriptionState();
diff --git a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
index 878cca7..3ab1aee 100644
--- a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
+++ b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
@@ -46,6 +47,7 @@ abstract class ImmutableReactiveMessageConsumerSpecMixin {
@JsonProperty("subscriptionName") String
subscriptionName,
@JsonProperty("subscriptionMode") SubscriptionMode
subscriptionMode,
@JsonProperty("subscriptionType") SubscriptionType
subscriptionType,
+ @JsonProperty("subscriptionInitialPosition")
SubscriptionInitialPosition subscriptionInitialPosition,
@JsonProperty("keySharedPolicy") KeySharedPolicy
keySharedPolicy,
@JsonProperty("replicateSubscriptionState") Boolean
replicateSubscriptionState,
@JsonProperty("subscriptionProperties") Map<String,
String> subscriptionProperties,
diff --git a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
index d36bea8..aa1f648 100644
--- a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
+++ b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.KeyBasedBatcherBuilder;
@@ -72,6 +73,7 @@ class PulsarReactiveClientModuleTest {
+ "'subscriptionName': 'my-sub',"
+ "'subscriptionMode': 'Durable',"
+ "'subscriptionType': 'Exclusive',"
+ + "'subscriptionInitialPosition': 'Latest',"
+ "'keySharedPolicy': 'STICKY',"
+ "'replicateSubscriptionState': true,"
+ "'subscriptionProperties': {'my-key':
'my-value'},"
@@ -117,6 +119,7 @@ class PulsarReactiveClientModuleTest {
assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
assertThat(spec.getSubscriptionMode()).isEqualTo(SubscriptionMode.Durable);
assertThat(spec.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive);
+
assertThat(spec.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Latest);
assertThat(spec.getKeySharedPolicy()).isInstanceOf(KeySharedPolicy.KeySharedPolicySticky.class);
assertThat(spec.getReplicateSubscriptionState()).isTrue();
assertThat(spec.getSubscriptionProperties()).containsOnlyKeys("my-key");
@@ -162,6 +165,7 @@ class PulsarReactiveClientModuleTest {
+ " 'subscriptionName' : 'my-sub',\n"
+ " 'subscriptionMode' : 'Durable',\n"
+ " 'subscriptionType' : 'Exclusive',\n"
+ + " 'subscriptionInitialPosition' :
'Latest',\n"
+ " 'keySharedPolicy' : 'STICKY',\n"
+ " 'replicateSubscriptionState' : true,\n"
+ " 'subscriptionProperties' : {\n"