This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cd20288318d Support DoFn metrics in portable Samza Runner  (#25068)
cd20288318d is described below

commit cd20288318d4478c1a62a4d18b4989121def3015
Author: Katie Liu <[email protected]>
AuthorDate: Fri Jan 20 17:17:01 2023 -0800

    Support DoFn metrics in portable Samza Runner  (#25068)
---
 CHANGES.md                                         |   1 +
 .../runners/samza/runtime/SamzaDoFnRunners.java    |  32 +++-
 .../runtime/SamzaMetricsBundleProgressHandler.java | 154 +++++++++++++++++
 .../SamzaMetricsBundleProgressHandlerTest.java     | 187 +++++++++++++++++++++
 4 files changed, 368 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 35eee824a6c..8d286fcd3fb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 * Adding override of allowed TLS algorithms (Java), now maintaining the 
disabled/legacy algorithms
   present in 2.43.0 (up to 1.8.0_342, 11.0.16, 17.0.2 for respective Java 
versions). This is accompanied
   by an explicit re-enabling of TLSv1 and TLSv1.1 for Java 8 and Java 11.
+* Add UDF metrics support for Samza portable mode.
 
 ## Breaking Changes
 
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 8af62059de9..6f658262e96 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -27,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -39,7 +41,7 @@ import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -246,7 +248,8 @@ public class SamzaDoFnRunners {
             idToTupleTagMap,
             bundledEventsBag,
             stateRequestHandler,
-            samzaExecutionContext);
+            samzaExecutionContext,
+            executableStage.getTransforms());
     return pipelineOptions.getEnableMetrics()
         ? DoFnRunnerWithMetrics.wrap(
             underlyingRunner, executionContext.getMetricsContainer(), 
transformFullName)
@@ -270,7 +273,8 @@ public class SamzaDoFnRunners {
     private final StateRequestHandler stateRequestHandler;
     private final SamzaExecutionContext samzaExecutionContext;
     private long startBundleTime;
-    private final String metricName;
+    private final String stepName;
+    private final Collection<PipelineNode.PTransformNode> pTransformNodes;
 
     private SdkHarnessDoFnRunner(
         SamzaPipelineOptions pipelineOptions,
@@ -282,7 +286,8 @@ public class SamzaDoFnRunners {
         Map<String, TupleTag<?>> idToTupleTagMap,
         BagState<WindowedValue<InT>> bundledEventsBag,
         StateRequestHandler stateRequestHandler,
-        SamzaExecutionContext samzaExecutionContext) {
+        SamzaExecutionContext samzaExecutionContext,
+        Collection<PipelineNode.PTransformNode> pTransformNodes) {
       this.pipelineOptions = pipelineOptions;
       this.timerInternalsFactory = timerInternalsFactory;
       this.windowingStrategy = windowingStrategy;
@@ -292,7 +297,8 @@ public class SamzaDoFnRunners {
       this.bundledEventsBag = bundledEventsBag;
       this.stateRequestHandler = stateRequestHandler;
       this.samzaExecutionContext = samzaExecutionContext;
-      this.metricName = "ExecutableStage-" + stepName + "-process-ns";
+      this.stepName = stepName;
+      this.pTransformNodes = pTransformNodes;
     }
 
     @SuppressWarnings("unchecked")
@@ -324,12 +330,25 @@ public class SamzaDoFnRunners {
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, 
this::timerDataConsumer, windowCoder);
 
+        Map<String, String> transformFullNameToUniqueName =
+            pTransformNodes.stream()
+                .collect(
+                    Collectors.toMap(
+                        pTransformNode -> pTransformNode.getId(),
+                        pTransformNode -> 
pTransformNode.getTransform().getUniqueName()));
+
+        SamzaMetricsBundleProgressHandler samzaMetricsBundleProgressHandler =
+            new SamzaMetricsBundleProgressHandler(
+                stepName,
+                samzaExecutionContext.getMetricsContainer(),
+                transformFullNameToUniqueName);
+
         remoteBundle =
             stageBundleFactory.getBundle(
                 receiverFactory,
                 timerReceiverFactory,
                 stateRequestHandler,
-                BundleProgressHandler.ignored());
+                samzaMetricsBundleProgressHandler);
 
         startBundleTime = getStartBundleTime();
 
@@ -396,6 +415,7 @@ public class SamzaDoFnRunners {
       final long finishBundleTime = System.nanoTime();
       final long averageProcessTime = (finishBundleTime - startBundleTime) / 
count;
 
+      String metricName = "ExecutableStage-" + stepName + "-process-ns";
       samzaExecutionContext
           .getMetricsContainer()
           .updateExecutableStageBundleMetric(metricName, averageProcessTime);
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
new file mode 100644
index 00000000000..20227db10f7
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@inheritDoc} Parses metrics information contained in the bundle progress 
messages. Passes the
+ * updated metrics to the provided SamzaMetricsContainer.
+ */
+class SamzaMetricsBundleProgressHandler implements BundleProgressHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamzaMetricsBundleProgressHandler.class);
+  private final String stepName;
+
+  private final SamzaMetricsContainer samzaMetricsContainer;
+  private final Map<String, String> transformIdToUniqueName;
+
+  /**
+   * Constructor of a SamzaMetricsBundleProgressHandler.
+   *
+   * <p>The full metric name in classic mode is 
{transformUniqueName}:{className}:{metricName}. We
+   * attempt to follow the same format in portable mode, but the 
monitoringInfos returned by the
+   * worker only contain the transformId. The current solution is to provide 
a mapping from
+   * transformId back to uniqueName. A future improvement would be to make the 
monitoring infos
+   * contain the uniqueName.
+   *
+   * @param stepName Default stepName provided by the runner.
+   * @param samzaMetricsContainer The destination for publishing the metrics.
+   * @param transformIdToUniqueName A mapping from transformId to uniqueName 
for pTransforms.
+   */
+  public SamzaMetricsBundleProgressHandler(
+      String stepName,
+      SamzaMetricsContainer samzaMetricsContainer,
+      Map<String, String> transformIdToUniqueName) {
+    this.stepName = stepName;
+    this.samzaMetricsContainer = samzaMetricsContainer;
+    this.transformIdToUniqueName = transformIdToUniqueName;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc} Handles a progress report from the bundle while it is 
executing. We choose to
+   * ignore the progress report. The metrics do not have to be updated on 
every progress report, so
+   * we save computation resources by ignoring it.
+   */
+  public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
+
+  @Override
+  /**
+   * {@inheritDoc} Handles the bundle's completion report. Parses the 
monitoringInfos in the
+   * response, then updates the MetricsRegistry.
+   */
+  public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
+    response.getMonitoringInfosList().stream()
+        .filter(monitoringInfo -> !monitoringInfo.getPayload().isEmpty())
+        .forEach(this::parseAndUpdateMetric);
+  }
+
+  /**
+   * Parses the metric contained in monitoringInfo, then publishes the metric 
to the
+   * metricContainer.
+   *
+   * <p>We attempt to construct a classic mode metricName
+   * ({transformUniqueName}:{className}:{metricName}). All the info should be 
in the labels, but we
+   * have fallbacks in case the labels don't exist.
+   *
+   * <p>Priorities for the transformUniqueName: 1. The unique name obtained 
using the
+   * transformIdToUniqueName mapping 2. The transformId provided by the monitoringInfo 
3. The stepName
+   * provided by the runner, which may be a result of fusing.
+   *
+   * <p>Priorities for the className 1. The namespace label 2. The 
monitoringInfo urn. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * <p>Priorities for the metricName: 1. The name label 2. The monitoringInfo 
labels map rendered as a string. Copying the
+   * implementation in MonitoringInfoMetricName.
+   *
+   * @see
+   *     
org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
+   */
+  private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
+    String pTransformId =
+        
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, 
stepName);
+    String transformUniqueName = 
transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
+    String className =
+        monitoringInfo.getLabelsOrDefault(
+            MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn());
+    String userMetricName =
+        monitoringInfo.getLabelsOrDefault(
+            MonitoringInfoConstants.Labels.NAME, 
monitoringInfo.getLabelsMap().toString());
+
+    MetricsContainer metricsContainer = 
samzaMetricsContainer.getContainer(transformUniqueName);
+    MetricName metricName = MetricName.named(className, userMetricName);
+
+    switch (monitoringInfo.getType()) {
+      case SUM_INT64_TYPE:
+        Counter counter = metricsContainer.getCounter(metricName);
+        counter.inc(decodeInt64Counter(monitoringInfo.getPayload()));
+        break;
+
+      case DISTRIBUTION_INT64_TYPE:
+        Distribution distribution = 
metricsContainer.getDistribution(metricName);
+        DistributionData data = 
decodeInt64Distribution(monitoringInfo.getPayload());
+        distribution.update(data.sum(), data.count(), data.min(), data.max());
+        break;
+
+      case LATEST_INT64_TYPE:
+        Gauge gauge = metricsContainer.getGauge(metricName);
+        // Gauge doesn't expose update as public. This will reset the 
timestamp.
+
+        gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value());
+        break;
+
+      default:
+        LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
+    }
+  }
+}
diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandlerTest.java
 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandlerTest.java
