lostluck opened a new issue, #27622:
URL: https://github.com/apache/beam/issues/27622

   ### What happened?
   
   While preparing the Go SDK to make prism the default runner, I discovered that a DoFn that consumes its main input as a side input as well can cause the SDK to execute the DoFn multiple times.
   
   ```
   imp := beam.Impulse(s)
   col0 := beam.ParDo(s, dofn1, imp)
   sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0})
   beam.ParDo(s, &int64Check{
        Name: "sum sideinput check",
        Want: []int{13, 14, 15},
   }, sum)
   ```
   
   In this case, dofn3x1 is executed 3 times, so every expected output appears three times:
   
   ```
    []int{
           13,
   +       13, 13, 14, 14,
           14,
   +       15, 15,
           15,
     }
   ```
   
   The cause is that the exec.Plan building logic doesn't take into account whether an input is a side input. It therefore sees dofn3x1 consuming col0 three times and decides that the consumer must be multiplexed.
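   To make the failure mode concrete, here is a minimal sketch of consumer bookkeeping that ignores side inputs. It is not the Go SDK's actual exec.Plan code: the `transform` type, the `naiveConsumers` function and the local input names `i0`/`i1`/`i2` are all hypothetical. Because all three inputs of dofn3x1 point at col0, the transform is registered as a consumer of col0 three times, which is what would lead to a three-way multiplex.

   ```go
   package main

   import "fmt"

   // transform is a hypothetical, simplified stand-in for a pipeline transform.
   // Inputs maps local input names to PCollection IDs; SideInputs holds the
   // local names that are side inputs.
   type transform struct {
   	ID         string
   	Inputs     map[string]string
   	SideInputs map[string]bool
   }

   // naiveConsumers records one consumer edge per input and never consults
   // SideInputs, so a transform reading the same PCollection as main input and
   // as side inputs is registered once per input.
   func naiveConsumers(ts []transform) map[string][]string {
   	consumers := map[string][]string{}
   	for _, t := range ts {
   		for _, pcol := range t.Inputs {
   			consumers[pcol] = append(consumers[pcol], t.ID)
   		}
   	}
   	return consumers
   }

   func main() {
   	ts := []transform{{
   		ID:         "dofn3x1",
   		Inputs:     map[string]string{"i0": "col0", "i1": "col0", "i2": "col0"},
   		SideInputs: map[string]bool{"i1": true, "i2": true},
   	}}
   	c := naiveConsumers(ts)
   	// Three edges for col0: each element would be fanned out to three copies of dofn3x1.
   	fmt.Println(len(c["col0"]))
   }
   ```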
   
   I believe that other runners rename the PCollection inputs for side inputs, which avoids this issue by accident.
   
   This should be validated with additional regression test pipelines, both for side inputs and for flattens (which the runner may also direct the SDK to execute). These will show whether this is also an issue on Dataflow and other portable runners. I don't suspect this to be a common pattern, however.
   
   In any case, since prism is intended to challenge SDK assumptions, I'll have an SDK-side fix for 2.50. There's sufficient information to determine whether an input is a side input; it's simply not used at present.
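   In the Beam pipeline protos, a ParDo's side inputs are listed by local name in `ParDoPayload.side_inputs`, and those names are also keys of the transform's `inputs` map, so the main input is whatever remains after excluding them. The sketch below models that with plain maps and registers consumers only against main inputs. It is an illustration under that assumption, not the actual 2.50 fix; `transform`, `mainInputs`, `consumers` and the `i0`/`i1`/`i2` names are hypothetical.

   ```go
   package main

   import (
   	"fmt"
   	"sort"
   )

   // transform is a hypothetical, simplified stand-in for a pipeline transform.
   type transform struct {
   	ID         string
   	Inputs     map[string]string // local name -> PCollection ID
   	SideInputs map[string]bool   // local names that are side inputs
   }

   // mainInputs returns the PCollection IDs of every input whose local name is
   // not a side input, sorted for deterministic output.
   func mainInputs(t transform) []string {
   	var out []string
   	for name, pcol := range t.Inputs {
   		if !t.SideInputs[name] {
   			out = append(out, pcol)
   		}
   	}
   	sort.Strings(out)
   	return out
   }

   // consumers registers each transform only against its main inputs; side
   // inputs are excluded so they no longer count as extra consumer edges.
   func consumers(ts []transform) map[string][]string {
   	c := map[string][]string{}
   	for _, t := range ts {
   		for _, pcol := range mainInputs(t) {
   			c[pcol] = append(c[pcol], t.ID)
   		}
   	}
   	return c
   }

   func main() {
   	t := transform{
   		ID:         "dofn3x1",
   		Inputs:     map[string]string{"i0": "col0", "i1": "col0", "i2": "col0"},
   		SideInputs: map[string]bool{"i1": true, "i2": true},
   	}
   	// A single edge for col0, so no multiplexing is needed.
   	fmt.Println(len(consumers([]transform{t})["col0"]))
   }
   ```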
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner
