Another question on this topic: is any work planned to add map state support in portable mode [1], and likewise for set state and list state?
[1] https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337

> On Sep 9, 2021, at 9:31 AM, Luke Cwik wrote:
>
> > The underlying state implementation that the FnApiDoFnRunner uses has a "close" method specifically meant to flush any pending operations, so that part should be trivial. I don't think persisting the "changelog" would be difficult, but migrating to it later would cause issues for job update, since we would be changing the encoding, so deciding up front which approach we want would be useful.
>
> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax wrote:
>
> > I think a changelog could be kept in memory; persisting a changelog seems overly complex. We would have to make sure to flush the changelog at the end of every bundle, though, so it would only help if the map key was accessed multiple times in the same bundle. I don't think it's correct that we won't be able to remove the key. In your example, all of the calls are in a single processElement or a single bundle (I'm not sure whether it's legal to keep ReadableState references between elements within the same bundle, though). In this case the call to remove would also be an operation journaled into the log, so after flushing the whole log at the end of the bundle the key would be removed.
> >
> > I don't think the above would be that hard, but IMO it's also fine to do something simpler initially.
> >
> > Reuven
>
> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok wrote:
>
> > Is adding the complexity of the changelog worth it instead of resolving immediately? We would be storing much more persisted data and wouldn't be able to clear the key even on a remove().
> >
> > For example:
> >
> > ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> > ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> > mapState.remove(keyA); // still need to keep the changelog
> > V maybePut2Value = maybePut2.read(); // should return value1
>
> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik wrote:
>
> > I believe we should consider these bugs and fix them, since they are surprising behaviors for users based upon what people are used to from similar APIs in other Map constructs.
> >
> > The benefit of the changelog approach is that it allows putIfAbsent to be a blind append if read is never called. To expand on my example of putIfAbsent:
> >
> > First bundle (assuming no read in this bundle):
> > ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> > ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2); // Ignore this since we now know that the map has an entry
> > This produces one blind append of (keyA, (PutIfAbsent, value1)).
> >
> > Second bundle:
> > ReadableState<V> get = mapState.get(keyA);
> > V value = get.read();
> >
> > The underlying values will be a list of changes applied in order to this map. The first read that sees multiple values should clear the list and resolve it, similar to what a combining state does, returning the only Put (since Put = clear + append) in the list or, failing that, the first PutIfAbsent.
>
> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok wrote:
>
> > Would this be a breaking change then? Going by your first examples, it's no longer a deferred read-then-write.
> >
> > I'm not seeing the benefit of having a changelog. If I'm reading your examples correctly, they say putIfAbsent should be evaluated immediately, with the result stored in the ReadableState until read is called?
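A minimal, in-memory sketch of the changelog resolution rule discussed above: each key's entries are journaled as <Operation, Value> pairs, a Put is treated as clear + append, and a read returns the Put if there is one or otherwise the first PutIfAbsent, then compacts the log the way combining state does. The class ChangelogCell and its methods are hypothetical, it models a single map key rather than Beam's MapState, values are assumed non-null, and treating remove() as a clear of the key's log is an assumption. This is not the Beam implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of one MapState key stored as a changelog.
// Not Beam's implementation; everything is in memory, values assumed non-null.
final class ChangelogCell<V> {
  enum Op { PUT, PUT_IF_ABSENT }

  // One journaled <Operation, Value> entry.
  private static final class Entry<T> {
    final Op op;
    final T value;

    Entry(Op op, T value) {
      this.op = op;
      this.value = value;
    }
  }

  private final List<Entry<V>> log = new ArrayList<>();

  // Put = clear + append, so no earlier entry can win against it.
  void put(V value) {
    log.clear();
    log.add(new Entry<>(Op.PUT, value));
  }

  // Blind append: no read of the existing value is needed.
  void putIfAbsent(V value) {
    log.add(new Entry<>(Op.PUT_IF_ABSENT, value));
  }

  // Assumption: remove() is journaled as a clear of this key's log.
  void remove() {
    log.clear();
  }

  // The Put wins if present (at most one survives, since Put clears the log);
  // otherwise the first PutIfAbsent wins. The log is then compacted to a
  // single Put, similar to how combining state compacts accumulators.
  V read() {
    V put = null;
    V firstPutIfAbsent = null;
    for (Entry<V> e : log) {
      if (e.op == Op.PUT) {
        put = e.value;
      } else if (firstPutIfAbsent == null) {
        firstPutIfAbsent = e.value;
      }
    }
    V resolved = (put != null) ? put : firstPutIfAbsent;
    log.clear();
    if (resolved != null) {
      log.add(new Entry<>(Op.PUT, resolved));
    }
    return resolved;
  }

  int logSize() {
    return log.size();
  }

  public static void main(String[] args) {
    ChangelogCell<String> cell = new ChangelogCell<>();
    cell.putIfAbsent("value1");
    cell.putIfAbsent("value2"); // loses at resolution time
    System.out.println(cell.read()); // value1
    cell.put("value3");
    cell.putIfAbsent("value4");
    System.out.println(cell.read()); // value3
    cell.remove();
    System.out.println(cell.read()); // null
  }
}
```

Under this model, Kiley's putIfAbsent/putIfAbsent/remove sequence leaves the key empty once the log is resolved, in line with Reuven's point that a journaled remove clears the key; what an outstanding maybePut2.read() should return in that case is not modeled here.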
>
> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik wrote:
>
> > I agree with the comment you linked to, Kiley, and with your analysis of what happens today. Yes, I believe the write will not happen until read() is invoked on the ReadableState<V> returned from putIfAbsent(). The rest of the comment also says that if there are multiple putIfAbsent calls, which one is written depends on which ReadableState<V> has read() called on it, leading to this behavior:
> >
> > ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> > ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> > maybePut2.read(); // If keyA was absent, then it will have value2; otherwise it will keep its old value.
> > maybePut1.read(); // This is effectively ignored
> >
> > I started this thread because a lot of these behaviors are surprising, and we should fix them to have ordering semantics based upon the actual order of interactions with the MapState, not the current semantics, which rely on read() being invoked.
> >
> > Conveniently, we don't have to resolve the read() immediately after a put/putIfAbsent, and we can still do a blind append onto a multimap state. We can treat the underlying value store as a changelog: if we store <Operation, Value> pairs (where Operation is Put or PutIfAbsent), a future read can determine the true value by scanning from start to end for the last Put in the list and, if none exists, taking the first PutIfAbsent.
>
> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok wrote:
>
> > As a side note, our SEO needs some work: the first Google result for "mapstate beam" is from version 2.2.0.
>
> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok wrote:
>
> > This section of the comment [1] says that calling read() is required for the condition to be evaluated.
> >
> > The example in the comment is:
> >
> > ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
> > mapState.remove(keyA);
> > maybePut.read(); // writes to map
> >
> > It also states that it should be a deferred read-followed-by-write. Deferred until read is called?
> >
> > [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
>
> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik wrote:
>
> > I believe that all our ReadableState calls are point-in-time snapshots of the underlying state plus any changes that have already happened within the bundle (e.g. you can see your own writes/appends/clears, assuming the ReadableState was created after the mutation).
> >
> > Here are some examples of what I think should happen:
> >
> > #1
> > ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> > ReadableState<T> read = mapState.get(keyA); // value should equal "valueA"
> >
> > #2
> > ReadableState<T> read = mapState.get(keyA); // read should equal prior value
> > ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> >
> > #3
> > ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> > // any calls to MapState put/remove for keyA can effectively drop the write that would be pending via maybePut
> > // maybePut should also be resolved before the write happens, to ensure that it can truthfully return true/false for whether the state was empty
> >
> > #4
> > ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> > // in a future bundle for the same key and window
> > ReadableState<T> read = mapState.get(keyA); // value should equal "valueA"
> >
> > I think #1, #3, and #4 don't currently work, and #2 doesn't work if the mapState didn't contain keyA.
>
> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles wrote:
>
> > Can the change be observed in that case? I think the semantics should be that the timing of the actual write cannot be observed.
> >
> > Kenn
>
> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik wrote:
>
> > Yeah, the "or state is committed" means that we should resolve it before any additional writes, but that only addresses part of the surprise I had.
> >
> > I would have expected putIfAbsent to force a resolution before transitioning to process the next key and window pair, regardless of whether read() was invoked or there were additional writes.
>
> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles wrote:
>
> > I think it is an implementation bug. The javadoc for putIfAbsent says "When {@code read()} is called on the result or state is committed, it forces a read of the map and reconciliation with any pending modifications."
> >
> > My reading of this is that the value changes for anything that happens after the call to putIfAbsent. It would be good to make this clear: any subsequent observation of this cell should observe the new value.
> >
> > Kenn
>
> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax wrote:
>
> > Yeah, I remember thinking that the computeIfAbsent/putIfAbsent semantics were very weird. I almost would have preferred not having those methods in MapState, even though they can be useful.
>
> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik wrote:
>
> > I was surprised to see that the MapState API for computeIfAbsent/putIfAbsent only performs the write if the ReadableState that is returned is resolved.
> >
> > For example:
> >
> > ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
> >
> > does nothing until
> >
> > value.read();
> >
> > It seems like an obvious mistake for developers coming from the Java Map API, which always performs the write, to forget the value.read().
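The surprise described in this thread can be reproduced with a small toy model, assuming that putIfAbsent defers its check-and-write until the returned state is first read. DeferredMapState and its peek() helper are hypothetical, java.util.function.Supplier stands in for Beam's ReadableState, and the model ignores the "or state is committed" path, so treat it as an illustration of the deferred semantics rather than of any Beam runner code. java.util.Map.putIfAbsent is shown first for contrast, since it writes immediately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical toy model of a deferred putIfAbsent; Supplier stands in
// for Beam's ReadableState. The "or state is committed" path is not modeled.
final class DeferredMapState<K, V> {
  private final Map<K, V> store = new HashMap<>();

  // Nothing is written here: the check-and-write runs on the first get().
  Supplier<V> putIfAbsent(K key, V value) {
    return new Supplier<V>() {
      private boolean resolved = false;
      private V previous;

      @Override
      public V get() {
        if (!resolved) {
          previous = store.putIfAbsent(key, value);
          resolved = true;
        }
        return previous;
      }
    };
  }

  void remove(K key) {
    store.remove(key);
  }

  // Inspects the backing store directly, ignoring any pending writes.
  V peek(K key) {
    return store.get(key);
  }

  public static void main(String[] args) {
    // java.util.Map performs the write immediately.
    Map<String, String> javaMap = new HashMap<>();
    javaMap.putIfAbsent("key", "new value maybe");
    System.out.println(javaMap.get("key")); // new value maybe

    // The deferred model does nothing until the result is read.
    DeferredMapState<String, String> mapState = new DeferredMapState<>();
    Supplier<String> value = mapState.putIfAbsent("key", "new value maybe");
    System.out.println(mapState.peek("key")); // null
    value.get();
    System.out.println(mapState.peek("key")); // new value maybe

    // Javadoc example: the pending write still lands after remove().
    Supplier<String> maybePut = mapState.putIfAbsent("keyA", "valueA");
    mapState.remove("keyA");
    maybePut.get();
    System.out.println(mapState.peek("keyA")); // valueA
  }
}
```

In this model, creating two results for the same key and reading the second one first makes its value win while the later read of the first one changes nothing, which matches the read-order dependence described earlier in the thread.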
