This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b7e4ce6f1d [fix][io] Config autoCommitEnabled when it disabled 
(#19499)
8b7e4ce6f1d is described below

commit 8b7e4ce6f1d22d20e840a8de123f810b07ca2df8
Author: wenbingshen <[email protected]>
AuthorDate: Tue Feb 14 21:55:18 2023 +0800

    [fix][io] Config autoCommitEnabled when it disabled (#19499)
---
 .../src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index c01158da50a..565c3604747 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -117,6 +117,8 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         }
         props.put(ConsumerConfig.GROUP_ID_CONFIG, 
kafkaSourceConfig.getGroupId());
         props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
String.valueOf(kafkaSourceConfig.getFetchMinBytes()));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                String.valueOf(kafkaSourceConfig.isAutoCommitEnabled()));
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                 String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs()));
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
String.valueOf(kafkaSourceConfig.getSessionTimeoutMs()));

Reply via email to