This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 36486447e4d passing consumer configs from user-passed parameters (#25766)
36486447e4d is described below
commit 36486447e4d07af5076830ca1e331a6b61f14986
Author: Pablo Estrada <[email protected]>
AuthorDate: Wed Mar 8 14:34:43 2023 -0800
passing consumer configs from user-passed parameters (#25766)
* passing consumer configs from user-passed parameters
* fixup
---
.../io/kafka/KafkaReadSchemaTransformProvider.java | 47 ++++++++++------------
1 file changed, 21 insertions(+), 26 deletions(-)
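The heart of this change is the consumer-config merge in the third hunk below: previously both read paths built their consumer configuration from a hard-coded ImmutableMap, discarding anything the user supplied. The standalone sketch that follows (not part of the commit) mirrors the new logic so it can be read in isolation; the sample override key and all literal values are illustrative only, standing in for the configuration accessors seen in the diff.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConsumerConfigMergeSketch {
      public static void main(String[] args) {
        // Stand-in for configuration.getConsumerConfigUpdates(); in the real
        // code path this map may be null when the user passed no overrides,
        // which is why firstNonNull supplies an empty map below.
        Map<String, Object> userUpdates = new HashMap<>();
        userUpdates.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 250); // illustrative override

        int groupId = 42; // stand-in for configuration.hashCode() % Integer.MAX_VALUE
        String autoOffsetReset = "latest"; // stand-in for the firstNonNull default

        // Copy the user's map (or an empty one), then layer the provider-managed
        // settings on top; for these four keys the provider values win on conflict.
        Map<String, Object> consumerConfigs =
            new HashMap<>(MoreObjects.firstNonNull(userUpdates, new HashMap<>()));
        consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId);
        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        System.out.println(consumerConfigs);
      }
    }

Note the ordering: because the user's map is copied first and the provider-managed entries are put afterwards, a user cannot override the group id, auto-commit behavior, or offset reset through this path.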
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 2bab36f465a..bf5bce46180 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,7 +18,9 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.service.AutoService;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +40,8 @@ import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -106,11 +109,18 @@ public class KafkaReadSchemaTransformProvider
     final String inputSchema = configuration.getSchema();
     final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
     final String autoOffsetReset =
-        configuration.getAutoOffsetResetConfig() == null
-            ? "latest"
-            : configuration.getAutoOffsetResetConfig();
-    if (inputSchema != null) {
-      assert configuration.getConfluentSchemaRegistryUrl() == null
+        MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), "latest");
+
+    Map<String, Object> consumerConfigs =
+        new HashMap<>(
+            MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>()));
+    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId);
+    consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+    consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+    consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+
+    if (inputSchema != null && !inputSchema.isEmpty()) {
+      assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
           : "To read from Kafka, a schema must be provided directly or though Confluent "
               + "Schema Registry, but not both.";
       final Schema beamSchema =
@@ -126,16 +136,7 @@ public class KafkaReadSchemaTransformProvider
         public PCollectionRowTuple expand(PCollectionRowTuple input) {
           KafkaIO.Read<byte[], byte[]> kafkaRead =
               KafkaIO.readBytes()
-                  .withConsumerConfigUpdates(
-                      ImmutableMap.of(
-                          ConsumerConfig.GROUP_ID_CONFIG,
-                          "kafka-read-provider-" + groupId,
-                          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                          true,
-                          ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                          100,
-                          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                          autoOffsetReset))
+                  .withConsumerConfigUpdates(consumerConfigs)
                   .withTopic(configuration.getTopic())
                   .withBootstrapServers(configuration.getBootstrapServers());
           if (isTest) {
@@ -153,6 +154,9 @@ public class KafkaReadSchemaTransformProvider
         }
       };
     } else {
+      assert !Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
+          : "To read from Kafka, a schema must be provided directly or though Confluent "
+              + "Schema Registry. Neither seems to have been provided.";
       return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
         @Override
         public PCollectionRowTuple expand(PCollectionRowTuple input) {
@@ -168,16 +172,7 @@ public class KafkaReadSchemaTransformProvider
               KafkaIO.<byte[], GenericRecord>read()
                   .withTopic(configuration.getTopic())
                   .withBootstrapServers(configuration.getBootstrapServers())
-                  .withConsumerConfigUpdates(
-                      ImmutableMap.of(
-                          ConsumerConfig.GROUP_ID_CONFIG,
-                          "kafka-read-provider-" + groupId,
-                          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-                          true,
-                          ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                          100,
-                          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                          autoOffsetReset))
+                  .withConsumerConfigUpdates(consumerConfigs)
                   .withKeyDeserializer(ByteArrayDeserializer.class)
                   .withValueDeserializer(
                       ConfluentSchemaRegistryDeserializerProvider.of(
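For completeness, a rough sketch of how a pipeline author would now get consumer properties through to KafkaIO. The builder method names here are an assumption based on AutoValue conventions and the getters visible in the diff (getTopic, getBootstrapServers, getConsumerConfigUpdates, getSchema); verify them against the actual KafkaReadSchemaTransformConfiguration before relying on this.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical usage; all values are placeholders.
    Map<String, Object> updates = new HashMap<>();
    updates.put("max.poll.records", 500); // illustrative user override
    updates.put("fetch.min.bytes", 1024); // illustrative user override

    String avroSchemaString = "{...}"; // placeholder for the user's Avro schema JSON

    KafkaReadSchemaTransformConfiguration config =
        KafkaReadSchemaTransformConfiguration.builder()
            .setBootstrapServers("localhost:9092") // illustrative address
            .setTopic("my-topic")                  // illustrative topic
            .setConsumerConfigUpdates(updates)     // now forwarded to the Kafka consumer
            .setSchema(avroSchemaString)           // schema provided directly, not via registry
            .build();

With this commit, the entries in updates reach the consumer in both read paths (direct schema and Confluent Schema Registry), subject to the provider-managed keys noted earlier.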