[ 
https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=167326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167326
 ]

ASF GitHub Bot logged work on BEAM-4681:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/18 09:40
            Start Date: 19/Nov/18 09:40
    Worklog Time Spent: 10m 
      Work Description: mxm commented on a change in pull request #7008: 
[BEAM-4681] Add support for portable timers in Flink batch mode 
URL: https://github.com/apache/beam/pull/7008#discussion_r234548677
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##########
 @@ -166,40 +180,126 @@ private StateRequestHandler getStateRequestHandler(
   public void mapPartition(
       Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> 
collector)
       throws Exception {
-    processElements(iterable, collector);
+
+    ReceiverFactory receiverFactory = new ReceiverFactory(collector, 
outputMap);
+    try (RemoteBundle bundle =
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+      processElements(iterable, bundle);
+    }
   }
 
-  /** For stateful processing via a GroupReduceFunction. */
+  /** For stateful and timer processing via a GroupReduceFunction. */
   @Override
   public void reduce(Iterable<WindowedValue<InputT>> iterable, 
Collector<RawUnionValue> collector)
       throws Exception {
-    bagUserStateHandlerFactory.resetForNewKey();
-    processElements(iterable, collector);
+
+    // Need to discard the old key's state
+    if (bagUserStateHandlerFactory != null) {
+      bagUserStateHandlerFactory.resetForNewKey();
+    }
+
+    // Used with Batch, we know that all the data is available for this key. 
We can't use the
+    // timer manager from the context because it doesn't exist. So we create 
one and advance
+    // time to the end after processing all elements.
+    final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+    ReceiverFactory receiverFactory =
+        new ReceiverFactory(
+            collector,
+            outputMap,
+            new TimerReceiverFactory(
+                stageBundleFactory,
+                executableStage.getTimers(),
+                
stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs(),
+                (WindowedValue timerElement, TimerInternals.TimerData 
timerData) -> {
+                  currentTimerKey = (((KV) timerElement.getValue()).getKey());
+                  timerInternals.setTimer(timerData);
+                },
+                windowCoder));
+
+    try (RemoteBundle bundle =
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+      processElements(iterable, bundle);
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    try (RemoteBundle bundle =
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
+
+      fireEligibleTimers(
+          timerInternals,
+          (String timerId, WindowedValue timerValue) -> {
+            FnDataReceiver<WindowedValue<?>> fnTimerReceiver =
+                bundle.getInputReceivers().get(timerId);
+            Preconditions.checkNotNull(fnTimerReceiver, "No FnDataReceiver 
found for %s", timerId);
+            try {
+              fnTimerReceiver.accept(timerValue);
+            } catch (Exception e) {
+              throw new RuntimeException(
+                  String.format(Locale.ENGLISH, "Failed to process timer: %s", 
timerValue));
+            }
+          });
+    }
   }
 
-  private void processElements(
-      Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> 
collector)
+  private void processElements(Iterable<WindowedValue<InputT>> iterable, 
RemoteBundle bundle)
       throws Exception {
-    checkState(
-        runtimeContext == getRuntimeContext(),
-        "RuntimeContext changed from under us. State handler invalid.");
-    checkState(
-        stageBundleFactory != null, "%s not yet prepared", 
StageBundleFactory.class.getName());
-    checkState(
-        stateRequestHandler != null, "%s not yet prepared", 
StateRequestHandler.class.getName());
+    Preconditions.checkArgument(bundle != null, "RemoteBundle must not be 
null");
+
+    String inputPCollectionId = executableStage.getInputPCollection().getId();
+    FnDataReceiver<WindowedValue<?>> mainReceiver =
+        Preconditions.checkNotNull(
+            bundle.getInputReceivers().get(inputPCollectionId),
+            "Main input receiver for %s could not be initialized",
+            inputPCollectionId);
+    for (WindowedValue<InputT> input : iterable) {
+      mainReceiver.accept(input);
+    }
+  }
 
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            new ReceiverFactory(collector, outputMap), stateRequestHandler, 
progressHandler)) {
-      // TODO(BEAM-4681): Add support to Flink to support portable timers.
-      FnDataReceiver<WindowedValue<?>> receiver =
-          Iterables.getOnlyElement(bundle.getInputReceivers().values());
-      for (WindowedValue<InputT> input : iterable) {
-        receiver.accept(input);
+  private void fireEligibleTimers(
 
 Review comment:
   I find that the name `fireEligibleTimers` says exactly what the method does: 
it fires all timers which are ready to be fired.
   
   It does not have the semantics of "fire all timers". It only happens to fire 
all of them here because, beforehand, we advance processing time and event time 
to the max timestamp.
   
   I'll add a comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 167326)
    Time Spent: 22h 40m  (was: 22.5h)

> Integrate support for timers using the portability APIs into Flink
> ------------------------------------------------------------------
>
>                 Key: BEAM-4681
>                 URL: https://issues.apache.org/jira/browse/BEAM-4681
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Luke Cwik
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: portability, portability-flink
>             Fix For: 2.9.0
>
>          Time Spent: 22h 40m
>  Remaining Estimate: 0h
>
> Consider using the code produced in BEAM-4658 to support timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to