kileys commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r687372293



##########
File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2172,22 +2170,6 @@ static void verifyDoFnSupported(DoFn<?, ?> fn, boolean 
streaming, boolean stream
               "%s does not currently support @RequiresTimeSortedInput in 
streaming mode.",
               DataflowRunner.class.getSimpleName()));
     }
-    if (DoFnSignatures.usesSetState(fn)) {

Review comment:
       Did you mean it's only supported if useUnifiedWorker is true?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, 
BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {

Review comment:
       The element is not added until read() is called. I based this on the 
existing implementation in WindmillStateInternals.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't 
ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || 
pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), 
pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), 
pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this 
multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), 
pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> 
!pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), 
pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      for (K key : getPersistedKeys()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), 
pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    
StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Update keys
+    if (isCleared || !pendingRemoves.isEmpty()) {
+      beamFnStateClient.handle(
+          
keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              
.setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(keys())).build()),
+          new CompletableFuture<>());
+    } else {
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              
.setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(pendingKeys)).build()),
+          new CompletableFuture<>());
+    }
+    isClosed = true;
+  }
+
+  private ByteString encodeKeys(Iterable<K> keys) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (K key : keys) {
+        keyCoder.encode(key, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode keys for multimap user state id 
%s.", stateId), e);
+    }
+  }
+
+  private ByteString encodeValues(Iterable<V> values) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (V value : values) {
+        valueCoder.encode(value, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode values for multimap user state id 
%s.", stateId), e);
+    }
+  }
+
+  private StateRequest createUserStateRequest(K key) {
+    ByteString.Output keyStream = ByteString.newOutput();
+    try {
+      encodedKey.writeTo(keyStream);
+      keyCoder.encode(key, keyStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode key %s for multimap user state id 
%s.", key, stateId), e);
+    }
+
+    StateRequest.Builder request = StateRequest.newBuilder();
+    request
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(keyStream.toByteString());
+    return request.build();
+  }
+
+  private Iterable<V> getPersistedValues(K key) {
+    if (persistedValues.get(key).isEmpty()) {

Review comment:
       I kept this method simple and use the negativeCache to track keys that 
have no persisted values.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, 
Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state 
to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, 
BooleanCoder.of());

Review comment:
       The existing implementation used Boolean, and I was wondering whether 
storing nulls would cause problems on the runner side.

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }

Review comment:
       I wrote some basic info; let me know if there's more you think would be 
good to add.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =

Review comment:
       Since we're returning a set, I removed this.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State 
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't 
ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, 
if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || 
pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), 
pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), 
pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)

Review comment:
       There's an issue with lambdas in the Checker Framework. It's fixed in a 
newer version that we don't have yet.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to