This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new 0adca97 Auto update partition for pulsar kafka producer (#28)
0adca97 is described below
commit 0adca97f98025ce60ea7504431711b01a9b080cd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Mar 31 10:19:19 2022 -0700
Auto update partition for pulsar kafka producer (#28)
---
.../clients/producer/PulsarKafkaProducer.java | 75 +++++++++++++++++-----
.../kafka/compat/PulsarProducerKafkaConfig.java | 1 +
.../clients/producer/PulsarKafkaProducerTest.java | 72 +++++++++++++++++++--
3 files changed, 126 insertions(+), 22 deletions(-)
diff --git
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index fca5da8..80ca9fd 100644
---
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -30,7 +30,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -44,26 +46,31 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
-import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+
public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
+ private static final Logger log =
LoggerFactory.getLogger(PulsarKafkaProducer.class);
private final PulsarClient client;
final ProducerBuilder<byte[]> pulsarProducerBuilder;
@@ -74,6 +81,9 @@ public class PulsarKafkaProducer<K, V> implements Producer<K,
V> {
private final Partitioner partitioner;
private volatile Cluster cluster = Cluster.empty();
+ @Getter
+ private final int autoUpdatePartitionDurationMs;
+ private final ScheduledExecutorService executor;
private List<ProducerInterceptor<K, V>> interceptors;
@@ -176,6 +186,10 @@ public class PulsarKafkaProducer<K, V> implements
Producer<K, V> {
interceptors = (List) producerConfig.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
+
+ autoUpdatePartitionDurationMs = Integer.parseInt(
+
properties.getProperty(PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS_REFRESH_DURATION,
"300000"));
+ executor = Executors.newSingleThreadScheduledExecutor();
}
@Override
@@ -272,14 +286,19 @@ public class PulsarKafkaProducer<K, V> implements
Producer<K, V> {
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
partitioner.close();
+ if (executor != null) {
+ executor.shutdown();
+ }
}
@Override
public void close(long timeout, TimeUnit unit) {
- try {
- client.closeAsync().get(timeout, unit);
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- throw new RuntimeException(e);
+ if (client != null) {
+ try {
+ client.closeAsync().get(timeout, unit);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new RuntimeException(e);
+ }
}
}
@@ -290,23 +309,44 @@ public class PulsarKafkaProducer<K, V> implements
Producer<K, V> {
private org.apache.pulsar.client.api.Producer<byte[]>
createNewProducer(String topic) {
try {
+ Map<TopicPartition, PartitionInfo> partitionMap =
readPartitionsInfo(topic);
// Add the partitions info for the new topic
- synchronized (this){
- cluster = cluster.withPartitions(readPartitionsInfo(topic));
+ synchronized (this) {
+ cluster = cluster.withPartitions(partitionMap);
+ }
+ if (getPartitions(topic) > 1 && autoUpdatePartitionDurationMs > 0)
{
+                // allow pulsar client lib to update partition before updating
at kafka clusters
+ scheduleRefreshPartitionMetadata(topic,
autoUpdatePartitionDurationMs * 2);
}
+
List<org.apache.pulsar.client.api.ProducerInterceptor>
wrappedInterceptors = interceptors.stream()
.map(interceptor -> new
KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
.collect(Collectors.toList());
- return pulsarProducerBuilder.clone()
- .topic(topic)
- .intercept(wrappedInterceptors.toArray(new
org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
+ return pulsarProducerBuilder.clone().topic(topic)
+ .intercept(wrappedInterceptors
+ .toArray(new
org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
+ .autoUpdatePartitionsInterval((int)
autoUpdatePartitionDurationMs, TimeUnit.MILLISECONDS)
.create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
- private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String
topic) {
+ @VisibleForTesting
+ public void scheduleRefreshPartitionMetadata(String topic, long
autoUpdatePartitionDurationMs) {
+ executor.scheduleAtFixedRate(() -> {
+ int partitions = getPartitions(topic);
+ Map<TopicPartition, PartitionInfo> partitionMap =
readPartitionsInfo(topic);
+ if (partitionMap.size() != partitions) {
+ synchronized (this) {
+ cluster = cluster.withPartitions(partitionMap);
+ }
+ log.info("Updated partitions {} -> {} for topic {}",
partitionMap.size(), partitions, topic);
+ }
+ }, autoUpdatePartitionDurationMs, autoUpdatePartitionDurationMs,
TimeUnit.MILLISECONDS);
+ }
+
+ public Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic)
{
List<String> partitions = client.getPartitionsForTopic(topic).join();
Map<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<>();
@@ -383,5 +423,10 @@ public class PulsarKafkaProducer<K, V> implements
Producer<K, V> {
return new RecordMetadata(tp, offset, 0L, publishTime, 0L, mb.hasKey()
? mb.getKey().length() : 0, size);
}
+ public int getPartitions(String topic) {
+ Integer partitions = cluster.partitionCountForTopic(topic);
+ return partitions != null ? partitions : 0;
+ }
+
private static final Logger logger =
LoggerFactory.getLogger(PulsarKafkaProducer.class);
}
diff --git
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 276d777..77147d7 100644
---
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -36,6 +36,7 @@ public class PulsarProducerKafkaConfig {
public static final String BATCHING_ENABLED =
"pulsar.producer.batching.enabled";
public static final String BATCHING_MAX_MESSAGES =
"pulsar.producer.batching.max.messages";
public static final String AUTO_UPDATE_PARTITIONS =
"pulsar.auto.update.partitions";
+ public static final String AUTO_UPDATE_PARTITIONS_REFRESH_DURATION =
"pulsar.auto.update.partition.duration.ms";
public static final String CRYPTO_READER_FACTORY_CLASS_NAME =
"pulsar.crypto.reader.factory.class.name";
/**
* send operations will immediately fail with {@link
ProducerQueueIsFullError} when there is no space left in
diff --git
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index bf6a6cf..90c2f95 100644
---
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -21,12 +21,15 @@ package org.apache.kafka.clients.producer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,12 +39,10 @@ import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -54,6 +55,8 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -62,6 +65,12 @@ import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import com.google.api.client.util.Maps;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
@PrepareForTest({PulsarClientKafkaConfig.class,
PulsarProducerKafkaConfig.class})
@PowerMockIgnore({"org.apache.logging.log4j.*",
"org.apache.kafka.clients.producer.ProducerInterceptor"})
public class PulsarKafkaProducerTest {
@@ -118,7 +127,8 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL,
Boolean.FALSE.toString());
- new PulsarKafkaProducer<>(properties);
+ PulsarKafkaProducer producer = new PulsarKafkaProducer<>(properties);
+ producer.close();
verify(mockClientBuilder, times(1)).keepAliveInterval(1000,
TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000,
TimeUnit.MILLISECONDS);
@@ -171,7 +181,7 @@ public class PulsarKafkaProducerTest {
pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key",
"value"));
// Verify
- verify(mockProducerBuilder, times(1)).intercept(
+ verify(mockProducerBuilder, atLeastOnce()).intercept(
(org.apache.pulsar.client.api.ProducerInterceptor)any());
}
@@ -193,6 +203,7 @@ public class PulsarKafkaProducerTest {
doReturn(mockClient).when(mockClientBuilder).build();
doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+
doReturn(mockProducerBuilder).when(mockProducerBuilder).autoUpdatePartitionsInterval(anyInt(),
any());
doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
(org.apache.pulsar.client.api.ProducerInterceptor) any());
@@ -245,7 +256,54 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,
Long.toString((Integer.MAX_VALUE + 1L) * 1000));
- new PulsarKafkaProducer<>(properties);
+ PulsarKafkaProducer producer = new PulsarKafkaProducer<>(properties);
+ producer.close();
+ }
+
+ @Test
+ public void testAutoRefreshPartitions() throws Exception {
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000000,
"Send time out is suppose to be 1000.");
+ return mockProducerBuilder;
+ }).when(mockProducerBuilder).sendTimeout(anyInt(),
any(TimeUnit.class));
+
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep
alive interval is suppose to be 1000.");
+ return mockClientBuilder;
+ }).when(mockClientBuilder).keepAliveInterval(anyInt(),
any(TimeUnit.class));
+
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(),
any())).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,
"1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+ properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL,
Boolean.FALSE.toString());
+
+ PulsarKafkaProducer producer = spy(new
PulsarKafkaProducer<>(properties));
+ String topic = "persistent://prop/ns/t1";
+ Map<TopicPartition, PartitionInfo> map = Maps.newHashMap();
+ map.put(new TopicPartition(topic, 1), new PartitionInfo(topic, 1,
null, null, null));
+ map.put(new TopicPartition(topic, 2), new PartitionInfo(topic, 2,
null, null, null));
+ map.put(new TopicPartition(topic, 3), new PartitionInfo(topic, 3,
null, null, null));
+ doReturn(map).when(producer).readPartitionsInfo(anyString());
+ producer.scheduleRefreshPartitionMetadata(topic, 1);
+ for (int i = 0; i < 5; i++) {
+ if (producer.getPartitions(topic) == map.size()) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertEquals(producer.getPartitions(topic), map.size());
+ producer.close();
}
public static class PulsarKafkaProducerInterceptor implements
org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {