[ 
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=173076&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173076
 ]

ASF GitHub Bot logged work on BEAM-6138:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Dec/18 21:36
            Start Date: 07/Dec/18 21:36
    Worklog Time Spent: 10m 
      Work Description: ajamato commented on a change in pull request #6799: [BEAM-6138] Add User Counter Metric Support to Java SDK
URL: https://github.com/apache/beam/pull/6799#discussion_r239951564
 
 

 ##########
 File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 ##########
 @@ -514,6 +532,137 @@ public void 
testSideInputIsAccessibleForDownstreamCallers() throws Exception {
     assertEquals(stateData, fakeClient.getData());
   }
 
+  /**
+   * A simple Tuple class for creating a list of ExpectedMetrics using the 
stepName, metricName and
+   * value of the MetricUpdate classes.
+   */
+  @AutoValue
+  public abstract static class ExpectedMetric implements Serializable {
+    static ExpectedMetric create(String stepName, MetricName metricName, long 
value) {
+      return new AutoValue_FnApiDoFnRunnerTest_ExpectedMetric(stepName, 
metricName, value);
+    }
+
+    public abstract String stepName();
+
+    public abstract MetricName metricName();
+
+    public abstract long value();
+  }
+
+  @Test
+  public void testUsingMetrics() throws Exception {
+    MetricsContainerImpl metricsContainer = new 
MetricsContainerImpl("testUsingMetrics");
+    Closeable closeable = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer);
+    FixedWindows windowFn = FixedWindows.of(Duration.millis(1L));
+    IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
+    IntervalWindow windowB = windowFn.assignWindow(new Instant(2L));
+    ByteString encodedWindowA =
+        
ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(), 
windowA));
+    ByteString encodedWindowB =
+        
ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(), 
windowB));
+
+    Pipeline p = Pipeline.create();
+    PCollection<String> valuePCollection =
+        p.apply(Create.of("unused")).apply(Window.into(windowFn));
+    PCollectionView<Iterable<String>> iterableSideInputView =
+        valuePCollection.apply(View.asIterable());
+    PCollection<Iterable<String>> outputPCollection =
+        valuePCollection.apply(
+            TEST_PTRANSFORM_ID,
+            ParDo.of(new 
TestSideInputIsAccessibleForDownstreamCallersDoFn(iterableSideInputView))
+                .withSideInputs(iterableSideInputView));
+
+    SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents, 
true);
+    String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection);
+    String outputPCollectionId = 
sdkComponents.registerPCollection(outputPCollection);
+
+    RunnerApi.PTransform pTransform =
+        pProto
+            .getComponents()
+            .getTransformsOrThrow(
+                pProto
+                    .getComponents()
+                    .getTransformsOrThrow(TEST_PTRANSFORM_ID)
+                    .getSubtransforms(0));
+
+    ImmutableMap<StateKey, ByteString> stateData =
+        ImmutableMap.of(
+            multimapSideInputKey(
+                iterableSideInputView.getTagInternal().getId(), 
ByteString.EMPTY, encodedWindowA),
+            encode("iterableValue1A", "iterableValue2A", "iterableValue3A"),
+            multimapSideInputKey(
+                iterableSideInputView.getTagInternal().getId(), 
ByteString.EMPTY, encodedWindowB),
+            encode("iterableValue1B", "iterableValue2B", "iterableValue3B"));
+
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData);
+
+    List<WindowedValue<Iterable<String>>> mainOutputValues = new ArrayList<>();
+    ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = 
ArrayListMultimap.create();
+    consumers.put(
+        Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) 
mainOutputValues::add);
+    List<ThrowingRunnable> startFunctions = new ArrayList<>();
+    List<ThrowingRunnable> finishFunctions = new ArrayList<>();
+
+    new FnApiDoFnRunner.Factory<>()
+        .createRunnerForPTransform(
+            PipelineOptionsFactory.create(),
+            null /* beamFnDataClient */,
+            fakeClient,
+            TEST_PTRANSFORM_ID,
+            pTransform,
+            Suppliers.ofInstance("57L")::get,
+            pProto.getComponents().getPcollectionsMap(),
+            pProto.getComponents().getCodersMap(),
+            pProto.getComponents().getWindowingStrategiesMap(),
+            consumers,
+            startFunctions::add,
+            finishFunctions::add,
+            null /* splitListener */);
+
+    Iterables.getOnlyElement(startFunctions).run();
+    mainOutputValues.clear();
+
+    assertThat(consumers.keySet(), containsInAnyOrder(inputPCollectionId, 
outputPCollectionId));
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 173076)
    Time Spent: 2h 20m  (was: 2h 10m)

> Add User Metric Support to Java SDK
> -----------------------------------
>
>                 Key: BEAM-6138
>                 URL: https://issues.apache.org/jira/browse/BEAM-6138
>             Project: Beam
>          Issue Type: New Feature
>          Components: java-fn-execution
>            Reporter: Alex Amato
>            Assignee: Alex Amato
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to