asfgit closed pull request #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not show the original diff of a fork-based pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0c781..698a9f97dc0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public void setPosition(int pos) {
        public void setData(@Nonnull byte[] buffer, int offset, int length) {
                inStreamWithPos.setBuffer(buffer, offset, length);
        }
+
+       public void setData(@Nonnull byte[] buffer) {
+               setData(buffer, 0, buffer.length);
+       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f84e46..2a9ab7589a9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -63,7 +61,8 @@ SV getInternal(byte[] key) {
                        if (valueBytes == null) {
                                return null;
                        }
-                       return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+                       dataInputView.setData(valueBytes);
+                       return valueSerializer.deserialize(dataInputView);
                } catch (IOException | RocksDBException e) {
                        throw new FlinkRuntimeException("Error while retrieving 
data from RocksDB", e);
                }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089106f..65b7f1fa4a7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@
 
        protected final WriteOptions writeOptions;
 
-       protected final ByteArrayOutputStreamWithPos keySerializationStream;
+       protected final ByteArrayDataOutputView dataOutputView;
 
-       protected final DataOutputView keySerializationDataOutputView;
+       protected final ByteArrayDataInputView dataInputView;
 
        private final boolean ambiguousKeyPossible;
 
@@ -98,9 +97,10 @@ protected AbstractRocksDBState(
                this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer, "State value serializer");
                this.defaultValue = defaultValue;
 
-               this.keySerializationStream = new 
ByteArrayOutputStreamWithPos(128);
-               this.keySerializationDataOutputView = new 
DataOutputViewStreamWrapper(keySerializationStream);
-               this.ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), 
namespaceSerializer);
+               this.dataOutputView = new ByteArrayDataOutputView(128);
+               this.dataInputView = new ByteArrayDataInputView();
+               this.ambiguousKeyPossible =
+                       
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), 
namespaceSerializer);
        }
 
        // 
