This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 365c2d92965 Support Kafka Managed IO (#31172)
365c2d92965 is described below

commit 365c2d92965c5e23c23d6e1f3c7a1cd048c872d8
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu May 9 17:14:17 2024 -0400

    Support Kafka Managed IO (#31172)
    
    * managed kafka read
    
    * managed kafka write
---
 .../apache/beam/sdk/schemas/utils/YamlUtils.java   |  8 +++
 sdks/java/io/kafka/build.gradle                    |  1 +
 .../io/kafka/KafkaReadSchemaTransformProvider.java |  7 ++-
 .../KafkaReadSchemaTransformProviderTest.java      | 53 ++++++++++++++++++-
 .../KafkaWriteSchemaTransformProviderTest.java     | 59 ++++++++++++++++++++++
 sdks/java/managed/build.gradle                     |  1 +
 .../java/org/apache/beam/sdk/managed/Managed.java  | 14 +++--
 .../managed/ManagedSchemaTransformProvider.java    | 30 ++++++++++-
 .../sdk/managed/ManagedTransformConstants.java     | 52 ++++++++++++++++++-
 9 files changed, 213 insertions(+), 12 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
index 122f2d1963b..e631e166e8b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.schemas.utils;
 import static org.apache.beam.sdk.values.Row.toRow;
 
 import java.math.BigDecimal;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -181,4 +182,11 @@ public class YamlUtils {
     }
     return new Yaml().dumpAsMap(map);
   }
+
+  public static Map<String, Object> yamlStringToMap(@Nullable String yaml) {
+    if (yaml == null || yaml.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    return new Yaml().load(yaml);
+  }
 }
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 269ddb3f5eb..3e095a2bacc 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -90,6 +90,7 @@ dependencies {
   provided library.java.everit_json_schema
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
   testImplementation project(":sdks:java:io:synthetic")
+  testImplementation project(":sdks:java:managed")
   testImplementation project(path: ":sdks:java:extensions:avro", 
configuration: "testRuntimeMigration")
   testImplementation project(path: ":sdks:java:extensions:protobuf", 
configuration: "testRuntimeMigration")
   testImplementation project(path: ":sdks:java:io:common", configuration: 
"testRuntimeMigration")
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 2776c388f7c..13240ea9dc4 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -151,11 +151,10 @@ public class KafkaReadSchemaTransformProvider
         }
       };
     }
-
-    if (format.equals("RAW")) {
+    if ("RAW".equals(format)) {
       beamSchema = Schema.builder().addField("payload", 
Schema.FieldType.BYTES).build();
       valueMapper = getRawBytesToRowFunction(beamSchema);
-    } else if (format.equals("PROTO")) {
+    } else if ("PROTO".equals(format)) {
       String fileDescriptorPath = configuration.getFileDescriptorPath();
       String messageName = configuration.getMessageName();
       if (fileDescriptorPath != null) {
@@ -165,7 +164,7 @@ public class KafkaReadSchemaTransformProvider
         beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, 
messageName);
         valueMapper = 
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
       }
-    } else if (format.equals("JSON")) {
+    } else if ("JSON".equals(format)) {
       beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema);
       valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema);
     } else {
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index f6e231c758a..d5962a737ba 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -18,16 +18,25 @@
 package org.apache.beam.sdk.io.kafka;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
@@ -51,7 +60,7 @@ public class KafkaReadSchemaTransformProviderTest {
           + "  string name = 2;\n"
           + "  bool active = 3;\n"
           + "\n"
-          + "  // Nested field\n"
+          + "  // Nested field\n\n"
           + "  message Address {\n"
           + "    string street = 1;\n"
           + "    string city = 2;\n"
@@ -284,4 +293,46 @@ public class KafkaReadSchemaTransformProviderTest {
                     .setMessageName("MyMessage")
                     .build()));
   }
+
+  @Test
+  public void testBuildTransformWithManaged() {
+    List<String> configs =
+        Arrays.asList(
+            "topic: topic_1\n" + "bootstrap_servers: some bootstrap\n" + 
"data_format: RAW",
+            "topic: topic_2\n"
+                + "bootstrap_servers: some bootstrap\n"
+                + "schema: 
'{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'",
+            "topic: topic_3\n"
+                + "bootstrap_servers: some bootstrap\n"
+                + "schema_registry_url: some-url\n"
+                + "schema_registry_subject: some-subject\n"
+                + "data_format: RAW",
+            "topic: topic_4\n"
+                + "bootstrap_servers: some bootstrap\n"
+                + "data_format: PROTO\n"
+                + "schema: '"
+                + PROTO_SCHEMA
+                + "'\n"
+                + "message_name: MyMessage");
+
+    for (String config : configs) {
+      // Kafka Read SchemaTransform gets built in 
ManagedSchemaTransformProvider's expand
+      Managed.read(Managed.KAFKA)
+          .withConfig(YamlUtils.yamlStringToMap(config))
+          .expand(PCollectionRowTuple.empty(Pipeline.create()));
+    }
+  }
+
+  @Test
+  public void testManagedMappings() {
+    KafkaReadSchemaTransformProvider provider = new 
KafkaReadSchemaTransformProvider();
+    Map<String, String> mapping = 
ManagedTransformConstants.MAPPINGS.get(provider.identifier());
+
+    assertNotNull(mapping);
+
+    List<String> configSchemaFieldNames = 
provider.configurationSchema().getFieldNames();
+    for (String paramName : mapping.values()) {
+      assertTrue(configSchemaFieldNames.contains(paramName));
+    }
+  }
 }
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
index 48d463a8f43..60bff89b355 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
@@ -18,17 +18,24 @@
 package org.apache.beam.sdk.io.kafka;
 
 import static 
