manavgarg commented on code in PR #27039:
URL: https://github.com/apache/beam/pull/27039#discussion_r1222426195


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java:
##########
@@ -51,22 +60,33 @@ public String identifier() {
    * {@link PCollection} file names written using {@link TextIO.Write}.
    */
   @Override
-  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+  public PTransform<PCollection<Row>, PCollectionTuple> buildTransform(

Review Comment:
   nit: please update the Javadoc comments everywhere this PR changes the method signature.



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java:
##########
@@ -38,6 +39,6 @@ public interface FileWriteSchemaTransformFormatProvider extends Providers.Identi
    * Builds a {@link PTransform} that writes a {@link Row} {@link PCollection} and outputs the
    * resulting {@link PCollection} of the file names.
    */
-  PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+  PTransform<PCollection<Row>, PCollectionTuple> buildTransform(

Review Comment:
   nit: change the {@link PCollection} in the Javadoc above to {@link PCollectionTuple}?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java:
##########
@@ -47,22 +57,27 @@ public String identifier() {
    * {@link PCollection} file names written using {@link AvroIO.Write}.
    */
   @Override
-  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+  public PTransform<PCollection<Row>, PCollectionTuple> buildTransform(
       FileWriteSchemaTransformConfiguration configuration, Schema schema) {
 
-    return new PTransform<PCollection<Row>, PCollection<String>>() {
+    return new PTransform<PCollection<Row>, PCollectionTuple>() {
       @Override
-      public PCollection<String> expand(PCollection<Row> input) {
+      public PCollectionTuple expand(PCollection<Row> input) {
 
         org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
         AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
 
-        PCollection<GenericRecord> avro =
-            input
-                .apply(
-                    "Row To Avro Generic Record",
-                    FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
-                .setCoder(coder);
+        PCollectionTuple tuple =
+            input.apply(
+                "Row To Avro Generic Record",
+                ParDo.of(
+                        new ErrorCounterFn<GenericRecord>(
+                            "Avro-write-error-counter",
+                            AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(schema)),
+                            ERROR_FN_OUPUT_TAG))
+                    .withOutputTags(ERROR_FN_OUPUT_TAG, TupleTagList.of(ERROR_TAG)));

Review Comment:
   To be sure: does "ERROR_FN_OUPUT_TAG" correspond to the successful records emitted after applying the ErrorCounterFn?
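
   For context on the question above: in ParDo.of(fn).withOutputTags(mainOutputTag, additionalOutputTags), the first argument is the main output tag, so I would expect it to carry the successfully converted records. Below is a minimal, Beam-free sketch of the routing I am assuming. The class MultiOutputSketch, the method route, and the string tag names are hypothetical stand-ins for illustration, not the PR's actual ErrorCounterFn.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a two-output ParDo: plain lists stand in for PCollections.
class MultiOutputSketch {
  // Stand-in for the main output tag (spelled as in the PR).
  static final String MAIN_TAG = "ERROR_FN_OUPUT_TAG";
  // Stand-in for the additional (error) output tag.
  static final String ERROR_TAG = "ERROR_TAG";

  // Applies convert to each input; successes go to MAIN_TAG, failing inputs to ERROR_TAG.
  static Map<String, List<Object>> route(List<String> inputs, Function<String, Integer> convert) {
    Map<String, List<Object>> outputs = new HashMap<>();
    outputs.put(MAIN_TAG, new ArrayList<>());
    outputs.put(ERROR_TAG, new ArrayList<>());
    for (String input : inputs) {
      try {
        outputs.get(MAIN_TAG).add(convert.apply(input));
      } catch (RuntimeException e) {
        // Assumption: a failed conversion emits the original element to the error output.
        outputs.get(ERROR_TAG).add(input);
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    System.out.println(route(List.of("1", "x", "3"), Integer::parseInt));
  }
}
```

   If that reading is right, a name that says "success" or "main output" might be less confusing than one that starts with ERROR.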



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java:
##########
@@ -129,7 +136,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                                   .build()))
               .setRowSchema(OUTPUT_SCHEMA);
 
-      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+      if (files.has(ERROR_TAG)) {
+        return PCollectionRowTuple.of(OUTPUT_TAG, output).and("error", files.get(ERROR_TAG));

Review Comment:
   Can we have a constant for "error", similar to OUTPUT_TAG? Let's change it in all the places. Hardcoded strings are not good practice and make refactoring difficult.
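
   A rough sketch of what I have in mind, kept Beam-free so it stands alone. A Map stands in for PCollectionRowTuple, and the constant name ERROR_OUTPUT_NAME, the value "output" for OUTPUT_TAG, and the class and method names are all assumptions for illustration; pick whatever fits the existing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: a Map models the tag-name -> collection lookup of a row tuple.
class OutputTagsSketch {
  // Assumed value; mirrors the existing OUTPUT_TAG constant in spirit only.
  static final String OUTPUT_TAG = "output";
  // Hypothetical name for the new constant replacing the "error" literal.
  static final String ERROR_OUTPUT_NAME = "error";

  // Builds the result, adding the error entry only when an error collection is present.
  static Map<String, String> buildOutputs(String files, String errors) {
    Map<String, String> outputs = new LinkedHashMap<>();
    outputs.put(OUTPUT_TAG, files);
    if (errors != null) {
      outputs.put(ERROR_OUTPUT_NAME, errors);
    }
    return outputs;
  }

  public static void main(String[] args) {
    System.out.println(buildOutputs("file names", "failed rows").keySet());
  }
}
```

   With a single constant, callers and tests can reference ERROR_OUTPUT_NAME instead of repeating the literal, so a later rename touches one line.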



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java:
##########
@@ -69,14 +80,25 @@ public PCollection<String> expand(PCollection<Row> input) {
 
         write = applyCommonFileIOWriteFeatures(write, configuration);
 
-        return input
-            .apply(
+        PCollectionTuple avro =

Review Comment:
   Rename the variable avro to parquet, since this is the Parquet provider.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
