damccorm opened a new issue, #21591:
URL: https://github.com/apache/beam/issues/21591
Python Reshuffle holds elements while the pipeline is running and likely
releases them in a batch. In contrast, Java Reshuffle triggers on every element,
as noted in its documentation:
"the trigger used with {@link Reshuffle} which triggers on every element and
never buffers state."
Here is a minimal example that reproduces the behavior:
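```
import time

import apache_beam as beam
from apache_beam import Pipeline


def test(p: Pipeline):
    class SlowProcessFn(beam.DoFn):
        def process(self, element):
            # Simulate slow per-element work (0.5 s per element).
            time.sleep(0.5)
            yield element

    result = (p
              | beam.Create(range(100))
              | beam.ParDo(SlowProcessFn())
              | beam.Reshuffle()  # HERE
              | beam.Map(lambda x: print(x, time.time())))
    return result


if __name__ == "__main__":
    with beam.Pipeline() as p:  # local runner by default
        test(p)
```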
Tested on the local runner and on the Flink runner (1.14): with Reshuffle, the
elements are printed only after about 50 seconds (100 elements × 0.5 s each).
With Reshuffle commented out, an element is printed every half second.
This behavior causes problems when a downstream PTransform performs a
time-sensitive operation, for example receiving a list of updated files from
its input and reading them with the filebasedsource.ReadAllFiles transform.
Because ReadAllFiles contains a Reshuffle, the actual reads are blocked.
Imported from Jira
[BEAM-14497](https://issues.apache.org/jira/browse/BEAM-14497). Original Jira
may contain additional context.
Reported by: yihu.