Another question on this topic: is any work planned to add map state 
support in portable mode [1], and likewise for set state and list state?

[1] https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337

> On Sep 9, 2021, at 9:31 AM, Luke Cwik <[email protected]> wrote:
> 
> The underlying state implementation that the FnApiDoFnRunner uses has a 
> "close" method specifically meant to flush any pending operations, so that 
> part should be trivial. I don't think persisting the "changelog" would be 
> difficult, but migrating to it later would cause issues for job update 
> since we would be changing the encoding, so deciding upfront which approach 
> we want would be useful.
> 
> 
> 
> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax <[email protected]> wrote:
> I think a changelog could be kept in memory - persisting a changelog seems 
> overly complex. We would have to make sure to flush the changelog at the end 
> of every bundle though, so it would only help if the map key was accessed 
> multiple times in the same bundle. I don't think it's correct that we won't 
> be able to remove the key. In your example, all of the calls are in a single 
> processElement or a single bundle (I'm not sure if it's legal to maintain 
> ReadableState references between elements within the same bundle though). In 
> this case the call to remove would also be an operation journaled into the 
> log, so after flushing the whole log at the end of the bundle the key would 
> be removed.
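
To make the in-memory changelog idea above concrete, here is a minimal sketch. It assumes a plain java.util.Map as a stand-in for the runner's persisted map state, and every name in it (BundleJournalSketch, Kind, Op, flush) is hypothetical rather than part of Beam. Operations are only journaled when called and are replayed in call order at the end of the bundle, so a remove() journaled after two putIfAbsent() calls still leaves the key absent after the flush.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BundleJournalSketch<K, V> {
    enum Kind { PUT, PUT_IF_ABSENT, REMOVE }

    /** One journaled operation (hypothetical type, not a Beam class). */
    private static final class Op<K, V> {
        final Kind kind;
        final K key;
        final V value;

        Op(Kind kind, K key, V value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the runner's persisted map state (an assumption of this sketch).
    private final Map<K, V> persisted;
    // In-memory journal of operations seen in the current bundle.
    private final List<Op<K, V>> journal = new ArrayList<>();

    BundleJournalSketch(Map<K, V> persisted) {
        this.persisted = persisted;
    }

    void put(K key, V value) {
        journal.add(new Op<>(Kind.PUT, key, value));
    }

    void putIfAbsent(K key, V value) {
        journal.add(new Op<>(Kind.PUT_IF_ABSENT, key, value));
    }

    void remove(K key) {
        journal.add(new Op<>(Kind.REMOVE, key, null));
    }

    /** Replays the journal in call order, then clears it; meant to run at the end of a bundle. */
    void flush() {
        for (Op<K, V> op : journal) {
            switch (op.kind) {
                case PUT:
                    persisted.put(op.key, op.value);
                    break;
                case PUT_IF_ABSENT:
                    persisted.putIfAbsent(op.key, op.value);
                    break;
                case REMOVE:
                    persisted.remove(op.key);
                    break;
            }
        }
        journal.clear();
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        BundleJournalSketch<String, String> state = new BundleJournalSketch<>(store);
        state.putIfAbsent("keyA", "value1");
        state.putIfAbsent("keyA", "value2");
        state.remove("keyA");
        state.flush();
        System.out.println(store.containsKey("keyA")); // prints "false"
    }
}
```

The sketch deliberately leaves out reads within the bundle; resolving a ReadableState before the flush would need the journal to be consulted as well.
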
> 
> I don't think the above would be that hard, but IMO it's also fine to do 
> something simpler initially.
> 
> Reuven
> 
> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <[email protected]> wrote:
> Is adding the complexity of the changelog worth it instead of resolving 
> immediately? We would be storing much more persisted data and wouldn't be 
> able to clear the key even on a remove().
> 
> For example:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> mapState.remove(keyA); // still need to keep the changelog
> V maybePut2Value = maybePut2.read(); // should return value1
> 
> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <[email protected]> wrote:
> I believe we should consider these bugs and fix them since they are 
> surprising behaviors for users based upon what people are used to from 
> similar APIs in other Map constructs.
> 
> The benefit of the changelog approach is to allow for putIfAbsent to have a 
> blind append if read is never called. To expand on my example of putIfAbsent:
> First Bundle (assuming no read in this bundle):
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  // Ignore
> // this since we now know that the map has an entry
> Produces one blind append of (keyA, (PutIfAbsent, value1))
> 
> Second Bundle:
> ReadableState<V> get = mapState.get(keyA);
> V value = get.read();
> The underlying values will be a list of changes applied in order to this map. 
> The first read that sees multiple values should clear the list and resolve it 
> similar to what a combining state does, returning the only Put (since Put = 
> clear + append) in the list or the first PutIfAbsent.
> 
> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <[email protected]> wrote:
> Would this be a breaking change then? Going by your first examples, it's no 
> longer a deferred read-then-write. 
> 
> I'm not seeing the benefit of having a changelog. If I'm reading your 
> examples correctly, it's saying it should evaluate putIfAbsent immediately 
> and store the result in ReadableState until read is called?
> 
> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <[email protected]> wrote:
> I agree with the comment that you linked, Kiley, and with your analysis of 
> what happens today. Yes, I believe the write will not happen until read() is 
> invoked on the ReadableState<V> returned from putIfAbsent(). The rest of the 
> comment also says that if there are multiple putIfAbsent calls then which one 
> is written is dependent on which ReadableState<V> read() is called leading to 
> this behavior:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> maybePut2.read(); // If keyA was absent, then it will have value2 otherwise
> // it will maintain its old value.
> maybePut1.read(); // This is effectively ignored
> 
> I started this thread because a lot of these behaviors are surprising and we 
> should fix them to have ordering semantics based upon the actual order of 
> interactions with the MapState and not the current semantics which rely on 
> read() being invoked.
> 
> Conveniently we don't have to resolve the read() immediately after a 
> put/putIfAbsent and can still have a blind append on to a multimap state. We 
> can treat the underlying value store as a changelog which we can figure out 
> the true value when performing a future read if we store <Operation, Value> 
> (where Operation is Put or PutIfAbsent) and can read from the start to the 
> end looking for the last Put in the list and if none exists the first 
> PutIfAbsent.
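
The resolution rule described above (scan the changelog from start to end, take the last Put, and fall back to the first PutIfAbsent if no Put exists) could be sketched roughly as follows. The Change type, the Operation enum and the resolve method are hypothetical names for illustration and do not exist in Beam; the sketch also assumes values are non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ChangelogResolveSketch {
    enum Operation { PUT, PUT_IF_ABSENT }

    /** One <Operation, Value> record in the per-key changelog (hypothetical type). */
    static final class Change<V> {
        final Operation operation;
        final V value;

        Change(Operation operation, V value) {
            this.operation = operation;
            this.value = value;
        }
    }

    /**
     * Scans the changelog from start to end and returns the value of the last
     * PUT, or of the first PUT_IF_ABSENT when no PUT exists, or empty when the
     * changelog has no entries. Values are assumed to be non-null.
     */
    static <V> Optional<V> resolve(List<Change<V>> changelog) {
        Change<V> lastPut = null;
        Change<V> firstPutIfAbsent = null;
        for (Change<V> change : changelog) {
            if (change.operation == Operation.PUT) {
                lastPut = change;
            } else if (firstPutIfAbsent == null) {
                firstPutIfAbsent = change;
            }
        }
        if (lastPut != null) {
            return Optional.of(lastPut.value);
        }
        if (firstPutIfAbsent != null) {
            return Optional.of(firstPutIfAbsent.value);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Change<String>> log = new ArrayList<>();
        log.add(new Change<>(Operation.PUT_IF_ABSENT, "value1"));
        log.add(new Change<>(Operation.PUT_IF_ABSENT, "value2"));
        System.out.println(resolve(log).get()); // prints "value1"
        log.add(new Change<>(Operation.PUT, "value3"));
        System.out.println(resolve(log).get()); // prints "value3"
    }
}
```
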
> 
> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <[email protected]> wrote:
> As a side note, SEO needs some work. The first result for "mapstate beam" on 
> Google is from version 2.2.0
> 
> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <[email protected]> wrote:
> This section of the comment [1] says that calling read() is required for the 
> condition to be evaluated.
> 
> Example in the comment is:
> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
> mapState.remove(keyA);
> maybePut.read(); // writes to map
> 
> Also, it states that it should be a deferred read-followed-by-write. 
> Deferred until read is called?
> 
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
> 
> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <[email protected]> wrote:
> I believe that all our ReadableState calls are point in time snapshots of 
> what the underlying state is plus any changes that happened within the bundle 
> already (e.g. you can see your own writes/appends/clears assuming that the 
> creation of the ReadableState happened after the mutation).
> 
> Here are some examples of what I think should happen:
> 
> #1
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
> 
> #2
> ReadableState<T> read = mapState.get(keyA);   // read should equal prior value
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> 
> #3
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> // any calls to MapState put/remove for keyA can effectively drop the write
> // that would be pending via maybePut
> // maybePut should also be resolved before the write happens to ensure that
> // it can return true/false truthfully if the state was empty
> 
> #4
> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
> // in a future bundle for same key and window
> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
> 
> I think #1, #3, and #4 don't currently work, and #2 doesn't work if the 
> mapState didn't contain keyA.
> 
> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <[email protected]> wrote:
> Can the change be observed in that case? I think the semantics should be that 
> the timing of the actual write cannot be observed.
> 
> Kenn
> 
> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <[email protected]> wrote:
> Yeah, the "or state is committed" means that we should resolve it before any 
> additional writes but that only addresses part of the surprise I had.
> 
> I would have expected that the putIfAbsent would force a resolution before 
> transitioning to process the next key and window pair, regardless of whether 
> the read() was invoked or there were additional writes.
> 
> 
> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <[email protected]> wrote:
> I think it is an implementation bug. The javadoc for putIfAbsent says "When 
> {@code read()} is called on the result or state is committed, it forces a 
> read of the map and reconciliation with any pending modifications."
> 
> My reading of this is that the value changes for anything that happens after 
> the call to putIfAbsent. It would be good to make this clear, that any 
> subsequent observation of this cell should observe the new value.
> 
> Kenn
> 
> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <[email protected]> wrote:
> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent semantics 
> were very weird. I almost would have preferred not having those methods in 
> MapState, even though they can be useful.
> 
> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <[email protected]> wrote:
> I was surprised to see the MapState API for computeIfAbsent/putIfAbsent only 
> performs the write if the ReadableState that is returned is resolved.
> 
> For example:
> ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
> does nothing until
> value.read();
> 
> Forgetting to call value.read() seems like an easy mistake for developers 
> to make when coming from the Java Map API, which always performs the 
> write.
