[
https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=113408&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113408
]
ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jun/18 22:14
Start Date: 19/Jun/18 22:14
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5688:
[BEAM-4285] Implement Flink batch side input handler
URL: https://github.com/apache/beam/pull/5688#discussion_r196595956
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
##########
@@ -54,7 +61,26 @@ private BatchFlinkExecutableStageContext(JobBundleFactory jobBundleFactory) {
@Override
public StateRequestHandler getStateRequestHandler(
ExecutableStage executableStage, RuntimeContext runtimeContext) {
- return FlinkBatchStateRequestHandler.forStage(executableStage, runtimeContext);
+ MultimapSideInputHandlerFactory sideInputHandlerFactory =
+ FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext);
+ ExecutableProcessBundleDescriptor processBundleDescriptor;
+ try {
+ // NOTE: We require an executable bundle descriptor for the StateRequestHandlers construction
+ // below. This only uses the bundle descriptor for side input specs and effectively ignores
+ // data and state endpoints. We rely on the fact that PCollections and coders are structurally
+ // identical between instantiations here to prevent having to wire the original executable
+ // bundle descriptor here. The correct long-term fix is to move side input logic out of
Review comment:
I don't quite understand this note. What is the problem with the current
code, and which side input logic are you referring to that must be moved into
ExecutableStage? Would that require also changing ExecutableStagePayload?
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 113408)
Time Spent: 50m (was: 40m)
> Flink batch state request handler
> ---------------------------------
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> To support side inputs, the Flink runner needs a state service request handler.
> As in the non-portable runner, we can start by handling batch side inputs with
> Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
> can be used as a starting point.
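
For readers unfamiliar with the approach described in the issue, the idea of answering side input lookups from a broadcast variable can be sketched roughly as follows. This is a self-contained illustration only, not the code from the pull request: `BroadcastSideInputSketch`, `KeyedValue`, `MultimapHandler`, and `forBroadcast` are hypothetical names, and a plain `List` stands in for the list that Flink's `RuntimeContext.getBroadcastVariable(String)` would return in a real job. Windowing and coders are ignored entirely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not Beam's FlinkBatchSideInputHandlerFactory. */
public class BroadcastSideInputSketch {

  /** A key/value element as it might appear in a materialized side input. */
  public static final class KeyedValue {
    final String key;
    final String value;

    public KeyedValue(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Answers multimap-style lookups against an in-memory index. */
  public static final class MultimapHandler {
    private final Map<String, List<String>> index = new HashMap<>();

    public MultimapHandler(List<KeyedValue> broadcast) {
      // Index the broadcast elements once so each lookup is a map access.
      for (KeyedValue kv : broadcast) {
        index.computeIfAbsent(kv.key, k -> new ArrayList<>()).add(kv.value);
      }
    }

    /** Returns all values for the key, or an empty list if the key is absent. */
    public List<String> get(String key) {
      return Collections.unmodifiableList(
          index.getOrDefault(key, Collections.emptyList()));
    }
  }

  /**
   * Assumption: in a real Flink job the list would come from
   * RuntimeContext.getBroadcastVariable(name); here the caller passes it in.
   */
  public static MultimapHandler forBroadcast(List<KeyedValue> broadcast) {
    return new MultimapHandler(broadcast);
  }
}
```

Because a batch side input is fully materialized before the consuming stage runs, building the index once per task and serving every state request from memory is sufficient in this simplified model.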