org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import 
org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
 import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.schemas.utils.YamlUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -36,6 +43,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
@@ -185,4 +193,55 @@ public class KafkaWriteSchemaTransformProviderTest {
     output.get(ERROR_TAG).setRowSchema(errorSchema);
     p.run().waitUntilFinish();
   }
+
+  private static final String PROTO_SCHEMA =
+      "syntax = \"proto3\";\n"
+          + "\n"
+          + "message MyMessage {\n"
+          + "  int32 id = 1;\n"
+          + "  string name = 2;\n"
+          + "  bool active = 3;\n"
+          + "}";
+
+  @Test
+  public void testBuildTransformWithManaged() {
+    List<String> configs =
+        Arrays.asList(
+            "topic: topic_1\n" + "bootstrap_servers: some bootstrap\n" + 
"data_format: RAW",
+            "topic: topic_2\n"
+                + "bootstrap_servers: some bootstrap\n"
+                + "producer_config_updates: {\"foo\": \"bar\"}\n"
+                + "data_format: AVRO",
+            "topic: topic_3\n"
+                + "bootstrap_servers: some bootstrap\n"
+                + "data_format: PROTO\n"
+                + "schema: '"
+                + PROTO_SCHEMA
+                + "'\n"
+                + "message_name: MyMessage");
+
+    for (String config : configs) {
+      // Kafka Write SchemaTransform gets built in 
ManagedSchemaTransformProvider's expand
+      Managed.write(Managed.KAFKA)
+          .withConfig(YamlUtils.yamlStringToMap(config))
+          .expand(
+              PCollectionRowTuple.of(
+                  "input",
+                  Pipeline.create()
+                      
.apply(Create.empty(Schema.builder().addByteArrayField("bytes").build()))));
+    }
+  }
+
+  @Test
+  public void testManagedMappings() {
+    KafkaWriteSchemaTransformProvider provider = new 
KafkaWriteSchemaTransformProvider();
+    Map<String, String> mapping = 
ManagedTransformConstants.MAPPINGS.get(provider.identifier());
+
+    assertNotNull(mapping);
+
+    List<String> configSchemaFieldNames = 
provider.configurationSchema().getFieldNames();
+    for (String paramName : mapping.values()) {
+      assertTrue(configSchemaFieldNames.contains(paramName));
+    }
+  }
 }
diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle
index f06df27429b..add0d7f3cc0 100644
--- a/sdks/java/managed/build.gradle
+++ b/sdks/java/managed/build.gradle
@@ -29,6 +29,7 @@ ext.summary = """Library that provides managed IOs."""
 dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     implementation library.java.vendored_guava_32_1_2_jre
+    implementation library.java.slf4j_api
 
     testImplementation library.java.junit
     testRuntimeOnly "org.yaml:snakeyaml:2.0"
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index d24a3fd88dd..da4a0853fb3 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.managed;
 
-import static 
org.apache.beam.sdk.managed.ManagedTransformConstants.ICEBERG_READ;
-import static 
org.apache.beam.sdk.managed.ManagedTransformConstants.ICEBERG_WRITE;
-
 import com.google.auto.value.AutoValue;
 import java.util.ArrayList;
 import java.util.List;
