This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 547b3be [FLINK-24597][state] Avoid KeyedStateBackend#getKeysAndNamespaces to return duplicate data
547b3be is described below
commit 547b3befcff50bf8fc2bef4e596cd39a55c7d4b2
Author: mayue.fight <[email protected]>
AuthorDate: Wed Oct 20 14:46:04 2021 +0800
[FLINK-24597][state] Avoid KeyedStateBackend#getKeysAndNamespaces to return duplicate data
This closes #17525
---
.../flink/runtime/state/StateBackendTestBase.java | 45 ++++++++++++++++++++++
.../RocksStateKeysAndNamespaceIterator.java | 23 +++++++----
2 files changed, 60 insertions(+), 8 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5867a4f..0d15b32 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -71,6 +71,7 @@ import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -5080,6 +5081,50 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> exten
}
@Test
+ public void testMapStateGetKeysAndNamespaces() throws Exception {
+ final int elementsNum = 1000;
+ String fieldName = "get-keys-test";
+ CheckpointableKeyedStateBackend<Integer> backend =
+ createKeyedBackend(IntSerializer.INSTANCE);
+ try {
+ InternalMapState<Integer, String, String, Integer> internalState =
+ backend.createInternalState(
+ StringSerializer.INSTANCE,
+ new MapStateDescriptor<>(
+ fieldName, StringSerializer.INSTANCE,
IntSerializer.INSTANCE));
+ String[] namespaces = new String[] {"ns1", "ns2"};
+
+ for (int key = 0; key < elementsNum; key++) {
+ backend.setCurrentKey(key);
+ for (String ns : namespaces) {
+ internalState.setCurrentNamespace(ns);
+ internalState.put("hello", key);
+ internalState.put("world", key);
+ }
+ }
+
+ try (Stream<Tuple2<Integer, String>> stream =
backend.getKeysAndNamespaces(fieldName)) {
+ final Map<String, Set<Integer>> keysByNamespace = new
HashMap<>();
+ stream.forEach(
+ entry -> {
+ assertThat("Unexpected namespace", entry.f1,
isOneOf(namespaces));
+ assertThat(
+ "Unexpected key",
+ entry.f0,
+
is(both(greaterThanOrEqualTo(0)).and(lessThan(elementsNum))));
+
+ Set<Integer> keys =
+ keysByNamespace.computeIfAbsent(entry.f1,
k -> new HashSet<>());
+ assertTrue("Duplicate key for namespace",
keys.add(entry.f0));
+ });
+ }
+ } finally {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+
+ @Test
public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync()
throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
index 386c33b..f01a31d 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
@@ -28,6 +28,7 @@ import javax.annotation.Nonnull;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
/**
* Adapter class to bridge between {@link RocksIteratorWrapper} and {@link
Iterator} to iterate over
@@ -41,7 +42,8 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends
AbstractRocksState
@Nonnull private final TypeSerializer<N> namespaceSerializer;
- private Tuple2<K, N> nextKey;
+ private Tuple2<K, N> nextKeyAndNamespace;
+ private Tuple2<K, N> previousKeyAndNamespace;
public RocksStateKeysAndNamespaceIterator(
@Nonnull RocksIteratorWrapper iterator,
@@ -53,26 +55,31 @@ public class RocksStateKeysAndNamespaceIterator<K, N>
extends AbstractRocksState
super(iterator, state, keySerializer, keyGroupPrefixBytes,
ambiguousKeyPossible);
this.namespaceSerializer = namespaceSerializer;
- this.nextKey = null;
+ this.nextKeyAndNamespace = null;
+ this.previousKeyAndNamespace = null;
}
@Override
public boolean hasNext() {
try {
- while (nextKey == null && iterator.isValid()) {
+ while (nextKeyAndNamespace == null && iterator.isValid()) {
final byte[] keyBytes = iterator.key();
final K currentKey = deserializeKey(keyBytes,
byteArrayDataInputView);
final N currentNamespace =
CompositeKeySerializationUtils.readNamespace(
namespaceSerializer, byteArrayDataInputView,
ambiguousKeyPossible);
- nextKey = Tuple2.of(currentKey, currentNamespace);
+ final Tuple2<K, N> currentKeyAndNamespace =
Tuple2.of(currentKey, currentNamespace);
+ if (!Objects.equals(previousKeyAndNamespace,
currentKeyAndNamespace)) {
+ previousKeyAndNamespace = currentKeyAndNamespace;
+ nextKeyAndNamespace = currentKeyAndNamespace;
+ }
iterator.next();
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to access state [" + state
+ "]", e);
}
- return nextKey != null;
+ return nextKeyAndNamespace != null;
}
@Override
@@ -81,8 +88,8 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends
AbstractRocksState
throw new NoSuchElementException("Failed to access state [" +
state + "]");
}
- Tuple2<K, N> tmpKey = nextKey;
- nextKey = null;
- return tmpKey;
+ Tuple2<K, N> tmpKeyAndNamespace = nextKeyAndNamespace;
+ nextKeyAndNamespace = null;
+ return tmpKeyAndNamespace;
}
}