This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7b13d427593 Deduplicate MonitoringInfo in PortableMetrics (#37066)
7b13d427593 is described below
commit 7b13d427593ea3a76e9cd2d553599d2bfa6972c7
Author: Suvrat Acharya <[email protected]>
AuthorDate: Tue Dec 30 00:29:22 2025 +0530
Deduplicate MonitoringInfo in PortableMetrics (#37066)
* deduplicate
* addressing gemini comments
* changes
* fixes
* use correct method
* fix
* spotless fix
---
.../beam/runners/portability/PortableMetrics.java | 130 ++++++++++++++-------
.../runners/portability/PortableRunnerTest.java | 46 ++++++++
2 files changed, 132 insertions(+), 44 deletions(-)
diff --git
a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
index 3c7be7907ec..9d57413a274 100644
---
a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
+++
b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
@@ -30,6 +30,7 @@ import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decod
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -101,20 +102,30 @@ public class PortableMetrics extends MetricResults {
private static PortableMetrics convertMonitoringInfosToMetricResults(
JobApi.MetricResults jobMetrics) {
- List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
- // TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
- monitoringInfoList.addAll(jobMetrics.getAttemptedList());
- monitoringInfoList.addAll(jobMetrics.getCommittedList());
- Iterable<MetricResult<Long>> countersFromJobMetrics =
- extractCountersFromJobMetrics(monitoringInfoList);
+ // Deduplicate attempted + committed. Committed wins.
+ LinkedHashMap<String, MiAndCommitted> infoMap = new LinkedHashMap<>();
+
+ for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) {
+ String key = monitoringInfoKey(attempted);
+ infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false));
+ }
+
+ for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) {
+ String key = monitoringInfoKey(committed);
+ infoMap.put(key, new MiAndCommitted(committed, true));
+ }
+
+ List<MiAndCommitted> merged = new ArrayList<>(infoMap.values());
+
+ Iterable<MetricResult<Long>> countersFromJobMetrics =
extractCountersFromJobMetrics(merged);
Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
- extractDistributionMetricsFromJobMetrics(monitoringInfoList);
+ extractDistributionMetricsFromJobMetrics(merged);
Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
- extractGaugeMetricsFromJobMetrics(monitoringInfoList);
+ extractGaugeMetricsFromJobMetrics(merged);
Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
- extractStringSetMetricsFromJobMetrics(monitoringInfoList);
+ extractStringSetMetricsFromJobMetrics(merged);
Iterable<MetricResult<BoundedTrieResult>> boundedTrieFromMetrics =
- extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList);
+ extractBoundedTrieMetricsFromJobMetrics(merged);
return new PortableMetrics(
countersFromJobMetrics,
distributionsFromMetrics,
@@ -123,26 +134,52 @@ public class PortableMetrics extends MetricResults {
boundedTrieFromMetrics);
}
+ /**
+ * Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
+ * labels.
+ */
+ private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(mi.getType()).append('|');
+ Map<String, String> labels = mi.getLabelsMap();
+ // Use canonical labels that form the metric identity
+ sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
+ sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
+ sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
+ return sb.toString();
+ }
+
+ private static class MiAndCommitted {
+ final MetricsApi.MonitoringInfo mi;
+ final boolean committed;
+
+ MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
+ this.mi = mi;
+ this.committed = committed;
+ }
+ }
+
private static Iterable<MetricResult<DistributionResult>>
- extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo>
monitoringInfoList) {
+ extractDistributionMetricsFromJobMetrics(List<MiAndCommitted>
monitoringInfoList) {
return monitoringInfoList.stream()
- .filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
- .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
- .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
+ .filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
+ .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+ .map(m -> convertDistributionMonitoringInfoToDistribution(m))
.collect(Collectors.toList());
}
private static Iterable<MetricResult<GaugeResult>>
extractGaugeMetricsFromJobMetrics(
- List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+ List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
- .filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
- .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
- .map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
+ .filter(m -> LATEST_INT64_TYPE.equals(m.mi.getType()))
+ .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+ .map(m -> convertGaugeMonitoringInfoToGauge(m))
.collect(Collectors.toList());
}
- private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
- MetricsApi.MonitoringInfo monitoringInfo) {
+ private static MetricResult<GaugeResult>
convertGaugeMonitoringInfoToGauge(MiAndCommitted m) {
+ MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+ boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
@@ -151,29 +188,31 @@ public class PortableMetrics extends MetricResults {
GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload());
GaugeResult result = GaugeResult.create(data.value(), data.timestamp());
- return MetricResult.create(key, false, result);
+ return MetricResult.create(key, isCommitted, result);
}
private static Iterable<MetricResult<StringSetResult>>
extractStringSetMetricsFromJobMetrics(
- List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+ List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
- .filter(item -> SET_STRING_TYPE.equals(item.getType()))
- .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
- .map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
+ .filter(m -> SET_STRING_TYPE.equals(m.mi.getType()))
+ .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+ .map(m -> convertStringSetMonitoringInfoToStringSet(m))
.collect(Collectors.toList());
}
private static Iterable<MetricResult<BoundedTrieResult>>
extractBoundedTrieMetricsFromJobMetrics(
- List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+ List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
- .filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
- .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
- .map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
+ .filter(m -> BOUNDED_TRIE_TYPE.equals(m.mi.getType()))
+ .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+ .map(m -> convertBoundedTrieMonitoringInfoToBoundedTrie(m))
.collect(Collectors.toList());
}
private static MetricResult<StringSetResult>
convertStringSetMonitoringInfoToStringSet(
- MetricsApi.MonitoringInfo monitoringInfo) {
+ MiAndCommitted m) {
+ MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+ boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
@@ -182,11 +221,13 @@ public class PortableMetrics extends MetricResults {
StringSetData data = decodeStringSet(monitoringInfo.getPayload());
StringSetResult result = StringSetResult.create(data.stringSet());
- return MetricResult.create(key, false, result);
+ return MetricResult.create(key, isCommitted, result);
}
private static MetricResult<BoundedTrieResult>
convertBoundedTrieMonitoringInfoToBoundedTrie(
- MetricsApi.MonitoringInfo monitoringInfo) {
+ MiAndCommitted m) {
+ MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+ boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
@@ -195,11 +236,13 @@ public class PortableMetrics extends MetricResults {
BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload());
BoundedTrieResult result =
BoundedTrieResult.create(data.extractResult().getResult());
- return MetricResult.create(key, false, result);
+ return MetricResult.create(key, isCommitted, result);
}
private static MetricResult<DistributionResult>
convertDistributionMonitoringInfoToDistribution(
- MetricsApi.MonitoringInfo monitoringInfo) {
+ MiAndCommitted m) {
+ MetricsApi.MonitoringInfo monitoringInfo = m.mi;
+ boolean isCommitted = m.committed;
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
@@ -208,27 +251,26 @@ public class PortableMetrics extends MetricResults {
DistributionData data =
decodeInt64Distribution(monitoringInfo.getPayload());
DistributionResult result =
DistributionResult.create(data.sum(), data.count(), data.min(),
data.max());
- return MetricResult.create(key, false, result);
+ return MetricResult.create(key, isCommitted, result);
}
private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
- List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+ List<MiAndCommitted> monitoringInfoList) {
return monitoringInfoList.stream()
- .filter(item -> SUM_INT64_TYPE.equals(item.getType()))
- .filter(
- item ->
- item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter
out pcollection metrics
- .map(PortableMetrics::convertCounterMonitoringInfoToCounter)
+ .filter(m -> SUM_INT64_TYPE.equals(m.mi.getType()))
+ .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+ .map(m -> convertCounterMonitoringInfoToCounter(m))
.collect(Collectors.toList());
}
- private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
- MetricsApi.MonitoringInfo counterMonInfo) {
+ private static MetricResult<Long>
convertCounterMonitoringInfoToCounter(MiAndCommitted m) {
+ MetricsApi.MonitoringInfo counterMonInfo = m.mi;
+ boolean isCommitted = m.committed;
Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
MetricKey key =
MetricKey.create(
labelsMap.get(STEP_NAME_LABEL),
MetricName.named(labelsMap.get(NAMESPACE_LABEL),
labelsMap.get(METRIC_NAME_LABEL)));
- return MetricResult.create(key, false,
decodeInt64Counter(counterMonInfo.getPayload()));
+ return MetricResult.create(key, isCommitted,
decodeInt64Counter(counterMonInfo.getPayload()));
}
}
diff --git
a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
index 8c87a46ff17..68f3f6eae39 100644
---
a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
+++
b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
@@ -222,6 +222,52 @@ public class PortableRunnerTest implements Serializable {
server.start();
}
+ @Test
+ public void deduplicatesAttemptedAndCommittedMetrics() throws Exception {
+ Map<String, String> labelMap = new HashMap<>();
+ labelMap.put(NAMESPACE_LABEL, NAMESPACE);
+ labelMap.put(METRIC_NAME_LABEL, METRIC_NAME);
+ labelMap.put(STEP_NAME_LABEL, STEP_NAME);
+
+ // attempted counter (value 7) and committed counter (value 10) with same identity
+ MetricsApi.MonitoringInfo attemptedCounter =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(COUNTER_TYPE)
+ .putAllLabels(labelMap)
+ .setPayload(encodeInt64Counter(7L))
+ .build();
+
+ MetricsApi.MonitoringInfo committedCounter =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(COUNTER_TYPE)
+ .putAllLabels(labelMap)
+ .setPayload(encodeInt64Counter(10L))
+ .build();
+
+ JobApi.MetricResults metricResults =
+ JobApi.MetricResults.newBuilder()
+ .addAttempted(attemptedCounter)
+ .addCommitted(committedCounter)
+ .build();
+
+ createJobServer(JobState.Enum.DONE, metricResults);
+ PortableRunner runner = PortableRunner.create(options,
ManagedChannelFactory.createInProcess());
+ PipelineResult result = runner.run(p);
+ result.waitUntilFinish();
+
+ Iterable<org.apache.beam.sdk.metrics.MetricResult<Long>> counters =
+ result.metrics().allMetrics().getCounters();
+ ImmutableList<org.apache.beam.sdk.metrics.MetricResult<Long>> list =
+ ImmutableList.copyOf(counters);
+
+ // Only one MetricResult should be present for the same identity.
+ assertThat(list.size(), is(1));
+ org.apache.beam.sdk.metrics.MetricResult<Long> r = list.get(0);
+
+ // Committed value should be present and equal to the committed payload (10).
+ assertThat(r.getCommitted(), is(10L));
+ }
+
private static PipelineOptions createPipelineOptions() {
PortablePipelineOptions options =
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);