This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ea1625ab5f3 Support samza portable UDF metrics. (#25265)
ea1625ab5f3 is described below
commit ea1625ab5f3a6b63fa045466f5b93257e82025e0
Author: Katie Liu <[email protected]>
AuthorDate: Wed Feb 8 16:38:14 2023 -0800
Support samza portable UDF metrics. (#25265)
---
.../samza/runtime/SamzaMetricsBundleProgressHandler.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
index 20227db10f7..010ae53455f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
@@ -90,7 +90,9 @@ class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
response.getMonitoringInfosList().stream()
.filter(monitoringInfo -> !monitoringInfo.getPayload().isEmpty())
- .forEach(this::parseAndUpdateMetric);
+ .map(this::parseAndUpdateMetric)
+ .distinct()
+ .forEach(samzaMetricsContainer::updateMetrics);
}
/**
@@ -113,8 +115,9 @@ class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
*
* @see
*
org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+ * @return the final transformUniqueName for the metric
*/
- private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
+ private String parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
String pTransformId =
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM,
stepName);
String transformUniqueName =
transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
@@ -148,7 +151,8 @@ class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
break;
default:
- LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+ LOG.debug("Unsupported metric type {}", monitoringInfo.getType());
}
+ return transformUniqueName;
}
}