[ https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=113421&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113421 ]

ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jun/18 22:35
            Start Date: 19/Jun/18 22:35
    Worklog Time Spent: 10m 
    Work Description: bsidhom commented on a change in pull request #5688: [BEAM-4285] Implement Flink batch side input handler
URL: https://github.com/apache/beam/pull/5688#discussion_r196599886
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
 ##########
 @@ -54,7 +61,26 @@ private BatchFlinkExecutableStageContext(JobBundleFactory jobBundleFactory) {
   @Override
   public StateRequestHandler getStateRequestHandler(
       ExecutableStage executableStage, RuntimeContext runtimeContext) {
-    return FlinkBatchStateRequestHandler.forStage(executableStage, runtimeContext);
+    MultimapSideInputHandlerFactory sideInputHandlerFactory =
+        FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext);
+    ExecutableProcessBundleDescriptor processBundleDescriptor;
+    try {
+      // NOTE: We require an executable bundle descriptor for the StateRequestHandlers construction
+      // below. This only uses the bundle descriptor for side input specs and effectively ignores
+      // data and state endpoints. We rely on the fact that PCollections and coders are structurally
+      // identical between instantiations here to prevent having to wire the original executable
+      // bundle descriptor here. The correct long-term fix is to move side input logic out of
 
 Review comment:
   Basically, the problem is that `ExecutableProcessBundleDescriptor` does not quite do what we want right now. It was intended to be an instantiation of `ExecutableStage` with the gRPC endpoints filled in, and all other logic should live in `ExecutableStage` itself. However, EPBD now carries side input logic and coder instantiation in addition to the gRPC endpoints. This general pipeline logic would be better refactored into `ExecutableStage`, because we don't have access to the data or state API endpoints here and cannot fill them in.
   
   Luckily, this is not a matter of correctness, because EPBD construction (at least for the component ids we care about) is deterministic. It's just strange to pass in API descriptors where we don't have them and where they will not be used.
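
   The determinism argument above can be illustrated with a toy model. This is a minimal sketch under stated assumptions. `DescriptorSketch` and its id scheme (`side_input_<index>_<name>`) are hypothetical and are not Beam's `ExecutableProcessBundleDescriptor` API. The sketch shows that if component ids are computed only from the stage, a descriptor built with placeholder endpoints yields the same side input ids as one built with the real endpoints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, heavily simplified stand-in for a process bundle descriptor.
 * Side input ids are derived only from the stage; the endpoint fields model
 * the gRPC data/state endpoints that side input handling never reads.
 */
final class DescriptorSketch {
  final List<String> sideInputIds;
  final String dataEndpoint;
  final String stateEndpoint;

  private DescriptorSketch(List<String> sideInputIds, String dataEndpoint, String stateEndpoint) {
    this.sideInputIds = sideInputIds;
    this.dataEndpoint = dataEndpoint;
    this.stateEndpoint = stateEndpoint;
  }

  /** Builds a descriptor whose ids depend only on the stage's side input names, in order. */
  static DescriptorSketch fromStage(
      List<String> stageSideInputs, String dataEndpoint, String stateEndpoint) {
    List<String> ids = new ArrayList<>();
    for (int i = 0; i < stageSideInputs.size(); i++) {
      // Deterministic: the same stage always yields the same ids, whatever the endpoints are.
      ids.add("side_input_" + i + "_" + stageSideInputs.get(i));
    }
    return new DescriptorSketch(Collections.unmodifiableList(ids), dataEndpoint, stateEndpoint);
  }
}
```

   Building one descriptor with real endpoints and another with placeholder strings such as `"unused"` yields equal `sideInputIds`, which is the property the workaround relies on. The assumption breaks as soon as id assignment depends on anything outside the stage.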



Issue Time Tracking
-------------------

    Worklog Id:     (was: 113421)
    Time Spent: 1h 20m  (was: 1h 10m)

> Flink batch state request handler
> ---------------------------------
>
>                 Key: BEAM-4285
>                 URL: https://issues.apache.org/jira/browse/BEAM-4285
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> In order to support side inputs, Flink needs a state service request handler.
> As in the non-portable runner, we can start by handling batch side inputs with
> Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
>  can be used as a starting point.
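
The broadcast-variable approach in the description can be sketched as a multimap view that is built once per task from the broadcast elements. This is a minimal sketch under stated assumptions. `BroadcastMultimapSideInput` is hypothetical and is neither Beam's `MultimapSideInputHandlerFactory` nor any handler in the PR. A plain `List` of key/value entries stands in for what Flink's `RuntimeContext#getBroadcastVariable(String)` would return, so the sketch runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical multimap side input view over broadcast elements.
 * In a Flink task the input list would come from
 * RuntimeContext#getBroadcastVariable(name); here it is passed in directly.
 */
final class BroadcastMultimapSideInput<K, V> {
  private final Map<K, List<V>> valuesByKey = new HashMap<>();

  BroadcastMultimapSideInput(List<? extends Map.Entry<K, V>> broadcastElements) {
    // Group every broadcast (key, value) pair by key, preserving arrival order per key.
    for (Map.Entry<K, V> element : broadcastElements) {
      valuesByKey.computeIfAbsent(element.getKey(), k -> new ArrayList<>()).add(element.getValue());
    }
  }

  /** Returns all values for the key, or an empty list if the key is absent. */
  List<V> get(K key) {
    List<V> values = valuesByKey.get(key);
    return values == null ? Collections.<V>emptyList() : Collections.unmodifiableList(values);
  }

  /** Returns the distinct keys present in the side input. */
  Set<K> keys() {
    return Collections.unmodifiableSet(valuesByKey.keySet());
  }
}
```

Grouping once at construction turns each lookup into a single hash access. That fits broadcast data, which Flink hands to the task in full rather than incrementally.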



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
