robertwb commented on code in PR #28865:
URL: https://github.com/apache/beam/pull/28865#discussion_r1367532756
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java:
##########
@@ -39,7 +39,7 @@ public abstract class KafkaReadSchemaTransformConfiguration {
public static final Set<String> VALID_START_OFFSET_VALUES =
Sets.newHashSet("earliest", "latest");
- public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final String VALID_FORMATS_STR = "raw,avro,json";
public static final Set<String> VALID_DATA_FORMATS =
Sets.newHashSet(VALID_FORMATS_STR.split(","));
Review Comment:
+1 to ensuring the two remain consistent (though one could just as easily
join the set to produce the error-message string).
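A minimal sketch of what the reviewer suggests: make the set the single source of truth and derive the error-message string by joining it, so the two can never drift apart. This is a self-contained illustration using a stdlib `LinkedHashSet` (to keep a deterministic order) rather than Guava's `Sets.newHashSet` from the actual code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class FormatConsistency {
    // Single source of truth: the set of supported formats.
    static final Set<String> VALID_DATA_FORMATS =
        new LinkedHashSet<>(Arrays.asList("raw", "avro", "json"));

    // Derived string for error messages, joined from the set,
    // so adding a format in one place updates both.
    static final String VALID_FORMATS_STR = String.join(",", VALID_DATA_FORMATS);

    public static void main(String[] args) {
        System.out.println(VALID_FORMATS_STR); // prints "raw,avro,json"
    }
}
```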
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -112,16 +113,60 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+ String format = configuration.getFormat();
+
+ if (format == null || format.isEmpty()) {
+ format = "raw";
+ }
+
+ if (format.equalsIgnoreCase("raw")) {
+ if (inputSchema != null && !inputSchema.isEmpty()) {
+ throw new IllegalArgumentException(
+ "To read from Kafka in raw format, you can't provide a schema.");
+ }
+ Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+ SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
Review Comment:
Yeah. Ideally users should be publishing schema'd data, but technically
Kafka is just a plumber of bytes and doesn't otherwise impose restrictions.
This allows users to at least get at the data and do what they want with it.
(Also, perhaps they're using a schema we don't support yet, like protobuf or
Cap'n Proto or something.)
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -112,16 +113,60 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+ String format = configuration.getFormat();
+
+ if (format == null || format.isEmpty()) {
+ format = "raw";
Review Comment:
I think we should require the user to set this. In the future we may want to
pull the appropriate schema from metadata (in that case we could add an
explicit "infer" value if we wanted, but if there is a default, that should be
it). We should leave ourselves options: we can always set a default later if we
really want to, but we can't retract one.
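A hedged sketch of the validation this comment is asking for: instead of silently defaulting to `"raw"`, reject a missing or unknown format with an error that lists the valid values. The method name `requireValidFormat` is hypothetical, not part of the PR; the sketch uses only stdlib types so it stands alone.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class FormatValidation {
    static final Set<String> VALID_DATA_FORMATS =
        new LinkedHashSet<>(Arrays.asList("raw", "avro", "json"));

    // Hypothetical check: no implicit default, so an "infer" mode or a
    // real default can still be introduced later without breaking anyone.
    static String requireValidFormat(String format) {
        if (format == null || format.isEmpty()) {
            throw new IllegalArgumentException(
                "format must be set; valid values: "
                    + String.join(",", VALID_DATA_FORMATS));
        }
        String normalized = format.toLowerCase();
        if (!VALID_DATA_FORMATS.contains(normalized)) {
            throw new IllegalArgumentException(
                "invalid format '" + format + "'; valid values: "
                    + String.join(",", VALID_DATA_FORMATS));
        }
        return normalized;
    }

    public static void main(String[] args) {
        System.out.println(requireValidFormat("RAW")); // prints "raw"
        try {
            requireValidFormat(null);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```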
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]