This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c0de8d6d5a9eea1a779cf3703412f522bece54c
Author: Yun Tang <myas...@live.com>
AuthorDate: Fri Jun 5 01:16:27 2020 +0800

    [FLINK-17800][roksdb] Ensure total order seek to avoid user misuse
---
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  23 +++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 ++-
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 ++-
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 +
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +---
 .../state/RocksDBStateMisuseOptionTest.java        | 147 +++++++++++++++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 +++
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 16 files changed, 255 insertions(+), 46 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index 364185a..fb9a833 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import 
org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -63,6 +64,9 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
        @Nonnull
        private final RocksDB db;
 
+       @Nonnull
+       private final ReadOptions readOptions;
+
        /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
        @Nonnull
        private final ColumnFamilyHandle columnFamilyHandle;
@@ -112,6 +116,7 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
                @Nonnegative int keyGroupId,
                @Nonnegative int keyGroupPrefixBytes,
                @Nonnull RocksDB db,
+               @Nonnull ReadOptions readOptions,
                @Nonnull ColumnFamilyHandle columnFamilyHandle,
                @Nonnull TypeSerializer<E> byteOrderProducingSerializer,
                @Nonnull DataOutputSerializer outputStream,
@@ -119,6 +124,7 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
                @Nonnull RocksDBWriteBatchWrapper batchWrapper,
                @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
                this.db = db;
+               this.readOptions = readOptions;
                this.columnFamilyHandle = columnFamilyHandle;
                this.byteOrderProducingSerializer = 
byteOrderProducingSerializer;
                this.batchWrapper = batchWrapper;
@@ -304,7 +310,7 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
                flushWriteBatch();
                return new RocksBytesIterator(
                        new RocksIteratorWrapper(
-                               db.newIterator(columnFamilyHandle)));
+                               db.newIterator(columnFamilyHandle, 
readOptions)));
        }
 
        /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 1f43dd0..5bce695 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -116,7 +117,8 @@ public class RocksDBIncrementalCheckpointUtils {
                @Nonnegative long writeBatchSize) throws RocksDBException {
 
                for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
-                       try (RocksIteratorWrapper iteratorWrapper = 
RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
+                       try (ReadOptions readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+                               RocksIteratorWrapper iteratorWrapper = 
RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
                                RocksDBWriteBatchWrapper writeBatchWrapper = 
new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
 
                                iteratorWrapper.seek(beginKeyBytes);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 61d8688..f0cce0b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -150,6 +151,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final WriteOptions writeOptions;
 
        /**
+        * The read options to use when creating iterators.
+        * We ensure total order seek in case user misuse, see FLINK-17800 for 
more details.
+        */
+       private final ReadOptions readOptions;
+
+       /**
         * The max memory size for one batch in {@link 
RocksDBWriteBatchWrapper}.
         */
        private final long writeBatchSize;
@@ -212,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider,
                RocksDB db,
+               WriteOptions writeOptions,
+               ReadOptions readOptions,
                LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
                int keyGroupPrefixBytes,
                CloseableRegistry cancelStreamRegistry,
@@ -250,7 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.kvStateInformation = kvStateInformation;
 
-               this.writeOptions = new WriteOptions().setDisableWAL(true);
+               this.writeOptions = writeOptions;
+               this.readOptions = readOptions;
                checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative value.");
                this.writeBatchSize = writeBatchSize;
                this.db = db;
@@ -290,7 +300,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
                }
 
-               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
+               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, 
readOptions);
                iterator.seekToFirst();
 
                final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, getKeySerializer(), 
keyGroupPrefixBytes,
@@ -360,6 +370,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
                        IOUtils.closeQuietly(optionsContainer);
+                       IOUtils.closeQuietly(readOptions);
                        IOUtils.closeQuietly(writeOptions);
 
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -407,6 +418,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return writeOptions;
        }
 
+       public ReadOptions getReadOptions() {
+               return readOptions;
+       }
+
        RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
                return sharedRocksKeyBuilder;
        }
