ahmedabu98 commented on code in PR #31924:
URL: https://github.com/apache/beam/pull/31924#discussion_r1689998669


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1381,9 +1461,20 @@ public void finishBundle(FinishBundleContext c) throws 
Exception {
             // to the error queue. Bigtable will successfully write other 
failures in the batch,
             // so this exception should be ignored
             if (!(e.getCause() instanceof BatchingException)) {
+              throttlingMsecs.inc(new Duration(closeStart, 
Instant.now()).getMillis());
               throw e;

Review Comment:
   Is this error due to resource exhaustion?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1318,24 +1361,61 @@ public void startBundle(StartBundleContext c) throws 
IOException {
       badRecords = new HashSet<>();
     }
 
+    private final Counter throttlingMsecs =
+        Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
+
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
       checkForFailures();
       KV<ByteString, Iterable<Mutation>> record = c.element();
-      
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, 
window));
+      Instant writeStart = Instant.now();
+      pendingThrottlingMsecs = 0;
+      bigtableWriter
+          .writeRecord(record)
+          .whenComplete(handleMutationException(record, window, writeStart));
+      if (pendingThrottlingMsecs > 0) {
+        throttlingMsecs.inc(pendingThrottlingMsecs);
+      }
       ++recordsWritten;
       seenWindows.compute(window, (key, count) -> (count != null ? count : 0) 
+ 1);
     }
 
     private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
-        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
+        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, 
Instant writeStart) {
       return (MutateRowResponse result, Throwable exception) -> {
         if (exception != null) {
           if (isDataException(exception)) {
             retryIndividualRecord(record, window);
           } else {
+            // Exception due to resource unavailable or rate limited,
+            // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
+            boolean isResourceException = false;
+            if (exception instanceof StatusRuntimeException) {
+              StatusRuntimeException se = (StatusRuntimeException) exception;
+              if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
+                  || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) 
{
+                isResourceException = true;
+              }
+            } else if (exception instanceof DeadlineExceededException
+                || exception instanceof ResourceExhaustedException) {
+              isResourceException = true;
+            }
+            if (isResourceException) {
+              pendingThrottlingMsecs = new Duration(writeStart, 
Instant.now()).getMillis();
+            }
             failures.add(new BigtableWriteException(record, exception));
           }
+        } else {
+          // add the excessive amount to throttling metrics if elapsed time > 
target latency
+          if (writeOptions.getThrottlingReportTargetMs() != null
+              && writeOptions.getThrottlingReportTargetMs() > 0) {

Review Comment:
   Should we set a default value for throttlingReportTargetMs? So existing use 
cases can also benefit from reported throttling 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1318,24 +1361,61 @@ public void startBundle(StartBundleContext c) throws 
IOException {
       badRecords = new HashSet<>();
     }
 
+    private final Counter throttlingMsecs =
+        Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
+
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
       checkForFailures();
       KV<ByteString, Iterable<Mutation>> record = c.element();
-      
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, 
window));
+      Instant writeStart = Instant.now();
+      pendingThrottlingMsecs = 0;
+      bigtableWriter
+          .writeRecord(record)
+          .whenComplete(handleMutationException(record, window, writeStart));
+      if (pendingThrottlingMsecs > 0) {
+        throttlingMsecs.inc(pendingThrottlingMsecs);
+      }
       ++recordsWritten;
       seenWindows.compute(window, (key, count) -> (count != null ? count : 0) 
+ 1);
     }
 
     private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
-        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
+        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, 
Instant writeStart) {
       return (MutateRowResponse result, Throwable exception) -> {
         if (exception != null) {
           if (isDataException(exception)) {
             retryIndividualRecord(record, window);
           } else {
+            // Exception due to resource unavailable or rate limited,
+            // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
+            boolean isResourceException = false;
+            if (exception instanceof StatusRuntimeException) {
+              StatusRuntimeException se = (StatusRuntimeException) exception;
+              if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
+                  || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) 
{
+                isResourceException = true;
+              }
+            } else if (exception instanceof DeadlineExceededException
+                || exception instanceof ResourceExhaustedException) {
+              isResourceException = true;
+            }
+            if (isResourceException) {
+              pendingThrottlingMsecs = new Duration(writeStart, 
Instant.now()).getMillis();
+            }
             failures.add(new BigtableWriteException(record, exception));
           }
+        } else {
+          // add the excessive amount to throttling metrics if elapsed time > 
target latency
+          if (writeOptions.getThrottlingReportTargetMs() != null
+              && writeOptions.getThrottlingReportTargetMs() > 0) {

Review Comment:
   Maybe here:
   
https://github.com/apache/beam/blob/eadb81fd01f281755f18a8a3095e3c2c8194e9fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java#L69



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to