brucearctor commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1446498632
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
       } else if (configuration.getFormat().equals("PROTO")) {
         String descriptorPath = configuration.getFileDescriptorPath();
+        String schema = configuration.getSchema();
         String messageName = configuration.getMessageName();
-        if (descriptorPath == null || messageName == null) {
+        if (messageName == null) {
+          throw new IllegalArgumentException("Expecting messageName to be non-null.");
+        }
+        if (descriptorPath != null && schema != null) {
+          throw new IllegalArgumentException(
+              "You must include a descriptorPath or a proto Schema but not both.");
+        } else if (descriptorPath != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
+        } else if (schema != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, messageName);
+        } else {
           throw new IllegalArgumentException(
-              "Expecting both descriptorPath and messageName to be non-null.");
+              "At least a descriptorPath or a proto Schema is required.");
         }
Review Comment:
Interesting -- I didn't realize a translation function existed from Beam Row to proto. I imagine there are similar things around:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.html
We'd definitely need to understand that translation much better to ensure it is sufficiently deterministic. Passing the information explicitly removes all doubt.
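
The validation in the diff follows a small reusable pattern: exactly one of two optional configuration sources must be set, with distinct errors for "both" and "neither". A minimal standalone sketch of that pattern is below; the class and method names (`ExactlyOneOf`, `resolveSource`) are hypothetical illustrations, not Beam API:

```java
// Hypothetical standalone sketch of the "exactly one of two optional
// inputs" validation used in the diff above; not Beam's actual API.
public class ExactlyOneOf {

  // Reports which source was provided, failing fast when both or neither is set.
  static String resolveSource(String descriptorPath, String schema) {
    if (descriptorPath != null && schema != null) {
      // Both set: ambiguous configuration, reject explicitly.
      throw new IllegalArgumentException(
          "You must include a descriptorPath or a proto Schema but not both.");
    } else if (descriptorPath != null) {
      return "descriptorPath";
    } else if (schema != null) {
      return "schema";
    } else {
      // Neither set: nothing to build the converter from.
      throw new IllegalArgumentException(
          "At least a descriptorPath or a proto Schema is required.");
    }
  }

  public static void main(String[] args) {
    System.out.println(resolveSource("/tmp/messages.pb", null));
    System.out.println(resolveSource(null, "syntax = \"proto3\"; message M {}"));
  }
}
```

Checking both failure branches separately (rather than one combined null check, as in the pre-change code) gives the user an actionable error message either way.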
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]