This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9cd00549dd8 Do not suppress downstream Exception in BigQueryIO Storage
write API FailedStorageApiInserts Handling (#31506)
9cd00549dd8 is described below
commit 9cd00549dd8661d2a86543d35efa099c176c9649
Author: Yi Hu <[email protected]>
AuthorDate: Wed Jun 5 19:12:59 2024 -0400
Do not suppress downstream Exception in BigQueryIO Storage write API
FailedStorageApiInserts Handling (#31506)
---
.../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index ed13e6338f8..02d91e46d69 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -678,6 +678,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
// Convert the message to a TableRow and send it to the
failedRows collection.
ByteString protoBytes =
failedContext.protoRows.getSerializedRows(failedIndex);
org.joda.time.Instant timestamp =
failedContext.timestamps.get(failedIndex);
+ BigQueryStorageApiInsertError element = null;
try {
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
@@ -687,13 +688,16 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
.getDescriptor()),
protoBytes),
true);
- failedRowsReceiver.outputWithTimestamp(
+ element =
new BigQueryStorageApiInsertError(
- failedRow,
error.getRowIndexToErrorMessage().get(failedIndex)),
- timestamp);
+ failedRow,
error.getRowIndexToErrorMessage().get(failedIndex));
} catch (Exception e) {
LOG.error("Failed to insert row and could not parse the
result!", e);
}
+ // output outside try {} clause to avoid suppress downstream
Exception
+ if (element != null) {
+ failedRowsReceiver.outputWithTimestamp(element, timestamp);
+ }
}
int numRowsFailed = failedRowIndices.size();
rowsSentToFailedRowsCollection.inc(numRowsFailed);