damccorm opened a new issue, #21591:
URL: https://github.com/apache/beam/issues/21591

   Python Reshuffle holds elements while the pipeline is running and likely releases them in a batch. In contrast, Java Reshuffle triggers on every element, as noted in its documentation: "the trigger used with {@link Reshuffle} which triggers on every element and never buffers state."
   
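   Here is a working example:
   ```python
   import time

   import apache_beam as beam
   from apache_beam import Pipeline


   def test(p: Pipeline):
     class SlowProcessFn(beam.DoFn):
       def process(self, element):
         time.sleep(0.5)  # simulate 0.5 s of work per element
         yield element

     result = (p
       | beam.Create(range(100))
       | beam.ParDo(SlowProcessFn())
       | beam.Reshuffle()  # HERE
       | beam.Map(lambda x: print(x, time.time())))
     return result


   if __name__ == '__main__':
     with beam.Pipeline() as p:
       test(p)
   ```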
   
   Tested on the local runner and the Flink runner (1.14): all elements are printed together after about 50 seconds (100 elements × 0.5 s). With Reshuffle commented out, an element is printed every half second.
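   As a back-of-the-envelope check on the 50-second figure: 100 elements at 0.5 s each means the last element leaves SlowProcessFn at 100 × 0.5 = 50 s. The toy model below is plain Python, not Beam, and every function in it (`slow_source`, `pass_through`, `buffer_all`, `arrival_times`) is hypothetical. It uses a simulated clock instead of real sleeps and only contrasts the two behaviors described above: forwarding each element as soon as it is ready versus holding everything until the input is exhausted.

   ```python
   from typing import Callable, Iterator, List, Tuple

   # Toy model only (not Beam): an element is a (value, ready_time) pair,
   # where ready_time is a simulated clock reading in seconds.
   Stream = Iterator[Tuple[int, float]]


   def slow_source(n: int, delay: float) -> Stream:
       """Emit n elements, one every `delay` simulated seconds (like SlowProcessFn)."""
       for i in range(n):
           yield i, (i + 1) * delay


   def pass_through(stream: Stream) -> Stream:
       """Per-element behavior: forward each element at the time it becomes ready."""
       for value, ready in stream:
           yield value, ready


   def buffer_all(stream: Stream) -> Stream:
       """Buffering behavior: hold every element until the input is exhausted,
       then release all of them at the time the last one became ready."""
       held = list(stream)
       release = held[-1][1] if held else 0.0
       for value, _ in held:
           yield value, release


   def arrival_times(stage: Callable[[Stream], Stream],
                     n: int = 100, delay: float = 0.5) -> List[float]:
       """Simulated times at which each element reaches the sink after `stage`."""
       return [t for _, t in stage(slow_source(n, delay))]


   print(arrival_times(pass_through)[:3])         # [0.5, 1.0, 1.5]
   print(sorted(set(arrival_times(buffer_all))))  # [50.0]
   ```

   In the buffered case the first element reaches the sink at 50 s instead of 0.5 s, which matches the reported delay; the model only reproduces the symptom and says nothing about why the runner buffers.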
   
   This behavior causes problems when a downstream PTransform performs a time-sensitive operation, for example receiving a list of updated files from the input and reading them with the filebasedsource.ReadAllFiles transform. Because ReadAllFiles contains a Reshuffle, the actual reads are blocked until the held elements are released.
   
   Imported from Jira [BEAM-14497](https://issues.apache.org/jira/browse/BEAM-14497). Original Jira may contain additional context.
   Reported by: yihu.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
