reuvenlax commented on code in PR #25723:
URL: https://github.com/apache/beam/pull/25723#discussion_r1127370925
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -558,7 +574,23 @@ long flush(
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
},
- c -> recordsAppended.inc(c.protoRows.getSerializedRowsCount()),
+ c -> {
+ recordsAppended.inc(c.protoRows.getSerializedRowsCount());
+ if (successfulRowsReceiver != null) {
+ for (ByteString rowBytes : c.protoRows.getSerializedRowsList()) {
+ try {
+ TableRow row =
+ TableRowToStorageApiProto.tableRowFromMessage(
+ DynamicMessage.parseFrom(
+ Preconditions.checkStateNotNull(appendClientInfo).getDescriptor(),
+ rowBytes));
+ successfulRowsReceiver.output(row);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.warn("Failure parsing TableRow: " + e);
Review Comment:
In theory a failure here should be impossible: these bytes are the result of
serializing the protocol buffer in the previous stage. The only way I could see
this failing is if data on the wire was corrupted, in which case there's not
much we can do.
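
To illustrate the reasoning above, here is a minimal sketch using only the Java standard library. It is not the Beam or protobuf code path: `RoundTripSketch`, `serialize`, and `parse` are hypothetical names, and `DataOutputStream.writeUTF` stands in for protobuf serialization purely as an assumption for the example. It shows that bytes produced by the serializer parse back cleanly, and that the failure branch is reached only when the bytes have been altered (here, truncated) in between.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Optional;

class RoundTripSketch {
  // Hypothetical stand-in for the serialization done in the previous stage.
  static byte[] serialize(String row) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeUTF(row); // 2-byte length prefix followed by the encoded string
    } catch (IOException e) {
      // Writing to an in-memory buffer is not expected to fail.
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  // Hypothetical stand-in for the parse performed in the success callback.
  static Optional<String> parse(byte[] rowBytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(rowBytes))) {
      return Optional.of(in.readUTF());
    } catch (IOException e) {
      // Reached only when the bytes no longer match what serialize() produced.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    byte[] intact = serialize("hello");
    // Simulate corruption in transit by dropping the last three bytes.
    byte[] truncated = Arrays.copyOf(intact, intact.length - 3);
    System.out.println("intact parses: " + parse(intact).isPresent());
    System.out.println("truncated parses: " + parse(truncated).isPresent());
  }
}
```

In this sketch the parser can only report the failure for corrupted input; there is no way to recover the original row from it.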