gemini-code-assist[bot] commented on code in PR #37066:
URL: https://github.com/apache/beam/pull/37066#discussion_r2604210756


##########
runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java:
##########
@@ -123,26 +134,61 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
         boundedTrieFromMetrics);
   }
 
+  /**
+   * Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
+   * labels.
+   */
+  private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(mi.getType()).append('|');
+    Map<String, String> labels = mi.getLabelsMap();
+    // Use canonical labels that form the metric identity
+    sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
+    sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
+    sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
+    return sb.toString();
+  }
+
+  private static class MiAndCommitted {
+    final MetricsApi.MonitoringInfo mi;
+    final boolean committed;
+
+    MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
+      this.mi = mi;
+      this.committed = committed;
+    }
+  }
+
   private static Iterable<MetricResult<DistributionResult>>
-      extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
+        .map(m -> m.mi)
         .filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
         .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
-        .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
+        .map(
+            item -> {
+              boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
+              return convertDistributionMonitoringInfoToDistribution(item, isCommitted);
+            })
         .collect(Collectors.toList());

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `extract...MetricsFromJobMetrics` methods are inefficient as written. Each one maps the `List<MiAndCommitted>` to a `Stream<MonitoringInfo>`, which discards the `committed` flag, and then calls `findCommittedFlag` for every item to recover it. That helper scans the original list again, so each method does O(N^2) work, where N is the number of metrics.
   
   You can avoid this by operating on the `Stream<MiAndCommitted>` directly. The flag is already attached to each element, so `findCommittedFlag` is no longer needed and each method runs in O(N).
   
   This same issue is present in:
   * `extractGaugeMetricsFromJobMetrics`
   * `extractStringSetMetricsFromJobMetrics`
   * `extractBoundedTrieMetricsFromJobMetrics`
   * `extractCountersFromJobMetrics`
   
   After refactoring these methods, the `findCommittedFlag` method can be removed.
   
   ```java
       return monitoringInfoList.stream()
           .filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
           .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
           .map(m -> convertDistributionMonitoringInfoToDistribution(m.mi, m.committed))
           .collect(Collectors.toList());
   ```
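   
   As a self-contained illustration of the single-pass pattern suggested above, the sketch below carries the flag through the stream instead of looking it up again. `Info`, `InfoAndCommitted`, `Result`, and `SUM_TYPE` are hypothetical stand-ins for `MetricsApi.MonitoringInfo`, `MiAndCommitted`, `MetricResult<Long>`, and `SUM_INT64_TYPE`. They are not Beam classes, and the real conversion also builds a `MetricKey` and decodes the payload.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   class CommittedFlagSketch {
     // Placeholder type string; the real SUM_INT64_TYPE value is not shown in the diff.
     static final String SUM_TYPE = "sum";

     /** Hypothetical stand-in for MetricsApi.MonitoringInfo. */
     static final class Info {
       final String type;
       final String namespace; // null models a missing NAMESPACE_LABEL
       final long value;

       Info(String type, String namespace, long value) {
         this.type = type;
         this.namespace = namespace;
         this.value = value;
       }
     }

     /** Mirrors the MiAndCommitted holder from the PR. */
     static final class InfoAndCommitted {
       final Info mi;
       final boolean committed;

       InfoAndCommitted(Info mi, boolean committed) {
         this.mi = mi;
         this.committed = committed;
       }
     }

     /** Hypothetical stand-in for MetricResult<Long>. */
     static final class Result {
       final long value;
       final boolean committed;

       Result(long value, boolean committed) {
         this.value = value;
         this.committed = committed;
       }
     }

     /** Single pass: each element already carries its flag, so no second scan is needed. */
     static List<Result> extractCounters(List<InfoAndCommitted> merged) {
       return merged.stream()
           .filter(m -> SUM_TYPE.equals(m.mi.type))
           .filter(m -> m.mi.namespace != null)
           .map(m -> new Result(m.mi.value, m.committed))
           .collect(Collectors.toList());
     }

     public static void main(String[] args) {
       List<InfoAndCommitted> merged =
           Arrays.asList(
               new InfoAndCommitted(new Info(SUM_TYPE, "ns", 5L), true),
               new InfoAndCommitted(new Info(SUM_TYPE, null, 7L), true), // dropped: no namespace
               new InfoAndCommitted(new Info("other", "ns", 9L), false), // dropped: wrong type
               new InfoAndCommitted(new Info(SUM_TYPE, "ns", 11L), false));
       for (Result r : extractCounters(merged)) {
         System.out.println(r.value + " committed=" + r.committed);
       }
     }
   }
   ```
   
   Because the flag travels with each element, there is no key to rebuild per item and no `false` fallback for a lookup that finds nothing.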



##########
runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java:
##########
@@ -208,27 +264,45 @@ private static MetricResult<DistributionResult> convertDistributionMonitoringInf
     DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
     DistributionResult result =
         DistributionResult.create(data.sum(), data.count(), data.min(), data.max());
-    return MetricResult.create(key, false, result);
+    return MetricResult.create(key, isCommitted, result);
   }
 
   private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
-      List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
+        .map(m -> m.mi)
         .filter(item -> SUM_INT64_TYPE.equals(item.getType()))
         .filter(
             item ->
                 item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics
-        .map(PortableMetrics::convertCounterMonitoringInfoToCounter)
+        .map(
+            item -> {
+              boolean isCommitted = findCommittedFlag(monitoringInfoList, item);
+              return convertCounterMonitoringInfoToCounter(item, isCommitted);
+            })
         .collect(Collectors.toList());
   }
 
   private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
-      MetricsApi.MonitoringInfo counterMonInfo) {
+      MetricsApi.MonitoringInfo counterMonInfo, boolean isCommitted) {
     Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
     MetricKey key =
         MetricKey.create(
             labelsMap.get(STEP_NAME_LABEL),
             MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
-    return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload()));
+    return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload()));
+  }
+
+  /** Helper to retrieve the committed flag for a MonitoringInfo from the merged list. */
+  private static boolean findCommittedFlag(
+      List<MiAndCommitted> merged, MetricsApi.MonitoringInfo mi) {
+    // Reconstruct the key and look up in the merged map entries.
+    String key = monitoringInfoKey(mi);
+    for (MiAndCommitted entry : merged) {
+      if (monitoringInfoKey(entry.mi).equals(key)) {
+        return entry.committed;
+      }
+    }
+    return false;
   }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Once the `extract...` methods are refactored as suggested in my other comment, `findCommittedFlag` has no remaining callers and can be removed.


