robertwb commented on code in PR #28865:
URL: https://github.com/apache/beam/pull/28865#discussion_r1367532756


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java:
##########
@@ -39,7 +39,7 @@ public abstract class KafkaReadSchemaTransformConfiguration {
 
   public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
 
-  public static final String VALID_FORMATS_STR = "AVRO,JSON";
+  public static final String VALID_FORMATS_STR = "raw,avro,json";
   public static final Set<String> VALID_DATA_FORMATS =
       Sets.newHashSet(VALID_FORMATS_STR.split(","));

Review Comment:
   +1 to ensuring the two remain consistent (though one could just as easily keep the set as the source of truth and join it for the error message).
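
   For illustration, a minimal sketch of that alternative (not the PR's code; `FormatsExample` is a hypothetical holder class), deriving the string from the set rather than the other way around:

   ```java
   import com.google.common.collect.ImmutableSet;
   import java.util.Set;

   public class FormatsExample {
     // Single source of truth: the set of supported formats. ImmutableSet
     // (Guava; Beam normally uses a vendored copy) keeps insertion order,
     // so the joined error-message string is deterministic.
     public static final Set<String> VALID_DATA_FORMATS =
         ImmutableSet.of("raw", "avro", "json");

     // Derived, not hand-maintained, so the two can never drift apart.
     public static final String VALID_FORMATS_STR =
         String.join(",", VALID_DATA_FORMATS);
   }
   ```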



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -112,16 +113,60 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
     consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
+    String format = configuration.getFormat();
+
+    if (format == null || format.isEmpty()) {
+      format = "raw";
+    }
+
+    if (format.equalsIgnoreCase("raw")) {
+      if (inputSchema != null && !inputSchema.isEmpty()) {
+        throw new IllegalArgumentException(
+            "To read from Kafka in raw format, you can't provide a schema.");
+      }
+      Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+      SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);

Review Comment:
   Yeah. Ideally users should be publishing schema'd data, but technically 
Kafka is just a plumber of bytes and doesn't otherwise impose restrictions. 
This allows users to at least get at the data and do what they want with it. 
(Also, perhaps they're using a schema we don't support yet, like protobuf or 
Cap'n Proto or something.)
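
   For context, a hedged sketch of what such a raw-bytes mapper can look like (the PR's actual `getRawBytesToRowFunction` may differ; `RawBytesExample` is a hypothetical wrapper class):

   ```java
   import org.apache.beam.sdk.schemas.Schema;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   import org.apache.beam.sdk.values.Row;

   class RawBytesExample {
     // Wraps each record's raw payload bytes in a single-field Row so
     // downstream schema-aware transforms can still consume the stream.
     static SerializableFunction<byte[], Row> rawBytesToRow(Schema rawSchema) {
       return bytes -> Row.withSchema(rawSchema).addValue(bytes).build();
     }
   }
   ```

   Usage would mirror the diff above: build the one-field "payload" schema with `Schema.builder().addField("payload", Schema.FieldType.BYTES).build()` and pass it in.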



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -112,16 +113,60 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
     consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
+    String format = configuration.getFormat();
+
+    if (format == null || format.isEmpty()) {
+      format = "raw";

Review Comment:
   I think we should require the user to set this. In the future we may want to pull the appropriate schema from metadata (in that case we could add an explicit "infer" value if we wanted, but if there is a default, that should be it). We should leave ourselves options: we can always set a default later if we really want to, but we can't retract one.
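
   A sketch of that suggestion (the helper name and message text are made up for illustration): validate eagerly and fail fast instead of defaulting, which keeps "infer" or a real default available as a backwards-compatible addition later:

   ```java
   import java.util.Locale;
   import java.util.Set;

   class FormatValidation {
     // Rejects a missing or unknown format instead of silently defaulting
     // to "raw"; a default (or an explicit "infer") can still be added later.
     static String requireValidFormat(String format, Set<String> validFormats) {
       if (format == null || format.isEmpty()) {
         throw new IllegalArgumentException(
             "A format must be specified. Valid formats are: " + validFormats);
       }
       String normalized = format.toLowerCase(Locale.ROOT);
       if (!validFormats.contains(normalized)) {
         throw new IllegalArgumentException(
             "Unknown format: " + format + ". Valid formats are: " + validFormats);
       }
       return normalized;
     }
   }
   ```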


