This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cd20288318d Support DoFn metrics in portable Samza Runner (#25068)
cd20288318d is described below
commit cd20288318d4478c1a62a4d18b4989121def3015
Author: Katie Liu <[email protected]>
AuthorDate: Fri Jan 20 17:17:01 2023 -0800
Support DoFn metrics in portable Samza Runner (#25068)
---
CHANGES.md | 1 +
.../runners/samza/runtime/SamzaDoFnRunners.java | 32 +++-
.../runtime/SamzaMetricsBundleProgressHandler.java | 154 +++++++++++++++++
.../SamzaMetricsBundleProgressHandlerTest.java | 187 +++++++++++++++++++++
4 files changed, 368 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 35eee824a6c..8d286fcd3fb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
* Adding override of allowed TLS algorithms (Java), now maintaining the
disabled/legacy algorithms
present in 2.43.0 (up to 1.8.0_342, 11.0.16, 17.0.2 for respective Java
versions). This is accompanied
by an explicit re-enabling of TLSv1 and TLSv1.1 for Java 8 and Java 11.
+* Add UDF metrics support for Samza portable mode.
## Breaking Changes
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 8af62059de9..6f658262e96 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.samza.runtime;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -27,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
@@ -39,7 +41,7 @@ import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -246,7 +248,8 @@ public class SamzaDoFnRunners {
idToTupleTagMap,
bundledEventsBag,
stateRequestHandler,
- samzaExecutionContext);
+ samzaExecutionContext,
+ executableStage.getTransforms());
return pipelineOptions.getEnableMetrics()
? DoFnRunnerWithMetrics.wrap(
underlyingRunner, executionContext.getMetricsContainer(),
transformFullName)
@@ -270,7 +273,8 @@ public class SamzaDoFnRunners {
private final StateRequestHandler stateRequestHandler;
private final SamzaExecutionContext samzaExecutionContext;
private long startBundleTime;
- private final String metricName;
+ private final String stepName;
+ private final Collection<PipelineNode.PTransformNode> pTransformNodes;
private SdkHarnessDoFnRunner(
SamzaPipelineOptions pipelineOptions,
@@ -282,7 +286,8 @@ public class SamzaDoFnRunners {
Map<String, TupleTag<?>> idToTupleTagMap,
BagState<WindowedValue<InT>> bundledEventsBag,
StateRequestHandler stateRequestHandler,
- SamzaExecutionContext samzaExecutionContext) {
+ SamzaExecutionContext samzaExecutionContext,
+ Collection<PipelineNode.PTransformNode> pTransformNodes) {
this.pipelineOptions = pipelineOptions;
this.timerInternalsFactory = timerInternalsFactory;
this.windowingStrategy = windowingStrategy;
@@ -292,7 +297,8 @@ public class SamzaDoFnRunners {
this.bundledEventsBag = bundledEventsBag;
this.stateRequestHandler = stateRequestHandler;
this.samzaExecutionContext = samzaExecutionContext;
- this.metricName = "ExecutableStage-" + stepName + "-process-ns";
+ this.stepName = stepName;
+ this.pTransformNodes = pTransformNodes;
}
@SuppressWarnings("unchecked")
@@ -324,12 +330,25 @@ public class SamzaDoFnRunners {
final TimerReceiverFactory timerReceiverFactory =
new TimerReceiverFactory(stageBundleFactory,
this::timerDataConsumer, windowCoder);
+ Map<String, String> transformFullNameToUniqueName =
+ pTransformNodes.stream()
+ .collect(
+ Collectors.toMap(
+ pTransformNode -> pTransformNode.getId(),
+ pTransformNode ->
pTransformNode.getTransform().getUniqueName()));
+
+ SamzaMetricsBundleProgressHandler samzaMetricsBundleProgressHandler =
+ new SamzaMetricsBundleProgressHandler(
+ stepName,
+ samzaExecutionContext.getMetricsContainer(),
+ transformFullNameToUniqueName);
+
remoteBundle =
stageBundleFactory.getBundle(
receiverFactory,
timerReceiverFactory,
stateRequestHandler,
- BundleProgressHandler.ignored());
+ samzaMetricsBundleProgressHandler);
startBundleTime = getStartBundleTime();
@@ -396,6 +415,7 @@ public class SamzaDoFnRunners {
final long finishBundleTime = System.nanoTime();
final long averageProcessTime = (finishBundleTime - startBundleTime) /
count;
+ String metricName = "ExecutableStage-" + stepName + "-process-ns";
samzaExecutionContext
.getMetricsContainer()
.updateExecutableStageBundleMetric(metricName, averageProcessTime);
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
new file mode 100644
index 00000000000..20227db10f7
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress
messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+ private final String stepName;
+
+ private final SamzaMetricsContainer samzaMetricsContainer;
+ private final Map<String, String> transformIdToUniqueName;
+
+ /**
+ * Constructor of a SamzaMetricsBundleProgressHandler.
+ *
+ * <p>The full metric name in classic mode is
{transformUniqueName}:{className}:{metricName}. We
+ * attempt to follow the same format in portable mode, but the
monitoringInfos returned by the
+ * worker only contain the transformId. The current solution is to provide
a mapping from
+ * transformId back to uniqueName. A future improvement would be making the
monitoring infos
+ * contain the uniqueName.
+ *
+ * @param stepName Default stepName provided by the runner.
+ * @param samzaMetricsContainer The destination for publishing the metrics.
+ * @param transformIdToUniqueName A mapping from transformId to uniqueName
for pTransforms.
+ */
+ public SamzaMetricsBundleProgressHandler(
+ String stepName,
+ SamzaMetricsContainer samzaMetricsContainer,
+ Map<String, String> transformIdToUniqueName) {
+ this.stepName = stepName;
+ this.samzaMetricsContainer = samzaMetricsContainer;
+ this.transformIdToUniqueName = transformIdToUniqueName;
+ }
+
+ @Override
+ /**
+ * {@inheritDoc} Handles a progress report from the bundle while it is
executing. We choose to
+ * ignore the progress report. The metrics do not have to be updated on
every progress report, so
+ * we save computation resources by ignoring it.
+ */
+ public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+ @Override
+ /**
+ * {@inheritDoc} Handles the bundle's completion report. Parses the
monitoringInfos in the
+ * response, then updates the MetricsRegistry.
+ */
+ public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+ response.getMonitoringInfosList().stream()
+ .filter(monitoringInfo -> !monitoringInfo.getPayload().isEmpty())
+ .forEach(this::parseAndUpdateMetric);
+ }
+
+ /**
+ * Parses the metric contained in monitoringInfo, then publishes the metric
to the
+ * metricContainer.
+ *
+ * <p>We attempt to construct a classic mode metricName
+ * ({transformUniqueName}:{className}:{metricName}). All the info should be
in the labels, but we
+ * have fallbacks in case the labels don't exist.
+ *
+ * <p>Priorities for the transformUniqueName 1. Obtained transformUniqueName
using the
+ * transformIdToUniqueName 2. The transformId provided by the monitoringInfo
3. The stepName
+ * provided by the runner, which may be a result of fusing.
+ *
+ * <p>Priorities for the className 1. The namespace label 2. The
monitoringInfo urn. Copying the
+ * implementation in MonitoringInfoMetricName.
+ *
+ * <p>Priorities for the metricName 1. The name label 2. The monitoringInfo
labels map rendered as a string. Copying the
+ * implementation in MonitoringInfoMetricName.
+ *
+ * @see
+ *
org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+ */
+ private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
+ String pTransformId =
+
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM,
stepName);
+ String transformUniqueName =
transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+ String className =
+ monitoringInfo.getLabelsOrDefault(
+ MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+ String userMetricName =
+ monitoringInfo.getLabelsOrDefault(
+ MonitoringInfoConstants.Labels.NAME,
monitoringInfo.getLabelsMap().toString());
+
+ MetricsContainer metricsContainer =
samzaMetricsContainer.getContainer(transformUniqueName);
+ MetricName metricName = MetricName.named(className, userMetricName);
+
+ switch (monitoringInfo.getType()) {
+ case SUM_INT64_TYPE:
+ Counter counter = metricsContainer.getCounter(metricName);
+ counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+ break;
+
+ case DISTRIBUTION_INT64_TYPE:
+ Distribution distribution =
metricsContainer.getDistribution(metricName);
+ DistributionData data =
decodeInt64Distribution(monitoringInfo.getPayload());
+ distribution.update(data.sum(), data.count(), data.min(), data.max());
+ break;
+
+ case LATEST_INT64_TYPE:
+ Gauge gauge = metricsContainer.getGauge(metricName);
+ // Gauge doesn't expose update as public. This will reset the
timestamp.
+
+ gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+ break;
+
+ default:
+ LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+ }
+ }
+}
diff --git
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandlerTest.java
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandlerTest.java
new file mode 100644
index 00000000000..20bb53c0e3e
--- /dev/null
+++
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandlerTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.DistributionCell;
+import org.apache.beam.runners.core.metrics.GaugeCell;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SamzaMetricsBundleProgressHandlerTest {
+
+ public static final String EXPECTED_NAMESPACE = "namespace";
+ public static final String EXPECTED_COUNTER_NAME = "counterName";
+ private MetricsRegistryMap metricsRegistryMap;
+ private SamzaMetricsContainer samzaMetricsContainer;
+
+ private SamzaMetricsBundleProgressHandler samzaMetricsBundleProgressHandler;
+ private String stepName = "stepName";
+ Map<String, String> transformIdToUniqueName = new HashMap<>();
+
+ @Before
+ public void setup() {
+ metricsRegistryMap = new MetricsRegistryMap();
+ samzaMetricsContainer = new SamzaMetricsContainer(metricsRegistryMap);
+ samzaMetricsBundleProgressHandler =
+ new SamzaMetricsBundleProgressHandler(
+ stepName, samzaMetricsContainer, transformIdToUniqueName);
+ }
+
+ @Test
+ public void testCounter() {
+ // Octal escape for decimal 123
+ byte[] payload = "\173".getBytes(Charset.defaultCharset());
+
+ MetricsApi.MonitoringInfo monitoringInfo =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(SUM_INT64_TYPE)
+ .setPayload(ByteString.copyFrom(payload))
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
EXPECTED_NAMESPACE)
+ .putLabels(MonitoringInfoConstants.Labels.NAME,
EXPECTED_COUNTER_NAME)
+ .build();
+ BeamFnApi.ProcessBundleResponse response =
+
BeamFnApi.ProcessBundleResponse.newBuilder().addMonitoringInfos(monitoringInfo).build();
+
+ // Execute
+ samzaMetricsBundleProgressHandler.onCompleted(response);
+
+ // Verify
+ MetricName metricName = MetricName.named(EXPECTED_NAMESPACE,
EXPECTED_COUNTER_NAME);
+ CounterCell counter =
+ (CounterCell)
samzaMetricsContainer.getContainer(stepName).getCounter(metricName);
+
+ assertEquals(counter.getCumulative(), (Long) 123L);
+ }
+
+ @Test
+ public void testGauge() {
+ // TimeStamp = 0, Value = 123
+ byte[] payload = "\000\173".getBytes(Charset.defaultCharset());
+
+ MetricsApi.MonitoringInfo monitoringInfo =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(LATEST_INT64_TYPE)
+ .setPayload(ByteString.copyFrom(payload))
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
EXPECTED_NAMESPACE)
+ .putLabels(MonitoringInfoConstants.Labels.NAME,
EXPECTED_COUNTER_NAME)
+ .build();
+ BeamFnApi.ProcessBundleResponse response =
+
BeamFnApi.ProcessBundleResponse.newBuilder().addMonitoringInfos(monitoringInfo).build();
+
+ // Execute
+ samzaMetricsBundleProgressHandler.onCompleted(response);
+
+ // Verify
+ MetricName metricName = MetricName.named(EXPECTED_NAMESPACE,
EXPECTED_COUNTER_NAME);
+ GaugeCell gauge = (GaugeCell)
samzaMetricsContainer.getContainer(stepName).getGauge(metricName);
+
+ assertEquals(123L, gauge.getCumulative().value());
+ assertTrue(
+
gauge.getCumulative().timestamp().isBefore(Instant.now().plus(Duration.millis(500))));
+ assertTrue(
+
gauge.getCumulative().timestamp().isAfter(Instant.now().minus(Duration.millis(500))));
+ }
+
+ @Test
+ public void testDistribution() {
+ // Count = 123, sum = 124, min = 125, max = 126
+ byte[] payload = "\173\174\175\176".getBytes(Charset.defaultCharset());
+
+ MetricsApi.MonitoringInfo monitoringInfo =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(DISTRIBUTION_INT64_TYPE)
+ .setPayload(ByteString.copyFrom(payload))
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
EXPECTED_NAMESPACE)
+ .putLabels(MonitoringInfoConstants.Labels.NAME,
EXPECTED_COUNTER_NAME)
+ .build();
+ BeamFnApi.ProcessBundleResponse response =
+
BeamFnApi.ProcessBundleResponse.newBuilder().addMonitoringInfos(monitoringInfo).build();
+
+ // Execute
+ samzaMetricsBundleProgressHandler.onCompleted(response);
+
+ // Verify
+ MetricName metricName = MetricName.named(EXPECTED_NAMESPACE,
EXPECTED_COUNTER_NAME);
+ DistributionCell gauge =
+ (DistributionCell)
samzaMetricsContainer.getContainer(stepName).getDistribution(metricName);
+
+ assertEquals(123L, gauge.getCumulative().count());
+ assertEquals(124L, gauge.getCumulative().sum());
+ assertEquals(125L, gauge.getCumulative().min());
+ assertEquals(126L, gauge.getCumulative().max());
+ }
+
+ @Test
+ public void testEmptyPayload() {
+
+ byte[] emptyPayload = "".getBytes(Charset.defaultCharset());
+
+ MetricsApi.MonitoringInfo emptyMonitoringInfo =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(SUM_INT64_TYPE)
+ .setPayload(ByteString.copyFrom(emptyPayload))
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
EXPECTED_NAMESPACE)
+ .putLabels(MonitoringInfoConstants.Labels.NAME,
EXPECTED_COUNTER_NAME)
+ .build();
+ // Octal escape for decimal 123
+ byte[] payload = "\173".getBytes(Charset.defaultCharset());
+
+ MetricsApi.MonitoringInfo monitoringInfo =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(SUM_INT64_TYPE)
+ .setPayload(ByteString.copyFrom(payload))
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
EXPECTED_NAMESPACE)
+ .putLabels(MonitoringInfoConstants.Labels.NAME,
EXPECTED_COUNTER_NAME)
+ .build();
+ BeamFnApi.ProcessBundleResponse response =
+ BeamFnApi.ProcessBundleResponse.newBuilder()
+ .addMonitoringInfos(emptyMonitoringInfo)
+ .addMonitoringInfos(monitoringInfo)
+ .addMonitoringInfos(emptyMonitoringInfo)
+ .build();
+
+ // Execute
+ samzaMetricsBundleProgressHandler.onCompleted(response);
+
+ // Verify
+ MetricName metricName = MetricName.named(EXPECTED_NAMESPACE,
EXPECTED_COUNTER_NAME);
+ CounterCell counter =
+ (CounterCell)
samzaMetricsContainer.getContainer(stepName).getCounter(metricName);
+
+ assertEquals(counter.getCumulative(), (Long) 123L);
+ }
+}