ffernandez92 commented on code in PR #28865:
URL: https://github.com/apache/beam/pull/28865#discussion_r1349483811


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -112,16 +113,60 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
     consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
+    String format = configuration.getFormat();
+
+    if (format == null || format.isEmpty()) {
+      format = "raw";
+    }
+
+    if (format.equalsIgnoreCase("raw")) {
+      if (inputSchema != null && !inputSchema.isEmpty()) {
+        throw new IllegalArgumentException(
+            "To read from Kafka in raw format, you can't provide a schema.");
+      }
+      Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+      SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);

Review Comment:
   Same as before: 
https://github.com/apache/beam/blob/7531501ff27f53bcb4fcd6942f34dbd45665805d/sdks/python/apache_beam/yaml/yaml_io.py#L195
   
   It does have a schema; it's just that this schema has only one attribute, 
`payload`, whose values are raw bytes.
   
   I do agree that this feature might not see widespread use. Ideally, users 
should utilize schemas for data processing. However, there could be scenarios 
where this feature comes in handy. For instance, imagine you have events in 
Kafka in a format like `a,b,c`, resembling CSV data. In such cases, you 
receive raw bytes in the payload, and it becomes your responsibility to parse 
them correctly downstream.
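
   To make the CSV case concrete, here is a minimal sketch of what that 
downstream parsing could look like. It is plain Java with no Beam dependency, 
and `RawPayloadCsv` and `parse` are hypothetical names, not part of this PR. 
It assumes the bytes are UTF-8 text and that fields are separated by plain 
commas with no quoting or escaping. In a pipeline, the bytes would presumably 
come from the `payload` field of each row.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Beam: decodes one raw payload as a single CSV line. */
public class RawPayloadCsv {

  // Assumption: UTF-8 text, comma-separated, no quoting or escaping.
  static List<String> parse(byte[] payload) {
    String line = new String(payload, StandardCharsets.UTF_8);
    // A limit of -1 keeps trailing empty fields, so "a,b," yields three fields.
    return Arrays.asList(line.split(",", -1));
  }

  public static void main(String[] args) {
    byte[] payload = "a,b,c".getBytes(StandardCharsets.UTF_8);
    System.out.println(parse(payload)); // prints [a, b, c]
  }
}
```

   Real CSV with quoted fields or embedded commas would need a proper CSV 
parser rather than `String.split`.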


