tzulitai closed pull request #7428: [FLINK-11280] [state backends] Let 
RocksDBSerializedCompositeKeyBuilder accept key serializer lazily
URL: https://github.com/apache/flink/pull/7428
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the full diff is reproduced below:

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index fb19a8a2816..92e1e4d25a0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -134,11 +134,11 @@ public void setCurrentNamespace(N namespace) {
 
                RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
                                                new 
RocksDBSerializedCompositeKeyBuilder<>(
-                                                       safeKeySerializer,
                                                        
backend.getKeyGroupPrefixBytes(),
-                                                       32
+                                                       32,
+                                                       
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(safeKeySerializer)
                                                );
-               keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+               keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup, 
safeKeySerializer);
                byte[] key = 
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
                return backend.db.get(columnFamily, key);
        }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a47b306596..e033e1cfbab 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -297,7 +297,10 @@ public RocksDBKeyedStateBackend(
                this.kvStateInformation = new LinkedHashMap<>();
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
-               this.sharedRocksKeyBuilder = new 
RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
+               this.sharedRocksKeyBuilder = new 
RocksDBSerializedCompositeKeyBuilder<>(
+                       keyGroupPrefixBytes,
+                       32,
+                       
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer));
 
                this.metricOptions = metricOptions;
                this.metricGroup = metricGroup;
@@ -339,8 +342,9 @@ private static void checkAndCreateDirectory(File directory) 
throws IOException {
                        (RegisteredKeyValueStateBackendMetaInfo<N, ?>) 
columnInfo.f1;
 
                final TypeSerializer<N> namespaceSerializer = 
registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
+               final TypeSerializer<K> keySerializer = getKeySerializer();
                final DataOutputSerializer namespaceOutputView = new 
DataOutputSerializer(8);
-               boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), 
namespaceSerializer);
+               boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
                final byte[] nameSpaceBytes;
                try {
                        RocksDBKeySerializationUtils.writeNameSpace(
@@ -356,7 +360,7 @@ private static void checkAndCreateDirectory(File directory) 
throws IOException {
                RocksIteratorWrapper iterator = getRocksIterator(db, 
columnInfo.f0);
                iterator.seekToFirst();
 
-               final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, getKeySerializer(), 
keyGroupPrefixBytes,
+               final RocksStateKeysIterator<K> iteratorWrapper = new 
RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
                        ambiguousKeyPossible, nameSpaceBytes);
 
                Stream<K> targetStream = 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 
Spliterator.ORDERED), false);
@@ -380,7 +384,7 @@ private void registerKvStateInformation(String 
columnFamilyName, Tuple2<ColumnFa
        @Override
        public void setCurrentKey(K newKey) {
                super.setCurrentKey(newKey);
-               sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), 
getCurrentKeyGroupIndex());
+               sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), 
getCurrentKeyGroupIndex(), getKeySerializer());
        }
 
        /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 4c174d43ef8..7b1e4594832 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -261,11 +261,11 @@ public void clear() {
 
                RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
                        new RocksDBSerializedCompositeKeyBuilder<>(
-                               safeKeySerializer,
                                backend.getKeyGroupPrefixBytes(),
-                               32);
+                               32,
+                               
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(safeKeySerializer));
 
