This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 831daaae2c0cdef6871001e856dfd6a54da2a943
Author: Yu Li <l...@apache.org>
AuthorDate: Sat Jun 20 16:59:06 2020 +0800

    Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse"
    
    This reverts commit b8ddbef9a5cc5dc769ba61bd5019dd96843c932f.
---
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  23 +---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 +--
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 +--
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 -
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +++-
 .../state/RocksDBStateMisuseOptionTest.java        | 147 ---------------------
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 ---
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 .../api/operators/TimerHeapInternalTimer.java      |   2 +-
 17 files changed, 47 insertions(+), 256 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index fb9a833..364185a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import 
org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -64,9 +63,6 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
        @Nonnull
        private final RocksDB db;
 
-       @Nonnull
-       private final ReadOptions readOptions;
-
        /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
        @Nonnull
        private final ColumnFamilyHandle columnFamilyHandle;
@@ -116,7 +112,6 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
                @Nonnegative int keyGroupId,
                @Nonnegative int keyGroupPrefixBytes,
                @Nonnull RocksDB db,
-               @Nonnull ReadOptions readOptions,
                @Nonnull ColumnFamilyHandle columnFamilyHandle,
                @Nonnull TypeSerializer<E> byteOrderProducingSerializer,
                @Nonnull DataOutputSerializer outputStream,
@@ -124,7 +119,6 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
                @Nonnull RocksDBWriteBatchWrapper batchWrapper,
                @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
                this.db = db;
-               this.readOptions = readOptions;
                this.columnFamilyHandle = columnFamilyHandle;
                this.byteOrderProducingSerializer = 
byteOrderProducingSerializer;
                this.batchWrapper = batchWrapper;
@@ -310,7 +304,7 @@ public class RocksDBCachingPriorityQueueSet<E extends 
HeapPriorityQueueElement>
                flushWriteBatch();
                return new RocksBytesIterator(
                        new RocksIteratorWrapper(
-                               db.newIterator(columnFamilyHandle, 
readOptions)));
+                               db.newIterator(columnFamilyHandle)));
        }
 
        /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 5bce695..1f43dd0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,7 +21,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -117,8 +116,7 @@ public class RocksDBIncrementalCheckpointUtils {
                @Nonnegative long writeBatchSize) throws RocksDBException {
 
                for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
-                       try (ReadOptions readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-                               RocksIteratorWrapper iteratorWrapper = 
RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
+                       try (RocksIteratorWrapper iteratorWrapper = 
RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
                                RocksDBWriteBatchWrapper writeBatchWrapper = 
new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
 
                                iteratorWrapper.seek(beginKeyBytes);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 505bd2a..2ddb79b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,7 +66,6 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -151,12 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final WriteOptions writeOptions;
 
        /**
-        * The read options to use when creating iterators.
-        * We ensure total order seek in case user misuse, see FLINK-17800 for 
more details.
-        */
-       private final ReadOptions readOptions;
-
-       /**
         * The max memory size for one batch in {@link 
RocksDBWriteBatchWrapper}.
         */
        private final long writeBatchSize;
@@ -219,8 +212,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider,
                RocksDB db,
-               WriteOptions writeOptions,
-               ReadOptions readOptions,
                LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
                int keyGroupPrefixBytes,
                CloseableRegistry cancelStreamRegistry,
@@ -259,8 +250,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.kvStateInformation = kvStateInformation;
 
-               this.writeOptions = writeOptions;
-               this.readOptions = readOptions;
+               this.writeOptions = new WriteOptions().setDisableWAL(true);
                checkArgument(writeBatchSize >= 0, "Write batch size have to be 
no negative value.");
                this.writeBatchSize = writeBatchSize;
                this.db = db;
@@ -300,7 +290,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
                }
 
-               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, 
readOptions);
+               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
                iterator.seekToFirst();
 
                final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, getKeySerializer(), 
keyGroupPrefixBytes,
@@ -370,7 +360,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
                        IOUtils.closeQuietly(optionsContainer);
-                       IOUtils.closeQuietly(readOptions);
                        IOUtils.closeQuietly(writeOptions);
 
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -418,10 +407,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return writeOptions;
        }
 
-       public ReadOptions getReadOptions() {
-               return readOptions;
-       }
-
        RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
                return sharedRocksKeyBuilder;
        }
