chamikaramj commented on code in PR #23556:
URL: https://github.com/apache/beam/pull/23556#discussion_r1001058767


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -328,29 +375,62 @@ void addMessage(StorageApiWritePayload payload) throws 
Exception {
         pendingMessages.add(ByteString.copyFrom(payload.getPayload()));
       }
 
-      void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> 
retryManager)
+      long flush(
+          RetryManager<AppendRowsResponse, AppendRowsContext> retryManager,
+          OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver)
           throws Exception {
         if (pendingMessages.isEmpty()) {
-          return;
+          return 0;
         }
-        final ProtoRows.Builder inserts = ProtoRows.newBuilder();
-        inserts.addAllSerializedRows(pendingMessages);
 
-        ProtoRows protoRows = inserts.build();
+        final ProtoRows.Builder insertsBuilder = ProtoRows.newBuilder();
+        insertsBuilder.addAllSerializedRows(pendingMessages);
+        final ProtoRows inserts = insertsBuilder.build();
         pendingMessages.clear();
 
+        // Handle the case where the request is too large.
+        if (inserts.getSerializedSize() >= maxRequestSize) {

Review Comment:
   Ah, I was hoping this would not be an issue for the Storage Write API :-(
   
   Is there a gRPC based client where such RPC limits would not be an issue ? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -328,29 +375,62 @@ void addMessage(StorageApiWritePayload payload) throws 
Exception {
         pendingMessages.add(ByteString.copyFrom(payload.getPayload()));
       }
 
-      void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> 
retryManager)
+      long flush(
+          RetryManager<AppendRowsResponse, AppendRowsContext> retryManager,
+          OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver)
           throws Exception {
         if (pendingMessages.isEmpty()) {
-          return;
+          return 0;
         }
-        final ProtoRows.Builder inserts = ProtoRows.newBuilder();
-        inserts.addAllSerializedRows(pendingMessages);
 
-        ProtoRows protoRows = inserts.build();
+        final ProtoRows.Builder insertsBuilder = ProtoRows.newBuilder();
+        insertsBuilder.addAllSerializedRows(pendingMessages);
+        final ProtoRows inserts = insertsBuilder.build();
         pendingMessages.clear();
 
+        // Handle the case where the request is too large.
+        if (inserts.getSerializedSize() >= maxRequestSize) {
+          if (inserts.getSerializedRowsCount() > 1) {
+            // TODO(reuvenlax): Is it worth trying to handle this case by 
splitting the protoRows?
+            // Given that we split
+            // the ProtoRows iterable at 2MB and the max request size is 10MB, 
this scenario seems
+            // nearly impossible.
+            LOG.error(
+                "A request containing more than one row is over the request 
size limit of "
+                    + maxRequestSize
+                    + ". This is unexpected. All rows in the request will be 
sent to the failed-rows PCollection.");

Review Comment:
   Should we still try to send it once ? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -415,130 +442,217 @@ public String toString() {
             }
           };
 
-      Instant now = Instant.now();
-      List<AppendRowsContext> contexts = Lists.newArrayList();
-      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
-          new RetryManager<>(Duration.standardSeconds(1), 
Duration.standardSeconds(10), 1000);
-      int numSplits = 0;
-      for (ProtoRows protoRows : messages) {
-        ++numSplits;
-        Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
-            context -> {
-              try {
-                StreamAppendClient appendClient =
-                    APPEND_CLIENTS.get(
-                        context.streamName,
-                        () ->
-                            datasetService.getStreamAppendClient(
-                                context.streamName, 
descriptor.get().descriptor, false));
-                return appendClient.appendRows(context.offset, protoRows);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
+      Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation =
+          context -> {
+            if (context.protoRows.getSerializedRowsCount() == 0) {
+              // This might happen if all rows in a batch failed and were sent 
to the failed-rows
+              // PCollection.
+              return 
ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
+            }
+            try {
+              StreamAppendClient appendClient =
+                  APPEND_CLIENTS.get(
+                      context.streamName,
+                      () ->
+                          datasetService.getStreamAppendClient(
+                              context.streamName, descriptor.get().descriptor, 
false));
+              return appendClient.appendRows(context.offset, 
context.protoRows);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          };
+
+      Function<Iterable<AppendRowsContext>, RetryType> onError =
+          failedContexts -> {
+            // The first context is always the one that fails.
+            AppendRowsContext failedContext =
+                
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
+
+            // AppendSerializationError means that BigQuery detected errors on 
individual rows, e.g.
+            // a row not conforming
+            // to bigQuery invariants. These errors are persistent, so we 
redirect those rows to the
+            // failedInserts
+            // PCollection, and retry with the remaining rows.
+            if (failedContext.getError() != null
+                && failedContext.getError() instanceof 
Exceptions.AppendSerializtionError) {
+              Exceptions.AppendSerializtionError error =
+                  Preconditions.checkArgumentNotNull(
+                      (Exceptions.AppendSerializtionError) 
failedContext.getError());
+              Set<Integer> failedRowIndices = 
error.getRowIndexToErrorMessage().keySet();
+              for (int failedIndex : failedRowIndices) {
+                // Convert the message to a TableRow and send it to the 
failedRows collection.
+                ByteString protoBytes = 
failedContext.protoRows.getSerializedRows(failedIndex);
+                try {
+                  TableRow failedRow =
+                      TableRowToStorageApiProto.tableRowFromMessage(
+                          
DynamicMessage.parseFrom(descriptor.get().descriptor, protoBytes));
+                  new BigQueryStorageApiInsertError(
+                      failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex));
+                  o.get(failedRowsTag)
+                      .output(
+                          new BigQueryStorageApiInsertError(
+                              failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex)));
+                } catch (InvalidProtocolBufferException e) {
+                  LOG.error("Failed to insert row and could not parse the 
result!");
+                }
               }
-            };
-
-        // RetryManager
-        Function<Iterable<AppendRowsContext>, RetryType> onError =
-            failedContexts -> {
-              // The first context is always the one that fails.
-              AppendRowsContext failedContext =
-                  
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
-              // Invalidate the StreamWriter and force a new one to be created.
-              LOG.error(
-                  "Got error " + failedContext.getError() + " closing " + 
failedContext.streamName);
-              clearClients.accept(contexts);
-              appendFailures.inc();
-
-              boolean explicitStreamFinalized =
-                  failedContext.getError() instanceof StreamFinalizedException;
-              Throwable error = 
Preconditions.checkStateNotNull(failedContext.getError());
-              Status.Code statusCode = Status.fromThrowable(error).getCode();
-              // This means that the offset we have stored does not match the 
current end of
-              // the stream in the Storage API. Usually this happens because a 
crash or a bundle
-              // failure
-              // happened after an append but before the worker could 
checkpoint it's
-              // state. The records that were appended in a failed bundle will 
be retried,
-              // meaning that the unflushed tail of the stream must be 
discarded to prevent
-              // duplicates.
-              boolean offsetMismatch =
-                  statusCode.equals(Code.OUT_OF_RANGE) || 
statusCode.equals(Code.ALREADY_EXISTS);
-              // This implies that the stream doesn't exist or has already 
been finalized. In this
-              // case we have no choice but to create a new stream.
-              boolean streamDoesNotExist =
-                  explicitStreamFinalized
-                      || statusCode.equals(Code.INVALID_ARGUMENT)
-                      || statusCode.equals(Code.NOT_FOUND)
-                      || statusCode.equals(Code.FAILED_PRECONDITION);
-              if (offsetMismatch || streamDoesNotExist) {
-                appendOffsetFailures.inc();
-                LOG.warn(
-                    "Append to "
-                        + failedContext
-                        + " failed with "
-                        + failedContext.getError()
-                        + " Will retry with a new stream");
-                // Finalize the stream and clear streamName so a new stream 
will be created.
-                o.output(
-                    KV.of(failedContext.streamName, new 
Operation(failedContext.offset - 1, true)));
-                // Reinitialize all contexts with the new stream and new 
offsets.
-                initializeContexts.accept(failedContexts, true);
-
-                // Offset failures imply that all subsequent parallel appends 
will also fail.
-                // Retry them all.
-                return RetryType.RETRY_ALL_OPERATIONS;
+              rowsSentToFailedRowsCollection.inc(failedRowIndices.size());
+
+              // Remove the failed row from the payload, so we retry the batch 
without the failed
+              // rows.
+              ProtoRows.Builder retryRows = ProtoRows.newBuilder();
+              for (int i = 0; i < 
failedContext.protoRows.getSerializedRowsCount(); ++i) {
+                if (!failedRowIndices.contains(i)) {
+                  ByteString rowBytes = 
failedContext.protoRows.getSerializedRows(i);
+                  retryRows.addSerializedRows(rowBytes);
+                }
               }
+              failedContext.protoRows = retryRows.build();
 
+              // Since we removed rows, we need to update the insert offsets 
for all remaining rows.
+              long offset = failedContext.offset;
+              for (AppendRowsContext context : failedContexts) {
+                context.offset = offset;
+                offset += context.protoRows.getSerializedRowsCount();
+              }
+              streamOffset.write(offset);
               return RetryType.RETRY_ALL_OPERATIONS;
-            };
+            }
 
-        Consumer<AppendRowsContext> onSuccess =
-            context -> {
-              o.output(
-                  KV.of(
-                      context.streamName,
-                      new Operation(context.offset + context.numRows - 1, 
false)));
-              flushesScheduled.inc(protoRows.getSerializedRowsCount());
-            };
-
-        AppendRowsContext context = new AppendRowsContext(element.getKey());
-        context.numRows = protoRows.getSerializedRowsCount();
-        contexts.add(context);
-        retryManager.addOperation(run, onError, onSuccess, context);
-        recordsAppended.inc(protoRows.getSerializedRowsCount());
-        appendSizeDistribution.update(context.numRows);
-      }
-      initializeContexts.accept(contexts, false);
+            // Invalidate the StreamWriter and force a new one to be created.
+            LOG.error(
+                "Got error " + failedContext.getError() + " closing " + 
failedContext.streamName);

Review Comment:
   The logic here is long and hard to follow. I wonder if we can better 
structure this someone for anybody in the future who is trying to get an 
abstract idea regarding the logic here (and may be modify it).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to