robertwb commented on code in PR #29160:
URL: https://github.com/apache/beam/pull/29160#discussion_r1376701562


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -131,10 +134,18 @@ public void finish() {
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       Schema inputSchema = input.get("input").getSchema();
-      final SerializableFunction<Row, byte[]> toBytesFn =
-          configuration.getFormat().equals("JSON")
-              ? JsonUtils.getRowToJsonBytesFunction(inputSchema)
-              : AvroUtils.getRowToAvroBytesFunction(inputSchema);
+      final SerializableFunction<Row, byte[]> toBytesFn;
+      if (configuration.getFormat().equals("RAW")) {
+        if (!inputSchema.hasField(PAYLOAD)) {

Review Comment:
   In PubSub, I've been just requiring a single payload (not caring about the 
name). Do we want to have the same pattern here? 



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -193,7 +217,7 @@ public abstract static class 
KafkaWriteSchemaTransformConfiguration implements S
     @SchemaFieldDescription(
         "A list of host/port pairs to use for establishing the initial 
connection to the"
             + " Kafka cluster. The client will make use of all servers 
irrespective of which servers are specified"
-            + " here for bootstrapping—this list only impacts the initial 
hosts used to discover the full set"
+            + " here for bootstrapping—this list only impacts the initial 
hosts used to discover the full set"

Review Comment:
   Revert?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to