This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch revert-34738-retrive-trie
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b8282ceb2146a639f0e161cc1ed354f536f4bb02
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Jun 13 11:23:53 2025 -0400

    Revert "Parse values returned from Dataflow API to BoundedTrieData (#34738)"
    
    This reverts commit de76cfa16d36e2cb8dd1a1092710f0cd477e0f8d.
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  2 +-
 .../beam/runners/core/metrics/BoundedTrieData.java |  9 ++-
 .../beam/runners/dataflow/DataflowMetrics.java     | 80 ++--------------------
 .../beam/runners/dataflow/DataflowMetricsTest.java | 37 +---------
 4 files changed, 12 insertions(+), 116 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 7867c4895c4..042b95e1314 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_common                           : 
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
         google_api_services_bigquery                : 
"com.google.apis:google-api-services-bigquery:v2-rev20250427-2.0.0",  // 
[bomupgrader] sets version
         google_api_services_cloudresourcemanager    : 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0",
  // [bomupgrader] sets version
-        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20250519-$google_clients_version",
+        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20250106-$google_clients_version",
         google_api_services_healthcare              : 
"com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
         google_api_services_pubsub                  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
         google_api_services_storage                 : 
"com.google.apis:google-api-services-storage:v1-rev20250424-2.0.0",  // 
[bomupgrader] sets version
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
index c80e24d9507..63fb289d3ee 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
@@ -311,7 +311,7 @@ public class BoundedTrieData implements Serializable {
    * intended to be used directly outside of {@link BoundedTrieData} with 
multiple threads.
    */
   @VisibleForTesting
-  public static class BoundedTrieNode implements Serializable {
+  static class BoundedTrieNode implements Serializable {
 
     public static final String TRUNCATED_TRUE = String.valueOf(true);
     public static final String TRUNCATED_FALSE = String.valueOf(false);
@@ -334,7 +334,7 @@ public class BoundedTrieData implements Serializable {
     private int size;
 
     /** Constructs an empty `BoundedTrieNode` with size 1 and not truncated. */
-    public BoundedTrieNode() {
+    BoundedTrieNode() {
       this(new HashMap<>(), false, 1);
     }
 
@@ -345,8 +345,7 @@ public class BoundedTrieData implements Serializable {
      * @param truncated Whether this node is truncated.
      * @param size The size of the subtree rooted at this node.
      */
-    public BoundedTrieNode(
-        @Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int 
size) {
+    BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean 
truncated, int size) {
       this.children = children;
       this.size = size;
       this.truncated = truncated;
@@ -562,7 +561,7 @@ public class BoundedTrieData implements Serializable {
      *
      * @return The size of the subtree.
      */
-    public int getSize() {
+    int getSize() {
       return size;
     }
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 06435793b56..e8e2f6c44c7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -29,7 +29,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.metrics.BoundedTrieData;
@@ -45,11 +44,9 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.StringSetResult;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -136,8 +133,7 @@ class DataflowMetrics extends MetricResults {
     return result;
   }
 
-  @VisibleForTesting
-  static class DataflowMetricResultExtractor {
+  private static class DataflowMetricResultExtractor {
     private final ImmutableList.Builder<MetricResult<Long>> counterResults;
     private final ImmutableList.Builder<MetricResult<DistributionResult>> 
distributionResults;
     private final ImmutableList.Builder<MetricResult<GaugeResult>> 
gaugeResults;
@@ -179,7 +175,7 @@ class DataflowMetrics extends MetricResults {
         // stringset metric
         StringSetResult value = getStringSetValue(committed);
         stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, 
value));
-      } else if (committed.getBoundedTrie() != null && 
attempted.getBoundedTrie() != null) {
+      } else if (committed.getTrie() != null && attempted.getTrie() != null) {
         BoundedTrieResult value = getBoundedTrieValue(committed);
         boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, 
value));
       } else {
@@ -210,20 +206,12 @@ class DataflowMetrics extends MetricResults {
     }
 
     private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
-      BoundedTrieData trieData = null;
-      Object trieFromResponse = metricUpdate.getBoundedTrie();
-      // Fail-safely cast Trie returned by dataflow API to BoundedTrieResult
-      if (trieFromResponse instanceof BoundedTrie) {
-        trieData = BoundedTrieData.fromProto((BoundedTrie) trieFromResponse);
-      } else if (trieFromResponse instanceof ArrayMap) {
-        trieData = trieFromArrayMap((ArrayMap) trieFromResponse);
-      }
-
-      if (trieData != null) {
-        return BoundedTrieResult.create(trieData.extractResult().getResult());
-      } else {
+      if (metricUpdate.getTrie() == null) {
         return BoundedTrieResult.empty();
       }
+      BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie();
+      BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
+      return BoundedTrieResult.create(trieData.extractResult().getResult());
     }
 
     private DistributionResult getDistributionValue(MetricUpdate metricUpdate) 
{
@@ -238,62 +226,6 @@ class DataflowMetrics extends MetricResults {
       return DistributionResult.create(sum, count, min, max);
     }
 
-    /** Translate ArrayMap returned by Dataflow API client to BoundedTrieData. 
*/
-    @VisibleForTesting
-    static BoundedTrieData trieFromArrayMap(ArrayMap fieldsMap) {
-      int bound = 0;
-      List<String> singleton = null;
-      Object maybeBound = fieldsMap.get("bound");
-      if (maybeBound instanceof Number) {
-        bound = ((Number) maybeBound).intValue();
-      }
-      Object maybeSingleton = fieldsMap.get("singleton");
-      if (maybeSingleton instanceof List) {
-        List valueList = (List) maybeSingleton;
-        ImmutableList.Builder<String> builder = ImmutableList.builder();
-        for (Object stringValue : valueList) {
-          builder.add((String) stringValue);
-        }
-        singleton = builder.build();
-      }
-      Object maybeRoot = fieldsMap.get("root");
-      BoundedTrieData.BoundedTrieNode root = null;
-      if (maybeRoot instanceof Map) {
-        root = trieNodeFromMap((Map) maybeRoot);
-      }
-      return new BoundedTrieData(singleton, root, bound);
-    }
-
-    /** Translate Map returned by Dataflow API client to 
BoundedTrieData.BoundedTrieNode. */
-    private static BoundedTrieData.BoundedTrieNode trieNodeFromMap(Map 
fieldsMap) {
-      boolean truncated = false;
-      Object mayTruncated = fieldsMap.get("truncated");
-      if (mayTruncated instanceof Boolean) {
-        truncated = (boolean) mayTruncated;
-      }
-      int childrenSize = 0;
-      ImmutableMap.Builder<String, BoundedTrieData.BoundedTrieNode> builder =
-          ImmutableMap.builder();
-      Object maybeChildren = fieldsMap.get("children");
-      if (maybeChildren instanceof Map) {
-        Map allChildren = (Map) maybeChildren;
-        for (Object maybeChildValue : allChildren.entrySet()) {
-          Map.Entry childValue = (Map.Entry) maybeChildValue;
-          Object maybeChild = childValue.getValue();
-          if (maybeChild instanceof Map) {
-            BoundedTrieData.BoundedTrieNode child = trieNodeFromMap((Map) 
maybeChild);
-            Object maybeKey = childValue.getKey();
-            if (maybeKey instanceof String) {
-              builder.put((String) maybeKey, child);
-            }
-            childrenSize += child.getSize();
-          }
-        }
-      }
-      Map<String, BoundedTrieData.BoundedTrieNode> children = builder.build();
-      return new BoundedTrieData.BoundedTrieNode(children, truncated, 
Math.max(1, childrenSize));
-    }
-
     public Iterable<MetricResult<DistributionResult>> getDistributionResults() 
{
       return distributionResults.build();
     }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index 98e3d4bd405..30f1466d4a5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -203,7 +202,7 @@ public class DataflowMetricsTest {
   private MetricUpdate makeBoundedTrieMetricUpdate(
       String name, String namespace, String step, BoundedTrie data, boolean 
tentative) {
     MetricUpdate update = new MetricUpdate();
-    update.setBoundedTrie(data);
+    update.setTrie(data);
     return setStructuredName(update, name, namespace, step, tentative);
   }
 
@@ -297,40 +296,6 @@ public class DataflowMetricsTest {
                 StringSetResult.create(ImmutableSet.of("ab", "cd")))));
   }
 
-  @Test
-  public void testParseBoundedTrieWithSingleton() {
-    ArrayMap arrayMap = ArrayMap.create();
-    arrayMap.put("bound", 100);
-    arrayMap.put(
-        "singleton", ImmutableList.of("pubsub:", "topic:", 
"`google.com:abc`.", "some-topic"));
-
-    BoundedTrieData result =
-        
DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap);
-    assertEquals(
-        "BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", 
result.toString());
-  }
-
-  @Test
-  @SuppressWarnings("unchecked") // assemble ArrayMap from scratch for testing
-  public void testParseBoundedTrieWithRoot() {
-    ArrayMap arrayMap = ArrayMap.create();
-    arrayMap.put("bound", 100);
-    ArrayMap root = ArrayMap.create();
-    root.put("truncated", false);
-    ArrayMap children = ArrayMap.create();
-    ArrayMap leaf = ArrayMap.create();
-    leaf.put("1", ArrayMap.of("truncated", false));
-    leaf.put("2", ArrayMap.of("truncated", false));
-    leaf.put("3", ArrayMap.of("truncated", false));
-    children.put("gcs:some-bucket.some-folder/", leaf);
-    root.put("children", children);
-    arrayMap.put("root", root);
-
-    BoundedTrieData result =
-        
DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap);
-    assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/false'})", 
result.toString());
-  }
-
   @Test
   public void testSingleBoundedTrieUpdates() throws IOException {
     AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);

Reply via email to