lukecwik opened a new issue, #22945:
URL: https://github.com/apache/beam/issues/22945

   ### What happened?
   
   `PCollectionView.listView` materialization for streaming runners produces 
this graph segment:
   User transforms -> 
[ToListViewDoFn](https://github.com/apache/beam/blob/4ae54b2e1e28096f2b173d5f5574910e8cfd80c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java#L284)
 -> 
[Combine.globally(Concatenate)](https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java#L67)
   
   The issue is that the output from `ToListViewDoFn` is being chopped up into 
multiple panes for the `Combine.globally`. This means that the metadata 
records described below can exist while some of the earlier records that 
should have been written are missing (for key-order-preserving runners), or 
while arbitrary records are missing (for non-key-order-preserving runners).
   
   The workaround is to avoid using `PCollectionView.listView` with a windowing 
strategy that discards fired panes and fires multiple times, or to migrate away 
from `PCollectionView.listView` to a singleton if using the slowly changing 
side input pattern.
   ---
   
   The `ToListViewDoFn` generates a starting `index` in the range 
`[Long.MIN_VALUE + 1, Long.MAX_VALUE - Integer.MAX_VALUE]` for each unique 
window and tags each value with the index, incrementing it per value. For example:
   elements A, B, C -> generate random index of 5 -> produces `KV<5, A>, KV<6, 
B>, KV<7, C>` and a metadata record `KV<Long.MIN_VALUE, [5, 8)>`.
   If there is another bundle:
   elements X, Y, Z -> generate random index of 7 -> produces `KV<7, X>, KV<8, 
Y>, KV<9, Z>` and a metadata record `KV<Long.MIN_VALUE, [7, 10)>`.
   This would create a multimap like:
   ```
   Long.MIN_VALUE -> {[5,8), [7,10)}
   5 -> {A}
   6 -> {B}
   7 -> {C, X}
   8 -> {Y}
   9 -> {Z}
   ```
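   
   The per-bundle tagging above can be sketched in plain Java without any Beam 
dependencies. This is only an illustration: the class `IndexSketch`, its fields 
and the `emit` helper are hypothetical names, not part of `ToListViewDoFn`, and 
the starting index is passed in rather than generated randomly so that the 
example is reproducible.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeMap;

   public class IndexSketch {
     /** Data records: index -> elements tagged with that index, kept in key order. */
     public final TreeMap<Long, List<String>> data = new TreeMap<>();
     /** Metadata records as {start, end} pairs, conceptually stored under Long.MIN_VALUE. */
     public final List<long[]> metadata = new ArrayList<>();

     /** Tags one bundle's elements with start, start + 1, ... and records [start, end). */
     public void emit(long start, List<String> elements) {
       long index = start;
       for (String element : elements) {
         data.computeIfAbsent(index, k -> new ArrayList<>()).add(element);
         index++;
       }
       metadata.add(new long[] {start, index});
     }

     public static void main(String[] args) {
       IndexSketch view = new IndexSketch();
       // Assumption: fixed starting indices stand in for the random ones.
       view.emit(5, List.of("A", "B", "C"));
       view.emit(7, List.of("X", "Y", "Z"));
       view.data.forEach((key, values) -> System.out.println(key + " -> " + values));
     }
   }
   ```
   
   Because two bundles may pick overlapping ranges, a key such as `7` can hold 
more than one element, which is why the structure is a multimap.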
   
   The metadata ranges allow us to produce a mapping from each range to the number of elements per key:
   ```
   [5, 7) -> 1
   [7, 8) -> 2
   [8, 10) -> 1
   ```
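   
   One way to derive such a mapping from the metadata ranges is a sweep over 
the range boundaries, counting how many ranges are active between consecutive 
boundaries. The sketch below is an assumption about how this could be done, 
not the code Beam uses; `RangeCounts` and `countOverlaps` are hypothetical 
names.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   public class RangeCounts {
     /**
      * Takes half-open ranges as {start, end} pairs and returns rows of
      * {start, end, count} in ascending order, where count is the number of
      * input ranges covering [start, end).
      */
     public static List<long[]> countOverlaps(List<long[]> ranges) {
       // +1 at every range start, -1 at every range end, sorted by boundary.
       TreeMap<Long, Integer> delta = new TreeMap<>();
       for (long[] range : ranges) {
         delta.merge(range[0], 1, Integer::sum);
         delta.merge(range[1], -1, Integer::sum);
       }
       List<long[]> segments = new ArrayList<>();
       int active = 0;
       Long previous = null;
       for (Map.Entry<Long, Integer> boundary : delta.entrySet()) {
         // Emit the stretch since the previous boundary if any range covers it.
         if (previous != null && active > 0) {
           segments.add(new long[] {previous, boundary.getKey(), active});
         }
         active += boundary.getValue();
         previous = boundary.getKey();
       }
       return segments;
     }

     public static void main(String[] args) {
       List<long[]> ranges = List.of(new long[] {5, 8}, new long[] {7, 10});
       for (long[] s : countOverlaps(ranges)) {
         System.out.println("[" + s[0] + ", " + s[1] + ") -> " + s[2]);
       }
     }
   }
   ```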
   
   Using this we can compute a global position by summing `(range.end - 
range.start) * # elements` over the ordered list of ranges until we reach a 
range whose running total would exceed the index. Once we have this range we 
can compute a sub-position within it. For example, to find index 3, we iterate 
over the range `[5, 7) -> 1` and know that we have advanced 2 elements 
`((7-5)*1)`. Then we notice that `[7, 8)` has 2 elements in it, so there is no 
need to advance further: our position is `7` and subPosition is `1`, telling us 
to access key `7` and iterate over the 0-based iterable until we get to 
position `1`, returning `X`.
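   
   The lookup just described can be sketched as follows. `ListViewLookup` and 
`locate` are hypothetical names, and the segments are assumed to be sorted 
`{start, end, count}` rows like the mapping above. The sketch also assumes that 
every key inside a segment really holds `count` elements, which is the 
invariant that breaks when records are missing.
   
   ```java
   import java.util.List;
   import java.util.Map;

   public class ListViewLookup {
     /** Returns {key, subPosition} for a 0-based global index. */
     public static long[] locate(long[][] segments, long index) {
       if (index < 0) {
         throw new IndexOutOfBoundsException("index " + index);
       }
       long skipped = 0; // elements in the segments already passed
       for (long[] s : segments) {
         long size = (s[1] - s[0]) * s[2]; // (end - start) * elements per key
         if (index < skipped + size) {
           long offset = index - skipped;
           return new long[] {s[0] + offset / s[2], offset % s[2]};
         }
         skipped += size;
       }
       throw new IndexOutOfBoundsException("index " + index);
     }

     public static void main(String[] args) {
       long[][] segments = {{5, 7, 1}, {7, 8, 2}, {8, 10, 1}};
       Map<Long, List<String>> data = Map.of(
           5L, List.of("A"), 6L, List.of("B"), 7L, List.of("C", "X"),
           8L, List.of("Y"), 9L, List.of("Z"));
       long[] hit = locate(segments, 3); // key 7, subPosition 1
       System.out.println(data.get(hit[0]).get((int) hit[1])); // prints X
     }
   }
   ```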
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-java-core

