kileys commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r687372293
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2172,22 +2170,6 @@ static void verifyDoFnSupported(DoFn<?, ?> fn, boolean
streaming, boolean stream
"%s does not currently support @RequiresTimeSortedInput in
streaming mode.",
DataflowRunner.class.getSimpleName()));
}
- if (DoFnSignatures.usesSetState(fn)) {
Review comment:
Did you mean it's only supported if useUnifiedWorker is true?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
@Override
public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec,
Coder<T> elemCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state
to the Fn API.");
+ return (SetState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new SetState<T>() {
+ private final MultimapUserState<T, Boolean> impl =
+ createMultimapUserState(id, elemCoder,
BooleanCoder.of());
+
+ @Override
+ public void clear() {
+ impl.clear();
+ }
+
+ @Override
+ public ReadableState<Boolean> contains(T t) {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
+ return !Iterables.isEmpty(impl.get(t));
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public ReadableState<Boolean> addIfAbsent(T t) {
+ return new ReadableState<Boolean>() {
+ @Override
+ public @Nullable Boolean read() {
Review comment:
The element isn't actually added until read() is called. I based this on the
existing implementation in WindmillStateInternals.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+ private final BeamFnStateClient beamFnStateClient;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private final String instructionId;
+ private final String pTransformId;
+ private final String stateId;
+ private final ByteString encodedWindow;
+ private final ByteString encodedKey;
+ private final StateRequest keysStateRequest;
+
+ private boolean isClosed;
+ private boolean isCleared;
+ private Set<K> pendingRemoves = Sets.newHashSet();
+ private Set<K> negativeCache = Sets.newHashSet();
+ private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+ private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+ private @Nullable Iterable<K> persistedKeys = null;
+ private List<K> pendingKeys =
+ Lists.newArrayList(); // separate from pendingAdd since keys aren't
ordered.
+
+ public MultimapUserState(
+ BeamFnStateClient beamFnStateClient,
+ String instructionId,
+ String pTransformId,
+ String stateId,
+ ByteString encodedWindow,
+ ByteString encodedKey,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder) {
+ this.beamFnStateClient = beamFnStateClient;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ this.instructionId = instructionId;
+ this.pTransformId = pTransformId;
+ this.stateId = stateId;
+ this.encodedWindow = encodedWindow;
+ this.encodedKey = encodedKey;
+
+ StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+ keysStateRequestBuilder
+ .setInstructionId(instructionId)
+ .getStateKeyBuilder()
+ .getMultimapKeysUserStateBuilder()
+ .setTransformId(pTransformId)
+ .setUserStateId(stateId)
+ .setWindow(encodedWindow);
+ keysStateRequest = keysStateRequestBuilder.build();
+ }
+
+ public void clear() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+
+ isCleared = true;
+ persistedValues = ArrayListMultimap.create();
+ persistedKeys = null;
+ pendingRemoves = Sets.newHashSet();
+ pendingAdds = ArrayListMultimap.create();
+ pendingKeys = Lists.newArrayList();
+ }
+
+ /*
+ * Returns an iterable of the values associated with key in this multimap,
if any.
+ * If there are no values, this returns an empty collection, not null.
+ */
+ public Iterable<V> get(@NonNull K key) {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+
+ Collection<V> pendingValues = pendingAdds.get(key);
+ if (isCleared || negativeCache.contains(key) ||
pendingRemoves.contains(key)) {
+ return Iterables.limit(Iterables.unmodifiableIterable(pendingValues),
pendingValues.size());
+ }
+
+ Iterable<V> persistedValues = getPersistedValues(key);
+ if (Iterables.isEmpty(persistedValues)) {
+ negativeCache.add(key);
+ }
+ return Iterables.concat(
+ persistedValues,
+ Iterables.limit(Iterables.unmodifiableIterable(pendingValues),
pendingValues.size()));
+ }
+
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+ })
+ /*
+ * Returns an iterables containing the key from each key-value pair in this
multimap, without collapsing duplicates.
+ */
+ public Iterable<K> keys() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+
+ if (isCleared) {
+ return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys),
pendingKeys.size());
+ }
+
+ Iterable<K> persistedKeys = getPersistedKeys();
+ persistedKeys = Iterables.filter(persistedKeys, key ->
!pendingRemoves.contains(key));
+ return Iterables.concat(
+ persistedKeys,
+ Iterables.limit(Iterables.unmodifiableIterable(pendingKeys),
pendingKeys.size()));
+ }
+
+ /*
+ * Store a key-value pair in the multimap.
+ * Allows duplicate key-value pairs.
+ */
+ public void put(K key, V value) {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ pendingAdds.put(key, value);
+ pendingKeys.add(key);
+ }
+
+ /*
+ * Removes all values for this key in the multimap.
+ */
+ public void remove(@NonNull K key) {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ pendingAdds.removeAll(key);
+ pendingKeys.removeAll(Collections.singletonList(key));
+ pendingRemoves.add(key);
+ }
+
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+ })
+ // Update data in persistent store
+ public void asyncClose() throws Exception {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+
+ // Nothing to persist
+ if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+ isClosed = true;
+ return;
+ }
+
+ // Clear currently persisted key-values
+ if (isCleared) {
+ for (K key : getPersistedKeys()) {
+ beamFnStateClient.handle(
+ createUserStateRequest(key)
+ .toBuilder()
+ .setClear(StateClearRequest.getDefaultInstance()),
+ new CompletableFuture<>());
+ }
+ } else if (!pendingRemoves.isEmpty()) {
+ Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(),
pendingRemoves::contains);
+ for (K key : removeKeys) {
+ beamFnStateClient.handle(
+ createUserStateRequest(key)
+ .toBuilder()
+ .setClear(StateClearRequest.getDefaultInstance()),
+ new CompletableFuture<>());
+ }
+ }
+
+ // Persist pending key-values
+ if (!pendingAdds.isEmpty()) {
+ for (K key : pendingAdds.keySet()) {
+ beamFnStateClient.handle(
+ createUserStateRequest(key)
+ .toBuilder()
+ .setAppend(
+
StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+ new CompletableFuture<>());
+ }
+ }
+
+ // Update keys
+ if (isCleared || !pendingRemoves.isEmpty()) {
+ beamFnStateClient.handle(
+
keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+ new CompletableFuture<>());
+ beamFnStateClient.handle(
+ keysStateRequest
+ .toBuilder()
+
.setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(keys())).build()),
+ new CompletableFuture<>());
+ } else {
+ beamFnStateClient.handle(
+ keysStateRequest
+ .toBuilder()
+
.setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(pendingKeys)).build()),
+ new CompletableFuture<>());
+ }
+ isClosed = true;
+ }
+
+ private ByteString encodeKeys(Iterable<K> keys) {
+ try {
+ ByteString.Output output = ByteString.newOutput();
+ for (K key : keys) {
+ keyCoder.encode(key, output);
+ }
+ return output.toByteString();
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ String.format("Failed to encode keys for multimap user state id
%s.", stateId), e);
+ }
+ }
+
+ private ByteString encodeValues(Iterable<V> values) {
+ try {
+ ByteString.Output output = ByteString.newOutput();
+ for (V value : values) {
+ valueCoder.encode(value, output);
+ }
+ return output.toByteString();
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ String.format("Failed to encode values for multimap user state id
%s.", stateId), e);
+ }
+ }
+
+ private StateRequest createUserStateRequest(K key) {
+ ByteString.Output keyStream = ByteString.newOutput();
+ try {
+ encodedKey.writeTo(keyStream);
+ keyCoder.encode(key, keyStream);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ String.format("Failed to encode key %s for multimap user state id
%s.", key, stateId), e);
+ }
+
+ StateRequest.Builder request = StateRequest.newBuilder();
+ request
+ .setInstructionId(instructionId)
+ .getStateKeyBuilder()
+ .getMultimapUserStateBuilder()
+ .setTransformId(pTransformId)
+ .setUserStateId(stateId)
+ .setWindow(encodedWindow)
+ .setKey(keyStream.toByteString());
+ return request.build();
+ }
+
+ private Iterable<V> getPersistedValues(K key) {
+ if (persistedValues.get(key).isEmpty()) {
Review comment:
I kept this method simple and used the negativeCache to track keys that have
no persisted values.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
@Override
public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec,
Coder<T> elemCoder) {
- throw new UnsupportedOperationException("TODO: Add support for a map state
to the Fn API.");
+ return (SetState<T>)
+ stateKeyObjectCache.computeIfAbsent(
+ createMultimapUserStateKey(id),
+ new Function<StateKey, Object>() {
+ @Override
+ public Object apply(StateKey key) {
+ return new SetState<T>() {
+ private final MultimapUserState<T, Boolean> impl =
+ createMultimapUserState(id, elemCoder,
BooleanCoder.of());
Review comment:
The existing implementation used Boolean, and I was wondering whether storing
nulls would cause problems on the runner side.
##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
bytes key = 4;
}
Review comment:
I wrote some basic info; let me know if there's more you think would be
good to add.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+ private final BeamFnStateClient beamFnStateClient;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private final String instructionId;
+ private final String pTransformId;
+ private final String stateId;
+ private final ByteString encodedWindow;
+ private final ByteString encodedKey;
+ private final StateRequest keysStateRequest;
+
+ private boolean isClosed;
+ private boolean isCleared;
+ private Set<K> pendingRemoves = Sets.newHashSet();
+ private Set<K> negativeCache = Sets.newHashSet();
+ private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+ private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+ private @Nullable Iterable<K> persistedKeys = null;
+ private List<K> pendingKeys =
Review comment:
Since we're returning a set, I removed this.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State
API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled
based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+ private final BeamFnStateClient beamFnStateClient;
+ private final Coder<K> keyCoder;
+ private final Coder<V> valueCoder;
+ private final String instructionId;
+ private final String pTransformId;
+ private final String stateId;
+ private final ByteString encodedWindow;
+ private final ByteString encodedKey;
+ private final StateRequest keysStateRequest;
+
+ private boolean isClosed;
+ private boolean isCleared;
+ private Set<K> pendingRemoves = Sets.newHashSet();
+ private Set<K> negativeCache = Sets.newHashSet();
+ private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+ private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+ private @Nullable Iterable<K> persistedKeys = null;
+ private List<K> pendingKeys =
+ Lists.newArrayList(); // separate from pendingAdd since keys aren't
ordered.
+
+ public MultimapUserState(
+ BeamFnStateClient beamFnStateClient,
+ String instructionId,
+ String pTransformId,
+ String stateId,
+ ByteString encodedWindow,
+ ByteString encodedKey,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder) {
+ this.beamFnStateClient = beamFnStateClient;
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ this.instructionId = instructionId;
+ this.pTransformId = pTransformId;
+ this.stateId = stateId;
+ this.encodedWindow = encodedWindow;
+ this.encodedKey = encodedKey;
+
+ StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+ keysStateRequestBuilder
+ .setInstructionId(instructionId)
+ .getStateKeyBuilder()
+ .getMultimapKeysUserStateBuilder()
+ .setTransformId(pTransformId)
+ .setUserStateId(stateId)
+ .setWindow(encodedWindow);
+ keysStateRequest = keysStateRequestBuilder.build();
+ }
+
+ public void clear() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+
+ isCleared = true;
+ persistedValues = ArrayListMultimap.create();
+ persistedKeys = null;
+ pendingRemoves = Sets.newHashSet();
+ pendingAdds = ArrayListMultimap.create();
+ pendingKeys = Lists.newArrayList();
+ }
+
+ /*
+ * Returns an iterable of the values associated with key in this multimap,
if any.
+ * If there are no values, this returns an empty collection, not null.
+ */
+ public Iterable<V> get(@NonNull K key) {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+
+ Collection<V> pendingValues = pendingAdds.get(key);
+ if (isCleared || negativeCache.contains(key) ||
pendingRemoves.contains(key)) {
+ return Iterables.limit(Iterables.unmodifiableIterable(pendingValues),
pendingValues.size());
+ }
+
+ Iterable<V> persistedValues = getPersistedValues(key);
+ if (Iterables.isEmpty(persistedValues)) {
+ negativeCache.add(key);
+ }
+ return Iterables.concat(
+ persistedValues,
+ Iterables.limit(Iterables.unmodifiableIterable(pendingValues),
pendingValues.size()));
+ }
+
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
Review comment:
There's an issue with lambdas in the Checker Framework. It's fixed in a newer
version that we haven't upgraded to yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]