This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b006531b07 Adding support for GCS-stored files for consumer config overrides (#25773)
6b006531b07 is described below

commit 6b006531b07cfdcb4c1e8eec02d2ff9c672a135f
Author: Pablo Estrada <[email protected]>
AuthorDate: Thu Mar 30 20:03:42 2023 -0700

    Adding support for GCS-stored files for consumer config overrides (#25773)
    
    * Adding support for GCS-stored files for consumer config overrides
    
    * fixup
---
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 76 ++++++++++++++++++++++
 1 file changed, 76 insertions(+)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index bf5bce46180..053cd3ff76f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,13 +18,23 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.service.AutoService;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -43,14 +53,20 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.Visi
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class KafkaReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaReadSchemaTransformProvider.class);
+
   final Boolean isTest;
   final Integer testTimeoutSecs;
 
@@ -137,6 +153,7 @@ public class KafkaReadSchemaTransformProvider
             KafkaIO.Read<byte[], byte[]> kafkaRead =
                 KafkaIO.readBytes()
                     .withConsumerConfigUpdates(consumerConfigs)
+                    .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
                     .withTopic(configuration.getTopic())
                     .withBootstrapServers(configuration.getBootstrapServers());
             if (isTest) {
@@ -171,6 +188,7 @@ public class KafkaReadSchemaTransformProvider
             KafkaIO.Read<byte[], GenericRecord> kafkaRead =
                 KafkaIO.<byte[], GenericRecord>read()
                     .withTopic(configuration.getTopic())
+                    .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
                     .withBootstrapServers(configuration.getBootstrapServers())
                     .withConsumerConfigUpdates(consumerConfigs)
                     .withKeyDeserializer(ByteArrayDeserializer.class)
@@ -193,4 +211,62 @@ public class KafkaReadSchemaTransformProvider
       }
     }
   };
+
+  private static class ConsumerFactoryWithGcsTrustStores
+      implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+
+    @Override
+    public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
+      return KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN.apply(
+          input.entrySet().stream()
+              .map(
+                  entry ->
+                      Maps.immutableEntry(
+                          entry.getKey(), identityOrGcsToLocalFile(entry.getValue())))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
+    private static Object identityOrGcsToLocalFile(Object configValue) {
+      if (configValue instanceof String) {
+        String configStr = (String) configValue;
+        if (configStr.startsWith("gs://")) {
+          try {
+            Path localFile = Files.createTempFile("", "");
+            LOG.info(
+                "Downloading {} into local filesystem ({})", configStr, localFile.toAbsolutePath());
+            // TODO(pabloem): Only copy if file does not exist.
+            ReadableByteChannel channel =
+                FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId());
+            FileOutputStream outputStream = new FileOutputStream(localFile.toFile());
+
+            // Create a WritableByteChannel to write data to the FileOutputStream
+            WritableByteChannel outputChannel = Channels.newChannel(outputStream);
+
+            // Read data from the ReadableByteChannel and write it to the WritableByteChannel
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            while (channel.read(buffer) != -1) {
+              buffer.flip();
+              outputChannel.write(buffer);
+              buffer.compact();
+            }
+
+            // Close the channels and the output stream
+            channel.close();
+            outputChannel.close();
+            outputStream.close();
+            return localFile.toAbsolutePath().toString();
+          } catch (IOException e) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Unable to fetch file %s to be used locally to create a Kafka Consumer.",
+                    configStr));
+          }
+        } else {
+          return configValue;
+        }
+      } else {
+        return configValue;
+      }
+    }
+  }
 }

Reply via email to