ffernandez92 commented on code in PR #28865:
URL: https://github.com/apache/beam/pull/28865#discussion_r1349483811
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -112,16 +113,60 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
     consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+    String format = configuration.getFormat();
+
+    if (format == null || format.isEmpty()) {
+      format = "raw";
+    }
+
+    if (format.equalsIgnoreCase("raw")) {
+      if (inputSchema != null && !inputSchema.isEmpty()) {
+        throw new IllegalArgumentException(
+            "To read from Kafka in raw format, you can't provide a schema.");
+      }
+      Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+      SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
Review Comment:
Same as before:
https://github.com/apache/beam/blob/7531501ff27f53bcb4fcd6942f34dbd45665805d/sdks/python/apache_beam/yaml/yaml_io.py#L195
It does have a schema; it's just that the schema has a single attribute,
`payload`, whose values are the raw bytes.
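
For reference, here is a minimal Java sketch of what the raw format amounts to. Only the schema construction appears verbatim in the diff; the body shown for `getRawBytesToRowFunction` is a hypothetical (but plausible) implementation that wraps each record's value bytes in a one-field Row:

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

public class RawFormatSketch {

  // The "raw" schema from the diff: a single BYTES field named "payload".
  static final Schema RAW_SCHEMA =
      Schema.builder().addField("payload", Schema.FieldType.BYTES).build();

  // Hypothetical body for getRawBytesToRowFunction: wrap each Kafka record's
  // value bytes as the sole "payload" field of a Row.
  static SerializableFunction<byte[], Row> getRawBytesToRowFunction(Schema rawSchema) {
    return bytes -> Row.withSchema(rawSchema).addValue(bytes).build();
  }
}
```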
I do agree that this feature might not see widespread use; ideally, users
should rely on schemas for their data. However, there are scenarios where it
comes in handy. For instance, imagine Kafka events in a format like `a,b,c`,
resembling CSV data. In such cases you receive the raw bytes in the payload,
and it becomes your responsibility to parse them correctly downstream (see the
sketch below).
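
As a concrete illustration of that downstream step, here is a hedged sketch. The `ParseCsvPayloadFn` name and the naive comma split are my own; a real pipeline would want a proper CSV parser that handles quoting and escaping:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

// Hypothetical downstream step: decode the raw payload and split it like CSV.
class ParseCsvPayloadFn extends DoFn<Row, List<String>> {
  @ProcessElement
  public void processElement(@Element Row row, OutputReceiver<List<String>> out) {
    // The "raw" format guarantees a single BYTES field named "payload".
    byte[] payload = row.getBytes("payload");
    String line = new String(payload, StandardCharsets.UTF_8);
    // Naive split; "-1" keeps trailing empty fields.
    out.output(Arrays.asList(line.split(",", -1)));
  }
}
```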