chamikaramj commented on code in PR #23556:
URL: https://github.com/apache/beam/pull/23556#discussion_r1001058767
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -328,29 +375,62 @@ void addMessage(StorageApiWritePayload payload) throws Exception {
      pendingMessages.add(ByteString.copyFrom(payload.getPayload()));
    }
-    void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryManager)
+    long flush(
+        RetryManager<AppendRowsResponse, AppendRowsContext> retryManager,
+        OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver)
        throws Exception {
      if (pendingMessages.isEmpty()) {
-        return;
+        return 0;
      }
-      final ProtoRows.Builder inserts = ProtoRows.newBuilder();
-      inserts.addAllSerializedRows(pendingMessages);
-      ProtoRows protoRows = inserts.build();
+      final ProtoRows.Builder insertsBuilder = ProtoRows.newBuilder();
+      insertsBuilder.addAllSerializedRows(pendingMessages);
+      final ProtoRows inserts = insertsBuilder.build();
      pendingMessages.clear();
+      // Handle the case where the request is too large.
+      if (inserts.getSerializedSize() >= maxRequestSize) {
Review Comment:
Ah, I was hoping this would not be an issue for the Storage Write API :-(
Is there a gRPC-based client where such RPC limits would not be an issue?
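If a client-side guard is unavoidable no matter which client we use, splitting into sub-batches might be an option. A rough sketch, purely illustrative (RequestSplitter and splitByRequestSize are made-up names, and the byte accounting is approximate):

    import com.google.cloud.bigquery.storage.v1.ProtoRows;
    import com.google.protobuf.ByteString;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: split pending rows into sub-batches so that no single
    // AppendRows request exceeds maxRequestSize. The running byte count is an
    // approximation of the final serialized size (it ignores the small per-row
    // field overhead added by the ProtoRows wrapper).
    class RequestSplitter {
      static List<ProtoRows> splitByRequestSize(Iterable<ByteString> rows, long maxRequestSize) {
        List<ProtoRows> batches = new ArrayList<>();
        ProtoRows.Builder current = ProtoRows.newBuilder();
        long currentBytes = 0;
        for (ByteString row : rows) {
          // Close the current batch if adding this row would push it over the limit.
          if (current.getSerializedRowsCount() > 0 && currentBytes + row.size() >= maxRequestSize) {
            batches.add(current.build());
            current = ProtoRows.newBuilder();
            currentBytes = 0;
          }
          current.addSerializedRows(row);
          currentBytes += row.size();
        }
        if (current.getSerializedRowsCount() > 0) {
          batches.add(current.build());
        }
        return batches;
      }
    }

A single row larger than maxRequestSize would still end up in its own oversized batch, which is the case this PR appears to route to the failed-rows PCollection.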
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -328,29 +375,62 @@ void addMessage(StorageApiWritePayload payload) throws Exception {
      pendingMessages.add(ByteString.copyFrom(payload.getPayload()));
    }
-    void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryManager)
+    long flush(
+        RetryManager<AppendRowsResponse, AppendRowsContext> retryManager,
+        OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver)
        throws Exception {
      if (pendingMessages.isEmpty()) {
-        return;
+        return 0;
      }
-      final ProtoRows.Builder inserts = ProtoRows.newBuilder();
-      inserts.addAllSerializedRows(pendingMessages);
-      ProtoRows protoRows = inserts.build();
+      final ProtoRows.Builder insertsBuilder = ProtoRows.newBuilder();
+      insertsBuilder.addAllSerializedRows(pendingMessages);
+      final ProtoRows inserts = insertsBuilder.build();
      pendingMessages.clear();
+      // Handle the case where the request is too large.
+      if (inserts.getSerializedSize() >= maxRequestSize) {
+        if (inserts.getSerializedRowsCount() > 1) {
+          // TODO(reuvenlax): Is it worth trying to handle this case by splitting the protoRows?
+          // Given that we split
+          // the ProtoRows iterable at 2MB and the max request size is 10MB, this scenario seems
+          // nearly impossible.
+          LOG.error(
+              "A request containing more than one row is over the request size limit of "
+                  + maxRequestSize
+                  + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
Review Comment:
Should we still try to send it once?
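For example, a rough sketch of what I mean (names here are made up, not from this PR): attempt the oversized append once and only divert the rows to the failed-rows output if the service actually rejects it.

    import com.google.cloud.bigquery.storage.v1.ProtoRows;
    import java.util.function.BiConsumer;

    // Illustrative only: a "try once before failing" policy for oversized batches.
    class OversizedRequestPolicy {
      /** Stand-in for the existing AppendRows call. */
      interface Append {
        void apply(ProtoRows rows) throws Exception;
      }

      static void tryOnceThenFail(
          ProtoRows inserts, Append append, BiConsumer<ProtoRows, Exception> onRejected) {
        try {
          // Let the backend decide; a request near the limit may still be accepted.
          append.apply(inserts);
        } catch (Exception e) {
          // Only now hand every row in the batch to the failed-rows path.
          onRejected.accept(inserts, e);
        }
      }
    }

The downside would be one extra (likely rejected) RPC per oversized batch, but given how rare this case is expected to be, that seems cheap.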
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -415,130 +442,217 @@ public String toString() {
          }
        };
-        Instant now = Instant.now();
-        List<AppendRowsContext> contexts = Lists.newArrayList();
-        RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
-            new RetryManager<>(Duration.standardSeconds(1), Duration.standardSeconds(10), 1000);
-        int numSplits = 0;
-        for (ProtoRows protoRows : messages) {
-          ++numSplits;
-          Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
-              context -> {
-                try {
-                  StreamAppendClient appendClient =
-                      APPEND_CLIENTS.get(
-                          context.streamName,
-                          () ->
-                              datasetService.getStreamAppendClient(
-                                  context.streamName, descriptor.get().descriptor, false));
-                  return appendClient.appendRows(context.offset, protoRows);
-                } catch (Exception e) {
-                  throw new RuntimeException(e);
+        Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation =
+            context -> {
+              if (context.protoRows.getSerializedRowsCount() == 0) {
+                // This might happen if all rows in a batch failed and were sent to the failed-rows
+                // PCollection.
+                return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
+              }
+              try {
+                StreamAppendClient appendClient =
+                    APPEND_CLIENTS.get(
+                        context.streamName,
+                        () ->
+                            datasetService.getStreamAppendClient(
+                                context.streamName, descriptor.get().descriptor, false));
+                return appendClient.appendRows(context.offset, context.protoRows);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            };
+
+        Function<Iterable<AppendRowsContext>, RetryType> onError =
+            failedContexts -> {
+              // The first context is always the one that fails.
+              AppendRowsContext failedContext =
+                  Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
+
+              // AppendSerializationError means that BigQuery detected errors on individual rows, e.g.
+              // a row not conforming
+              // to bigQuery invariants. These errors are persistent, so we redirect those rows to the
+              // failedInserts
+              // PCollection, and retry with the remaining rows.
+              if (failedContext.getError() != null
+                  && failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
+                Exceptions.AppendSerializtionError error =
+                    Preconditions.checkArgumentNotNull(
+                        (Exceptions.AppendSerializtionError) failedContext.getError());
+                Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
+                for (int failedIndex : failedRowIndices) {
+                  // Convert the message to a TableRow and send it to the failedRows collection.
+                  ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex);
+                  try {
+                    TableRow failedRow =
+                        TableRowToStorageApiProto.tableRowFromMessage(
+                            DynamicMessage.parseFrom(descriptor.get().descriptor, protoBytes));
+                    new BigQueryStorageApiInsertError(
+                        failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
+                    o.get(failedRowsTag)
+                        .output(
+                            new BigQueryStorageApiInsertError(
+                                failedRow, error.getRowIndexToErrorMessage().get(failedIndex)));
+                  } catch (InvalidProtocolBufferException e) {
+                    LOG.error("Failed to insert row and could not parse the result!");
+                  }
                }
-              };
-
-          // RetryManager
-          Function<Iterable<AppendRowsContext>, RetryType> onError =
-              failedContexts -> {
-                // The first context is always the one that fails.
-                AppendRowsContext failedContext =
-                    Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
-                // Invalidate the StreamWriter and force a new one to be created.
-                LOG.error(
-                    "Got error " + failedContext.getError() + " closing " + failedContext.streamName);
-                clearClients.accept(contexts);
-                appendFailures.inc();
-
-                boolean explicitStreamFinalized =
-                    failedContext.getError() instanceof StreamFinalizedException;
-                Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
-                Status.Code statusCode = Status.fromThrowable(error).getCode();
-                // This means that the offset we have stored does not match the current end of
-                // the stream in the Storage API. Usually this happens because a crash or a bundle
-                // failure
-                // happened after an append but before the worker could checkpoint it's
-                // state. The records that were appended in a failed bundle will be retried,
-                // meaning that the unflushed tail of the stream must be discarded to prevent
-                // duplicates.
-                boolean offsetMismatch =
-                    statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
-                // This implies that the stream doesn't exist or has already been finalized. In this
-                // case we have no choice but to create a new stream.
-                boolean streamDoesNotExist =
-                    explicitStreamFinalized
-                        || statusCode.equals(Code.INVALID_ARGUMENT)
-                        || statusCode.equals(Code.NOT_FOUND)
-                        || statusCode.equals(Code.FAILED_PRECONDITION);
-                if (offsetMismatch || streamDoesNotExist) {
-                  appendOffsetFailures.inc();
-                  LOG.warn(
-                      "Append to "
-                          + failedContext
-                          + " failed with "
-                          + failedContext.getError()
-                          + " Will retry with a new stream");
-                  // Finalize the stream and clear streamName so a new stream will be created.
-                  o.output(
-                      KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
-                  // Reinitialize all contexts with the new stream and new offsets.
-                  initializeContexts.accept(failedContexts, true);
-
-                  // Offset failures imply that all subsequent parallel appends will also fail.
-                  // Retry them all.
-                  return RetryType.RETRY_ALL_OPERATIONS;
+                rowsSentToFailedRowsCollection.inc(failedRowIndices.size());
+
+                // Remove the failed row from the payload, so we retry the batch without the failed
+                // rows.
+                ProtoRows.Builder retryRows = ProtoRows.newBuilder();
+                for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) {
+                  if (!failedRowIndices.contains(i)) {
+                    ByteString rowBytes = failedContext.protoRows.getSerializedRows(i);
+                    retryRows.addSerializedRows(rowBytes);
+                  }
                }
+                failedContext.protoRows = retryRows.build();
+                // Since we removed rows, we need to update the insert offsets for all remaining rows.
+                long offset = failedContext.offset;
+                for (AppendRowsContext context : failedContexts) {
+                  context.offset = offset;
+                  offset += context.protoRows.getSerializedRowsCount();
+                }
+                streamOffset.write(offset);
                return RetryType.RETRY_ALL_OPERATIONS;
-              };
+              }
-          Consumer<AppendRowsContext> onSuccess =
-              context -> {
-                o.output(
-                    KV.of(
-                        context.streamName,
-                        new Operation(context.offset + context.numRows - 1, false)));
-                flushesScheduled.inc(protoRows.getSerializedRowsCount());
-              };
-
-          AppendRowsContext context = new AppendRowsContext(element.getKey());
-          context.numRows = protoRows.getSerializedRowsCount();
-          contexts.add(context);
-          retryManager.addOperation(run, onError, onSuccess, context);
-          recordsAppended.inc(protoRows.getSerializedRowsCount());
-          appendSizeDistribution.update(context.numRows);
-        }
-        initializeContexts.accept(contexts, false);
+              // Invalidate the StreamWriter and force a new one to be created.
+              LOG.error(
+                  "Got error " + failedContext.getError() + " closing " + failedContext.streamName);
Review Comment:
The logic here is long and hard to follow. I wonder if we can structure this
better somehow, for anybody in the future who is trying to get an abstract
idea of the logic here (and maybe modify it).
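For example (just a sketch of the shape I have in mind; the helper method names below are made up), onError could read roughly like:

    Function<Iterable<AppendRowsContext>, RetryType> onError =
        failedContexts -> {
          AppendRowsContext failed =
              Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
          if (failed.getError() instanceof Exceptions.AppendSerializtionError) {
            // Persistent per-row errors: divert the bad rows, recompute offsets, retry the rest.
            return handleRowLevelErrors(failed, failedContexts);
          }
          if (isOffsetMismatch(failed) || streamIsGone(failed)) {
            // Stream-level problems: finalize, switch to a new stream, retry everything.
            return handleStreamErrors(failed, failedContexts);
          }
          // Anything else: invalidate the StreamWriter and retry the whole batch.
          return handleTransientErrors(failedContexts);
        };

with each helper carrying the detailed logic that is currently inlined.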
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]