This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new d8e7741  [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
d8e7741 is described below

commit d8e7741bf4fa5d625d381dea0d5b7512b2a56372
Author: Yun Tang <myas...@live.com>
AuthorDate: Thu Aug 30 19:30:56 2018 +0200

    [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
---
 .../flink/runtime/state/StateBackendTestBase.java  | 47 ++++++++++++++++++++++
 .../contrib/streaming/state/RocksDBMapState.java   | 30 +++++++-------
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5aec36f..d897360 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -111,6 +111,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -2749,6 +2750,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
        }
 
        /**
+        * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
+        */
+       @Test
+       public void testMapStateIteratorArbitraryAccess() throws Exception {
+               MapStateDescriptor<Integer, Long> kvId = new 
MapStateDescriptor<>("id", Integer.class, Long.class);
+
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+               try {
+                       MapState<Integer, Long> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       backend.setCurrentKey(1);
+                       int stateSize = 4096;
+                       for (int i = 0; i < stateSize; i++) {
+                               state.put(i, i * 2L);
+                       }
+                       Iterator<Map.Entry<Integer, Long>> iterator = 
state.iterator();
+                       int iteratorCount = 0;
+                       while (iterator.hasNext()) {
+                               Map.Entry<Integer, Long> entry = 
iterator.next();
+                               assertEquals(iteratorCount, (int) 
entry.getKey());
+                               switch (ThreadLocalRandom.current().nextInt() % 
3) {
+                                       case 0: // remove twice
+                                               iterator.remove();
+                                               try {
+                                                       iterator.remove();
+                                                       fail();
+                                               } catch (IllegalStateException 
e) {
+                                                       // ignore expected 
exception
+                                               }
+                                               break;
+                                       case 1: // hasNext -> remove
+                                               iterator.hasNext();
+                                               iterator.remove();
+                                               break;
+                                       case 2: // nothing to do
+                                               break;
+                               }
+                               iteratorCount++;
+                       }
+                       assertEquals(stateSize, iteratorCount);
+               } finally {
+                       backend.dispose();
+               }
+       }
+
+       /**
         * Verify that {@link ValueStateDescriptor} allows {@code null} as default.
         */
        @Test
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 04b4af3..d1e95a0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -506,6 +506,7 @@ public class RocksDBMapState<K, N, UK, UV>
                 * have the same prefix, hence we can stop iterating once coming across an
                 * entry with a different prefix.
                 */
+               @Nonnull
                private final byte[] keyPrefixBytes;
 
                /**
@@ -516,6 +517,9 @@ public class RocksDBMapState<K, N, UK, UV>
 
                /** A in-memory cache for the entries in the rocksdb. */
                private ArrayList<RocksDBMapEntry> cacheEntries = new 
ArrayList<>();
+
+               /** The entry pointing to the current position which is last returned by calling {@link #nextEntry()}. */
+               private RocksDBMapEntry currentEntry;
                private int cacheIndex = 0;
 
                private final TypeSerializer<UK> keySerializer;
@@ -542,12 +546,11 @@ public class RocksDBMapState<K, N, UK, UV>
 
                @Override
                public void remove() {
-                       if (cacheIndex == 0 || cacheIndex > 
cacheEntries.size()) {
+                       if (currentEntry == null || currentEntry.deleted) {
                                throw new IllegalStateException("The remove 
operation must be called after a valid next operation.");
                        }
 
-                       RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex 
- 1);
-                       lastEntry.remove();
+                       currentEntry.remove();
                }
 
                final RocksDBMapEntry nextEntry() {
@@ -561,10 +564,10 @@ public class RocksDBMapState<K, N, UK, UV>
                                return null;
                        }
 
-                       RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+                       this.currentEntry = cacheEntries.get(cacheIndex);
                        cacheIndex++;
 
-                       return entry;
+                       return currentEntry;
                }
 
                private void loadCache() {
@@ -582,12 +585,11 @@ public class RocksDBMapState<K, N, UK, UV>
                        try (RocksIteratorWrapper iterator = 
RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
                                /*
-                                * The iteration starts from the prefix bytes 
at the first loading. The cache then is
-                                * reloaded when the next entry to return is 
the last one in the cache. At that time,
-                                * we will start the iterating from the last 
returned entry.
-                                */
-                               RocksDBMapEntry lastEntry = cacheEntries.size() 
== 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-                               byte[] startBytes = (lastEntry == null ? 
keyPrefixBytes : lastEntry.rawKeyBytes);
+                                * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
+                                * the currentEntry points to the last returned entry, and at that time, we will start
+                                * the iterating from currentEntry if reloading cache is needed.
+                                */
+                               byte[] startBytes = (currentEntry == null ? 
keyPrefixBytes : currentEntry.rawKeyBytes);
 
                                cacheEntries.clear();
                                cacheIndex = 0;
@@ -595,10 +597,10 @@ public class RocksDBMapState<K, N, UK, UV>
                                iterator.seek(startBytes);
 
                                /*
-                                * If the last returned entry is not deleted, 
it will be the first entry in the
-                                * iterating. Skip it to avoid redundant access 
in such cases.
+                                * If the entry pointing to the current position is not removed, it will be the first entry in the
+                                * new iterating. Skip it to avoid redundant access in such cases.
                                 */
-                               if (lastEntry != null && !lastEntry.deleted) {
+                               if (currentEntry != null && 
!currentEntry.deleted) {
                                        iterator.next();
                                }
 

Reply via email to