Polber commented on code in PR #29835:
URL: https://github.com/apache/beam/pull/29835#discussion_r1445316149


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -115,97 +115,18 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
 
     String format = configuration.getFormat();
     boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
-    String descriptorPath = configuration.getFileDescriptorPath();
-    String messageName = configuration.getMessageName();
-
-    if ((format != null && VALID_DATA_FORMATS.contains(format))
-        || (!Strings.isNullOrEmpty(inputSchema) && !Objects.equals(format, "RAW"))
-        || (Objects.equals(format, "PROTO")
-            && !Strings.isNullOrEmpty(descriptorPath)
-            && !Strings.isNullOrEmpty(messageName))) {
-      SerializableFunction<byte[], Row> valueMapper;
-      Schema beamSchema;
-      if (format != null && format.equals("RAW")) {
-        if (inputSchema != null) {
-          throw new IllegalArgumentException(
-              "To read from Kafka in RAW format, you can't provide a schema.");
-        }
-        beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
-        valueMapper = getRawBytesToRowFunction(beamSchema);
-      } else if (format != null && format.equals("PROTO")) {
-        if (descriptorPath == null || messageName == null) {
-          throw new IllegalArgumentException(
-              "Expecting both descriptorPath and messageName to be non-null.");
-        }
-        valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(descriptorPath, messageName);
-        beamSchema = ProtoByteUtils.getBeamSchemaFromProto(descriptorPath, messageName);
-      } else {
-        assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
-            : "To read from Kafka, a schema must be provided directly or though Confluent "
-                + "Schema Registry, but not both.";
-        if (inputSchema == null) {
-          throw new IllegalArgumentException(
-              "To read from Kafka in JSON or AVRO format, you must provide a schema.");
-        }
-        beamSchema =
-            Objects.equals(format, "JSON")
-                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
-                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
-        valueMapper =
-            Objects.equals(format, "JSON")
-                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
-                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
-      }
-      return new SchemaTransform() {
-        @Override
-        public PCollectionRowTuple expand(PCollectionRowTuple input) {
-          KafkaIO.Read<byte[], byte[]> kafkaRead =
-              KafkaIO.readBytes()
-                  .withConsumerConfigUpdates(consumerConfigs)
-                  .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
-                  .withTopic(configuration.getTopic())
-                  .withBootstrapServers(configuration.getBootstrapServers());
-          if (isTest) {
-            kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
-          }
 
-          PCollection<byte[]> kafkaValues =
-              input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+    SerializableFunction<byte[], Row> valueMapper;
+    Schema beamSchema;
 
-          Schema errorSchema = ErrorHandling.errorSchemaBytes();
-          PCollectionTuple outputTuple =
-              kafkaValues.apply(
-                  ParDo.of(
-                          new ErrorFn(
-                              "Kafka-read-error-counter", valueMapper, errorSchema, handleErrors))
-                      .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-          PCollectionRowTuple outputRows =
-              PCollectionRowTuple.of(
-                  "output", outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
-
-          PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
-          if (handleErrors) {
-            ErrorHandling errorHandling = configuration.getErrorHandling();
-            if (errorHandling == null) {
-              throw new IllegalArgumentException("You must specify an error handling option.");
-            }
-            outputRows = outputRows.and(errorHandling.getOutput(), errorOutput);
-          }
-          return outputRows;
-        }
-      };
-    } else {
-      assert !Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
-          : "To read from Kafka, a schema must be provided directly or though Confluent "
-              + "Schema Registry. Neither seems to have been provided.";
+    String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl();
+    if (confluentSchemaRegUrl != null) {
       return new SchemaTransform() {
         @Override
         public PCollectionRowTuple expand(PCollectionRowTuple input) {
-          final String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl();
           final String confluentSchemaRegSubject =
               configuration.getConfluentSchemaRegistrySubject();
-          if (confluentSchemaRegUrl == null || confluentSchemaRegSubject == null) {
+          if (confluentSchemaRegSubject == null) {
            throw new IllegalArgumentException(
                "To read from Kafka, a schema must be provided directly or though Confluent "
                    + "Schema Registry. Make sure you are providing one of these parameters.");

Review Comment:
   This can be moved to the Configuration class validate method (explained more in my other comment).
   ```suggestion
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
       } else if (configuration.getFormat().equals("PROTO")) {
         String descriptorPath = configuration.getFileDescriptorPath();
+        String schema = configuration.getSchema();
         String messageName = configuration.getMessageName();
-        if (descriptorPath == null || messageName == null) {
+        if (messageName == null) {
+          throw new IllegalArgumentException("Expecting messageName to be non-null.");
+        }
+        if (descriptorPath != null && schema != null) {
+          throw new IllegalArgumentException(
+              "You must include a descriptorPath or a proto Schema but not both.");
+        } else if (descriptorPath != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
+        } else if (schema != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, messageName);
+        } else {
           throw new IllegalArgumentException(
-              "Expecting both descriptorPath and messageName to be non-null.");
+              "At least a descriptorPath or a proto Schema is required.");
         }

Review Comment:
   Similar to my comment for the ReadProvider, I think moving the validation to the Configuration class validate method would clean up this file so it is easier to read/debug.
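   For illustration, a rough sketch of what that write-side validate method could look like (getter names taken from the configuration fields referenced above; the exact shape and messages are up to you, untested):
   ```java
   public void validate() {
     final String dataFormat = this.getFormat();
     if (dataFormat != null && dataFormat.equals("PROTO")) {
       final String messageName = this.getMessageName();
       final String descriptorPath = this.getFileDescriptorPath();
       final String schema = this.getSchema();
       // mirrors the checks currently inlined in expand()
       assert messageName != null
           : "To write to Kafka in PROTO format, messageName must be provided.";
       assert descriptorPath == null || schema == null
           : "You must include a descriptorPath or a proto Schema but not both.";
       assert descriptorPath != null || schema != null
           : "At least a descriptorPath or a proto Schema is required.";
     }
   }
   ```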



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -180,8 +211,14 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                   .withKeySerializer(ByteArraySerializer.class)
                   .withValueSerializer(ByteArraySerializer.class));
 
+      // TODO: include output from KafkaIO Write once updated from PDone
+      PCollection<Row> errorOutput =
+          outputTuple.get(ERROR_TAG).setRowSchema(ErrorHandling.errorSchema(errorSchema));
       return PCollectionRowTuple.of(
-          "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+          handleErrors
+              ? Objects.requireNonNull(configuration.getErrorHandling()).getOutput()

Review Comment:
   nit: again, redundant since `ErrorHandling.hasOutput(configuration.getErrorHandling());` is called.
   ```suggestion
                 ? configuration.getErrorHandling().getOutput()
   ```
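   (To spell out the assumption behind this nit, as a rough sketch rather than Beam's actual implementation: a true result from `hasOutput` should already imply a non-null error-handling config, so the null wrapper adds nothing on this branch.)
   ```java
   // Sketch of the assumption only:
   boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
   if (handleErrors) {
     // getErrorHandling() is known to be non-null here, so
     // Objects.requireNonNull(...) is a no-op on this branch.
     String outputTag = configuration.getErrorHandling().getOutput();
   }
   ```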



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -148,23 +164,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
       } else if (configuration.getFormat().equals("PROTO")) {
         String descriptorPath = configuration.getFileDescriptorPath();
+        String schema = configuration.getSchema();
         String messageName = configuration.getMessageName();
-        if (descriptorPath == null || messageName == null) {
+        if (messageName == null) {
+          throw new IllegalArgumentException("Expecting messageName to be non-null.");
+        }
+        if (descriptorPath != null && schema != null) {
+          throw new IllegalArgumentException(
+              "You must include a descriptorPath or a proto Schema but not both.");
+        } else if (descriptorPath != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
+        } else if (schema != null) {
+          toBytesFn = ProtoByteUtils.getRowToProtoFromSchemaBytes(schema, messageName);
+        } else {
           throw new IllegalArgumentException(
-              "Expecting both descriptorPath and messageName to be non-null.");
+              "At least a descriptorPath or a proto Schema is required.");
         }

Review Comment:
   More for my understanding - why is a schema provided by the Configuration required here? The other data formats use the schema from the incoming PCollectionRowTuple input to create the schema for the outgoing PCollectionRowTuple output. Can the output Proto schema not be constructed from the input Row schema?



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -234,6 +155,74 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         }
       };
     }
+
+    if (format != null && format.equals("RAW")) {
+      if (inputSchema != null) {
+        throw new IllegalArgumentException(
+            "To read from Kafka in RAW format, you can't provide a schema.");
+      }
+      beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+      valueMapper = getRawBytesToRowFunction(beamSchema);
+    } else if (format != null && format.equals("PROTO")) {
+      String messageName = configuration.getMessageName();
+      if (messageName == null) {
+        throw new IllegalArgumentException("Expecting a messageName to be non-null.");
+      }
+      String fileDescriptorPath = configuration.getFileDescriptorPath();
+      if (fileDescriptorPath != null) {
+        beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName);
+        valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
+      } else {
+        beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName);
+        valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
+      }
+    } else {
+      beamSchema =
+          Objects.equals(format, "JSON")
+              ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+              : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
+      valueMapper =
+          Objects.equals(format, "JSON")
+              ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+              : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+    }
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        KafkaIO.Read<byte[], byte[]> kafkaRead =
+            KafkaIO.readBytes()
+                .withConsumerConfigUpdates(consumerConfigs)
+                .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
+                .withTopic(configuration.getTopic())
+                .withBootstrapServers(configuration.getBootstrapServers());
+        if (isTest) {
+          kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
+        }
+
+        PCollection<byte[]> kafkaValues =
+            input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+
+        Schema errorSchema = ErrorHandling.errorSchemaBytes();
+        PCollectionTuple outputTuple =
+            kafkaValues.apply(
+                ParDo.of(
+                        new ErrorFn(
+                            "Kafka-read-error-counter", valueMapper, errorSchema, handleErrors))
+                    .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+        PCollectionRowTuple outputRows =
+            PCollectionRowTuple.of("output", outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
+
+        PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
+        if (handleErrors) {
+          outputRows =
+              outputRows.and(
+                  Objects.requireNonNull(configuration.getErrorHandling()).getOutput(),

Review Comment:
   nit: redundant since `ErrorHandling.hasOutput(configuration.getErrorHandling());` is already called.
   ```suggestion
                     configuration.getErrorHandling().getOutput(),
   ```



##########
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java:
##########
@@ -96,6 +164,23 @@ public Row apply(byte[] input) {
     };
   }
 
+  public static SerializableFunction<Row, byte[]> getRowToProtoFromSchemaBytes(

Review Comment:
   nit: 
   ```suggestion
     public static SerializableFunction<Row, byte[]> getRowToProtoBytesFromSchema(
   ```
   seems to follow the other names more closely.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -234,6 +155,74 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         }
       };
     }
+
+    if (format != null && format.equals("RAW")) {
+      if (inputSchema != null) {
+        throw new IllegalArgumentException(
+            "To read from Kafka in RAW format, you can't provide a schema.");
+      }
+      beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+      valueMapper = getRawBytesToRowFunction(beamSchema);
+    } else if (format != null && format.equals("PROTO")) {
+      String messageName = configuration.getMessageName();
+      if (messageName == null) {
+        throw new IllegalArgumentException("Expecting a messageName to be non-null.");
+      }
+      String fileDescriptorPath = configuration.getFileDescriptorPath();
+      if (fileDescriptorPath != null) {
+        beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName);
+        valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
+      } else {
+        beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName);
+        valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
+      }
+    } else {
+      beamSchema =
+          Objects.equals(format, "JSON")
+              ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+              : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
+      valueMapper =
+          Objects.equals(format, "JSON")
+              ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+              : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+    }

Review Comment:
   The parameter parsing/validation still seems to be a bit messy. I also think there may be a missing null-check on the PROTO case to make sure there is a fileDescriptorPath or schema provided. Could you move the validation to the Configuration class validate method and then clean up the code accordingly? i.e.
   
   ```
   public void validate() {
       final String startOffset = this.getAutoOffsetResetConfig();
       assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset)
           : "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
       final String dataFormat = this.getFormat();
       assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
           : "Valid data formats are " + VALID_DATA_FORMATS;

       final String inputSchema = this.getSchema();
       final String messageName = this.getMessageName();
       final String fileDescriptorPath = this.getFileDescriptorPath();
       final String confluentSchemaRegUrl = this.getConfluentSchemaRegistryUrl();
       final String confluentSchemaRegSubject = this.getConfluentSchemaRegistrySubject();

       if (confluentSchemaRegUrl != null) {
         assert confluentSchemaRegSubject != null
             : "To read from Kafka, a schema must be provided directly or though Confluent "
                 + "Schema Registry. Make sure you are providing one of these parameters.";
       } else if (dataFormat != null && dataFormat.equals("RAW")) {
         assert inputSchema == null
             : "To read from Kafka in RAW format, you can't provide a schema.";
       } else if (dataFormat != null && dataFormat.equals("JSON")) {
         assert inputSchema != null
             : "To read from Kafka in JSON format, you must provide a schema.";
       } else if (dataFormat != null && dataFormat.equals("PROTO")) {
         assert messageName != null
             : "To read from Kafka in PROTO format, messageName must be provided.";
         assert fileDescriptorPath != null || inputSchema != null
             : "To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided.";
       } else {
         assert inputSchema != null
             : "To read from Kafka in AVRO format, you must provide a schema.";
       }
   }
   ```
   
   ```suggestion
       if (format.equals("RAW")) {
         beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
         valueMapper = getRawBytesToRowFunction(beamSchema);
       } else if (format.equals("PROTO")) {
         String messageName = configuration.getMessageName();
         String fileDescriptorPath = configuration.getFileDescriptorPath();
         if (fileDescriptorPath != null) {
           beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName);
           valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
         } else {
           beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName);
           valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
         }
       } else if (format.equals("JSON")) {
         beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema);
         valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema);
       } else {
         beamSchema = AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
         valueMapper = AvroUtils.getAvroBytesToRowFunction(beamSchema);
       }
   ```
   
   This assumes that AVRO is the default format, which I believe is the current behavior.



##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java:
##########


Review Comment:
   nit: could add a failing case too (PROTO format without schema provided perhaps)
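   For example, something along these lines (a rough sketch only; the builder/setter names are assumed from the configuration class, the topic/server/message values are placeholders, and the expected exception type depends on where the validation ends up living):
   ```java
   @Test
   public void testReadWithProtoFormatWithoutSchemaOrDescriptorFails() {
     KafkaReadSchemaTransformConfiguration configuration =
         KafkaReadSchemaTransformConfiguration.builder()
             .setFormat("PROTO")
             .setTopic("test_topic")
             .setBootstrapServers("host:port")
             .setMessageName("MyMessage")
             .build();
     assertThrows(
         Exception.class, () -> new KafkaReadSchemaTransformProvider().from(configuration));
   }
   ```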



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
