This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a2260949431 Avoid publishing string set metrics on the Dataflow legacy
runner. (#31825)
a2260949431 is described below
commit a22609494318e3125255105b6d2568bbba30a54f
Author: Robert Bradshaw <[email protected]>
AuthorDate: Wed Jul 10 07:26:46 2024 -0700
Avoid publishing string set metrics on the Dataflow legacy runner. (#31825)
This can be reverted once Dataflow supports string set metrics.
The publishing logic is made conditional (rather than removed) so that the existing tests can still exercise it.
---
.../dataflow/worker/BatchModeExecutionContext.java | 19 +++++++++++++++----
.../worker/StreamingStepMetricsContainer.java | 7 ++++++-
.../worker/StreamingStepMetricsContainerTest.java | 2 ++
3 files changed, 23 insertions(+), 5 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 62ec70ff9b1..8c038189ae6 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
+import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -80,6 +81,9 @@ public class BatchModeExecutionContext
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
protected static final String THROTTLE_TIME_COUNTER_NAME =
"throttling-msecs";
+ // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
+ private final boolean populateStringSetMetrics;
+
private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
@@ -87,7 +91,8 @@ public class BatchModeExecutionContext
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
- DataflowExecutionStateRegistry executionStateRegistry) {
+ DataflowExecutionStateRegistry executionStateRegistry,
+ boolean populateStringSetMetrics) {
super(
counterFactory,
createMetricsContainerRegistry(),
@@ -100,6 +105,7 @@ public class BatchModeExecutionContext
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>)
getMetricsContainerRegistry();
+ this.populateStringSetMetrics = populateStringSetMetrics;
}
private static MetricsContainerRegistry<MetricsContainerImpl>
createMetricsContainerRegistry() {
@@ -135,7 +141,8 @@ public class BatchModeExecutionContext
counterFactory,
options,
"test-work-item-id"),
- stateRegistry);
+ stateRegistry,
+ true);
}
public static BatchModeExecutionContext forTesting(PipelineOptions options,
String stageName) {
@@ -248,7 +255,8 @@ public class BatchModeExecutionContext
counterFactory,
options,
workItemId),
- executionStateRegistry);
+ executionStateRegistry,
+ false);
}
/** Create a new {@link StepContext}. */
@@ -518,7 +526,10 @@ public class BatchModeExecutionContext
update ->
MetricsToCounterUpdateConverter.fromDistribution(
update.getKey(), true, update.getUpdate())),
- FluentIterable.from(updates.stringSetUpdates())
+ FluentIterable.from(
+ populateStringSetMetrics
+ ? updates.stringSetUpdates()
+ : Collections.emptyList())
.transform(
update ->
MetricsToCounterUpdateConverter.fromStringSet(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 7cc0dc68f7e..04f983c4fa7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -23,6 +23,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@@ -88,6 +89,9 @@ public class StreamingStepMetricsContainer implements
MetricsContainer {
private final Clock clock;
+ // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
+ @VisibleForTesting boolean populateStringSetUpdates = false;
+
private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
@@ -187,7 +191,8 @@ public class StreamingStepMetricsContainer implements
MetricsContainer {
public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates()
.append(distributionUpdates())
- .append(gaugeUpdates().append(stringSetUpdates()));
+ .append(gaugeUpdates())
+ .append(populateStringSetUpdates ? stringSetUpdates() :
Collections.emptyList());
}
private FluentIterable<CounterUpdate> counterUpdates() {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
index 2d5a8d8266a..d128255cd23 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
@@ -292,6 +292,7 @@ public class StreamingStepMetricsContainerTest {
.setCumulative(false)
.setStringList(new StringList().setElements(Arrays.asList("ab",
"cd", "ef", "gh")));
+ ((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true;
Iterable<CounterUpdate> updates =
StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));
@@ -314,6 +315,7 @@ public class StreamingStepMetricsContainerTest {
.setCumulative(false)
.setStringList(new StringList().setElements(Arrays.asList("ij",
"kl", "mn")));
+ ((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));