lukecwik opened a new issue, #22945: URL: https://github.com/apache/beam/issues/22945
### What happened?

`PCollectionView.listView` materialization for streaming runners produces this graph segment:

User transforms -> [ToListViewDoFn](https://github.com/apache/beam/blob/4ae54b2e1e28096f2b173d5f5574910e8cfd80c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java#L284) -> [Combine.globally(Concatenate)](https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java#L67)

The issue is that the output from `ToListViewDoFn` is split across multiple panes of the `Combine.globally`. As a result, the metadata records described below can be present while data records they describe are missing. On runners that preserve key ordering, some of the records that should have been written before the metadata record are missing. On runners that do not preserve key ordering, any arbitrary records can be missing.

The workaround is either to avoid using `PCollectionView.listView` with a windowing strategy that discards fired panes and fires multiple times or, if you are using the slowly changing side input pattern, to migrate from `PCollectionView.listView` to a singleton view.

---

The `ToListViewDoFn` generates a random starting `index` in the range `[Long.MIN_VALUE + 1, Long.MAX_VALUE - Integer.MAX_VALUE]` for each unique window and tags each value with that index, incrementing it for every value.

For example, elements A, B, C -> random starting index 5 -> produces `KV<5, A>, KV<6, B>, KV<7, C>` and a metadata record `KV<Long.MIN_VALUE, [5, 8)>`.

If there is another bundle, elements X, Y, Z -> random starting index 7 -> produces `KV<7, X>, KV<8, Y>, KV<9, Z>` and a metadata record `KV<Long.MIN_VALUE, [7, 10)>`.
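The per-bundle index assignment in these examples can be sketched in plain Java. This is a hypothetical, Beam-free simulation rather than the actual `ToListViewDoFn` code: `ListViewRecord`, `ListViewEncodingSketch` and `encodeBundle` are illustrative names, only a single window is modeled, and the starting index is passed in explicitly so the output is reproducible instead of being drawn at random from `[Long.MIN_VALUE + 1, Long.MAX_VALUE - Integer.MAX_VALUE]`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the KV records emitted for one window. A data record
// holds one element under its index; a metadata record is keyed by Long.MIN_VALUE
// and holds the half-open index range [rangeStart, rangeEnd) written by a bundle.
final class ListViewRecord {
  final long key;
  final String value; // null for metadata records
  final long rangeStart;
  final long rangeEnd;

  private ListViewRecord(long key, String value, long rangeStart, long rangeEnd) {
    this.key = key;
    this.value = value;
    this.rangeStart = rangeStart;
    this.rangeEnd = rangeEnd;
  }

  static ListViewRecord data(long index, String value) {
    return new ListViewRecord(index, value, 0, 0);
  }

  static ListViewRecord metadata(long start, long end) {
    return new ListViewRecord(Long.MIN_VALUE, null, start, end);
  }

  @Override
  public String toString() {
    return value != null
        ? "KV<" + key + ", " + value + ">"
        : "KV<Long.MIN_VALUE, [" + rangeStart + ", " + rangeEnd + ")>";
  }
}

final class ListViewEncodingSketch {
  // Tags each element with consecutive indices starting at `start`, then appends
  // one metadata record covering [start, start + elements.size()).
  static List<ListViewRecord> encodeBundle(long start, List<String> elements) {
    List<ListViewRecord> out = new ArrayList<>();
    long next = start;
    for (String element : elements) {
      out.add(ListViewRecord.data(next++, element));
    }
    out.add(ListViewRecord.metadata(start, next));
    return out;
  }

  public static void main(String[] args) {
    System.out.println(encodeBundle(5, List.of("A", "B", "C")));
    System.out.println(encodeBundle(7, List.of("X", "Y", "Z")));
  }
}
```

Running `main` prints `[KV<5, A>, KV<6, B>, KV<7, C>, KV<Long.MIN_VALUE, [5, 8)>]` followed by `[KV<7, X>, KV<8, Y>, KV<9, Z>, KV<Long.MIN_VALUE, [7, 10)>]`. Because the metadata range is half-open, its end is one past the last index written, which is why the second bundle's range is `[7, 10)`.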
This would create a multimap like:

```
Long.MIN_VALUE -> {[5,8), [7,10)}
5 -> {A}
6 -> {B}
7 -> {C, X}
8 -> {Y}
9 -> {Z}
```

This allows us to produce a mapping of range to number of elements like:

```
[5, 7) -> 1
[7, 8) -> 2
[8, 10) -> 1
```

Using this, we can compute a global position by walking the ranges in order and summing `(range.end - range.start) * # elements` until we reach a range where the running total would exceed the requested index. Once we have this range, we can compute a sub-position within it.

For example, to find index 3, we would iterate over the range `[5, 7) -> 1` and know that we have advanced 2 elements (`(7-5)*1`). Then we would notice that `[7, 8)` has 2 elements in it, so the running total of 4 exceeds 3 and there is no need to advance further. Our position is `7` and our subPosition is `1`, telling us to access key `7` and iterate over its 0-based iterable until we reach position `1`, returning `X`.

### Issue Priority

Priority: 2

### Issue Component

Component: sdk-java-core
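The range walk used for index lookups in the description above can be sketched in plain Java. This is a hypothetical standalone sketch, not Beam's implementation: `ListViewLookupSketch`, `Segment`, `segments` and `get` are illustrative names, metadata ranges are modeled as `long[] {start, end}`, and the values under key `7` are assumed to be iterated in the order `{C, X}` as in the example. `segments` derives the range-to-count mapping from the metadata ranges, and `get` maps a 0-based list position to a key and sub-position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the list lookup over the materialized multimap.
final class ListViewLookupSketch {
  // One piece [start, end) of the index space, covered by `count` metadata ranges.
  static final class Segment {
    final long start;
    final long end;
    final int count;

    Segment(long start, long end, int count) {
      this.start = start;
      this.end = end;
      this.count = count;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ") -> " + count;
    }
  }

  // Splits the metadata ranges at every boundary and counts how many ranges cover
  // each piece; pieces that no range covers are dropped.
  static List<Segment> segments(List<long[]> ranges) {
    TreeSet<Long> boundaries = new TreeSet<>();
    for (long[] r : ranges) {
      boundaries.add(r[0]);
      boundaries.add(r[1]);
    }
    List<Segment> result = new ArrayList<>();
    Long previous = null;
    for (long boundary : boundaries) {
      if (previous != null) {
        int count = 0;
        for (long[] r : ranges) {
          if (r[0] <= previous && boundary <= r[1]) {
            count++;
          }
        }
        if (count > 0) {
          result.add(new Segment(previous, boundary, count));
        }
      }
      previous = boundary;
    }
    return result;
  }

  // Skips whole segments until the running total exceeds `position`, then picks
  // the key and the sub-position within that key's 0-based iterable.
  static String get(long position, List<Segment> segments, Map<Long, List<String>> multimap) {
    long advanced = 0;
    for (Segment s : segments) {
      long size = (s.end - s.start) * s.count;
      if (advanced + size > position) {
        long offset = position - advanced;
        long key = s.start + offset / s.count;
        int subPosition = (int) (offset % s.count);
        return multimap.get(key).get(subPosition);
      }
      advanced += size;
    }
    throw new IndexOutOfBoundsException("position " + position);
  }

  public static void main(String[] args) {
    Map<Long, List<String>> multimap = new TreeMap<>();
    multimap.put(5L, List.of("A"));
    multimap.put(6L, List.of("B"));
    multimap.put(7L, List.of("C", "X")); // assumed iteration order
    multimap.put(8L, List.of("Y"));
    multimap.put(9L, List.of("Z"));
    List<Segment> segs = segments(List.of(new long[] {5, 8}, new long[] {7, 10}));
    System.out.println(segs); // [[5, 7) -> 1, [7, 8) -> 2, [8, 10) -> 1]
    System.out.println(get(3, segs, multimap)); // X
  }
}
```

Running `main` reproduces the range-to-count mapping and returns `X` for index 3, matching the walkthrough. It also shows why the missing records matter: if a pane contains the metadata record `[7, 10)` but not `KV<8, Y>`, the segments are unchanged, yet `multimap.get(8L)` returns `null` and looking up index 4 fails with a `NullPointerException` in this sketch.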
