lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r712417372
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +416,133 @@ public void clear() {
StateSpec<MapState<KeyT, ValueT>> spec,
Coder<KeyT> mapKeyCoder,
Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state
to the Fn API.");
+ return (MapState<KeyT, ValueT>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new MapState<KeyT, ValueT>() {
+ private final MultimapUserState<KeyT, ValueT> impl =
+ createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+ @Override
+ public void clear() {
+ impl.clear();
+ }
+
+ @Override
+ public void put(KeyT key, ValueT value) {
+ impl.remove(key);
+ impl.put(key, value);
+ }
+
+ @Override
+ public ReadableState<ValueT> computeIfAbsent(
+ KeyT key, Function<? super KeyT, ? extends ValueT>
mappingFunction) {
+ Iterable<ValueT> values = impl.get(key);
+ if (Iterables.isEmpty(values)) {
+ impl.put(key, mappingFunction.apply(key));
+ }
+ return
ReadableStates.immediate(Iterables.getOnlyElement(values, null));
+ }
+
+ @Override
+ public void remove(KeyT key) {
+ impl.remove(key);
+ }
+
+ @Override
+ public ReadableState<ValueT> get(KeyT key) {
+ return getOrDefault(key, null);
+ }
+
+ @Override
+ public ReadableState<ValueT> getOrDefault(
+ KeyT key, @Nullable ValueT defaultValue) {
+ return new ReadableState<ValueT>() {
+ @Override
+ public @Nullable ValueT read() {
+ Iterable<ValueT> values = impl.get(key);
+ return Iterables.getOnlyElement(values, defaultValue);
+ }
+
+ @Override
+ public ReadableState<ValueT> readLater() {
+ // TODO: Support prefetching.
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Iterable<KeyT>> keys() {
+ return new ReadableState<Iterable<KeyT>>() {
+ @Override
+ public Iterable<KeyT> read() {
+ return impl.keys();
+ }
+
+ @Override
+ public ReadableState<Iterable<KeyT>> readLater() {
+ // TODO: Support prefetching.
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Iterable<ValueT>> values() {
+ return new ReadableState<Iterable<ValueT>>() {
+ @Override
+ public Iterable<ValueT> read() {
+ return Iterables.transform(entries().read(), e ->
e.getValue());
+ }
+
+ @Override
+ public ReadableState<Iterable<ValueT>> readLater() {
+ // TODO: Support prefetching.
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>
entries() {
+ return new ReadableState<Iterable<Map.Entry<KeyT,
ValueT>>>() {
+ @Override
+ public @Nullable Iterable<Map.Entry<KeyT, ValueT>>
read() {
Review comment:
```suggestion
public Iterable<Map.Entry<KeyT, ValueT>> read() {
```
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
@Override
public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec,
Coder<T> elemCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state
to the Fn API.");
+ return (SetState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new SetState<T>() {
+ private final MultimapUserState<T, Void> impl =
+ createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+ @Override
+ public void clear() {
+ impl.clear();
+ }
+
+ @Override
+ public ReadableState<Boolean> contains(T t) {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return !Iterables.isEmpty(impl.get(t));
Review comment:
Sounds good.
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
##########
@@ -56,12 +56,6 @@
* <p>Changes will not be reflected in the results returned by previous
calls to {@link
* ReadableState#read} on the results any of the reading methods ({@link
#get}, {@link #keys},
* {@link #values}, and {@link #entries}).
- *
- * <p>Since the condition is not evaluated until {@link ReadableState#read}
is called, a call to
Review comment:
We'll need a follow-up change that fixes the other implementations of
MapState so that `putIfAbsent` and `computeIfAbsent` perform their actions
in the correct order.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
@Override
public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec,
Coder<T> elemCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state
to the Fn API.");
+ return (SetState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new SetState<T>() {
+ private final MultimapUserState<T, Void> impl =
+ createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+ @Override
+ public void clear() {
+ impl.clear();
+ }
+
+ @Override
+ public ReadableState<Boolean> contains(T t) {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return !Iterables.isEmpty(impl.get(t));
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Boolean> addIfAbsent(T t) {
+ Iterable<Void> values = impl.get(t);
+ if (Iterables.isEmpty(values)) {
+ impl.put(t, null);
+ }
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return Iterables.isEmpty(values);
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public void remove(T t) {
+ impl.remove(t);
+ }
+
+ @Override
+ public void add(T value) {
+ impl.remove(value);
+ impl.put(value, null);
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return Iterables.isEmpty(impl.keys());
Review comment:
Sounds good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]