This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e259f472add Don't override existing consumer config values KafkaIO 
(#32443)
e259f472add is described below

commit e259f472addfd8daf429508c887e5288f43ae0f8
Author: Jeff Kinard <[email protected]>
AuthorDate: Sun Sep 15 16:22:55 2024 -0400

    Don't override existing consumer config values KafkaIO (#32443)
    
    Signed-off-by: Jeffrey Kinard <[email protected]>
---
 .../beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java       | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index b2eeb1a54d1..e87669ab2b0 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -149,10 +149,10 @@ public class KafkaReadSchemaTransformProvider
       Map<String, Object> consumerConfigs =
           new HashMap<>(
               
MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new 
HashMap<>()));
-      consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, 
"kafka-read-provider-" + groupId);
-      consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-      consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
-      consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
autoOffsetReset);
+      consumerConfigs.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, 
"kafka-read-provider-" + groupId);
+      consumerConfigs.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
true);
+      
consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+      consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
autoOffsetReset);
 
       String format = configuration.getFormat();
       boolean handleErrors = 
ErrorHandling.hasOutput(configuration.getErrorHandling());

Reply via email to