ahmedabu98 commented on code in PR #35051:
URL: https://github.com/apache/beam/pull/35051#discussion_r2112098895
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -70,6 +72,48 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
private final Counter finalizeOperationsFailed =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");
+ /** Custom exception to indicate that a stream is invalid due to an offset error. */
+ public static class StreamOffsetBeyondEndException extends IOException {
+ public StreamOffsetBeyondEndException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Checks if the given throwable is or is caused by an ApiException indicating that an offset is
+ * beyond the end of a BigQuery stream.
+ */
+ private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
+ if (t == null) {
+ return false;
+ }
+ if (t instanceof ApiException) {
+ ApiException apiException = (ApiException) t;
+ if (apiException.getStatusCode().getCode() == Code.OUT_OF_RANGE) {
+ // Check if the cause is gRPC StatusRuntimeException for more specific message check
+ Throwable cause = apiException.getCause();
+ if (cause instanceof io.grpc.StatusRuntimeException) {
+ io.grpc.StatusRuntimeException grpcException = (io.grpc.StatusRuntimeException) cause;
+ return grpcException.getStatus().getCode() == io.grpc.Status.Code.OUT_OF_RANGE
+ && grpcException.getMessage() != null
+ && grpcException
+ .getMessage()
+ .toLowerCase()
+ .contains("is beyond the end of the stream");
+ }
+ // Fallback to checking the ApiException message directly if cause is not gRPC
+ return apiException.getMessage() != null
+ && apiException.getMessage().toLowerCase().contains("is beyond the end of the stream");
+ }
+ }
+ // Recursively check the cause, as the specific exception might be wrapped.
+ Throwable cause = t.getCause();
+ if (cause == null) {
+ return false;
+ }
Review Comment:
nit: this null check is unnecessary. The method's base case (`t == null`) already returns false, so the cause can be passed straight to the recursive call.
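
For illustration, here is a minimal, self-contained sketch of the simplified recursion. The class name `CauseChainSketch` and the `matches` predicate are hypothetical stand-ins for the PR's `OUT_OF_RANGE` and message checks, not the actual code:

```java
import java.io.IOException;
import java.util.Locale;

public class CauseChainSketch {
  // Hypothetical stand-in for the PR's status-code and message checks.
  public static boolean matches(Throwable t) {
    String message = t.getMessage();
    return message != null
        && message.toLowerCase(Locale.ROOT).contains("is beyond the end of the stream");
  }

  public static boolean isOffsetBeyondEnd(Throwable t) {
    if (t == null) {
      return false; // Base case: also handles a null cause from the caller below.
    }
    if (matches(t)) {
      return true;
    }
    // No explicit null check on the cause; the base case covers it.
    return isOffsetBeyondEnd(t.getCause());
  }

  public static void main(String[] args) {
    Throwable wrapped =
        new RuntimeException(
            "flush failed", new IOException("Offset 42 is beyond the end of the stream"));
    System.out.println(isOffsetBeyondEnd(wrapped));
    System.out.println(isOffsetBeyondEnd(new RuntimeException("unrelated failure")));
  }
}
```

An exception with no cause reaches the base case on the next call, so the extra `cause == null` branch adds nothing.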
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
BigQuerySinkMetrics.reportFailedRPCMetrics(
failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
+ if (error != null && isOffsetBeyondEndOfStreamError(error)) {
+ flushOperationsOffsetBeyondEnd.inc();
+ LOG.warn(
+ "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+ + "This typically means the stream was finalized or truncated by BQ. "
+ + "The operation will not be retried on this stream. Error: {}",
Review Comment:
Can we merge this log message with the exception thrown below?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -70,6 +72,48 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
private final Counter finalizeOperationsFailed =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");
+ /** Custom exception to indicate that a stream is invalid due to an offset error. */
+ public static class StreamOffsetBeyondEndException extends IOException {
+ public StreamOffsetBeyondEndException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Checks if the given throwable is or is caused by an ApiException indicating that an offset is
+ * beyond the end of a BigQuery stream.
+ */
+ private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
+ if (t == null) {
+ return false;
+ }
+ if (t instanceof ApiException) {
+ ApiException apiException = (ApiException) t;
+ if (apiException.getStatusCode().getCode() == Code.OUT_OF_RANGE) {
+ // Check if the cause is gRPC StatusRuntimeException for more specific message check
+ Throwable cause = apiException.getCause();
+ if (cause instanceof io.grpc.StatusRuntimeException) {
+ io.grpc.StatusRuntimeException grpcException = (io.grpc.StatusRuntimeException) cause;
+ return grpcException.getStatus().getCode() == io.grpc.Status.Code.OUT_OF_RANGE
Review Comment:
The gRPC library offers a method that can simplify this check:
[Status.fromThrowable()](https://github.com/grpc/grpc-java/blob/83538cdae3142be122496fbaa80440f39d715a47/api/src/main/java/io/grpc/Status.java#L396)
See how we use it here:
https://github.com/apache/beam/blob/57b151a522e1807694daafba272e212a0c324011/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L773-L774
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
BigQuerySinkMetrics.reportFailedRPCMetrics(
failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
+ if (error != null && isOffsetBeyondEndOfStreamError(error)) {
+ flushOperationsOffsetBeyondEnd.inc();
+ LOG.warn(
+ "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+ + "This typically means the stream was finalized or truncated by BQ. "
+ + "The operation will not be retried on this stream. Error: {}",
+ streamId,
+ offset,
+ error.toString());
+ // This specific error is not retriable on the same stream.
+ // Throwing a runtime exception to break out of the RetryManager and signal
+ // to the Beam runner that the bundle should be retried, which will then
+ // allow an upstream DoFn to create a new stream.
+ throw new RuntimeException(
Review Comment:
Does the error mean that the data in this stream has already been finalized
to BQ? Could there be any data missing or duplicated?