This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e259f472add Don't override existing consumer config values KafkaIO
(#32443)
e259f472add is described below
commit e259f472addfd8daf429508c887e5288f43ae0f8
Author: Jeff Kinard <[email protected]>
AuthorDate: Sun Sep 15 16:22:55 2024 -0400
Don't override existing consumer config values KafkaIO (#32443)
Signed-off-by: Jeffrey Kinard <[email protected]>
---
.../beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index b2eeb1a54d1..e87669ab2b0 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -149,10 +149,10 @@ public class KafkaReadSchemaTransformProvider
Map<String, Object> consumerConfigs =
new HashMap<>(
MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new
HashMap<>()));
- consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG,
"kafka-read-provider-" + groupId);
- consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
- consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
autoOffsetReset);
+ consumerConfigs.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG,
"kafka-read-provider-" + groupId);
+ consumerConfigs.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
+
consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+ consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
autoOffsetReset);
String format = configuration.getFormat();
boolean handleErrors =
ErrorHandling.hasOutput(configuration.getErrorHandling());