This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch issue-51
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git

commit 750b795d3896e686729483b9d936bb50042401d4
Author: tison <[email protected]>
AuthorDate: Tue Jul 18 15:52:35 2023 +0800

    Support consumer replicateSubscriptionState config
    
    Signed-off-by: tison <[email protected]>
---
 .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java       | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 40858b1..1af1ee2 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -35,6 +35,7 @@ public class PulsarConsumerKafkaConfig {
     public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = 
"pulsar.consumer.acknowledgments.group.time.millis";
     public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = 
"pulsar.consumer.total.receiver.queue.size.across.partitions";
     public static final String SUBSCRIPTION_TOPICS_MODE = 
"pulsar.consumer.subscription.topics.mode";
+    public static final String REPLICATE_SUBSCRIPTION_STATE = 
"pulsar.consumer.replicate.subscription.state";
 
     public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient 
client, Properties properties) {
         ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
@@ -57,6 +58,11 @@ public class PulsarConsumerKafkaConfig {
                     
Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), 
TimeUnit.MILLISECONDS);
         }
 
+        if (properties.containsKey(REPLICATE_SUBSCRIPTION_STATE)) {
+            consumerBuilder.replicateSubscriptionState(
+                    
Boolean.parseBoolean(properties.getProperty(REPLICATE_SUBSCRIPTION_STATE), 
Boolean.FALSE));
+        }
+
         if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
             RegexSubscriptionMode mode;
             try {

Reply via email to