This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b13d427593 Deduplicate MonitoringInfo in PortableMetrics (#37066)
7b13d427593 is described below

commit 7b13d427593ea3a76e9cd2d553599d2bfa6972c7
Author: Suvrat Acharya <[email protected]>
AuthorDate: Tue Dec 30 00:29:22 2025 +0530

    Deduplicate MonitoringInfo in PortableMetrics (#37066)
    
    * deduplicate
    
    * addressing gemini comments
    
    * changes
    
    * fixes
    
    * use correct method
    
    * fix
    
    * spotless fix
---
 .../beam/runners/portability/PortableMetrics.java  | 130 ++++++++++++++-------
 .../runners/portability/PortableRunnerTest.java    |  46 ++++++++
 2 files changed, 132 insertions(+), 44 deletions(-)

diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
index 3c7be7907ec..9d57413a274 100644
--- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
+++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
@@ -30,6 +30,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decod
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -101,20 +102,30 @@ public class PortableMetrics extends MetricResults {
 
   private static PortableMetrics convertMonitoringInfosToMetricResults(
       JobApi.MetricResults jobMetrics) {
-    List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
-    // TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
-    monitoringInfoList.addAll(jobMetrics.getAttemptedList());
-    monitoringInfoList.addAll(jobMetrics.getCommittedList());
-    Iterable<MetricResult<Long>> countersFromJobMetrics =
-        extractCountersFromJobMetrics(monitoringInfoList);
+    // Deduplicate attempted + committed. Committed wins.
+    LinkedHashMap<String, MiAndCommitted> infoMap = new LinkedHashMap<>();
+
+    for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) {
+      String key = monitoringInfoKey(attempted);
+      infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false));
+    }
+
+    for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) {
+      String key = monitoringInfoKey(committed);
+      infoMap.put(key, new MiAndCommitted(committed, true));
+    }
+
+    List<MiAndCommitted> merged = new ArrayList<>(infoMap.values());
+
+    Iterable<MetricResult<Long>> countersFromJobMetrics = extractCountersFromJobMetrics(merged);
     Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
-        extractDistributionMetricsFromJobMetrics(monitoringInfoList);
+        extractDistributionMetricsFromJobMetrics(merged);
     Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
-        extractGaugeMetricsFromJobMetrics(monitoringInfoList);
+        extractGaugeMetricsFromJobMetrics(merged);
     Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
-        extractStringSetMetricsFromJobMetrics(monitoringInfoList);
+        extractStringSetMetricsFromJobMetrics(merged);
     Iterable<MetricResult<BoundedTrieResult>> boundedTrieFromMetrics =
-        extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList);
+        extractBoundedTrieMetricsFromJobMetrics(merged);
     return new PortableMetrics(
         countersFromJobMetrics,
         distributionsFromMetrics,
@@ -123,26 +134,52 @@ public class PortableMetrics extends MetricResults {
         boundedTrieFromMetrics);
   }
 
+  /**
+   * Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
+   * labels.
+   */
+  private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(mi.getType()).append('|');
+    Map<String, String> labels = mi.getLabelsMap();
+    // Use canonical labels that form the metric identity
+    sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
+    sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
+    sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
+    return sb.toString();
+  }
+
+  private static class MiAndCommitted {
+    final MetricsApi.MonitoringInfo mi;
+    final boolean committed;
+
+    MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
+      this.mi = mi;
+      this.committed = committed;
+    }
+  }
+
   private static Iterable<MetricResult<DistributionResult>>
-      extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
-        .filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
-        .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
-        .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
+        .filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
+        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+        .map(m -> convertDistributionMonitoringInfoToDistribution(m))
         .collect(Collectors.toList());
   }
 
   private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
-      List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
-        .filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
-        .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
-        .map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
+        .filter(m -> LATEST_INT64_TYPE.equals(m.mi.getType()))
+        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+        .map(m -> convertGaugeMonitoringInfoToGauge(m))
         .collect(Collectors.toList());
   }
 
