Polber commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1445316149
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -115,97 +115,18 @@ protected SchemaTransform
from(KafkaReadSchemaTransformConfiguration configurati
String format = configuration.getFormat();
boolean handleErrors =
ErrorHandling.hasOutput(configuration.getErrorHandling());
- String descriptorPath = configuration.getFileDescriptorPath();
- String messageName = configuration.getMessageName();
-
- if ((format != null && VALID_DATA_FORMATS.contains(format))
- || (!Strings.isNullOrEmpty(inputSchema) && !Objects.equals(format,
"RAW"))
- || (Objects.equals(format, "PROTO")
- && !Strings.isNullOrEmpty(descriptorPath)
- && !Strings.isNullOrEmpty(messageName))) {
- SerializableFunction<byte[], Row> valueMapper;
- Schema beamSchema;
- if (format != null && format.equals("RAW")) {
- if (inputSchema != null) {
- throw new IllegalArgumentException(
- "To read from Kafka in RAW format, you can't provide a schema.");
- }
- beamSchema = Schema.builder().addField("payload",
Schema.FieldType.BYTES).build();
- valueMapper = getRawBytesToRowFunction(beamSchema);
- } else if (format != null && format.equals("PROTO")) {
- if (descriptorPath == null || messageName == null) {
- throw new IllegalArgumentException(
- "Expecting both descriptorPath and messageName to be non-null.");
- }
- valueMapper =
ProtoByteUtils.getProtoBytesToRowFunction(descriptorPath, messageName);
- beamSchema = ProtoByteUtils.getBeamSchemaFromProto(descriptorPath,
messageName);
- } else {
- assert
Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
- : "To read from Kafka, a schema must be provided directly or
though Confluent "
- + "Schema Registry, but not both.";
- if (inputSchema == null) {
- throw new IllegalArgumentException(
- "To read from Kafka in JSON or AVRO format, you must provide a
schema.");
- }
- beamSchema =
- Objects.equals(format, "JSON")
- ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
- : AvroUtils.toBeamSchema(new
org.apache.avro.Schema.Parser().parse(inputSchema));
- valueMapper =
- Objects.equals(format, "JSON")
- ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
- : AvroUtils.getAvroBytesToRowFunction(beamSchema);
- }
- return new SchemaTransform() {
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- KafkaIO.Read<byte[], byte[]> kafkaRead =
- KafkaIO.readBytes()
- .withConsumerConfigUpdates(consumerConfigs)
- .withConsumerFactoryFn(new
ConsumerFactoryWithGcsTrustStores())
- .withTopic(configuration.getTopic())
- .withBootstrapServers(configuration.getBootstrapServers());
- if (isTest) {
- kafkaRead =
kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
- }
- PCollection<byte[]> kafkaValues =
-
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+ SerializableFunction<byte[], Row> valueMapper;
+ Schema beamSchema;
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- PCollectionTuple outputTuple =
- kafkaValues.apply(
- ParDo.of(
- new ErrorFn(
- "Kafka-read-error-counter", valueMapper,
errorSchema, handleErrors))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- PCollectionRowTuple outputRows =
- PCollectionRowTuple.of(
- "output",
outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
-
- PCollection<Row> errorOutput =
outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
- if (handleErrors) {
- ErrorHandling errorHandling = configuration.getErrorHandling();
- if (errorHandling == null) {
- throw new IllegalArgumentException("You must specify an error
handling option.");
- }
- outputRows = outputRows.and(errorHandling.getOutput(),
errorOutput);
- }
- return outputRows;
- }
- };
- } else {
- assert
!Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
- : "To read from Kafka, a schema must be provided directly or though
Confluent "
- + "Schema Registry. Neither seems to have been provided.";
+ String confluentSchemaRegUrl =
configuration.getConfluentSchemaRegistryUrl();
+ if (confluentSchemaRegUrl != null) {
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
- final String confluentSchemaRegUrl =
configuration.getConfluentSchemaRegistryUrl();
final String confluentSchemaRegSubject =
configuration.getConfluentSchemaRegistrySubject();
- if (confluentSchemaRegUrl == null || confluentSchemaRegSubject ==
null) {
+ if (confluentSchemaRegSubject == null) {
throw new IllegalArgumentException(
"To read from Kafka, a schema must be provided directly or
though Confluent "
+ "Schema Registry. Make sure you are providing one of
these parameters.");
Review Comment:
This can be moved to the Configuration class validate method (explained more
in my other comment).
```suggestion
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
} else if (configuration.getFormat().equals("PROTO")) {
String descriptorPath = configuration.getFileDescriptorPath();
+ String schema = configuration.getSchema();
String messageName = configuration.getMessageName();
- if (descriptorPath == null || messageName == null) {
+ if (messageName == null) {
+ throw new IllegalArgumentException("Expecting messageName to be
non-null.");
+ }
+ if (descriptorPath != null && schema != null) {
+ throw new IllegalArgumentException(
+ "You must include a descriptorPath or a proto Schema but not
both.");
+ } else if (descriptorPath != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath,
messageName);
+ } else if (schema != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema,
messageName);
+ } else {
throw new IllegalArgumentException(
- "Expecting both descriptorPath and messageName to be non-null.");
+ "At least a descriptorPath or a proto Schema is required.");
}
Review Comment:
Similar to my comment for the ReadProvider, I think moving the validation to
the Configuration class validate method would clean up this file so it is
easier to read/debug.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -180,8 +211,14 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
.withKeySerializer(ByteArraySerializer.class)
.withValueSerializer(ByteArraySerializer.class));
+ // TODO: include output from KafkaIO Write once updated from PDone
+ PCollection<Row> errorOutput =
+
outputTuple.get(ERROR_TAG).setRowSchema(ErrorHandling.errorSchema(errorSchema));
return PCollectionRowTuple.of(
- "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+ handleErrors
+ ?
Objects.requireNonNull(configuration.getErrorHandling()).getOutput()
Review Comment:
nit: again, redundant since
`ErrorHandling.hasOutput(configuration.getErrorHandling());` is called.
```suggestion
? configuration.getErrorHandling().getOutput()
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
} else if (configuration.getFormat().equals("PROTO")) {
String descriptorPath = configuration.getFileDescriptorPath();
+ String schema = configuration.getSchema();
String messageName = configuration.getMessageName();
- if (descriptorPath == null || messageName == null) {
+ if (messageName == null) {
+ throw new IllegalArgumentException("Expecting messageName to be
non-null.");
+ }
+ if (descriptorPath != null && schema != null) {
+ throw new IllegalArgumentException(
+ "You must include a descriptorPath or a proto Schema but not
both.");
+ } else if (descriptorPath != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath,
messageName);
+ } else if (schema != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema,
messageName);
+ } else {
throw new IllegalArgumentException(
- "Expecting both descriptorPath and messageName to be non-null.");
+ "At least a descriptorPath or a proto Schema is required.");
}
Review Comment:
More for my understanding - why is a schema provided by the Configuration
required here? The other data formats use the schema from the incoming
PCollectionRowTuple input to create the schema for the outgoing
PCollectionRowTuple output. Can the output Proto schema not be constructed from
the input Row schema?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -234,6 +155,74 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
}
};
}
+
+ if (format != null && format.equals("RAW")) {
+ if (inputSchema != null) {
+ throw new IllegalArgumentException(
+ "To read from Kafka in RAW format, you can't provide a schema.");
+ }
+ beamSchema = Schema.builder().addField("payload",
Schema.FieldType.BYTES).build();
+ valueMapper = getRawBytesToRowFunction(beamSchema);
+ } else if (format != null && format.equals("PROTO")) {
+ String messageName = configuration.getMessageName();
+ if (messageName == null) {
+ throw new IllegalArgumentException("Expecting a messageName to be
non-null.");
+ }
+ String fileDescriptorPath = configuration.getFileDescriptorPath();
+ if (fileDescriptorPath != null) {
+ beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath,
messageName);
+ valueMapper =
ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
+ } else {
+ beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema,
messageName);
+ valueMapper =
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
+ }
+ } else {
+ beamSchema =
+ Objects.equals(format, "JSON")
+ ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+ : AvroUtils.toBeamSchema(new
org.apache.avro.Schema.Parser().parse(inputSchema));
+ valueMapper =
+ Objects.equals(format, "JSON")
+ ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+ : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+ }
+ return new SchemaTransform() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ KafkaIO.Read<byte[], byte[]> kafkaRead =
+ KafkaIO.readBytes()
+ .withConsumerConfigUpdates(consumerConfigs)
+ .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
+ .withTopic(configuration.getTopic())
+ .withBootstrapServers(configuration.getBootstrapServers());
+ if (isTest) {
+ kafkaRead =
kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
+ }
+
+ PCollection<byte[]> kafkaValues =
+
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+
+ Schema errorSchema = ErrorHandling.errorSchemaBytes();
+ PCollectionTuple outputTuple =
+ kafkaValues.apply(
+ ParDo.of(
+ new ErrorFn(
+ "Kafka-read-error-counter", valueMapper,
errorSchema, handleErrors))
+ .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+ PCollectionRowTuple outputRows =
+ PCollectionRowTuple.of("output",
outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
+
+ PCollection<Row> errorOutput =
outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
+ if (handleErrors) {
+ outputRows =
+ outputRows.and(
+
Objects.requireNonNull(configuration.getErrorHandling()).getOutput(),
Review Comment:
nit: redundant since
`ErrorHandling.hasOutput(configuration.getErrorHandling());` is already called
```suggestion
configuration.getErrorHandling().getOutput(),
```
##########
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java:
##########
@@ -96,6 +164,23 @@ public Row apply(byte[] input) {
};
}
+ public static SerializableFunction<Row, byte[]> getRowToProtoFromSchemaBytes(
Review Comment:
nit:
```suggestion
public static SerializableFunction<Row, byte[]>
getRowToProtoBytesFromSchema(
```
seems to follow the other names more closely.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -234,6 +155,74 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
}
};
}
+
+ if (format != null && format.equals("RAW")) {
+ if (inputSchema != null) {
+ throw new IllegalArgumentException(
+ "To read from Kafka in RAW format, you can't provide a schema.");
+ }
+ beamSchema = Schema.builder().addField("payload",
Schema.FieldType.BYTES).build();
+ valueMapper = getRawBytesToRowFunction(beamSchema);
+ } else if (format != null && format.equals("PROTO")) {
+ String messageName = configuration.getMessageName();
+ if (messageName == null) {
+ throw new IllegalArgumentException("Expecting a messageName to be
non-null.");
+ }
+ String fileDescriptorPath = configuration.getFileDescriptorPath();
+ if (fileDescriptorPath != null) {
+ beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath,
messageName);
+ valueMapper =
ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
+ } else {
+ beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema,
messageName);
+ valueMapper =
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
+ }
+ } else {
+ beamSchema =
+ Objects.equals(format, "JSON")
+ ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+ : AvroUtils.toBeamSchema(new
org.apache.avro.Schema.Parser().parse(inputSchema));
+ valueMapper =
+ Objects.equals(format, "JSON")
+ ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+ : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+ }
Review Comment:
The parameter parsing/validation still seems to be a bit messy. I also think
there may be a missing null-check on the PROTO case to make sure there is a
fileDescriptorPath or schema provided. Could you move the validation to the
Configuration class validation method and then clean up the code accordingly?
e.g.
```
public void validate() {
final String startOffset = this.getAutoOffsetResetConfig();
assert startOffset == null ||
VALID_START_OFFSET_VALUES.contains(startOffset)
: "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
final String dataFormat = this.getFormat();
assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
: "Valid data formats are " + VALID_DATA_FORMATS;
    final String inputSchema = this.getSchema();
    final String messageName = this.getMessageName();
    final String fileDescriptorPath = this.getFileDescriptorPath();
    final String confluentSchemaRegUrl = this.getConfluentSchemaRegistryUrl();
    final String confluentSchemaRegSubject = this.getConfluentSchemaRegistrySubject();
if (confluentSchemaRegUrl != null) {
assert confluentSchemaRegSubject != null
: "To read from Kafka, a schema must be provided directly or
though Confluent "
+ "Schema Registry. Make sure you are providing one of these
parameters.";
}
    else if (dataFormat.equals("RAW")) {
      assert inputSchema == null
          : "To read from Kafka in RAW format, you can't provide a schema.";
    }
    else if (dataFormat.equals("JSON")) {
      assert inputSchema != null
          : "To read from Kafka in JSON format, you must provide a schema.";
    }
    else if (dataFormat.equals("PROTO")) {
      assert messageName != null
          : "To read from Kafka in PROTO format, messageName must be provided.";
      assert fileDescriptorPath != null || inputSchema != null
          : "To read from Kafka in PROTO format, fileDescriptorPath or
    schema must be provided.";
    }
    else {
      assert inputSchema != null
          : "To read from Kafka in AVRO format, you must provide a schema.";
    }
}
```
```suggestion
if (format.equals("RAW")) {
beamSchema = Schema.builder().addField("payload",
Schema.FieldType.BYTES).build();
valueMapper = getRawBytesToRowFunction(beamSchema);
} else if (format.equals("PROTO")) {
String fileDescriptorPath = configuration.getFileDescriptorPath();
if (fileDescriptorPath != null) {
beamSchema =
ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName);
valueMapper =
ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
} else {
beamSchema =
ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName);
valueMapper =
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
}
} else if (format.equals("JSON")) {
beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema);
valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema);
} else {
beamSchema = AvroUtils.toBeamSchema(new
org.apache.avro.Schema.Parser().parse(inputSchema));
valueMapper = AvroUtils.getAvroBytesToRowFunction(beamSchema);
}
```
This assumes that AVRO is the default format, which I believe is the current
behavior.
##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java:
##########
Review Comment:
nit: could add a failing case too (PROTO format without schema provided
perhaps)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]