This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 36486447e4d passing consumer configs from user-passed parameters 
(#25766)
36486447e4d is described below

commit 36486447e4d07af5076830ca1e331a6b61f14986
Author: Pablo Estrada <[email protected]>
AuthorDate: Wed Mar 8 14:34:43 2023 -0800

    passing consumer configs from user-passed parameters (#25766)
    
    * passing consumer configs from user-passed parameters
    
    * fixup
---
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 47 ++++++++++------------
 1 file changed, 21 insertions(+), 26 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 2bab36f465a..bf5bce46180 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,7 +18,9 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.service.AutoService;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +40,8 @@ import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -106,11 +109,18 @@ public class KafkaReadSchemaTransformProvider
       final String inputSchema = configuration.getSchema();
       final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
       final String autoOffsetReset =
-          configuration.getAutoOffsetResetConfig() == null
-              ? "latest"
-              : configuration.getAutoOffsetResetConfig();
-      if (inputSchema != null) {
-        assert configuration.getConfluentSchemaRegistryUrl() == null
+          MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), 
"latest");
+
+      Map<String, Object> consumerConfigs =
+          new HashMap<>(
+              
MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new 
HashMap<>()));
+      consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, 
"kafka-read-provider-" + groupId);
+      consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+      consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+      consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
autoOffsetReset);
+
+      if (inputSchema != null && !inputSchema.isEmpty()) {
+        assert 
Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
            : "To read from Kafka, a schema must be provided directly or 
through Confluent "
                 + "Schema Registry, but not both.";
         final Schema beamSchema =
@@ -126,16 +136,7 @@ public class KafkaReadSchemaTransformProvider
           public PCollectionRowTuple expand(PCollectionRowTuple input) {
             KafkaIO.Read<byte[], byte[]> kafkaRead =
                 KafkaIO.readBytes()
-                    .withConsumerConfigUpdates(
-                        ImmutableMap.of(
-                            ConsumerConfig.GROUP_ID_CONFIG,
-                            "kafka-read-provider-" + groupId,
-                            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                            true,
-                            ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                            100,
-                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                            autoOffsetReset))
+                    .withConsumerConfigUpdates(consumerConfigs)
                     .withTopic(configuration.getTopic())
                     .withBootstrapServers(configuration.getBootstrapServers());
             if (isTest) {
@@ -153,6 +154,9 @@ public class KafkaReadSchemaTransformProvider
           }
         };
       } else {
+        assert 
!Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
+            : "To read from Kafka, a schema must be provided directly or 
through Confluent "
+                + "Schema Registry. Neither seems to have been provided.";
         return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
           @Override
           public PCollectionRowTuple expand(PCollectionRowTuple input) {
@@ -168,16 +172,7 @@ public class KafkaReadSchemaTransformProvider
                 KafkaIO.<byte[], GenericRecord>read()
                     .withTopic(configuration.getTopic())
                     .withBootstrapServers(configuration.getBootstrapServers())
-                    .withConsumerConfigUpdates(
-                        ImmutableMap.of(
-                            ConsumerConfig.GROUP_ID_CONFIG,
-                            "kafka-read-provider-" + groupId,
-                            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                            true,
-                            ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                            100,
-                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                            autoOffsetReset))
+                    .withConsumerConfigUpdates(consumerConfigs)
                     .withKeyDeserializer(ByteArrayDeserializer.class)
                     .withValueDeserializer(
                         ConfluentSchemaRegistryDeserializerProvider.of(

Reply via email to