-  private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
-      MetricsApi.MonitoringInfo monitoringInfo) {
+  private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(MiAndCommitted m) {
+    MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+    boolean isCommitted = m.committed;
     Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
     MetricKey key =
         MetricKey.create(
@@ -151,29 +188,31 @@ public class PortableMetrics extends MetricResults {
 
     GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload());
     GaugeResult result = GaugeResult.create(data.value(), data.timestamp());
-    return MetricResult.create(key, false, result);
+    return MetricResult.create(key, isCommitted, result);
   }
 
   private static Iterable<MetricResult<StringSetResult>> extractStringSetMetricsFromJobMetrics(
-      List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
-        .filter(item -> SET_STRING_TYPE.equals(item.getType()))
-        .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
-        .map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
+        .filter(m -> SET_STRING_TYPE.equals(m.mi.getType()))
+        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+        .map(m -> convertStringSetMonitoringInfoToStringSet(m))
         .collect(Collectors.toList());
   }
 
   private static Iterable<MetricResult<BoundedTrieResult>> extractBoundedTrieMetricsFromJobMetrics(
-      List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
-        .filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
-        .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
-        .map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
+        .filter(m -> BOUNDED_TRIE_TYPE.equals(m.mi.getType()))
+        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+        .map(m -> convertBoundedTrieMonitoringInfoToBoundedTrie(m))
         .collect(Collectors.toList());
   }
 
   private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStringSet(
-      MetricsApi.MonitoringInfo monitoringInfo) {
+      MiAndCommitted m) {
+    MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+    boolean isCommitted = m.committed;
     Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
     MetricKey key =
         MetricKey.create(
@@ -182,11 +221,13 @@ public class PortableMetrics extends MetricResults {
 
     StringSetData data = decodeStringSet(monitoringInfo.getPayload());
     StringSetResult result = StringSetResult.create(data.stringSet());
-    return MetricResult.create(key, false, result);
+    return MetricResult.create(key, isCommitted, result);
   }
 
   private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToBoundedTrie(
-      MetricsApi.MonitoringInfo monitoringInfo) {
+      MiAndCommitted m) {
+    MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+    boolean isCommitted = m.committed;
     Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
     MetricKey key =
         MetricKey.create(
@@ -195,11 +236,13 @@ public class PortableMetrics extends MetricResults {
 
     BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload());
     BoundedTrieResult result = BoundedTrieResult.create(data.extractResult().getResult());
-    return MetricResult.create(key, false, result);
+    return MetricResult.create(key, isCommitted, result);
   }
 
   private static MetricResult<DistributionResult> convertDistributionMonitoringInfoToDistribution(
-      MetricsApi.MonitoringInfo monitoringInfo) {
+      MiAndCommitted m) {
+    MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+    boolean isCommitted = m.committed;
     Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
     MetricKey key =
         MetricKey.create(
@@ -208,27 +251,26 @@ public class PortableMetrics extends MetricResults {
     DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
     DistributionResult result =
         DistributionResult.create(data.sum(), data.count(), data.min(), data.max());
-    return MetricResult.create(key, false, result);
+    return MetricResult.create(key, isCommitted, result);
   }
 
   private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
-      List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
-        .filter(item -> SUM_INT64_TYPE.equals(item.getType()))
-        .filter(
-            item ->
-                item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics
-        .map(PortableMetrics::convertCounterMonitoringInfoToCounter)
+        .filter(m -> SUM_INT64_TYPE.equals(m.mi.getType()))
+        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+        .map(m -> convertCounterMonitoringInfoToCounter(m))
         .collect(Collectors.toList());
   }
 
-  private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
-      MetricsApi.MonitoringInfo counterMonInfo) {
+  private static MetricResult<Long> convertCounterMonitoringInfoToCounter(MiAndCommitted m) {
+    MetricsApi.MonitoringInfo counterMonInfo = m.mi;
+    boolean isCommitted = m.committed;
     Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
     MetricKey key =
         MetricKey.create(
             labelsMap.get(STEP_NAME_LABEL),
             MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
-    return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload()));
+    return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload()));
   }
 }
diff --git a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
index 8c87a46ff17..68f3f6eae39 100644
--- a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
+++ b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
@@ -222,6 +222,52 @@ public class PortableRunnerTest implements Serializable {
     server.start();
   }
 
+  @Test
+  public void deduplicatesAttemptedAndCommittedMetrics() throws Exception {
+    Map<String, String> labelMap = new HashMap<>();
+    labelMap.put(NAMESPACE_LABEL, NAMESPACE);
+    labelMap.put(METRIC_NAME_LABEL, METRIC_NAME);
+    labelMap.put(STEP_NAME_LABEL, STEP_NAME);
+
+    // attempted counter (value 7) and committed counter (value 10) with same identity
+    MetricsApi.MonitoringInfo attemptedCounter =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(COUNTER_TYPE)
+            .putAllLabels(labelMap)
+            .setPayload(encodeInt64Counter(7L))
+            .build();
+
+    MetricsApi.MonitoringInfo committedCounter =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(COUNTER_TYPE)
+            .putAllLabels(labelMap)
+            .setPayload(encodeInt64Counter(10L))
+            .build();
+
+    JobApi.MetricResults metricResults =
+        JobApi.MetricResults.newBuilder()
+            .addAttempted(attemptedCounter)
+            .addCommitted(committedCounter)
+            .build();
+
+    createJobServer(JobState.Enum.DONE, metricResults);
+    PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess());
+    PipelineResult result = runner.run(p);
+    result.waitUntilFinish();
+
+    Iterable<org.apache.beam.sdk.metrics.MetricResult<Long>> counters =
+        result.metrics().allMetrics().getCounters();
+    ImmutableList<org.apache.beam.sdk.metrics.MetricResult<Long>> list =
+        ImmutableList.copyOf(counters);
+
+    // Only one MetricResult should be present for the same identity.
+    assertThat(list.size(), is(1));
+    org.apache.beam.sdk.metrics.MetricResult<Long> r = list.get(0);
+
+    // Committed value should be present and equal to the committed payload (10).
+    assertThat(r.getCommitted(), is(10L));
+  }
+
   private static PipelineOptions createPipelineOptions() {
     PortablePipelineOptions options =
         PipelineOptionsFactory.create().as(PortablePipelineOptions.class);

Reply via email to