This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 74f1f7097cf Generic throttling metrics namespace and BigtableIO write
throttling counter (#31924)
74f1f7097cf is described below
commit 74f1f7097cf9c68f325a34d6f55f14356780c47b
Author: Yi Hu <[email protected]>
AuthorDate: Wed Jul 24 13:31:49 2024 -0400
Generic throttling metrics namespace and BigtableIO write throttling
counter (#31924)
* BigtableIO write throttling counter
* Introduce a generic throttling namespace and counter
* Dataflow accumulates throttling time from generic throttling counter
* Apply throttling counter to BigtableIO write
* default to 3 min for throttlingReportTargetMs
---
.../dataflow/worker/BatchModeExecutionContext.java | 26 +++--
.../dataflow/worker/DataflowSystemMetrics.java | 5 +-
.../dataflow/worker/StreamingDataflowWorker.java | 12 +--
.../dataflow/worker/streaming/StageInfo.java | 14 +--
.../worker/BatchModeExecutionContextTest.java | 3 +-
.../java/org/apache/beam/sdk/metrics/Metrics.java | 7 ++
.../beam/sdk/metrics/MetricsEnvironment.java | 11 ++-
.../java/org/apache/beam/sdk/util/StringUtils.java | 35 +++++++
.../org/apache/beam/sdk/util/StringUtilsTest.java | 23 +++++
.../gcp/util/RetryHttpRequestInitializer.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 4 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 106 ++++++++++++++++++++-
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 4 +
.../sdk/io/gcp/bigtable/BigtableWriteOptions.java | 5 +
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +-
.../sdk/io/gcp/datastore/RampupThrottlingFn.java | 3 +-
.../io/gcp/bigquery/BigQuerySinkMetricsTest.java | 4 +-
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 15 +++
.../beam/sdk/io/synthetic/SyntheticStep.java | 2 +-
19 files changed, 233 insertions(+), 50 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 8c038189ae6..41bbae7cfdb 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -40,6 +40,7 @@ import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
@@ -68,9 +69,6 @@ public class BatchModeExecutionContext
private Object key;
private final MetricsContainerRegistry<MetricsContainerImpl>
containerRegistry;
-
- // TODO(https://github.com/apache/beam/issues/19632): Move throttle time
Metric to a dedicated
- // namespace.
protected static final String DATASTORE_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn";
protected static final String HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE =
@@ -79,7 +77,6 @@ public class BatchModeExecutionContext
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl";
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
- protected static final String THROTTLE_TIME_COUNTER_NAME =
"throttling-msecs";
// TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
private final boolean populateStringSetMetrics;
@@ -550,11 +547,18 @@ public class BatchModeExecutionContext
public Long extractThrottleTime() {
long totalThrottleMsecs = 0L;
for (MetricsContainerImpl container : containerRegistry.getContainers()) {
- // TODO(https://github.com/apache/beam/issues/19632): Update throttling
counters to use
- // generic throttling-msecs metric.
+ CounterCell userThrottlingTime =
+ container.tryGetCounter(
+ MetricName.named(
+ Metrics.THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME));
+ if (userThrottlingTime != null) {
+ totalThrottleMsecs += userThrottlingTime.getCumulative();
+ }
+
CounterCell dataStoreThrottlingTime =
container.tryGetCounter(
- MetricName.named(DATASTORE_THROTTLE_TIME_NAMESPACE,
THROTTLE_TIME_COUNTER_NAME));
+ MetricName.named(
+ DATASTORE_THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME));
if (dataStoreThrottlingTime != null) {
totalThrottleMsecs += dataStoreThrottlingTime.getCumulative();
}
@@ -562,7 +566,7 @@ public class BatchModeExecutionContext
CounterCell httpClientApiThrottlingTime =
container.tryGetCounter(
MetricName.named(
- HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE,
THROTTLE_TIME_COUNTER_NAME));
+ HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME));
if (httpClientApiThrottlingTime != null) {
totalThrottleMsecs += httpClientApiThrottlingTime.getCumulative();
}
@@ -570,14 +574,16 @@ public class BatchModeExecutionContext
CounterCell bigqueryStreamingInsertThrottleTime =
container.tryGetCounter(
MetricName.named(
- BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
THROTTLE_TIME_COUNTER_NAME));
+ BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
+ Metrics.THROTTLE_TIME_COUNTER_NAME));
if (bigqueryStreamingInsertThrottleTime != null) {
totalThrottleMsecs +=
bigqueryStreamingInsertThrottleTime.getCumulative();
}
CounterCell bigqueryReadThrottleTime =
container.tryGetCounter(
- MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE,
THROTTLE_TIME_COUNTER_NAME));
+ MetricName.named(
+ BIGQUERY_READ_THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME));
if (bigqueryReadThrottleTime != null) {
totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
index 640febc616b..c5a24df192e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
@@ -20,15 +20,14 @@ package org.apache.beam.runners.dataflow.worker;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
/** This holds system metrics related constants used in Batch and Streaming. */
public class DataflowSystemMetrics {
public static final MetricName THROTTLING_MSECS_METRIC_NAME =
- MetricName.named("dataflow-throttling-metrics", "throttling-msecs");
-
- // TODO: Provide an utility in SDK 'ThrottlingReporter' to update throttling
time.
+ MetricName.named("dataflow-throttling-metrics",
Metrics.THROTTLE_TIME_COUNTER_NAME);
/** System counters populated by streaming dataflow workers. */
public enum StreamingSystemCounterNames {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0e46e7e4687..718d93830c4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -93,13 +93,13 @@ import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
-import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
@@ -113,14 +113,6 @@ import org.slf4j.LoggerFactory;
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingDataflowWorker {
-
- // TODO(https://github.com/apache/beam/issues/19632): Update throttling
counters to use generic
- // throttling-msecs metric.
- public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
- MetricName.named(
-
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
- "throttling-msecs");
-
/**
* Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the
amount of data sinked
* (across all the sinks, if there are more than one) reaches this limit.
This serves as hint for
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index 8f14ea26a46..a18ca8cfd6d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming;
-import static
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME;
+import static org.apache.beam.sdk.metrics.Metrics.THROTTLE_TIME_COUNTER_NAME;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterUpdate;
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics;
import org.apache.beam.runners.dataflow.worker.MetricsContainerRegistry;
-import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry;
import org.apache.beam.runners.dataflow.worker.StreamingStepMetricsContainer;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
@@ -93,20 +92,13 @@ public abstract class StageInfo {
}
/**
- * Checks if the step counter affects any per-stage counters. Currently
'throttled_millis' is the
+ * Checks if the step counter affects any per-stage counters. Currently
'throttled-msecs' is the
* only counter updated.
*/
private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
CounterStructuredName structuredName =
stepCounterUpdate.getStructuredNameAndMetadata().getName();
- if
((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
- &&
THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
- || (StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME
- .getNamespace()
- .equals(structuredName.getOriginNamespace())
- && StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME
- .getName()
- .equals(structuredName.getName()))) {
+ if (THROTTLE_TIME_COUNTER_NAME.equals(structuredName.getName())) {
long msecs =
DataflowCounterUpdateExtractor.splitIntToLong(stepCounterUpdate.getInteger());
if (msecs > 0) {
throttledMsecs().addValue(msecs);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
index 18bd814b4df..4062fbf6ebe 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
@@ -43,6 +43,7 @@ import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileSc
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -266,7 +267,7 @@ public class BatchModeExecutionContextTest {
.getCounter(
MetricName.named(
BatchModeExecutionContext.DATASTORE_THROTTLE_TIME_NAMESPACE,
- BatchModeExecutionContext.THROTTLE_TIME_COUNTER_NAME));
+ Metrics.THROTTLE_TIME_COUNTER_NAME));
counter.inc(12000);
counter.inc(17000);
counter.inc(1000);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 916e18647c3..dc80a66c055 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -109,6 +109,13 @@ public class Metrics {
return new DelegatingStringSet(MetricName.named(namespace, name));
}
+ /*
+ * A dedicated namespace for client throttling time. A user DoFn can increment
this metric and then the
+ * runner will put back pressure on scaling decisions, if supported.
+ */
+ public static final String THROTTLE_TIME_NAMESPACE =
"beam-throttling-metrics";
+ public static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";
+
/**
* Implementation of {@link Distribution} that delegates to the instance for
the current context.
*/
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 7f8f2a43643..3421bb4afc8 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.StringUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -134,10 +135,14 @@ public class MetricsEnvironment {
if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false,
true)) {
if (isMetricsSupported()) {
LOG.error(
- "Unable to update metrics on the current thread. "
- + "Most likely caused by using metrics outside the managed
work-execution thread.");
+ "Unable to update metrics on the current thread. Most likely
caused by using metrics "
+ + "outside the managed work-execution thread:\n {}",
+
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10));
} else {
- LOG.warn("Reporting metrics are not supported in the current execution
environment.");
+ // rate limiting this log as it can be emitted each time a metric
is incremented
+ LOG.warn(
+ "Reporting metrics are not supported in the current execution
environment:\n {}",
+
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10));
}
}
return container;
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 13105fb6c02..ccd58857da0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -22,6 +22,7 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
+import org.checkerframework.checker.nullness.qual.Nullable;
/** Utilities for working with JSON and other human-readable string formats. */
@Internal
@@ -143,4 +144,38 @@ public class StringUtils {
return v1[t.length()];
}
+
+ /**
+ * Convert an array to a String with one element per line. Truncate to first {@code maxLine}
elements.
+ *
+ * <p>Useful to truncate stacktrace and for logging.
+ */
+ public static String arrayToNewlines(Object[] array, int maxLine) {
+ int n = (maxLine > 0 && array.length > maxLine) ? maxLine : array.length;
+ StringBuilder b = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ b.append(array[i]);
+ b.append("\n");
+ }
+ if (array.length > maxLine) {
+ b.append("...\n");
+ }
+ return b.toString();
+ }
+
+ /**
+ * Truncate String if length greater than maxLen, and append "..." to the
end. Handles null.
+ *
+ * <p>Useful to truncate long logging message.
+ */
+ public static String leftTruncate(@Nullable Object element, int maxLen) {
+ if (element == null) {
+ return "";
+ }
+ String s = element.toString();
+ if (s.length() > maxLen) {
+ return s.substring(0, maxLen) + "...";
+ }
+ return s;
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 9e9686ca201..e8b0e7ecd47 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -17,9 +17,13 @@
*/
package org.apache.beam.sdk.util;
+import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.UUID;
+import java.util.stream.IntStream;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -54,4 +58,23 @@ public class StringUtilsTest {
assertEquals(1, StringUtils.getLevenshteinDistance("abc", "ab1c")); //
insertion
assertEquals(1, StringUtils.getLevenshteinDistance("abc", "a1c")); //
modification
}
+
+ @Test
+ public void testArrayToNewlines() {
+ Object[] uuids = IntStream.range(1, 10).mapToObj(unused ->
UUID.randomUUID()).toArray();
+
+ String r1 = StringUtils.arrayToNewlines(uuids, 6);
+ assertTrue(r1.endsWith("...\n"));
+ assertEquals(7, countMatches(r1, "\n"));
+ String r2 = StringUtils.arrayToNewlines(uuids, 15);
+ String r3 = StringUtils.arrayToNewlines(uuids, 10);
+ assertEquals(r3, r2);
+ }
+
+ @Test
+ public void testLeftTruncate() {
+ assertEquals("", StringUtils.leftTruncate(null, 3));
+ assertEquals("", StringUtils.leftTruncate("", 3));
+ assertEquals("abc...", StringUtils.leftTruncate("abcd", 3));
+ }
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
index d053a5f4bf8..b48dc636805 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
@@ -75,7 +75,7 @@ public class RetryHttpRequestInitializer implements
HttpRequestInitializer {
private final Set<Integer> ignoredResponseCodes;
// aggregate the total time spent in exponential backoff
private final Counter throttlingMsecs =
- Metrics.counter(LoggingHttpBackOffHandler.class, "throttling-msecs");
+ Metrics.counter(LoggingHttpBackOffHandler.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
private int ioExceptionRetries;
private int unsuccessfulResponseRetries;
private @Nullable CustomHttpErrors customHttpErrors;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c6b0e17e59d..b87b6a222a4 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -575,7 +575,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
private final long maxRowBatchSize;
// aggregate the total time spent in exponential backoff
private final Counter throttlingMsecs =
- Metrics.counter(DatasetServiceImpl.class, "throttling-msecs");
+ Metrics.counter(DatasetServiceImpl.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
private @Nullable BoundedExecutorService executor;
@@ -1663,7 +1663,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
static class StorageClientImpl implements StorageClient {
public static final Counter THROTTLING_MSECS =
- Metrics.counter(StorageClientImpl.class, "throttling-msecs");
+ Metrics.counter(StorageClientImpl.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
private transient long unreportedDelay = 0L;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d25ad7d4871..d78ae2cb6c5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -21,13 +21,16 @@ import static
org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceFactory.Bigtabl
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static
org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
+import com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
@@ -38,6 +41,7 @@ import
com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
+import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -69,6 +73,8 @@ import
org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEsti
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -82,6 +88,7 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -1109,12 +1116,51 @@ public class BigtableIO {
* always enabled on batch writes and limits the number of outstanding
requests to the Bigtable
* server.
*
+ * <p>When enabled, will also set default {@link
#withThrottlingReportTargetMs} to 1 minute.
+ * This enables the runner to react to increased latency in flush calls due to
flow control.
+ *
* <p>Does not modify this object.
*/
public Write withFlowControl(boolean enableFlowControl) {
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ BigtableWriteOptions.Builder builder =
options.toBuilder().setFlowControl(enableFlowControl);
+ if (enableFlowControl) {
+ builder = builder.setThrottlingReportTargetMs(60_000);
+ }
+ return toBuilder().setBigtableWriteOptions(builder.build()).build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with client side latency based
throttling enabled.
+ *
+ * <p>Will also set {@link #withThrottlingReportTargetMs} to the same
value.
+ */
+ public Write withThrottlingTargetMs(int throttlingTargetMs) {
+ BigtableWriteOptions options = getBigtableWriteOptions();
+ return toBuilder()
+ .setBigtableWriteOptions(
+ options
+ .toBuilder()
+ .setThrottlingTargetMs(throttlingTargetMs)
+ .setThrottlingReportTargetMs(throttlingTargetMs)
+ .build())
+ .build();
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Write} with throttling time reporting
enabled. When write
+ * request latency exceeds the set value, the amount greater than the
target will be considered
+ * as throttling time and reported back to the runner.
+ *
+ * <p>If not set, defaults to 3 min for completed batch requests. Client
side flow control
+ * configurations (e.g. {@link #withFlowControl}, {@link
#withThrottlingTargetMs}) will adjust
+ * the default value accordingly. Set to 0 to disable throttling time
reporting.
+ */
+ public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
-
.setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build())
+ .setBigtableWriteOptions(
+
options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build())
.build();
}
@@ -1283,7 +1329,14 @@ public class BigtableIO {
private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
private final BadRecordRouter badRecordRouter;
+ private final Counter throttlingMsecs =
+ Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME);
+
+ private final int throttleReportThresMsecs;
+
private transient Set<KV<BigtableWriteException, BoundedWindow>>
badRecords = null;
+ // The callback thread does not support Beam metrics, so record pending
metrics and report later.
+ private transient long pendingThrottlingMsecs;
// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;
@@ -1301,6 +1354,8 @@ public class BigtableIO {
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
+ // a request taking longer than this to complete will be considered throttled.
Disabled if set to 0
+ throttleReportThresMsecs =
firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
LOG.debug("Created Bigtable Write Fn with writeOptions {} ",
writeOptions);
}
@@ -1322,20 +1377,52 @@ public class BigtableIO {
public void processElement(ProcessContext c, BoundedWindow window) throws
Exception {
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
-
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record,
window));
+ Instant writeStart = Instant.now();
+ pendingThrottlingMsecs = 0;
+ bigtableWriter
+ .writeRecord(record)
+ .whenComplete(handleMutationException(record, window, writeStart));
+ if (pendingThrottlingMsecs > 0) {
+ throttlingMsecs.inc(pendingThrottlingMsecs);
+ }
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0)
+ 1);
}
private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
- KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
+ KV<ByteString, Iterable<Mutation>> record, BoundedWindow window,
Instant writeStart) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
+ // Exception due to resource unavailable or rate limited,
+ // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
+ boolean isResourceException = false;
+ if (exception instanceof StatusRuntimeException) {
+ StatusRuntimeException se = (StatusRuntimeException) exception;
+ if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
+ || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus()))
{
+ isResourceException = true;
+ }
+ } else if (exception instanceof DeadlineExceededException
+ || exception instanceof ResourceExhaustedException) {
+ isResourceException = true;
+ }
+ if (isResourceException) {
+ pendingThrottlingMsecs = new Duration(writeStart,
Instant.now()).getMillis();
+ }
failures.add(new BigtableWriteException(record, exception));
}
+ } else {
+ // add the excessive amount to throttling metrics if elapsed time >
target latency
+ if (throttleReportThresMsecs > 0) {
+ long excessTime =
+ new Duration(writeStart, Instant.now()).getMillis() -
throttleReportThresMsecs;
+ if (excessTime > 0) {
+ pendingThrottlingMsecs = excessTime;
+ }
+ }
}
};
}
@@ -1371,8 +1458,8 @@ public class BigtableIO {
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
try {
-
if (bigtableWriter != null) {
+ Instant closeStart = Instant.now();
try {
bigtableWriter.close();
} catch (IOException e) {
@@ -1381,9 +1468,18 @@ public class BigtableIO {
// to the error queue. Bigtable will successfully write other
failures in the batch,
// so this exception should be ignored
if (!(e.getCause() instanceof BatchingException)) {
+ throttlingMsecs.inc(new Duration(closeStart,
Instant.now()).getMillis());
throw e;
}
}
+ // add the excessive amount to throttling metrics if elapsed time >
target latency
+ if (throttleReportThresMsecs > 0) {
+ long excessTime =
+ new Duration(closeStart, Instant.now()).getMillis() -
throttleReportThresMsecs;
+ if (excessTime > 0) {
+ throttlingMsecs.inc(excessTime);
+ }
+ }
bigtableWriter = null;
}
@@ -2015,7 +2111,7 @@ public class BigtableIO {
super(
String.format(
"Error mutating row %s with mutations %s",
- record.getKey().toStringUtf8(), record.getValue()),
+ record.getKey().toStringUtf8(),
StringUtils.leftTruncate(record.getValue(), 100)),
cause);
this.record = record;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 06e0108259d..10cfa724c2a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -24,6 +24,7 @@ import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StreamController;
@@ -611,6 +612,9 @@ class BigtableServiceImpl implements BigtableService {
if (throwable instanceof StatusRuntimeException) {
serviceCallMetric.call(
((StatusRuntimeException)
throwable).getStatus().getCode().value());
+ } else if (throwable instanceof DeadlineExceededException) {
+ // incoming throwable can be a StatusRuntimeException or a specific gRPC ApiException
+ serviceCallMetric.call(504);
} else {
serviceCallMetric.call("unknown");
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
index a63cc575809..5963eb6be3c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
@@ -57,6 +57,9 @@ abstract class BigtableWriteOptions implements Serializable {
/** Returns the target latency if latency based throttling is enabled. */
abstract @Nullable Integer getThrottlingTargetMs();
+ /** Returns the target latency if reporting of latency-based throttling to the runner is enabled. */
+ abstract @Nullable Integer getThrottlingReportTargetMs();
+
/** Returns true if batch write flow control is enabled. Otherwise returns false. */
abstract @Nullable Boolean getFlowControl();
@@ -88,6 +91,8 @@ abstract class BigtableWriteOptions implements Serializable {
abstract Builder setThrottlingTargetMs(int targetMs);
+ abstract Builder setThrottlingReportTargetMs(int targetMs);
+
abstract Builder setFlowControl(boolean enableFlowControl);
abstract Builder setCloseWaitTimeout(Duration timeout);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 86cd7a3439a..1563b0b059f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -1711,7 +1711,7 @@ public class DatastoreV1 {
private WriteBatcher writeBatcher;
private transient AdaptiveThrottler adaptiveThrottler;
private final Counter throttlingMsecs =
- Metrics.counter(DatastoreWriterFn.class, "throttling-msecs");
+ Metrics.counter(DatastoreWriterFn.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
private final Counter rpcErrors =
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
private final Counter rpcSuccesses =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
index db098c0a516..ae94d4b612d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
@@ -53,7 +53,8 @@ public class RampupThrottlingFn<T> extends DoFn<T, T>
implements Serializable {
private final PCollectionView<Instant> firstInstantSideInput;
@VisibleForTesting
- Counter throttlingMsecs = Metrics.counter(RampupThrottlingFn.class,
"throttling-msecs");
+ Counter throttlingMsecs =
+ Metrics.counter(RampupThrottlingFn.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
// Initialized on every setup.
private transient MovingFunction successfulOps;
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 50660326275..8695a445c11 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -37,6 +37,7 @@ import
org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
@@ -178,7 +179,8 @@ public class BigQuerySinkMetricsTest {
testContainer.assertPerWorkerCounterValue(counterName, 1L);
counterName =
- MetricName.named(BigQueryServicesImpl.StorageClientImpl.class,
"throttling-msecs");
+ MetricName.named(
+ BigQueryServicesImpl.StorageClientImpl.class,
Metrics.THROTTLE_TIME_COUNTER_NAME);
assertEquals(1L, (long)
testContainer.getCounter(counterName).getCumulative());
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index dd6a55ff437..e5049b03701 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -436,6 +436,21 @@ public class BigtableIOTest {
write.expand(null);
}
+ @Test
+ public void testWriteClientRateLimitingAlsoSetReportMsecs() {
+ // client side flow control
+ BigtableIO.Write write =
BigtableIO.write().withTableId("table").withFlowControl(true);
+ assertEquals(
+ 60_000, (int)
checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
+
+ // client side latency based throttling
+ int targetMs = 30_000;
+ write =
BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs);
+ assertEquals(
+ targetMs,
+ (int)
checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
+ }
+
/** Helper function to make a single row mutation to be written. */
private static KV<ByteString, Iterable<Mutation>> makeWrite(String key,
String value) {
ByteString rowKey = ByteString.copyFromUtf8(key);
diff --git
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
index d32640ffbf7..98db23c95a3 100644
---
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
+++
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java
@@ -58,7 +58,7 @@ public class SyntheticStep extends DoFn<KV<byte[], byte[]>,
KV<byte[], byte[]>>
private final KV<Long, Long> idAndThroughput;
private final Counter throttlingCounter =
- Metrics.counter("dataflow-throttling-metrics", "throttling-msecs");
+ Metrics.counter("dataflow-throttling-metrics",
Metrics.THROTTLE_TIME_COUNTER_NAME);
/**
* Static cache to store one worker level rate limiter for a step. Value in
KV is the desired