This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a2260949431 Avoid publishing string set metrics on the Dataflow legacy runner. (#31825)
a2260949431 is described below

commit a22609494318e3125255105b6d2568bbba30a54f
Author: Robert Bradshaw <[email protected]>
AuthorDate: Wed Jul 10 07:26:46 2024 -0700

    Avoid publishing string set metrics on the Dataflow legacy runner. (#31825)
    
    This can be reverted once Dataflow supports these.
    
    The publishing logic is conditional so that the tests still run.
---
 .../dataflow/worker/BatchModeExecutionContext.java    | 19 +++++++++++++++----
 .../worker/StreamingStepMetricsContainer.java         |  7 ++++++-
 .../worker/StreamingStepMetricsContainerTest.java     |  2 ++
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 62ec70ff9b1..8c038189ae6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.SideInputInfo;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -80,6 +81,9 @@ public class BatchModeExecutionContext
       
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
   protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";
 
+  // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
+  private final boolean populateStringSetMetrics;
+
   private BatchModeExecutionContext(
       CounterFactory counterFactory,
       Cache<?, WeightedValue<?>> dataCache,
@@ -87,7 +91,8 @@ public class BatchModeExecutionContext
       ReaderFactory readerFactory,
       PipelineOptions options,
       DataflowExecutionStateTracker executionStateTracker,
-      DataflowExecutionStateRegistry executionStateRegistry) {
+      DataflowExecutionStateRegistry executionStateRegistry,
+      boolean populateStringSetMetrics) {
     super(
         counterFactory,
         createMetricsContainerRegistry(),
@@ -100,6 +105,7 @@ public class BatchModeExecutionContext
     this.dataCache = dataCache;
     this.containerRegistry =
         (MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
+    this.populateStringSetMetrics = populateStringSetMetrics;
   }
 
   private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
@@ -135,7 +141,8 @@ public class BatchModeExecutionContext
             counterFactory,
             options,
             "test-work-item-id"),
-        stateRegistry);
+        stateRegistry,
+        true);
   }
 
   public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
@@ -248,7 +255,8 @@ public class BatchModeExecutionContext
             counterFactory,
             options,
             workItemId),
-        executionStateRegistry);
+        executionStateRegistry,
+        false);
   }
 
   /** Create a new {@link StepContext}. */
@@ -518,7 +526,10 @@ public class BatchModeExecutionContext
                           update ->
                               MetricsToCounterUpdateConverter.fromDistribution(
                                   update.getKey(), true, update.getUpdate())),
-                  FluentIterable.from(updates.stringSetUpdates())
+                  FluentIterable.from(
+                          populateStringSetMetrics
+                              ? updates.stringSetUpdates()
+                              : Collections.emptyList())
                       .transform(
                           update ->
                               MetricsToCounterUpdateConverter.fromStringSet(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 7cc0dc68f7e..04f983c4fa7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -23,6 +23,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -88,6 +89,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
 
   private final Clock clock;
 
+  // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
+  @VisibleForTesting boolean populateStringSetUpdates = false;
+
   private StreamingStepMetricsContainer(String stepName) {
     this.stepName = stepName;
     this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
@@ -187,7 +191,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
   public Iterable<CounterUpdate> extractUpdates() {
     return counterUpdates()
         .append(distributionUpdates())
-        .append(gaugeUpdates().append(stringSetUpdates()));
+        .append(gaugeUpdates())
+        .append(populateStringSetUpdates ? stringSetUpdates() : Collections.emptyList());
   }
 
   private FluentIterable<CounterUpdate> counterUpdates() {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
index 2d5a8d8266a..d128255cd23 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
@@ -292,6 +292,7 @@ public class StreamingStepMetricsContainerTest {
             .setCumulative(false)
             .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh")));
 
+    ((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true;
     Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
     assertThat(updates, containsInAnyOrder(name1Update));
 
@@ -314,6 +315,7 @@ public class StreamingStepMetricsContainerTest {
             .setCumulative(false)
             .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));
 
+    ((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true;
     updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
     assertThat(updates, containsInAnyOrder(name1Update, name2Update));
 

Reply via email to