------------------------------------------------------------------------
@@ -109,7 +109,7 @@ protected AbstractRocksDBState(
        public void clear() {
                try {
                        writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] key = dataOutputView.toByteArray();
                        backend.db.delete(columnFamily, writeOptions, key);
                } catch (IOException | RocksDBException e) {
                        throw new FlinkRuntimeException("Error while removing 
entry from RocksDB", e);
@@ -141,8 +141,7 @@ public void setCurrentNamespace(N namespace) {
 
                // we cannot reuse the keySerializationStream member since this 
method
                // is called concurrently to the other ones and it may thus 
contain garbage
-               ByteArrayOutputStreamWithPos tmpKeySerializationStream = new 
ByteArrayOutputStreamWithPos(128);
-               DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView = new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+               ByteArrayDataOutputView tmpKeySerializationView = new 
ByteArrayDataOutputView(128);
 
                writeKeyWithGroupAndNamespace(
                                keyGroup,
@@ -150,16 +149,15 @@ public void setCurrentNamespace(N namespace) {
                                safeKeySerializer,
                                keyAndNamespace.f1,
                                safeNamespaceSerializer,
-                               tmpKeySerializationStream,
-                               tmpKeySerializationDateDataOutputView);
+                               tmpKeySerializationView);
 
-               return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
+               return backend.db.get(columnFamily, 
tmpKeySerializationView.toByteArray());
        }
 
        byte[] getKeyBytes() {
                try {
                        writeCurrentKeyWithGroupAndNamespace();
-                       return keySerializationStream.toByteArray();
+                       return dataOutputView.toByteArray();
                } catch (IOException e) {
                        throw new FlinkRuntimeException("Error while 
serializing key", e);
                }
@@ -167,9 +165,9 @@ public void setCurrentNamespace(N namespace) {
 
        byte[] getValueBytes(V value) {
                try {
-                       keySerializationStream.reset();
-                       valueSerializer.serialize(value, new 
DataOutputViewStreamWrapper(keySerializationStream));
-                       return keySerializationStream.toByteArray();
+                       dataOutputView.reset();
+                       valueSerializer.serialize(value, dataOutputView);
+                       return dataOutputView.toByteArray();
                } catch (IOException e) {
                        throw new FlinkRuntimeException("Error while 
serializing value", e);
                }
@@ -180,14 +178,12 @@ protected void writeCurrentKeyWithGroupAndNamespace() 
throws IOException {
                        backend.getCurrentKeyGroupIndex(),
                        backend.getCurrentKey(),
                        currentNamespace,
-                       keySerializationStream,
-                       keySerializationDataOutputView);
+                       dataOutputView);
        }
 
        protected void writeKeyWithGroupAndNamespace(
                        int keyGroup, K key, N namespace,
-                       ByteArrayOutputStreamWithPos keySerializationStream,
-                       DataOutputView keySerializationDataOutputView) throws 
IOException {
+                       ByteArrayDataOutputView keySerializationDataOutputView) 
throws IOException {
 
                writeKeyWithGroupAndNamespace(
                                keyGroup,
@@ -195,7 +191,6 @@ protected void writeKeyWithGroupAndNamespace(
                                backend.getKeySerializer(),
                                namespace,
                                namespaceSerializer,
-                               keySerializationStream,
                                keySerializationDataOutputView);
        }
 
@@ -205,17 +200,16 @@ protected void writeKeyWithGroupAndNamespace(
                        final TypeSerializer<K> keySerializer,
                        final N namespace,
                        final TypeSerializer<N> namespaceSerializer,
-                       final ByteArrayOutputStreamWithPos 
keySerializationStream,
-                       final DataOutputView keySerializationDataOutputView) 
throws IOException {
+                       final ByteArrayDataOutputView 
keySerializationDataOutputView) throws IOException {
 
                Preconditions.checkNotNull(key, "No key set. This method should 
not be called outside of a keyed context.");
                Preconditions.checkNotNull(keySerializer);
                Preconditions.checkNotNull(namespaceSerializer);
 
-               keySerializationStream.reset();
+               keySerializationDataOutputView.reset();
                RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, 
backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-               RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
-               RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
+               RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationDataOutputView, ambiguousKeyPossible);
+               RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
        }
 
        protected V getDefaultValue() {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 209d18f43c8..4f9ef2f811c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -25,8 +25,6 @@
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -121,17 +119,15 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
                        // merge the sources to the target
                        for (N source : sources) {
                                if (source != null) {
-                                       writeKeyWithGroupAndNamespace(
-                                                       keyGroup, key, source,
-                                                       keySerializationStream, 
keySerializationDataOutputView);
+                                       writeKeyWithGroupAndNamespace(keyGroup, 
key, source, dataOutputView);
 
-                                       final byte[] sourceKey = 
keySerializationStream.toByteArray();
+                                       final byte[] sourceKey = 
dataOutputView.toByteArray();
                                        final byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
                                        backend.db.delete(columnFamily, 
writeOptions, sourceKey);
 
                                        if (valueBytes != null) {
-                                               ACC value = 
valueSerializer.deserialize(
-                                                               new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+                                               
dataInputView.setData(valueBytes);
+                                               ACC value = 
valueSerializer.deserialize(dataInputView);
 
                                                if (current != null) {
                                                        current = 
aggFunction.merge(current, value);
@@ -146,27 +142,25 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
                        // if something came out of merging the sources, merge 
it or write it to the target
                        if (current != null) {
                                // create the target full-binary-key
-                               writeKeyWithGroupAndNamespace(
-                                               keyGroup, key, target,
-                                               keySerializationStream, 
keySerializationDataOutputView);
+                               writeKeyWithGroupAndNamespace(keyGroup, key, 
target, dataOutputView);
 
-                               final byte[] targetKey = 
keySerializationStream.toByteArray();
+                               final byte[] targetKey = 
dataOutputView.toByteArray();
                                final byte[] targetValueBytes = 
backend.db.get(columnFamily, targetKey);
 
                                if (targetValueBytes != null) {
                                        // target also had a value, merge
-                                       ACC value = valueSerializer.deserialize(
-                                                       new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+                                       dataInputView.setData(targetValueBytes);
+                                       ACC value = 
valueSerializer.deserialize(dataInputView);
 
                                        current = aggFunction.merge(current, 
value);
                                }
 
                                // serialize the resulting value
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(current, 
keySerializationDataOutputView);
+                               dataOutputView.reset();
+                               valueSerializer.serialize(current, 
dataOutputView);
 
                                // write the resulting value
-                               backend.db.put(columnFamily, writeOptions, 
targetKey, keySerializationStream.toByteArray());
+                               backend.db.put(columnFamily, writeOptions, 
targetKey, dataOutputView.toByteArray());
                        }
                }
                catch (Exception e) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index 5f1c650de45..7c9e3f8c3f0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -18,8 +18,8 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -28,7 +28,7 @@
 /**
  * Utils for RocksDB state serialization and deserialization.
  */
-class RocksDBKeySerializationUtils {
+public class RocksDBKeySerializationUtils {
 
        static int readKeyGroup(int keyGroupPrefixBytes, DataInputView 
inputView) throws IOException {
                int keyGroup = 0;
@@ -41,13 +41,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, 
DataInputView inputView) throws
 
        public static <K> K readKey(
                TypeSerializer<K> keySerializer,
-               ByteArrayInputStreamWithPos inputStream,
-               DataInputView inputView,
+               ByteArrayDataInputView inputView,
                boolean ambiguousKeyPossible) throws IOException {
-               int beforeRead = inputStream.getPosition();
+               int beforeRead = inputView.getPosition();
                K key = keySerializer.deserialize(inputView);
                if (ambiguousKeyPossible) {
-                       int length = inputStream.getPosition() - beforeRead;
+                       int length = inputView.getPosition() - beforeRead;
                        readVariableIntBytes(inputView, length);
                }
                return key;
@@ -55,13 +54,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, 
DataInputView inputView) throws
 
        public static <N> N readNamespace(
                TypeSerializer<N> namespaceSerializer,
-               ByteArrayInputStreamWithPos inputStream,
-               DataInputView inputView,
+               ByteArrayDataInputView inputView,
                boolean ambiguousKeyPossible) throws IOException {
-               int beforeRead = inputStream.getPosition();
+               int beforeRead = inputView.getPosition();
                N namespace = namespaceSerializer.deserialize(inputView);
                if (ambiguousKeyPossible) {
-                       int length = inputStream.getPosition() - beforeRead;
+                       int length = inputView.getPosition() - beforeRead;
                        readVariableIntBytes(inputView, length);
                }
                return namespace;
@@ -70,17 +68,15 @@ static int readKeyGroup(int keyGroupPrefixBytes, 
DataInputView inputView) throws
        public static <N> void writeNameSpace(
                N namespace,
                TypeSerializer<N> namespaceSerializer,
-               ByteArrayOutputStreamWithPos keySerializationStream,
-               DataOutputView keySerializationDataOutputView,
+               ByteArrayDataOutputView keySerializationDataOutputView,
                boolean ambiguousKeyPossible) throws IOException {
 
-               int beforeWrite = keySerializationStream.getPosition();
+               int beforeWrite = keySerializationDataOutputView.getPosition();
                namespaceSerializer.serialize(namespace, 
keySerializationDataOutputView);
 
                if (ambiguousKeyPossible) {
                        //write length of namespace
-                       writeLengthFrom(beforeWrite, keySerializationStream,
-                               keySerializationDataOutputView);
+                       writeLengthFrom(beforeWrite, 
keySerializationDataOutputView);
                }
        }
 
@@ -100,17 +96,15 @@ public static void writeKeyGroup(
        public static <K> void writeKey(
                K key,
                TypeSerializer<K> keySerializer,
-               ByteArrayOutputStreamWithPos keySerializationStream,
-               DataOutputView keySerializationDataOutputView,
+               ByteArrayDataOutputView keySerializationDataOutputView,
                boolean ambiguousKeyPossible) throws IOException {
                //write key
-               int beforeWrite = keySerializationStream.getPosition();
+               int beforeWrite = keySerializationDataOutputView.getPosition();
                keySerializer.serialize(key, keySerializationDataOutputView);
 
                if (ambiguousKeyPossible) {
                        //write size of key
-                       writeLengthFrom(beforeWrite, keySerializationStream,
-                               keySerializationDataOutputView);
+                       writeLengthFrom(beforeWrite, 
keySerializationDataOutputView);
                }
        }
 
@@ -123,9 +117,8 @@ private static void readVariableIntBytes(DataInputView 
inputView, int value) thr
 
        private static void writeLengthFrom(
                int fromPosition,
-               ByteArrayOutputStreamWithPos keySerializationStream,
-               DataOutputView keySerializationDateDataOutputView) throws 
IOException {
-               int length = keySerializationStream.getPosition() - 
fromPosition;
+               ByteArrayDataOutputView keySerializationDateDataOutputView) 
throws IOException {
+               int length = keySerializationDateDataOutputView.getPosition() - 
fromPosition;
                writeVariableIntBytes(length, 
keySerializationDateDataOutputView);
        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0fd11252b2f..c159976f293 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -34,6 +34,9 @@
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -42,8 +45,6 @@
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -131,16 +132,12 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.Spliterator;
@@ -365,17 +362,16 @@ private static void checkAndCreateDirectory(File 
directory) throws IOException {
                        (RegisteredKeyValueStateBackendMetaInfo<N, ?>) 
columnInfo.f1;
 
                final TypeSerializer<N> namespaceSerializer = 
registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
-               final ByteArrayOutputStreamWithPos namespaceOutputStream = new 
ByteArrayOutputStreamWithPos(8);
+               final ByteArrayDataOutputView namespaceOutputView = new 
ByteArrayDataOutputView(8);
                boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
                final byte[] nameSpaceBytes;
                try {
                        RocksDBKeySerializationUtils.writeNameSpace(
                                namespace,
                                namespaceSerializer,
-                               namespaceOutputStream,
-                               new 
DataOutputViewStreamWrapper(namespaceOutputStream),
+                               namespaceOutputView,
                                ambiguousKeyPossible);
-                       nameSpaceBytes = namespaceOutputStream.toByteArray();
+                       nameSpaceBytes = namespaceOutputView.toByteArray();
                } catch (IOException ex) {
                        throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
                }
@@ -383,7 +379,7 @@ private static void checkAndCreateDirectory(File directory) 
throws IOException {
                RocksIteratorWrapper iterator = getRocksIterator(db, 
columnInfo.f0);
                iterator.seekToFirst();
 
-               final RocksIteratorForKeysWrapper<K> iteratorWrapper = new 
RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, 
keyGroupPrefixBytes,
+               final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
                        ambiguousKeyPossible, nameSpaceBytes);
 
                Stream<K> targetStream = 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 
Spliterator.ORDERED), false);
@@ -391,7 +387,7 @@ private static void checkAndCreateDirectory(File directory) 
throws IOException {
        }
 
        @VisibleForTesting
-       ColumnFamilyHandle getColumnFamilyHandle(String state) {
+       public ColumnFamilyHandle getColumnFamilyHandle(String state) {
                Tuple2<ColumnFamilyHandle, ?> columnInfo = 
kvStateInformation.get(state);
                return columnInfo != null ? columnInfo.f0 : null;
        }
@@ -1451,385 +1447,6 @@ public int numKeyValueStateEntries() {
                return count;
        }
 
-       /**
-        * Iterator that merges multiple RocksDB iterators to partition all 
states into contiguous key-groups.
-        * The resulting iteration sequence is ordered by (key-group, kv-state).
-        */
-       @VisibleForTesting
-       static class RocksDBMergeIterator implements AutoCloseable {
-
-               private final 
PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
-               private final int keyGroupPrefixByteCount;
-               private boolean newKeyGroup;
-               private boolean newKVState;
-               private boolean valid;
-
-               MergeIterator currentSubIterator;
-
-               private static final List<Comparator<MergeIterator>> 
COMPARATORS;
-
-               static {
-                       int maxBytes = 2;
-                       COMPARATORS = new ArrayList<>(maxBytes);
-                       for (int i = 0; i < maxBytes; ++i) {
-                               final int currentBytes = i + 1;
-                               COMPARATORS.add((o1, o2) -> {
-                                       int arrayCmpRes = 
compareKeyGroupsForByteArrays(
-                                               o1.currentKey, o2.currentKey, 
currentBytes);
-                                       return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-                               });
-                       }
-               }
-
-               RocksDBMergeIterator(
-                       List<Tuple2<RocksIteratorWrapper, Integer>> 
kvStateIterators,
-                       final int keyGroupPrefixByteCount) {
-                       Preconditions.checkNotNull(kvStateIterators);
-                       Preconditions.checkArgument(keyGroupPrefixByteCount >= 
1);
-
-                       this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
-
-                       Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount - 1);
-
-                       if (kvStateIterators.size() > 0) {
-                               PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
-                                       new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
-
-                               for (Tuple2<RocksIteratorWrapper, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
-                                       final RocksIteratorWrapper 
rocksIterator = rocksIteratorWithKVStateId.f0;
-                                       rocksIterator.seekToFirst();
-                                       if (rocksIterator.isValid()) {
-                                               iteratorPriorityQueue.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
-                                       } else {
-                                               
IOUtils.closeQuietly(rocksIterator);
-                                       }
-                               }
-
-                               kvStateIterators.clear();
-
-                               this.heap = iteratorPriorityQueue;
-                               this.valid = !heap.isEmpty();
-                               this.currentSubIterator = heap.poll();
-                       } else {
-                               // creating a PriorityQueue of size 0 results 
in an exception.
-                               this.heap = null;
-                               this.valid = false;
-                       }
-
-                       this.newKeyGroup = true;
-                       this.newKVState = true;
-               }
-
-               /**
-                * Advance the iterator. Should only be called if {@link 
#isValid()} returned true. Valid can only change after
-                * calls to {@link #next()}.
-                */
-               public void next() {
-                       newKeyGroup = false;
-                       newKVState = false;
-
-                       final RocksIteratorWrapper rocksIterator = 
currentSubIterator.getIterator();
-                       rocksIterator.next();
-
-                       byte[] oldKey = currentSubIterator.getCurrentKey();
-                       if (rocksIterator.isValid()) {
-
-                               currentSubIterator.currentKey = 
rocksIterator.key();
-
-                               if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
-                                       heap.offer(currentSubIterator);
-                                       currentSubIterator = heap.poll();
-                                       newKVState = 
currentSubIterator.getIterator() != rocksIterator;
-                                       detectNewKeyGroup(oldKey);
-                               }
-                       } else {
-                               IOUtils.closeQuietly(rocksIterator);
-
-                               if (heap.isEmpty()) {
-                                       currentSubIterator = null;
-                                       valid = false;
-                               } else {
-                                       currentSubIterator = heap.poll();
-                                       newKVState = true;
-                                       detectNewKeyGroup(oldKey);
-                               }
-                       }
-               }
-
-               private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
-                       return 0 != compareKeyGroupsForByteArrays(a, b, 
keyGroupPrefixByteCount);
-               }
-
-               private void detectNewKeyGroup(byte[] oldKey) {
-                       if (isDifferentKeyGroup(oldKey, 
currentSubIterator.currentKey)) {
-                               newKeyGroup = true;
-                       }
-               }
-
-               /**
-                * @return key-group for the current key
-                */
-               public int keyGroup() {
-                       int result = 0;
-                       //big endian decode
-                       for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
-                               result <<= 8;
-                               result |= (currentSubIterator.currentKey[i] & 
0xFF);
-                       }
-                       return result;
-               }
-
-               public byte[] key() {
-                       return currentSubIterator.getCurrentKey();
-               }
-
-               public byte[] value() {
-                       return currentSubIterator.getIterator().value();
-               }
-
-               /**
-                * @return Id of K/V state to which the current key belongs.
-                */
-               public int kvStateId() {
-                       return currentSubIterator.getKvStateId();
-               }
-
-               /**
-                * Indicates if current key starts a new k/v-state, i.e. belongs 
to a different k/v-state than its predecessor.
-                * @return true iff the current key belongs to a different 
k/v-state than its predecessor.
-                */
-               public boolean isNewKeyValueState() {
-                       return newKVState;
-               }
-
-               /**
-                * Indicates if current key starts a new key-group, i.e. belongs 
to a different key-group than its predecessor.
-                * @return true iff the current key belongs to a different 
key-group than its predecessor.
-                */
-               public boolean isNewKeyGroup() {
-                       return newKeyGroup;
-               }
-
-               /**
-                * Check if the iterator is still valid. Getters like {@link 
#key()}, {@link #value()}, etc. as well as
-                * {@link #next()} should only be called if valid returned 
true. Should be checked after each call to
-                * {@link #next()} before accessing iterator state.
-                * @return True iff this iterator is valid.
-                */
-               public boolean isValid() {
-                       return valid;
-               }
-
-               private static int compareKeyGroupsForByteArrays(byte[] a, 
byte[] b, int len) {
-                       for (int i = 0; i < len; ++i) {
-                               int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
-                               if (diff != 0) {
-                                       return diff;
-                               }
-                       }
-                       return 0;
-               }
-
-               @Override
-               public void close() {
-                       IOUtils.closeQuietly(currentSubIterator);
-                       currentSubIterator = null;
-
-                       IOUtils.closeAllQuietly(heap);
-                       heap.clear();
-               }
-       }
-
-       /**
-        * Wraps a RocksDB iterator to cache its current key and assigns an id 
for the key/value state to the iterator.
-        * Used by #MergeIterator.
-        */
-       @VisibleForTesting
-       protected static final class MergeIterator implements AutoCloseable {
-
-               /**
-                * @param iterator  The #RocksIterator to wrap.
-                * @param kvStateId Id of the K/V state to which this iterator 
belongs.
-                */
-               MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
-                       this.iterator = Preconditions.checkNotNull(iterator);
-                       this.currentKey = iterator.key();
-                       this.kvStateId = kvStateId;
-               }
-
-               private final RocksIteratorWrapper iterator;
-               private byte[] currentKey;
-               private final int kvStateId;
-
-               public byte[] getCurrentKey() {
-                       return currentKey;
-               }
-
-               public void setCurrentKey(byte[] currentKey) {
-                       this.currentKey = currentKey;
-               }
-
-               public RocksIteratorWrapper getIterator() {
-                       return iterator;
-               }
-
-               public int getKvStateId() {
-                       return kvStateId;
-               }
-
-               @Override
-               public void close() {
-                       IOUtils.closeQuietly(iterator);
-               }
-       }
-
-       private static final class TransformingRocksIteratorWrapper extends 
RocksIteratorWrapper {
-               @Nonnull
-               private final StateSnapshotTransformer<byte[]> 
stateSnapshotTransformer;
-               private byte[] current;
-
-               public TransformingRocksIteratorWrapper(
-                       @Nonnull RocksIterator iterator,
-                       @Nonnull StateSnapshotTransformer<byte[]> 
stateSnapshotTransformer) {
-                       super(iterator);
-                       this.stateSnapshotTransformer = 
stateSnapshotTransformer;
-               }
-
-               @Override
-               public void seekToFirst() {
-                       super.seekToFirst();
-                       filterOrTransform(super::next);
-               }
-
-               @Override
-               public void seekToLast() {
-                       super.seekToLast();
-                       filterOrTransform(super::prev);
-               }
-
-               @Override
-               public void next() {
-                       super.next();
-                       filterOrTransform(super::next);
-               }
-
-               @Override
-               public void prev() {
-                       super.prev();
-                       filterOrTransform(super::prev);
-               }
-
-               private void filterOrTransform(Runnable advance) {
-                       while (isValid() && (current = 
stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
-                               advance.run();
-                       }
-               }
-
-               @Override
-               public byte[] value() {
-                       if (!isValid()) {
-                               throw new IllegalStateException("value() method 
cannot be called if isValid() is false");
-                       }
-                       return current;
-               }
-       }
-
-       /**
-        * Adapter class to bridge between {@link RocksIteratorWrapper} and 
{@link Iterator} to iterate over the keys. This class
-        * is not thread safe.
-        *
-        * @param <K> the type of the iterated objects, which are keys in 
RocksDB.
-        */
-       static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, 
AutoCloseable {
-               private final RocksIteratorWrapper iterator;
-               private final String state;
-               private final TypeSerializer<K> keySerializer;
-               private final int keyGroupPrefixBytes;
-               private final byte[] namespaceBytes;
-               private final boolean ambiguousKeyPossible;
-               private K nextKey;
-               private K previousKey;
-
-               RocksIteratorForKeysWrapper(
-                       RocksIteratorWrapper iterator,
-                       String state,
-                       TypeSerializer<K> keySerializer,
-                       int keyGroupPrefixBytes,
-                       boolean ambiguousKeyPossible,
-                       byte[] namespaceBytes) {
-                       this.iterator = Preconditions.checkNotNull(iterator);
-                       this.state = Preconditions.checkNotNull(state);
-                       this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
-                       this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
-                       this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
-                       this.nextKey = null;
-                       this.previousKey = null;
-                       this.ambiguousKeyPossible = ambiguousKeyPossible;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       try {
-                               while (nextKey == null && iterator.isValid()) {
-
-                                       byte[] key = iterator.key();
-
-                                       ByteArrayInputStreamWithPos inputStream 
=
-                                               new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes);
-
-                                       DataInputViewStreamWrapper dataInput = 
new DataInputViewStreamWrapper(inputStream);
-
-                                       K value = 
RocksDBKeySerializationUtils.readKey(
-                                               keySerializer,
-                                               inputStream,
-                                               dataInput,
-                                               ambiguousKeyPossible);
-
-                                       int namespaceByteStartPos = 
inputStream.getPosition();
-
-                                       if (isMatchingNameSpace(key, 
namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
-                                               previousKey = value;
-                                               nextKey = value;
-                                       }
-                                       iterator.next();
-                               }
-                       } catch (Exception e) {
-                               throw new FlinkRuntimeException("Failed to 
access state [" + state + "]", e);
-                       }
-                       return nextKey != null;
-               }
-
-               @Override
-               public K next() {
-                       if (!hasNext()) {
-                               throw new NoSuchElementException("Failed to 
access state [" + state + "]");
-                       }
-
-                       K tmpKey = nextKey;
-                       nextKey = null;
-                       return tmpKey;
-               }
-
-               private boolean isMatchingNameSpace(@Nonnull byte[] key, int 
beginPos) {
-                       final int namespaceBytesLength = namespaceBytes.length;
-                       final int basicLength = namespaceBytesLength + beginPos;
-                       if (key.length >= basicLength) {
-                               for (int i = 0; i < namespaceBytesLength; ++i) {
-                                       if (key[beginPos + i] != 
namespaceBytes[i]) {
-                                               return false;
-                                       }
-                               }
-                               return true;
-                       }
-                       return false;
-               }
-
-               @Override
-               public void close() {
-                       iterator.close();
-               }
-       }
-
        private class FullSnapshotStrategy implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
 
                @Override
@@ -2214,8 +1831,8 @@ private void writeKVStateData() throws IOException, 
InterruptedException {
                                
checkpointStreamWithResultProvider.getCheckpointOutputStream();
 
                        try {
-                               // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
-                               try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
+                               // Here we transfer ownership of RocksIterators 
to the RocksStatesPerKeyGroupMergeIterator
+                               try (RocksStatesPerKeyGroupMergeIterator 
mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
                                        kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
 
                                        // handover complete, null out to 
prevent double close
@@ -2729,7 +2346,7 @@ private static RocksIteratorWrapper getRocksIterator(
                RocksIterator rocksIterator = 
db.newIterator(columnFamilyHandle, readOptions);
                return stateSnapshotTransformer == null ?
                        new RocksIteratorWrapper(rocksIterator) :
-                       new TransformingRocksIteratorWrapper(rocksIterator, 
stateSnapshotTransformer);
+                       new RocksTransformingIteratorWrapper(rocksIterator, 
stateSnapshotTransformer);
        }
 
        /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 176f48cda98..cdd7afb7d9a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -25,9 +25,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -116,32 +115,31 @@ private RocksDBListState(
        public List<V> getInternal() {
                try {
                        writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] key = dataOutputView.toByteArray();
                        byte[] valueBytes = backend.db.get(columnFamily, key);
-                       return deserializeList(valueBytes, elementSerializer);
+                       return deserializeList(valueBytes);
                } catch (IOException | RocksDBException e) {
                        throw new FlinkRuntimeException("Error while retrieving 
data from RocksDB", e);
                }
        }
 
-       private static <V> List<V> deserializeList(
-               byte[] valueBytes, TypeSerializer<V> elementSerializer) {
+       private List<V> deserializeList(
+               byte[] valueBytes) {
                if (valueBytes == null) {
                        return null;
                }
 
-               DataInputViewStreamWrapper in = new 
ByteArrayDataInputView(valueBytes);
+               dataInputView.setData(valueBytes);
 
                List<V> result = new ArrayList<>();
                V next;
-               while ((next = deserializeNextElement(in, elementSerializer)) 
!= null) {
+               while ((next = deserializeNextElement(dataInputView, 
elementSerializer)) != null) {
                        result.add(next);
                }
                return result;
        }
 
-       private static <V> V deserializeNextElement(
-               DataInputViewStreamWrapper in, TypeSerializer<V> 
elementSerializer) {
+       private static <V> V deserializeNextElement(DataInputViewStreamWrapper 
in, TypeSerializer<V> elementSerializer) {
                try {
                        if (in.available() > 0) {
                                V element = elementSerializer.deserialize(in);
@@ -162,11 +160,10 @@ public void add(V value) {
 
                try {
                        writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       keySerializationStream.reset();
-                       DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-                       elementSerializer.serialize(value, out);
-                       backend.db.merge(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
+                       byte[] key = dataOutputView.toByteArray();
+                       dataOutputView.reset();
+                       elementSerializer.serialize(value, dataOutputView);
+                       backend.db.merge(columnFamily, writeOptions, key, 
dataOutputView.toByteArray());
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Error while adding 
data to RocksDB", e);
                }
@@ -184,19 +181,15 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
 
                try {
                        // create the target full-binary-key
-                       writeKeyWithGroupAndNamespace(
-                                       keyGroup, key, target,
-                                       keySerializationStream, 
keySerializationDataOutputView);
-                       final byte[] targetKey = 
keySerializationStream.toByteArray();
+                       writeKeyWithGroupAndNamespace(keyGroup, key, target, 
dataOutputView);
+                       final byte[] targetKey = dataOutputView.toByteArray();
 
                        // merge the sources to the target
                        for (N source : sources) {
                                if (source != null) {
-                                       writeKeyWithGroupAndNamespace(
-                                                       keyGroup, key, source,
-                                                       keySerializationStream, 
keySerializationDataOutputView);
+                                       writeKeyWithGroupAndNamespace(keyGroup, 
key, source, dataOutputView);
 
-                                       byte[] sourceKey = 
keySerializationStream.toByteArray();
+                                       byte[] sourceKey = 
dataOutputView.toByteArray();
                                        byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
                                        backend.db.delete(columnFamily, 
writeOptions, sourceKey);
 
@@ -225,14 +218,9 @@ public void updateInternal(List<V> values) {
                if (!values.isEmpty()) {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
-                               byte[] key = 
keySerializationStream.toByteArray();
-
-                               byte[] premerge = getPreMergedValue(values, 
elementSerializer, keySerializationStream);
-                               if (premerge != null) {
-                                       backend.db.put(columnFamily, 
writeOptions, key, premerge);
-                               } else {
-                                       throw new IOException("Failed pre-merge 
values in update()");
-                               }
+                               byte[] key = dataOutputView.toByteArray();
+                               byte[] premerge = getPreMergedValue(values, 
elementSerializer, dataOutputView);
+                               backend.db.put(columnFamily, writeOptions, key, 
premerge);
                        } catch (IOException | RocksDBException e) {
                                throw new FlinkRuntimeException("Error while 
updating data to RocksDB", e);
                        }
@@ -246,14 +234,9 @@ public void addAll(List<V> values) {
                if (!values.isEmpty()) {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
-                               byte[] key = 
keySerializationStream.toByteArray();
-
-                               byte[] premerge = getPreMergedValue(values, 
elementSerializer, keySerializationStream);
-                               if (premerge != null) {
-                                       backend.db.merge(columnFamily, 
writeOptions, key, premerge);
-                               } else {
-                                       throw new IOException("Failed pre-merge 
values in addAll()");
-                               }
+                               byte[] key = dataOutputView.toByteArray();
+                               byte[] premerge = getPreMergedValue(values, 
elementSerializer, dataOutputView);
+                               backend.db.merge(columnFamily, writeOptions, 
key, premerge);
                        } catch (IOException | RocksDBException e) {
                                throw new FlinkRuntimeException("Error while 
updating data to RocksDB", e);
                        }
@@ -263,8 +246,7 @@ public void addAll(List<V> values) {
        private static <V> byte[] getPreMergedValue(
                List<V> values,
                TypeSerializer<V> elementSerializer,
-               ByteArrayOutputStreamWithPos keySerializationStream) throws 
IOException {
-               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
+               ByteArrayDataOutputView keySerializationStream) throws 
IOException {
 
                keySerializationStream.reset();
                boolean first = true;
@@ -275,7 +257,7 @@ public void addAll(List<V> values) {
                        } else {
                                keySerializationStream.write(DELIMITER);
                        }
-                       elementSerializer.serialize(value, out);
+                       elementSerializer.serialize(value, 
keySerializationStream);
                }
 
                return keySerializationStream.toByteArray();
@@ -298,7 +280,7 @@ public void addAll(List<V> values) {
        static class StateSnapshotTransformerWrapper<T> implements 
StateSnapshotTransformer<byte[]> {
                private final StateSnapshotTransformer<T> elementTransformer;
                private final TypeSerializer<T> elementSerializer;
-               private final ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos(128);
+               private final ByteArrayDataOutputView out = new 
ByteArrayDataOutputView(128);
                private final 
CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
 
                StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> 
elementTransformer, TypeSerializer<T> elementSerializer) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index b08eade53b9..ad6b7c22ec4 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -26,10 +26,6 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
@@ -263,8 +259,7 @@ public void clear() {
 
                int keyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, 
backend.getNumberOfKeyGroups());
 
-               ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(128);
-               DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStream);
+               ByteArrayDataOutputView outputView = new 
ByteArrayDataOutputView(128);
 
                writeKeyWithGroupAndNamespace(
                                keyGroup,
@@ -272,10 +267,9 @@ public void clear() {
                                safeKeySerializer,
                                keyAndNamespace.f1,
                                safeNamespaceSerializer,
-                               outputStream,
                                outputView);
 
-               final byte[] keyPrefixBytes = outputStream.toByteArray();
+               final byte[] keyPrefixBytes = outputView.toByteArray();
 
                final MapSerializer<UK, UV> serializer = (MapSerializer<UK, 
UV>) safeValueSerializer;
 
@@ -309,14 +303,14 @@ public void clear() {
        private byte[] serializeCurrentKeyAndNamespace() throws IOException {
                writeCurrentKeyWithGroupAndNamespace();
 
-               return keySerializationStream.toByteArray();
+               return dataOutputView.toByteArray();
        }
 
        private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) 
throws IOException {
                serializeCurrentKeyAndNamespace();
-               userKeySerializer.serialize(userKey, 
keySerializationDataOutputView);
+               userKeySerializer.serialize(userKey, dataOutputView);
 
-               return keySerializationStream.toByteArray();
+               return dataOutputView.toByteArray();
        }
 
        private byte[] serializeUserValue(UV userValue) throws IOException {
@@ -328,34 +322,29 @@ private UV deserializeUserValue(byte[] rawValueBytes) 
throws IOException {
        }
 
        private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> 
valueSerializer) throws IOException {
-               keySerializationStream.reset();
+               dataOutputView.reset();
 
                if (userValue == null) {
-                       keySerializationDataOutputView.writeBoolean(true);
+                       dataOutputView.writeBoolean(true);
                } else {
-                       keySerializationDataOutputView.writeBoolean(false);
-                       valueSerializer.serialize(userValue, 
keySerializationDataOutputView);
+                       dataOutputView.writeBoolean(false);
+                       valueSerializer.serialize(userValue, dataOutputView);
                }
 
-               return keySerializationStream.toByteArray();
+               return dataOutputView.toByteArray();
        }
 
        private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, 
TypeSerializer<UK> keySerializer) throws IOException {
-               ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawKeyBytes);
-               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
-
-               in.skipBytes(userKeyOffset);
-
-               return keySerializer.deserialize(in);
+               dataInputView.setData(rawKeyBytes, userKeyOffset, 
rawKeyBytes.length - userKeyOffset);
+               return keySerializer.deserialize(dataInputView);
        }
 
        private UV deserializeUserValue(byte[] rawValueBytes, 
TypeSerializer<UV> valueSerializer) throws IOException {
-               ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawValueBytes);
-               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+               dataInputView.setData(rawValueBytes);
 
-               boolean isNull = in.readBoolean();
+               boolean isNull = dataInputView.readBoolean();
 
-               return isNull ? null : valueSerializer.deserialize(in);
+               return isNull ? null : 
valueSerializer.deserialize(dataInputView);
        }
 
        private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] 
rawKeyBytes) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 490960e39be..d1fe3bd3798 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -25,15 +25,12 @@
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.rocksdb.ColumnFamilyHandle;
 
-import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -87,7 +84,7 @@ private RocksDBReducingState(ColumnFamilyHandle columnFamily,
        }
 
        @Override
-       public V get() throws IOException {
+       public V get() {
                return getInternal();
        }
 
@@ -100,7 +97,7 @@ public void add(V value) throws Exception {
        }
 
        @Override
-       public void mergeNamespaces(N target, Collection<N> sources) throws 
Exception {
+       public void mergeNamespaces(N target, Collection<N> sources) {
                if (sources == null || sources.isEmpty()) {
                        return;
                }
@@ -116,17 +113,15 @@ public void mergeNamespaces(N target, Collection<N> 
sources) throws Exception {
                        for (N source : sources) {
                                if (source != null) {
 
-                                       writeKeyWithGroupAndNamespace(
-                                                       keyGroup, key, source,
-                                                       keySerializationStream, 
keySerializationDataOutputView);
+                                       writeKeyWithGroupAndNamespace(keyGroup, 
key, source, dataOutputView);
 
-                                       final byte[] sourceKey = 
keySerializationStream.toByteArray();
+                                       final byte[] sourceKey = 
dataOutputView.toByteArray();
                                        final byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
                                        backend.db.delete(columnFamily, 
writeOptions, sourceKey);
 
                                        if (valueBytes != null) {
-                                               V value = 
valueSerializer.deserialize(
-                                                               new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+                                               
dataInputView.setData(valueBytes);
+                                               V value = 
valueSerializer.deserialize(dataInputView);
 
                                                if (current != null) {
                                                        current = 
reduceFunction.reduce(current, value);
@@ -141,27 +136,25 @@ public void mergeNamespaces(N target, Collection<N> 
sources) throws Exception {
                        // if something came out of merging the sources, merge 
it or write it to the target
                        if (current != null) {
                                // create the target full-binary-key
-                               writeKeyWithGroupAndNamespace(
-                                               keyGroup, key, target,
-                                               keySerializationStream, 
keySerializationDataOutputView);
+                               writeKeyWithGroupAndNamespace(keyGroup, key, 
target, dataOutputView);
 
-                               final byte[] targetKey = 
keySerializationStream.toByteArray();
+                               final byte[] targetKey = 
dataOutputView.toByteArray();
                                final byte[] targetValueBytes = 
backend.db.get(columnFamily, targetKey);
 
                                if (targetValueBytes != null) {
+                                       dataInputView.setData(targetValueBytes);
                                        // target also had a value, merge
-                                       V value = valueSerializer.deserialize(
-                                                       new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+                                       V value = 
valueSerializer.deserialize(dataInputView);
 
                                        current = 
reduceFunction.reduce(current, value);
                                }
 
                                // serialize the resulting value
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(current, 
keySerializationDataOutputView);
+                               dataOutputView.reset();
+                               valueSerializer.serialize(current, 
dataOutputView);
 
                                // write the resulting value
-                               backend.db.put(columnFamily, writeOptions, 
targetKey, keySerializationStream.toByteArray());
+                               backend.db.put(columnFamily, writeOptions, 
targetKey, dataOutputView.toByteArray());
                        }
                }
                catch (Exception e) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 5ae894e8177..e9399e12a32 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -23,8 +23,6 @@
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -32,7 +30,6 @@
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 /**
@@ -84,12 +81,13 @@ private RocksDBValueState(
        public V value() {
                try {
                        writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] key = dataOutputView.toByteArray();
                        byte[] valueBytes = backend.db.get(columnFamily, key);
                        if (valueBytes == null) {
                                return getDefaultValue();
                        }
-                       return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+                       dataInputView.setData(valueBytes);
+                       return valueSerializer.deserialize(dataInputView);
                } catch (IOException | RocksDBException e) {
                        throw new FlinkRuntimeException("Error while retrieving 
data from RocksDB.", e);
                }
@@ -101,13 +99,13 @@ public void update(V value) {
                        clear();
                        return;
                }
-               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
+
                try {
                        writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       keySerializationStream.reset();
-                       valueSerializer.serialize(value, out);
-                       backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
+                       byte[] key = dataOutputView.toByteArray();
+                       dataOutputView.reset();
+                       valueSerializer.serialize(value, dataOutputView);
+                       backend.db.put(columnFamily, writeOptions, key, 
dataOutputView.toByteArray());
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Error while adding 
data to RocksDB", e);
                }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
new file mode 100644
index 00000000000..993b35ae2b6
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache its current key and to associate the iterator with the id of the
+ * key/value state it belongs to. Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+       /** The wrapped RocksDB iterator. */
+       @Nonnull
+       private final RocksIteratorWrapper iterator;
+
+       /** Id of the K/V state to which the wrapped iterator belongs. */
+       private final int kvStateId;
+
+       /** Cached key; read from the iterator on construction and replaced via {@link #setCurrentKey(byte[])}. */
+       private byte[] currentKey;
+
+       /**
+        * Creates the wrapper and caches the key the given iterator currently returns from {@code key()}.
+        *
+        * @param iterator underlying {@link RocksIteratorWrapper}
+        * @param kvStateId Id of the K/V state to which this iterator belongs.
+        */
+       RocksSingleStateIterator(@Nonnull RocksIteratorWrapper iterator, int kvStateId) {
+               this.kvStateId = kvStateId;
+               this.iterator = iterator;
+               this.currentKey = iterator.key();
+       }
+
+       /**
+        * @return the wrapped RocksDB iterator.
+        */
+       @Nonnull
+       public RocksIteratorWrapper getIterator() {
+               return iterator;
+       }
+
+       /**
+        * @return Id of the K/V state to which this iterator belongs.
+        */
+       public int getKvStateId() {
+               return kvStateId;
+       }
+
+       /**
+        * @return the cached key (not re-read from the wrapped iterator).
+        */
+       public byte[] getCurrentKey() {
+               return currentKey;
+       }
+
+       /**
+        * Replaces the cached key. The wrapped iterator itself is not touched.
+        *
+        * @param currentKey the new key to cache.
+        */
+       public void setCurrentKey(byte[] currentKey) {
+               this.currentKey = currentKey;
+       }
+
+       /**
+        * Closes the wrapped iterator through {@link IOUtils#closeQuietly}.
+        */
+       @Override
+       public void close() {
+               IOUtils.closeQuietly(iterator);
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
new file mode 100644
index 00000000000..0fa93dc8a1f
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * <p>A key is only returned if the bytes that follow it in the RocksDB key start with the configured namespace bytes,
+ * and if it differs from the key that was accepted immediately before it.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+       /** The underlying RocksDB iterator that is advanced by {@link #hasNext()}. */
+       @Nonnull
+       private final RocksIteratorWrapper iterator;
+
+       /** Name of the state; only used in exception messages. */
+       @Nonnull
+       private final String state;
+
+       /** Serializer used to read the key from the RocksDB key bytes. */
+       @Nonnull
+       private final TypeSerializer<K> keySerializer;
+
+       /** Serialized namespace that a RocksDB key must contain right after the key bytes to be accepted. */
+       @Nonnull
+       private final byte[] namespaceBytes;
+
+       /** Flag that is passed through to {@link RocksDBKeySerializationUtils#readKey}. */
+       private final boolean ambiguousKeyPossible;
+
+       /** Number of leading bytes of each RocksDB key that are skipped before the key is deserialized. */
+       private final int keyGroupPrefixBytes;
+
+       /** Reusable input view over the bytes of the RocksDB key that is currently inspected. */
+       private final ByteArrayDataInputView byteArrayDataInputView;
+
+       /** The key that the next call to {@link #next()} returns, or {@code null} if none has been looked up yet. */
+       private K nextKey;
+
+       /** The most recently accepted key; used to skip repeated occurrences of the same key. */
+       private K previousKey;
+
+       /**
+        * @param iterator underlying RocksDB iterator; it is used as positioned by the caller.
+        * @param state name of the state, used in exception messages.
+        * @param keySerializer serializer for the keys.
+        * @param keyGroupPrefixBytes number of key-group prefix bytes at the start of each RocksDB key.
+        * @param ambiguousKeyPossible passed through to {@link RocksDBKeySerializationUtils#readKey}.
+        * @param namespaceBytes serialized namespace that accepted keys must match.
+        */
+       public RocksStateKeysIterator(
+               @Nonnull RocksIteratorWrapper iterator,
+               @Nonnull String state,
+               @Nonnull TypeSerializer<K> keySerializer,
+               int keyGroupPrefixBytes,
+               boolean ambiguousKeyPossible,
+               @Nonnull byte[] namespaceBytes) {
+               this.iterator = iterator;
+               this.state = state;
+               this.keySerializer = keySerializer;
+               this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+               this.namespaceBytes = namespaceBytes;
+               this.nextKey = null;
+               this.previousKey = null;
+               this.ambiguousKeyPossible = ambiguousKeyPossible;
+               this.byteArrayDataInputView = new ByteArrayDataInputView();
+       }
+
+       /**
+        * Advances the underlying iterator until a key with matching namespace is found that differs from the previously
+        * accepted key, or until the underlying iterator becomes invalid.
+        *
+        * @return true iff a key is available for {@link #next()}.
+        * @throws FlinkRuntimeException if reading from the underlying iterator or deserializing the key fails.
+        */
+       @Override
+       public boolean hasNext() {
+               try {
+                       while (nextKey == null && iterator.isValid()) {
+
+                               final byte[] keyBytes = iterator.key();
+                               final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
+                               // after reading the key, the view is positioned at the first byte that follows it
+                               final int namespaceByteStartPos = byteArrayDataInputView.getPosition();
+
+                               if (isMatchingNameSpace(keyBytes, namespaceByteStartPos) && !Objects.equals(previousKey, currentKey)) {
+                                       previousKey = currentKey;
+                                       nextKey = currentKey;
+                               }
+                               // the underlying iterator is always moved on, also past an accepted entry
+                               iterator.next();
+                       }
+               } catch (Exception e) {
+                       throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
+               }
+               return nextKey != null;
+       }
+
+       /**
+        * @return the next key.
+        * @throws NoSuchElementException if no further key is available.
+        */
+       @Override
+       public K next() {
+               if (!hasNext()) {
+                       throw new NoSuchElementException("Failed to access state [" + state + "]");
+               }
+
+               K tmpKey = nextKey;
+               nextKey = null;
+               return tmpKey;
+       }
+
+       /**
+        * Points the given view behind the key-group prefix of the given bytes and reads the key from it.
+        *
+        * @param keyBytes the complete RocksDB key.
+        * @param readView the view to (re)use for reading; it is left positioned behind the key.
+        * @return the deserialized key.
+        * @throws IOException if reading the key fails.
+        */
+       private K deserializeKey(byte[] keyBytes, ByteArrayDataInputView readView) throws IOException {
+               readView.setData(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
+               // read from the view that was just positioned, not from the field, so that the parameter is honored
+               return RocksDBKeySerializationUtils.readKey(
+                       keySerializer,
+                       readView,
+                       ambiguousKeyPossible);
+       }
+
+       /**
+        * Checks whether the bytes of the given key, starting at the given position, begin with the namespace bytes.
+        *
+        * @param key the complete RocksDB key.
+        * @param beginPos index in {@code key} at which the namespace is expected to start.
+        * @return true iff the key is long enough and all namespace bytes match.
+        */
+       private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
+               final int namespaceBytesLength = namespaceBytes.length;
+               final int basicLength = namespaceBytesLength + beginPos;
+               if (key.length >= basicLength) {
+                       for (int i = 0; i < namespaceBytesLength; ++i) {
+                               if (key[beginPos + i] != namespaceBytes[i]) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+               return false;
+       }
+
+       /**
+        * Closes the underlying RocksDB iterator.
+        */
+       @Override
+       public void close() {
+               iterator.close();
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
new file mode 100644
index 00000000000..20e5dd03ce3
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+       /**
+        * Sub-iterators other than the current one, ordered by (key-group, kv-state id). This is {@code null} if
+        * the merge iterator was created from an empty list.
+        */
+       private final PriorityQueue<RocksSingleStateIterator> heap;
+
+       /** Number of leading bytes of each key that encode the key-group. */
+       private final int keyGroupPrefixByteCount;
+
+       /** True iff the current key belongs to a different key-group than its predecessor. */
+       private boolean newKeyGroup;
+
+       /** True iff the current key belongs to a different k/v-state than its predecessor. */
+       private boolean newKVState;
+
+       /** True iff there is a current entry. */
+       private boolean valid;
+
+       /** The sub-iterator that provides the current entry, or {@code null} if there is none. */
+       private RocksSingleStateIterator currentSubIterator;
+
+       /** Comparators by (key-group, kv-state id); the comparator at index i compares key-groups of i + 1 bytes. */
+       private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+       static {
+               int maxBytes = 2;
+               COMPARATORS = new ArrayList<>(maxBytes);
+               for (int i = 0; i < maxBytes; ++i) {
+                       final int currentBytes = i + 1;
+                       COMPARATORS.add((o1, o2) -> {
+                               int arrayCmpRes = compareKeyGroupsForByteArrays(
+                                       o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+                               // Integer.compare instead of a subtraction, which could overflow for arbitrary ids
+                               return arrayCmpRes == 0 ? Integer.compare(o1.getKvStateId(), o2.getKvStateId()) : arrayCmpRes;
+                       });
+               }
+       }
+
+       /**
+        * Creates the merge iterator. Every given iterator is positioned via {@code seekToFirst()}; iterators that are
+        * not valid afterwards are closed right away. If the given list is not empty, it is cleared after the iterators
+        * have been taken over.
+        *
+        * @param kvStateIterators pairs of RocksDB iterator and id of the K/V state the iterator belongs to.
+        * @param keyGroupPrefixByteCount number of leading key bytes that encode the key-group, must be at least 1.
+        */
+       public RocksStatesPerKeyGroupMergeIterator(
+               List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+               final int keyGroupPrefixByteCount) {
+               Preconditions.checkNotNull(kvStateIterators);
+               Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+               this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+               if (kvStateIterators.size() > 0) {
+                       this.heap = buildIteratorHeap(kvStateIterators);
+                       this.valid = !heap.isEmpty();
+                       this.currentSubIterator = heap.poll();
+                       kvStateIterators.clear();
+               } else {
+                       // creating a PriorityQueue of size 0 results in an exception.
+                       this.heap = null;
+                       this.valid = false;
+               }
+
+               this.newKeyGroup = true;
+               this.newKVState = true;
+       }
+
+       /**
+        * Advances the iterator. Should only be called if {@link #isValid()} returned true.
+        * Valid flag can only change after calling {@link #next()}.
+        */
+       public void next() {
+               newKeyGroup = false;
+               newKVState = false;
+
+               final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
+               rocksIterator.next();
+
+               byte[] oldKey = currentSubIterator.getCurrentKey();
+               if (rocksIterator.isValid()) {
+
+                       currentSubIterator.setCurrentKey(rocksIterator.key());
+
+                       if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+                               // the sub-iterator entered a new key-group: re-insert it and continue with the smallest one
+                               heap.offer(currentSubIterator);
+                               currentSubIterator = heap.remove();
+                               newKVState = currentSubIterator.getIterator() != rocksIterator;
+                               detectNewKeyGroup(oldKey);
+                       }
+               } else {
+                       // the sub-iterator is exhausted: release it and continue with the next one, if any
+                       IOUtils.closeQuietly(rocksIterator);
+
+                       if (heap.isEmpty()) {
+                               currentSubIterator = null;
+                               valid = false;
+                       } else {
+                               currentSubIterator = heap.remove();
+                               newKVState = true;
+                               detectNewKeyGroup(oldKey);
+                       }
+               }
+       }
+
+       /**
+        * Positions all given iterators at their first entry and collects the valid ones in a priority queue; iterators
+        * without any entry are closed.
+        */
+       private PriorityQueue<RocksSingleStateIterator> buildIteratorHeap(
+               List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators) {
+
+               Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+               PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+                       new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+               for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+                       final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
+                       rocksIterator.seekToFirst();
+                       if (rocksIterator.isValid()) {
+                               iteratorPriorityQueue.offer(
+                                       new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                       } else {
+                               IOUtils.closeQuietly(rocksIterator);
+                       }
+               }
+               return iteratorPriorityQueue;
+       }
+
+       /**
+        * @return true iff the key-group prefixes of the two given keys differ.
+        */
+       private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+               return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
+       }
+
+       /**
+        * Sets the new-key-group flag if the current key is in a different key-group than the given old key.
+        */
+       private void detectNewKeyGroup(byte[] oldKey) {
+               if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+                       newKeyGroup = true;
+               }
+       }
+
+       /**
+        * @return key-group for the current key
+        */
+       public int keyGroup() {
+               final byte[] currentKey = currentSubIterator.getCurrentKey();
+               int result = 0;
+               //big endian decode
+               for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+                       result <<= 8;
+                       result |= (currentKey[i] & 0xFF);
+               }
+               return result;
+       }
+
+       /**
+        * @return the current key, as cached by the current sub-iterator.
+        */
+       public byte[] key() {
+               return currentSubIterator.getCurrentKey();
+       }
+
+       /**
+        * @return the current value, read from the RocksDB iterator of the current sub-iterator.
+        */
+       public byte[] value() {
+               return currentSubIterator.getIterator().value();
+       }
+
+       /**
+        * @return Id of K/V state to which the current key belongs.
+        */
+       public int kvStateId() {
+               return currentSubIterator.getKvStateId();
+       }
+
+       /**
+        * Indicates if current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
+        * @return true iff the current key belongs to a different k/v-state than its predecessor.
+        */
+       public boolean isNewKeyValueState() {
+               return newKVState;
+       }
+
+       /**
+        * Indicates if current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
+        * @return true iff the current key belongs to a different key-group than its predecessor.
+        */
+       public boolean isNewKeyGroup() {
+               return newKeyGroup;
+       }
+
+       /**
+        * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
+        * {@link #next()} should only be called if valid returned true. Should be checked after each call to
+        * {@link #next()} before accessing iterator state.
+        * @return True iff this iterator is valid.
+        */
+       public boolean isValid() {
+               return valid;
+       }
+
+       /**
+        * Compares the first {@code len} bytes of the two arrays as unsigned values.
+        *
+        * @return a negative value, zero, or a positive value if the prefix of {@code a} is smaller than, equal to,
+        *     or larger than the prefix of {@code b}.
+        */
+       private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
+               for (int i = 0; i < len; ++i) {
+                       int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+                       if (diff != 0) {
+                               return diff;
+                       }
+               }
+               return 0;
+       }
+
+       /**
+        * Closes the current sub-iterator and all sub-iterators that are still queued.
+        */
+       @Override
+       public void close() {
+               IOUtils.closeQuietly(currentSubIterator);
+               currentSubIterator = null;
+
+               // the heap is null if this iterator was created from an empty list of iterators
+               if (heap != null) {
+                       IOUtils.closeAllQuietly(heap);
+                       heap.clear();
+               }
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
new file mode 100644
index 00000000000..e2fec423e90
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A {@link RocksIteratorWrapper} over a {@link RocksIterator} that runs every value through a given
+ * {@link StateSnapshotTransformer} while iterating. Entries for which the transformer returns {@code null}
+ * are skipped.
+ */
+public class RocksTransformingIteratorWrapper extends RocksIteratorWrapper {
+
+       /** Transformer that is applied to the raw value of each visited entry. */
+       @Nonnull
+       private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
+
+       /** Result of the most recent transformer call. */
+       private byte[] current;
+
+       /**
+        * @param iterator the RocksDB iterator to wrap.
+        * @param stateSnapshotTransformer the transformer to apply to the values.
+        */
+       public RocksTransformingIteratorWrapper(
+               @Nonnull RocksIterator iterator,
+               @Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
+               super(iterator);
+               this.stateSnapshotTransformer = stateSnapshotTransformer;
+       }
+
+       /** Seeks to the first entry, then moves forward past entries that the transformer drops. */
+       @Override
+       public void seekToFirst() {
+               super.seekToFirst();
+               filterOrTransform(super::next);
+       }
+
+       /** Seeks to the last entry, then moves backward past entries that the transformer drops. */
+       @Override
+       public void seekToLast() {
+               super.seekToLast();
+               filterOrTransform(super::prev);
+       }
+
+       /** Moves one entry forward, then further forward past entries that the transformer drops. */
+       @Override
+       public void next() {
+               super.next();
+               filterOrTransform(super::next);
+       }
+
+       /** Moves one entry backward, then further backward past entries that the transformer drops. */
+       @Override
+       public void prev() {
+               super.prev();
+               filterOrTransform(super::prev);
+       }
+
+       /**
+        * Transforms the value at the current position and keeps stepping with the given action for as long as the
+        * iterator is valid and the transformer yields {@code null}.
+        *
+        * @param step action that moves the underlying iterator by one entry.
+        */
+       private void filterOrTransform(@Nonnull Runnable step) {
+               while (isValid()) {
+                       current = stateSnapshotTransformer.filterOrTransform(super.value());
+                       if (current != null) {
+                               return;
+                       }
+                       step.run();
+               }
+       }
+
+       /**
+        * @return the transformed value of the current entry.
+        * @throws IllegalStateException if the iterator is not valid.
+        */
+       @Override
+       public byte[] value() {
+               if (isValid()) {
+                       return current;
+               }
+               throw new IllegalStateException("value() method cannot be called if isValid() is false");
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 4121cf08592..483b8fdd1dc 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -18,9 +18,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.TestLogger;
@@ -114,35 +112,30 @@ private void testClipDBWithKeyGroupRangeHelper(
                        int currentGroupRangeStart = 
currentGroupRange.getStartKeyGroup();
                        int currentGroupRangeEnd = 
currentGroupRange.getEndKeyGroup();
 
+                       ByteArrayDataOutputView outputView = new 
ByteArrayDataOutputView(32);
                        for (int i = currentGroupRangeStart; i <= 
currentGroupRangeEnd; ++i) {
-                               ByteArrayOutputStreamWithPos 
outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-                               DataOutputView outputView = new 
DataOutputViewStreamWrapper(outputStreamWithPos);
                                for (int j = 0; j < 100; ++j) {
-                                       outputStreamWithPos.reset();
+                                       outputView.reset();
                                        
RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
                                        RocksDBKeySerializationUtils.writeKey(
                                                j,
                                                IntSerializer.INSTANCE,
-                                               outputStreamWithPos,
-                                               new 
DataOutputViewStreamWrapper(outputStreamWithPos),
+                                               outputView,
                                                false);
-                                       rocksDB.put(columnFamilyHandle, 
outputStreamWithPos.toByteArray(), String.valueOf(j).getBytes());
+                                       rocksDB.put(columnFamilyHandle, 
outputView.toByteArray(), String.valueOf(j).getBytes());
                                }
                        }
 
                        for (int i = currentGroupRangeStart; i <= 
currentGroupRangeEnd; ++i) {
-                               ByteArrayOutputStreamWithPos 
outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-                               DataOutputView outputView = new 
DataOutputViewStreamWrapper(outputStreamWithPos);
                                for (int j = 0; j < 100; ++j) {
-                                       outputStreamWithPos.reset();
+                                       outputView.reset();
                                        
RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
                                        RocksDBKeySerializationUtils.writeKey(
                                                j,
                                                IntSerializer.INSTANCE,
-                                               outputStreamWithPos,
-                                               new 
DataOutputViewStreamWrapper(outputStreamWithPos),
+                                               outputView,
                                                false);
-                                       byte[] value = 
rocksDB.get(columnFamilyHandle, outputStreamWithPos.toByteArray());
+                                       byte[] value = 
rocksDB.get(columnFamilyHandle, outputView.toByteArray());
                                        Assert.assertEquals(String.valueOf(j), 
new String(value));
                                }
                        }
@@ -155,19 +148,15 @@ private void testClipDBWithKeyGroupRangeHelper(
                                keyGroupPrefixBytes);
 
                        for (int i = currentGroupRangeStart; i <= 
currentGroupRangeEnd; ++i) {
-                               ByteArrayOutputStreamWithPos 
outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-                               DataOutputView outputView = new 
DataOutputViewStreamWrapper(outputStreamWithPos);
                                for (int j = 0; j < 100; ++j) {
-                                       outputStreamWithPos.reset();
+                                       outputView.reset();
                                        
RocksDBKeySerializationUtils.writeKeyGroup(i, keyGroupPrefixBytes, outputView);
                                        RocksDBKeySerializationUtils.writeKey(
                                                j,
                                                IntSerializer.INSTANCE,
-                                               outputStreamWithPos,
-                                               new 
DataOutputViewStreamWrapper(outputStreamWithPos),
+                                               outputView,
                                                false);
-                                       byte[] value = rocksDB.get(
-                                               columnFamilyHandle, 
outputStreamWithPos.toByteArray());
+                                       byte[] value = 
rocksDB.get(columnFamilyHandle, outputView.toByteArray());
                                        if (targetGroupRange.contains(i)) {
                                                
Assert.assertEquals(String.valueOf(j), new String(value));
                                        } else {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
index b1737edcca9..d92bef5e960 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtilsTest.java
@@ -19,6 +19,8 @@
 
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -61,39 +63,39 @@ public void testKeyGroupSerializationAndDeserialization() 
throws Exception {
 
        @Test
        public void testKeySerializationAndDeserialization() throws Exception {
-               ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(8);
-               DataOutputView outputView = new 
DataOutputViewStreamWrapper(outputStream);
+               final ByteArrayDataOutputView outputView = new 
ByteArrayDataOutputView(8);
+               final ByteArrayDataInputView inputView = new 
ByteArrayDataInputView();
 
                // test for key
                for (int orgKey = 0; orgKey < 100; ++orgKey) {
-                       outputStream.reset();
-                       RocksDBKeySerializationUtils.writeKey(orgKey, 
IntSerializer.INSTANCE, outputStream, outputView, false);
-                       ByteArrayInputStreamWithPos inputStream = new 
ByteArrayInputStreamWithPos(outputStream.toByteArray());
-                       int deserializedKey = 
RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new 
DataInputViewStreamWrapper(inputStream), false);
+                       outputView.reset();
+                       RocksDBKeySerializationUtils.writeKey(orgKey, 
IntSerializer.INSTANCE, outputView, false);
+                       inputView.setData(outputView.toByteArray());
+                       int deserializedKey = 
RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, false);
                        Assert.assertEquals(orgKey, deserializedKey);
 
-                       RocksDBKeySerializationUtils.writeKey(orgKey, 
IntSerializer.INSTANCE, outputStream, outputView, true);
-                       inputStream = new 
ByteArrayInputStreamWithPos(outputStream.toByteArray());
-                       deserializedKey = 
RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputStream, new 
DataInputViewStreamWrapper(inputStream), true);
+                       RocksDBKeySerializationUtils.writeKey(orgKey, 
IntSerializer.INSTANCE, outputView, true);
+                       inputView.setData(outputView.toByteArray());
+                       deserializedKey = 
RocksDBKeySerializationUtils.readKey(IntSerializer.INSTANCE, inputView, true);
                        Assert.assertEquals(orgKey, deserializedKey);
                }
        }
 
        @Test
        public void testNamespaceSerializationAndDeserialization() throws 
Exception {
-               ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(8);
-               DataOutputView outputView = new 
DataOutputViewStreamWrapper(outputStream);
+               final ByteArrayDataOutputView outputView = new 
ByteArrayDataOutputView(8);
+               final ByteArrayDataInputView inputView = new 
ByteArrayDataInputView();
 
                for (int orgNamespace = 0; orgNamespace < 100; ++orgNamespace) {
-                       outputStream.reset();
-                       
RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, 
IntSerializer.INSTANCE, outputStream, outputView, false);
-                       ByteArrayInputStreamWithPos inputStream = new 
ByteArrayInputStreamWithPos(outputStream.toByteArray());
-                       int deserializedNamepsace = 
RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, 
new DataInputViewStreamWrapper(inputStream), false);
+                       outputView.reset();
+                       
RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, 
IntSerializer.INSTANCE, outputView, false);
+                       inputView.setData(outputView.toByteArray());
+                       int deserializedNamepsace = 
RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, 
false);
                        Assert.assertEquals(orgNamespace, 
deserializedNamepsace);
 
-                       
RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, 
IntSerializer.INSTANCE, outputStream, outputView, true);
-                       inputStream = new 
ByteArrayInputStreamWithPos(outputStream.toByteArray());
-                       deserializedNamepsace = 
RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputStream, 
new DataInputViewStreamWrapper(inputStream), true);
+                       
RocksDBKeySerializationUtils.writeNameSpace(orgNamespace, 
IntSerializer.INSTANCE, outputView, true);
+                       inputView.setData(outputView.toByteArray());
+                       deserializedNamepsace = 
RocksDBKeySerializationUtils.readNamespace(IntSerializer.INSTANCE, inputView, 
true);
                        Assert.assertEquals(orgNamespace, 
deserializedNamepsace);
                }
        }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
similarity index 91%
rename from 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
rename to 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index f56099827ac..e042ebd0609 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -23,8 +23,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -47,7 +47,7 @@
 /**
  * Tests for the RocksIteratorWrapper.
  */
-public class RocksDBRocksIteratorForKeysWrapperTest {
+public class RocksDBRocksStateKeysIteratorTest {
 
        @Rule
        public final TemporaryFolder tmp = new TemporaryFolder();
@@ -105,13 +105,12 @@ public void testIterator() throws Exception{
                                testState.update(String.valueOf(i));
                        }
 
-                       ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(8);
+                       ByteArrayDataOutputView outputStream = new 
ByteArrayDataOutputView(8);
                        boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
                        RocksDBKeySerializationUtils.writeNameSpace(
                                namespace,
                                namespaceSerializer,
                                outputStream,
-                               new DataOutputViewStreamWrapper(outputStream),
                                ambiguousKeyPossible);
 
                        byte[] nameSpaceBytes = outputStream.toByteArray();
@@ -119,8 +118,8 @@ public void testIterator() throws Exception{
                        try (
                                ColumnFamilyHandle handle = 
keyedStateBackend.getColumnFamilyHandle(testStateName);
                                RocksIteratorWrapper iterator = 
RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
-                               
RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
-                                       new 
RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+                               RocksStateKeysIterator<K> iteratorWrapper =
+                                       new RocksStateKeysIterator<>(
                                                iterator,
                                                testStateName,
                                                keySerializer,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
similarity index 91%
rename from 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
rename to 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index cb2b202bc63..e1240a8bfad 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.util.IOUtils;
 
@@ -39,9 +40,9 @@
 import java.util.Random;
 
 /**
- * Tests for the RocksDBMergeIterator.
+ * Tests for the RocksStatesPerKeyGroupMergeIterator.
  */
-public class RocksDBMergeIteratorTest {
+public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
        private static final int NUM_KEY_VAL_STATES = 50;
        private static final int MAX_NUM_KEYS = 20;
@@ -51,8 +52,8 @@
 
        @Test
        public void testEmptyMergeIterator() throws Exception {
-               RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
-                               new 
RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
+               RocksStatesPerKeyGroupMergeIterator emptyIterator =
+                               new 
RocksStatesPerKeyGroupMergeIterator(Collections.emptyList(), 2);
                Assert.assertFalse(emptyIterator.isValid());
        }
 
@@ -111,7 +112,7 @@ public void testMergeIterator(int maxParallelism) throws 
Exception {
                                ++id;
                        }
 
-                       try (RocksDBKeyedStateBackend.RocksDBMergeIterator 
mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
+                       try (RocksStatesPerKeyGroupMergeIterator mergeIterator 
= new RocksStatesPerKeyGroupMergeIterator(
                                rocksIteratorsWithKVStateId,
                                maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to