shunping opened a new pull request, #34602:
URL: https://github.com/apache/beam/pull/34602

   We see some test flakiness after #34582. 
   
   
   Here is a simple pipeline to reproduce:
   ```python
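   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   
   options = PipelineOptions()  # default options; select the runner under test here
   
   with beam.Pipeline(options=options) as p:
     side1 = p | 'side1' >> beam.Create([('a', 1)])
     side2 = p | 'side2' >> beam.Create([('b', 2)])
     # An element whose type differs from the elements of side1 and side2.
     third_element = [('another_type')]
   
     side3 = p | 'side3' >> beam.Create(third_element)
     # Flatten1 feeds Flatten2, so the two flatten transforms are chained.
     side = (side1, side2) | 'Flatten1' >> beam.Flatten()
     _ = (side, side3) | 'Flatten2' >> beam.Flatten() | beam.Map(print)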
   ```
   
   In #34582, we replace the coder IDs of the input PCollections of each 
flatten transform. However, if there are multiple flatten transforms **and** 
one feeds into another, the order in which the replacements run matters:
   - If we replace the coder of `Flatten1` and then `Flatten2`:
     - the coder of `side1` and `side2` will be `Coder(Tuple[str, int])`
     - the coder of `side` (flattened output of `side1` and `side2`) and 
`side3` will be `Coder(Tuple[str])`
   - If we replace the coder of `Flatten2` and then `Flatten1`: 
     - the coder of `side` (flattened output of `side1` and `side2`) and 
`side3` will be `Coder(Tuple[str])`
     - the coder of `side1` and `side2` will also be `Coder(Tuple[str])` (same 
as the coder of `side`)
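
The two orderings above can be sketched with a small pure-Python model. This is only an illustration, not the code from #34582: it assumes that each flatten input adopts the coder of that flatten's output, and that the output of `Flatten2` (called `flattened` here, a hypothetical name) starts out with `Coder(Tuple[str])`, which is what the scenarios above imply.

```python
def replace_input_coders(coders, flattens, order):
    """Apply the assumed rule: each flatten input adopts its output's coder."""
    result = dict(coders)
    for name in order:
        inputs, output = flattens[name]
        for pcoll in inputs:
            result[pcoll] = result[output]
    return result


def inconsistent_flattens(coders, flattens):
    """Return the flattens whose inputs do not all share the output's coder."""
    return [
        name for name, (inputs, output) in flattens.items()
        if any(coders[pcoll] != coders[output] for pcoll in inputs)
    ]


TUPLE_STR_INT = 'Coder(Tuple[str, int])'
TUPLE_STR = 'Coder(Tuple[str])'

# Coders before any replacement (assumed starting state).
initial = {
    'side1': TUPLE_STR_INT,
    'side2': TUPLE_STR_INT,
    'side': TUPLE_STR_INT,
    'side3': TUPLE_STR,
    'flattened': TUPLE_STR,  # hypothetical name for the output of Flatten2
}

# Each flatten maps to (input PCollections, output PCollection).
flattens = {
    'Flatten1': (('side1', 'side2'), 'side'),
    'Flatten2': (('side', 'side3'), 'flattened'),
}

first = replace_input_coders(initial, flattens, ['Flatten1', 'Flatten2'])
second = replace_input_coders(initial, flattens, ['Flatten2', 'Flatten1'])

print(inconsistent_flattens(first, flattens))   # ['Flatten1']
print(inconsistent_flattens(second, flattens))  # []
```

In this model only the second ordering leaves every flatten with inputs that match its output coder.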
   
   The first scenario causes a problem when Prism handles the flatten runner 
transform: when it collects the elements of `Flatten2`, it gets elements 
encoded with `Coder(Tuple[str, int])` from `side1` and `side2`, and elements 
encoded with `Coder(Tuple[str])` from `side3`.
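
To see why a mix of encodings is a problem for a consumer that assumes a single coder, here is a toy sketch. The length-prefixed format and the helper names below are hypothetical and are not Beam's wire format or Prism's code; they only stand in for "two different coders".

```python
import struct


def encode_str(s):
    """Encode a string as a 4-byte big-endian length followed by UTF-8 bytes."""
    data = s.encode('utf-8')
    return struct.pack('>I', len(data)) + data


def encode_str_int(value):
    """Toy stand-in for Coder(Tuple[str, int])."""
    s, i = value
    return encode_str(s) + struct.pack('>q', i)


def encode_str_only(value):
    """Toy stand-in for Coder(Tuple[str])."""
    (s,) = value
    return encode_str(s)


def decode_str_only(buf):
    """Decode with the Tuple[str] stand-in; reject bytes it cannot account for."""
    (n,) = struct.unpack_from('>I', buf, 0)
    end = 4 + n
    if end != len(buf):
        raise ValueError(f'{len(buf) - end} unexpected trailing bytes')
    return (buf[4:end].decode('utf-8'),)


# An element from side3 round-trips with the coder the consumer expects.
print(decode_str_only(encode_str_only(('another_type',))))

# An element from side1 was encoded with the other coder, so decoding fails.
try:
    decode_str_only(encode_str_int(('a', 1)))
except ValueError as e:
    print('decode failed:', e)
```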
   
   Addresses #34587.

