This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f1f7097cf Generic throttling metrics namespace and BigtableIO write 
throttling counter (#31924)
74f1f7097cf is described below

commit 74f1f7097cf9c68f325a34d6f55f14356780c47b
Author: Yi Hu <[email protected]>
AuthorDate: Wed Jul 24 13:31:49 2024 -0400

    Generic throttling metrics namespace and BigtableIO write throttling 
counter (#31924)
    
    * BigtableIO write throttling counter
    
    * Introduce a generic throttling namespace and counter
    
    * Dataflow accumulates throttling time from generic throttling counter
    
    * Apply throttling counter to BigtableIO write
    
    * default to 3 min for throttlingReportTargetMs
---
 .../dataflow/worker/BatchModeExecutionContext.java |  26 +++--
 .../dataflow/worker/DataflowSystemMetrics.java     |   5 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |  12 +--
 .../dataflow/worker/streaming/StageInfo.java       |  14 +--
 .../worker/BatchModeExecutionContextTest.java      |   3 +-
 .../java/org/apache/beam/sdk/metrics/Metrics.java  |   7 ++
 .../beam/sdk/metrics/MetricsEnvironment.java       |  11 ++-
 .../java/org/apache/beam/sdk/util/StringUtils.java |  35 +++++++
 .../org/apache/beam/sdk/util/StringUtilsTest.java  |  23 +++++
 .../gcp/util/RetryHttpRequestInitializer.java      |   2 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |   4 +-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 106 ++++++++++++++++++++-
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   |   4 +
 .../sdk/io/gcp/bigtable/BigtableWriteOptions.java  |   5 +
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java     |   2 +-
 .../sdk/io/gcp/datastore/RampupThrottlingFn.java   |   3 +-
 .../io/gcp/bigquery/BigQuerySinkMetricsTest.java   |   4 +-
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   |  15 +++
 .../beam/sdk/io/synthetic/SyntheticStep.java       |   2 +-
 19 files changed, 233 insertions(+), 50 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 8c038189ae6..41bbae7cfdb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -40,6 +40,7 @@ import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
 import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -68,9 +69,6 @@ public class BatchModeExecutionContext
   private Object key;
 
   private final MetricsContainerRegistry<MetricsContainerImpl> 
containerRegistry;
-
-  // TODO(https://github.com/apache/beam/issues/19632): Move throttle time 
Metric to a dedicated
-  // namespace.
   protected static final String DATASTORE_THROTTLE_TIME_NAMESPACE =
       "org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn";
   protected static final String HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE =
@@ -79,7 +77,6 @@ public class BatchModeExecutionContext
       
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl";
   protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
       
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
-  protected static final String THROTTLE_TIME_COUNTER_NAME = 
"throttling-msecs";
 
   // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
   private final boolean populateStringSetMetrics;
@@ -550,11 +547,18 @@ public class BatchModeExecutionContext
   public Long extractThrottleTime() {
     long totalThrottleMsecs = 0L;
     for (MetricsContainerImpl container : containerRegistry.getContainers()) {
-      // TODO(https://github.com/apache/beam/issues/19632): Update throttling 
counters to use
-      // generic throttling-msecs metric.
+      CounterCell userThrottlingTime =
+          container.tryGetCounter(
+              MetricName.named(
+                  Metrics.THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME));
+      if (userThrottlingTime != null) {
+        totalThrottleMsecs += userThrottlingTime.getCumulative();
+      }
+
       CounterCell dataStoreThrottlingTime =
           container.tryGetCounter(
-              MetricName.named(DATASTORE_THROTTLE_TIME_NAMESPACE, 
THROTTLE_TIME_COUNTER_NAME));
+              MetricName.named(
+                  DATASTORE_THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME));
       if (dataStoreThrottlingTime != null) {
         totalThrottleMsecs += dataStoreThrottlingTime.getCumulative();
       }
@@ -562,7 +566,7 @@ public class BatchModeExecutionContext
       CounterCell httpClientApiThrottlingTime =
           container.tryGetCounter(
               MetricName.named(
-                  HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, 
THROTTLE_TIME_COUNTER_NAME));
+                  HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME));
       if (httpClientApiThrottlingTime != null) {
         totalThrottleMsecs += httpClientApiThrottlingTime.getCumulative();
       }
@@ -570,14 +574,16 @@ public class BatchModeExecutionContext
       CounterCell bigqueryStreamingInsertThrottleTime =
           container.tryGetCounter(
               MetricName.named(
-                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE, 
THROTTLE_TIME_COUNTER_NAME));
+                  BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
+                  Metrics.THROTTLE_TIME_COUNTER_NAME));
       if (bigqueryStreamingInsertThrottleTime != null) {
         totalThrottleMsecs += 
bigqueryStreamingInsertThrottleTime.getCumulative();
       }
 
       CounterCell bigqueryReadThrottleTime =
           container.tryGetCounter(
-              MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, 
THROTTLE_TIME_COUNTER_NAME));
+              MetricName.named(
+                  BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME));
       if (bigqueryReadThrottleTime != null) {
         totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative();
       }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
