This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new 145ac23  feat: Support consumer replicateSubscriptionState config (#52)
145ac23 is described below

commit 145ac23da6e7aa2dfad036d425469f6188530b47
Author: tison <[email protected]>
AuthorDate: Mon Aug 7 14:50:19 2023 +0800

    feat: Support consumer replicateSubscriptionState config (#52)
    
    Signed-off-by: tison <[email protected]>
---
 .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java       | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 40858b1..90a80db 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -35,6 +35,7 @@ public class PulsarConsumerKafkaConfig {
     public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
     public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
     public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
+    public static final String REPLICATE_SUBSCRIPTION_STATE = "pulsar.consumer.replicate.subscription.state";
 
     public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
         ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
@@ -57,6 +58,11 @@ public class PulsarConsumerKafkaConfig {
                     Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
         }
 
+        if (properties.containsKey(REPLICATE_SUBSCRIPTION_STATE)) {
+            consumerBuilder.replicateSubscriptionState(
+                    Boolean.parseBoolean(properties.getProperty(REPLICATE_SUBSCRIPTION_STATE)));
+        }
+
         if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
             RegexSubscriptionMode mode;
             try {

Reply via email to