[ 
https://issues.apache.org/jira/browse/BEAM-4653?focusedWorklogId=125237&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125237
 ]

ASF GitHub Bot logged work on BEAM-4653:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jul/18 20:24
            Start Date: 19/Jul/18 20:24
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #5898: [BEAM-4653] Add support to the Java SDK harness to execute timers.
URL: https://github.com/apache/beam/pull/5898#discussion_r203575993
 
 

 ##########
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 ##########
 @@ -433,4 +589,178 @@ public void updateWatermark(Instant watermark) {
       throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
     }
   }
+
+  /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}. */
+  private class OnTimerContext extends DoFn<InputT, OutputT>.OnTimerContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OnTimerContext() {
+      context.doFn.super();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return currentWindow;
+    }
+
+    @Override
+    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access paneInfo outside of @ProcessElement methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public InputT element(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Element parameters are not supported.");
+    }
+
+    @Override
+    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+      return timestamp();
+    }
+
+    @Override
+    public Row asRow(@Nullable String id) {
+      throw new UnsupportedOperationException(
+          "Cannot access element outside of @ProcessElement method.");
+    }
+
+    @Override
+    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+      return timeDomain();
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.windowedReceiver(this, null);
+    }
+
+    @Override
+    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.rowReceiver(this, null, mainOutputSchemaCoder);
+    }
+
+    @Override
+    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
+      return DoFnOutputReceivers.windowedMultiReceiver(this);
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public RestrictionTracker<?, ?> restrictionTracker() {
+      throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      StateDeclaration stateDeclaration = context.doFnSignature.stateDeclarations().get(stateId);
+      checkNotNull(stateDeclaration, "No state declaration found for %s", stateId);
+      StateSpec<?> spec;
+      try {
+        spec = (StateSpec<?>) stateDeclaration.field().get(context.doFn);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+      return spec.bind(stateId, stateAccessor);
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      checkState(
+          currentTimer.getValue() instanceof KV,
+          "Accessing timer in unkeyed context. Current timer is not a KV: %s.",
+          currentTimer);
+
+      return new FnApiTimer(timerId, (WindowedValue) currentTimer);
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return context.pipelineOptions;
+    }
+
+    @Override
+    public void output(OutputT output) {
+      outputTo(
+          mainOutputConsumers,
+          WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING));
 
 Review comment:
   This currently replicates the existing timer output behavior. I believe we should 
instead use a copy of the PaneInfo that was on the timer.
   
   @takidau / @relax / @kennknowles Should the PaneInfo be propagated from the 
element that set the timer to the output produced when that timer fires?
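
To make the question concrete, here is a minimal, self-contained sketch of the two options. It does not use the Beam API: `Pane`, `Timer`, `Output`, and both helper methods are hypothetical stand-ins invented for illustration, and the sketch assumes a timer could carry the pane of the element that set it, which the code in this pull request does not do.

```java
public class TimerPaneSketch {
    /** Hypothetical stand-in for a pane descriptor; only a label is modelled. */
    static final class Pane {
        static final Pane NO_FIRING = new Pane("NO_FIRING");
        final String label;
        Pane(String label) { this.label = label; }
    }

    /** Hypothetical timer that may remember the pane of the element that set it. */
    static final class Timer {
        final long timestampMillis;
        final Pane settingPane; // null when no pane was recorded (an assumption)
        Timer(long timestampMillis, Pane settingPane) {
            this.timestampMillis = timestampMillis;
            this.settingPane = settingPane;
        }
    }

    /** Hypothetical output value carrying a timestamp and a pane. */
    static final class Output {
        final String value;
        final long timestampMillis;
        final Pane pane;
        Output(String value, long timestampMillis, Pane pane) {
            this.value = value;
            this.timestampMillis = timestampMillis;
            this.pane = pane;
        }
    }

    /** Behavior the diff replicates: every timer output is tagged NO_FIRING. */
    static Output outputWithNoFiring(Timer timer, String value) {
        return new Output(value, timer.timestampMillis, Pane.NO_FIRING);
    }

    /** Alternative raised in the review: reuse the pane stored on the timer, if any. */
    static Output outputWithPropagatedPane(Timer timer, String value) {
        Pane pane = timer.settingPane != null ? timer.settingPane : Pane.NO_FIRING;
        return new Output(value, timer.timestampMillis, pane);
    }

    public static void main(String[] args) {
        Timer timer = new Timer(1000L, new Pane("ON_TIME"));
        System.out.println(outputWithNoFiring(timer, "fired").pane.label);
        System.out.println(outputWithPropagatedPane(timer, "fired").pane.label);
    }
}
```

Both helpers keep the timer's timestamp on the output and differ only in which pane they attach, which is the behavior the question above asks about.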

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 125237)
    Time Spent: 3h 50m  (was: 3h 40m)

> Java SDK harness should support user timers
> -------------------------------------------
>
>                 Key: BEAM-4653
>                 URL: https://issues.apache.org/jira/browse/BEAM-4653
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Wire up the onTimer method in the Java SDK harness FnApiDoFnRunner, connecting 
> it to the RemoteGrpcPort read/write that is responsible for 
> producing/consuming timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
