[
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=170995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170995
]
ASF GitHub Bot logged work on BEAM-6138:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Nov/18 06:22
Start Date: 30/Nov/18 06:22
Worklog Time Spent: 10m
Work Description: Ardagan commented on a change in pull request #6799:
[BEAM-6138] Add User Counter Metric Support to Java SDK
URL: https://github.com/apache/beam/pull/6799#discussion_r237756846
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -514,6 +532,137 @@ public void
testSideInputIsAccessibleForDownstreamCallers() throws Exception {
assertEquals(stateData, fakeClient.getData());
}
+ /**
+ * A simple Tuple class for creating a list of ExpectedMetrics using the
stepName, metricName and
+ * value of the MetricUpdate classes.
+ */
+ @AutoValue
+ public abstract static class ExpectedMetric implements Serializable {
+ static ExpectedMetric create(String stepName, MetricName metricName, long
value) {
+ return new AutoValue_FnApiDoFnRunnerTest_ExpectedMetric(stepName,
metricName, value);
+ }
+
+ public abstract String stepName();
+
+ public abstract MetricName metricName();
+
+ public abstract long value();
+ }
+
+ @Test
+ public void testUsingMetrics() throws Exception {
+ MetricsContainerImpl metricsContainer = new
MetricsContainerImpl("testUsingMetrics");
+ Closeable closeable =
MetricsEnvironment.scopedMetricsContainer(metricsContainer);
+ FixedWindows windowFn = FixedWindows.of(Duration.millis(1L));
+ IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
+ IntervalWindow windowB = windowFn.assignWindow(new Instant(2L));
+ ByteString encodedWindowA =
+
ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(),
windowA));
+ ByteString encodedWindowB =
+
ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(),
windowB));
+
+ Pipeline p = Pipeline.create();
+ PCollection<String> valuePCollection =
+ p.apply(Create.of("unused")).apply(Window.into(windowFn));
+ PCollectionView<Iterable<String>> iterableSideInputView =
+ valuePCollection.apply(View.asIterable());
+ PCollection<Iterable<String>> outputPCollection =
+ valuePCollection.apply(
+ TEST_PTRANSFORM_ID,
+ ParDo.of(new
TestSideInputIsAccessibleForDownstreamCallersDoFn(iterableSideInputView))
+ .withSideInputs(iterableSideInputView));
+
+ SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
+ RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents,
true);
+ String inputPCollectionId =
sdkComponents.registerPCollection(valuePCollection);
+ String outputPCollectionId =
sdkComponents.registerPCollection(outputPCollection);
+
+ RunnerApi.PTransform pTransform =
+ pProto
+ .getComponents()
+ .getTransformsOrThrow(
+ pProto
+ .getComponents()
+ .getTransformsOrThrow(TEST_PTRANSFORM_ID)
+ .getSubtransforms(0));
+
+ ImmutableMap<StateKey, ByteString> stateData =
+ ImmutableMap.of(
+ multimapSideInputKey(
+ iterableSideInputView.getTagInternal().getId(),
ByteString.EMPTY, encodedWindowA),
+ encode("iterableValue1A", "iterableValue2A", "iterableValue3A"),
+ multimapSideInputKey(
+ iterableSideInputView.getTagInternal().getId(),
ByteString.EMPTY, encodedWindowB),
+ encode("iterableValue1B", "iterableValue2B", "iterableValue3B"));
+
+ FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData);
+
+ List<WindowedValue<Iterable<String>>> mainOutputValues = new ArrayList<>();
+ ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
ArrayListMultimap.create();
+ consumers.put(
+ Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
+ (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>)
mainOutputValues::add);
+ List<ThrowingRunnable> startFunctions = new ArrayList<>();
+ List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+ new FnApiDoFnRunner.Factory<>()
+ .createRunnerForPTransform(
+ PipelineOptionsFactory.create(),
+ null /* beamFnDataClient */,
+ fakeClient,
+ TEST_PTRANSFORM_ID,
+ pTransform,
+ Suppliers.ofInstance("57L")::get,
+ pProto.getComponents().getPcollectionsMap(),
+ pProto.getComponents().getCodersMap(),
+ pProto.getComponents().getWindowingStrategiesMap(),
+ consumers,
+ startFunctions::add,
+ finishFunctions::add,
+ null /* splitListener */);
+
+ Iterables.getOnlyElement(startFunctions).run();
+ mainOutputValues.clear();
+
+ assertThat(consumers.keySet(), containsInAnyOrder(inputPCollectionId,
outputPCollectionId));
Review comment:
We are testing metrics here, so we can skip other check.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 170995)
Time Spent: 50m (was: 40m)
> Add User Metric Support to Java SDK
> -----------------------------------
>
> Key: BEAM-6138
> URL: https://issues.apache.org/jira/browse/BEAM-6138
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)