This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch issue-51 in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
commit 750b795d3896e686729483b9d936bb50042401d4
Author: tison <[email protected]>
AuthorDate: Tue Jul 18 15:52:35 2023 +0800

    Support consumer replicateSubscriptionState config

    Signed-off-by: tison <[email protected]>
---
 .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 40858b1..1af1ee2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -35,6 +35,7 @@ public class PulsarConsumerKafkaConfig {
     public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
     public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
     public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
+    public static final String REPLICATE_SUBSCRIPTION_STATE = "pulsar.consumer.replicate.subscription.state";
 
     public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
         ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
@@ -57,6 +58,11 @@ public class PulsarConsumerKafkaConfig {
                     Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
         }
 
+        if (properties.containsKey(REPLICATE_SUBSCRIPTION_STATE)) {
+            consumerBuilder.replicateSubscriptionState(
+                    Boolean.parseBoolean(properties.getProperty(REPLICATE_SUBSCRIPTION_STATE), Boolean.FALSE));
+        }
+
         if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
             RegexSubscriptionMode mode;
             try {