@@ -80,12 +77,19 @@ public class Managed {
 
   // TODO: Dynamically generate a list of supported transforms
   public static final String ICEBERG = "iceberg";
+  public static final String KAFKA = "kafka";
 
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
-      ImmutableMap.<String, String>builder().put(ICEBERG, 
ICEBERG_READ).build();
+      ImmutableMap.<String, String>builder()
+          .put(ICEBERG, ManagedTransformConstants.ICEBERG_READ)
+          .put(KAFKA, ManagedTransformConstants.KAFKA_READ)
+          .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
-      ImmutableMap.<String, String>builder().put(ICEBERG, 
ICEBERG_WRITE).build();
+      ImmutableMap.<String, String>builder()
+          .put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE)
+          .put(KAFKA, ManagedTransformConstants.KAFKA_WRITE)
+          .build();
 
   /**
    * Instantiates a {@link Managed.ManagedTransform} transform for the 
specified source. The
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index ff08e79e5ea..54e1404c650 100644
--- 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.managed;
 
+import static org.apache.beam.sdk.managed.ManagedTransformConstants.MAPPINGS;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.service.AutoService;
@@ -49,10 +50,13 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class ManagedSchemaTransformProvider
     extends 
TypedSchemaTransformProvider<ManagedSchemaTransformProvider.ManagedConfig> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ManagedSchemaTransformProvider.class);
 
   @Override
   public String identifier() {
@@ -179,6 +183,11 @@ public class ManagedSchemaTransformProvider
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      LOG.debug(
+          "Building transform \"{}\" with Row configuration: {}",
+          underlyingTransformProvider.identifier(),
+          underlyingTransformConfig);
+
       return 
input.apply(underlyingTransformProvider.from(underlyingTransformConfig));
     }
 
@@ -205,7 +214,26 @@ public class ManagedSchemaTransformProvider
   static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
     // May return an empty row (perhaps the underlying transform doesn't have 
any required
     // parameters)
-    return YamlUtils.toBeamRow(config.resolveUnderlyingConfig(), 
transformSchema, false);
+    String yamlConfig = config.resolveUnderlyingConfig();
+    Map<String, Object> configMap = YamlUtils.yamlStringToMap(yamlConfig);
+
+    // The config Row object will be used to build the underlying 
SchemaTransform.
+    // If a mapping for the SchemaTransform exists, we use it to update 
parameter names and align
+    // with the underlying config schema
+    Map<String, String> mapping = 
MAPPINGS.get(config.getTransformIdentifier());
+    if (mapping != null && configMap != null) {
+      Map<String, Object> remappedConfig = new HashMap<>();
+      for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+        String paramName = entry.getKey();
+        if (mapping.containsKey(paramName)) {
+          paramName = mapping.get(paramName);
+        }
+        remappedConfig.put(paramName, entry.getValue());
+      }
+      configMap = remappedConfig;
+    }
+
+    return YamlUtils.toBeamRow(configMap, transformSchema, false);
   }
 
   Map<String, SchemaTransformProvider> getAllProviders() {
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 48735d8c33a..8165633cf15 100644
--- 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -17,9 +17,59 @@
  */
 package org.apache.beam.sdk.managed;
 
-/** This class contains constants for supported managed transform identifiers. 
*/
+import java.util.Map;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * This class contains constants for supported managed transforms, including:
+ *
+ * <ul>
+ *   <li>Identifiers of supported transforms
+ *   <li>Configuration parameter renaming
+ * </ul>
+ *
+ * <p>Configuration parameter names exposed via Managed interface may differ 
from the parameter
+ * names in the underlying SchemaTransform implementation.
+ *
+ * <p>Any naming differences are laid out in {@link 
ManagedTransformConstants#MAPPINGS} to update
+ * the configuration object before it's used to build the underlying transform.
+ *
+ * <p>Mappings don't need to include ALL underlying parameter names, as we may 
not want to expose
+ * every single parameter through the Managed interface.
+ */
 public class ManagedTransformConstants {
   public static final String ICEBERG_READ = 
"beam:schematransform:org.apache.beam:iceberg_read:v1";
   public static final String ICEBERG_WRITE =
       "beam:schematransform:org.apache.beam:iceberg_write:v1";
+  public static final String KAFKA_READ = 
"beam:schematransform:org.apache.beam:kafka_read:v1";
+  public static final String KAFKA_WRITE = 
"beam:schematransform:org.apache.beam:kafka_write:v1";
+
+  private static final Map<String, String> KAFKA_READ_MAPPINGS =
+      ImmutableMap.<String, String>builder()
+          .put("topic", "topic")
+          .put("bootstrap_servers", "bootstrapServers")
+          .put("consumer_config_updates", "consumerConfigUpdates")
+          .put("confluent_schema_registry_url", "confluentSchemaRegistryUrl")
+          .put("confluent_schema_registry_subject", 
"confluentSchemaRegistrySubject")
+          .put("data_format", "format")
+          .put("schema", "schema")
+          .put("file_descriptor_path", "fileDescriptorPath")
+          .put("message_name", "messageName")
+          .build();
+
+  private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
+      ImmutableMap.<String, String>builder()
+          .put("topic", "topic")
+          .put("bootstrap_servers", "bootstrapServers")
+          .put("producer_config_updates", "producerConfigUpdates")
+          .put("data_format", "format")
+          .put("file_descriptor_path", "fileDescriptorPath")
+          .put("message_name", "messageName")
+          .build();
+
+  public static final Map<String, Map<String, String>> MAPPINGS =
+      ImmutableMap.<String, Map<String, String>>builder()
+          .put(KAFKA_READ, KAFKA_READ_MAPPINGS)
+          .put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS)
+          .build();
 }

Reply via email to