shunping opened a new pull request, #34602:
URL: https://github.com/apache/beam/pull/34602
We see some test flakiness after #34582.
Here is a simple pipeline to reproduce:
```python
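import apache_beam as beam

# `options` is assumed to be the PipelineOptions instance for the runner under test.
with beam.Pipeline(options=options) as p:
  side1 = p | 'side1' >> beam.Create([('a', 1)])
  side2 = p | 'side2' >> beam.Create([('b', 2)])
  third_element = [('another_type')]
  side3 = p | 'side3' >> beam.Create(third_element)
  # Two chained flattens: the output of Flatten1 is an input of Flatten2.
  side = (side1, side2) | 'Flatten1' >> beam.Flatten()
  _ = (side, side3) | 'Flatten2' >> beam.Flatten() | beam.Map(print)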
```
In #34582, we replace the coder IDs of the input PCollections of each
Flatten transform. However, if there are multiple Flatten transforms **and**
they are connected to each other, the order of replacement matters:
- If we replace the coder of `Flatten1` and then `Flatten2`:
  - the coder of `side1` and `side2` will be `Coder(Tuple[str, int])`
  - the coder of `side` (flattened output of `side1` and `side2`) and
    `side3` will be `Coder(Tuple[str])`
- If we replace the coder of `Flatten2` and then `Flatten1`:
  - the coder of `side` (flattened output of `side1` and `side2`) and
    `side3` will be `Coder(Tuple[str])`
  - the coder of `side1` and `side2` will also be `Coder(Tuple[str])` (same
    as the coder of `side`)
The first scenario causes a problem when Prism handles the Flatten runner
transform: when it collects the elements of `Flatten2`, it gets elements
encoded with `Coder(Tuple[str, int])` from `side1` and `side2`, and elements
encoded with `Coder(Tuple[str])` from `side3`.
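
The order dependence described above can be illustrated with a toy model. This is only a sketch, not Beam or Prism code: plain dicts stand in for the pipeline proto, coders are represented as strings, and the names `INITIAL_CODERS`, `FLATTENS`, `replace_input_coders`, `run` and `inconsistent_flattens` are hypothetical. It assumes that "replacing" means copying the coder of a flatten's output onto each of its inputs, and that the output of `Flatten2` starts with `Tuple[str]`, chosen so the result matches the two scenarios listed above.

```python
# Toy model: PCollection name -> coder (as a string).
INITIAL_CODERS = {
    "side1": "Tuple[str, int]",
    "side2": "Tuple[str, int]",
    "side": "Tuple[str, int]",  # output of Flatten1
    "side3": "Tuple[str]",
    "out": "Tuple[str]",        # output of Flatten2 (assumed for illustration)
}

# Flatten name -> (input PCollections, output PCollection).
FLATTENS = {
    "Flatten1": (("side1", "side2"), "side"),
    "Flatten2": (("side", "side3"), "out"),
}


def replace_input_coders(coders, flatten_name):
    """Copy the flatten's current output coder onto each of its inputs."""
    inputs, output = FLATTENS[flatten_name]
    for name in inputs:
        coders[name] = coders[output]


def run(order):
    """Apply the replacement to the flattens in the given order."""
    coders = dict(INITIAL_CODERS)
    for flatten_name in order:
        replace_input_coders(coders, flatten_name)
    return coders


def inconsistent_flattens(coders):
    """Return flattens with at least one input whose coder differs from the output's."""
    return [
        name
        for name, (inputs, output) in FLATTENS.items()
        if any(coders[i] != coders[output] for i in inputs)
    ]


first = run(["Flatten1", "Flatten2"])   # scenario 1
second = run(["Flatten2", "Flatten1"])  # scenario 2

print(first, inconsistent_flattens(first))
print(second, inconsistent_flattens(second))
```

In this model, the first order leaves `side1` and `side2` with a coder that differs from `side`, because `side` was rewritten after its inputs had already been updated. The second order propagates the final coder all the way upstream.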
addresses #34587