brucearctor commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1446498632


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
       } else if (configuration.getFormat().equals("PROTO")) {
         String descriptorPath = configuration.getFileDescriptorPath();
+        String schema = configuration.getSchema();
         String messageName = configuration.getMessageName();
-        if (descriptorPath == null || messageName == null) {
+        if (messageName == null) {
+          throw new IllegalArgumentException("Expecting messageName to be non-null.");
+        }
+        if (descriptorPath != null && schema != null) {
+          throw new IllegalArgumentException(
+              "You must include a descriptorPath or a proto Schema but not both.");
+        } else if (descriptorPath != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
+        } else if (schema != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, messageName);
+        } else {
           throw new IllegalArgumentException(
-              "Expecting both descriptorPath and messageName to be non-null.");
+              "At least a descriptorPath or a proto Schema is required.");
         }
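Stripped of the Kafka and Beam types, the validation in this hunk reduces to a small decision table: `messageName` is mandatory, and exactly one of `descriptorPath` or `schema` must be set. The following is a minimal standalone sketch of that table, not Beam code. The names `ProtoConfigValidation`, `DescriptorSource` and `resolveSource` are hypothetical, and the sketch only reports which source would be used instead of building the real `toBytesFn` via `ProtoByteUtils`.

```java
public class ProtoConfigValidation {

  /** Hypothetical enum: where the proto message definition would come from. */
  enum DescriptorSource { FILE_DESCRIPTOR_PATH, INLINE_SCHEMA }

  /**
   * Mirrors the checks in the diff: messageName is required, and exactly one of
   * descriptorPath or schema must be non-null.
   */
  static DescriptorSource resolveSource(String descriptorPath, String schema, String messageName) {
    if (messageName == null) {
      throw new IllegalArgumentException("Expecting messageName to be non-null.");
    }
    if (descriptorPath != null && schema != null) {
      // Both supplied: ambiguous, so reject rather than silently pick one.
      throw new IllegalArgumentException(
          "You must include a descriptorPath or a proto Schema but not both.");
    }
    if (descriptorPath != null) {
      return DescriptorSource.FILE_DESCRIPTOR_PATH;
    }
    if (schema != null) {
      return DescriptorSource.INLINE_SCHEMA;
    }
    // Neither supplied: nothing to build the message descriptor from.
    throw new IllegalArgumentException("At least a descriptorPath or a proto Schema is required.");
  }

  public static void main(String[] args) {
    // Hypothetical inputs, for illustration only.
    System.out.println(resolveSource("/tmp/my_descriptor.pb", null, "MyMessage"));
    System.out.println(resolveSource(null, "syntax = \"proto3\"; message MyMessage {}", "MyMessage"));
    try {
      resolveSource("/tmp/my_descriptor.pb", "message MyMessage {}", "MyMessage");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Checking the "both set" case before the individual branches is what makes the two sources mutually exclusive; the final `throw` covers the "neither set" case.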

Review Comment:
   Interesting, I didn't realize a translation function existed from Beam Row to proto. I imagine there is related code around, e.g.:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.html

   We'd definitely need to understand the translation much better to ensure it is sufficiently deterministic. Passing the information explicitly removes all doubt.
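One way an inferred Row-to-proto translation could fail to be deterministic is if proto field numbers were derived from the order of fields in the Row schema. The sketch below illustrates that hypothetical failure mode. It does not describe how `ProtoByteUtils` or `BeamRowToStorageApiProto` actually map fields, and `FieldNumberingSketch` and `inferFieldNumbers` are invented names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldNumberingSketch {

  /** Hypothetical inference rule: number fields 1..n in the order the Row schema lists them. */
  static Map<String, Integer> inferFieldNumbers(List<String> rowFieldNames) {
    Map<String, Integer> numbers = new LinkedHashMap<>();
    for (int i = 0; i < rowFieldNames.size(); i++) {
      numbers.put(rowFieldNames.get(i), i + 1);
    }
    return numbers;
  }

  public static void main(String[] args) {
    // The same logical fields, declared in two different orders.
    Map<String, Integer> v1 = inferFieldNumbers(List.of("id", "name"));
    Map<String, Integer> v2 = inferFieldNumbers(List.of("name", "id"));
    // "id" gets tag 1 under v1 but tag 2 under v2, so bytes written with one
    // ordering would be misread by a consumer that assumes the other.
    System.out.println(v1);
    System.out.println(v2);

    // An explicit mapping (what a .proto schema or file descriptor supplies) pins the tags.
    Map<String, Integer> explicit = Map.of("id", 1, "name", 2);
    System.out.println(explicit.get("id").equals(v1.get("id")));
    System.out.println(explicit.get("id").equals(v2.get("id")));
  }
}
```

Running `main` prints `{id=1, name=2}`, `{name=1, id=2}`, `true`, `false`: the inferred tags shift with declaration order, while the explicit mapping does not. That is the reviewer's point about passing the information explicitly.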



-- 
This is an automated message from the Apache Git Service.