-               keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+               keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup, 
safeKeySerializer);
 
                final byte[] keyPrefixBytes = 
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
index 8e83e292820..16f2a095721 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -39,10 +39,6 @@
 @Internal
 class RocksDBSerializedCompositeKeyBuilder<K> {
 
-       /** The serializer for the key. */
-       @Nonnull
-       private final TypeSerializer<K> keySerializer;
-
        /** The output to write the key into. */
        @Nonnull
        private final DataOutputSerializer keyOutView;
@@ -59,25 +55,22 @@
        private int afterKeyMark;
 
        public RocksDBSerializedCompositeKeyBuilder(
-               @Nonnull TypeSerializer<K> keySerializer,
                @Nonnegative int keyGroupPrefixBytes,
-               @Nonnegative int initialSize) {
+               @Nonnegative int initialSize,
+               boolean keySerializerTypeVariableSized) {
                this(
-                       keySerializer,
                        new DataOutputSerializer(initialSize),
                        keyGroupPrefixBytes,
-                       
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+                       keySerializerTypeVariableSized,
                        0);
        }
 
        @VisibleForTesting
        RocksDBSerializedCompositeKeyBuilder(
-               @Nonnull TypeSerializer<K> keySerializer,
                @Nonnull DataOutputSerializer keyOutView,
                @Nonnegative int keyGroupPrefixBytes,
                boolean keySerializerTypeVariableSized,
                @Nonnegative int afterKeyMark) {
-               this.keySerializer = keySerializer;
                this.keyOutView = keyOutView;
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.keySerializerTypeVariableSized = 
keySerializerTypeVariableSized;
@@ -91,9 +84,9 @@ public RocksDBSerializedCompositeKeyBuilder(
         * @param key        the key.
         * @param keyGroupId the key-group id for the key.
         */
-       public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int 
keyGroupId) {
+       public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int 
keyGroupId, @Nonnull TypeSerializer<K> keySerializer) {
                try {
-                       serializeKeyGroupAndKey(key, keyGroupId);
+                       serializeKeyGroupAndKey(key, keyGroupId, keySerializer);
                } catch (IOException shouldNeverHappen) {
                        throw new FlinkRuntimeException(shouldNeverHappen);
                }
@@ -101,7 +94,7 @@ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative 
int keyGroupId) {
 
        /**
         * Returns a serialized composite key, from the key and key-group 
provided in a previous call to
-        * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+        * {@link #setKeyAndKeyGroup(Object, int, TypeSerializer)} and the 
given namespace.
         *
         * @param namespace           the namespace to concatenate for the 
serialized composite key bytes.
         * @param namespaceSerializer the serializer to obtain the serialized 
form of the namespace.
@@ -122,7 +115,7 @@ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative 
int keyGroupId) {
 
        /**
         * Returns a serialized composite key, from the key and key-group 
provided in a previous call to
-        * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace, 
folloed by the given user-key.
+        * {@link #setKeyAndKeyGroup(Object, int, TypeSerializer)} and the 
given namespace, folloed by the given user-key.
         *
         * @param namespace           the namespace to concatenate for the 
serialized composite key bytes.
         * @param namespaceSerializer the serializer to obtain the serialized 
form of the namespace.
@@ -145,7 +138,7 @@ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative 
int keyGroupId) {
                return result;
        }
 
-       private void serializeKeyGroupAndKey(K key, int keyGroupId) throws 
IOException {
+       private void serializeKeyGroupAndKey(K key, int keyGroupId, 
TypeSerializer<K> keySerializer) throws IOException {
 
                // clear buffer and mark
                resetFully();
@@ -198,9 +191,4 @@ boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> 
namespaceSerializer) {
                return keySerializerTypeVariableSized &
                        
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
        }
-
-       @VisibleForTesting
-       boolean isKeySerializerTypeVariableSized() {
-               return keySerializerTypeVariableSized;
-       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
index 70a4c3cfef7..ba91564a544 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -88,7 +88,7 @@ public void testSetKeyNamespaceUserKey() throws IOException {
 
                final DataInputDeserializer deserializer = new 
DataInputDeserializer();
                for (K testKey : testKeys) {
-                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, 
testKey, maxParallelism);
+                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, 
testKey, serializer, maxParallelism);
                        byte[] result = dataOutputSerializer.getCopyOfBuffer();
                        deserializer.setBuffer(result);
                        assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes, 
serializer, deserializer, false);
@@ -112,7 +112,7 @@ public void testSetKeyNamespaceUserKey() throws IOException 
{
                final boolean ambiguousPossible = 
keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
 
                for (K testKey : testKeys) {
-                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, 
testKey, maxParallelism);
+                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, 
testKey, keySerializer, maxParallelism);
                        for (N testNamespace : testNamespaces) {
                                byte[] compositeBytes = 
keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
                                deserializer.setBuffer(compositeBytes);
@@ -148,7 +148,7 @@ public void testSetKeyNamespaceUserKey() throws IOException 
{
                final boolean ambiguousPossible = 
keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
 
                for (K testKey : testKeys) {
-                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, 
testKey, maxParallelism);
+                       int keyGroup = setKeyAndReturnKeyGroup(keyBuilder, 
testKey, keySerializer, maxParallelism);
                        for (N testNamespace : testNamespaces) {
                                for (U testUserKey : testUserKeys) {
                                        byte[] compositeBytes = 
keyBuilder.buildCompositeKeyNamesSpaceUserKey(
@@ -181,7 +181,6 @@ public void testSetKeyNamespaceUserKey() throws IOException 
{
                int prefixBytes) {
                final boolean variableSize = 
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
                return new RocksDBSerializedCompositeKeyBuilder<>(
-                       serializer,
                        dataOutputSerializer,
                        prefixBytes,
                        variableSize,
@@ -191,10 +190,11 @@ public void testSetKeyNamespaceUserKey() throws 
IOException {
        private <K> int setKeyAndReturnKeyGroup(
                RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
                K key,
+               TypeSerializer<K> keySerializer,
                int maxParallelism) {
 
                int keyGroup = 
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, 
maxParallelism);
-               compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+               compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup, 
keySerializer);
                return keyGroup;
        }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to