This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new 357fa3b [pulsar-kafka] Fixed blockIfQueueFull config (#36)
357fa3b is described below
commit 357fa3bef8c3ec42201cf2aac4b3468bf240584a
Author: hailin0 <[email protected]>
AuthorDate: Wed Mar 1 16:42:15 2023 +0800
[pulsar-kafka] Fixed blockIfQueueFull config (#36)
Co-authored-by: wanghailin <[email protected]>
---
.../org/apache/kafka/clients/producer/PulsarKafkaProducer.java | 2 +-
.../org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 80ca9fd..6dbe09f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -180,7 +180,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
// Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
Boolean sendTimeOutConfigured = sendTimeoutMillis > 0;
- boolean shouldBlockPulsarProducer = Boolean.getBoolean(properties
+ boolean shouldBlockPulsarProducer = Boolean.parseBoolean(properties
.getProperty(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL,
sendTimeOutConfigured.toString()));
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 90c2f95..b4a7e79 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -133,6 +133,13 @@ public class PulsarKafkaProducerTest {
verify(mockClientBuilder, times(1)).keepAliveInterval(1000,
TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000,
TimeUnit.MILLISECONDS);
verify(mockProducerBuilder, times(1)).blockIfQueueFull(false);
+
+ // validate configs change
+ properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, Boolean.TRUE.toString());
+ producer = new PulsarKafkaProducer<>(properties);
+ producer.close();
+
+ verify(mockProducerBuilder, times(1)).blockIfQueueFull(true);
}
@Test