This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a907d92  [FLINK-24597][state] Avoid KeyedStateBackend#getKeysAndNamespaces to return duplicate data
a907d92 is described below

commit a907d92673a711612b287d184c00dad7fa42269f
Author: mayue.fight <[email protected]>
AuthorDate: Wed Oct 20 14:46:04 2021 +0800

    [FLINK-24597][state] Avoid KeyedStateBackend#getKeysAndNamespaces to return duplicate data
    
    This closes #17525
---
 .../flink/runtime/state/StateBackendTestBase.java  | 45 ++++++++++++++++++++++
 .../RocksStateKeysAndNamespaceIterator.java        | 23 +++++++----
 2 files changed, 60 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3e35344..3bade95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -71,6 +71,7 @@ import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -5085,6 +5086,50 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
     }
 
     @Test
+    public void testMapStateGetKeysAndNamespaces() throws Exception {
+        final int elementsNum = 1000;
+        String fieldName = "get-keys-test";
+        CheckpointableKeyedStateBackend<Integer> backend =
+                createKeyedBackend(IntSerializer.INSTANCE);
+        try {
+            InternalMapState<Integer, String, String, Integer> internalState =
+                    backend.createInternalState(
+                            StringSerializer.INSTANCE,
+                            new MapStateDescriptor<>(
+                                    fieldName, StringSerializer.INSTANCE, 
IntSerializer.INSTANCE));
+            String[] namespaces = new String[] {"ns1", "ns2"};
+
+            for (int key = 0; key < elementsNum; key++) {
+                backend.setCurrentKey(key);
+                for (String ns : namespaces) {
+                    internalState.setCurrentNamespace(ns);
+                    internalState.put("hello", key);
+                    internalState.put("world", key);
+                }
+            }
+
+            try (Stream<Tuple2<Integer, String>> stream = 
backend.getKeysAndNamespaces(fieldName)) {
+                final Map<String, Set<Integer>> keysByNamespace = new 
HashMap<>();
+                stream.forEach(
+                        entry -> {
+                            assertThat("Unexpected namespace", entry.f1, 
isOneOf(namespaces));
+                            assertThat(
+                                    "Unexpected key",
+                                    entry.f0,
+                                    
is(both(greaterThanOrEqualTo(0)).and(lessThan(elementsNum))));
+
+                            Set<Integer> keys =
+                                    keysByNamespace.computeIfAbsent(entry.f1, 
k -> new HashSet<>());
+                            assertTrue("Duplicate key for namespace", 
keys.add(entry.f0));
+                        });
+            }
+        } finally {
+            IOUtils.closeQuietly(backend);
+            backend.dispose();
+        }
+    }
+
+    @Test
     public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() 
throws Exception {
         CheckpointStreamFactory streamFactory = createStreamFactory();
         ExecutorService executorService = Executors.newScheduledThreadPool(1);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
index 386c33b..f01a31d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
@@ -28,6 +28,7 @@ import javax.annotation.Nonnull;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 
 /**
  * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link 
Iterator} to iterate over
@@ -41,7 +42,8 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends 
AbstractRocksState
 
     @Nonnull private final TypeSerializer<N> namespaceSerializer;
 
-    private Tuple2<K, N> nextKey;
+    private Tuple2<K, N> nextKeyAndNamespace;
+    private Tuple2<K, N> previousKeyAndNamespace;
 
     public RocksStateKeysAndNamespaceIterator(
             @Nonnull RocksIteratorWrapper iterator,
@@ -53,26 +55,31 @@ public class RocksStateKeysAndNamespaceIterator<K, N> 
extends AbstractRocksState
         super(iterator, state, keySerializer, keyGroupPrefixBytes, 
ambiguousKeyPossible);
 
         this.namespaceSerializer = namespaceSerializer;
-        this.nextKey = null;
+        this.nextKeyAndNamespace = null;
+        this.previousKeyAndNamespace = null;
     }
 
     @Override
     public boolean hasNext() {
         try {
-            while (nextKey == null && iterator.isValid()) {
+            while (nextKeyAndNamespace == null && iterator.isValid()) {
 
                 final byte[] keyBytes = iterator.key();
                 final K currentKey = deserializeKey(keyBytes, 
byteArrayDataInputView);
                 final N currentNamespace =
                         CompositeKeySerializationUtils.readNamespace(
                                 namespaceSerializer, byteArrayDataInputView, 
ambiguousKeyPossible);
-                nextKey = Tuple2.of(currentKey, currentNamespace);
+                final Tuple2<K, N> currentKeyAndNamespace = 
Tuple2.of(currentKey, currentNamespace);
+                if (!Objects.equals(previousKeyAndNamespace, 
currentKeyAndNamespace)) {
+                    previousKeyAndNamespace = currentKeyAndNamespace;
+                    nextKeyAndNamespace = currentKeyAndNamespace;
+                }
                 iterator.next();
             }
         } catch (Exception e) {
             throw new FlinkRuntimeException("Failed to access state [" + state 
+ "]", e);
         }
-        return nextKey != null;
+        return nextKeyAndNamespace != null;
     }
 
     @Override
@@ -81,8 +88,8 @@ public class RocksStateKeysAndNamespaceIterator<K, N> extends 
AbstractRocksState
             throw new NoSuchElementException("Failed to access state [" + 
state + "]");
         }
 
-        Tuple2<K, N> tmpKey = nextKey;
-        nextKey = null;
-        return tmpKey;
+        Tuple2<K, N> tmpKeyAndNamespace = nextKeyAndNamespace;
+        nextKeyAndNamespace = null;
+        return tmpKeyAndNamespace;
     }
 }

Reply via email to