Awesome, looking forward to it.

> On Sep 10, 2021, at 10:30 AM, Luke Cwik <[email protected]> wrote:
> 
> Yes, https://github.com/apache/beam/pull/15238 is in progress for MapState
> and SetState.
> 
> On Fri, Sep 10, 2021 at 9:22 AM Ke Wu <[email protected]> wrote:
> Another question on this topic: is any work planned to add map state
> support in portable mode [1], and likewise for set state and list state?
> 
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337
> 
>> On Sep 9, 2021, at 9:31 AM, Luke Cwik <[email protected]> wrote:
>> 
>> The underlying state implementation that the FnApiDoFnRunner uses has a
>> "close" method specifically meant to flush any pending operations, so that
>> part should be trivial. I don't think persisting the "changelog" would be
>> difficult, but migrating to it in the future would cause issues for job
>> update since we would be changing the encoding, so deciding upfront which
>> approach we want would be useful.
>> 
>> 
>> 
>> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax <[email protected]> wrote:
>> I think a changelog could be kept in memory - persisting a changelog seems 
>> overly complex. We would have to make sure to flush the changelog at the end 
>> of every bundle though, so it would only help if the map key was accessed 
>> multiple times in the same bundle. I don't think it's correct that we won't 
>> be able to remove the key. In your example, all of the calls are in a single 
>> processElement or a single bundle (I'm not sure if it's legal to maintain 
>> ReadableState references between elements within the same bundle though). In 
>> this case the call to remove would also be an operation journaled into the 
>> log, so after flushing the whole log at the end of the bundle the key would 
>> be removed.
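A rough sketch of the in-memory journal described above, flushed once at the end of a bundle. Everything here is hypothetical and for illustration only: `BundleJournalSketch`, `Op`, and `flush` are made-up names, and a plain `java.util.Map` stands in for the persisted state rather than any Beam state API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not Beam's MapState implementation.
final class BundleJournalSketch<K, V> {
  enum Op { PUT, PUT_IF_ABSENT, REMOVE }

  private static final class Entry<K, V> {
    final Op op;
    final K key;
    final V value;

    Entry(Op op, K key, V value) {
      this.op = op;
      this.key = key;
      this.value = value;
    }
  }

  // Operations journaled in call order; kept in memory only for the bundle.
  private final List<Entry<K, V>> journal = new ArrayList<>();

  void put(K key, V value) {
    journal.add(new Entry<K, V>(Op.PUT, key, value));
  }

  void putIfAbsent(K key, V value) {
    journal.add(new Entry<K, V>(Op.PUT_IF_ABSENT, key, value));
  }

  void remove(K key) {
    journal.add(new Entry<K, V>(Op.REMOVE, key, null));
  }

  /** Replays the journal in call order against the stand-in store, then clears it. */
  void flush(Map<K, V> persisted) {
    for (Entry<K, V> e : journal) {
      switch (e.op) {
        case PUT:
          persisted.put(e.key, e.value);
          break;
        case PUT_IF_ABSENT:
          persisted.putIfAbsent(e.key, e.value);
          break;
        case REMOVE:
          persisted.remove(e.key);
          break;
      }
    }
    journal.clear();
  }

  public static void main(String[] args) {
    Map<String, String> persisted = new HashMap<>();
    BundleJournalSketch<String, String> state = new BundleJournalSketch<>();
    state.putIfAbsent("keyA", "value1");
    state.putIfAbsent("keyA", "value2");
    state.remove("keyA"); // journaled like any other operation
    state.flush(persisted); // end of bundle
    System.out.println(persisted.containsKey("keyA")); // prints "false"
  }
}
```

Because the remove is replayed after both putIfAbsent calls, the key is gone once the journal has been flushed, which matches the argument in the message above.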
>> 
>> I don't think the above would be that hard, but IMO it's also fine to do 
>> something simpler initially.
>> 
>> Reuven
>> 
>> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <[email protected]> wrote:
>> Is adding the complexity of the changelog worth it instead of resolving 
>> immediately? We would be storing much more persisted data and wouldn't be 
>> able to clear the key even on a remove().
>> 
>> For example:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> mapState.remove(keyA); // still need to keep the changelog
>> V maybePut2Value = maybePut2.read(); // should return value1
>> 
>> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <[email protected]> wrote:
>> I believe we should consider these bugs and fix them since they are 
>> surprising behaviors for users based upon what people are used to from 
>> similar APIs in other Map constructs.
>> 
>> The benefit of the changelog approach is to allow for putIfAbsent to have a 
>> blind append if read is never called. To expand on my example of putIfAbsent:
>> First Bundle (assuming no read in this bundle):
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> // Ignore this since we now know that the map has an entry
>> Produces one blind append of (keyA, (PutIfAbsent, value1))
>> 
>> Second Bundle:
>> ReadableState<V> get = mapState.get(keyA);
>> V value = get.read();
>> The underlying values will be a list of changes applied in order to this 
>> map. The first read that sees multiple values should clear the list and 
>> resolve it similar to what a combining state does, returning the only Put 
>> (since Put = clear + append) in the list or the first PutIfAbsent.
>> 
>> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <[email protected]> wrote:
>> Would this be a breaking change then? Going by your first examples, it's no 
>> longer a deferred read-then-write. 
>> 
>> I'm not seeing the benefit of having a changelog. If I'm reading your
>> examples correctly, they say putIfAbsent should be evaluated immediately
>> and the result stored in the ReadableState until read is called?
>> 
>> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <[email protected]> wrote:
>> I agree with the comment that you linked, Kiley, and with your analysis of
>> what happens today. Yes, I believe the write will not happen until read() is
>> invoked on the ReadableState<V> returned from putIfAbsent(). The rest of the
>> comment also says that if there are multiple putIfAbsent calls, then which
>> one is written depends on which ReadableState<V> has read() called on it,
>> leading to this behavior:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> maybePut2.read(); // If keyA was absent, then it will have value2; otherwise
>> // it will maintain its old value.
>> maybePut1.read(); // This is effectively ignored
>> 
>> I started this thread because a lot of these behaviors are surprising and we 
>> should fix them to have ordering semantics based upon the actual order of 
>> interactions with the MapState and not the current semantics which rely on 
>> read() being invoked.
>> 
>> Conveniently, we don't have to resolve the read() immediately after a
>> put/putIfAbsent and can still have a blind append on to a multimap state. We
>> can treat the underlying value store as a changelog and store <Operation,
>> Value> entries (where Operation is Put or PutIfAbsent). A future read can
>> then figure out the true value by reading from the start to the end, looking
>> for the last Put in the list and, if none exists, the first PutIfAbsent.
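The resolution rule described above (last Put wins; with no Put, the first PutIfAbsent wins) could be sketched roughly as follows. This is only an illustration under assumed names: `ChangelogSketch`, `Operation`, `Entry`, and `resolve` are hypothetical and not part of the Beam API, and the changelog is modeled as a plain in-memory list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration; not Beam's MapState implementation.
final class ChangelogSketch {
  /** The two operations named in the message above. */
  enum Operation { PUT, PUT_IF_ABSENT }

  /** One <Operation, Value> entry appended to a key's changelog. */
  static final class Entry<V> {
    final Operation op;
    final V value;

    Entry(Operation op, V value) {
      this.op = op;
      this.value = value;
    }
  }

  /** Scans from start to end: the last PUT wins; with no PUT, the first PUT_IF_ABSENT wins. */
  static <V> Optional<V> resolve(List<Entry<V>> changelog) {
    V lastPut = null;
    V firstPutIfAbsent = null;
    boolean sawPut = false;
    boolean sawPutIfAbsent = false;
    for (Entry<V> e : changelog) {
      if (e.op == Operation.PUT) {
        lastPut = e.value;
        sawPut = true;
      } else if (!sawPutIfAbsent) {
        firstPutIfAbsent = e.value;
        sawPutIfAbsent = true;
      }
    }
    // An empty changelog leaves firstPutIfAbsent null, so the result is empty.
    return Optional.ofNullable(sawPut ? lastPut : firstPutIfAbsent);
  }

  public static void main(String[] args) {
    List<Entry<String>> onlyPutIfAbsent = Arrays.asList(
        new Entry<String>(Operation.PUT_IF_ABSENT, "value1"),
        new Entry<String>(Operation.PUT_IF_ABSENT, "value2"));
    System.out.println(resolve(onlyPutIfAbsent).get()); // prints "value1"

    List<Entry<String>> withPut = Arrays.asList(
        new Entry<String>(Operation.PUT_IF_ABSENT, "value1"),
        new Entry<String>(Operation.PUT, "value2"));
    System.out.println(resolve(withPut).get()); // prints "value2"
  }
}
```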
>> 
>> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <[email protected]> wrote:
>> As a side note, SEO needs some work. The first result for "mapstate beam" on 
>> Google is from version 2.2.0
>> 
>> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <[email protected]> wrote:
>> This section of the comment [1] says that calling read() is required for the 
>> condition to be evaluated.
>> 
>> Example in the comment is:
>> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
>> mapState.remove(keyA);
>> maybePut.read(); // writes to map
>> 
>> Also, it states that it should be a deferred read-followed-by-write.
>> Deferred until read is called?
>> 
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
>>
>> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <[email protected]> wrote:
>> I believe that all our ReadableState calls are point in time snapshots of 
>> what the underlying state is plus any changes that happened within the 
>> bundle already (e.g. you can see your own writes/appends/clears assuming 
>> that the creation of the ReadableState happened after the mutation).
>> 
>> Here are some examples of what I think should happen:
>> 
>> #1
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
>> 
>> #2
>> ReadableState<T> read = mapState.get(keyA);   // read should equal prior value
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> 
>> #3
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> // any calls to MapState put/remove for keyA can effectively drop the write
>> // that would be pending via maybePut
>> // maybePut should also be resolved before the write happens to ensure that
>> // it can return true/false truthfully if the state was empty
>> 
>> #4
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> // in a future bundle for same key and window
>> ReadableState<T> read = mapState.get(keyA);   // value should equal "valueA"
>> 
>> I think #1, #3, and #4 don't currently work, and #2 doesn't work if the
>> mapState didn't contain keyA.
>> 
>> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <[email protected]> wrote:
>> Can the change be observed in that case? I think the semantics should be 
>> that the timing of the actual write cannot be observed.
>> 
>> Kenn
>> 
>> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <[email protected]> wrote:
>> Yeah, the "or state is committed" means that we should resolve it before any 
>> additional writes but that only addresses part of the surprise I had.
>> 
>> I would have expected that the putIfAbsent would force a resolution before
>> transitioning to process the next key and window pair, regardless of whether
>> the read() was invoked or there were additional writes.
>> 
>> 
>> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <[email protected]> wrote:
>> I think it is an implementation bug. The javadoc for putIfAbsent says "When 
>> {@code read()} is called on the result or state is committed, it forces a 
>> read of the map and reconciliation with any pending modifications."
>> 
>> My reading of this is that the value changes for anything that happens after 
>> the call to putIfAbsent. It would be good to make this clear, that any 
>> subsequent observation of this cell should observe the new value.
>> 
>> Kenn
>> 
>> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <[email protected]> wrote:
>> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent semantics 
>> were very weird. I almost would have preferred not having those methods in 
>> MapState, even though they can be useful.
>> 
>> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <[email protected]> wrote:
>> I was surprised to see the MapState API for computeIfAbsent/putIfAbsent only 
>> performs the write if the ReadableState that is returned is resolved.
>> 
>> For example:
>> ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
>> does nothing until
>> value.read();
>> 
>> Forgetting to call value.read() seems like an easy mistake for developers
>> to make when coming from the Java Map API, which always performs the write.
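For comparison, here is a small sketch of the eager behavior of `java.util.Map.putIfAbsent`, which the message above contrasts with MapState. The class name `EagerPutIfAbsentDemo` and its `run` helper are made up for illustration; only the standard `java.util` calls are real.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustration of java.util.Map only; no Beam state is involved.
final class EagerPutIfAbsentDemo {
  /** Returns {result of first call, result of second call, final stored value}. */
  static String[] run() {
    Map<String, String> map = new HashMap<>();
    // The key is absent, so the write happens immediately and null is returned.
    String first = map.putIfAbsent("key", "new value maybe");
    // The key is now present, so nothing is written and the existing value is returned.
    String second = map.putIfAbsent("key", "another value");
    return new String[] {first, second, map.get("key")};
  }

  public static void main(String[] args) {
    // prints "[null, new value maybe, new value maybe]"
    System.out.println(Arrays.toString(run()));
  }
}
```

No follow-up call is needed for the first write to take effect, which is the expectation developers are likely to carry over to MapState.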
> 