@@ -606,7 +621,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                Snapshot rocksDBSnapshot = db.getSnapshot();
                try (
-                       RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
+                       RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
                        RocksDBWriteBatchWrapper batchWriter = new 
RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
                ) {
                        iterator.seekToFirst();
@@ -681,7 +696,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) 
{
                        //TODO maybe filterOrTransform only for k/v states
-                       try (RocksIteratorWrapper rocksIterator = 
RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
+                       try (RocksIteratorWrapper rocksIterator = 
RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, 
readOptions)) {
                                rocksIterator.seekToFirst();
 
                                while (rocksIterator.isValid()) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d1a0184..b9bdbb8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
                // The write options to use in the states.
                WriteOptions writeOptions = null;
+               ReadOptions readOptions = null;
                LinkedHashMap<String, 
RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new 
LinkedHashMap<>();
                RocksDB db = null;
                AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -282,6 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        }
 
                        writeOptions = new WriteOptions().setDisableWAL(true);
+                       readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
                        writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
writeOptions, writeBatchSize);
                        // it is important that we only create the key builder 
after the restore, and not before;
                        // restore operations may reconfigure the key 
serializer, so accessing the key serializer
@@ -294,8 +297,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        snapshotStrategy = 
initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, 
rocksDBResourceGuard,
                                kvStateInformation, keyGroupPrefixBytes, db, 
backendUID, materializedSstFiles, lastCompletedCheckpointId);
                        // init priority queue factory
