lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r705696420



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java
##########
@@ -56,12 +56,6 @@
    * <p>Changes will not be reflected in the results returned by previous 
calls to {@link
    * ReadableState#read} on the results any of the reading methods ({@link 
#get}, {@link #keys},

Review comment:
       We should also update the contract for `SetState#contains` to say that 
it also returns whether the value is there on the return of `#read`. Similar to 
the `#isEmpty` contract.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    Iterable<Void> values = impl.get(t);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(t, null);
+                    }
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(values);
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(T t) {
+                    impl.remove(t);
+                  }
+
+                  @Override
+                  public void add(T value) {
+                    impl.remove(value);
+                    impl.put(value, null);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(impl.keys());
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public @Nullable Iterable<T> read() {

Review comment:
       I don't think we want the return to be `@Nullable`.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    Iterable<Void> values = impl.get(t);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(t, null);
+                    }
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(values);
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(T t) {
+                    impl.remove(t);
+                  }
+
+                  @Override
+                  public void add(T value) {
+                    impl.remove(value);
+                    impl.put(value, null);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(impl.keys());
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;

Review comment:
       ```suggestion
                           // TODO: Support prefetching.
                           return this;
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;

Review comment:
       ```suggestion
                           // TODO: Support prefetching.
                           return this;
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {

Review comment:
       Since we resolved the read immediately here we should use 
`ReadableStates#immediate` to return `true`/`false` instead of keeping the 
reference to values around.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {

Review comment:
       ```suggestion
                       if (!values.hasNext()) {
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public @Nullable Iterable<KeyT> read() {

Review comment:
       Ditto on not making the return value `@Nullable`

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {

Review comment:
       The key could be `null` since this is a user key.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues =
+        
Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.get(key)));

Review comment:
       For a follow-up change:
   Note that if we use a `Map<K, List<V>>` instead of a `ArrayListMultimap` we 
can control the lifecycle of the List and how it is cleared/replaced allowing 
us to replace the copy being done via `Lists.newArrayList` to rely on 
`List#sublist` to restrict the range. Clearing the `Map` will remove the 
references to the lists and any current _sub lists_ that the user is holding 
onto will remain alive. `ArrayListMultimap` doesn't explicitly state these 
guarantees. 
   
   It is harder to do something like this for _keys_ but also possible.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public @Nullable Iterable<KeyT> read() {
+                        return impl.keys();
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<KeyT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<ValueT>> values() {
+                    return new ReadableState<Iterable<ValueT>>() {
+                      @Override
+                      public @Nullable Iterable<ValueT> read() {
+                        return Iterables.transform(entries().read(), e -> 
e.getValue());
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<ValueT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> 
entries() {
+                    return new ReadableState<Iterable<Map.Entry<KeyT, 
ValueT>>>() {
+                      @Override
+                      public @Nullable Iterable<Map.Entry<KeyT, ValueT>> 
read() {
+                        Iterable<KeyT> keys = keys().read();
+                        return Iterables.transform(
+                            keys, key -> Maps.immutableEntry(key, 
get(key).read()));
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> 
readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {

Review comment:
       The return here shouldn't be `@Nullable`.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues =
+        
Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.get(key)));
+    if (isCleared || pendingRemoves.contains(key)) {
+      return pendingValues;
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    return Iterables.concat(persistedValues, pendingValues);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing all distinct keys in this multimap.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    if (isCleared) {
+      return 
Collections.unmodifiableCollection(Sets.newHashSet(pendingAdds.keySet()));
+    }
+
+    Set<K> keys = Sets.newHashSet(getPersistedKeys());

Review comment:
       add a short circuit for `isCleared` to only return the `pendingAdds` 
`keySet`

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public @Nullable Iterable<KeyT> read() {
+                        return impl.keys();
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<KeyT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<ValueT>> values() {
+                    return new ReadableState<Iterable<ValueT>>() {
+                      @Override
+                      public @Nullable Iterable<ValueT> read() {
+                        return Iterables.transform(entries().read(), e -> 
e.getValue());
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<ValueT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> 
entries() {
+                    return new ReadableState<Iterable<Map.Entry<KeyT, 
ValueT>>>() {
+                      @Override
+                      public @Nullable Iterable<Map.Entry<KeyT, ValueT>> 
read() {
+                        Iterable<KeyT> keys = keys().read();
+                        return Iterables.transform(
+                            keys, key -> Maps.immutableEntry(key, 
get(key).read()));
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> 
readLater() {
+                        return this;

Review comment:
       TODO: support prefetch

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class MultimapUserStateTest {
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedKey = "encodedKey";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new 
FakeBeamFnStateClient(Collections.emptyMap());
+    MultimapUserState<String, String> userState =
+        new MultimapUserState<>(
+            fakeClient,
+            "instructionId",
+            pTransformId,
+            stateId,
+            encode(encodedWindow),
+            encode(encodedKey),
+            StringUtf8Coder.of(),
+            StringUtf8Coder.of());
+    assertTrue(Iterables.isEmpty(userState.keys()));
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            ImmutableMap.of(
+                createMultimapKeyStateKey(),
+                encode("A1"),
+                createMultimapValueStateKey("A1"),
+                encode("V1", "V2")));
+    MultimapUserState<String, String> userState =
+        new MultimapUserState<>(
+            fakeClient,
+            "instructionId",
+            pTransformId,
+            stateId,
+            encode(encodedWindow),
+            encode(encodedKey),
+            StringUtf8Coder.of(),
+            StringUtf8Coder.of());
+
+    Iterable<String> initValues = userState.get("A1");
+    userState.put("A1", "V3");
+    assertArrayEquals(new String[] {"V1", "V2"}, Iterables.toArray(initValues, 
String.class));
+    assertArrayEquals(
+        new String[] {"V1", "V2", "V3"}, 
Iterables.toArray(userState.get("A1"), String.class));
+    assertArrayEquals(new String[] {}, Iterables.toArray(userState.get("A2"), 
String.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.get("A1"));
+  }
+
+  @Test
+  public void testClear() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            ImmutableMap.of(
+                createMultimapKeyStateKey(),
+                encode("A1"),
+                createMultimapValueStateKey("A1"),
+                encode("V1", "V2")));
+    MultimapUserState<String, String> userState =
+        new MultimapUserState<>(
+            fakeClient,
+            "instructionId",
+            pTransformId,
+            stateId,
+            encode(encodedWindow),
+            encode(encodedKey),
+            StringUtf8Coder.of(),
+            StringUtf8Coder.of());
+
+    Iterable<String> initValues = userState.get("A1");
+    userState.clear();
+    assertArrayEquals(new String[] {"V1", "V2"}, Iterables.toArray(initValues, 
String.class));
+    assertTrue(Iterables.isEmpty(userState.get("A1")));

Review comment:
       nit: rely on `assertThat(collection, IsEmpty());`
   
   Makes for better error messages when tests fail.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class MultimapUserStateTest {
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedKey = "encodedKey";
+  private final String encodedWindow = "encodedWindow";
+

Review comment:
       Add tests that cover
   * usage of null keys and null values. You should be able to use 
`NullableCoder.of(StringUtf8Coder.of())`
   * usage of negative cache not invoking more state calls
   * put after remove
   * put after clear
   * remove before clear showing that the key specific clear is dropped
   * put before clear showing that the append put is dropped
   * put before remove showing that the append is dropped
   
   I'm probably missing more cases since there are a bunch of state management 
relationships we want to enforce.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;

Review comment:
       TODO: support prefetch

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues =
+        
Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.get(key)));
+    if (isCleared || pendingRemoves.contains(key)) {
+      return pendingValues;
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    return Iterables.concat(persistedValues, pendingValues);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing all distinct keys in this multimap.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    if (isCleared) {
+      return 
Collections.unmodifiableCollection(Sets.newHashSet(pendingAdds.keySet()));
+    }
+
+    Set<K> keys = Sets.newHashSet(getPersistedKeys());
+    keys.removeAll(pendingRemoves);
+    keys.addAll(pendingAdds.keySet());
+    return Collections.unmodifiableCollection(keys);
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    if (!isCleared) {
+      pendingRemoves.add(key);
+    }
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      beamFnStateClient.handle(
+          
keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), 
pendingRemoves::contains);

Review comment:
       You should be able to remove a key that isn't there so you won't need to 
filter for keys that exist since that is expected to be slow if we need to read 
them all from the runner.
   
   If we knew that we had them all in memory already then it would be 
worthwhile to filter upfront.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues =
+        
Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.get(key)));
+    if (isCleared || pendingRemoves.contains(key)) {
+      return pendingValues;
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    return Iterables.concat(persistedValues, pendingValues);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing all distinct keys in this multimap.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    if (isCleared) {
+      return 
Collections.unmodifiableCollection(Sets.newHashSet(pendingAdds.keySet()));
+    }
+
+    Set<K> keys = Sets.newHashSet(getPersistedKeys());
+    keys.removeAll(pendingRemoves);
+    keys.addAll(pendingAdds.keySet());
+    return Collections.unmodifiableCollection(keys);
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    if (!isCleared) {
+      pendingRemoves.add(key);
+    }
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      beamFnStateClient.handle(
+          
keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), 
pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    
StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+    isClosed = true;
+  }
+
+  private ByteString encodeValues(Iterable<V> values) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (V value : values) {
+        valueCoder.encode(value, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode values for multimap user state id 
%s.", stateId), e);
+    }
+  }
+
+  private StateRequest createUserStateRequest(K key) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      mapKeyCoder.encode(key, output);
+      StateRequest.Builder request = userStateRequest.toBuilder();
+      
request.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(output.toByteString());
+      return request.build();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode key for multimap user state id %s.", 
stateId), e);
+    }
+  }
+
+  private Iterable<V> getPersistedValues(@NonNull K key) {
+    if (negativeCache.contains(key)) {
+      return Collections.emptyList();
+    }
+
+    if (persistedValues.get(key).isEmpty()) {
+      Iterable<V> values =
+          StateFetchingIterators.readAllAndDecodeStartingFrom(
+              beamFnStateClient, createUserStateRequest(key), valueCoder);
+      if (Iterables.isEmpty(values)) {
+        negativeCache.add(key);
+      }
+      persistedValues.putAll(key, values);
+    }
+    return Iterables.unmodifiableIterable(persistedValues.get(key));
+  }
+
+  private Iterable<K> getPersistedKeys() {
+    if (persistedKeys == null) {

Review comment:
       ```suggestion
       checkState(!isCleared);
       if (persistedKeys == null) {
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    Iterable<Void> values = impl.get(t);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(t, null);
+                    }
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(values);
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(T t) {
+                    impl.remove(t);
+                  }
+
+                  @Override
+                  public void add(T value) {
+                    impl.remove(value);
+                    impl.put(value, null);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return Iterables.isEmpty(impl.keys());

Review comment:
       ```suggestion
                           return !impl.keys().hasNext();
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    Iterable<Void> values = impl.get(t);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(t, null);
+                    }

Review comment:
       Since you resolved the read immediately, we should use 
`ReadableStates#immediate` to return `true`/`false` instead of holding onto 
values in memory.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +327,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Void> impl =
+                      createMultimapUserState(id, elemCoder, VoidCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));

Review comment:
       nit: no point in using a library for the same functionality since it 
doesn't reduce the amount of code
   ```suggestion
                           return impl.get(t).hasNext();
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public @Nullable Iterable<KeyT> read() {
+                        return impl.keys();
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<KeyT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<ValueT>> values() {
+                    return new ReadableState<Iterable<ValueT>>() {
+                      @Override
+                      public @Nullable Iterable<ValueT> read() {
+                        return Iterables.transform(entries().read(), e -> 
e.getValue());
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<ValueT>> readLater() {
+                        return this;

Review comment:
       TODO: support prefetch

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +423,138 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> 
mappingFunction) {
+                    Iterable<ValueT> values = impl.get(key);
+                    if (Iterables.isEmpty(values)) {
+                      impl.put(key, mappingFunction.apply(key));
+                    }
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public @Nullable Iterable<KeyT> read() {
+                        return impl.keys();
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<KeyT>> readLater() {
+                        return this;

Review comment:
       TODO: support prefetch

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues =
+        
Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.get(key)));
+    if (isCleared || pendingRemoves.contains(key)) {
+      return pendingValues;
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    return Iterables.concat(persistedValues, pendingValues);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing all distinct keys in this multimap.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    if (isCleared) {
+      return 
Collections.unmodifiableCollection(Sets.newHashSet(pendingAdds.keySet()));
+    }
+
+    Set<K> keys = Sets.newHashSet(getPersistedKeys());
+    keys.removeAll(pendingRemoves);
+    keys.addAll(pendingAdds.keySet());
+    return Collections.unmodifiableCollection(keys);
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    if (!isCleared) {
+      pendingRemoves.add(key);
+    }
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+

Review comment:
       You might as well move the `isClosed = true;` to just after the 
`checkState` and remove it from the two places down below.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues =
+        
Collections.unmodifiableCollection(Lists.newArrayList(pendingAdds.get(key)));
+    if (isCleared || pendingRemoves.contains(key)) {
+      return pendingValues;
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    return Iterables.concat(persistedValues, pendingValues);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing all distinct keys in this multimap.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    if (isCleared) {
+      return 
Collections.unmodifiableCollection(Sets.newHashSet(pendingAdds.keySet()));
+    }
+
+    Set<K> keys = Sets.newHashSet(getPersistedKeys());
+    keys.removeAll(pendingRemoves);
+    keys.addAll(pendingAdds.keySet());
+    return Collections.unmodifiableCollection(keys);
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    if (!isCleared) {
+      pendingRemoves.add(key);
+    }
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      beamFnStateClient.handle(
+          
keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), 
pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    
StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+    isClosed = true;
+  }
+
+  private ByteString encodeValues(Iterable<V> values) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (V value : values) {
+        valueCoder.encode(value, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode values for multimap user state id 
%s.", stateId), e);
+    }
+  }
+
+  private StateRequest createUserStateRequest(K key) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      mapKeyCoder.encode(key, output);
+      StateRequest.Builder request = userStateRequest.toBuilder();
+      
request.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(output.toByteString());
+      return request.build();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode key for multimap user state id %s.", 
stateId), e);
+    }
+  }
+
+  private Iterable<V> getPersistedValues(@NonNull K key) {

Review comment:
       User key can be null.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> mapKeyCoder;
+  private final Coder<V> valueCoder;
+  private final String stateId;
+  private final StateRequest keysStateRequest;
+  private final StateRequest userStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  // Pending updates to persistent storage
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  // Map keys with no values in persistent storage
+  private Set<K> negativeCache = Sets.newHashSet();
+  // Values retrieved from persistent storage
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> mapKeyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.mapKeyCoder = mapKeyCoder;
+    this.valueCoder = valueCoder;
+    this.stateId = stateId;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setKey(encodedKey)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+
+    StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
+    userStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(encodedKey);
+    userStateRequest = userStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;

Review comment:
       negative cache should be cleared.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to