[ https://issues.apache.org/jira/browse/BEAM-4826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566224#comment-16566224 ]

Ankur Goenka commented on BEAM-4826:
------------------------------------

[~herohde] Quick question.

What do you mean by "Dataflow removes the flatten when it does the same split"?

Does Dataflow drop the flatten altogether, or does it drop the redundant inputs
from the flatten transform when it sends the process bundle descriptor to the
SDK harness?

 

The challenge with Flink, and with the portability implementation in general, is
that it can create a different ProcessBundleDescriptor for a flatten for each of
its inputs, depending on how those inputs are created.
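
To make the split concrete, here is a minimal Python sketch, assuming fusion has
already decided which stage produces each PCollection. The `producer_stage`
mapping, the stage names, and the helper `flatten_inputs_by_stage` are
hypothetical and not part of the Beam codebase; the PCollection ids mirror the
snippet in the issue quoted below.

```python
from collections import defaultdict


def flatten_inputs_by_stage(flatten_inputs, producer_stage):
    """Group a flatten's inputs by the fused stage that produces each one.

    flatten_inputs: dict of local input name -> PCollection id.
    producer_stage: dict of PCollection id -> stage id (hypothetical fusion result).
    """
    grouped = defaultdict(dict)
    for local_name, pcoll in flatten_inputs.items():
        grouped[producer_stage[pcoll]][local_name] = pcoll
    return dict(grouped)


# A 3-input flatten whose inputs are each produced in a different fused stage.
flatten_inputs = {"i0": "n2", "i1": "n4", "i2": "n6"}
producer_stage = {"n2": "stage_a", "n4": "stage_b", "n6": "stage_c"}

by_stage = flatten_inputs_by_stage(flatten_inputs, producer_stage)
print(by_stage)
# {'stage_a': {'i0': 'n2'}, 'stage_b': {'i1': 'n4'}, 'stage_c': {'i2': 'n6'}}
```

Each stage gets its own ProcessBundleDescriptor, so the flatten appears three
times; the open question is whether each copy should list all three inputs or
only the one its stage actually produces.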

There are two potential fixes for this:
 # Remove the redundant inputs at execution time in the runner.
 # Create a separate flatten transform for each stage created after fusion.

I think fix 1 is better because it gives the runner more information about how
to fuse things; with fix 2, this information is hard to obtain.
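
A minimal sketch of fix 1, assuming the runner holds a stage's transforms in a
plain dict shaped loosely like the descriptor's `transforms` map (this is not the
real protobuf API). The helper `prune_flatten_inputs`, its `stage_inputs`
parameter, and the `urn`/`inputs`/`outputs` keys are illustrative assumptions;
only the flatten URN is taken from the snippet in the issue.

```python
FLATTEN_URN = "beam:transform:flatten:v1"


def prune_flatten_inputs(transforms, stage_inputs=()):
    """Drop flatten inputs whose PCollection is not available inside this stage.

    transforms: dict of transform id -> {"urn": str, "inputs": dict, "outputs": dict}
                (a simplified, hypothetical model of a bundle descriptor).
    stage_inputs: PCollection ids fed into the stage from outside it.
    Returns a new dict and leaves the argument unchanged.
    """
    available = set(stage_inputs)
    for t in transforms.values():
        available.update(t["outputs"].values())

    pruned = {}
    for tid, t in transforms.items():
        t = dict(t)  # shallow copy so the caller's dict is not mutated
        if t["urn"] == FLATTEN_URN:
            t["inputs"] = {k: v for k, v in t["inputs"].items() if v in available}
        pruned[tid] = t
    return pruned


transforms = {
    "e4": {"urn": "urn:beam:transform:pardo:v1", "inputs": {"i0": "n3"}, "outputs": {"i0": "n4"}},
    "e7": {"urn": FLATTEN_URN, "inputs": {"i0": "n2", "i1": "n4", "i2": "n6"}, "outputs": {"i0": "n7"}},
}
pruned = prune_flatten_inputs(transforms, stage_inputs=["n3"])
print(pruned["e7"]["inputs"])  # {'i1': 'n4'}
```

Under this approach the pruning happens only when each stage's descriptor is
built, so the pipeline graph seen during fusion still contains a single flatten
with all of its inputs.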

> Flink runner sends bad flatten to SDK
> -------------------------------------
>
>                 Key: BEAM-4826
>                 URL: https://issues.apache.org/jira/browse/BEAM-4826
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Henning Rohde
>            Assignee: Ankur Goenka
>            Priority: Major
>              Labels: portability
>
> For a Go flatten test with 3 inputs, the Flink runner splits this into 3 bundle
> descriptors. But it sends the original 3-input flatten, with only 1 actual input
> present, in each bundle descriptor. This is inconsistent, and the SDK shouldn't
> have to handle dangling PCollections. In contrast, Dataflow removes the flatten
> when it does the same split.
> Snippet:
> register: <
>   process_bundle_descriptor: <
>     id: "3"
>     transforms: <
>       key: "e4"
>       value: <
>         unique_name: "github.com/apache/beam/sdks/go/pkg/beam.createFn'1"
>         spec: <
>           urn: "urn:beam:transform:pardo:v1"
>           payload: [...]
>         >
>         inputs: <
>           key: "i0"
>           value: "n3"
>         >
>         outputs: <
>           key: "i0"
>           value: "n4"
>         >
>       >
>     >
>     transforms: <
>       key: "e7"
>       value: <
>         unique_name: "Flatten"
>         spec: <
>           urn: "beam:transform:flatten:v1"
>         >
>         inputs: <
>           key: "i0"
>           value: "n2"
>         >
>         inputs: <
>           key: "i1"
>           value: "n4"  # <-- the only input present in this descriptor
>         >
>         inputs: <
>           key: "i2"
>           value: "n6"
>         >
>         outputs: <
>           key: "i0"
>           value: "n7"
>         >
>       >
>     >
> [...]
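
The inconsistency in the snippet can be detected mechanically: any transform
input that is neither produced by another transform in the descriptor nor fed in
from outside the stage is dangling. Below is a minimal Python sketch over a
simplified dict model of the `transforms` map (not the real protobuf API); the
helper `find_dangling_inputs` is hypothetical, and the assumption that "n3"
arrives from outside the stage is ours, since its producer is elided above.

```python
def find_dangling_inputs(transforms, stage_inputs=()):
    """Return (transform id, local name, PCollection id) for every input that
    no transform in the descriptor produces and that is not a stage input.
    """
    produced = set(stage_inputs)
    for t in transforms.values():
        produced.update(t["outputs"].values())

    dangling = []
    for tid, t in sorted(transforms.items()):
        for local_name, pcoll in sorted(t["inputs"].items()):
            if pcoll not in produced:
                dangling.append((tid, local_name, pcoll))
    return dangling


# Simplified model of descriptor "3"; "n3" is ASSUMED to come from outside the stage.
descriptor_3 = {
    "e4": {"inputs": {"i0": "n3"}, "outputs": {"i0": "n4"}},
    "e7": {"inputs": {"i0": "n2", "i1": "n4", "i2": "n6"}, "outputs": {"i0": "n7"}},
}

print(find_dangling_inputs(descriptor_3, stage_inputs=["n3"]))
# [('e7', 'i0', 'n2'), ('e7', 'i2', 'n6')]
```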



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
