[
https://issues.apache.org/jira/browse/FLINK-10267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610573#comment-16610573
]
ASF GitHub Bot commented on FLINK-10267:
----------------------------------------
asfgit closed pull request #6638: [FLINK-10267][State] Fix arbitrary iterator
access on RocksDBMapIterator
URL: https://github.com/apache/flink/pull/6638
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 649c6d03a6a..2634268c947 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -117,6 +117,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -2916,6 +2917,52 @@ public void testMapState() throws Exception {
backend.dispose();
}
+ /**
+ * Verify iterator of {@link MapState} supporting arbitrary access, see
[FLINK-10267] to know more details.
+ */
+ @Test
+ public void testMapStateIteratorArbitraryAccess() throws Exception {
+ MapStateDescriptor<Integer, Long> kvId = new
MapStateDescriptor<>("id", Integer.class, Long.class);
+
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
+
+ try {
+ MapState<Integer, Long> state =
backend.getPartitionedState(VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+ backend.setCurrentKey(1);
+ int stateSize = 4096;
+ for (int i = 0; i < stateSize; i++) {
+ state.put(i, i * 2L);
+ }
+ Iterator<Map.Entry<Integer, Long>> iterator =
state.iterator();
+ int iteratorCount = 0;
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Long> entry =
iterator.next();
+ assertEquals(iteratorCount, (int)
entry.getKey());
+ switch (ThreadLocalRandom.current().nextInt() %
3) {
+ case 0: // remove twice
+ iterator.remove();
+ try {
+ iterator.remove();
+ fail();
+ } catch (IllegalStateException
e) {
+ // ignore expected
exception
+ }
+ break;
+ case 1: // hasNext -> remove
+ iterator.hasNext();
+ iterator.remove();
+ break;
+ case 2: // nothing to do
+ break;
+ }
+ iteratorCount++;
+ }
+ assertEquals(stateSize, iteratorCount);
+ } finally {
+ backend.dispose();
+ }
+ }
+
/**
* Verify that {@link ValueStateDescriptor} allows {@code null} as
default.
*/
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5c9f7f9f30c..cb656b53b1b 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -498,6 +498,7 @@ public UV setValue(UV value) {
* have the same prefix, hence we can stop iterating once
coming across an
* entry with a different prefix.
*/
+ @Nonnull
private final byte[] keyPrefixBytes;
/**
@@ -508,6 +509,9 @@ public UV setValue(UV value) {
/** A in-memory cache for the entries in the rocksdb. */
private ArrayList<RocksDBMapEntry> cacheEntries = new
ArrayList<>();
+
+ /** The entry pointing to the current position which is last
returned by calling {@link #nextEntry()}. */
+ private RocksDBMapEntry currentEntry;
private int cacheIndex = 0;
private final TypeSerializer<UK> keySerializer;
@@ -537,12 +541,11 @@ public boolean hasNext() {
@Override
public void remove() {
- if (cacheIndex == 0 || cacheIndex >
cacheEntries.size()) {
+ if (currentEntry == null || currentEntry.deleted) {
throw new IllegalStateException("The remove
operation must be called after a valid next operation.");
}
- RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex
- 1);
- lastEntry.remove();
+ currentEntry.remove();
}
final RocksDBMapEntry nextEntry() {
@@ -556,10 +559,10 @@ final RocksDBMapEntry nextEntry() {
return null;
}
- RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+ this.currentEntry = cacheEntries.get(cacheIndex);
cacheIndex++;
- return entry;
+ return currentEntry;
}
private void loadCache() {
@@ -577,12 +580,11 @@ private void loadCache() {
try (RocksIteratorWrapper iterator =
RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
/*
- * The iteration starts from the prefix bytes
at the first loading. The cache then is
- * reloaded when the next entry to return is
the last one in the cache. At that time,
- * we will start the iterating from the last
returned entry.
- */
- RocksDBMapEntry lastEntry = cacheEntries.size()
== 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
- byte[] startBytes = (lastEntry == null ?
keyPrefixBytes : lastEntry.rawKeyBytes);
+ * The iteration starts from the prefix bytes
at the first loading. After #nextEntry() is called,
+ * the currentEntry points to the last returned
entry, and at that time, we will start
+ * the iterating from currentEntry if reloading
cache is needed.
+ */
+ byte[] startBytes = (currentEntry == null ?
keyPrefixBytes : currentEntry.rawKeyBytes);
cacheEntries.clear();
cacheIndex = 0;
@@ -590,10 +592,10 @@ private void loadCache() {
iterator.seek(startBytes);
/*
- * If the last returned entry is not deleted,
it will be the first entry in the
- * iterating. Skip it to avoid redundant access
in such cases.
+ * If the entry pointing to the current
position is not removed, it will be the first entry in the
+ * new iterating. Skip it to avoid redundant
access in such cases.
*/
- if (lastEntry != null && !lastEntry.deleted) {
+ if (currentEntry != null &&
!currentEntry.deleted) {
iterator.next();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [State] Fix arbitrary iterator access on RocksDBMapIterator
> -----------------------------------------------------------
>
> Key: FLINK-10267
> URL: https://issues.apache.org/jira/browse/FLINK-10267
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.3, 1.6.0
> Reporter: Yun Tang
> Assignee: Yun Tang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.1, 1.5.4
>
>
> Currently, RocksDBMapIterator loads 128 entries at a time into its local
> cacheEntries whenever the cache needs refilling. Both RocksDBMapIterator#next()
> and RocksDBMapIterator#hasNext() may trigger loading RocksDBMapEntry objects
> into cacheEntries.
> However, if the iterator contains more than 128 entries and we access it in the
> order hasNext() -> next() -> hasNext() -> remove(), we hit an unexpected
> exception when trying to remove the 128th element:
> {code:java}
> java.lang.IllegalStateException: The remove operation must be called after a
> valid next operation.
> {code}
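> This happens because the second hasNext() reloads the cache once the last cached
> entry has been returned, which resets cacheIndex to 0, so remove() no longer
> recognizes the preceding next() call. Since we cannot control how users access
> the iterator, we should fix this bug to avoid the unexpected exception.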
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)