@@ -614,7 +599,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                Snapshot rocksDBSnapshot = db.getSnapshot();
                try (
-                       RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
+                       RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
                        RocksDBWriteBatchWrapper batchWriter = new 
RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
                ) {
                        iterator.seekToFirst();
@@ -689,7 +674,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) 
{
                        //TODO maybe filterOrTransform only for k/v states
-                       try (RocksIteratorWrapper rocksIterator = 
RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, 
readOptions)) {
+                       try (RocksIteratorWrapper rocksIterator = 
RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
                                rocksIterator.seekToFirst();
 
                                while (rocksIterator.isValid()) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 08b4864..70c438e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,6 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -251,7 +250,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
                // The write options to use in the states.
                WriteOptions writeOptions = null;
-               ReadOptions readOptions = null;
                LinkedHashMap<String, 
RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new 
LinkedHashMap<>();
                RocksDB db = null;
                AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -290,7 +288,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        }
 
                        writeOptions = new WriteOptions().setDisableWAL(true);
-                       readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
                        writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
writeOptions, writeBatchSize);
                        // it is important that we only create the key builder 
after the restore, and not before;
                        // restore operations may reconfigure the key 
serializer, so accessing the key serializer
@@ -303,13 +300,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        snapshotStrategy = 
initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, 
rocksDBResourceGuard,
                                kvStateInformation, keyGroupPrefixBytes, db, 
backendUID, materializedSstFiles, lastCompletedCheckpointId);
                        // init priority queue factory
