Polber commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1446619899
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
} else if (configuration.getFormat().equals("PROTO")) {
String descriptorPath = configuration.getFileDescriptorPath();
+ String schema = configuration.getSchema();
String messageName = configuration.getMessageName();
- if (descriptorPath == null || messageName == null) {
+ if (messageName == null) {
+ throw new IllegalArgumentException("Expecting messageName to be non-null.");
+ }
+ if (descriptorPath != null && schema != null) {
+ throw new IllegalArgumentException(
+ "You must include a descriptorPath or a proto Schema but not both.");
+ } else if (descriptorPath != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
+ } else if (schema != null) {
+ toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, messageName);
+ } else {
throw new IllegalArgumentException(
- "Expecting both descriptorPath and messageName to be non-null.");
+ "At least a descriptorPath or a proto Schema is required.");
}
Review Comment:
I see. In that case, I think providing an explicit schema, at least
optionally, makes sense. Support for an implicit schema could perhaps be
added in a future FR.
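
For reference, the check in this hunk can be sketched in isolation as below. This is only an illustration of the branch order, not the code in the PR: `ProtoSourceCheck`, `Source`, and `resolve` are hypothetical names, and the sketch returns a tag instead of calling `ProtoByteUtils`. The sample path and schema string in `main` are placeholders as well.

```java
// Hypothetical helper, not part of the PR: mirrors the branch order in the diff
// but returns a tag instead of building a row-to-bytes function.
class ProtoSourceCheck {
  enum Source { DESCRIPTOR_FILE, INLINE_SCHEMA }

  static Source resolve(String descriptorPath, String schema, String messageName) {
    // messageName is required regardless of where the proto definition comes from.
    if (messageName == null) {
      throw new IllegalArgumentException("Expecting messageName to be non-null.");
    }
    if (descriptorPath != null && schema != null) {
      // Both sources given: ambiguous, so reject.
      throw new IllegalArgumentException(
          "You must include a descriptorPath or a proto Schema but not both.");
    } else if (descriptorPath != null) {
      return Source.DESCRIPTOR_FILE;
    } else if (schema != null) {
      return Source.INLINE_SCHEMA;
    } else {
      // Neither source given: nothing to build the serializer from.
      throw new IllegalArgumentException(
          "At least a descriptorPath or a proto Schema is required.");
    }
  }

  public static void main(String[] args) {
    // Placeholder inputs, chosen only to exercise the two valid branches.
    System.out.println(resolve("/tmp/example.pb", null, "MyMessage"));
    System.out.println(resolve(null, "syntax = \"proto3\"; message MyMessage {}", "MyMessage"));
  }
}
```

With this ordering, exactly one of the two sources must be set. An implicit schema, if added later, would presumably replace the final `else` branch rather than change the other three.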