lostluck opened a new issue, #35869:
URL: https://github.com/apache/beam/issues/35869

   ### What happened?
   
   The Go SDK doesn't consistently follow its own semantics for the state API, 
leading to a gap in how bag states work in the SDK within a bundle.
   
   ## What do you do?
   
   Have a DoFn that has a Bag State. 
   
   In a single ProcessElement call:
   
   1. Write an element E to the Bag.
   2. Read all the elements from the Bag.
   
   ## What do you expect?
   
   The Read produces element E, along with any other elements stored in the Bag 
for that key.
   
   ## What actually happens?
   
   The Read will produce the element E twice.
   
   ## Do you know why?
   
   The Go SDK's state implementation always eagerly writes elements to the 
Runner via the State API, and also stores the transactions locally for 
performance. Combined with the standard Runner implementation of the State API 
semantics, this leads to a duplicated element when reading after a write in 
the same bundle.
   
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/userstate.go#L180
   
   ### Do the other SDKs experience this?
   
   They do not. Their implementations follow a more strictly bundle-based 
model. All state operations are *local* until the end of the bundle, when any 
outstanding transactions are processed and persisted. This can improve 
performance by avoiding unnecessary remote calls, including when the 
transactions in a bundle net out to no State operations being needed.
   
   ### What runners does this happen on
   
   Both Prism and Dataflow end up returning the *currently cached state* for 
the state cell on a Read call.
   
   ### User impact
   
   This affects Go SDK users who perform blind writes to Bag State and then 
read it in the same processing method. Blind writes without a read don't have 
this effect, because the Go SDK doesn't maintain local state *within* a bundle 
call, only per element. Overall, this approach leads to lower SDK-side memory 
usage, but also additional latency per element when applicable.
   
   Only the blind writes before a read are at risk. After the first Read, the 
call then has a local initial state to work from for reads, and won't re-do 
that read.
   
   This leads to a correctness issue: blind-written elements are duplicated 
when reading the state. This is unfortunate, but it is certainly not data loss.
   
   Other uses of the raw Bag state in the Go SDK eagerly clear cells runner 
side, and thus should not run into this issue.
   
   ## Proposed fix?
   
   Ideally, for performance, the Go SDK should rewrite its internal handling 
to match the Java and Python SDKs.
   
   The quickest fix is to check on write whether a Read has been performed, and 
if not, pre-emptively do so.  This makes blind writes more expensive, but 
they're already doing more work than Java and Python due to the implementation 
anyway. 
   
   HT to @acrites for finding the issue.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [x] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

