robertwb commented on code in PR #29160:
URL: https://github.com/apache/beam/pull/29160#discussion_r1376701562
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -131,10 +134,18 @@ public void finish() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
Schema inputSchema = input.get("input").getSchema();
- final SerializableFunction<Row, byte[]> toBytesFn =
- configuration.getFormat().equals("JSON")
- ? JsonUtils.getRowToJsonBytesFunction(inputSchema)
- : AvroUtils.getRowToAvroBytesFunction(inputSchema);
+ final SerializableFunction<Row, byte[]> toBytesFn;
+ if (configuration.getFormat().equals("RAW")) {
+ if (!inputSchema.hasField(PAYLOAD)) {
Review Comment:
In PubSub, I've been just requiring a single payload (not caring about the
name). Do we want to have the same pattern here?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -193,7 +217,7 @@ public abstract static class
KafkaWriteSchemaTransformConfiguration implements S
@SchemaFieldDescription(
"A list of host/port pairs to use for establishing the initial
connection to the"
+ " Kafka cluster. The client will make use of all servers
irrespective of which servers are specified"
- + " here for bootstrapping—this list only impacts the initial
hosts used to discover the full set"
+ + " here for bootstrapping—this list only impacts the initial
hosts used to discover the full set"
Review Comment:
Revert?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]