This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6b006531b07 Adding support for GCS-stored files for consumer config
overrides (#25773)
6b006531b07 is described below
commit 6b006531b07cfdcb4c1e8eec02d2ff9c672a135f
Author: Pablo Estrada <[email protected]>
AuthorDate: Thu Mar 30 20:03:42 2023 -0700
Adding support for GCS-stored files for consumer config overrides (#25773)
* Adding support for GCS-stored files for consumer config overrides
* fixup
---
.../io/kafka/KafkaReadSchemaTransformProvider.java | 76 ++++++++++++++++++++++
1 file changed, 76 insertions(+)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index bf5bce46180..053cd3ff76f 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,13 +18,23 @@
package org.apache.beam.sdk.io.kafka;
import com.google.auto.service.AutoService;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -43,14 +53,20 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.Visi
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class KafkaReadSchemaTransformProvider
extends
TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaReadSchemaTransformProvider.class);
+
final Boolean isTest;
final Integer testTimeoutSecs;
@@ -137,6 +153,7 @@ public class KafkaReadSchemaTransformProvider
KafkaIO.Read<byte[], byte[]> kafkaRead =
KafkaIO.readBytes()
.withConsumerConfigUpdates(consumerConfigs)
+ .withConsumerFactoryFn(new
ConsumerFactoryWithGcsTrustStores())
.withTopic(configuration.getTopic())
.withBootstrapServers(configuration.getBootstrapServers());
if (isTest) {
@@ -171,6 +188,7 @@ public class KafkaReadSchemaTransformProvider
KafkaIO.Read<byte[], GenericRecord> kafkaRead =
KafkaIO.<byte[], GenericRecord>read()
.withTopic(configuration.getTopic())
+ .withConsumerFactoryFn(new
ConsumerFactoryWithGcsTrustStores())
.withBootstrapServers(configuration.getBootstrapServers())
.withConsumerConfigUpdates(consumerConfigs)
.withKeyDeserializer(ByteArrayDeserializer.class)
@@ -193,4 +211,62 @@ public class KafkaReadSchemaTransformProvider
}
}
};
+
+ /**
+  * Kafka consumer factory that localizes GCS-hosted configuration values before delegating to
+  * {@code KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN}.
+  *
+  * <p>Every {@code String} config value that starts with {@code gs://} is copied into a freshly
+  * created local temporary file and replaced by that file's absolute path. All other values are
+  * handed to the delegate factory unchanged.
+  */
+ private static class ConsumerFactoryWithGcsTrustStores
+     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+
+   /**
+    * Builds a consumer from {@code input} after rewriting any {@code gs://} values to local paths.
+    *
+    * @param input the consumer configuration, possibly containing {@code gs://} file references
+    * @return the consumer produced by {@code KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN}
+    * @throws IllegalArgumentException if a referenced {@code gs://} file cannot be copied locally
+    */
+   @Override
+   public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
+     return KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN.apply(
+         input.entrySet().stream()
+             .map(
+                 entry ->
+                     Maps.immutableEntry(
+                         entry.getKey(), identityOrGcsToLocalFile(entry.getValue())))
+             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+   }
+
+   /**
+    * Returns {@code configValue} unchanged unless it is a {@code String} starting with
+    * {@code gs://}; in that case the referenced file is copied to a local temporary file and the
+    * absolute path of that copy is returned instead.
+    *
+    * @param configValue a single consumer configuration value
+    * @return the original value, or the absolute local path of the downloaded copy
+    * @throws IllegalArgumentException wrapping the underlying {@link IOException} if the copy fails
+    */
+   private static Object identityOrGcsToLocalFile(Object configValue) {
+     if (!(configValue instanceof String)) {
+       return configValue;
+     }
+     String configStr = (String) configValue;
+     if (!configStr.startsWith("gs://")) {
+       return configValue;
+     }
+     try {
+       Path localFile = Files.createTempFile("", "");
+       LOG.info(
+           "Downloading {} into local filesystem ({})", configStr, localFile.toAbsolutePath());
+       // TODO(pabloem): Only copy if file does not exist.
+       // try-with-resources guarantees all three handles are released even when the copy fails.
+       try (ReadableByteChannel channel =
+               FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId());
+           FileOutputStream outputStream = new FileOutputStream(localFile.toFile());
+           WritableByteChannel outputChannel = Channels.newChannel(outputStream)) {
+         ByteBuffer buffer = ByteBuffer.allocate(1024);
+         while (channel.read(buffer) != -1) {
+           buffer.flip();
+           outputChannel.write(buffer);
+           // compact() keeps any bytes a partial write left behind for the next iteration.
+           buffer.compact();
+         }
+         // After end-of-stream, flush whatever is still buffered from a partial write.
+         buffer.flip();
+         while (buffer.hasRemaining()) {
+           outputChannel.write(buffer);
+         }
+       }
+       return localFile.toAbsolutePath().toString();
+     } catch (IOException e) {
+       // Keep the original exception as the cause so the real failure is diagnosable.
+       throw new IllegalArgumentException(
+           String.format(
+               "Unable to fetch file %s to be used locally to create a Kafka Consumer.",
+               configStr),
+           e);
+     }
+   }
+ }
}