This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new 145ac23 feat: Support consumer replicateSubscriptionState config (#52)
145ac23 is described below
commit 145ac23da6e7aa2dfad036d425469f6188530b47
Author: tison <[email protected]>
AuthorDate: Mon Aug 7 14:50:19 2023 +0800
feat: Support consumer replicateSubscriptionState config (#52)
Signed-off-by: tison <[email protected]>
---
.../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 40858b1..90a80db 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -35,6 +35,7 @@ public class PulsarConsumerKafkaConfig {
public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS =
"pulsar.consumer.acknowledgments.group.time.millis";
public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS =
"pulsar.consumer.total.receiver.queue.size.across.partitions";
public static final String SUBSCRIPTION_TOPICS_MODE =
"pulsar.consumer.subscription.topics.mode";
+ public static final String REPLICATE_SUBSCRIPTION_STATE =
"pulsar.consumer.replicate.subscription.state";
public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
@@ -57,6 +58,11 @@ public class PulsarConsumerKafkaConfig {
Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)),
TimeUnit.MILLISECONDS);
}
+ if (properties.containsKey(REPLICATE_SUBSCRIPTION_STATE)) {
+ consumerBuilder.replicateSubscriptionState(
+ Boolean.parseBoolean(properties.getProperty(REPLICATE_SUBSCRIPTION_STATE)));
+ }
+
if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
RegexSubscriptionMode mode;
try {