This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ea1625ab5f3 Support samza portable UDF metrics. (#25265)
ea1625ab5f3 is described below

commit ea1625ab5f3a6b63fa045466f5b93257e82025e0
Author: Katie Liu <[email protected]>
AuthorDate: Wed Feb 8 16:38:14 2023 -0800

    Support samza portable UDF metrics. (#25265)
---
 .../samza/runtime/SamzaMetricsBundleProgressHandler.java       | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
index 20227db10f7..010ae53455f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
@@ -90,7 +90,9 @@ class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
   public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
     response.getMonitoringInfosList().stream()
         .filter(monitoringInfo -> !monitoringInfo.getPayload().isEmpty())
-        .forEach(this::parseAndUpdateMetric);
+        .map(this::parseAndUpdateMetric)
+        .distinct()
+        .forEach(samzaMetricsContainer::updateMetrics);
   }
 
   /**
@@ -113,8 +115,9 @@ class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
    *
    * @see
    *     org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   * @return the final transformUniqueName for the metric
    */
-  private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
+  private String parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
     String pTransformId =
         monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
     String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
@@ -148,7 +151,8 @@ class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
         break;
 
       default:
-        LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+        LOG.debug("Unsupported metric type {}", monitoringInfo.getType());
     }
+    return transformUniqueName;
   }
 }

Reply via email to