ndopj opened a new issue, #31200:
URL: https://github.com/apache/beam/issues/31200

   ### What happened?
   
   I am not sure whether this is the correct place to raise this issue. I am
   filing it here because the `Dataflow Runner` source code lives in this
   repository, but the issue may be more closely related to the Google Cloud
   Dataflow runtime behaviour. If so, I would welcome help raising this issue
   directly with Google Cloud.
   
   I have an unbounded pipeline _(Python SDK)_ that consumes data from Google
   Cloud Pub/Sub subscriptions and is deployed to Google Cloud Dataflow. The
   pipeline has 3 branches that are at some point **merged** by the
   [Flatten()](https://beam.apache.org/documentation/transforms/python/other/flatten/)
   transform.
   
   As the image below shows, the pipeline starts successfully and processes
   elements. However, two of the branches do not yield any input to the
   Flatten/Merge step. This is intentional behaviour.
   
   <img width="1417" alt="Screenshot 2024-05-06 at 22 56 41" src="https://github.com/apache/beam/assets/46056966/9a28c9c1-d95b-4bf7-8946-9d4fd98adcb8">
   
   In this configuration, the Flatten/Merge step does not yield any output,
   even though one of the branches continuously feeds input into it. Details
   of the output collection are shown in the image below:
   
   <img width="1403" alt="Screenshot 2024-05-07 at 0 01 32" src="https://github.com/apache/beam/assets/46056966/8e5d74ff-a5da-4460-aa20-2ac041615a90">
   
   I would expect the Flatten/Merge step to yield at least the elements of the
   branch `Map BE events to Commands`.
   
   I also tried configuring windowing in each branch right before the
   Flatten/Merge step, using fixed 60-second windows in every branch with the
   code below. However, this did not change the behaviour in any way. (I
   thought the main issue here was the empty `PCollection` branches, and that
   windowing would force Flatten() to merge the windows from each branch.)
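   ```python
   import time
   from typing import TypeVar

   import apache_beam as beam
   from apache_beam.transforms.window import FixedWindows, TimestampedValue

   T = TypeVar('T')

   def with_ts(value: T) -> TimestampedValue[T]:
       # Stamp the element with the current wall-clock time (whole seconds).
       return TimestampedValue(value, int(time.time()))

   .....
       | "Map WEB events to commands" >> beam.Map(lambda x: with_ts(web_event_to_commands(x)))
       | "Window WEB commands" >> beam.WindowInto(FixedWindows(60)))
   ```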
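The `with_ts` helper stamps each element with the current second, and `FixedWindows(60)` then places it into a minute-aligned window. The following is a minimal sketch of that assignment rule for fixed windows (the helper name `fixed_window_bounds` is hypothetical, not a Beam API). It shows that elements from different branches stamped within the same minute land in the same window:

```python
def fixed_window_bounds(ts_seconds: int, size: int = 60, offset: int = 0) -> tuple[int, int]:
    """Return [start, end) of the fixed window that contains ts_seconds.

    Same rule as fixed windowing with the given size and offset:
    start = ts - ((ts - offset) % size), end = start + size.
    """
    start = ts_seconds - ((ts_seconds - offset) % size)
    return start, start + size


# Two elements stamped 20 s apart within the same minute share one window ...
print(fixed_window_bounds(1715036521))  # (1715036520, 1715036580)
print(fixed_window_bounds(1715036541))  # (1715036520, 1715036580)
# ... while an element stamped one second before the boundary does not.
print(fixed_window_bounds(1715036519))  # (1715036460, 1715036520)
```

Because the timestamps come from wall-clock time at the `Map` step rather than from the events themselves, each element is bucketed by when its branch happened to process it.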
   
   I believe that, according to the official documentation of `Flatten()`:
   
   [https://beam.apache.org/documentation/transforms/python/other/flatten/](https://beam.apache.org/documentation/transforms/python/other/flatten/)
   
   [https://beam.apache.org/documentation/programming-guide/#flatten](https://beam.apache.org/documentation/programming-guide/#flatten)

   the current behaviour is not expected, and Flatten() should yield the
   output elements of each input `PCollection` independently of the other
   input `PCollection`s and of the windowing strategy (_provided that strategy
   is the same for every input `PCollection`_).
   
   - **Apache Beam version:** 2.55.1
   - **Python version:** 3.11
   - **Dataflow Prime**: Disabled
   - **Runner v2**: Enabled
   - **Streaming Engine**: Enabled
   - **Vertical auto-scaling**: Disabled
   - **Streaming mode**: Exactly once
   
   Thanks in advance for any help, in case this turns out to be a mistake on
   my side rather than a bug.
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.