lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r712417372



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +416,133 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return 
ReadableStates.immediate(Iterables.getOnlyElement(values, null));
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        // TODO: Support prefetching.
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public Iterable<KeyT> read() {
+                        return impl.keys();
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<KeyT>> readLater() {
+                        // TODO: Support prefetching.
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<ValueT>> values() {
+                    return new ReadableState<Iterable<ValueT>>() {
+                      @Override
+                      public Iterable<ValueT> read() {
+                        return Iterables.transform(entries().read(), e -> 
e.getValue());
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<ValueT>> readLater() {
+                        // TODO: Support prefetching.
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> 
entries() {
+                    return new ReadableState<Iterable<Map.Entry<KeyT, 
ValueT>>>() {
+                      @Override
+                      public @Nullable Iterable<Map.Entry<KeyT, ValueT>> 
read() {

Review comment:
       ```suggestion
                         public Iterable<Map.Entry<KeyT, ValueT>> read() {
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));

Review comment:
       sg

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
##########
@@ -56,12 +56,6 @@
    * <p>Changes will not be reflected in the results returned by previous 
calls to {@link
    * ReadableState#read} on the results any of the reading methods ({@link 
#get}, {@link #keys},
    * {@link #values}, and {@link #entries}).
-   *
-   * <p>Since the condition is not evaluated until {@link ReadableState#read} 
is called, a call to

Review comment:
       We'll need a follow-up change fixing the other implementations of 
MapState to perform the correct order of actions for `putIfAbsent` and 
`computeIfAbsent`.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    Iterable<Void> values = impl.get(t);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(t, null);
+                    }
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(values);
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(T t) {
+                    impl.remove(t);
+                  }
+
+                  @Override
+                  public void add(T value) {
+                    impl.remove(value);
+                    impl.put(value, null);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(impl.keys());

Review comment:
       sg




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to