ihji commented on a change in pull request #15028:
URL: https://github.com/apache/beam/pull/15028#discussion_r658512414



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1544,6 +1571,152 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka header. We define a new class so that we can add schema annotations for
+   * generating Rows.
+   */
+  static class KafkaHeader {
+
+    String key;
+    byte[] value;
+
+    @SchemaCreate
+    public KafkaHeader(String key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka record with metadata. This class should only be used to represent a Kafka
+   * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
+   * inference supports generics.
+   */
+  static class ExternalKafkaRecord {
+
+    String topic;
+    int partition;
+    long offset;
+    long timestamp;
+    byte[] key;
+    byte[] value;
+    List<KafkaHeader> headers;
+    int timestampTypeId;
+    String timestampTypeName;
+
+    @SchemaCreate
+    public ExternalKafkaRecord(
+        String topic,
+        int partition,
+        long offset,
+        long timestamp,
+        byte[] key,
+        byte[] value,
+        @Nullable List<KafkaHeader> headers,
+        int timestampTypeId,
+        String timestampTypeName) {
+      this.topic = topic;
+      this.partition = partition;
+      this.offset = offset;
+      this.timestamp = timestamp;
+      this.key = key;
+      this.value = value;
+      this.headers = headers;
+      this.timestampTypeId = timestampTypeId;
+      this.timestampTypeName = timestampTypeName;
+    }
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but generates
+   * a {@link PCollection} of {@link Row}. This class is primarily used as a cross-language
+   * transform since {@link KafkaRecord} is not a type that can be easily encoded using Beam's
+   * standard coders. See {@link KafkaIO} for more information on usage and configuration of reader.
+   */
+  static class ExternalWithMetadata<K, V> extends PTransform<PBegin, PCollection<Row>> {
+    private final Read<K, V> read;
+
+    ExternalWithMetadata(Read<K, V> read) {
+      super("KafkaIO.Read");

Review comment:
   `TypedWithoutMetadata` also sets the same PTransform name, `KafkaIO.Read`:
   ```
   TypedWithoutMetadata(Read<K, V> read) {
     super("KafkaIO.Read");
     this.read = read;
   }
   ```
   Should we use different names for easier debugging?
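
To illustrate, here is a rough sketch of what distinct names could look like. `NamedTransform` is only a stand-in for Beam's `PTransform` so that the snippet compiles on its own, and the name `KafkaIO.ExternalWithMetadata` is a hypothetical choice, not something this PR defines. The existing `KafkaIO.Read` name is left untouched here on the assumption that renaming it could affect existing pipelines.

```java
public class TransformNames {
  // Stand-in for Beam's PTransform, which accepts an optional name in its constructor.
  static class NamedTransform {
    private final String name;

    NamedTransform(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }
  }

  // Keeps the name the existing transform already uses.
  static class TypedWithoutMetadata extends NamedTransform {
    TypedWithoutMetadata() {
      super("KafkaIO.Read");
    }
  }

  // Hypothetical distinct name for the new Row-producing transform.
  static class ExternalWithMetadata extends NamedTransform {
    ExternalWithMetadata() {
      super("KafkaIO.ExternalWithMetadata");
    }
  }

  public static void main(String[] args) {
    // With distinct names, the two steps can be told apart in logs and pipeline graphs.
    System.out.println(new TypedWithoutMetadata().getName());
    System.out.println(new ExternalWithMetadata().getName());
  }
}
```

Any name works as long as the two transforms no longer share one.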

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1544,6 +1571,152 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka header. We define a new class so that we can add schema annotations for
+   * generating Rows.
+   */
+  static class KafkaHeader {
+
+    String key;
+    byte[] value;
+
+    @SchemaCreate
+    public KafkaHeader(String key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka record with metadata. This class should only be used to represent a Kafka
+   * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
+   * inference supports generics.
+   */
+  static class ExternalKafkaRecord {

Review comment:
   I think `ExternalKafkaRecord` is somewhat misleading. I would prefer something more intuitive (e.g. `RowCodedKafkaRecord` or `KafkaRowWithMetadata`).

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -777,14 +778,22 @@ private static Coder resolveCoder(Class deserializer) {
     @AutoService(ExternalTransformRegistrar.class)
     public static class External implements ExternalTransformRegistrar {
 
-      public static final String URN = "beam:external:java:kafka:read:v1";
+      // Using the transform name in the URN so that the corresponding transform can be easily
+      // identified.
+      public static final String URN_WITH_METADATA =
+          "beam:external:java:kafkaio:externalwithmetadata:v1";

Review comment:
   Is it necessary to use `external` again in `externalwithmetadata`? `external` is already in the URN.
   
   How about `typedwithmetadata` or `rowwithmetadata`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