new file mode 100644
index 00000000000..20bb53c0e3e
--- /dev/null
+++ 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandlerTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.DistributionCell;
+import org.apache.beam.runners.core.metrics.GaugeCell;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SamzaMetricsBundleProgressHandlerTest {
+
+  public static final String EXPECTED_NAMESPACE = "namespace";
+  public static final String EXPECTED_COUNTER_NAME = "counterName";
+  private MetricsRegistryMap metricsRegistryMap;
+  private SamzaMetricsContainer samzaMetricsContainer;
+
+  private SamzaMetricsBundleProgressHandler samzaMetricsBundleProgressHandler;
+  private String stepName = "stepName";
+  Map<String, String> transformIdToUniqueName = new HashMap<>();
+
+  @Before
+  public void setup() {
+    metricsRegistryMap = new MetricsRegistryMap();
+    samzaMetricsContainer = new SamzaMetricsContainer(metricsRegistryMap);
+    samzaMetricsBundleProgressHandler =
+        new SamzaMetricsBundleProgressHandler(
+            stepName, samzaMetricsContainer, transformIdToUniqueName);
+  }
+
+  @Test
+  public void testCounter() {
+    // Octal escape \173 is decimal 123
+    byte[] payload = "\173".getBytes(Charset.defaultCharset());
+
+    MetricsApi.MonitoringInfo monitoringInfo =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(SUM_INT64_TYPE)
+            .setPayload(ByteString.copyFrom(payload))
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
EXPECTED_NAMESPACE)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, 
EXPECTED_COUNTER_NAME)
+            .build();
+    BeamFnApi.ProcessBundleResponse response =
+        
BeamFnApi.ProcessBundleResponse.newBuilder().addMonitoringInfos(monitoringInfo).build();
+
+    // Execute
+    samzaMetricsBundleProgressHandler.onCompleted(response);
+
+    // Verify
+    MetricName metricName = MetricName.named(EXPECTED_NAMESPACE, 
EXPECTED_COUNTER_NAME);
+    CounterCell counter =
+        (CounterCell) 
samzaMetricsContainer.getContainer(stepName).getCounter(metricName);
+
+    assertEquals(counter.getCumulative(), (Long) 123L);
+  }
+
+  @Test
+  public void testGauge() {
+    // TimeStamp = 0, Value = 123
+    byte[] payload = "\000\173".getBytes(Charset.defaultCharset());
+
+    MetricsApi.MonitoringInfo monitoringInfo =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(LATEST_INT64_TYPE)
+            .setPayload(ByteString.copyFrom(payload))
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
EXPECTED_NAMESPACE)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, 
EXPECTED_COUNTER_NAME)
+            .build();
+    BeamFnApi.ProcessBundleResponse response =
+        
BeamFnApi.ProcessBundleResponse.newBuilder().addMonitoringInfos(monitoringInfo).build();
+
+    // Execute
+    samzaMetricsBundleProgressHandler.onCompleted(response);
+
+    // Verify
+    MetricName metricName = MetricName.named(EXPECTED_NAMESPACE, 
EXPECTED_COUNTER_NAME);
+    GaugeCell gauge = (GaugeCell) 
samzaMetricsContainer.getContainer(stepName).getGauge(metricName);
+
+    assertEquals(123L, gauge.getCumulative().value());
+    assertTrue(
+        
gauge.getCumulative().timestamp().isBefore(Instant.now().plus(Duration.millis(500))));
+    assertTrue(
+        
gauge.getCumulative().timestamp().isAfter(Instant.now().minus(Duration.millis(500))));
+  }
+
+  @Test
+  public void testDistribution() {
+    // Count = 123, sum = 124, min = 125, max = 126
+    byte[] payload = "\173\174\175\176".getBytes(Charset.defaultCharset());
+
+    MetricsApi.MonitoringInfo monitoringInfo =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(DISTRIBUTION_INT64_TYPE)
+            .setPayload(ByteString.copyFrom(payload))
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
EXPECTED_NAMESPACE)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, 
EXPECTED_COUNTER_NAME)
+            .build();
+    BeamFnApi.ProcessBundleResponse response =
+        
BeamFnApi.ProcessBundleResponse.newBuilder().addMonitoringInfos(monitoringInfo).build();
+
+    // Execute
+    samzaMetricsBundleProgressHandler.onCompleted(response);
+
+    // Verify
+    MetricName metricName = MetricName.named(EXPECTED_NAMESPACE, 
EXPECTED_COUNTER_NAME);
+    DistributionCell gauge =
+        (DistributionCell) 
samzaMetricsContainer.getContainer(stepName).getDistribution(metricName);
+
+    assertEquals(123L, gauge.getCumulative().count());
+    assertEquals(124L, gauge.getCumulative().sum());
+    assertEquals(125L, gauge.getCumulative().min());
+    assertEquals(126L, gauge.getCumulative().max());
+  }
+
+  @Test
+  public void testEmptyPayload() {
+
+    byte[] emptyPayload = "".getBytes(Charset.defaultCharset());
+
+    MetricsApi.MonitoringInfo emptyMonitoringInfo =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(SUM_INT64_TYPE)
+            .setPayload(ByteString.copyFrom(emptyPayload))
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
EXPECTED_NAMESPACE)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, 
EXPECTED_COUNTER_NAME)
+            .build();
+    // Octal escape \173 is decimal 123
+    byte[] payload = "\173".getBytes(Charset.defaultCharset());
+
+    MetricsApi.MonitoringInfo monitoringInfo =
+        MetricsApi.MonitoringInfo.newBuilder()
+            .setType(SUM_INT64_TYPE)
+            .setPayload(ByteString.copyFrom(payload))
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
EXPECTED_NAMESPACE)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, 
EXPECTED_COUNTER_NAME)
+            .build();
+    BeamFnApi.ProcessBundleResponse response =
+        BeamFnApi.ProcessBundleResponse.newBuilder()
+            .addMonitoringInfos(emptyMonitoringInfo)
+            .addMonitoringInfos(monitoringInfo)
+            .addMonitoringInfos(emptyMonitoringInfo)
+            .build();
+
+    // Execute
+    samzaMetricsBundleProgressHandler.onCompleted(response);
+
+    // Verify
+    MetricName metricName = MetricName.named(EXPECTED_NAMESPACE, 
EXPECTED_COUNTER_NAME);
+    CounterCell counter =
+        (CounterCell) 
samzaMetricsContainer.getContainer(stepName).getCounter(metricName);
+
+    assertEquals(counter.getCumulative(), (Long) 123L);
+  }
+}

Reply via email to