ahmedabu98 commented on code in PR #30081:
URL: https://github.com/apache/beam/pull/30081#discussion_r1489846433


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java:
##########
@@ -291,6 +287,10 @@ private synchronized boolean readNextRecord() throws IOException {
                   * 1.0
                   / totalRowsInCurrentResponse;
 
+      SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema);
+
+      current = parseFn.apply(schemaAndRecord);
+

Review Comment:
   nit: see previous comment



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -680,62 +697,92 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
     // parallelize properly. We also ensure that the files are written if a threshold number of
     // records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
     // GroupIntoBatches.
-    return input
-        .apply(
-            GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                .withByteSize(byteSize)
-                .withMaxBufferingDuration(maxBufferingDuration)
-                .withShardedKey())
-        .setCoder(
-            KvCoder.of(
-                org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
-                IterableCoder.of(elementCoder)))
-        .apply(
-            "StripShardId",
-            MapElements.via(
-                new SimpleFunction<
-                    KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
-                    KV<DestinationT, Iterable<ElementT>>>() {
-                  @Override
-                  public KV<DestinationT, Iterable<ElementT>> apply(
-                      KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
-                          input) {
-                    return KV.of(input.getKey().getKey(), input.getValue());
-                  }
-                }))
-        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
-        .apply(
-            "WriteGroupedRecords",
-            ParDo.of(
-                    new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
-                        tempFilePrefix, maxFileSize, rowWriterFactory))
-                .withSideInputs(tempFilePrefix))
+    TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+    PCollectionTuple writeResults =
+        input
+            .apply(
+                GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                    .withByteSize(byteSize)
+                    .withMaxBufferingDuration(maxBufferingDuration)
+                    .withShardedKey())
+            .setCoder(
+                KvCoder.of(
+                    org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+                    IterableCoder.of(elementCoder)))
+            .apply(
+                "StripShardId",
+                MapElements.via(
+                    new SimpleFunction<
+                        KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
+                        KV<DestinationT, Iterable<ElementT>>>() {
+                      @Override
+                      public KV<DestinationT, Iterable<ElementT>> apply(
+                          KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
+                              input) {
+                        return KV.of(input.getKey().getKey(), input.getValue());
+                      }
+                    }))
+            .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+            .apply(
+                "WriteGroupedRecords",
+                ParDo.of(
+                        new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
+                            tempFilePrefix,
+                            maxFileSize,
+                            rowWriterFactory,
+                            badRecordRouter,
+                            successfulResultsTag,
+                            elementCoder))
+                    .withSideInputs(tempFilePrefix)
+                    .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
+    badRecordErrorHandler.addErrorCollection(
+        writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
+
+    return writeResults
+        .get(successfulResultsTag)
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }
 
   private PCollection<Result<DestinationT>> writeShardedRecords(
       PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
       PCollectionView<String> tempFilePrefix) {
-    return shardedRecords
-        .apply("GroupByDestination", GroupByKey.create())
-        .apply(
-            "StripShardId",
-            MapElements.via(
-                new SimpleFunction<
-                    KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
-                    KV<DestinationT, Iterable<ElementT>>>() {
-                  @Override
-                  public KV<DestinationT, Iterable<ElementT>> apply(
-                      KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
-                    return KV.of(input.getKey().getKey(), input.getValue());
-                  }
-                }))
-        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
-        .apply(
-            "WriteGroupedRecords",
-            ParDo.of(
-                    new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
-                .withSideInputs(tempFilePrefix))
+    TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+    PCollectionTuple writeResults =
+        shardedRecords
+            .apply("GroupByDestination", GroupByKey.create())
+            .apply(
+                "StripShardId",
+                MapElements.via(
+                    new SimpleFunction<
+                        KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+                        KV<DestinationT, Iterable<ElementT>>>() {
+                      @Override
+                      public KV<DestinationT, Iterable<ElementT>> apply(
+                          KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
+                        return KV.of(input.getKey().getKey(), input.getValue());
+                      }
+                    }))
+            .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+            .apply(
+                "WriteGroupedRecords",
+                ParDo.of(
+                        new WriteGroupedRecordsToFiles<>(
+                            tempFilePrefix,
+                            maxFileSize,
+                            rowWriterFactory,
+                            badRecordRouter,
+                            successfulResultsTag,
+                            elementCoder))
+                    .withSideInputs(tempFilePrefix)
+                    .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
+
+    badRecordErrorHandler.addErrorCollection(
+        writeResults
+            .get(BAD_RECORD_TAG)
+            .setCoder(BadRecord.getCoder(shardedRecords.getPipeline())));

Review Comment:
   QQ what happens if this `BadRecord.getCoder()` method throws an error here?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1428,7 +1452,7 @@ private PCollection<T> expandForDirectRead(
         PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions bqOptions) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
-      if (tableProvider != null) {
+      if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) {

Review Comment:
   P.S. same with `getWithTemplateCompatibility` below



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1776,45 +1839,102 @@ private PCollection<T> createPCollectionForDirectReadWithStreamBundle(
         TupleTag<List<ReadStream>> listReadStreamsTag,
         PCollectionView<ReadSession> readSessionView,
         PCollectionView<String> tableSchemaView) {
-      PCollection<T> rows =
+      TupleTag<T> rowTag = new TupleTag<>();
+      PCollectionTuple resultTuple =
           tuple
               .get(listReadStreamsTag)
               .apply(Reshuffle.viaRandomKey())
               .apply(
                   ParDo.of(
                           new DoFn<List<ReadStream>, T>() {
                             @ProcessElement
-                            public void processElement(ProcessContext c) throws Exception {
+                            public void processElement(
+                                ProcessContext c, MultiOutputReceiver outputReceiver)
+                                throws Exception {
                               ReadSession readSession = c.sideInput(readSessionView);
                               TableSchema tableSchema =
                                   BigQueryHelpers.fromJsonString(
                                       c.sideInput(tableSchemaView), TableSchema.class);
                               List<ReadStream> streamBundle = c.element();
 
+                              ErrorHandlingParseFn errorHandlingParseFn =
+                                  new ErrorHandlingParseFn(getParseFn());
+
                               BigQueryStorageStreamBundleSource<T> streamSource =
                                   BigQueryStorageStreamBundleSource.create(
                                       readSession,
                                       streamBundle,
                                       tableSchema,
-                                      getParseFn(),
+                                      errorHandlingParseFn,
                                       outputCoder,
                                       getBigQueryServices(),
                                       1L);
 
-                              // Read all of the data from the stream. In the event that this work
-                              // item fails and is rescheduled, the same rows will be returned in
-                              // the same order.
-                              BoundedReader<T> reader =
-                                  streamSource.createReader(c.getPipelineOptions());
-                              for (boolean more = reader.start(); more; more = reader.advance()) {
-                                c.output(reader.getCurrent());
-                              }
+                              readStreamSource(
+                                  c.getPipelineOptions(),
+                                  rowTag,
+                                  outputReceiver,
+                                  streamSource,
+                                  errorHandlingParseFn);
                             }
                           })
-                      .withSideInputs(readSessionView, tableSchemaView))
-              .setCoder(outputCoder);
+                      .withSideInputs(readSessionView, tableSchemaView)
+                      .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG)));
 
-      return rows;
+      getBadRecordErrorHandler()
+          .addErrorCollection(
+              resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline())));
+
+      return resultTuple.get(rowTag).setCoder(outputCoder);
+    }
+
+    public void readStreamSource(
+        PipelineOptions options,
+        TupleTag<T> rowTag,
+        MultiOutputReceiver outputReceiver,
+        BoundedSource<T> streamSource,
+        ErrorHandlingParseFn errorHandlingParseFn)
+        throws Exception {
+      // Read all the data from the stream. In the event that this work
+      // item fails and is rescheduled, the same rows will be returned in
+      // the same order.
+      BoundedSource.BoundedReader<T> reader = streamSource.createReader(options);
+
+      try {
+        if (reader.start()) {
+          outputReceiver.get(rowTag).output(reader.getCurrent());
+        } else {
+          return;
+        }
+      } catch (ParseException e) {
+        GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+        getBadRecordRouter()
+            .route(
+                outputReceiver,
+                record,
+                AvroCoder.of(record.getSchema()),
+                (Exception) e.getCause(),
+                "Unable to parse record reading from BigQuery");
+      }
+
+      while (true) {
+        try {
+          if (reader.advance()) {
+            outputReceiver.get(rowTag).output(reader.getCurrent());
+          } else {
+            return;
+          }
+        } catch (ParseException e) {
+          GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+          getBadRecordRouter()
+              .route(
+                  outputReceiver,
+                  record,
+                  AvroCoder.of(record.getSchema()),
+                  (Exception) e.getCause(),
+                  "Unable to parse record reading from BigQuery");
+        }
+      }

Review Comment:
   Are we able to combine this into a for loop, as in:
   ```java
   for (boolean more = reader.start(); more; more = reader.advance()) {
   ```
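   
   For illustration, one way this might fold back into a single loop while still routing parse failures; a rough, untested sketch where `routeParseFailure` is a hypothetical helper wrapping the `getBadRecordRouter().route(...)` call that currently appears twice:
   ```java
   for (boolean isStart = true; ; isStart = false) {
     try {
       // start() on the first pass, advance() afterwards; both can surface a ParseException.
       boolean more = isStart ? reader.start() : reader.advance();
       if (!more) {
         return;
       }
       outputReceiver.get(rowTag).output(reader.getCurrent());
     } catch (ParseException e) {
       // Route the offending record and keep reading the rest of the stream.
       routeParseFailure(outputReceiver, errorHandlingParseFn, e);
     }
   }
   ```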



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -159,9 +178,20 @@ public void processElement(
                 .toMessage(element.getValue(), rowMutationInformation)
                 .withTimestamp(timestamp);
         o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
-      } catch (TableRowToStorageApiProto.SchemaConversionException e) {
-        TableRow tableRow = messageConverter.toTableRow(element.getValue());
-        o.get(failedWritesTag).output(new BigQueryStorageApiInsertError(tableRow, e.toString()));
+      } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
+        TableRow tableRow;
+        try {
+          tableRow = messageConverter.toTableRow(element.getValue());
+        } catch (Exception e) {
+          badRecordRouter.route(
+              o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");

Review Comment:
   > Unable to convert value to StorageWriteApiPayload
   
   nit: the description could be a little more accurate, like "Unable to convert failed element to TableRow"
   
   The conversion to StorageWriteApiPayload has already failed above; this exception happens when we fail to convert the user element to a TableRow.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1428,7 +1452,7 @@ private PCollection<T> expandForDirectRead(
         PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions bqOptions) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
-      if (tableProvider != null) {
+      if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) {

Review Comment:
   So if someone reads from a table using a different BadRecordRouter, the connector will use a different code path?
   Might be worth documenting this somewhere or logging it at INFO level.
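   
   For example, something along these lines at the point where the paths diverge; just a sketch, assuming the class's existing `LOG` logger and not the exact shape of the surrounding code:
   ```java
   if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) {
     // existing direct table-read path
   } else {
     if (tableProvider != null) {
       // Hypothetical: make the behavior change visible to users who configured error handling.
       LOG.info(
           "A non-throwing BadRecordRouter is configured, so this table read will use the "
               + "error-handling-compatible code path rather than the default table source.");
     }
     // existing fallback path
   }
   ```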



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -355,6 +351,11 @@ private boolean readNextRecord() throws IOException {
       // progress made in the current Stream gives us the overall StreamBundle progress.
       fractionOfStreamBundleConsumed =
           (currentStreamBundleIndex + fractionOfCurrentStreamConsumed) / source.streamBundle.size();
+
+      SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema);
+
+      current = parseFn.apply(schemaAndRecord);
+

Review Comment:
   nit: does this move result in any functional change? maybe revert



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java:
##########
@@ -234,17 +245,28 @@ public void processElement(
 
     try {
       writer.write(element.getValue());
-    } catch (Exception e) {
-      // Discard write result and close the write.
+    } catch (BigQueryRowSerializationException e) {
       try {
-        writer.close();
-        // The writer does not need to be reset, as this DoFn cannot be reused.
-      } catch (Exception closeException) {
-        // Do not mask the exception that caused the write to fail.
-        e.addSuppressed(closeException);
+        badRecordRouter.route(
+            outputReceiver, element, coder, e, "Unable to Write BQ Record to File");
+      } catch (Exception e2) {
+        cleanupWriter(writer, e2);
       }
-      throw e;
+    } catch (IOException e) {

Review Comment:
   This used to catch `Exception`. Is there a reason we're narrowing it down to 
`IOException`? Ideally we'd want to cleanup writer on any exception right?
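   
   One possible shape, keeping the `cleanupWriter` helper from this PR and the routing for serialization failures, but restoring a catch-all for everything else; a sketch only, assuming `cleanupWriter` closes the writer and rethrows as before:
   ```java
   try {
     writer.write(element.getValue());
   } catch (BigQueryRowSerializationException e) {
     // Row-level serialization failure: hand the element to the bad-record router.
     try {
       badRecordRouter.route(outputReceiver, element, coder, e, "Unable to convert element to TableRow");
     } catch (Exception e2) {
       cleanupWriter(writer, e2);
     }
   } catch (Exception e) {
     // Any other failure (IOException or otherwise): clean up the writer and propagate.
     cleanupWriter(writer, e);
   }
   ```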



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java:
##########
@@ -234,17 +245,28 @@ public void processElement(
 
     try {
       writer.write(element.getValue());
-    } catch (Exception e) {
-      // Discard write result and close the write.
+    } catch (BigQueryRowSerializationException e) {
       try {
-        writer.close();
-        // The writer does not need to be reset, as this DoFn cannot be reused.
-      } catch (Exception closeException) {
-        // Do not mask the exception that caused the write to fail.
-        e.addSuppressed(closeException);
+        badRecordRouter.route(
+            outputReceiver, element, coder, e, "Unable to Write BQ Record to File");

Review Comment:
   nit: 
   ```suggestion
               outputReceiver, element, coder, e, "Unable to convert element to TableRow");
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -159,9 +178,20 @@ public void processElement(
                 .toMessage(element.getValue(), rowMutationInformation)
                 .withTimestamp(timestamp);
         o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
-      } catch (TableRowToStorageApiProto.SchemaConversionException e) {
-        TableRow tableRow = messageConverter.toTableRow(element.getValue());
-        o.get(failedWritesTag).output(new BigQueryStorageApiInsertError(tableRow, e.toString()));
+      } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
+        TableRow tableRow;
+        try {
+          tableRow = messageConverter.toTableRow(element.getValue());
+        } catch (Exception e) {
+          badRecordRouter.route(
+              o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
+          return;
+        }
+        o.get(failedWritesTag)
+            .output(new BigQueryStorageApiInsertError(tableRow, conversionException.toString()));
+      } catch (Exception e) {
+        badRecordRouter.route(
+            o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");

Review Comment:
   nit: more accurate to say "Unable to output failed TableRow to dead-letter queue"


