ffernandez92 commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1446190345
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
} else if (configuration.getFormat().equals("PROTO")) {
String descriptorPath = configuration.getFileDescriptorPath();
+ String schema = configuration.getSchema();
String messageName = configuration.getMessageName();
- if (descriptorPath == null || messageName == null) {
+ if (messageName == null) {
+ throw new IllegalArgumentException("Expecting messageName to be non-null.");
+ }
+ if (descriptorPath != null && schema != null) {
+ throw new IllegalArgumentException(
+ "You must include a descriptorPath or a proto Schema but not both.");
+ } else if (descriptorPath != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
+ } else if (schema != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, messageName);
+ } else {
throw new IllegalArgumentException(
- "Expecting both descriptorPath and messageName to be non-null.");
+ "At least a descriptorPath or a proto Schema is required.");
}
Review Comment:
Providing a separate schema in the Configuration offers flexibility and
explicit control over the translation process, particularly when handling
differences in field mapping, data types, nested structures, default values,
and schema evolution. I also considered alternatives such as using
StorageApiProto, but that approach could produce output that does not match
the Proto schema the downstream reader expects. Another option I explored was
similar to the approach Scio uses
(https://spotify.github.io/scio/io/Protobuf.html#write-protobuf-files), where a
wrapper is created. However, the wrapper adds a layer of abstraction, so the
output may not align exactly with the schema the user wants. I'm open to
suggestions for other approaches here.
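The checks in the hunk boil down to an "exactly one of descriptorPath or schema, plus a required messageName" rule. Below is a minimal standalone sketch of that rule, assuming it is factored into a pure function. The names `ProtoSourceResolver`, `ProtoSource`, and `resolve` are hypothetical and not part of Beam's API. In the actual transform, each branch would instead build `toBytesFn` via `ProtoByteUtils.getRowToProtoBytes` or `ProtoByteUtils.getRowToProtoFromSchemaBytes`.

```java
/**
 * Hypothetical sketch (not Beam API): decides which Protobuf source a Kafka
 * write configuration uses, mirroring the validation order in the diff.
 */
final class ProtoSourceResolver {

  /** Which input the Row-to-Proto conversion would be built from. */
  enum ProtoSource {
    DESCRIPTOR_FILE, // would map to ProtoByteUtils.getRowToProtoBytes(...)
    SCHEMA_STRING // would map to ProtoByteUtils.getRowToProtoFromSchemaBytes(...)
  }

  private ProtoSourceResolver() {}

  static ProtoSource resolve(String descriptorPath, String schema, String messageName) {
    // messageName is required regardless of which source is supplied.
    if (messageName == null) {
      throw new IllegalArgumentException("Expecting messageName to be non-null.");
    }
    boolean hasPath = descriptorPath != null;
    boolean hasSchema = schema != null;
    // The two sources are mutually exclusive.
    if (hasPath && hasSchema) {
      throw new IllegalArgumentException(
          "You must include a descriptorPath or a proto Schema but not both.");
    }
    if (hasPath) {
      return ProtoSource.DESCRIPTOR_FILE;
    }
    if (hasSchema) {
      return ProtoSource.SCHEMA_STRING;
    }
    // Neither source was provided.
    throw new IllegalArgumentException("At least a descriptorPath or a proto Schema is required.");
  }
}
```

Separating the decision from the construction of the conversion function would let the validation be unit-tested without a descriptor file on disk. That is only a design suggestion, not something the PR does.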
--
This is an automated message from the Apache Git Service.