This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push: new 0372947 [pulsar-kafka] add auto update partition support to producer/consumer (#13) 0372947 is described below commit 0372947e326278a60915b0a647270ebc7ab33620 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu May 13 23:24:29 2021 -0700 [pulsar-kafka] add auto update partition support to producer/consumer (#13) --- .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java | 5 ++++- .../pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java | 4 ++++ .../org/apache/kafka/clients/producer/PulsarKafkaProducer.java | 9 ++++----- .../pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java | 5 ++++- .../pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java | 4 ++++ 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index a527827..09a9806 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -21,7 +21,7 @@ package org.apache.pulsar.client.kafka.compat; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit; - +import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RegexSubscriptionMode; @@ -67,6 +67,9 @@ public class PulsarConsumerKafkaConfig { consumerBuilder.subscriptionTopicsMode(mode); } + if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { + consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); + } return consumerBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java index 7554faf..3315cd2 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java @@ -34,6 +34,7 @@ public class PulsarProducerKafkaConfig { public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions"; public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled"; public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages"; + public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions"; /** * send operations will immediately fail with {@link ProducerQueueIsFullError} when there is no space left in * pending queue. @@ -66,6 +67,9 @@ public class PulsarProducerKafkaConfig { producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES))); } + if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { + producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); + } return producerBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 7d6e146..48481a9 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -19,18 +19,14 @@ package org.apache.kafka.clients.producer; import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Serializable; import java.lang.reflect.Constructor; import java.util.Base64; import java.util.List; -import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.SerializationUtils; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRouter; @@ -73,6 +69,7 @@ public class PulsarKafkaProducer<K, V> extends Producer<K, V> { public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages"; public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages"; public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms"; + public static String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions"; private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>(); @@ -134,7 +131,9 @@ public class PulsarKafkaProducer<K, V> extends Producer<K, V> { if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) { pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS); } - + if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { + pulsarProducerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); + } pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType); } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index a527827..09a9806 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -21,7 +21,7 @@ package org.apache.pulsar.client.kafka.compat; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit; - +import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RegexSubscriptionMode; @@ -67,6 +67,9 @@ public class PulsarConsumerKafkaConfig { consumerBuilder.subscriptionTopicsMode(mode); } + if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { + consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); + } return consumerBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java index 5a9a651..509df03 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java @@ -33,6 +33,7 @@ public class PulsarProducerKafkaConfig { public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions"; public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled"; public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages"; + public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions"; public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) { ProducerBuilder<byte[]> producerBuilder = client.newProducer(); @@ -60,6 +61,9 @@ public class PulsarProducerKafkaConfig { producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES))); } + if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) { + producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS))); + } return producerBuilder; } }