[
https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=113421&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113421
]
ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jun/18 22:35
Start Date: 19/Jun/18 22:35
Worklog Time Spent: 10m
Work Description: bsidhom commented on a change in pull request #5688:
[BEAM-4285] Implement Flink batch side input handler
URL: https://github.com/apache/beam/pull/5688#discussion_r196599886
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
##########
@@ -54,7 +61,26 @@ private BatchFlinkExecutableStageContext(JobBundleFactory jobBundleFactory) {
@Override
public StateRequestHandler getStateRequestHandler(
ExecutableStage executableStage, RuntimeContext runtimeContext) {
- return FlinkBatchStateRequestHandler.forStage(executableStage, runtimeContext);
+ MultimapSideInputHandlerFactory sideInputHandlerFactory =
+ FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext);
+ ExecutableProcessBundleDescriptor processBundleDescriptor;
+ try {
+ // NOTE: We require an executable bundle descriptor for the StateRequestHandlers construction
+ // below. This only uses the bundle descriptor for side input specs and effectively ignores
+ // data and state endpoints. We rely on the fact that PCollections and coders are structurally
+ // identical between instantiations here to prevent having to wire the original executable
+ // bundle descriptor here. The correct long-term fix is to move side input logic out of
Review comment:
Basically, the problem is that `ExecutableProcessBundleDescriptor` (EPBD) does
not do quite what we want right now. It was intended to be an instantiation of
`ExecutableStage` with the gRPC endpoints filled out. All the other logic
should go into `ExecutableStage` itself. However, EPBD now carries side input
logic and coder instantiation in addition to the gRPC endpoints. This general
pipeline logic would be better refactored into `ExecutableStage`, because we
don't have access to the data or state API endpoints here and cannot fill those
out.
Luckily, this is not a matter of correctness, because EPBD construction (at
least for the component ids that we care about) is deterministic. It's just
strange to pass in API descriptors where we do not have them and where they
will not be used.
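To make the proposed split concrete, here is a minimal sketch in plain Java. None of these types are Beam's real classes: `StageSketch` and `DescriptorSketch` are hypothetical stand-ins, the `multimap:` spec strings and endpoint addresses are invented placeholders, and the sketch assumes that side input specs can be derived from the stage alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StageRefactorSketch {

  /** Hypothetical stand-in for ExecutableStage: pipeline logic only, no endpoints. */
  static final class StageSketch {
    private final List<String> sideInputIds;

    StageSketch(List<String> sideInputIds) {
      this.sideInputIds = Collections.unmodifiableList(new ArrayList<>(sideInputIds));
    }

    /** Derives side input specs from the stage alone, so the result is deterministic. */
    List<String> sideInputSpecs() {
      List<String> specs = new ArrayList<>();
      for (String id : sideInputIds) {
        specs.add("multimap:" + id); // placeholder spec format, not Beam's
      }
      return specs;
    }
  }

  /** Hypothetical stand-in for EPBD: a stage plus the gRPC endpoints, nothing else. */
  static final class DescriptorSketch {
    final StageSketch stage;
    final String dataEndpoint;
    final String stateEndpoint;

    DescriptorSketch(StageSketch stage, String dataEndpoint, String stateEndpoint) {
      this.stage = stage;
      this.dataEndpoint = dataEndpoint;
      this.stateEndpoint = stateEndpoint;
    }
  }

  /** A state request handler needs only the stage, so no endpoints are passed in. */
  static List<String> sideInputsForStateHandler(StageSketch stage) {
    return stage.sideInputSpecs();
  }

  public static void main(String[] args) {
    StageSketch stage = new StageSketch(Arrays.asList("view0", "view1"));
    // Placeholder addresses; they are never read when building side input specs.
    DescriptorSketch descriptor =
        new DescriptorSketch(stage, "localhost:1234", "localhost:5678");

    // Both paths yield the same specs; only the second needed endpoints to exist.
    List<String> fromStage = sideInputsForStateHandler(stage);
    List<String> fromDescriptor = sideInputsForStateHandler(descriptor.stage);
    System.out.println(fromStage.equals(fromDescriptor));
  }
}
```

With this shape, the call site in `getStateRequestHandler` would not need to construct a descriptor with unused endpoints just to reach the side input specs.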
Issue Time Tracking
-------------------
Worklog Id: (was: 113421)
Time Spent: 1h 20m (was: 1h 10m)
> Flink batch state request handler
> ---------------------------------
>
> Key: BEAM-4285
> URL: https://issues.apache.org/jira/browse/BEAM-4285
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> In order to support side inputs, Flink needs a state service request handler.
> As in the non-portable runner, we can start by handling batch side inputs with
> Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
> can be used as a starting point.