[
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=174981&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174981
]
ASF GitHub Bot logged work on BEAM-6138:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Dec/18 18:10
Start Date: 13/Dec/18 18:10
Worklog Time Spent: 10m
Work Description: swegner closed pull request #6799: [BEAM-6138] Add User
Counter Metric Support to Java SDK
URL: https://github.com/apache/beam/pull/6799
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 95bfa7438c25..9147919cc7d6 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -21,8 +21,10 @@
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.annotations.Experimental;
@@ -136,6 +138,23 @@ public MetricUpdates getUpdates() {
extractUpdates(counters), extractUpdates(distributions),
extractUpdates(gauges));
}
+ /** Return the cumulative values for any metrics in this container as
MonitoringInfos. */
+ public Iterable<MonitoringInfo> getMonitoringInfos() {
+ // Extract user metrics and store as MonitoringInfos.
+ ArrayList<MonitoringInfo> monitoringInfos = new
ArrayList<MonitoringInfo>();
+ MetricUpdates mus = this.getUpdates();
+
+ for (MetricUpdate<Long> mu : mus.counterUpdates()) {
+ SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder(true);
+ builder.setUrnForUserMetric(
+ mu.getKey().metricName().getNamespace(),
mu.getKey().metricName().getName());
+ builder.setInt64Value(mu.getUpdate());
+ builder.setTimestampToNow();
+ monitoringInfos.add(builder.build());
+ }
+ return monitoringInfos;
+ }
+
private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>>
cells) {
for (MetricCell<?> cell : cells.values()) {
cell.getDirty().afterCommit();
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index f53257fe07f6..1fdf6dba05a5 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -50,12 +50,16 @@
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
@@ -83,6 +87,7 @@
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
@@ -109,16 +114,20 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
+import org.hamcrest.CoreMatchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the execution of a pipeline from specification time to executing a
single fused stage,
@@ -128,6 +137,8 @@
public class RemoteExecutionTest implements Serializable {
@Rule public transient ResetDateTimeProvider resetDateTimeProvider = new
ResetDateTimeProvider();
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteExecutionTest.class);
+
private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
private transient GrpcFnServer<GrpcDataService> dataServer;
private transient GrpcFnServer<GrpcStateService> stateServer;
@@ -486,6 +497,124 @@ public void processElement(ProcessContext context) {
}
}
+ @Test
+ public void testMetrics() throws Exception {
+ final String counterMetricName = "counterMetric";
+ Pipeline p = Pipeline.create();
+ PCollection<String> input =
+ p.apply("impulse", Impulse.create())
+ .apply(
+ "create",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @ProcessElement
+ public void process(ProcessContext ctxt) {
+ Metrics.counter(RemoteExecutionTest.class,
counterMetricName).inc();
+ }
+ }))
+ .setCoder(StringUtf8Coder.of());
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
+ Optional<ExecutableStage> optionalStage =
+ Iterables.tryFind(fused.getFusedStages(), (ExecutableStage stage) ->
true);
+ checkState(optionalStage.isPresent(), "Expected a stage with side
inputs.");
+ ExecutableStage stage = optionalStage.get();
+
+ ExecutableProcessBundleDescriptor descriptor =
+ ProcessBundleDescriptors.fromExecutableStage(
+ "test_stage",
+ stage,
+ dataServer.getApiServiceDescriptor(),
+ stateServer.getApiServiceDescriptor());
+
+ BundleProcessor processor =
+ controlClient.getProcessor(
+ descriptor.getProcessBundleDescriptor(),
+ descriptor.getRemoteInputDestinations(),
+ stateDelegator);
+
+ Map<Target, Coder<WindowedValue<?>>> outputTargets =
descriptor.getOutputTargetCoders();
+ Map<Target, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
+ Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+ for (Entry<Target, Coder<WindowedValue<?>>> targetCoder :
outputTargets.entrySet()) {
+ List<WindowedValue<?>> outputContents = Collections.synchronizedList(new
ArrayList<>());
+ outputValues.put(targetCoder.getKey(), outputContents);
+ outputReceivers.put(
+ targetCoder.getKey(),
+ RemoteOutputReceiver.of(targetCoder.getValue(),
outputContents::add));
+ }
+
+ Iterable<byte[]> sideInputData =
+ Arrays.asList(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
+
+ StateRequestHandler stateRequestHandler =
+ StateRequestHandlers.forSideInputHandlerFactory(
+ descriptor.getSideInputSpecs(),
+ new SideInputHandlerFactory() {
+ @Override
+ public <T, V, W extends BoundedWindow> SideInputHandler<V, W>
forSideInput(
+ String pTransformId,
+ String sideInputId,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
+ Coder<W> windowCoder) {
+ return new SideInputHandler<V, W>() {
+ @Override
+ public Iterable<V> get(byte[] key, W window) {
+ return (Iterable) sideInputData;
+ }
+
+ @Override
+ public Coder<V> resultCoder() {
+ return ((KvCoder) elementCoder).getValueCoder();
+ }
+ };
+ }
+ });
+
+ BundleProgressHandler progressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse progress) {}
+
+ @Override
+ public void onCompleted(ProcessBundleResponse response) {
+ // Assert the timestamps are non empty then 0 them out before
comparing.
+ List<MonitoringInfo> actualMIs = new ArrayList<>();
+ for (MonitoringInfo mi : response.getMonitoringInfosList()) {
+ MonitoringInfo.Builder builder = MonitoringInfo.newBuilder();
+ Assert.assertTrue(mi.getTimestamp().getSeconds() > 0);
+ builder.mergeFrom(mi);
+ builder.clearTimestamp();
+ actualMIs.add(builder.build());
+ }
+
+ SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder();
+ builder.setUrnForUserMetric(RemoteExecutionTest.class.getName(),
counterMetricName);
+ builder.setInt64Value(2);
+ MonitoringInfo expectedCounter = builder.build();
+
+ assertThat(actualMIs, CoreMatchers.hasItems(expectedCounter));
+ }
+ };
+
+ try (ActiveBundle bundle =
+ processor.newBundle(outputReceivers, stateRequestHandler,
progressHandler)) {
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
+ .accept(
+ WindowedValue.valueInGlobalWindow(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
+ .accept(
+ WindowedValue.valueInGlobalWindow(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
+ }
+ }
+
@Test
public void testExecutionWithUserState() throws Exception {
Pipeline p = Pipeline.create();
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index e22a6c2b1c39..3018343c56dc 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -26,6 +26,7 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -47,6 +48,7 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
@@ -60,8 +62,10 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -292,21 +296,29 @@ private void
createRunnerAndConsumersForPTransformRecursively(
splitListener);
}
- // Already in reverse topological order so we don't need to do anything.
- for (ThrowingRunnable startFunction : startFunctions) {
- LOG.debug("Starting function {}", startFunction);
- startFunction.run();
- }
+ MetricsContainerImpl metricsContainer = new
MetricsContainerImpl(request.getInstructionId());
+ try (Closeable closeable =
MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+
+ // Already in reverse topological order so we don't need to do
anything.
+ for (ThrowingRunnable startFunction : startFunctions) {
+ LOG.debug("Starting function {}", startFunction);
+ startFunction.run();
+ }
- queueingClient.drainAndBlock();
+ queueingClient.drainAndBlock();
- // Need to reverse this since we want to call finish in topological
order.
- for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) {
- LOG.debug("Finishing function {}", finishFunction);
- finishFunction.run();
- }
- if (!allResiduals.isEmpty()) {
- response.addAllResidualRoots(allResiduals.values());
+ // Need to reverse this since we want to call finish in topological
order.
+ for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions))
{
+ LOG.debug("Finishing function {}", finishFunction);
+ finishFunction.run();
+ }
+ if (!allResiduals.isEmpty()) {
+ response.addAllResidualRoots(allResiduals.values());
+ }
+
+ for (MonitoringInfo mi : metricsContainer.getMonitoringInfos()) {
+ response.addMonitoringInfos(mi);
+ }
}
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 9149edb8f928..5d8c41317fc9 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -27,11 +27,13 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import com.google.auto.value.AutoValue;
import com.google.common.base.Suppliers;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
+import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -44,10 +46,18 @@
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
@@ -86,6 +96,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Tests for {@link FnApiDoFnRunner}. */
@RunWith(JUnit4.class)
@@ -93,6 +105,8 @@
@Rule public transient ResetDateTimeProvider dateTimeProvider = new
ResetDateTimeProvider();
+ private static final Logger LOG =
LoggerFactory.getLogger(FnApiDoFnRunnerTest.class);
+
public static final String TEST_PTRANSFORM_ID = "pTransformId";
private static class ConcatCombineFn extends CombineFn<String, String,
String> {
@@ -408,6 +422,9 @@ public void testBasicWithSideInputsAndOutputs() throws
Exception {
private static class TestSideInputIsAccessibleForDownstreamCallersDoFn
extends DoFn<String, Iterable<String>> {
+ private final Counter countedElements =
+
Metrics.counter(TestSideInputIsAccessibleForDownstreamCallersDoFn.class,
"countedElems");
+
private final PCollectionView<Iterable<String>> iterableSideInput;
private TestSideInputIsAccessibleForDownstreamCallersDoFn(
@@ -417,6 +434,7 @@ private TestSideInputIsAccessibleForDownstreamCallersDoFn(
@ProcessElement
public void processElement(ProcessContext context) {
+ countedElements.inc();
context.output(context.sideInput(iterableSideInput));
}
}
@@ -514,6 +532,124 @@ public void
testSideInputIsAccessibleForDownstreamCallers() throws Exception {
assertEquals(stateData, fakeClient.getData());
}
+ /**
+ * A simple Tuple class for creating a list of ExpectedMetrics using the
stepName, metricName and
+ * value of the MetricUpdate classes.
+ */
+ @AutoValue
+ public abstract static class ExpectedMetric implements Serializable {
+ static ExpectedMetric create(String stepName, MetricName metricName, long
value) {
+ return new AutoValue_FnApiDoFnRunnerTest_ExpectedMetric(stepName,
metricName, value);
+ }
+
+ public abstract String stepName();
+
+ public abstract MetricName metricName();
+
+ public abstract long value();
+ }
+
+ @Test
+ public void testUsingMetrics() throws Exception {
+ MetricsContainerImpl metricsContainer = new
MetricsContainerImpl("testUsingMetrics");
+ Closeable closeable =
MetricsEnvironment.scopedMetricsContainer(metricsContainer);
+ FixedWindows windowFn = FixedWindows.of(Duration.millis(1L));
+ IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
+ IntervalWindow windowB = windowFn.assignWindow(new Instant(2L));
+ ByteString encodedWindowA =
+
ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(),
windowA));
+ ByteString encodedWindowB =
+
ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(),
windowB));
+
+ Pipeline p = Pipeline.create();
+ PCollection<String> valuePCollection =
+ p.apply(Create.of("unused")).apply(Window.into(windowFn));
+ PCollectionView<Iterable<String>> iterableSideInputView =
+ valuePCollection.apply(View.asIterable());
+ PCollection<Iterable<String>> outputPCollection =
+ valuePCollection.apply(
+ TEST_PTRANSFORM_ID,
+ ParDo.of(new
TestSideInputIsAccessibleForDownstreamCallersDoFn(iterableSideInputView))
+ .withSideInputs(iterableSideInputView));
+
+ SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
+ RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents,
true);
+ String inputPCollectionId =
sdkComponents.registerPCollection(valuePCollection);
+
+ RunnerApi.PTransform pTransform =
+ pProto
+ .getComponents()
+ .getTransformsOrThrow(
+ pProto
+ .getComponents()
+ .getTransformsOrThrow(TEST_PTRANSFORM_ID)
+ .getSubtransforms(0));
+
+ ImmutableMap<StateKey, ByteString> stateData =
+ ImmutableMap.of(
+ multimapSideInputKey(
+ iterableSideInputView.getTagInternal().getId(),
ByteString.EMPTY, encodedWindowA),
+ encode("iterableValue1A", "iterableValue2A", "iterableValue3A"),
+ multimapSideInputKey(
+ iterableSideInputView.getTagInternal().getId(),
ByteString.EMPTY, encodedWindowB),
+ encode("iterableValue1B", "iterableValue2B", "iterableValue3B"));
+
+ FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData);
+
+ List<WindowedValue<Iterable<String>>> mainOutputValues = new ArrayList<>();
+ ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
ArrayListMultimap.create();
+ consumers.put(
+ Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
+ (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>)
mainOutputValues::add);
+ List<ThrowingRunnable> startFunctions = new ArrayList<>();
+ List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+ new FnApiDoFnRunner.Factory<>()
+ .createRunnerForPTransform(
+ PipelineOptionsFactory.create(),
+ null /* beamFnDataClient */,
+ fakeClient,
+ TEST_PTRANSFORM_ID,
+ pTransform,
+ Suppliers.ofInstance("57L")::get,
+ pProto.getComponents().getPcollectionsMap(),
+ pProto.getComponents().getCodersMap(),
+ pProto.getComponents().getWindowingStrategiesMap(),
+ consumers,
+ startFunctions::add,
+ finishFunctions::add,
+ null /* splitListener */);
+
+ Iterables.getOnlyElement(startFunctions).run();
+ mainOutputValues.clear();
+
+ // Ensure that bag user state that is initially empty or populated works.
+ // Ensure that the bagUserStateKey order does not matter when we traverse
over KV pairs.
+ FnDataReceiver<WindowedValue<?>> mainInput =
+ Iterables.getOnlyElement(consumers.get(inputPCollectionId));
+ mainInput.accept(valueInWindow("X", windowA));
+ mainInput.accept(valueInWindow("Y", windowB));
+
+ MetricsContainer mc = MetricsEnvironment.getCurrentContainer();
+ MetricName metricName =
+
MetricName.named(TestSideInputIsAccessibleForDownstreamCallersDoFn.class,
"countedElems");
+ List<ExpectedMetric> expectedMetrics = new ArrayList<ExpectedMetric>();
+ expectedMetrics.add(ExpectedMetric.create("testUsingMetrics", metricName,
2));
+
+ closeable.close();
+ MetricUpdates updates = metricsContainer.getUpdates();
+
+ // Validate MetricUpdates
+ int i = 0;
+ for (MetricUpdate mu : updates.counterUpdates()) {
+ assertEquals(expectedMetrics.get(i).metricName(),
mu.getKey().metricName());
+ assertEquals(expectedMetrics.get(i).stepName(), mu.getKey().stepName());
+ assertEquals(expectedMetrics.get(i).value(), mu.getUpdate());
+ i++;
+ }
+ assertEquals(1, i); // Validate the length.
+ }
+
private static class TestTimerfulDoFn extends DoFn<KV<String, String>,
String> {
@StateId("bag")
private final StateSpec<BagState<String>> bagStateSpec =
StateSpecs.bag(StringUtf8Coder.of());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 174981)
Time Spent: 5h 20m (was: 5h 10m)
> Add User Metric Support to Java SDK
> -----------------------------------
>
> Key: BEAM-6138
> URL: https://issues.apache.org/jira/browse/BEAM-6138
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Alex Amato
> Priority: Major
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)