ihji commented on a change in pull request #15028:
URL: https://github.com/apache/beam/pull/15028#discussion_r658512414
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1544,6 +1571,152 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
}
+ @DefaultSchema(JavaFieldSchema.class)
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ /**
+ * Represents a Kafka header. We define a new class so that we can add schema annotations for
+ * generating Rows.
+ */
+ static class KafkaHeader {
+
+ String key;
+ byte[] value;
+
+ @SchemaCreate
+ public KafkaHeader(String key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ @DefaultSchema(JavaFieldSchema.class)
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ /**
+ * Represents a Kafka record with metadata. This class should only be used to represent a Kafka
+ * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
+ * inference supports generics.
+ */
+ static class ExternalKafkaRecord {
+
+ String topic;
+ int partition;
+ long offset;
+ long timestamp;
+ byte[] key;
+ byte[] value;
+ List<KafkaHeader> headers;
+ int timestampTypeId;
+ String timestampTypeName;
+
+ @SchemaCreate
+ public ExternalKafkaRecord(
+ String topic,
+ int partition,
+ long offset,
+ long timestamp,
+ byte[] key,
+ byte[] value,
+ @Nullable List<KafkaHeader> headers,
+ int timestampTypeId,
+ String timestampTypeName) {
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.key = key;
+ this.value = value;
+ this.headers = headers;
+ this.timestampTypeId = timestampTypeId;
+ this.timestampTypeName = timestampTypeName;
+ }
+ }
+
+ /**
+ * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but generates
+ * a {@link PCollection} of {@link Row}. This class is primarily used as a cross-language
+ * transform since {@link KafkaRecord} is not a type that can be easily encoded using Beam's
+ * standard coders. See {@link KafkaIO} for more information on usage and configuration of reader.
+ */
+ static class ExternalWithMetadata<K, V> extends PTransform<PBegin, PCollection<Row>> {
+ private final Read<K, V> read;
+
+ ExternalWithMetadata(Read<K, V> read) {
+ super("KafkaIO.Read");
Review comment:
`TypedWithoutMetadata` also sets the same PTransform name, `KafkaIO.Read`:
```
TypedWithoutMetadata(Read<K, V> read) {
  super("KafkaIO.Read");
  this.read = read;
}
```
Should we use different names for easier debugging?
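For illustration, a minimal sketch of what distinct names could look like. This is an assumption, not the PR's code: `NamedTransform` is a hypothetical stand-in for Beam's `PTransform` (which accepts a name in its constructor) so that the snippet compiles without Beam on the classpath, and `KafkaIO.ReadWithMetadataAsRows` is a made-up name chosen only to show the idea.

```java
public class DistinctTransformNames {

  // Hypothetical stand-in for Beam's PTransform, which stores the name
  // passed to its constructor and exposes it via getName().
  static class NamedTransform {
    private final String name;

    NamedTransform(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }
  }

  // Keeps the name currently used in KafkaIO.
  static class TypedWithoutMetadata extends NamedTransform {
    TypedWithoutMetadata() {
      super("KafkaIO.Read");
    }
  }

  // Uses a different (made-up) name so the two transforms can be told
  // apart in pipeline graphs and logs.
  static class ExternalWithMetadata extends NamedTransform {
    ExternalWithMetadata() {
      super("KafkaIO.ReadWithMetadataAsRows");
    }
  }

  public static void main(String[] args) {
    System.out.println(new TypedWithoutMetadata().getName());
    System.out.println(new ExternalWithMetadata().getName());
  }
}
```

With separate names, a step shown in the runner UI or in logs can be traced back to the transform class that produced it.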
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1544,6 +1571,152 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
}
+ @DefaultSchema(JavaFieldSchema.class)
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ /**
+ * Represents a Kafka header. We define a new class so that we can add schema annotations for
+ * generating Rows.
+ */
+ static class KafkaHeader {
+
+ String key;
+ byte[] value;
+
+ @SchemaCreate
+ public KafkaHeader(String key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ @DefaultSchema(JavaFieldSchema.class)
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ /**
+ * Represents a Kafka record with metadata. This class should only be used to represent a Kafka
+ * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
+ * inference supports generics.
+ */
+ static class ExternalKafkaRecord {
Review comment:
I think `ExternalKafkaRecord` is somewhat misleading. I would prefer
something more intuitive (e.g., `RowCodedKafkaRecord` or
`KafkaRowWithMetadata`).
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -777,14 +778,22 @@ private static Coder resolveCoder(Class deserializer) {
@AutoService(ExternalTransformRegistrar.class)
public static class External implements ExternalTransformRegistrar {
- public static final String URN = "beam:external:java:kafka:read:v1";
+ // Using the transform name in the URN so that the corresponding transform can be easily
+ // identified.
+ public static final String URN_WITH_METADATA =
+ "beam:external:java:kafkaio:externalwithmetadata:v1";
Review comment:
Is it necessary to repeat `external` in `externalwithmetadata`? The URN
already contains `external`.
How about `typedwithmetadata` or `rowwithmetadata`?