yirutang commented on code in PR #23556:
URL: https://github.com/apache/beam/pull/23556#discussion_r1001138343


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -415,130 +442,217 @@ public String toString() {
             }
           };
 
-      Instant now = Instant.now();
-      List<AppendRowsContext> contexts = Lists.newArrayList();
-      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
-          new RetryManager<>(Duration.standardSeconds(1), 
Duration.standardSeconds(10), 1000);
-      int numSplits = 0;
-      for (ProtoRows protoRows : messages) {
-        ++numSplits;
-        Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
-            context -> {
-              try {
-                StreamAppendClient appendClient =
-                    APPEND_CLIENTS.get(
-                        context.streamName,
-                        () ->
-                            datasetService.getStreamAppendClient(
-                                context.streamName, 
descriptor.get().descriptor, false));
-                return appendClient.appendRows(context.offset, protoRows);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
+      Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation =
+          context -> {
+            if (context.protoRows.getSerializedRowsCount() == 0) {
+              // This might happen if all rows in a batch failed and were sent 
to the failed-rows
+              // PCollection.
+              return 
ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
+            }
+            try {
+              StreamAppendClient appendClient =
+                  APPEND_CLIENTS.get(
+                      context.streamName,
+                      () ->
+                          datasetService.getStreamAppendClient(
+                              context.streamName, descriptor.get().descriptor, 
false));
+              return appendClient.appendRows(context.offset, 
context.protoRows);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          };
+
+      Function<Iterable<AppendRowsContext>, RetryType> onError =
+          failedContexts -> {
+            // The first context is always the one that fails.
+            AppendRowsContext failedContext =
+                
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
+
+            // AppendSerializationError means that BigQuery detected errors on 
individual rows, e.g.
+            // a row not conforming
+            // to bigQuery invariants. These errors are persistent, so we 
redirect those rows to the
+            // failedInserts
+            // PCollection, and retry with the remaining rows.
+            if (failedContext.getError() != null
+                && failedContext.getError() instanceof 
Exceptions.AppendSerializtionError) {
+              Exceptions.AppendSerializtionError error =
+                  Preconditions.checkArgumentNotNull(
+                      (Exceptions.AppendSerializtionError) 
failedContext.getError());
+              Set<Integer> failedRowIndices = 
error.getRowIndexToErrorMessage().keySet();
+              for (int failedIndex : failedRowIndices) {
+                // Convert the message to a TableRow and send it to the 
failedRows collection.
+                ByteString protoBytes = 
failedContext.protoRows.getSerializedRows(failedIndex);
+                try {
+                  TableRow failedRow =
+                      TableRowToStorageApiProto.tableRowFromMessage(
+                          
DynamicMessage.parseFrom(descriptor.get().descriptor, protoBytes));
+                  new BigQueryStorageApiInsertError(
+                      failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex));
+                  o.get(failedRowsTag)
+                      .output(
+                          new BigQueryStorageApiInsertError(
+                              failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex)));
+                } catch (InvalidProtocolBufferException e) {
+                  LOG.error("Failed to insert row and could not parse the 
result!");
+                }
               }
-            };
-
-        // RetryManager
-        Function<Iterable<AppendRowsContext>, RetryType> onError =
-            failedContexts -> {
-              // The first context is always the one that fails.
-              AppendRowsContext failedContext =
-                  
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
-              // Invalidate the StreamWriter and force a new one to be created.
-              LOG.error(
-                  "Got error " + failedContext.getError() + " closing " + 
failedContext.streamName);
-              clearClients.accept(contexts);
-              appendFailures.inc();
-
-              boolean explicitStreamFinalized =
-                  failedContext.getError() instanceof StreamFinalizedException;
-              Throwable error = 
Preconditions.checkStateNotNull(failedContext.getError());
-              Status.Code statusCode = Status.fromThrowable(error).getCode();
-              // This means that the offset we have stored does not match the 
current end of
-              // the stream in the Storage API. Usually this happens because a 
crash or a bundle
-              // failure
-              // happened after an append but before the worker could 
checkpoint it's
-              // state. The records that were appended in a failed bundle will 
be retried,
-              // meaning that the unflushed tail of the stream must be 
discarded to prevent
-              // duplicates.
-              boolean offsetMismatch =
-                  statusCode.equals(Code.OUT_OF_RANGE) || 
statusCode.equals(Code.ALREADY_EXISTS);
-              // This implies that the stream doesn't exist or has already 
been finalized. In this
-              // case we have no choice but to create a new stream.
-              boolean streamDoesNotExist =
-                  explicitStreamFinalized
-                      || statusCode.equals(Code.INVALID_ARGUMENT)
-                      || statusCode.equals(Code.NOT_FOUND)
-                      || statusCode.equals(Code.FAILED_PRECONDITION);
-              if (offsetMismatch || streamDoesNotExist) {
-                appendOffsetFailures.inc();
-                LOG.warn(
-                    "Append to "
-                        + failedContext
-                        + " failed with "
-                        + failedContext.getError()
-                        + " Will retry with a new stream");
-                // Finalize the stream and clear streamName so a new stream 
will be created.
-                o.output(
-                    KV.of(failedContext.streamName, new 
Operation(failedContext.offset - 1, true)));
-                // Reinitialize all contexts with the new stream and new 
offsets.
-                initializeContexts.accept(failedContexts, true);
-
-                // Offset failures imply that all subsequent parallel appends 
will also fail.
-                // Retry them all.
-                return RetryType.RETRY_ALL_OPERATIONS;
+              rowsSentToFailedRowsCollection.inc(failedRowIndices.size());
+
+              // Remove the failed row from the payload, so we retry the batch 
without the failed
+              // rows.
+              ProtoRows.Builder retryRows = ProtoRows.newBuilder();
+              for (int i = 0; i < 
failedContext.protoRows.getSerializedRowsCount(); ++i) {
+                if (!failedRowIndices.contains(i)) {
+                  ByteString rowBytes = 
failedContext.protoRows.getSerializedRows(i);
+                  retryRows.addSerializedRows(rowBytes);
+                }
               }
+              failedContext.protoRows = retryRows.build();
 
+              // Since we removed rows, we need to update the insert offsets 
for all remaining rows.
+              long offset = failedContext.offset;
+              for (AppendRowsContext context : failedContexts) {
+                context.offset = offset;
+                offset += context.protoRows.getSerializedRowsCount();
+              }
+              streamOffset.write(offset);
               return RetryType.RETRY_ALL_OPERATIONS;
-            };
+            }
 
-        Consumer<AppendRowsContext> onSuccess =
-            context -> {
-              o.output(
-                  KV.of(
-                      context.streamName,
-                      new Operation(context.offset + context.numRows - 1, 
false)));
-              flushesScheduled.inc(protoRows.getSerializedRowsCount());
-            };
-
-        AppendRowsContext context = new AppendRowsContext(element.getKey());
-        context.numRows = protoRows.getSerializedRowsCount();
-        contexts.add(context);
-        retryManager.addOperation(run, onError, onSuccess, context);
-        recordsAppended.inc(protoRows.getSerializedRowsCount());
-        appendSizeDistribution.update(context.numRows);
-      }
-      initializeContexts.accept(contexts, false);
+            // Invalidate the StreamWriter and force a new one to be created.
+            LOG.error(
+                "Got error " + failedContext.getError() + " closing " + 
failedContext.streamName);
+            clearClients.accept(failedContexts);
+            appendFailures.inc();
+
+            boolean explicitStreamFinalized =
+                failedContext.getError() instanceof StreamFinalizedException;
+            Throwable error = 
Preconditions.checkStateNotNull(failedContext.getError());
+            Status.Code statusCode = Status.fromThrowable(error).getCode();
+            // This means that the offset we have stored does not match the 
current end of
+            // the stream in the Storage API. Usually this happens because a 
crash or a bundle
+            // failure
+            // happened after an append but before the worker could checkpoint 
it's
+            // state. The records that were appended in a failed bundle will 
be retried,
+            // meaning that the unflushed tail of the stream must be discarded 
to prevent
+            // duplicates.
+            boolean offsetMismatch =
+                statusCode.equals(Code.OUT_OF_RANGE) || 
statusCode.equals(Code.ALREADY_EXISTS);
+            // This implies that the stream doesn't exist or has already been 
finalized. In this
+            // case we have no choice but to create a new stream.
+            boolean streamDoesNotExist =
+                explicitStreamFinalized
+                    || statusCode.equals(Code.INVALID_ARGUMENT)
+                    || statusCode.equals(Code.NOT_FOUND)
+                    || statusCode.equals(Code.FAILED_PRECONDITION);

Review Comment:
   Why does FAILED_PRECONDITION belong under streamDoesNotExist?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to