reuvenlax commented on code in PR #23556:
URL: https://github.com/apache/beam/pull/23556#discussion_r1001139680
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -415,130 +442,217 @@ public String toString() {
}
};
- Instant now = Instant.now();
- List<AppendRowsContext> contexts = Lists.newArrayList();
- RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
- new RetryManager<>(Duration.standardSeconds(1),
Duration.standardSeconds(10), 1000);
- int numSplits = 0;
- for (ProtoRows protoRows : messages) {
- ++numSplits;
- Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
- context -> {
- try {
- StreamAppendClient appendClient =
- APPEND_CLIENTS.get(
- context.streamName,
- () ->
- datasetService.getStreamAppendClient(
- context.streamName,
descriptor.get().descriptor, false));
- return appendClient.appendRows(context.offset, protoRows);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> runOperation =
+ context -> {
+ if (context.protoRows.getSerializedRowsCount() == 0) {
+ // This might happen if all rows in a batch failed and were sent
to the failed-rows
+ // PCollection.
+ return
ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
+ }
+ try {
+ StreamAppendClient appendClient =
+ APPEND_CLIENTS.get(
+ context.streamName,
+ () ->
+ datasetService.getStreamAppendClient(
+ context.streamName, descriptor.get().descriptor,
false));
+ return appendClient.appendRows(context.offset,
context.protoRows);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ Function<Iterable<AppendRowsContext>, RetryType> onError =
+ failedContexts -> {
+ // The first context is always the one that fails.
+ AppendRowsContext failedContext =
+
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
+
+ // AppendSerializationError means that BigQuery detected errors on
individual rows, e.g.
+ // a row not conforming
+ // to bigQuery invariants. These errors are persistent, so we
redirect those rows to the
+ // failedInserts
+ // PCollection, and retry with the remaining rows.
+ if (failedContext.getError() != null
+ && failedContext.getError() instanceof
Exceptions.AppendSerializtionError) {
+ Exceptions.AppendSerializtionError error =
+ Preconditions.checkArgumentNotNull(
+ (Exceptions.AppendSerializtionError)
failedContext.getError());
+ Set<Integer> failedRowIndices =
error.getRowIndexToErrorMessage().keySet();
+ for (int failedIndex : failedRowIndices) {
+ // Convert the message to a TableRow and send it to the
failedRows collection.
+ ByteString protoBytes =
failedContext.protoRows.getSerializedRows(failedIndex);
+ try {
+ TableRow failedRow =
+ TableRowToStorageApiProto.tableRowFromMessage(
+
DynamicMessage.parseFrom(descriptor.get().descriptor, protoBytes));
+ new BigQueryStorageApiInsertError(
+ failedRow,
error.getRowIndexToErrorMessage().get(failedIndex));
+ o.get(failedRowsTag)
+ .output(
+ new BigQueryStorageApiInsertError(
+ failedRow,
error.getRowIndexToErrorMessage().get(failedIndex)));
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Failed to insert row and could not parse the
result!");
+ }
}
- };
-
- // RetryManager
- Function<Iterable<AppendRowsContext>, RetryType> onError =
- failedContexts -> {
- // The first context is always the one that fails.
- AppendRowsContext failedContext =
-
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
- // Invalidate the StreamWriter and force a new one to be created.
- LOG.error(
- "Got error " + failedContext.getError() + " closing " +
failedContext.streamName);
- clearClients.accept(contexts);
- appendFailures.inc();
-
- boolean explicitStreamFinalized =
- failedContext.getError() instanceof StreamFinalizedException;
- Throwable error =
Preconditions.checkStateNotNull(failedContext.getError());
- Status.Code statusCode = Status.fromThrowable(error).getCode();
- // This means that the offset we have stored does not match the
current end of
- // the stream in the Storage API. Usually this happens because a
crash or a bundle
- // failure
- // happened after an append but before the worker could
checkpoint it's
- // state. The records that were appended in a failed bundle will
be retried,
- // meaning that the unflushed tail of the stream must be
discarded to prevent
- // duplicates.
- boolean offsetMismatch =
- statusCode.equals(Code.OUT_OF_RANGE) ||
statusCode.equals(Code.ALREADY_EXISTS);
- // This implies that the stream doesn't exist or has already
been finalized. In this
- // case we have no choice but to create a new stream.
- boolean streamDoesNotExist =
- explicitStreamFinalized
- || statusCode.equals(Code.INVALID_ARGUMENT)
- || statusCode.equals(Code.NOT_FOUND)
- || statusCode.equals(Code.FAILED_PRECONDITION);
- if (offsetMismatch || streamDoesNotExist) {
- appendOffsetFailures.inc();
- LOG.warn(
- "Append to "
- + failedContext
- + " failed with "
- + failedContext.getError()
- + " Will retry with a new stream");
- // Finalize the stream and clear streamName so a new stream
will be created.
- o.output(
- KV.of(failedContext.streamName, new
Operation(failedContext.offset - 1, true)));
- // Reinitialize all contexts with the new stream and new
offsets.
- initializeContexts.accept(failedContexts, true);
-
- // Offset failures imply that all subsequent parallel appends
will also fail.
- // Retry them all.
- return RetryType.RETRY_ALL_OPERATIONS;
+ rowsSentToFailedRowsCollection.inc(failedRowIndices.size());
+
+ // Remove the failed row from the payload, so we retry the batch
without the failed
+ // rows.
+ ProtoRows.Builder retryRows = ProtoRows.newBuilder();
+ for (int i = 0; i <
failedContext.protoRows.getSerializedRowsCount(); ++i) {
+ if (!failedRowIndices.contains(i)) {
+ ByteString rowBytes =
failedContext.protoRows.getSerializedRows(i);
+ retryRows.addSerializedRows(rowBytes);
+ }
}
+ failedContext.protoRows = retryRows.build();
+ // Since we removed rows, we need to update the insert offsets
for all remaining rows.
+ long offset = failedContext.offset;
+ for (AppendRowsContext context : failedContexts) {
+ context.offset = offset;
+ offset += context.protoRows.getSerializedRowsCount();
+ }
+ streamOffset.write(offset);
return RetryType.RETRY_ALL_OPERATIONS;
- };
+ }
- Consumer<AppendRowsContext> onSuccess =
- context -> {
- o.output(
- KV.of(
- context.streamName,
- new Operation(context.offset + context.numRows - 1,
false)));
- flushesScheduled.inc(protoRows.getSerializedRowsCount());
- };
-
- AppendRowsContext context = new AppendRowsContext(element.getKey());
- context.numRows = protoRows.getSerializedRowsCount();
- contexts.add(context);
- retryManager.addOperation(run, onError, onSuccess, context);
- recordsAppended.inc(protoRows.getSerializedRowsCount());
- appendSizeDistribution.update(context.numRows);
- }
- initializeContexts.accept(contexts, false);
+ // Invalidate the StreamWriter and force a new one to be created.
+ LOG.error(
+ "Got error " + failedContext.getError() + " closing " +
failedContext.streamName);
+ clearClients.accept(failedContexts);
+ appendFailures.inc();
+
+ boolean explicitStreamFinalized =
+ failedContext.getError() instanceof StreamFinalizedException;
+ Throwable error =
Preconditions.checkStateNotNull(failedContext.getError());
+ Status.Code statusCode = Status.fromThrowable(error).getCode();
+ // This means that the offset we have stored does not match the
current end of
+ // the stream in the Storage API. Usually this happens because a
crash or a bundle
+ // failure
+ // happened after an append but before the worker could checkpoint
it's
+ // state. The records that were appended in a failed bundle will
be retried,
+ // meaning that the unflushed tail of the stream must be discarded
to prevent
+ // duplicates.
+ boolean offsetMismatch =
+ statusCode.equals(Code.OUT_OF_RANGE) ||
statusCode.equals(Code.ALREADY_EXISTS);
+ // This implies that the stream doesn't exist or has already been
finalized. In this
+ // case we have no choice but to create a new stream.
+ boolean streamDoesNotExist =
+ explicitStreamFinalized
+ || statusCode.equals(Code.INVALID_ARGUMENT)
+ || statusCode.equals(Code.NOT_FOUND)
+ || statusCode.equals(Code.FAILED_PRECONDITION);
Review Comment:
I don't remember with 100% certainty, but I believe FAILED_PRECONDITION was being
returned when we tried to write to a stream that had already been finalized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]