This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new 357fa3b  [pulsar-kafka] Fixed blockIfQueueFull config (#36)
357fa3b is described below

commit 357fa3bef8c3ec42201cf2aac4b3468bf240584a
Author: hailin0 <[email protected]>
AuthorDate: Wed Mar 1 16:42:15 2023 +0800

    [pulsar-kafka] Fixed blockIfQueueFull config (#36)
    
    Co-authored-by: wanghailin <[email protected]>
---
 .../org/apache/kafka/clients/producer/PulsarKafkaProducer.java     | 2 +-
 .../org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 80ca9fd..6dbe09f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -180,7 +180,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
         Boolean sendTimeOutConfigured = sendTimeoutMillis > 0;
-        boolean shouldBlockPulsarProducer = Boolean.getBoolean(properties
+        boolean shouldBlockPulsarProducer = Boolean.parseBoolean(properties
                 .getProperty(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, sendTimeOutConfigured.toString()));
         pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 90c2f95..b4a7e79 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -133,6 +133,13 @@ public class PulsarKafkaProducerTest {
         verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
         verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
         verify(mockProducerBuilder, times(1)).blockIfQueueFull(false);
+
+        // validate configs change
+        properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, Boolean.TRUE.toString());
+        producer = new PulsarKafkaProducer<>(properties);
+        producer.close();
+
+        verify(mockProducerBuilder, times(1)).blockIfQueueFull(true);
     }
 
     @Test

Reply via email to