ndopj opened a new issue, #31200: URL: https://github.com/apache/beam/issues/31200
### What happened?

I am not sure whether this is the correct place to raise this issue. I am filing it here because the `Dataflow Runner` source code lives in this repository, but the issue might be more related to the behaviour of the Google Cloud Dataflow runtime. If so, I would appreciate it if anyone could help me raise it directly with Google Cloud.

I have an unbounded pipeline _(Python SDK)_ that consumes data from Google Cloud Pub/Sub subscriptions and is deployed to Google Cloud Dataflow. The pipeline has 3 branches which are at some point **merged** by the [Flatten()](https://beam.apache.org/documentation/transforms/python/other/flatten/) transform.

As you can see in the image below, the pipeline starts successfully and processes elements. However, two of the branches do not yield any input to the Flatten/Merge step. This is intentional.

<img width="1417" alt="Screenshot 2024-05-06 at 22 56 41" src="https://github.com/apache/beam/assets/46056966/9a28c9c1-d95b-4bf7-8946-9d4fd98adcb8">

In this configuration the Flatten/Merge step does not yield any output, even though one of the branches continuously yields input to it. The output collection details can be seen in the image below:

<img width="1403" alt="Screenshot 2024-05-07 at 0 01 32" src="https://github.com/apache/beam/assets/46056966/8e5d74ff-a5da-4460-aa20-2ac041615a90">

I would expect the Flatten/Merge step to yield at least the elements of the branch `Map BE events to Commands`.

I have also tried to configure windowing in each branch right before the Flatten/Merge step, using fixed windows of 60s in each branch with the code below. This did not change the behaviour in any way. (I thought that the main issue was the empty `PCollection` branches and that windowing would force Flatten() to merge windows from each branch.)

```python
import time
from typing import TypeVar
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

T = TypeVar('T')
def with_ts(value: T) -> TimestampedValue[T]:
    return TimestampedValue(value, int(time.time()))
.....
    | "Map WEB events to commands" >> beam.Map(lambda x: with_ts(web_event_to_commands(x)))
    | "Window WEB commands" >> beam.WindowInto(FixedWindows(60)))
```

According to the official documentation of `Flatten()`:

- [https://beam.apache.org/documentation/transforms/python/other/flatten/](https://beam.apache.org/documentation/transforms/python/other/flatten/)
- [https://beam.apache.org/documentation/programming-guide/#flatten](https://beam.apache.org/documentation/programming-guide/#flatten)

I believe the current behaviour is not expected: Flatten() should yield output elements from each input `PCollection` independently of the other input `PCollection`s and of the windowing strategy (_as long as that strategy is the same in each input `PCollection`_).

- **Apache Beam version:** 2.55.1
- **Python version:** 3.11
- **Dataflow Prime**: Disabled
- **Runner v2**: Enabled
- **Streaming Engine**: Enabled
- **Vertical auto-scaling**: Disabled
- **Streaming mode**: Exactly once

Thanks in advance for any help in case this is not a bug but a mistake on my side.

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner
