tvalentyn commented on issue #31571:
URL: https://github.com/apache/beam/issues/31571#issuecomment-3354355152

   It should be possible, but it seems to require some coordination between
   the thread that populates the data plane and the bundle processor. I think
   I have a repro. The repro:
   
   * produces many elements for a single instruction
   * waits in DoFn.setup() to saturate the input queue
   * fails in setup() one time in three
   
   
   ```
   import argparse
   import collections
   import logging
   import time

   import apache_beam as beam


   class FlakySetup(beam.DoFn):
     # Class-level counter, shared by all FlakySetup instances in the process.
     counter = collections.defaultdict(int)

     def process(self, x):
       yield x

     def setup(self):
       self.counter['setup'] += 1
       logging.warning("In setup.")
       # Every third setup() call stalls long enough to saturate the input
       # queue and then fails.
       if self.counter['setup'] % 3 == 0:
         logging.warning("Will fail in 60 seconds")
         time.sleep(60)
         raise RuntimeError

     def start_bundle(self):
       self.counter['start_bundle'] += 1
       logging.warning("In start_bundle.")


   def flaky_setup(p):
     # Many branches, each producing many elements for a single instruction.
     # Labels include the loop index so the repeated transforms stay unique.
     for i in range(20):
       _ = (
           p
           | f'Create {i}' >> beam.Create(list(range(1000000)))
           | f'Reshuffle {i}' >> beam.Reshuffle()
           | f'ParDo {i}' >> beam.ParDo(FlakySetup())
           # | f'Log {i}' >> beam.LogElements(level=logging.WARNING)
       )


   def run(argv=None):
     parser = argparse.ArgumentParser()
     known_args, pipeline_args = parser.parse_known_args(argv)

     with beam.Pipeline(argv=pipeline_args) as p:
       flaky_setup(p)


   if __name__ == '__main__':
     run()
   ```
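   
   For completeness, a minimal sketch of driving the repro programmatically
   instead of through sys.argv; the DirectRunner flag below is only an
   illustrative assumption on my part, and should be swapped for whichever
   runner/environment exhibits the behavior described in this issue:
   
   ```
   from apache_beam.options.pipeline_options import PipelineOptions
   
   # Illustrative runner choice; substitute the runner where the issue reproduces.
   options = PipelineOptions(['--runner=DirectRunner'])
   with beam.Pipeline(options=options) as p:
     flaky_setup(p)
   ```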

