jrmccluskey commented on issue #28716:
URL: https://github.com/apache/beam/issues/28716#issuecomment-1739994770

Okay, so I can repro the issue. It happens specifically when the specified batch size is greater than the number of elements in the input PCollection **and** another GBK happens in the pipeline after the GroupIntoBatches call. (The GBK is marked as complete [here](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L982) and then re-invoked [here](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L939).)
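
One plausible reason the batch-size condition matters is sketched below. This is an assumption, not something confirmed in this thread: GroupIntoBatches emits a batch as soon as its buffer is full, but a leftover partial batch is only flushed by an end-of-window timer, which fires once the watermark has reached the end of the global window. The hypothetical `batch_emissions` helper is plain Python, not Beam's API. It models a single key and shows that when the batch size is larger than the number of elements, every element leaves the batching step through that timer flush, i.e. only after the watermark has already advanced to positive infinity.

```python
from typing import List, Tuple


def batch_emissions(elements: List[str], batch_size: int) -> List[Tuple[bool, List[str]]]:
    """Toy model of batching one key in the global window (hypothetical helper).

    Returns (flushed_by_timer, batch) pairs. Full batches are emitted while
    elements are still being processed; a leftover partial batch is assumed
    to be flushed by an end-of-window timer, i.e. only once the watermark is +inf.
    """
    emissions: List[Tuple[bool, List[str]]] = []
    buffer: List[str] = []
    for element in elements:
        buffer.append(element)
        if len(buffer) == batch_size:
            # Buffer is full: emit right away, before the watermark advances.
            emissions.append((False, buffer))
            buffer = []
    if buffer:
        # Leftover elements wait for the (assumed) end-of-window timer.
        emissions.append((True, buffer))
    return emissions


print(batch_emissions(["a", "b", "c"], 3))  # [(False, ['a', 'b', 'c'])]
print(batch_emissions(["a", "b", "c"], 4))  # [(True, ['a', 'b', 'c'])]
```

Under this model, batch size 3 emits the only batch as a full batch. Batch size 4 emits everything through the timer flush, which lines up with the condition that triggers the failure.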
   
This would happen if [`_is_final_bundle()`](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L916) returns `True`, i.e. the watermark was prematurely advanced to positive infinity for the next invocation of a GBK. This seems to happen when the batch size is greater than the number of elements: on the next invocation the watermark is still at positive infinity instead of being reset to negative infinity for the next DoFn.
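
The failure mode described above can be modeled in a few lines of plain Python. `ToyGbkEvaluator` and `run_bundles` are hypothetical names for a heavily simplified stand-in, not the direct runner's actual evaluator. The toy treats a bundle as final when the input watermark it is given equals positive infinity, which mirrors what `_is_final_bundle()` is described as checking. It records completion in a flag that stands in for the completion tag, and it rejects any element that arrives after completion. Feeding it a stale +inf watermark after the first bundle reproduces the "fails on the first element of the next invocation" pattern.

```python
from typing import Dict, List, Tuple

NEG_INF = float("-inf")
POS_INF = float("inf")


class ToyGbkEvaluator:
    """Hypothetical, simplified model of a GBK evaluator's completion logic."""

    def __init__(self) -> None:
        self.elements: Dict[str, List[str]] = {}
        self.completed = False  # stands in for the completion tag

    def process_element(self, key: str, value: str) -> None:
        # No element may arrive once the GBK has been marked complete.
        if self.completed:
            raise RuntimeError(f"{value!r} arrived after GBK was marked complete")
        self.elements.setdefault(key, []).append(value)

    def finish_bundle(self, input_watermark: float) -> List[Tuple[str, List[str]]]:
        # A bundle counts as final once the input watermark is +inf.
        if input_watermark == POS_INF and not self.completed:
            self.completed = True
            return sorted(self.elements.items())
        return []


def run_bundles(
    bundles: List[List[Tuple[str, str]]], watermarks: List[float]
) -> List[Tuple[str, List[str]]]:
    """Feed bundles to a fresh evaluator, pairing each with an input watermark."""
    gbk = ToyGbkEvaluator()
    output: List[Tuple[str, List[str]]] = []
    for bundle, watermark in zip(bundles, watermarks):
        for key, value in bundle:
            gbk.process_element(key, value)
        output.extend(gbk.finish_bundle(watermark))
    return output


bundles = [[("key2", "a")], [("key2", "b"), ("key2", "c")]]
# Watermark held back until the last bundle: everything is grouped.
print(run_bundles(bundles, [NEG_INF, POS_INF]))  # [('key2', ['a', 'b', 'c'])]
# Watermark already at +inf after the first bundle: the next element is rejected.
try:
    run_bundles(bundles, [POS_INF, POS_INF])
except RuntimeError as err:
    print(err)  # 'b' arrived after GBK was marked complete
```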
   
I can demonstrate this with the following pipeline, plus some extra logging within the GBK transform evaluator:
   
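```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: ("key", x))
            # Batch size 3 works; any batch size > 3 (the number of input
            # elements) makes the second GBK below fail.
            | "Batch elements" >> beam.GroupIntoBatches(3)
            | "Flat map" >> beam.FlatMap(lambda x: x[1])
            | "re-key" >> beam.Map(lambda x: ("key2", x))
            | "gbk" >> beam.GroupByKey()
        )


if __name__ == "__main__":
    run()
```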
   
With batch size = 3, each element makes it through both GBKs. With batch size > 3, the second GBK invocation fails on the first element because the [completion tag](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L901) and watermark are not reset. I haven't yet found where the reset between DoFn invocations happens, but this is where the problem is.