-                       priorityQueueFactory = 
initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
-                               writeBatchWrapper, nativeMetricMonitor);
+                       priorityQueueFactory = initPriorityQueueFactory(
+                               keyGroupPrefixBytes,
+                               kvStateInformation,
+                               db,
+                               readOptions,
+                               writeBatchWrapper,
+                               nativeMetricMonitor);
                } catch (Throwable e) {
                        // Do clean up
                        List<ColumnFamilyOptions> columnFamilyOptions = new 
ArrayList<>(kvStateInformation.values().size());
@@ -313,6 +321,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        IOUtils.closeQuietly(restoreOperation);
                        IOUtils.closeAllQuietly(columnFamilyOptions);
                        IOUtils.closeQuietly(optionsContainer);
+                       IOUtils.closeQuietly(readOptions);
                        IOUtils.closeQuietly(writeOptions);
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
                        kvStateInformation.clear();
@@ -344,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        this.executionConfig,
                        this.ttlTimeProvider,
                        db,
+                       writeOptions,
+                       readOptions,
                        kvStateInformation,
                        keyGroupPrefixBytes,
                        cancelStreamRegistryForBackend,
@@ -472,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                int keyGroupPrefixBytes,
                Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
                RocksDB db,
+               ReadOptions readOptions,
                RocksDBWriteBatchWrapper writeBatchWrapper,
                RocksDBNativeMetricMonitor nativeMetricMonitor) {
                PriorityQueueSetFactory priorityQueueFactory;
@@ -486,6 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                        numberOfKeyGroups,
                                        kvStateInformation,
                                        db,
+                                       readOptions,
                                        writeBatchWrapper,
                                        nativeMetricMonitor,
                                        columnFamilyOptionsFactory
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index f0b47cd..6c7f541 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -236,7 +236,7 @@ class RocksDBMapState<K, N, UK, UV>
        public boolean isEmpty() {
                final byte[] prefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
 
-               try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+               try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, 
backend.getReadOptions())) {
 
                        iterator.seek(prefixBytes);
 
@@ -247,7 +247,7 @@ class RocksDBMapState<K, N, UK, UV>
        @Override
        public void clear() {
                try {
-                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
+                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, 
backend.getReadOptions());
                                RocksDBWriteBatchWrapper 
rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, 
backend.getWriteOptions(), backend.getWriteBatchSize())) {
 
                                final byte[] keyPrefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ class RocksDBMapState<K, N, UK, UV>
 
                        // use try-with-resources to ensure RocksIterator can 
be release even some runtime exception
                        // occurred in the below code block.
-                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
+                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnFamily, 
backend.getReadOptions())) {
 
                                /*
                                 * The iteration starts from the prefix bytes 
at the first loading. After #nextEntry() is called,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 1455c1b..0f564d5 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,6 +32,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -94,12 +95,18 @@ public class RocksDBOperationUtils {
                return dbRef;
        }
 
-       public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
-               return new RocksIteratorWrapper(db.newIterator());
+       public static RocksIteratorWrapper getRocksIterator(RocksDB db, 
ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
+               return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
        }
 
-       public static RocksIteratorWrapper getRocksIterator(RocksDB db, 
ColumnFamilyHandle columnFamilyHandle) {
-               return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+       /**
+        * Create a total order read option to avoid user misuse, see 
FLINK-17800 for more details.
+        *
+        * <p>Note, remember to close the generated {@link ReadOptions} when 
dispose.
+        */
+       // TODO We would remove this method once we bump RocksDB version larger 
than 6.2.2.
+       public static ReadOptions createTotalOrderSeekReadOptions() {
+               return new ReadOptions().setTotalOrderSeek(true);
        }
 
        public static void registerKvStateInformation(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 1cbeebe..59daff8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -36,6 +37,7 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import javax.annotation.Nonnull;
@@ -52,7 +54,8 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
        /**
         * Default cache size per key-group.
         */
-       private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this 
configurable
+       @VisibleForTesting
+       static final int DEFAULT_CACHES_SIZE = 128; //TODO make this 
configurable
 
        /**
         * A shared buffer to serialize elements for the priority queue.
@@ -71,6 +74,7 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
        private final int numberOfKeyGroups;
        private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation;
        private final RocksDB db;
+       private final ReadOptions readOptions;
        private final RocksDBWriteBatchWrapper writeBatchWrapper;
        private final RocksDBNativeMetricMonitor nativeMetricMonitor;
        private final Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory;
@@ -81,6 +85,7 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                int numberOfKeyGroups,
                Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
                RocksDB db,
+               ReadOptions readOptions,
                RocksDBWriteBatchWrapper writeBatchWrapper,
                RocksDBNativeMetricMonitor nativeMetricMonitor,
                Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory) {
@@ -89,6 +94,7 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                this.numberOfKeyGroups = numberOfKeyGroups;
                this.kvStateInformation = kvStateInformation;
                this.db = db;
+               this.readOptions = readOptions;
                this.writeBatchWrapper = writeBatchWrapper;
                this.nativeMetricMonitor = nativeMetricMonitor;
                this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -122,6 +128,7 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                                                keyGroupId,
                                                keyGroupPrefixBytes,
                                                db,
+                                               readOptions,
                                                columnFamilyHandle,
                                                byteOrderedElementSerializer,
                                                sharedElementOutView,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index f7abed6..53c7537 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,6 +51,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -309,7 +310,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                                null, 
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                                                .columnFamilyHandle;
 
-                                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, 
tmpColumnFamilyHandle)) {
+                                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, 
tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
 
                                                
iterator.seek(startKeyGroupPrefixBytes);
 
@@ -376,6 +377,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                @Nonnull
                private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots;
 
+               private final ReadOptions readOptions;
+
                private RestoredDBInstance(
                        @Nonnull RocksDB db,
                        @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -386,6 +389,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        this.columnFamilyHandles = columnFamilyHandles;
                        this.columnFamilyDescriptors = columnFamilyDescriptors;
                        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+                       this.readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
                }
 
                @Override
@@ -397,6 +401,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        IOUtils.closeAllQuietly(columnFamilyHandles);
                        IOUtils.closeQuietly(db);
                        IOUtils.closeAllQuietly(columnFamilyOptions);
+                       IOUtils.closeQuietly(readOptions);
                }
        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index d402c3d..fd68d70 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,6 +60,7 @@ public class 
KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
                                keyGroupId,
                                keyGroupPrefixBytes,
                                rocksDBResource.getRocksDB(),
+                               rocksDBResource.getReadOptions(),
                                rocksDBResource.getDefaultColumnFamily(),
                                TestElementSerializer.INSTANCE,
                                outputStreamWithPos,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index d25baa7..3b3b697 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
                                        LOG.error("Close previous 
ColumnOptions's instance failed.", e);
                                }
 
-                               return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
+                               return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
                        }
                });
        }
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
                        
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
                this.writeOptions = new WriteOptions();
                this.writeOptions.disableWAL();
-               this.readOptions = new ReadOptions();
+               this.readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
                this.columnFamilyHandles = new ArrayList<>(1);
                this.rocksDB = RocksDB.open(
                        dbOptions,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 8f71b65..1a4808f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
                        ColumnFamilyHandle handle = 
keyedStateBackend.getColumnFamilyHandle(testStateName);
 
                        try (
-                               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
+                               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, 
keyedStateBackend.getReadOptions());
                                RocksStateKeysIterator<K> iteratorWrapper =
                                        new RocksStateKeysIterator<>(
                                                iterator,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index d3eb6db..197f7ca 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
+import static 
org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -123,7 +124,7 @@ public class RocksDBStateBackendConfigTest {
                assertArrayEquals(new String[] { testDir1, testDir2 }, 
rocksDbBackend.getDbStoragePaths());
 
                final MockEnvironment env = 
getMockEnvironment(tempFolder.newFolder());
-               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env);
+               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 
                try {
                        File instanceBasePath = 
keyedBackend.getInstanceBasePath();
@@ -160,7 +161,7 @@ public class RocksDBStateBackendConfigTest {
 
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env);
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
                Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, 
keyedBackend.getPriorityQueueFactory().getClass());
                keyedBackend.dispose();
 
@@ -168,7 +169,7 @@ public class RocksDBStateBackendConfigTest {
                conf.set(RocksDBOptions.TIMER_SERVICE_FACTORY, 
RocksDBStateBackend.PriorityQueueStateType.HEAP);
 
                rocksDbBackend = rocksDbBackend.configure(conf, 
Thread.currentThread().getContextClassLoader());
-               keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+               keyedBackend = createKeyedStateBackend(rocksDbBackend, env, 
IntSerializer.INSTANCE);
                Assert.assertEquals(
                        HeapPriorityQueueSetFactory.class,
                        keyedBackend.getPriorityQueueFactory().getClass());
@@ -197,7 +198,7 @@ public class RocksDBStateBackendConfigTest {
                final RocksDBStateBackend configuredRocksDBStateBackend = 
backend.configure(
                        configFromConfFile,
                        Thread.currentThread().getContextClassLoader());
-               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(configuredRocksDBStateBackend, env);
+               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(configuredRocksDBStateBackend, env, 
IntSerializer.INSTANCE);
 
                // priorityQueueStateType of the job backend should be preserved
                assertThat(keyedBackend.getPriorityQueueFactory(), 
instanceOf(HeapPriorityQueueSetFactory.class));
@@ -254,7 +255,7 @@ public class RocksDBStateBackendConfigTest {
                rocksDbBackend.setDbStoragePath(configuredPath);
 
                final MockEnvironment env = 
getMockEnvironment(tempFolder.newFolder());
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env);
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 
                try {
                        File instanceBasePath = 
keyedBackend.getInstanceBasePath();
@@ -725,23 +726,6 @@ public class RocksDBStateBackendConfigTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
-                       RocksDBStateBackend rocksDbBackend, Environment env) 
throws Exception {
-
-               return (RocksDBKeyedStateBackend<Integer>) 
rocksDbBackend.createKeyedStateBackend(
-                       env,
-                       env.getJobID(),
-                       "test_op",
-                       IntSerializer.INSTANCE,
-                       1,
-                       new KeyGroupRange(0, 0),
-                       env.getTaskKvStateRegistry(),
-                       TtlTimeProvider.DEFAULT,
-                       new UnregisteredMetricsGroup(),
-                       Collections.emptyList(),
-                       new CloseableRegistry());
-       }
-
        static MockEnvironment getMockEnvironment(File... tempDirs) {
                final String[] tempDirStrings = new String[tempDirs.length];
                for (int i = 0; i < tempDirs.length; i++) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
new file mode 100644
index 0000000..59a4822
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to cover cases that even user misuse some options, RocksDB 
state-backend could still work as expected or give explicit feedback.
+ *
+ * <p>RocksDB state-backend has some internal operations based on RocksDB's 
APIs which is transparent for users.
+ * However, user could still configure options via {@link 
RocksDBOptionsFactory}, and might lead some operations
+ * could not get expected result, e.g. FLINK-17800
+ */
+public class RocksDBStateMisuseOptionTest {
+
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Tests to cover case when user misuse optimizeForPointLookup with 
iterator interfaces on map state.
+        *
+        * <p>The option {@link 
ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek 
with prefix bytes invalid.
+        */
+       @Test
+       public void testMisuseOptimizePointLookupWithMapState() throws 
Exception {
+               RocksDBStateBackend rocksDBStateBackend = 
createStateBackendWithOptimizePointLookup();
+               RocksDBKeyedStateBackend<Integer> keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+               MapStateDescriptor<Integer, Long> stateDescriptor = new 
MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+               MapState<Integer, Long> mapState = 
keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+               keyedStateBackend.setCurrentKey(1);
+               Map<Integer, Long> expectedResult = new HashMap<>();
+               for (int i = 0; i < 100; i++) {
+                       long uv = ThreadLocalRandom.current().nextLong();
+                       mapState.put(i, uv);
+                       expectedResult.put(i, uv);
+               }
+
+               Iterator<Map.Entry<Integer, Long>> iterator = 
mapState.entries().iterator();
+               while (iterator.hasNext()) {
+                       Map.Entry<Integer, Long> entry = iterator.next();
+                       assertEquals(entry.getValue(), 
expectedResult.remove(entry.getKey()));
+                       iterator.remove();
+               }
+               assertTrue(expectedResult.isEmpty());
+               assertTrue(mapState.isEmpty());
+       }
+
+       /**
+        * Tests to cover case when user misuse optimizeForPointLookup with 
peek operations on priority queue.
+        *
+        * <p>The option {@link 
ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek 
with prefix bytes invalid.
+        */
+       @Test
+       public void testMisuseOptimizePointLookupWithPriorityQueue() throws 
IOException {
+               RocksDBStateBackend rocksDBStateBackend = 
createStateBackendWithOptimizePointLookup();
+               RocksDBKeyedStateBackend<Integer> keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+               KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, 
VoidNamespace>> priorityQueue =
+                       keyedStateBackend.create("timer", new 
TimerSerializer<>(keyedStateBackend.getKeySerializer(), 
VoidNamespaceSerializer.INSTANCE));
+
+               PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> 
expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) 
(o1.getTimestamp() - o2.getTimestamp()));
+               // ensure we insert timers more than cache capacity.
+               int queueSize = 
RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+               List<Integer> timeStamps = IntStream.range(0, 
queueSize).boxed().collect(Collectors.toList());
+               Collections.shuffle(timeStamps);
+               for (Integer timeStamp : timeStamps) {
+                       TimerHeapInternalTimer<Integer, VoidNamespace> timer = 
new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+                       priorityQueue.add(timer);
+                       expectedPriorityQueue.add(timer);
+               }
+               assertEquals(queueSize, priorityQueue.size());
+               TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+               while ((timer = priorityQueue.poll()) != null) {
+                       assertEquals(expectedPriorityQueue.poll(), timer);
+               }
+               assertTrue(expectedPriorityQueue.isEmpty());
+               assertTrue(priorityQueue.isEmpty());
+
+       }
+
+       private RocksDBStateBackend createStateBackendWithOptimizePointLookup() 
throws IOException {
+               RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
+               
rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+               rocksDBStateBackend.setRocksDBOptions(new 
RocksDBOptionsFactory() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public DBOptions createDBOptions(DBOptions 
currentOptions, Collection<AutoCloseable> handlesToClose) {
+                               return currentOptions;
+                       }
+
+                       @Override
+                       public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+                               return 
currentOptions.optimizeForPointLookup(64);
+                       }
+               });
+               return rocksDBStateBackend;
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index ed44a73..cee56aa 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -35,6 +36,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 
 /**
@@ -98,4 +100,23 @@ public final class RocksDBTestUtils {
                                defaultCFHandle,
                                new CloseableRegistry());
        }
+
+       public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
+                       RocksDBStateBackend rocksDbBackend,
+                       Environment env,
+                       TypeSerializer<K> keySerializer) throws IOException {
+
+               return (RocksDBKeyedStateBackend<K>) 
rocksDbBackend.createKeyedStateBackend(
+                       env,
+                       env.getJobID(),
+                       "test_op",
+                       keySerializer,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
+       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 4447f08..2c73fc7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.DataOutputStream;
@@ -74,7 +75,8 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
        public void testMergeIterator(int maxParallelism) throws Exception {
                Random random = new Random(1234);
 
-               try (RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+               try (ReadOptions readOptions = new ReadOptions();
+                       RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
                        List<Tuple2<RocksIteratorWrapper, Integer>> 
rocksIteratorsWithKVStateId = new ArrayList<>();
                        List<Tuple2<ColumnFamilyHandle, Integer>> 
columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -108,7 +110,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
                        int id = 0;
                        for (Tuple2<ColumnFamilyHandle, Integer> 
columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-                               rocksIteratorsWithKVStateId.add(new 
Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, 
columnFamilyHandle.f0), id));
+                               rocksIteratorsWithKVStateId.add(new 
Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, 
readOptions), id));
                                ++id;
                        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 96003d6..ce06d1f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -167,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
                        int pos = 0;
 
-                       try (final RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(rocksDB)) {
+                       try (final RocksIteratorWrapper iterator = new 
RocksIteratorWrapper(rocksDB.newIterator())) {
                                // seek to start
                                unsafe.putInt(keyTemplate, offset, 0);
                                iterator.seek(keyTemplate);

Reply via email to