[ 
https://issues.apache.org/jira/browse/BEAM-6233?focusedWorklogId=178811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-178811
 ]

ASF GitHub Bot logged work on BEAM-6233:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Dec/18 23:26
            Start Date: 26/Dec/18 23:26
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on pull request #7330: [BEAM-6233]: Add initial user timer support in Dataflow for batch pipelines
URL: https://github.com/apache/beam/pull/7330#discussion_r244060686
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
 ##########
 @@ -48,25 +61,98 @@
       new OutputReceiverFactory() {
         @Override
         public FnDataReceiver<?> create(String pCollectionId) {
-          return receivedElement -> {
-            LOG.debug("Consume element {}", receivedElement);
-            outputReceiverMap.get(pCollectionId).process((WindowedValue<?>) receivedElement);
-          };
+          return receivedElement -> receive(pCollectionId, receivedElement);
         }
       };
   private final StateRequestHandler stateRequestHandler;
   private final BundleProgressHandler progressHandler;
   private RemoteBundle remoteBundle;
+  private final DataflowExecutionContext<?> executionContext;
+  private final Map<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap;
+  private final Map<String, Coder<BoundedWindow>> timerWindowCodersMap;
+  private final Map<String, ProcessBundleDescriptors.TimerSpec> timerIdToTimerSpecMap;
+  private final Map<String, Object> timerIdToKey;
+  private ExecutableStage executableStage;
 
   public ProcessRemoteBundleOperation(
-      OperationContext context,
+      ExecutableStage executableStage,
+      DataflowExecutionContext<?> executionContext,
+      DataflowOperationContext operationContext,
       StageBundleFactory stageBundleFactory,
       Map<String, OutputReceiver> outputReceiverMap) {
-    super(EMPTY_RECEIVER_ARRAY, context);
+    super(EMPTY_RECEIVER_ARRAY, operationContext);
+
     this.stageBundleFactory = stageBundleFactory;
-    this.outputReceiverMap = outputReceiverMap;
     this.stateRequestHandler = StateRequestHandler.unsupported();
     this.progressHandler = BundleProgressHandler.ignored();
+    this.executionContext = executionContext;
+    this.timerOutputIdToSpecMap = new HashMap<>();
+    this.timerWindowCodersMap = new HashMap<>();
+    this.executableStage = executableStage;
+    this.timerIdToKey = new HashMap<>();
+    this.outputReceiverMap = outputReceiverMap;
+    timerIdToTimerSpecMap = new HashMap<>();
+
+    for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap :
+        stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
+      for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
+        timerIdToTimerSpecMap.put(timerSpec.timerId(), timerSpec);
+      }
+    }
+
+    for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap :
+        stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
+      for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
+        timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
+      }
+    }
+
+    for (RunnerApi.PTransform pTransform :
+        stageBundleFactory
+            .getProcessBundleDescriptor()
+            .getProcessBundleDescriptor()
+            .getTransformsMap()
+            .values()) {
+      for (String timerId : timerIdToTimerSpecMap.keySet()) {
+        if (!pTransform.getInputsMap().containsKey(timerId)) {
+          continue;
+        }
+
+        String timerPCollectionId = pTransform.getInputsMap().get(timerId);
+        RunnerApi.PCollection timerPCollection =
+            stageBundleFactory
+                .getProcessBundleDescriptor()
+                .getProcessBundleDescriptor()
+                .getPcollectionsMap()
+                .get(timerPCollectionId);
+
+        String windowingStrategyId = timerPCollection.getWindowingStrategyId();
+        RunnerApi.WindowingStrategy windowingStrategy =
+            stageBundleFactory
+                .getProcessBundleDescriptor()
+                .getProcessBundleDescriptor()
+                .getWindowingStrategiesMap()
+                .get(windowingStrategyId);
+
+        String windowingCoderId = windowingStrategy.getWindowCoderId();
+        RunnerApi.Coder windowingCoder =
+            stageBundleFactory
+                .getProcessBundleDescriptor()
+                .getProcessBundleDescriptor()
+                .getCodersMap()
+                .get(windowingCoderId);
+
+        RehydratedComponents components =
+            RehydratedComponents.forComponents(executableStage.getComponents());
+        try {
+          timerWindowCodersMap.put(
+              timerId,
+              (Coder<BoundedWindow>) CoderTranslation.fromProto(windowingCoder, components));
+        } catch (IOException e) {
+          LOG.error(e.getMessage());
 
 Review comment:
   I feel it would be better to make this error message more detailed, e.g. 
"Failed to translate window coder <which coder> from proto: " + e.getMessage(). 
But it's totally up to you, as long as it's easy to pinpoint the failure if 
this error gets thrown.
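
   For illustration only, here is a minimal sketch of what such a message 
could look like. The class and method names (WindowCoderErrorMessage, 
describe) are hypothetical and not part of the PR; the sketch only assumes 
that the timer id and window coder id are in scope at the catch block, as 
they are in the diff above.

```java
import java.io.IOException;

public class WindowCoderErrorMessage {
  // Hypothetical helper: names the timer and the window coder that failed to translate.
  public static String describe(String timerId, String windowCoderId, Exception cause) {
    return String.format(
        "Failed to translate window coder '%s' for timer '%s' from proto: %s",
        windowCoderId, timerId, cause.getMessage());
  }

  public static void main(String[] args) {
    try {
      // Stand-in for the coder translation call that may throw IOException.
      throw new IOException("unknown coder URN");
    } catch (IOException e) {
      // Assumption: if LOG is an SLF4J logger, LOG.error(describe(...), e)
      // would also keep the stack trace in the log.
      System.err.println(describe("timer-1", "coder-7", e));
    }
  }
}
```

   Passing the exception itself to the logger, rather than only 
e.getMessage(), is one way to keep the stack trace alongside the message.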
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 178811)
    Time Spent: 1h 20m  (was: 1h 10m)

> Make bundle execution with ExecutableStage support timer/states
> ---------------------------------------------------------------
>
>                 Key: BEAM-6233
>                 URL: https://issues.apache.org/jira/browse/BEAM-6233
>             Project: Beam
>          Issue Type: Task
>          Components: runner-dataflow
>            Reporter: Boyuan Zhang
>            Assignee: Sam Rohde
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
