This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push: new d8e7741 [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator d8e7741 is described below commit d8e7741bf4fa5d625d381dea0d5b7512b2a56372 Author: Yun Tang <myas...@live.com> AuthorDate: Thu Aug 30 19:30:56 2018 +0200 [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator --- .../flink/runtime/state/StateBackendTestBase.java | 47 ++++++++++++++++++++++ .../contrib/streaming/state/RocksDBMapState.java | 30 +++++++------- 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 5aec36f..d897360 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -111,6 +111,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -2749,6 +2750,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } /** + * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details. + */ + @Test + public void testMapStateIteratorArbitraryAccess() throws Exception { + MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class); + + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + try { + MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + backend.setCurrentKey(1); + int stateSize = 4096; + for (int i = 0; i < stateSize; i++) { + state.put(i, i * 2L); + } + Iterator<Map.Entry<Integer, Long>> iterator = state.iterator(); + int iteratorCount = 0; + while (iterator.hasNext()) { + Map.Entry<Integer, Long> entry = iterator.next(); + assertEquals(iteratorCount, (int) entry.getKey()); + switch (ThreadLocalRandom.current().nextInt() % 3) { + case 0: // remove twice + iterator.remove(); + try { + iterator.remove(); + fail(); + } catch (IllegalStateException e) { + // ignore expected exception + } + break; + case 1: // hasNext -> remove + iterator.hasNext(); + iterator.remove(); + break; + case 2: // nothing to do + break; + } + iteratorCount++; + } + assertEquals(stateSize, iteratorCount); + } finally { + backend.dispose(); + } + } + + /** * Verify that {@link ValueStateDescriptor} allows {@code null} as default. */ @Test diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 04b4af3..d1e95a0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -506,6 +506,7 @@ public class RocksDBMapState<K, N, UK, UV> * have the same prefix, hence we can stop iterating once coming across an * entry with a different prefix. */ + @Nonnull private final byte[] keyPrefixBytes; /** @@ -516,6 +517,9 @@ public class RocksDBMapState<K, N, UK, UV> /** A in-memory cache for the entries in the rocksdb. */ private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>(); + + /** The entry pointing to the current position which is last returned by calling {@link #nextEntry()}. */ + private RocksDBMapEntry currentEntry; private int cacheIndex = 0; private final TypeSerializer<UK> keySerializer; @@ -542,12 +546,11 @@ public class RocksDBMapState<K, N, UK, UV> @Override public void remove() { - if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) { + if (currentEntry == null || currentEntry.deleted) { throw new IllegalStateException("The remove operation must be called after a valid next operation."); } - RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1); - lastEntry.remove(); + currentEntry.remove(); } final RocksDBMapEntry nextEntry() { @@ -561,10 +564,10 @@ public class RocksDBMapState<K, N, UK, UV> return null; } - RocksDBMapEntry entry = cacheEntries.get(cacheIndex); + this.currentEntry = cacheEntries.get(cacheIndex); cacheIndex++; - return entry; + return currentEntry; } private void loadCache() { @@ -582,12 +585,11 @@ public class RocksDBMapState<K, N, UK, UV> try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) { /* - * The iteration starts from the prefix bytes at the first loading. The cache then is - * reloaded when the next entry to return is the last one in the cache. At that time, - * we will start the iterating from the last returned entry. - */ - RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); - byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); + * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called, + * the currentEntry points to the last returned entry, and at that time, we will start + * the iterating from currentEntry if reloading cache is needed. + */ + byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes); cacheEntries.clear(); cacheIndex = 0; @@ -595,10 +597,10 @@ public class RocksDBMapState<K, N, UK, UV> iterator.seek(startBytes); /* - * If the last returned entry is not deleted, it will be the first entry in the - * iterating. Skip it to avoid redundant access in such cases. + * If the entry pointing to the current position is not removed, it will be the first entry in the + * new iterating. Skip it to avoid redundant access in such cases. */ - if (lastEntry != null && !lastEntry.deleted) { + if (currentEntry != null && !currentEntry.deleted) { iterator.next(); }