jrmccluskey commented on issue #28716: URL: https://github.com/apache/beam/issues/28716#issuecomment-1739994770
Okay, so I can repro the issue. It happens specifically when the specified batch size is greater than the number of elements in the input PCollection **and** another GBK happens in the pipeline after the GroupIntoBatches call. (The GBK is marked as complete [here](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L982) and then re-invoked [here](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L939).) This would happen if [`_is_final_bundle()`](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L916) returns true, i.e. the watermark was prematurely advanced to positive infinity for the next invocation of a GBK. This seems to be the case when batch size > number of elements: the watermark is still at positive infinity on the next invocation instead of being reset to negative infinity for the next DoFn.
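To make the failure mode more concrete, here is a minimal, hypothetical model of the state described here. `GbkStateModel`, `completed`, `finish_bundle`, and `reset` are illustrative names only, not the DirectRunner's actual classes or API. The `completed` flag stands in for the completion tag. The model only captures the claimed sequence: the watermark is at +inf, the GBK is marked complete, and then it is re-invoked without a reset.

```python
import math
from dataclasses import dataclass, field

NEG_INF = -math.inf
POS_INF = math.inf


@dataclass
class GbkStateModel:
    """Hypothetical, simplified stand-in for a DirectRunner GBK evaluator's state.

    Not Beam's real class: `completed` plays the role of the completion tag.
    """

    input_watermark: float = NEG_INF
    completed: bool = False
    buffered: list = field(default_factory=list)

    def is_final_bundle(self) -> bool:
        # Same idea as _is_final_bundle(): the input is considered final once
        # the input watermark has reached positive infinity.
        return self.input_watermark == POS_INF

    def process(self, element) -> None:
        # A GBK already marked complete should never receive more input.
        if self.completed:
            raise RuntimeError(f"GBK already marked complete; got {element!r}")
        self.buffered.append(element)

    def finish_bundle(self) -> list:
        # Only emit (and mark complete) when this bundle is the final one.
        if not self.is_final_bundle():
            return []
        self.completed = True
        out, self.buffered = self.buffered, []
        return out

    def reset(self) -> None:
        # The reset expected between invocations: clear the completion
        # marker and move the watermark back to -inf.
        self.input_watermark = NEG_INF
        self.completed = False
        self.buffered = []
```

In this model, calling `process` again after `finish_bundle` has marked the GBK complete raises an error. Calling `reset()` first lets the next invocation proceed; that is the step suspected to be missing between DoFn invocations.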
I can demonstrate this with the following pipeline plus some extra logging within the GBK transform evaluator:

```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: ("key", x))
            # Batch size 3 (== number of elements) works; any value > 3
            # makes the second GBK below fail.
            | "Batch up to 30 elements" >> beam.GroupIntoBatches(3)
            # Unpack (key, batch) back into individual elements.
            | "Flat map" >> beam.FlatMap(lambda x: x[1])
            | "re-key" >> beam.Map(lambda x: ("key2", x))
            | "gbk" >> beam.GroupByKey()
        )


if __name__ == "__main__":
    run()
```

With batch size = 3, each element makes it through both GBKs. With batch size > 3, the second GBK invocation fails on the first element because the [completion tag](https://github.com/apache/beam/blob/a4ee8548424b335cb3ffdf6aeda899fd1e3ba8ad/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L901) and watermark are not reset. I haven't quite found where the reset between DoFn invocations happens yet, but this is where the problem is.
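Why the batch size matters can be sketched under one assumption about the Python SDK's `GroupIntoBatches`. A full batch is emitted as soon as the buffer reaches `batch_size`, but a partial batch is only flushed by an end-of-window timer. For the global window, that timer fires once the watermark reaches positive infinity. The function below is a hypothetical pure-Python simulation of that assumed behavior, not Beam code. `simulate_group_into_batches` is an illustrative name, and the function records whether each batch was emitted before or after the watermark hit +inf.

```python
from typing import List, Sequence, Tuple


def simulate_group_into_batches(
    elements: Sequence, batch_size: int
) -> List[Tuple[list, bool]]:
    """Hypothetical model of GroupIntoBatches for one key in the global window.

    Returns (batch, emitted_at_pos_inf) pairs. emitted_at_pos_inf is True when
    the batch was flushed by the (assumed) end-of-window timer, i.e. only after
    the watermark reached +inf.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    out: List[Tuple[list, bool]] = []
    buffer: list = []
    for element in elements:
        buffer.append(element)
        if len(buffer) >= batch_size:
            # Full batch: emitted eagerly, before the input is exhausted.
            out.append((buffer, False))
            buffer = []
    # Input exhausted: the watermark advances to +inf and, by assumption, the
    # end-of-window timer flushes whatever partial batch remains.
    if buffer:
        out.append((buffer, True))
    return out
```

With the three-element input from the repro, `batch_size=3` emits the batch before the watermark advances. With any `batch_size > 3`, the whole input stays in a partial batch that is only flushed at +inf. That would be consistent with the downstream GBK seeing a watermark already at positive infinity.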
