Awesome, looking forward to it.
> On Sep 10, 2021, at 10:30 AM, Luke Cwik <[email protected]> wrote:
>
> Yes, https://github.com/apache/beam/pull/15238 is in progress for MapState and
> SetState.
>
> On Fri, Sep 10, 2021 at 9:22 AM Ke Wu <[email protected]> wrote:
> Another question on this topic: is any work planned to add map state
> support in portable mode [1], and likewise for set state and list state?
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337
>
>
>> On Sep 9, 2021, at 9:31 AM, Luke Cwik <[email protected]> wrote:
>>
>> The underlying state implementation that the FnApiDoFnRunner uses has a
>> "close" method specifically meant to flush any pending operations, so that
>> part should be trivial. I don't think persisting the "changelog" would be
>> difficult, but migrating to it later would cause issues for job update
>> since we would be changing the encoding. Deciding upfront which approach we
>> want would therefore be useful.
>>
>>
>>
>> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax <[email protected]> wrote:
>> I think a changelog could be kept in memory - persisting a changelog seems
>> overly complex. We would have to make sure to flush the changelog at the end
>> of every bundle though, so it would only help if the map key was accessed
>> multiple times in the same bundle. I don't think it's correct that we won't
>> be able to remove the key. In your example, all of the calls are in a single
>> processElement or a single bundle (I'm not sure if it's legal to maintain
>> ReadableState references between elements within the same bundle though). In
>> this case the call to remove would also be an operation journaled into the
>> log, so after flushing the whole log at the end of the bundle the key would
>> be removed.
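
The in-memory changelog described above could look roughly like the sketch below. It is only an illustration under stated assumptions: InMemoryJournalSketch, Op, Change and flush() are hypothetical names, not Beam APIs, and a plain java.util.Map stands in for the runner's persisted state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of an in-memory journal; none of these names are Beam APIs. */
final class InMemoryJournalSketch {
  enum Op { PUT, PUT_IF_ABSENT, REMOVE }

  /** One journaled mutation, recorded in call order. */
  static final class Change {
    final Op op;
    final String key;
    final String value;

    Change(Op op, String key, String value) {
      this.op = op;
      this.key = key;
      this.value = value;
    }
  }

  // Assumption: a plain Map stands in for the durable state behind the runner.
  private final Map<String, String> persisted;
  private final List<Change> journal = new ArrayList<>();

  InMemoryJournalSketch(Map<String, String> persisted) {
    this.persisted = persisted;
  }

  void put(String key, String value) {
    journal.add(new Change(Op.PUT, key, value));
  }

  void putIfAbsent(String key, String value) {
    journal.add(new Change(Op.PUT_IF_ABSENT, key, value));
  }

  void remove(String key) {
    journal.add(new Change(Op.REMOVE, key, null));
  }

  /** Replays the journal in call order against the persisted map, then clears it. */
  void flush() {
    for (Change c : journal) {
      switch (c.op) {
        case PUT:
          persisted.put(c.key, c.value);
          break;
        case PUT_IF_ABSENT:
          persisted.putIfAbsent(c.key, c.value);
          break;
        case REMOVE:
          persisted.remove(c.key);
          break;
      }
    }
    journal.clear();
  }

  public static void main(String[] args) {
    Map<String, String> persisted = new HashMap<>();
    InMemoryJournalSketch state = new InMemoryJournalSketch(persisted);
    state.putIfAbsent("keyA", "value1");
    state.putIfAbsent("keyA", "value2");
    state.remove("keyA"); // journaled like any other operation
    state.flush(); // would be called at the end of the bundle
    System.out.println(persisted.containsKey("keyA")); // false
  }
}
```

Because remove() is just another journal entry, replaying the whole log at the end of the bundle leaves keyA absent, which is the outcome argued for above.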
>>
>> I don't think the above would be that hard, but IMO it's also fine to do
>> something simpler initially.
>>
>> Reuven
>>
>> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok <[email protected]> wrote:
>> Is adding the complexity of the changelog worth it instead of resolving
>> immediately? We would be storing much more persisted data and wouldn't be
>> able to clear the key even on a remove().
>>
>> For example:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> mapState.remove(keyA); // still need to keep the changelog
>> V maybePut2Value = maybePut2.read(); // should return value1
>>
>> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik <[email protected]> wrote:
>> I believe we should consider these bugs and fix them since they are
>> surprising behaviors for users based upon what people are used to from
>> similar APIs in other Map constructs.
>>
>> The benefit of the changelog approach is to allow for putIfAbsent to have a
>> blind append if read is never called. To expand on my example of putIfAbsent:
>> First Bundle (assuming no read in this bundle):
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> // Ignored, since we now know that the map has an entry for keyA.
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> Produces one blind append of (keyA, (PutIfAbsent, value1))
>>
>> Second Bundle:
>> ReadableState<V> get = mapState.get(keyA);
>> V value = get.read();
>> The underlying values will be a list of changes applied in order to this
>> map. The first read that sees multiple values should clear the list and
>> resolve it similar to what a combining state does, returning the only Put
>> (since Put = clear + append) in the list or the first PutIfAbsent.
>>
>> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok <[email protected]> wrote:
>> Would this be a breaking change then? Going by your first examples, it's no
>> longer a deferred read-then-write.
>>
>> I'm not seeing the benefit of having a changelog. If I'm reading your
>> examples correctly, you're saying it should evaluate putIfAbsent immediately
>> and store the result in ReadableState until read is called?
>>
>> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik <[email protected]> wrote:
>> I agree with the comment that you linked to, Kiley, and with your analysis of
>> what happens today. Yes, I believe the write will not happen until read() is
>> invoked on the ReadableState<V> returned from putIfAbsent(). The rest of the
>> comment also says that if there are multiple putIfAbsent calls, then which
>> one is written depends on which ReadableState<V> has read() called on it,
>> leading to this behavior:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> // If keyA was absent, it will now have value2; otherwise it keeps its old value.
>> maybePut2.read();
>> maybePut1.read(); // This is effectively ignored
>>
>> I started this thread because a lot of these behaviors are surprising and we
>> should fix them to have ordering semantics based upon the actual order of
>> interactions with the MapState and not the current semantics which rely on
>> read() being invoked.
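
The read-order dependence described above can be reproduced with a small toy model. This is not Beam's implementation: DeferredPutIfAbsentToy is a hypothetical class, a HashMap stands in for the state, and Supplier.get() stands in for ReadableState.read().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy model of the deferred semantics discussed in this thread; not a Beam API. */
final class DeferredPutIfAbsentToy {
  final Map<String, String> store = new HashMap<>();

  /** Returns a handle; the conditional write only runs when get() is called on it. */
  Supplier<String> putIfAbsent(String key, String value) {
    return () -> store.putIfAbsent(key, value);
  }

  public static void main(String[] args) {
    DeferredPutIfAbsentToy state = new DeferredPutIfAbsentToy();
    Supplier<String> maybePut1 = state.putIfAbsent("keyA", "value1");
    Supplier<String> maybePut2 = state.putIfAbsent("keyA", "value2");

    System.out.println(state.store.containsKey("keyA")); // false: nothing written yet
    maybePut2.get(); // keyA was absent, so value2 is written
    maybePut1.get(); // effectively ignored, keyA already present
    System.out.println(state.store.get("keyA")); // value2
  }
}
```

In this model the stored value is decided by which handle is read first rather than by the order of the putIfAbsent calls, which is the surprise being discussed.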
>>
>> Conveniently, we don't have to resolve the read() immediately after a
>> put/putIfAbsent and can still do a blind append onto a multimap state. We
>> can treat the underlying value store as a changelog of <Operation, Value>
>> entries (where Operation is Put or PutIfAbsent). A future read can then
>> determine the true value by scanning from the start to the end for the last
>> Put in the list and, if none exists, taking the first PutIfAbsent.
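
As a rough illustration of the resolution rule just described (last Put wins, otherwise the first PutIfAbsent), here is a minimal sketch. ChangelogResolveSketch, Operation, Entry and resolve() are hypothetical names chosen for this example, not Beam APIs, and the sketch only covers the two operations named above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of changelog resolution; none of these names are Beam APIs. */
final class ChangelogResolveSketch {
  enum Operation { PUT, PUT_IF_ABSENT }

  /** One journaled change for a single map key. */
  static final class Entry<V> {
    final Operation op;
    final V value;

    Entry(Operation op, V value) {
      this.op = op;
      this.value = value;
    }
  }

  /**
   * Scans the log from start to end. Returns the value of the last PUT if there is one,
   * otherwise the value of the first PUT_IF_ABSENT, otherwise null for an empty log.
   */
  static <V> V resolve(List<Entry<V>> log) {
    V lastPut = null;
    boolean sawPut = false;
    V firstPutIfAbsent = null;
    boolean sawPutIfAbsent = false;
    for (Entry<V> e : log) {
      if (e.op == Operation.PUT) {
        lastPut = e.value;
        sawPut = true;
      } else if (!sawPutIfAbsent) {
        firstPutIfAbsent = e.value;
        sawPutIfAbsent = true;
      }
    }
    return sawPut ? lastPut : firstPutIfAbsent;
  }

  public static void main(String[] args) {
    List<Entry<String>> onlyConditional = Arrays.asList(
        new Entry<String>(Operation.PUT_IF_ABSENT, "value1"),
        new Entry<String>(Operation.PUT_IF_ABSENT, "value2"));
    System.out.println(resolve(onlyConditional)); // value1

    List<Entry<String>> withPut = Arrays.asList(
        new Entry<String>(Operation.PUT_IF_ABSENT, "value1"),
        new Entry<String>(Operation.PUT, "value2"),
        new Entry<String>(Operation.PUT_IF_ABSENT, "value3"));
    System.out.println(resolve(withPut)); // value2
  }
}
```

Each write stays a blind append of an entry; the scan cost is only paid by the first read that needs the resolved value.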
>>
>> On Tue, Sep 7, 2021 at 6:26 PM Kiley Sok <[email protected]> wrote:
>> As a side note, SEO needs some work. The first result for "mapstate beam" on
>> Google is from version 2.2.0
>>
>> On Tue, Sep 7, 2021 at 6:15 PM Kiley Sok <[email protected]> wrote:
>> This section of the comment [1] says that calling read() is required for the
>> condition to be evaluated.
>>
>> Example in the comment is:
>> ReadableState<V> maybePut = mapState.putIfAbsent(keyA, valueA);
>> mapState.remove(keyA);
>> maybePut.read(); // writes to map
>>
>> Also, it states that it should be a deferred read-followed-by-write.
>> Deferred until read is called?
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java#L60-L64
>>
>> On Tue, Aug 17, 2021 at 1:21 PM Luke Cwik <[email protected]> wrote:
>> I believe that all our ReadableState calls are point in time snapshots of
>> what the underlying state is plus any changes that happened within the
>> bundle already (e.g. you can see your own writes/appends/clears assuming
>> that the creation of the ReadableState happened after the mutation).
>>
>> Here are some examples of what I think should happen:
>>
>> #1
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> ReadableState<T> read = mapState.get(keyA); // value should equal "valueA"
>>
>> #2
>> ReadableState<T> read = mapState.get(keyA); // read should equal prior value
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>>
>> #3
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> // Any calls to MapState put/remove for keyA can effectively drop the write
>> // that would be pending via maybePut.
>> // maybePut should also be resolved before the write happens to ensure that
>> // it can return true/false truthfully if the state was empty.
>>
>> #4
>> ReadableState<Boolean> maybePut = mapState.putIfAbsent(keyA, valueA);
>> // in a future bundle for same key and window
>> ReadableState<T> read = mapState.get(keyA); // value should equal "valueA"
>>
>> I think #1, #3, and #4 don't currently work, and #2 doesn't work if the
>> mapState didn't contain keyA.
>>
>> On Tue, Aug 17, 2021 at 12:28 PM Kenneth Knowles <[email protected]> wrote:
>> Can the change be observed in that case? I think the semantics should be
>> that the timing of the actual write cannot be observed.
>>
>> Kenn
>>
>> On Tue, Aug 17, 2021 at 10:19 AM Luke Cwik <[email protected]> wrote:
>> Yeah, the "or state is committed" means that we should resolve it before any
>> additional writes but that only addresses part of the surprise I had.
>>
>> I would have expected that the putIfAbsent would force a resolution before
>> transitioning to process the next key and window pair, regardless of whether
>> read() was invoked or there were additional writes.
>>
>>
>> On Mon, Aug 16, 2021 at 10:04 AM Kenneth Knowles <[email protected]> wrote:
>> I think it is an implementation bug. The javadoc for putIfAbsent says "When
>> {@code read()} is called on the result or state is committed, it forces a
>> read of the map and reconciliation with any pending modifications."
>>
>> My reading of this is that the value changes for anything that happens after
>> the call to putIfAbsent. It would be good to make this clear, that any
>> subsequent observation of this cell should observe the new value.
>>
>> Kenn
>>
>> On Fri, Aug 13, 2021 at 9:32 AM Reuven Lax <[email protected]> wrote:
>> Yeah - I remember thinking that the computeIfAbsent/putIfAbsent semantics
>> were very weird. I almost would have preferred not having those methods in
>> MapState, even though they can be useful.
>>
>> On Fri, Aug 13, 2021 at 9:21 AM Luke Cwik <[email protected]> wrote:
>> I was surprised to see the MapState API for computeIfAbsent/putIfAbsent only
>> performs the write if the ReadableState that is returned is resolved.
>>
>> For example:
>> ReadableState<String> value = mapState.putIfAbsent("key", "new value maybe");
>> does nothing until
>> value.read();
>>
>> Forgetting to call value.read() seems like an easy mistake for developers to
>> make when coming from the Java Map API, which always performs the write.
>