index 640febc616b..c5a24df192e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
@@ -20,15 +20,14 @@ package org.apache.beam.runners.dataflow.worker;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 
 /** This holds system metrics related constants used in Batch and Streaming. */
 public class DataflowSystemMetrics {
 
   public static final MetricName THROTTLING_MSECS_METRIC_NAME =
-      MetricName.named("dataflow-throttling-metrics", "throttling-msecs");
-
-  // TODO: Provide an utility in SDK 'ThrottlingReporter' to update throttling 
time.
+      MetricName.named("dataflow-throttling-metrics", 
Metrics.THROTTLE_TIME_COUNTER_NAME);
 
   /** System counters populated by streaming dataflow workers. */
   public enum StreamingSystemCounterNames {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0e46e7e4687..718d93830c4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -93,13 +93,13 @@ import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.JvmInitializers;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
-import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.construction.CoderTranslation;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
@@ -113,14 +113,6 @@ import org.slf4j.LoggerFactory;
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class StreamingDataflowWorker {
-
-  // TODO(https://github.com/apache/beam/issues/19632): Update throttling 
counters to use generic
-  // throttling-msecs metric.
-  public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
-      MetricName.named(
-          
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
-          "throttling-msecs");
-
   /**
    * Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the 
amount of data sinked
    * (across all the sinks, if there are more than one) reaches this limit. 
This serves as hint for
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index 8f14ea26a46..a18ca8cfd6d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.worker.streaming;
 
-import static 
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME;
+import static org.apache.beam.sdk.metrics.Metrics.THROTTLE_TIME_COUNTER_NAME;
 
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterUpdate;
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics;
 import org.apache.beam.runners.dataflow.worker.MetricsContainerRegistry;
-import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
 import 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry;
 import org.apache.beam.runners.dataflow.worker.StreamingStepMetricsContainer;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
@@ -93,20 +92,13 @@ public abstract class StageInfo {
   }
 
   /**
-   * Checks if the step counter affects any per-stage counters. Currently 
'throttled_millis' is the
+   * Checks if the step counter affects any per-stage counters. Currently 
'throttled-msecs' is the
    * only counter updated.
    */
   private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
     CounterStructuredName structuredName =
         stepCounterUpdate.getStructuredNameAndMetadata().getName();
-    if 
((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
-            && 
THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
-        || (StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME
-                .getNamespace()
-                .equals(structuredName.getOriginNamespace())
-            && StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME
-                .getName()
-                .equals(structuredName.getName()))) {
+    if (THROTTLE_TIME_COUNTER_NAME.equals(structuredName.getName())) {
       long msecs = 
DataflowCounterUpdateExtractor.splitIntToLong(stepCounterUpdate.getInteger());
       if (msecs > 0) {
         throttledMsecs().addValue(msecs);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
index 18bd814b4df..4062fbf6ebe 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
@@ -43,6 +43,7 @@ import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileSc
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.StringSet;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -266,7 +267,7 @@ public class BatchModeExecutionContextTest {
             .getCounter(
                 MetricName.named(
                     
BatchModeExecutionContext.DATASTORE_THROTTLE_TIME_NAMESPACE,
-                    BatchModeExecutionContext.THROTTLE_TIME_COUNTER_NAME));
+                    Metrics.THROTTLE_TIME_COUNTER_NAME));
     counter.inc(12000);
     counter.inc(17000);
     counter.inc(1000);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 916e18647c3..dc80a66c055 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -109,6 +109,13 @@ public class Metrics {
     return new DelegatingStringSet(MetricName.named(namespace, name));
   }
 
+  /*
+   * A dedicated namespace for client throttling time. User DoFn can increment 
this metrics and then
+   * runner will put back pressure on scaling decision, if supported.
+   */
+  public static final String THROTTLE_TIME_NAMESPACE = 
"beam-throttling-metrics";
+  public static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";
+
   /**
    * Implementation of {@link Distribution} that delegates to the instance for 
the current context.
    */
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 7f8f2a43643..3421bb4afc8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.StringUtils;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -134,10 +135,14 @@ public class MetricsEnvironment {
     if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, 
true)) {
       if (isMetricsSupported()) {
         LOG.error(
-            "Unable to update metrics on the current thread. "
-                + "Most likely caused by using metrics outside the managed 
work-execution thread.");
+            "Unable to update metrics on the current thread. Most likely 
caused by using metrics "
+                + "outside the managed work-execution thread:\n  {}",
+            
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10));
       } else {
-        LOG.warn("Reporting metrics are not supported in the current execution 
environment.");
+        // rate limiting this log as it can be emitted each time metrics 
incremented
+        LOG.warn(
+            "Reporting metrics are not supported in the current execution 
environment:\n  {}",
+            
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10));
       }
     }
     return container;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 13105fb6c02..ccd58857da0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Internal;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utilities for working with JSON and other human-readable string formats. */
 @Internal
@@ -143,4 +144,38 @@ public class StringUtils {
 
     return v1[t.length()];
   }
+
+  /**
+   * Convert Array to new lined String. Truncate to first {@code maxLine} 
elements.
+   *
+   * <p>Useful to truncate stacktrace and for logging.
+   */
+  public static String arrayToNewlines(Object[] array, int maxLine) {
+    int n = (maxLine > 0 && array.length > maxLine) ? maxLine : array.length;
+    StringBuilder b = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      b.append(array[i]);
+      b.append("\n");
+    }
+    if (array.length > maxLine) {
+      b.append("...\n");
+    }
+    return b.toString();
+  }
+
+  /**
+   * Truncate String if length greater than maxLen, and append "..." to the 
end. Handles null.
+   *
+   * <p>Useful to truncate long logging message.
+   */
+  public static String leftTruncate(@Nullable Object element, int maxLen) {
+    if (element == null) {
+      return "";
+    }
+    String s = element.toString();
+    if (s.length() > maxLen) {
+      return s.substring(0, maxLen) + "...";
+    }
+    return s;
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 9e9686ca201..e8b0e7ecd47 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -17,9 +17,13 @@
  */
 package org.apache.beam.sdk.util;
 
+import static org.apache.commons.lang3.StringUtils.countMatches;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.util.UUID;
+import java.util.stream.IntStream;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -54,4 +58,23 @@ public class StringUtilsTest {
     assertEquals(1, StringUtils.getLevenshteinDistance("abc", "ab1c")); // 
insertion
     assertEquals(1, StringUtils.getLevenshteinDistance("abc", "a1c")); // 
modification
   }
+
+  @Test
+  public void testArrayToNewlines() {
+    Object[] uuids = IntStream.range(1, 10).mapToObj(unused -> 
UUID.randomUUID()).toArray();
+
+    String r1 = StringUtils.arrayToNewlines(uuids, 6);
+    assertTrue(r1.endsWith("...\n"));
+    assertEquals(7, countMatches(r1, "\n"));
+    String r2 = StringUtils.arrayToNewlines(uuids, 15);
+    String r3 = StringUtils.arrayToNewlines(uuids, 10);
+    assertEquals(r3, r2);
+  }
+
+  @Test
+  public void testLeftTruncate() {
+    assertEquals("", StringUtils.leftTruncate(null, 3));
+    assertEquals("", StringUtils.leftTruncate("", 3));
+    assertEquals("abc...", StringUtils.leftTruncate("abcd", 3));
+  }
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
index d053a5f4bf8..b48dc636805 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
@@ -75,7 +75,7 @@ public class RetryHttpRequestInitializer implements 
HttpRequestInitializer {
     private final Set<Integer> ignoredResponseCodes;
     // aggregate the total time spent in exponential backoff
     private final Counter throttlingMsecs =
-        Metrics.counter(LoggingHttpBackOffHandler.class, "throttling-msecs");
+        Metrics.counter(LoggingHttpBackOffHandler.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
     private int ioExceptionRetries;
     private int unsuccessfulResponseRetries;
     private @Nullable CustomHttpErrors customHttpErrors;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c6b0e17e59d..b87b6a222a4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -575,7 +575,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
     private final long maxRowBatchSize;
     // aggregate the total time spent in exponential backoff
     private final Counter throttlingMsecs =
-        Metrics.counter(DatasetServiceImpl.class, "throttling-msecs");
+        Metrics.counter(DatasetServiceImpl.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
 
     private @Nullable BoundedExecutorService executor;
 
@@ -1663,7 +1663,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
   static class StorageClientImpl implements StorageClient {
 
     public static final Counter THROTTLING_MSECS =
-        Metrics.counter(StorageClientImpl.class, "throttling-msecs");
+        Metrics.counter(StorageClientImpl.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
 
     private transient long unreportedDelay = 0L;
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d25ad7d4871..d78ae2cb6c5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -21,13 +21,16 @@ import static 
org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceFactory.Bigtabl
 import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import static 
org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.DeadlineExceededException;
 import com.google.api.gax.rpc.InvalidArgumentException;
 import com.google.api.gax.rpc.NotFoundException;
+import com.google.api.gax.rpc.ResourceExhaustedException;
 import com.google.auto.value.AutoValue;
 import com.google.bigtable.v2.MutateRowResponse;
 import com.google.bigtable.v2.Mutation;
@@ -38,6 +41,7 @@ import 
com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.KeyOffset;
 import com.google.protobuf.ByteString;
+import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -69,6 +73,8 @@ import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEsti
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -82,6 +88,7 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
 import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.StringUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -1109,12 +1116,51 @@ public class BigtableIO {
      * always enabled on batch writes and limits the number of outstanding 
requests to the Bigtable
      * server.
      *
+     * <p>When enabled, will also set default {@link 
#withThrottlingReportTargetMs} to 1 minute.
+     * This enables runner react with increased latency in flush call due to 
flow control.
+     *
      * <p>Does not modify this object.
      */
     public Write withFlowControl(boolean enableFlowControl) {
+      BigtableWriteOptions options = getBigtableWriteOptions();
+      BigtableWriteOptions.Builder builder = 
options.toBuilder().setFlowControl(enableFlowControl);
+      if (enableFlowControl) {
+        builder = builder.setThrottlingReportTargetMs(60_000);
+      }
+      return toBuilder().setBigtableWriteOptions(builder.build()).build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} with client side latency based 
throttling enabled.
+     *
+     * <p>Will also set {@link #withThrottlingReportTargetMs} to the same 
value.
+     */
+    public Write withThrottlingTargetMs(int throttlingTargetMs) {
+      BigtableWriteOptions options = getBigtableWriteOptions();
+      return toBuilder()
+          .setBigtableWriteOptions(
+              options
+                  .toBuilder()
+                  .setThrottlingTargetMs(throttlingTargetMs)
+                  .setThrottlingReportTargetMs(throttlingTargetMs)
+                  .build())
+          .build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} with throttling time reporting 
enabled. When write
+     * request latency exceeded the set value, the amount greater than the 
target will be considered
+     * as throttling time and report back to runner.
+     *
+     * <p>If not set, defaults to 3 min for completed batch request. Client 
side flowing control
+     * configurations (e.g. {@link #withFlowControl}, {@link 
#withThrottlingTargetMs} will adjust
+     * the default value accordingly. Set to 0 to disable throttling time 
reporting.
+     */
+    public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
       BigtableWriteOptions options = getBigtableWriteOptions();
       return toBuilder()
-          
.setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build())
+          .setBigtableWriteOptions(
+              
options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build())
           .build();
     }
 
@@ -1283,7 +1329,14 @@ public class BigtableIO {
     private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
     private final BadRecordRouter badRecordRouter;
 
+    private final Counter throttlingMsecs =
+        Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
+
+    private final int throttleReportThresMsecs;
+
     private transient Set<KV<BigtableWriteException, BoundedWindow>> 
badRecords = null;
+    // Due to callback thread not supporting Beam metrics, Record pending 
metrics and report later.
+    private transient long pendingThrottlingMsecs;
 
     // Assign serviceEntry in startBundle and clear it in tearDown.
     @Nullable private BigtableServiceEntry serviceEntry;
@@ -1301,6 +1354,8 @@ public class BigtableIO {
       this.badRecordRouter = badRecordRouter;
       this.failures = new ConcurrentLinkedQueue<>();
       this.id = factory.newId();
+      // a request completed more than this time will be considered throttled. 
Disabled if set to 0
+      throttleReportThresMsecs = 
firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
       LOG.debug("Created Bigtable Write Fn with writeOptions {} ", 
writeOptions);
     }
 
@@ -1322,20 +1377,52 @@ public class BigtableIO {
     public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
       checkForFailures();
       KV<ByteString, Iterable<Mutation>> record = c.element();
-      
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, 
window));
+      Instant writeStart = Instant.now();
+      pendingThrottlingMsecs = 0;
+      bigtableWriter
+          .writeRecord(record)
+          .whenComplete(handleMutationException(record, window, writeStart));
+      if (pendingThrottlingMsecs > 0) {
+        throttlingMsecs.inc(pendingThrottlingMsecs);
+      }
       ++recordsWritten;
       seenWindows.compute(window, (key, count) -> (count != null ? count : 0) 
+ 1);
     }
 
     private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
-        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
+        KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, 
Instant writeStart) {
       return (MutateRowResponse result, Throwable exception) -> {
         if (exception != null) {
           if (isDataException(exception)) {
             retryIndividualRecord(record, window);
           } else {
+            // Exception due to resource unavailable or rate limited,
+            // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
+            boolean isResourceException = false;
+            if (exception instanceof StatusRuntimeException) {
+              StatusRuntimeException se = (StatusRuntimeException) exception;
+              if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
+                  || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) 
{
+                isResourceException = true;
+              }
+            } else if (exception instanceof DeadlineExceededException
+                || exception instanceof ResourceExhaustedException) {
+              isResourceException = true;
+            }
+            if (isResourceException) {
+              pendingThrottlingMsecs = new Duration(writeStart, 
Instant.now()).getMillis();
+            }
             failures.add(new BigtableWriteException(record, exception));
           }
+        } else {
+          // add the excessive amount to throttling metrics if elapsed time > 
target latency
+          if (throttleReportThresMsecs > 0) {
+            long excessTime =
+                new Duration(writeStart, Instant.now()).getMillis() - 
throttleReportThresMsecs;
+            if (excessTime > 0) {
+              pendingThrottlingMsecs = excessTime;
+            }
+          }
         }
       };
     }
@@ -1371,8 +1458,8 @@ public class BigtableIO {
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
       try {
-
         if (bigtableWriter != null) {
+          Instant closeStart = Instant.now();
           try {
             bigtableWriter.close();
           } catch (IOException e) {
@@ -1381,9 +1468,18 @@ public class BigtableIO {
             // to the error queue. Bigtable will still successfully write the other
             // mutations in the batch, so this exception should be ignored
             if (!(e.getCause() instanceof BatchingException)) {
+              throttlingMsecs.inc(new Duration(closeStart, 
Instant.now()).getMillis());
               throw e;
             }
           }
+          // Add the excess time to the throttling metrics if the elapsed time
+          // exceeds the target latency.
+          if (throttleReportThresMsecs > 0) {
+            long excessTime =
+                new Duration(closeStart, Instant.now()).getMillis() - 
throttleReportThresMsecs;
+            if (excessTime > 0) {
+              throttlingMsecs.inc(excessTime);
+            }
+          }
           bigtableWriter = null;
         }
 
@@ -2015,7 +2111,7 @@ public class BigtableIO {
       super(
           String.format(
               "Error mutating row %s with mutations %s",
-              record.getKey().toStringUtf8(), record.getValue()),
+              record.getKey().toStringUtf8(), 
StringUtils.leftTruncate(record.getValue(), 100)),
           cause);
       this.record = record;
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 06e0108259d..10cfa724c2a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -24,6 +24,7 @@ import com.google.api.gax.batching.Batcher;
 import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.grpc.GrpcCallContext;
 import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.DeadlineExceededException;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.ServerStream;
 import com.google.api.gax.rpc.StreamController;
@@ -611,6 +612,9 @@ class BigtableServiceImpl implements BigtableService {
         if (throwable instanceof StatusRuntimeException) {
           serviceCallMetric.call(
               ((StatusRuntimeException) 
throwable).getStatus().getCode().value());
+        } else if (throwable instanceof DeadlineExceededException) {
+          // The incoming throwable can be a StatusRuntimeException or a specific
+          // gRPC ApiException; report DeadlineExceededException as HTTP 504.
+          serviceCallMetric.call(504);
         } else {
           serviceCallMetric.call("unknown");
         }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
index a63cc575809..5963eb6be3c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
@@ -57,6 +57,9 @@ abstract class BigtableWriteOptions implements Serializable {
   /** Returns the target latency if latency based throttling is enabled. */
   abstract @Nullable Integer getThrottlingTargetMs();
 
+  /** Returns the target latency used when reporting latency-based throttling to the runner, if enabled. */
+  abstract @Nullable Integer getThrottlingReportTargetMs();
+
   /** Returns true if batch write flow control is enabled. Otherwise return 
false. */
   abstract @Nullable Boolean getFlowControl();
 
@@ -88,6 +91,8 @@ abstract class BigtableWriteOptions implements Serializable {
 
     abstract Builder setThrottlingTargetMs(int targetMs);
 
+    abstract Builder setThrottlingReportTargetMs(int targetMs);
+
     abstract Builder setFlowControl(boolean enableFlowControl);
 
     abstract Builder setCloseWaitTimeout(Duration timeout);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 86cd7a3439a..1563b0b059f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -1711,7 +1711,7 @@ public class DatastoreV1 {
     private WriteBatcher writeBatcher;
     private transient AdaptiveThrottler adaptiveThrottler;
     private final Counter throttlingMsecs =
-        Metrics.counter(DatastoreWriterFn.class, "throttling-msecs");
+        Metrics.counter(DatastoreWriterFn.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
     private final Counter rpcErrors =
         Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
     private final Counter rpcSuccesses =
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
index db098c0a516..ae94d4b612d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
@@ -53,7 +53,8 @@ public class RampupThrottlingFn<T> extends DoFn<T, T> 
implements Serializable {
   private final PCollectionView<Instant> firstInstantSideInput;
 
   @VisibleForTesting
-  Counter throttlingMsecs = Metrics.counter(RampupThrottlingFn.class, 
"throttling-msecs");
+  Counter throttlingMsecs =
+      Metrics.counter(RampupThrottlingFn.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
 
   // Initialized on every setup.
   private transient MovingFunction successfulOps;
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 50660326275..8695a445c11 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -37,6 +37,7 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.HistogramData;
 import org.apache.beam.sdk.values.KV;
@@ -178,7 +179,8 @@ public class BigQuerySinkMetricsTest {
     testContainer.assertPerWorkerCounterValue(counterName, 1L);
 
     counterName =
-        MetricName.named(BigQueryServicesImpl.StorageClientImpl.class, 
"throttling-msecs");
+        MetricName.named(
+            BigQueryServicesImpl.StorageClientImpl.class, 
Metrics.THROTTLE_TIME_COUNTER_NAME);
     assertEquals(1L, (long) 
testContainer.getCounter(counterName).getCumulative());
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index dd6a55ff437..e5049b03701 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -436,6 +436,21 @@ public class BigtableIOTest {
     write.expand(null);
   }
 
+  @Test
+  public void testWriteClientRateLimitingAlsoSetReportMsecs() {
+    // client side flow control
+    BigtableIO.Write write = 
BigtableIO.write().withTableId("table").withFlowControl(true);
+    assertEquals(
+        60_000, (int) 
checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
+
+    // client side latency based throttling
+    int targetMs = 30_000;
+    write = 
BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs);
+    assertEquals(
+        targetMs,
+        (int) 
checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
+  }
+
   /** Helper function to make a single row mutation to be written. */
   private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, 
String value) {
     ByteString rowKey = ByteString.copyFromUtf8(key);
diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
index d32640ffbf7..98db23c95a3 100644
--- 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
@@ -58,7 +58,7 @@ public class SyntheticStep extends DoFn<KV<byte[], byte[]>, 
KV<byte[], byte[]>>
   private final KV<Long, Long> idAndThroughput;
 
   private final Counter throttlingCounter =
-      Metrics.counter("dataflow-throttling-metrics", "throttling-msecs");
+      Metrics.counter("dataflow-throttling-metrics", 
Metrics.THROTTLE_TIME_COUNTER_NAME);
 
   /**
    * Static cache to store one worker level rate limiter for a step. Value in 
KV is the desired


Reply via email to