ahmedabu98 commented on code in PR #35051:
URL: https://github.com/apache/beam/pull/35051#discussion_r2112098895


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -70,6 +72,48 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
   private final Counter finalizeOperationsFailed =
       Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");
 
+  /** Custom exception to indicate that a stream is invalid due to an offset error. */
+  public static class StreamOffsetBeyondEndException extends IOException {
+    public StreamOffsetBeyondEndException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Checks if the given throwable is or is caused by an ApiException indicating that an offset is
+   * beyond the end of a BigQuery stream.
+   */
+  private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
+    if (t == null) {
+      return false;
+    }
+    if (t instanceof ApiException) {
+      ApiException apiException = (ApiException) t;
+      if (apiException.getStatusCode().getCode() == Code.OUT_OF_RANGE) {
+        // Check if the cause is gRPC StatusRuntimeException for more specific message check
+        Throwable cause = apiException.getCause();
+        if (cause instanceof io.grpc.StatusRuntimeException) {
+          io.grpc.StatusRuntimeException grpcException = (io.grpc.StatusRuntimeException) cause;
+          return grpcException.getStatus().getCode() == io.grpc.Status.Code.OUT_OF_RANGE
+              && grpcException.getMessage() != null
+              && grpcException
+                  .getMessage()
+                  .toLowerCase()
+                  .contains("is beyond the end of the stream");
+        }
+        // Fallback to checking the ApiException message directly if cause is not gRPC
+        return apiException.getMessage() != null
+            && apiException.getMessage().toLowerCase().contains("is beyond the end of the stream");
+      }
+    }
+    // Recursively check the cause, as the specific exception might be wrapped.
+    Throwable cause = t.getCause();
+    if (cause == null) {
+      return false;
+    }

Review Comment:
   nit: this null check is unnecessary; the recursive call already handles a null cause through the method's base case (`t == null`)
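
   To illustrate the nit, here is a minimal standalone sketch of a recursive cause-chain walk in which the `t == null` base case also ends the recursion, so no separate null check on `getCause()` is needed. The class and method names (`CauseChain`, `anyMessageContains`) are hypothetical and are not part of the PR.

```java
import java.util.Locale;

// Hypothetical standalone sketch; names are illustrative and not taken from the PR.
final class CauseChain {
  /** Returns true if t, or any throwable in its cause chain, has a message containing needle. */
  static boolean anyMessageContains(Throwable t, String needle) {
    if (t == null) {
      // Base case: also terminates the recursion when getCause() returns null.
      return false;
    }
    String message = t.getMessage();
    if (message != null && message.toLowerCase(Locale.ROOT).contains(needle)) {
      return true;
    }
    // No explicit null check on the cause: the base case above covers it.
    return anyMessageContains(t.getCause(), needle);
  }
}
```

   The needle is expected in lower case, since the message is lower-cased before the comparison.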



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
             BigQuerySinkMetrics.reportFailedRPCMetrics(
                 failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
 
+            if (error != null && isOffsetBeyondEndOfStreamError(error)) {
+              flushOperationsOffsetBeyondEnd.inc();
+              LOG.warn(
+                  "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+                      + "This typically means the stream was finalized or truncated by BQ. "
+                      + "The operation will not be retried on this stream. Error: {}",

Review Comment:
   Can we merge this log message into the exception thrown below?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -70,6 +72,48 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
   private final Counter finalizeOperationsFailed =
       Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");
 
+  /** Custom exception to indicate that a stream is invalid due to an offset error. */
+  public static class StreamOffsetBeyondEndException extends IOException {
+    public StreamOffsetBeyondEndException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Checks if the given throwable is or is caused by an ApiException indicating that an offset is
+   * beyond the end of a BigQuery stream.
+   */
+  private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
+    if (t == null) {
+      return false;
+    }
+    if (t instanceof ApiException) {
+      ApiException apiException = (ApiException) t;
+      if (apiException.getStatusCode().getCode() == Code.OUT_OF_RANGE) {
+        // Check if the cause is gRPC StatusRuntimeException for more specific message check
+        Throwable cause = apiException.getCause();
+        if (cause instanceof io.grpc.StatusRuntimeException) {
+          io.grpc.StatusRuntimeException grpcException = (io.grpc.StatusRuntimeException) cause;
+          return grpcException.getStatus().getCode() == io.grpc.Status.Code.OUT_OF_RANGE

Review Comment:
   The gRPC library offers a method that can simplify this: [Status.fromThrowable()](https://github.com/grpc/grpc-java/blob/83538cdae3142be122496fbaa80440f39d715a47/api/src/main/java/io/grpc/Status.java#L396)
   
   See how we use it here: https://github.com/apache/beam/blob/57b151a522e1807694daafba272e212a0c324011/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L773-L774
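
   A rough sketch of what that simplification might look like, assuming grpc-java (`io.grpc:grpc-api`) is on the classpath. The class name `OffsetErrors` is hypothetical. It also assumes the "is beyond the end of the stream" text is carried in the gRPC status description, which would need to be confirmed against real BigQuery responses. Unlike the PR's version, it has no fallback for an `ApiException` without a gRPC cause.

```java
import io.grpc.Status;
import java.util.Locale;

// Hypothetical helper; not the PR's implementation.
final class OffsetErrors {
  private static final String BEYOND_END = "is beyond the end of the stream";

  static boolean isOffsetBeyondEndOfStream(Throwable t) {
    if (t == null) {
      return false; // Status.fromThrowable rejects null, so guard here.
    }
    // Walks the cause chain and returns the first gRPC status found,
    // or an UNKNOWN status if the chain carries none.
    Status status = Status.fromThrowable(t);
    String description = status.getDescription();
    return status.getCode() == Status.Code.OUT_OF_RANGE
        && description != null
        && description.toLowerCase(Locale.ROOT).contains(BEYOND_END);
  }
}
```

   Because `Status.fromThrowable()` already searches the cause chain, the explicit `instanceof` checks and the manual recursion would no longer be needed in this form.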



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
             BigQuerySinkMetrics.reportFailedRPCMetrics(
                 failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
 
+            if (error != null && isOffsetBeyondEndOfStreamError(error)) {
+              flushOperationsOffsetBeyondEnd.inc();
+              LOG.warn(
+                  "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+                      + "This typically means the stream was finalized or truncated by BQ. "
+                      + "The operation will not be retried on this stream. Error: {}",
+                  streamId,
+                  offset,
+                  error.toString());
+              // This specific error is not retriable on the same stream.
+              // Throwing a runtime exception to break out of the RetryManager and signal
+              // to the Beam runner that the bundle should be retried, which will then
+              // allow an upstream DoFn to create a new stream.
+              throw new RuntimeException(

Review Comment:
   Does the error mean that the data in this stream has already been finalized to BQ? Could there be any data missing or duplicated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
