[ 
https://issues.apache.org/jira/browse/BEAM-4653?focusedWorklogId=124689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-124689
 ]

ASF GitHub Bot logged work on BEAM-4653:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jul/18 23:15
            Start Date: 18/Jul/18 23:15
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #5898: 
[BEAM-4653] Add support to the Java SDK harness to execute timers.
URL: https://github.com/apache/beam/pull/5898#discussion_r203559596
 
 

 ##########
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 ##########
 @@ -505,10 +516,226 @@ public void 
testSideInputIsAccessibleForDownstreamCallers() throws Exception {
     assertEquals(stateData, fakeClient.getData());
   }
 
+  private static class TestTimerfulDoFn extends DoFn<KV<String, String>, 
String> {
+    private static final TupleTag<String> mainOutput = new 
TupleTag<>("mainOutput");
+    private static final TupleTag<String> additionalOutput = new 
TupleTag<>("output");
+
+    @TimerId("event")
+    private final TimerSpec eventTimerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @TimerId("processing")
+    private final TimerSpec processingTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context,
+        @TimerId("event") Timer eventTimeTimer,
+        @TimerId("processing") Timer processingTimeTimer) {
+      context.output("main" + context.element().getKey());
+      eventTimeTimer.set(context.timestamp().plus(1L));
+      processingTimeTimer.offset(Duration.millis(2L));
+      processingTimeTimer.setRelative();
+    }
+
+    @OnTimer("event")
+    public void eventTimer(
+        OnTimerContext context,
+        @TimerId("event") Timer eventTimeTimer,
+        @TimerId("processing") Timer processingTimeTimer) {
+      context.output("event");
+      eventTimeTimer.set(context.timestamp().plus(11L));
+      processingTimeTimer.offset(Duration.millis(12L));
+      processingTimeTimer.setRelative();
+    }
+
+    @OnTimer("processing")
+    public void processingTimer(
+        OnTimerContext context,
+        @TimerId("event") Timer eventTimeTimer,
+        @TimerId("processing") Timer processingTimeTimer) {
+      context.output("processing");
+      eventTimeTimer.set(context.timestamp().plus(21L));
+      processingTimeTimer.offset(Duration.millis(22L));
+      processingTimeTimer.setRelative();
+    }
+  }
+
+  @Test
+  public void testTimers() throws Exception {
+    dateTimeProvider.setDateTimeFixed(10000L);
+
+    Pipeline p = Pipeline.create();
+    PCollection<KV<String, String>> valuePCollection =
+        p.apply(Create.of(KV.of("unused", "unused")));
+    PCollection<String> outputPCollection =
+        valuePCollection.apply(TEST_PTRANSFORM_ID, ParDo.of(new 
TestTimerfulDoFn()));
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    sdkComponents.registerEnvironment(Environment.getDefaultInstance());
+    // Note that the pipeline translation for timers creates a loop between 
the ParDo with
+    // the timer and the PCollection for that timer. This loop is unrolled by 
runners
+    // during execution which we redo here manually.
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+    String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection);
+    String outputPCollectionId = 
sdkComponents.registerPCollection(outputPCollection);
+    String eventTimerInputPCollectionId = 
"pTransformId/ParMultiDo(TestTimerful).event";
 
 Review comment:
   Proto translation should rarely change so I don't expect this to be much of 
an issue but in reality once we have end to end validates runner tests we 
should really delete a lot of the translation/execution tests since validates 
runner tests are much simpler to write and maintain.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 124689)
    Time Spent: 1h 20m  (was: 1h 10m)

> Java SDK harness should support user timers
> -------------------------------------------
>
>                 Key: BEAM-4653
>                 URL: https://issues.apache.org/jira/browse/BEAM-4653
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Wire up the onTimer method in the Java SDK harness FnApiDoFnRunner connecting 
> it to the RemoteGrpcPort read/write that is responsible for 
> producing/consumer timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to