ffernandez92 commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1446190345


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
         toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
       } else if (configuration.getFormat().equals("PROTO")) {
         String descriptorPath = configuration.getFileDescriptorPath();
+        String schema = configuration.getSchema();
         String messageName = configuration.getMessageName();
-        if (descriptorPath == null || messageName == null) {
+        if (messageName == null) {
+          throw new IllegalArgumentException("Expecting messageName to be 
non-null.");
+        }
+        if (descriptorPath != null && schema != null) {
+          throw new IllegalArgumentException(
+              "You must include a descriptorPath or a proto Schema but not 
both.");
+        } else if (descriptorPath != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, 
messageName);
+        } else if (schema != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, 
messageName);
+        } else {
           throw new IllegalArgumentException(
-              "Expecting both descriptorPath and messageName to be non-null.");
+              "At least a descriptorPath or a proto Schema is required.");
         }

Review Comment:
   Providing a separate schema in the Configuration offers flexibility and 
explicit control over the translation process, particularly when addressing 
variations in field mapping, data types, nested structures, default values, and 
schema evolution. Other alternatives, such as using the StorageApiProto, were 
considered, but this approach could potentially prevent the resulting output 
from matching the expected Proto schema for the subsequent reader. Another 
option explored was similar to the approach used in Scio 
(https://spotify.github.io/scio/io/Protobuf.html#write-protobuf-files), where a 
wrapper is created. However, this method introduces a layer of abstraction, 
potentially resulting in the output not precisely aligning with the user's 
desired schema. I remain open to suggestions for alternative approaches in this 
context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to