[ https://issues.apache.org/jira/browse/BEAM-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550021#comment-17550021 ]
Danny McCormick commented on BEAM-14497:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/21591

> Python Reshuffle holds elements
> -------------------------------
>
>          Key: BEAM-14497
>          URL: https://issues.apache.org/jira/browse/BEAM-14497
>      Project: Beam
>   Issue Type: Bug
>   Components: sdk-py-core
>     Reporter: Yi Hu
>     Priority: P2
>
> Python Reshuffle holds elements while the pipeline is running and likely
> releases them in a batch. In contrast, Java Reshuffle triggers on every
> element, as noted in its documentation:
> "the trigger used with {@link Reshuffle} which triggers on every element and
> never buffers state."
> Here is a working example:
> {code:python}
> import time
>
> import apache_beam as beam
> from apache_beam import Pipeline
>
> def test(p: Pipeline):
>   class SlowProcessFn(beam.DoFn):
>     def process(self, element):
>       # Simulate slow per-element work: 0.5 s per element.
>       time.sleep(0.5)
>       yield element
>
>   result = (p
>     | beam.Create(range(100))
>     | beam.ParDo(SlowProcessFn())
>     | beam.Reshuffle()  # HERE
>     # Print each element with the wall-clock time at which it arrives.
>     | beam.Map(lambda x: print(x, time.time())))
>   return result
> {code}
> Tested on the local runner and the Flink runner (1.14): the elements are
> printed after 50 seconds. With Reshuffle commented out, one element is
> printed every half second.
> This behavior causes problems when a downstream PTransform performs a
> time-sensitive operation, such as receiving a list of updated files from the
> input and reading them with the filebasedsource.ReadAllFiles transform.
> Because ReadAll contains a Reshuffle, the actual read is blocked.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
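
For readers without a Beam environment, the timing difference described in the issue can be illustrated with a toy model in plain Python. This is only a sketch under stated assumptions: it does not use Beam, it is not how Reshuffle or any runner is implemented, and every name in it (slow_source, per_element, buffered, emission_times) is hypothetical. A simulated clock stands in for time.sleep so the result is deterministic.

```python
from typing import Callable, Iterable, Iterator, List, Tuple


def slow_source(n: int, delay: float, clock: List[float]) -> Iterator[int]:
    """Yield 0..n-1, advancing a simulated clock by `delay` per element."""
    for i in range(n):
        clock[0] += delay  # stands in for time.sleep(delay)
        yield i


def per_element(elements: Iterable[int]) -> Iterator[int]:
    """Hypothetical stage that forwards each element as soon as it arrives."""
    for element in elements:
        yield element


def buffered(elements: Iterable[int]) -> Iterator[int]:
    """Hypothetical stage that holds everything until upstream is exhausted."""
    held = list(elements)
    yield from held


def emission_times(
    stage: Callable[[Iterable[int]], Iterator[int]],
    n: int = 100,
    delay: float = 0.5,
) -> List[Tuple[int, float]]:
    """Return (element, simulated time at which it reaches downstream)."""
    clock = [0.0]
    return [(e, clock[0]) for e in stage(slow_source(n, delay, clock))]


if __name__ == "__main__":
    print(emission_times(per_element)[0])  # (0, 0.5)
    print(emission_times(buffered)[0])     # (0, 50.0)
```

In this model the first element reaches the downstream step after 0.5 simulated seconds in the per-element case, and only after 50 simulated seconds in the buffered case, which matches the symptom reported above for 100 elements at 0.5 s each.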