asfgit closed pull request #7288: [FLINK-9702] Improvement in (de)serialization 
of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff
is reproduced below for the sake of provenance:

Because this is a foreign pull request (opened from a fork), GitHub would not
otherwise display its diff, so the diff is supplied below:

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 6668c572930..55ba8ab477f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -25,6 +25,7 @@
 
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -86,16 +87,21 @@ public boolean isImmutableType() {
 
        @Override
        public C[] copy(C[] from) {
-               C[] copy = create(from.length);
 
-               for (int i = 0; i < copy.length; i++) {
-                       C val = from[i];
-                       if (val != null) {
-                               copy[i] = this.componentSerializer.copy(val);
+               final TypeSerializer<C> serializer = this.componentSerializer;
+
+               if (serializer.isImmutableType()) {
+                       return Arrays.copyOf(from, from.length);
+               } else {
+                       C[] copy = create(from.length);
+                       for (int i = 0; i < copy.length; i++) {
+                               C val = from[i];
+                               if (val != null) {
+                                       copy[i] = serializer.copy(val);
+                               }
                        }
+                       return copy;
                }
-
-               return copy;
        }
        
        @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 22330c5f360..2f4fdfe5b1e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,7 +22,6 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 
@@ -112,7 +111,7 @@ public void setPosition(int position) {
        }
 
        @Override
-       public void close() throws IOException {
+       public void close() {
        }
 
        public byte[] getBuf() {
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 01feae03380..b749c84711c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.core.memory;
 
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -350,6 +352,11 @@ public void write(DataInputView source, int numBytes) 
throws IOException {
                this.position += numBytes;
        }
 
+       public void setPosition(int position) {
+               Preconditions.checkArgument(position >= 0 && position <= 
this.position, "Position out of bounds.");
+               this.position = position;
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 8c0f4d7da7b..04e665a6e87 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -28,8 +27,8 @@
 
 import java.io.IOException;
 
-abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT, S extends 
State>
-       extends AbstractRocksDBState<K, N, SV, S>
+abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT>
+       extends AbstractRocksDBState<K, N, SV>
        implements InternalAppendingState<K, N, IN, SV, OUT> {
 
        /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 2218bc0d3e0..fb19a8a2816 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,6 +34,7 @@
 import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Base class for {@link State} implementations that store state in a RocksDB 
database.
@@ -44,9 +45,8 @@
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of values kept internally in state.
- * @param <S> The type of {@link State}.
  */
-public abstract class AbstractRocksDBState<K, N, V, S extends State> 
implements InternalKvState<K, N, V>, State {
+public abstract class AbstractRocksDBState<K, N, V> implements 
InternalKvState<K, N, V>, State {
 
        /** Serializer for the namespace. */
        final TypeSerializer<N> namespaceSerializer;
@@ -71,7 +71,7 @@
 
        protected final DataInputDeserializer dataInputView;
 
-       private final boolean ambiguousKeyPossible;
+       private final RocksDBSerializedCompositeKeyBuilder<K> 
sharedKeyNamespaceSerializer;
 
        /**
         * Creates a new RocksDB backed state.
@@ -100,8 +100,7 @@ protected AbstractRocksDBState(
 
                this.dataOutputView = new DataOutputSerializer(128);
                this.dataInputView = new DataInputDeserializer();
-               this.ambiguousKeyPossible =
-                       
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), 
namespaceSerializer);
+               this.sharedKeyNamespaceSerializer = 
backend.getSharedRocksKeyBuilder();
        }
 
        // 
------------------------------------------------------------------------
@@ -109,17 +108,15 @@ protected AbstractRocksDBState(
        @Override
        public void clear() {
                try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = dataOutputView.getCopyOfBuffer();
-                       backend.db.delete(columnFamily, writeOptions, key);
-               } catch (IOException | RocksDBException e) {
+                       backend.db.delete(columnFamily, writeOptions, 
serializeCurrentKeyWithGroupAndNamespace());
+               } catch (RocksDBException e) {
                        throw new FlinkRuntimeException("Error while removing 
entry from RocksDB", e);
                }
        }
 
        @Override
        public void setCurrentNamespace(N namespace) {
-               this.currentNamespace = Preconditions.checkNotNull(namespace, 
"Namespace");
+               this.currentNamespace = namespace;
        }
 
        @Override
@@ -129,30 +126,78 @@ public void setCurrentNamespace(N namespace) {
                        final TypeSerializer<N> safeNamespaceSerializer,
                        final TypeSerializer<V> safeValueSerializer) throws 
Exception {
 
-               Preconditions.checkNotNull(serializedKeyAndNamespace);
-               Preconditions.checkNotNull(safeKeySerializer);
-               Preconditions.checkNotNull(safeNamespaceSerializer);
-               Preconditions.checkNotNull(safeValueSerializer);
-
                //TODO make KvStateSerializer key-group aware to save this 
round trip and key-group computation
                Tuple2<K, N> keyAndNamespace = 
KvStateSerializer.deserializeKeyAndNamespace(
                                serializedKeyAndNamespace, safeKeySerializer, 
safeNamespaceSerializer);
 
                int keyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, 
backend.getNumberOfKeyGroups());
 
-               // we cannot reuse the keySerializationStream member since this 
method
-               // is called concurrently to the other ones and it may thus 
contain garbage
-               DataOutputSerializer tmpKeySerializationView = new 
DataOutputSerializer(128);
+               RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+                                               new 
RocksDBSerializedCompositeKeyBuilder<>(
+                                                       safeKeySerializer,
+                                                       
backend.getKeyGroupPrefixBytes(),
+                                                       32
+                                               );
+               keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+               byte[] key = 
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
+               return backend.db.get(columnFamily, key);
+       }
+
+       <UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
+               UK userKey,
+               TypeSerializer<UK> userKeySerializer) throws IOException {
+               return 
sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
+                       currentNamespace,
+                       namespaceSerializer,
+                       userKey,
+                       userKeySerializer
+               );
+       }
+
+       private <T> byte[] serializeValueInternal(T value, TypeSerializer<T> 
serializer) throws IOException {
+               serializer.serialize(value, dataOutputView);
+               return dataOutputView.getCopyOfBuffer();
+       }
+
+       byte[] serializeCurrentKeyWithGroupAndNamespace() {
+               return 
sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace, 
namespaceSerializer);
+       }
+
+       byte[] serializeValue(V value) throws IOException {
+               return serializeValue(value, valueSerializer);
+       }
+
+       <T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T> 
serializer) throws IOException {
+               dataOutputView.clear();
+               dataOutputView.writeBoolean(value == null);
+               return serializeValueInternal(value, serializer);
+       }
+
+       <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws 
IOException {
+               dataOutputView.clear();
+               return serializeValueInternal(value, serializer);
+       }
+
+       <T> byte[] serializeValueList(
+               List<T> valueList,
+               TypeSerializer<T> elementSerializer,
+               byte delimiter) throws IOException {
+
+               dataOutputView.clear();
+               boolean first = true;
 
-               writeKeyWithGroupAndNamespace(
-                               keyGroup,
-                               keyAndNamespace.f0,
-                               safeKeySerializer,
-                               keyAndNamespace.f1,
-                               safeNamespaceSerializer,
-                               tmpKeySerializationView);
+               for (T value : valueList) {
+                       Preconditions.checkNotNull(value, "You cannot add null 
to a value list.");
+
+                       if (first) {
+                               first = false;
+                       } else {
+                               dataOutputView.write(delimiter);
+                       }
+                       elementSerializer.serialize(value, dataOutputView);
+               }
 
-               return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
+               return dataOutputView.getCopyOfBuffer();
        }
 
        public void migrateSerializedValue(
@@ -170,12 +215,7 @@ public void migrateSerializedValue(
        }
 
        byte[] getKeyBytes() {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       return dataOutputView.getCopyOfBuffer();
-               } catch (IOException e) {
-                       throw new FlinkRuntimeException("Error while 
serializing key", e);
-               }
+               return serializeCurrentKeyWithGroupAndNamespace();
        }
 
        byte[] getValueBytes(V value) {
@@ -188,45 +228,6 @@ public void migrateSerializedValue(
                }
        }
 
-       protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
-               writeKeyWithGroupAndNamespace(
-                       backend.getCurrentKeyGroupIndex(),
-                       backend.getCurrentKey(),
-                       currentNamespace,
-                       dataOutputView);
-       }
-
-       protected void writeKeyWithGroupAndNamespace(
-                       int keyGroup, K key, N namespace,
-                       DataOutputSerializer keySerializationDataOutputView) 
throws IOException {
-
-               writeKeyWithGroupAndNamespace(
-                               keyGroup,
-                               key,
-                               backend.getKeySerializer(),
-                               namespace,
-                               namespaceSerializer,
-                               keySerializationDataOutputView);
-       }
-
-       protected void writeKeyWithGroupAndNamespace(
-                       final int keyGroup,
-                       final K key,
-                       final TypeSerializer<K> keySerializer,
-                       final N namespace,
-                       final TypeSerializer<N> namespaceSerializer,
-                       final DataOutputSerializer 
keySerializationDataOutputView) throws IOException {
-
-               Preconditions.checkNotNull(key, "No key set. This method should 
not be called outside of a keyed context.");
-               Preconditions.checkNotNull(keySerializer);
-               Preconditions.checkNotNull(namespaceSerializer);
-
-               keySerializationDataOutputView.clear();
-               RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, 
backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
-               RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationDataOutputView, ambiguousKeyPossible);
-               RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
-       }
-
        protected V getDefaultValue() {
                if (defaultValue != null) {
                        return valueSerializer.copy(defaultValue);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 2085fb86256..fbd2979feb1 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -43,7 +43,7 @@
  * @param <R> The type of the value returned from the state
  */
 class RocksDBAggregatingState<K, N, T, ACC, R>
-       extends AbstractRocksDBAppendingState<K, N, T, ACC, R, 
AggregatingState<T, R>>
+       extends AbstractRocksDBAppendingState<K, N, T, ACC, R>
        implements InternalAggregatingState<K, N, T, ACC, R> {
 
        /** User-specified aggregation function. */
@@ -109,19 +109,14 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
                        return;
                }
 
-               // cache key and namespace
-               final K key = backend.getCurrentKey();
-               final int keyGroup = backend.getCurrentKeyGroupIndex();
-
                try {
                        ACC current = null;
 
                        // merge the sources to the target
                        for (N source : sources) {
                                if (source != null) {
-                                       writeKeyWithGroupAndNamespace(keyGroup, 
key, source, dataOutputView);
-
-                                       final byte[] sourceKey = 
dataOutputView.getCopyOfBuffer();
+                                       setCurrentNamespace(source);
+                                       final byte[] sourceKey = 
serializeCurrentKeyWithGroupAndNamespace();
                                        final byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
                                        backend.db.delete(columnFamily, 
writeOptions, sourceKey);
 
@@ -141,10 +136,9 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
 
                        // if something came out of merging the sources, merge 
it or write it to the target
                        if (current != null) {
+                               setCurrentNamespace(target);
                                // create the target full-binary-key
-                               writeKeyWithGroupAndNamespace(keyGroup, key, 
target, dataOutputView);
-
-                               final byte[] targetKey = 
dataOutputView.getCopyOfBuffer();
+                               final byte[] targetKey = 
serializeCurrentKeyWithGroupAndNamespace();
                                final byte[] targetValueBytes = 
backend.db.get(columnFamily, targetKey);
 
                                if (targetValueBytes != null) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 4d6635704b9..c5e830fcedf 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -42,7 +42,7 @@
  */
 @Deprecated
 class RocksDBFoldingState<K, N, T, ACC>
-       extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC, 
FoldingState<T, ACC>>
+       extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC>
        implements InternalFoldingState<K, N, T, ACC> {
 
        /** User-specified fold function. */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index d8844bfece1..02b4ffcecfd 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -23,6 +23,8 @@
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 
 /**
@@ -80,8 +82,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, 
DataInputView inputView) throws
                }
        }
 
+       public static boolean isSerializerTypeVariableSized(@Nonnull 
TypeSerializer<?> serializer) {
+               return serializer.getLength() < 0;
+       }
+
        public static boolean isAmbiguousKeyPossible(TypeSerializer 
keySerializer, TypeSerializer namespaceSerializer) {
-               return (keySerializer.getLength() < 0) && 
(namespaceSerializer.getLength() < 0);
+               return (isSerializerTypeVariableSized(keySerializer) && 
isSerializerTypeVariableSized(namespaceSerializer));
        }
 
        public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public static void writeKeyGroup(
                }
        }
 
-       private static void readVariableIntBytes(DataInputView inputView, int 
value) throws IOException {
+       public static void readVariableIntBytes(DataInputView inputView, int 
value) throws IOException {
                do {
                        inputView.readByte();
                        value >>>= 8;
@@ -122,7 +128,7 @@ private static void writeLengthFrom(
                writeVariableIntBytes(length, 
keySerializationDateDataOutputView);
        }
 
-       private static void writeVariableIntBytes(
+       public static void writeVariableIntBytes(
                int value,
                DataOutputView keySerializationDateDataOutputView)
                throws IOException {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 50caa0d912a..f5532e74dd4 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -249,6 +249,9 @@
        /** The native metrics monitor. */
        private RocksDBNativeMetricMonitor nativeMetricMonitor;
 
+       /** Helper to build the byte arrays of composite keys to address data 
in RocksDB. Shared across all states.*/
+       private final RocksDBSerializedCompositeKeyBuilder<K> 
sharedRocksKeyBuilder;
+
        public RocksDBKeyedStateBackend(
                String operatorIdentifier,
                ClassLoader userCodeClassLoader,
@@ -300,6 +303,7 @@ public RocksDBKeyedStateBackend(
                this.restoredKvStateMetaInfos = new HashMap<>();
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
+               this.sharedRocksKeyBuilder = new 
RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
 
                this.metricOptions = metricOptions;
                this.metricGroup = metricGroup;
@@ -379,6 +383,12 @@ private void registerKvStateInformation(String 
columnFamilyName, Tuple2<ColumnFa
                }
        }
 
+       @Override
+       public void setCurrentKey(K newKey) {
+               super.setCurrentKey(newKey);
+               sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), 
getCurrentKeyGroupIndex());
+       }
+
        /**
         * Should only be called by one thread, and only after all accesses to 
the DB happened.
         */
@@ -463,6 +473,10 @@ public WriteOptions getWriteOptions() {
                return writeOptions;
        }
 
+       RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+               return sharedRocksKeyBuilder;
+       }
+
        /**
         * Triggers an asynchronous snapshot of the keyed state backend from 
RocksDB. This snapshot can be canceled and
         * is also stopped when the backend is closed through {@link 
#dispose()}. For each backend, this method must always
@@ -1479,7 +1493,7 @@ private void copyStateDataHandleData(
                }
 
                @SuppressWarnings("unchecked")
-               AbstractRocksDBState<?, ?, SV, S> rocksDBState = 
(AbstractRocksDBState<?, ?, SV, S>) state;
+               AbstractRocksDBState<?, ?, SV> rocksDBState = 
(AbstractRocksDBState<?, ?, SV>) state;
 
                Snapshot rocksDBSnapshot = db.getSnapshot();
                try (
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 6904c853fe3..13f5559405e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -59,7 +59,7 @@
  * @param <V> The type of the values in the list state.
  */
 class RocksDBListState<K, N, V>
-       extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
+       extends AbstractRocksDBState<K, N, List<V>>
        implements InternalListState<K, N, V> {
 
        /** Serializer for the values. */
@@ -115,11 +115,10 @@ private RocksDBListState(
        @Override
        public List<V> getInternal() {
                try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = dataOutputView.getCopyOfBuffer();
+                       byte[] key = serializeCurrentKeyWithGroupAndNamespace();
                        byte[] valueBytes = backend.db.get(columnFamily, key);
                        return deserializeList(valueBytes);
-               } catch (IOException | RocksDBException e) {
+               } catch (RocksDBException e) {
                        throw new FlinkRuntimeException("Error while retrieving 
data from RocksDB", e);
                }
        }
@@ -160,11 +159,12 @@ public void add(V value) {
                Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
 
                try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = dataOutputView.getCopyOfBuffer();
-                       dataOutputView.clear();
-                       elementSerializer.serialize(value, dataOutputView);
-                       backend.db.merge(columnFamily, writeOptions, key, 
dataOutputView.getCopyOfBuffer());
+                       backend.db.merge(
+                               columnFamily,
+                               writeOptions,
+                               serializeCurrentKeyWithGroupAndNamespace(),
+                               serializeValue(value, elementSerializer)
+                       );
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Error while adding 
data to RocksDB", e);
                }
@@ -176,21 +176,17 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
                        return;
                }
 
-               // cache key and namespace
-               final K key = backend.getCurrentKey();
-               final int keyGroup = backend.getCurrentKeyGroupIndex();
-
                try {
                        // create the target full-binary-key
-                       writeKeyWithGroupAndNamespace(keyGroup, key, target, 
dataOutputView);
-                       final byte[] targetKey = 
dataOutputView.getCopyOfBuffer();
+                       setCurrentNamespace(target);
+                       final byte[] targetKey = 
serializeCurrentKeyWithGroupAndNamespace();
 
                        // merge the sources to the target
                        for (N source : sources) {
                                if (source != null) {
-                                       writeKeyWithGroupAndNamespace(keyGroup, 
key, source, dataOutputView);
+                                       setCurrentNamespace(source);
+                                       final byte[] sourceKey = 
serializeCurrentKeyWithGroupAndNamespace();
 
-                                       byte[] sourceKey = 
dataOutputView.getCopyOfBuffer();
                                        byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
                                        backend.db.delete(columnFamily, 
writeOptions, sourceKey);
 
@@ -218,10 +214,11 @@ public void updateInternal(List<V> values) {
 
                if (!values.isEmpty()) {
                        try {
-                               writeCurrentKeyWithGroupAndNamespace();
-                               byte[] key = dataOutputView.getCopyOfBuffer();
-                               byte[] premerge = getPreMergedValue(values, 
elementSerializer, dataOutputView);
-                               backend.db.put(columnFamily, writeOptions, key, 
premerge);
+                               backend.db.put(
+                                       columnFamily,
+                                       writeOptions,
+                                       
serializeCurrentKeyWithGroupAndNamespace(),
+                                       serializeValueList(values, 
elementSerializer, DELIMITER));
                        } catch (IOException | RocksDBException e) {
                                throw new FlinkRuntimeException("Error while 
updating data to RocksDB", e);
                        }
@@ -234,10 +231,11 @@ public void addAll(List<V> values) {
 
                if (!values.isEmpty()) {
                        try {
-                               writeCurrentKeyWithGroupAndNamespace();
-                               byte[] key = dataOutputView.getCopyOfBuffer();
-                               byte[] premerge = getPreMergedValue(values, 
elementSerializer, dataOutputView);
-                               backend.db.merge(columnFamily, writeOptions, 
key, premerge);
+                               backend.db.merge(
+                                       columnFamily,
+                                       writeOptions,
+                                       
serializeCurrentKeyWithGroupAndNamespace(),
+                                       serializeValueList(values, 
elementSerializer, DELIMITER));
                        } catch (IOException | RocksDBException e) {
                                throw new FlinkRuntimeException("Error while 
updating data to RocksDB", e);
                        }
@@ -273,26 +271,6 @@ public void migrateSerializedValue(
                }
        }
 
-       private static <V> byte[] getPreMergedValue(
-               List<V> values,
-               TypeSerializer<V> elementSerializer,
-               DataOutputSerializer keySerializationStream) throws IOException 
{
-
-               keySerializationStream.clear();
-               boolean first = true;
-               for (V value : values) {
-                       Preconditions.checkNotNull(value, "You cannot add null 
to a ListState.");
-                       if (first) {
-                               first = false;
-                       } else {
-                               keySerializationStream.write(DELIMITER);
-                       }
-                       elementSerializer.serialize(value, 
keySerializationStream);
-               }
-
-               return keySerializationStream.getCopyOfBuffer();
-       }
-
        @SuppressWarnings("unchecked")
        static <E, K, N, SV, S extends State, IS extends S> IS create(
                StateDescriptor<S, SV> stateDesc,
@@ -343,10 +321,32 @@ public void migrateSerializedValue(
                                prevPosition = in.getPosition();
                        }
                        try {
-                               return result.isEmpty() ? null : 
getPreMergedValue(result, elementSerializer, out);
+                               return result.isEmpty() ? null : 
serializeValueList(result, elementSerializer, DELIMITER);
                        } catch (IOException e) {
                                throw new FlinkRuntimeException("Failed to 
serialize transformed list", e);
                        }
                }
+
+               byte[] serializeValueList(
+                       List<T> valueList,
+                       TypeSerializer<T> elementSerializer,
+                       byte delimiter) throws IOException {
+
+                       out.clear();
+                       boolean first = true;
+
+                       for (T value : valueList) {
+                               Preconditions.checkNotNull(value, "You cannot 
add null to a value list.");
+
+                               if (first) {
+                                       first = false;
+                               } else {
+                                       out.write(delimiter);
+                               }
+                               elementSerializer.serialize(value, out);
+                       }
+
+                       return out.getCopyOfBuffer();
+               }
        }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index cb656b53b1b..13cbdedb1d9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -64,7 +64,7 @@
  * @param <UV> The type of the values in the map state.
  */
 class RocksDBMapState<K, N, UK, UV>
-       extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
+       extends AbstractRocksDBState<K, N, Map<UK, UV>>
        implements InternalMapState<K, N, UK, UV> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBMapState.class);
@@ -119,7 +119,7 @@ private RocksDBMapState(
 
        @Override
        public UV get(UK userKey) throws IOException, RocksDBException {
-               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawKeyBytes = 
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
                byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);
 
                return (rawValueBytes == null ? null : 
deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
@@ -128,8 +128,8 @@ public UV get(UK userKey) throws IOException, 
RocksDBException {
        @Override
        public void put(UK userKey, UV userValue) throws IOException, 
RocksDBException {
 
-               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-               byte[] rawValueBytes = serializeUserValue(userValue, 
userValueSerializer, dataOutputView);
+               byte[] rawKeyBytes = 
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
+               byte[] rawValueBytes = serializeValueNullSensitive(userValue, 
userValueSerializer);
 
                backend.db.put(columnFamily, writeOptions, rawKeyBytes, 
rawValueBytes);
        }
@@ -142,8 +142,8 @@ public void putAll(Map<UK, UV> map) throws IOException, 
RocksDBException {
 
                try (RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
                        for (Map.Entry<UK, UV> entry : map.entrySet()) {
-                               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
-                               byte[] rawValueBytes = 
serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
+                               byte[] rawKeyBytes = 
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(), 
userKeySerializer);
+                               byte[] rawValueBytes = 
serializeValueNullSensitive(entry.getValue(), userValueSerializer);
                                writeBatchWrapper.put(columnFamily, 
rawKeyBytes, rawValueBytes);
                        }
                }
@@ -151,21 +151,21 @@ public void putAll(Map<UK, UV> map) throws IOException, 
RocksDBException {
 
        @Override
        public void remove(UK userKey) throws IOException, RocksDBException {
-               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawKeyBytes = 
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
 
                backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
        }
 
        @Override
        public boolean contains(UK userKey) throws IOException, 
RocksDBException {
-               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawKeyBytes = 
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
                byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);
 
                return (rawValueBytes != null);
        }
 
        @Override
-       public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
+       public Iterable<Map.Entry<UK, UV>> entries() {
                final Iterator<Map.Entry<UK, UV>> iterator = iterator();
 
                // Return null to make the behavior consistent with other 
states.
@@ -177,10 +177,11 @@ public boolean contains(UK userKey) throws IOException, 
RocksDBException {
        }
 
        @Override
-       public Iterable<UK> keys() throws IOException {
-               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+       public Iterable<UK> keys() {
+               final byte[] prefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
 
                return () -> new RocksDBMapIterator<UK>(backend.db, 
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
+                       @Nullable
                        @Override
                        public UK next() {
                                RocksDBMapEntry entry = nextEntry();
@@ -190,8 +191,8 @@ public UK next() {
        }
 
        @Override
-       public Iterable<UV> values() throws IOException {
-               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+       public Iterable<UV> values() {
+               final byte[] prefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
 
                return () -> new RocksDBMapIterator<UV>(backend.db, 
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
                        @Override
@@ -203,8 +204,8 @@ public UV next() {
        }
 
        @Override
-       public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
-               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+       public Iterator<Map.Entry<UK, UV>> iterator() {
+               final byte[] prefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
 
                return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, 
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
                        @Override
@@ -220,7 +221,7 @@ public void clear() {
                        try (RocksIteratorWrapper iterator = 
RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
                                WriteBatch writeBatch = new WriteBatch(128)) {
 
-                               final byte[] keyPrefixBytes = 
serializeCurrentKeyAndNamespace();
+                               final byte[] keyPrefixBytes = 
serializeCurrentKeyWithGroupAndNamespace();
                                iterator.seek(keyPrefixBytes);
 
                                while (iterator.isValid()) {
@@ -259,18 +260,15 @@ public void clear() {
 
                int keyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, 
backend.getNumberOfKeyGroups());
 
-               DataOutputSerializer outputView = new DataOutputSerializer(128);
-               DataInputDeserializer inputView = new DataInputDeserializer();
-
-               writeKeyWithGroupAndNamespace(
-                               keyGroup,
-                               keyAndNamespace.f0,
+               RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+                       new RocksDBSerializedCompositeKeyBuilder<>(
                                safeKeySerializer,
-                               keyAndNamespace.f1,
-                               safeNamespaceSerializer,
-                               outputView);
+                               backend.getKeyGroupPrefixBytes(),
+                               32);
+
+               keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
 
-               final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
+               final byte[] keyPrefixBytes = 
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
 
                final MapSerializer<UK, UV> serializer = (MapSerializer<UK, 
UV>) safeValueSerializer;
 
@@ -282,7 +280,8 @@ public void clear() {
                                keyPrefixBytes,
                                dupUserKeySerializer,
                                dupUserValueSerializer,
-                               inputView) {
+                               dataInputView
+                       ) {
 
                        @Override
                        public Map.Entry<UK, UV> next() {
@@ -302,36 +301,6 @@ public void clear() {
        //  Serialization Methods
        // 
------------------------------------------------------------------------
 
-       private byte[] serializeCurrentKeyAndNamespace() throws IOException {
-               writeCurrentKeyWithGroupAndNamespace();
-
-               return dataOutputView.getCopyOfBuffer();
-       }
-
-       private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) 
throws IOException {
-               serializeCurrentKeyAndNamespace();
-               userKeySerializer.serialize(userKey, dataOutputView);
-
-               return dataOutputView.getCopyOfBuffer();
-       }
-
-       private static <UV> byte[] serializeUserValue(
-               UV userValue,
-               TypeSerializer<UV> valueSerializer,
-               DataOutputSerializer dataOutputView) throws IOException {
-
-               dataOutputView.clear();
-
-               if (userValue == null) {
-                       dataOutputView.writeBoolean(true);
-               } else {
-                       dataOutputView.writeBoolean(false);
-                       valueSerializer.serialize(userValue, dataOutputView);
-               }
-
-               return dataOutputView.getCopyOfBuffer();
-       }
-
        private static <UK> UK deserializeUserKey(
                DataInputDeserializer dataInputView,
                int userKeyOffset,
@@ -474,7 +443,7 @@ public UV setValue(UV value) {
 
                        try {
                                userValue = value;
-                               rawValueBytes = serializeUserValue(value, 
valueSerializer, dataOutputView);
+                               rawValueBytes = 
serializeValueNullSensitive(value, valueSerializer);
 
                                db.put(columnFamily, writeOptions, rawKeyBytes, 
rawValueBytes);
                        } catch (IOException | RocksDBException e) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 138357b0d77..a5eb46eace6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -41,7 +41,7 @@
  * @param <V> The type of value that the state state stores.
  */
 class RocksDBReducingState<K, N, V>
-       extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
+       extends AbstractRocksDBAppendingState<K, N, V, V, V>
        implements InternalReducingState<K, N, V> {
 
        /** User-specified reduce function. */
@@ -102,20 +102,14 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
                        return;
                }
 
-               // cache key and namespace
-               final K key = backend.getCurrentKey();
-               final int keyGroup = backend.getCurrentKeyGroupIndex();
-
                try {
                        V current = null;
 
                        // merge the sources to the target
                        for (N source : sources) {
                                if (source != null) {
-
-                                       writeKeyWithGroupAndNamespace(keyGroup, 
key, source, dataOutputView);
-
-                                       final byte[] sourceKey = 
dataOutputView.getCopyOfBuffer();
+                                       setCurrentNamespace(source);
+                                       final byte[] sourceKey = 
serializeCurrentKeyWithGroupAndNamespace();
                                        final byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
                                        backend.db.delete(columnFamily, 
writeOptions, sourceKey);
 
@@ -136,9 +130,8 @@ public void mergeNamespaces(N target, Collection<N> 
sources) {
                        // if something came out of merging the sources, merge 
it or write it to the target
                        if (current != null) {
                                // create the target full-binary-key
-                               writeKeyWithGroupAndNamespace(keyGroup, key, 
target, dataOutputView);
-
-                               final byte[] targetKey = 
dataOutputView.getCopyOfBuffer();
+                               setCurrentNamespace(target);
+                               final byte[] targetKey = 
serializeCurrentKeyWithGroupAndNamespace();
                                final byte[] targetValueBytes = 
backend.db.get(columnFamily, targetKey);
 
                                if (targetValueBytes != null) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
new file mode 100644
index 00000000000..8e83e292820
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized currentKey if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+       /** The serializer for the key. */
+       @Nonnull
+       private final TypeSerializer<K> keySerializer;
+
+       /** The output to write the key into. */
+       @Nonnull
+       private final DataOutputSerializer keyOutView;
+
+       /** The number of Key-group-prefix bytes for the key. */
+       @Nonnegative
+       private final int keyGroupPrefixBytes;
+
+       /** This flag indicates whether the key type has a variable byte size 
in serialization. */
+       private final boolean keySerializerTypeVariableSized;
+
+       /** Mark for the position after the serialized key. */
+       @Nonnegative
+       private int afterKeyMark;
+
+       public RocksDBSerializedCompositeKeyBuilder(
+               @Nonnull TypeSerializer<K> keySerializer,
+               @Nonnegative int keyGroupPrefixBytes,
+               @Nonnegative int initialSize) {
+               this(
+                       keySerializer,
+                       new DataOutputSerializer(initialSize),
+                       keyGroupPrefixBytes,
+                       
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+                       0);
+       }
+
+       @VisibleForTesting
+       RocksDBSerializedCompositeKeyBuilder(
+               @Nonnull TypeSerializer<K> keySerializer,
+               @Nonnull DataOutputSerializer keyOutView,
+               @Nonnegative int keyGroupPrefixBytes,
+               boolean keySerializerTypeVariableSized,
+               @Nonnegative int afterKeyMark) {
+               this.keySerializer = keySerializer;
+               this.keyOutView = keyOutView;
+               this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+               this.keySerializerTypeVariableSized = 
keySerializerTypeVariableSized;
+               this.afterKeyMark = afterKeyMark;
+       }
+
+       /**
+        * Sets the key and key-group as prefix. This will serialize them into 
the buffer and they will be used to create
+        * composite keys with provided namespaces.
+        *
+        * @param key        the key.
+        * @param keyGroupId the key-group id for the key.
+        */
+       public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int 
keyGroupId) {
+               try {
+                       serializeKeyGroupAndKey(key, keyGroupId);
+               } catch (IOException shouldNeverHappen) {
+                       throw new FlinkRuntimeException(shouldNeverHappen);
+               }
+       }
+
+       /**
+        * Returns a serialized composite key, from the key and key-group 
provided in a previous call to
+        * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+        *
+        * @param namespace           the namespace to concatenate for the 
serialized composite key bytes.
+        * @param namespaceSerializer the serializer to obtain the serialized 
form of the namespace.
+        * @param <N>                 the type of the namespace.
+        * @return the bytes for the serialized composite key of key-group, 
key, namespace.
+        */
+       @Nonnull
+       public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, 
@Nonnull TypeSerializer<N> namespaceSerializer) {
+               try {
+                       serializeNamespace(namespace, namespaceSerializer);
+                       final byte[] result = keyOutView.getCopyOfBuffer();
+                       resetToKey();
+                       return result;
+               } catch (IOException shouldNeverHappen) {
+                       throw new FlinkRuntimeException(shouldNeverHappen);
+               }
+       }
+
+       /**
+        * Returns a serialized composite key, from the key and key-group 
provided in a previous call to
+        * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, 
followed by the given user-key.
+        *
+        * @param namespace           the namespace to concatenate for the 
serialized composite key bytes.
+        * @param namespaceSerializer the serializer to obtain the serialized 
form of the namespace.
+        * @param userKey             the user-key to concatenate for the 
serialized composite key, after the namespace.
+        * @param userKeySerializer   the serializer to obtain the serialized 
form of the user-key.
+        * @param <N>                 the type of the namespace.
+        * @param <UK>                the type of the user-key.
+        * @return the bytes for the serialized composite key of key-group, 
key, namespace, user-key.
+        */
+       @Nonnull
+       public <N, UK> byte[] buildCompositeKeyNamesSpaceUserKey(
+               @Nonnull N namespace,
+               @Nonnull TypeSerializer<N> namespaceSerializer,
+               @Nonnull UK userKey,
+               @Nonnull TypeSerializer<UK> userKeySerializer) throws 
IOException {
+               serializeNamespace(namespace, namespaceSerializer);
+               userKeySerializer.serialize(userKey, keyOutView);
+               byte[] result = keyOutView.getCopyOfBuffer();
+               resetToKey();
+               return result;
+       }
+
+       private void serializeKeyGroupAndKey(K key, int keyGroupId) throws 
IOException {
+
+               // clear buffer and mark
+               resetFully();
+
+               // write key-group
+               RocksDBKeySerializationUtils.writeKeyGroup(
+                       keyGroupId,
+                       keyGroupPrefixBytes,
+                       keyOutView);
+               // write key
+               keySerializer.serialize(key, keyOutView);
+               afterKeyMark = keyOutView.length();
+       }
+
+       private <N> void serializeNamespace(
+               @Nonnull N namespace,
+               @Nonnull TypeSerializer<N> namespaceSerializer) throws 
IOException {
+
+               // this should only be called when there is already a key 
written so that we build the composite.
+               assert isKeyWritten();
+
+               final boolean ambiguousCompositeKeyPossible = 
isAmbiguousCompositeKeyPossible(namespaceSerializer);
+               if (ambiguousCompositeKeyPossible) {
+                       RocksDBKeySerializationUtils.writeVariableIntBytes(
+                               afterKeyMark - keyGroupPrefixBytes,
+                               keyOutView);
+               }
+               RocksDBKeySerializationUtils.writeNameSpace(
+                       namespace,
+                       namespaceSerializer,
+                       keyOutView,
+                       ambiguousCompositeKeyPossible);
+       }
+
+       private void resetFully() {
+               afterKeyMark = 0;
+               keyOutView.clear();
+       }
+
+       private void resetToKey() {
+               keyOutView.setPosition(afterKeyMark);
+       }
+
+       private boolean isKeyWritten() {
+               return afterKeyMark > 0;
+       }
+
+       @VisibleForTesting
+       boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> 
namespaceSerializer) {
+               return keySerializerTypeVariableSized &
+                       
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+       }
+
+       @VisibleForTesting
+       boolean isKeySerializerTypeVariableSized() {
+               return keySerializerTypeVariableSized;
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 0ca90d4a521..a8f5163a7fd 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -40,7 +40,7 @@
  * @param <V> The type of value that the state state stores.
  */
 class RocksDBValueState<K, N, V>
-       extends AbstractRocksDBState<K, N, V, ValueState<V>>
+       extends AbstractRocksDBState<K, N, V>
        implements InternalValueState<K, N, V> {
 
        /**
@@ -80,9 +80,9 @@ private RocksDBValueState(
        @Override
        public V value() {
                try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = dataOutputView.getCopyOfBuffer();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
+                       byte[] valueBytes = backend.db.get(columnFamily,
+                               serializeCurrentKeyWithGroupAndNamespace());
+
                        if (valueBytes == null) {
                                return getDefaultValue();
                        }
@@ -101,11 +101,9 @@ public void update(V value) {
                }
 
                try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = dataOutputView.getCopyOfBuffer();
-                       dataOutputView.clear();
-                       valueSerializer.serialize(value, dataOutputView);
-                       backend.db.put(columnFamily, writeOptions, key, 
dataOutputView.getCopyOfBuffer());
+                       backend.db.put(columnFamily, writeOptions,
+                               serializeCurrentKeyWithGroupAndNamespace(),
+                               serializeValue(value));
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Error while adding 
data to RocksDB", e);
                }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
new file mode 100644
index 00000000000..70a4c3cfef7
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for @{@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+       private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+
+       private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+       private static final Collection<Integer> TEST_INTS = Arrays.asList(42, 4711);
+       private static final Collection<String> TEST_STRINGS = Arrays.asList("test123", "abc");
+
+       @Before
+       public void before() {
+               dataOutputSerializer.clear();
+       }
+
+       @Test
+       public void testSetKey() throws IOException {
+               for (int parallelism : TEST_PARALLELISMS) {
+                       testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, parallelism);
+                       testSetKeyInternal(StringSerializer.INSTANCE, TEST_STRINGS, parallelism);
+               }
+       }
+
+       @Test
+       public void testSetKeyNamespace() throws IOException {
+               for (int parallelism : TEST_PARALLELISMS) {
+                       testSetKeyNamespaceInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
+                       testSetKeyNamespaceInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism);
+                       testSetKeyNamespaceInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism);
+                       testSetKeyNamespaceInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism);
+               }
+       }
+
+       @Test
+       public void testSetKeyNamespaceUserKey() throws IOException {
+               for (int parallelism : TEST_PARALLELISMS) {
+                       testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_INTS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_INTS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_INTS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_INTS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_STRINGS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_STRINGS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_STRINGS, parallelism);
+                       testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_STRINGS, parallelism);
+               }
+       }
+
+       private <K> void testSetKeyInternal(TypeSerializer<K> serializer, Collection<K> testKeys, int maxParallelism) throws IOException {
+               final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+               RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+                       createRocksDBSerializedCompositeKeyBuilder(serializer, prefixBytes);
+
+               final DataInputDeserializer deserializer = new DataInputDeserializer();
+               for (K testKey : testKeys) {
+                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+                       byte[] result = dataOutputSerializer.getCopyOfBuffer();
+                       deserializer.setBuffer(result);
+                       assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes, serializer, deserializer, false);
+                       Assert.assertEquals(0, deserializer.available());
+               }
+       }
+
+       private <K, N> void testSetKeyNamespaceInternal(
+               TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               Collection<K> testKeys,
+               Collection<N> testNamespaces,
+               int maxParallelism) throws IOException {
+               final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+               RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+                       createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+               final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+               final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+               for (K testKey : testKeys) {
+                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+                       for (N testNamespace : testNamespaces) {
+                               byte[] compositeBytes = keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+                               deserializer.setBuffer(compositeBytes);
+                               assertKeyGroupKeyNamespaceBytes(
+                                       testKey,
+                                       keyGroup,
+                                       prefixBytes,
+                                       keySerializer,
+                                       testNamespace,
+                                       namespaceSerializer,
+                                       deserializer,
+                                       ambiguousPossible);
+                               Assert.assertEquals(0, deserializer.available());
+                       }
+               }
+       }
+
+       private <K, N, U> void testSetKeyNamespaceUserKeyInternal(
+               TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               TypeSerializer<U> userKeySerializer,
+               Collection<K> testKeys,
+               Collection<N> testNamespaces,
+               Collection<U> testUserKeys,
+               int maxParallelism) throws IOException {
+               final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+               RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+                       createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+               final DataInputDeserializer deserializer = new DataInputDeserializer();
+
+               final boolean ambiguousPossible = keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+               for (K testKey : testKeys) {
+                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, testKey, maxParallelism);
+                       for (N testNamespace : testNamespaces) {
+                               for (U testUserKey : testUserKeys) {
+                                       byte[] compositeBytes = keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+                                               testNamespace,
+                                               namespaceSerializer,
+                                               testUserKey,
+                                               userKeySerializer);
+
+                                       deserializer.setBuffer(compositeBytes);
+                                       assertKeyGroupKeyNamespaceUserKeyBytes(
+                                               testKey,
+                                               keyGroup,
+                                               prefixBytes,
+                                               keySerializer,
+                                               testNamespace,
+                                               namespaceSerializer,
+                                               testUserKey,
+                                               userKeySerializer,
+                                               deserializer,
+                                               ambiguousPossible);
+
+                                       Assert.assertEquals(0, deserializer.available());
+                               }
+                       }
+               }
+       }
+
+       private <K> RocksDBSerializedCompositeKeyBuilder<K> createRocksDBSerializedCompositeKeyBuilder(
+               TypeSerializer<K> serializer,
+               int prefixBytes) {
+               final boolean variableSize = RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+               return new RocksDBSerializedCompositeKeyBuilder<>(
+                       serializer,
+                       dataOutputSerializer,
+                       prefixBytes,
+                       variableSize,
+                       0);
+       }
+
+       private <K> int setKeyAndReturnKeyGroup(
+               RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
+               K key,
+               int maxParallelism) {
+
+               int keyGroup = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, maxParallelism);
+               compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+               return keyGroup;
+       }
+
+       private <K> void assertKeyKeyGroupBytes(
+               K key,
+               int keyGroup,
+               int prefixBytes,
+               TypeSerializer<K> typeSerializer,
+               DataInputDeserializer deserializer,
+               boolean ambiguousCompositeKeyPossible) throws IOException {
+
+               Assert.assertEquals(keyGroup, RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+               Assert.assertEquals(key, RocksDBKeySerializationUtils.readKey(typeSerializer, deserializer, ambiguousCompositeKeyPossible));
+       }
+
+       private <K, N> void assertKeyGroupKeyNamespaceBytes(
+               K key,
+               int keyGroup,
+               int prefixBytes,
+               TypeSerializer<K> keySerializer,
+               N namespace,
+               TypeSerializer<N> namespaceSerializer,
+               DataInputDeserializer deserializer,
+               boolean ambiguousCompositeKeyPossible) throws IOException {
+               assertKeyKeyGroupBytes(key, keyGroup, prefixBytes, keySerializer, deserializer, ambiguousCompositeKeyPossible);
+               N readNamespace =
+                       RocksDBKeySerializationUtils.readNamespace(namespaceSerializer, deserializer, ambiguousCompositeKeyPossible);
+               Assert.assertEquals(namespace, readNamespace);
+       }
+
+       private <K, N, U> void assertKeyGroupKeyNamespaceUserKeyBytes(
+               K key,
+               int keyGroup,
+               int prefixBytes,
+               TypeSerializer<K> keySerializer,
+               N namespace,
+               TypeSerializer<N> namespaceSerializer,
+               U userKey,
+               TypeSerializer<U> userKeySerializer,
+               DataInputDeserializer deserializer,
+               boolean ambiguousCompositeKeyPossible) throws IOException {
+               assertKeyGroupKeyNamespaceBytes(
+                       key,
+                       keyGroup,
+                       prefixBytes,
+                       keySerializer,
+                       namespace,
+                       namespaceSerializer,
+                       deserializer,
+                       ambiguousCompositeKeyPossible);
+               Assert.assertEquals(userKey, userKeySerializer.deserialize(deserializer));
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to