[ 
https://issues.apache.org/jira/browse/BEAM-6233?focusedWorklogId=179214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-179214
 ]

ASF GitHub Bot logged work on BEAM-6233:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Dec/18 19:59
            Start Date: 27/Dec/18 19:59
    Worklog Time Spent: 10m 
      Work Description: Ardagan commented on pull request #7330: [BEAM-6233]: Add initial user timer support in Dataflow for batch pipelines
URL: https://github.com/apache/beam/pull/7330#discussion_r244215221
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
 ##########
 @@ -100,6 +197,75 @@ public void finish() throws Exception {
       } catch (Exception e) {
         throw new RuntimeException("Failed to finish remote bundle", e);
       }
+
+      // TODO(BEAM-6274): do we have to put this in the "start" method as well?
+      try (RemoteBundle bundle =
+          stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+
+        // TODO(BEAM-6274): Why do we need to namespace this to "user"?
+        DataflowExecutionContext.DataflowStepContext stepContext =
+            executionContext
+                .getStepContext((DataflowOperationContext) this.context)
+                .namespacedToUser();
+
+        // TODO(BEAM-6274): investigate if this is the correct window
+        TimerInternals.TimerData timerData =
+            stepContext.getNextFiredTimer(GlobalWindow.Coder.INSTANCE);
+        while (timerData != null) {
+          LOG.debug("Found fired timer in start {}", timerData);
+
+          // TODO(BEAM-6274): get the correct payload and payload coder
+          StateNamespaces.WindowNamespace windowNamespace =
+              (StateNamespaces.WindowNamespace) timerData.getNamespace();
+          BoundedWindow window = windowNamespace.getWindow();
+
+          WindowedValue<KV<Object, Timer>> timerValue =
+              WindowedValue.of(
+                  KV.of(
+                      timerIdToKey.get(timerData.getTimerId()),
+                      Timer.of(timerData.getTimestamp(), new byte[0])),
+                  timerData.getTimestamp(),
+                  Collections.singleton(window),
+                  PaneInfo.NO_FIRING);
+
+          String mainInputId =
+              timerIdToTimerSpecMap.get(timerData.getTimerId()).inputCollectionId();
+
+          bundle.getInputReceivers().get(mainInputId).accept(timerValue);
+
+          // TODO(BEAM-6274): investigate if this is the correct window
+          timerData = stepContext.getNextFiredTimer(GlobalWindow.Coder.INSTANCE);
+        }
+      }
+    }
+  }
+
+  private void receive(String pCollectionId, Object receivedElement) throws Exception {
+    LOG.debug("Received element {} for pcollection {}", receivedElement, pCollectionId);
+    // TODO(BEAM-6274): move this out into its own receiver class
 
 Review comment:
   I'm not sure whether the referenced ticket already covers this, based on its 
description; skip this comment if it does.
   I think most of the code added here could be moved into a separate class 
with a clear responsibility for tracking time.
   It would be great if we could do that.
   It should simplify unit testing and create a cleaner boundary between 
pieces of functionality.
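
   To illustrate the suggestion, here is a minimal sketch of what such an 
extracted class might look like. It is not the PR's code and does not use 
Beam types: FiredTimerDrainerSketch, FiredTimerDrainer, and its three 
constructor parameters are hypothetical names, and the timer is a plain 
generic type so the loop can be tested without a Dataflow step context.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FiredTimerDrainerSketch {

  /** Hypothetical helper: drains fired timers and forwards each one to its input. */
  static final class FiredTimerDrainer<TimerT> {
    // Assumption: returns the next fired timer, or null when none remain.
    private final Supplier<TimerT> nextFiredTimer;
    // Assumption: maps a timer to the id of the input collection that should receive it.
    private final Function<TimerT, String> inputIdFor;
    // Assumption: delivers a timer to the receiver registered for the given input id.
    private final BiConsumer<String, TimerT> receiver;

    FiredTimerDrainer(
        Supplier<TimerT> nextFiredTimer,
        Function<TimerT, String> inputIdFor,
        BiConsumer<String, TimerT> receiver) {
      this.nextFiredTimer = nextFiredTimer;
      this.inputIdFor = inputIdFor;
      this.receiver = receiver;
    }

    /** Forwards every pending fired timer and returns how many were delivered. */
    int drain() {
      int delivered = 0;
      for (TimerT timer = nextFiredTimer.get(); timer != null; timer = nextFiredTimer.get()) {
        receiver.accept(inputIdFor.apply(timer), timer);
        delivered++;
      }
      return delivered;
    }
  }

  public static void main(String[] args) {
    // Stand-in for the step context: a queue whose poll() returns null when empty.
    Queue<String> fired = new ArrayDeque<>(Arrays.asList("timerA", "timerB"));
    List<String> deliveries = new ArrayList<>();

    FiredTimerDrainer<String> drainer =
        new FiredTimerDrainer<>(
            fired::poll,
            timer -> "input-for-" + timer,
            (inputId, timer) -> deliveries.add(inputId + ":" + timer));

    int count = drainer.drain();
    System.out.println(count + " timers delivered: " + deliveries);
  }
}
```

   With this shape, the operation's finish() would only construct the helper 
and call drain(), and the loop itself could be unit tested with in-memory 
fakes as in main() above.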
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 179214)
    Time Spent: 2h 20m  (was: 2h 10m)

> Make bundle execution with ExecutableStage support timer/states
> ---------------------------------------------------------------
>
>                 Key: BEAM-6233
>                 URL: https://issues.apache.org/jira/browse/BEAM-6233
>             Project: Beam
>          Issue Type: Task
>          Components: runner-dataflow
>            Reporter: Boyuan Zhang
>            Assignee: Sam Rohde
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