-                       priorityQueueFactory = initPriorityQueueFactory(
-                               keyGroupPrefixBytes,
-                               kvStateInformation,
-                               db,
-                               readOptions,
-                               writeBatchWrapper,
-                               nativeMetricMonitor);
+                       priorityQueueFactory = 
initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
+                               writeBatchWrapper, nativeMetricMonitor);
                } catch (Throwable e) {
                        // Do clean up
                        List<ColumnFamilyOptions> columnFamilyOptions = new 
ArrayList<>(kvStateInformation.values().size());
@@ -327,7 +319,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        IOUtils.closeQuietly(restoreOperation);
                        IOUtils.closeAllQuietly(columnFamilyOptions);
                        IOUtils.closeQuietly(optionsContainer);
-                       IOUtils.closeQuietly(readOptions);
                        IOUtils.closeQuietly(writeOptions);
                        
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
                        kvStateInformation.clear();
@@ -359,8 +350,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        this.executionConfig,
                        this.ttlTimeProvider,
                        db,
-                       writeOptions,
-                       readOptions,
                        kvStateInformation,
                        keyGroupPrefixBytes,
                        cancelStreamRegistryForBackend,
@@ -489,7 +478,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                int keyGroupPrefixBytes,
                Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
                RocksDB db,
-               ReadOptions readOptions,
                RocksDBWriteBatchWrapper writeBatchWrapper,
                RocksDBNativeMetricMonitor nativeMetricMonitor) {
                PriorityQueueSetFactory priorityQueueFactory;
@@ -504,7 +492,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                        numberOfKeyGroups,
                                        kvStateInformation,
                                        db,
-                                       readOptions,
                                        writeBatchWrapper,
                                        nativeMetricMonitor,
                                        columnFamilyOptionsFactory
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 7ba3edd..ad9281c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -243,7 +243,7 @@ class RocksDBMapState<K, N, UK, UV>
        public boolean isEmpty() {
                final byte[] prefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
 
-               try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, 
backend.getReadOptions())) {
+               try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
 
                        iterator.seek(prefixBytes);
 
@@ -254,7 +254,7 @@ class RocksDBMapState<K, N, UK, UV>
        @Override
        public void clear() {
                try {
-                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, 
backend.getReadOptions());
+                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
                                RocksDBWriteBatchWrapper 
rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, 
backend.getWriteOptions(), backend.getWriteBatchSize())) {
 
                                final byte[] keyPrefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
@@ -577,7 +577,7 @@ class RocksDBMapState<K, N, UK, UV>
 
                        // use try-with-resources to ensure RocksIterator can 
be release even some runtime exception
                        // occurred in the below code block.
-                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnFamily, 
backend.getReadOptions())) {
+                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
 
                                /*
                                 * The iteration starts from the prefix bytes 
at the first loading. After #nextEntry() is called,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 0f564d5..1455c1b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,7 +32,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -95,18 +94,12 @@ public class RocksDBOperationUtils {
                return dbRef;
        }
 
-       public static RocksIteratorWrapper getRocksIterator(RocksDB db, 
ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
-               return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
+       public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
+               return new RocksIteratorWrapper(db.newIterator());
        }
 
-       /**
-        * Create a total order read option to avoid user misuse, see 
FLINK-17800 for more details.
-        *
-        * <p>Note, remember to close the generated {@link ReadOptions} when 
dispose.
-        */
-       // TODO We would remove this method once we bump RocksDB version larger 
than 6.2.2.
-       public static ReadOptions createTotalOrderSeekReadOptions() {
-               return new ReadOptions().setTotalOrderSeek(true);
+       public static RocksIteratorWrapper getRocksIterator(RocksDB db, 
ColumnFamilyHandle columnFamilyHandle) {
+               return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
        }
 
        public static void registerKvStateInformation(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 59daff8..1cbeebe 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -37,7 +36,6 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import javax.annotation.Nonnull;
@@ -54,8 +52,7 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
        /**
         * Default cache size per key-group.
         */
-       @VisibleForTesting
-       static final int DEFAULT_CACHES_SIZE = 128; //TODO make this 
configurable
+       private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this 
configurable
 
        /**
         * A shared buffer to serialize elements for the priority queue.
@@ -74,7 +71,6 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
        private final int numberOfKeyGroups;
        private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation;
        private final RocksDB db;
-       private final ReadOptions readOptions;
        private final RocksDBWriteBatchWrapper writeBatchWrapper;
        private final RocksDBNativeMetricMonitor nativeMetricMonitor;
        private final Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory;
@@ -85,7 +81,6 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                int numberOfKeyGroups,
                Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
                RocksDB db,
-               ReadOptions readOptions,
                RocksDBWriteBatchWrapper writeBatchWrapper,
                RocksDBNativeMetricMonitor nativeMetricMonitor,
                Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory) {
@@ -94,7 +89,6 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                this.numberOfKeyGroups = numberOfKeyGroups;
                this.kvStateInformation = kvStateInformation;
                this.db = db;
-               this.readOptions = readOptions;
                this.writeBatchWrapper = writeBatchWrapper;
                this.nativeMetricMonitor = nativeMetricMonitor;
                this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -128,7 +122,6 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                                                keyGroupId,
                                                keyGroupPrefixBytes,
                                                db,
-                                               readOptions,
                                                columnFamilyHandle,
                                                byteOrderedElementSerializer,
                                                sharedElementOutView,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 53c7537..f7abed6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,7 +51,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -310,7 +309,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                                null, 
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
                                                .columnFamilyHandle;
 
-                                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, 
tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
+                                       try (RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, 
tmpColumnFamilyHandle)) {
 
                                                
iterator.seek(startKeyGroupPrefixBytes);
 
@@ -377,8 +376,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                @Nonnull
                private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots;
 
-               private final ReadOptions readOptions;
-
                private RestoredDBInstance(
                        @Nonnull RocksDB db,
                        @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -389,7 +386,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        this.columnFamilyHandles = columnFamilyHandles;
                        this.columnFamilyDescriptors = columnFamilyDescriptors;
                        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-                       this.readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
                }
 
                @Override
@@ -401,7 +397,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        IOUtils.closeAllQuietly(columnFamilyHandles);
                        IOUtils.closeQuietly(db);
                        IOUtils.closeAllQuietly(columnFamilyOptions);
-                       IOUtils.closeQuietly(readOptions);
                }
        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index fd68d70..d402c3d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,7 +60,6 @@ public class 
KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
                                keyGroupId,
                                keyGroupPrefixBytes,
                                rocksDBResource.getRocksDB(),
-                               rocksDBResource.getReadOptions(),
                                rocksDBResource.getDefaultColumnFamily(),
                                TestElementSerializer.INSTANCE,
                                outputStreamWithPos,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 3b3b697..d25baa7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
                                        LOG.error("Close previous 
ColumnOptions's instance failed.", e);
                                }
 
-                               return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
+                               return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
                        }
                });
        }
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
                        
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
                this.writeOptions = new WriteOptions();
                this.writeOptions.disableWAL();
-               this.readOptions = 
RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+               this.readOptions = new ReadOptions();
                this.columnFamilyHandles = new ArrayList<>(1);
                this.rocksDB = RocksDB.open(
                        dbOptions,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 1a4808f..8f71b65 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
                        ColumnFamilyHandle handle = 
keyedStateBackend.getColumnFamilyHandle(testStateName);
 
                        try (
-                               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, 
keyedStateBackend.getReadOptions());
+                               RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
                                RocksStateKeysIterator<K> iteratorWrapper =
                                        new RocksStateKeysIterator<>(
                                                iterator,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index cf1fdfa..2167a00 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -59,7 +59,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
-import static 
org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -122,7 +121,7 @@ public class RocksDBStateBackendConfigTest {
                assertArrayEquals(new String[] { testDir1, testDir2 }, 
rocksDbBackend.getDbStoragePaths());
 
                final MockEnvironment env = 
getMockEnvironment(tempFolder.newFolder());
-               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env);
 
                try {
                        File instanceBasePath = 
keyedBackend.getInstanceBasePath();
@@ -159,7 +158,7 @@ public class RocksDBStateBackendConfigTest {
 
                RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env);
                Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, 
keyedBackend.getPriorityQueueFactory().getClass());
                keyedBackend.dispose();
 
@@ -169,7 +168,7 @@ public class RocksDBStateBackendConfigTest {
                        
RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
 
                rocksDbBackend = rocksDbBackend.configure(conf, 
Thread.currentThread().getContextClassLoader());
-               keyedBackend = createKeyedStateBackend(rocksDbBackend, env, 
IntSerializer.INSTANCE);
+               keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
                Assert.assertEquals(
                        HeapPriorityQueueSetFactory.class,
                        keyedBackend.getPriorityQueueFactory().getClass());
@@ -198,7 +197,7 @@ public class RocksDBStateBackendConfigTest {
                final RocksDBStateBackend configuredRocksDBStateBackend = 
backend.configure(
                        configFromConfFile,
                        Thread.currentThread().getContextClassLoader());
-               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(configuredRocksDBStateBackend, env, 
IntSerializer.INSTANCE);
+               final RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(configuredRocksDBStateBackend, env);
 
                // priorityQueueStateType of the job backend should be preserved
                assertThat(keyedBackend.getPriorityQueueFactory(), 
instanceOf(HeapPriorityQueueSetFactory.class));
@@ -255,7 +254,7 @@ public class RocksDBStateBackendConfigTest {
                rocksDbBackend.setDbStoragePath(configuredPath);
 
                final MockEnvironment env = 
getMockEnvironment(tempFolder.newFolder());
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
createKeyedStateBackend(rocksDbBackend, env);
 
                try {
                        File instanceBasePath = 
keyedBackend.getInstanceBasePath();
@@ -726,6 +725,23 @@ public class RocksDBStateBackendConfigTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
+       static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+                       RocksDBStateBackend rocksDbBackend, Environment env) 
throws Exception {
+
+               return (RocksDBKeyedStateBackend<Integer>) 
rocksDbBackend.createKeyedStateBackend(
+                       env,
+                       env.getJobID(),
+                       "test_op",
+                       IntSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
+       }
+
        static MockEnvironment getMockEnvironment(File... tempDirs) {
                final String[] tempDirStrings = new String[tempDirs.length];
                for (int i = 0; i < tempDirs.length; i++) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
deleted file mode 100644
index 59a4822..0000000
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
-import org.apache.flink.streaming.api.operators.TimerSerializer;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static 
org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests to cover cases that even user misuse some options, RocksDB 
state-backend could still work as expected or give explicit feedback.
- *
- * <p>RocksDB state-backend has some internal operations based on RocksDB's 
APIs which is transparent for users.
- * However, user could still configure options via {@link 
RocksDBOptionsFactory}, and might lead some operations
- * could not get expected result, e.g. FLINK-17800
- */
-public class RocksDBStateMisuseOptionTest {
-
-       @Rule
-       public final TemporaryFolder tempFolder = new TemporaryFolder();
-
-       /**
-        * Tests to cover case when user misuse optimizeForPointLookup with 
iterator interfaces on map state.
-        *
-        * <p>The option {@link 
ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek 
with prefix bytes invalid.
-        */
-       @Test
-       public void testMisuseOptimizePointLookupWithMapState() throws 
Exception {
-               RocksDBStateBackend rocksDBStateBackend = 
createStateBackendWithOptimizePointLookup();
-               RocksDBKeyedStateBackend<Integer> keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-               MapStateDescriptor<Integer, Long> stateDescriptor = new 
MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
-               MapState<Integer, Long> mapState = 
keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
-               keyedStateBackend.setCurrentKey(1);
-               Map<Integer, Long> expectedResult = new HashMap<>();
-               for (int i = 0; i < 100; i++) {
-                       long uv = ThreadLocalRandom.current().nextLong();
-                       mapState.put(i, uv);
-                       expectedResult.put(i, uv);
-               }
-
-               Iterator<Map.Entry<Integer, Long>> iterator = 
mapState.entries().iterator();
-               while (iterator.hasNext()) {
-                       Map.Entry<Integer, Long> entry = iterator.next();
-                       assertEquals(entry.getValue(), 
expectedResult.remove(entry.getKey()));
-                       iterator.remove();
-               }
-               assertTrue(expectedResult.isEmpty());
-               assertTrue(mapState.isEmpty());
-       }
-
-       /**
-        * Tests to cover case when user misuse optimizeForPointLookup with 
peek operations on priority queue.
-        *
-        * <p>The option {@link 
ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek 
with prefix bytes invalid.
-        */
-       @Test
-       public void testMisuseOptimizePointLookupWithPriorityQueue() throws 
IOException {
-               RocksDBStateBackend rocksDBStateBackend = 
createStateBackendWithOptimizePointLookup();
-               RocksDBKeyedStateBackend<Integer> keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-               KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, 
VoidNamespace>> priorityQueue =
-                       keyedStateBackend.create("timer", new 
TimerSerializer<>(keyedStateBackend.getKeySerializer(), 
VoidNamespaceSerializer.INSTANCE));
-
-               PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> 
expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) 
(o1.getTimestamp() - o2.getTimestamp()));
-               // ensure we insert timers more than cache capacity.
-               int queueSize = 
RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
-               List<Integer> timeStamps = IntStream.range(0, 
queueSize).boxed().collect(Collectors.toList());
-               Collections.shuffle(timeStamps);
-               for (Integer timeStamp : timeStamps) {
-                       TimerHeapInternalTimer<Integer, VoidNamespace> timer = 
new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
-                       priorityQueue.add(timer);
-                       expectedPriorityQueue.add(timer);
-               }
-               assertEquals(queueSize, priorityQueue.size());
-               TimerHeapInternalTimer<Integer, VoidNamespace> timer;
-               while ((timer = priorityQueue.poll()) != null) {
-                       assertEquals(expectedPriorityQueue.poll(), timer);
-               }
-               assertTrue(expectedPriorityQueue.isEmpty());
-               assertTrue(priorityQueue.isEmpty());
-
-       }
-
-       private RocksDBStateBackend createStateBackendWithOptimizePointLookup() 
throws IOException {
-               RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
-               
rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
-               rocksDBStateBackend.setRocksDBOptions(new 
RocksDBOptionsFactory() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public DBOptions createDBOptions(DBOptions 
currentOptions, Collection<AutoCloseable> handlesToClose) {
-                               return currentOptions;
-                       }
-
-                       @Override
-                       public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
-                               return 
currentOptions.optimizeForPointLookup(64);
-                       }
-               });
-               return rocksDBStateBackend;
-       }
-}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index cee56aa..ed44a73 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -36,7 +35,6 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collections;
 
 /**
@@ -100,23 +98,4 @@ public final class RocksDBTestUtils {
                                defaultCFHandle,
                                new CloseableRegistry());
        }
-
-       public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
-                       RocksDBStateBackend rocksDbBackend,
-                       Environment env,
-                       TypeSerializer<K> keySerializer) throws IOException {
-
-               return (RocksDBKeyedStateBackend<K>) 
rocksDbBackend.createKeyedStateBackend(
-                       env,
-                       env.getJobID(),
-                       "test_op",
-                       keySerializer,
-                       1,
-                       new KeyGroupRange(0, 0),
-                       env.getTaskKvStateRegistry(),
-                       TtlTimeProvider.DEFAULT,
-                       new UnregisteredMetricsGroup(),
-                       Collections.emptyList(),
-                       new CloseableRegistry());
-       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 2c73fc7..4447f08 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.DataOutputStream;
@@ -75,8 +74,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
        public void testMergeIterator(int maxParallelism) throws Exception {
                Random random = new Random(1234);
 
-               try (ReadOptions readOptions = new ReadOptions();
-                       RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+               try (RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
                        List<Tuple2<RocksIteratorWrapper, Integer>> 
rocksIteratorsWithKVStateId = new ArrayList<>();
                        List<Tuple2<ColumnFamilyHandle, Integer>> 
columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -110,7 +108,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
                        int id = 0;
                        for (Tuple2<ColumnFamilyHandle, Integer> 
columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-                               rocksIteratorsWithKVStateId.add(new 
Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, 
readOptions), id));
+                               rocksIteratorsWithKVStateId.add(new 
Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, 
columnFamilyHandle.f0), id));
                                ++id;
                        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index ce06d1f..96003d6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -166,7 +167,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
                        int pos = 0;
 
-                       try (final RocksIteratorWrapper iterator = new 
RocksIteratorWrapper(rocksDB.newIterator())) {
+                       try (final RocksIteratorWrapper iterator = 
RocksDBOperationUtils.getRocksIterator(rocksDB)) {
                                // seek to start
                                unsafe.putInt(keyTemplate, offset, 0);
                                iterator.seek(keyTemplate);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index e02901b..a6194ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -50,7 +50,7 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N>,
         */
        private transient int timerHeapIndex;
 
-       public TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull 
N namespace) {
+       TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N 
namespace) {
                this.timestamp = timestamp;
                this.key = key;
                this.namespace = namespace;

Reply via email to