This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 82582b3a7b7 [FLINK-36364][state/forst] Do not reuse serialized key in Forst map state and/or other namespaces (#25394)
82582b3a7b7 is described below

commit 82582b3a7b75b7ffae9d48895088189b1324caa3
Author: Zakelly <[email protected]>
AuthorDate: Thu Sep 26 12:05:01 2024 +0800

    [FLINK-36364][state/forst] Do not reuse serialized key in Forst map state and/or other namespaces (#25394)
---
 .../org/apache/flink/state/forst/ContextKey.java   | 12 +++--
 .../flink/state/forst/ForStDBMapCheckRequest.java  |  4 +-
 .../apache/flink/state/forst/ForStListState.java   | 25 +++++----
 .../apache/flink/state/forst/ForStMapState.java    | 25 ++++-----
 .../flink/state/forst/ForStReducingState.java      | 25 +++++----
 .../flink/state/forst/ForStSerializerUtils.java    | 63 ++++++++++++++++++++++
 .../apache/flink/state/forst/ForStValueState.java  | 25 +++++----
 7 files changed, 131 insertions(+), 48 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
index 53e7573a118..f85927c4d26 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
@@ -87,16 +87,18 @@ public class ContextKey<K, N> {
     public byte[] getOrCreateSerializedKey(
             FunctionWithException<ContextKey<K, N>, byte[], IOException> 
serializeKeyFunc)
             throws IOException {
-        if (recordContext.getExtra() != null) {
-            return (byte[]) recordContext.getExtra();
+        byte[] serializedKey = (byte[]) recordContext.getExtra();
+        if (serializedKey != null) {
+            return serializedKey;
         }
         synchronized (recordContext) {
-            if (recordContext.getExtra() == null) {
-                byte[] serializedKey = serializeKeyFunc.apply(this);
+            serializedKey = (byte[]) recordContext.getExtra();
+            if (serializedKey == null) {
+                serializedKey = serializeKeyFunc.apply(this);
                 recordContext.setExtra(serializedKey);
             }
         }
-        return (byte[]) recordContext.getExtra();
+        return serializedKey;
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
index 003fad1010b..1c799253c2f 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
@@ -37,6 +37,8 @@ import static 
org.apache.flink.state.forst.ForStDBIterRequest.startWithKeyPrefix
  */
 public class ForStDBMapCheckRequest<K, N, V> extends ForStDBGetRequest<K, N, 
V, Boolean> {
 
+    private static final byte[] VALID_PLACEHOLDER = new byte[0];
+
     /** Number of bytes required to prefix the key groups. */
     private final int keyGroupPrefixBytes;
 
@@ -60,7 +62,7 @@ public class ForStDBMapCheckRequest<K, N, V> extends 
ForStDBGetRequest<K, N, V,
             try (RocksIterator iter = db.newIterator(getColumnFamilyHandle())) 
{
                 iter.seek(key);
                 if (iter.isValid() && startWithKeyPrefix(key, iter.key(), 
keyGroupPrefixBytes)) {
-                    completeStateFuture(new byte[0]);
+                    completeStateFuture(VALID_PLACEHOLDER);
                 } else {
                     completeStateFuture(null);
                 }
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
index 7b5da10e960..d1ddfa9db54 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequest;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractListState;
 import org.apache.flink.runtime.state.v2.ListStateDescriptor;
 import org.apache.flink.util.Preconditions;
@@ -70,6 +72,9 @@ public class ForStListState<K, N, V> extends 
AbstractListState<K, N, V>
     /** The data inputStream used for value deserializer, which should be 
thread-safe. */
     private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
 
+    /** Whether to enable the reuse of serialized key(and namespace). */
+    private final boolean enableKeyReuse;
+
     public ForStListState(
             StateRequestHandler stateRequestHandler,
             ColumnFamilyHandle columnFamily,
@@ -86,6 +91,11 @@ public class ForStListState<K, N, V> extends 
AbstractListState<K, N, V>
         this.namespaceSerializer = 
ThreadLocal.withInitial(namespaceSerializerInitializer);
         this.valueSerializerView = 
ThreadLocal.withInitial(valueSerializerViewInitializer);
         this.valueDeserializerView = 
ThreadLocal.withInitial(valueDeserializerViewInitializer);
+        // We only enable key reuse for the most common namespace across all 
states.
+        this.enableKeyReuse =
+                (defaultNamespace instanceof VoidNamespace)
+                        && (namespaceSerializerInitializer.get()
+                                instanceof VoidNamespaceSerializer);
     }
 
     @Override
@@ -95,15 +105,12 @@ public class ForStListState<K, N, V> extends 
AbstractListState<K, N, V>
 
     @Override
     public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException 
{
-        return contextKey.getOrCreateSerializedKey(
-                ctxKey -> {
-                    SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
-                    builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = contextKey.getNamespace();
-                    return builder.buildCompositeKeyNamespace(
-                            namespace == null ? defaultNamespace : namespace,
-                            namespaceSerializer.get());
-                });
+        return ForStSerializerUtils.serializeKeyAndNamespace(
+                contextKey,
+                serializedKeyBuilder.get(),
+                defaultNamespace,
+                namespaceSerializer.get(),
+                enableKeyReuse);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
index 2a2595cdfd6..05b448bc743 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
@@ -114,21 +114,16 @@ public class ForStMapState<K, N, UK, UV> extends 
AbstractMapState<K, N, UK, UV>
 
     @Override
     public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException 
{
-        contextKey.resetExtra();
-        return contextKey.getOrCreateSerializedKey(
-                ctxKey -> {
-                    SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
-                    builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = contextKey.getNamespace();
-                    builder.setNamespace(
-                            namespace == null ? defaultNamespace : namespace,
-                            namespaceSerializer.get());
-                    if (contextKey.getUserKey() == null) { // value get
-                        return builder.build();
-                    }
-                    UK userKey = (UK) contextKey.getUserKey(); // map get
-                    return builder.buildCompositeKeyUserKey(userKey, 
userKeySerializer);
-                });
+        SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get();
+        builder.setKeyAndKeyGroup(contextKey.getRawKey(), 
contextKey.getKeyGroup());
+        N namespace = contextKey.getNamespace();
+        builder.setNamespace(
+                namespace == null ? defaultNamespace : namespace, 
namespaceSerializer.get());
+        if (contextKey.getUserKey() == null) { // value get
+            return builder.build();
+        }
+        UK userKey = (UK) contextKey.getUserKey(); // map get
+        return builder.buildCompositeKeyUserKey(userKey, userKeySerializer);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
index 9861a2b451c..f640da7f9c1 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequest;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractReducingState;
 import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
 import org.apache.flink.util.Preconditions;
@@ -64,6 +66,9 @@ public class ForStReducingState<K, N, V> extends 
AbstractReducingState<K, N, V>
     /** The data inputStream used for value deserializer, which should be 
thread-safe. */
     private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
 
+    /** Whether to enable the reuse of serialized key(and namespace). */
+    private final boolean enableKeyReuse;
+
     public ForStReducingState(
             StateRequestHandler stateRequestHandler,
             ColumnFamilyHandle columnFamily,
@@ -80,6 +85,11 @@ public class ForStReducingState<K, N, V> extends 
AbstractReducingState<K, N, V>
         this.namespaceSerializer = 
ThreadLocal.withInitial(namespaceSerializerInitializer);
         this.valueSerializerView = 
ThreadLocal.withInitial(valueSerializerViewInitializer);
         this.valueDeserializerView = 
ThreadLocal.withInitial(valueDeserializerViewInitializer);
+        // We only enable key reuse for the most common namespace across all 
states.
+        this.enableKeyReuse =
+                (defaultNamespace instanceof VoidNamespace)
+                        && (namespaceSerializerInitializer.get()
+                                instanceof VoidNamespaceSerializer);
     }
 
     @Override
@@ -89,15 +99,12 @@ public class ForStReducingState<K, N, V> extends 
AbstractReducingState<K, N, V>
 
     @Override
     public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException 
{
-        return contextKey.getOrCreateSerializedKey(
-                ctxKey -> {
-                    SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
-                    builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = ctxKey.getNamespace();
-                    return builder.buildCompositeKeyNamespace(
-                            namespace == null ? defaultNamespace : namespace,
-                            namespaceSerializer.get());
-                });
+        return ForStSerializerUtils.serializeKeyAndNamespace(
+                contextKey,
+                serializedKeyBuilder.get(),
+                defaultNamespace,
+                namespaceSerializer.get(),
+                enableKeyReuse);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java
new file mode 100644
index 00000000000..d600b9b324d
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+
+import java.io.IOException;
+
+/** A utility of serialization keys in ForSt. */
+public class ForStSerializerUtils {
+
+    /**
+     * Serialize a key and namespace. No user key.
+     *
+     * @param contextKey the context key of current request
+     * @param builder key builder
+     * @param defaultNamespace default namespace of the state
+     * @param namespaceSerializer the namespace serializer
+     * @param enableKeyReuse whether to enable key reuse
+     */
+    public static <K, N> byte[] serializeKeyAndNamespace(
+            ContextKey<K, N> contextKey,
+            SerializedCompositeKeyBuilder<K> builder,
+            N defaultNamespace,
+            TypeSerializer<N> namespaceSerializer,
+            boolean enableKeyReuse)
+            throws IOException {
+        N namespace = contextKey.getNamespace();
+        namespace = (namespace == null ? defaultNamespace : namespace);
+        if (enableKeyReuse && namespace == defaultNamespace) {
+            // key reuse.
+            return contextKey.getOrCreateSerializedKey(
+                    ctxKey -> {
+                        builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
+                        return builder.buildCompositeKeyNamespace(
+                                defaultNamespace, namespaceSerializer);
+                    });
+        } else {
+            // no key reuse, serialize again.
+            builder.setKeyAndKeyGroup(contextKey.getRawKey(), 
contextKey.getKeyGroup());
+            return builder.buildCompositeKeyNamespace(namespace, 
namespaceSerializer);
+        }
+    }
+
+    private ForStSerializerUtils() {}
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
index 519f4634686..15dcd72cec6 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequest;
 import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.v2.AbstractValueState;
 import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
 import org.apache.flink.util.Preconditions;
@@ -64,6 +66,9 @@ public class ForStValueState<K, N, V> extends 
AbstractValueState<K, N, V>
     /** The data inputStream used for value deserializer, which should be 
thread-safe. */
     private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
 
+    /** Whether to enable the reuse of serialized key(and namespace). */
+    private final boolean enableKeyReuse;
+
     public ForStValueState(
             StateRequestHandler stateRequestHandler,
             ColumnFamilyHandle columnFamily,
@@ -80,6 +85,11 @@ public class ForStValueState<K, N, V> extends 
AbstractValueState<K, N, V>
         this.namespaceSerializer = 
ThreadLocal.withInitial(namespaceSerializerInitializer);
         this.valueSerializerView = 
ThreadLocal.withInitial(valueSerializerViewInitializer);
         this.valueDeserializerView = 
ThreadLocal.withInitial(valueDeserializerViewInitializer);
+        // We only enable key reuse for the most common namespace across all 
states.
+        this.enableKeyReuse =
+                (defaultNamespace instanceof VoidNamespace)
+                        && (namespaceSerializerInitializer.get()
+                                instanceof VoidNamespaceSerializer);
     }
 
     @Override
@@ -89,15 +99,12 @@ public class ForStValueState<K, N, V> extends 
AbstractValueState<K, N, V>
 
     @Override
     public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException 
{
-        return contextKey.getOrCreateSerializedKey(
-                ctxKey -> {
-                    SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
-                    builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = contextKey.getNamespace();
-                    return builder.buildCompositeKeyNamespace(
-                            namespace == null ? defaultNamespace : namespace,
-                            namespaceSerializer.get());
-                });
+        return ForStSerializerUtils.serializeKeyAndNamespace(
+                contextKey,
+                serializedKeyBuilder.get(),
+                defaultNamespace,
+                namespaceSerializer.get(),
+                enableKeyReuse);
     }
 
     @Override

Reply via email to