This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 82582b3a7b7 [FLINK-36364][state/forst] Do not reuse serialized key in
Forst map state and/or other namespaces (#25394)
82582b3a7b7 is described below
commit 82582b3a7b75b7ffae9d48895088189b1324caa3
Author: Zakelly <[email protected]>
AuthorDate: Thu Sep 26 12:05:01 2024 +0800
[FLINK-36364][state/forst] Do not reuse serialized key in Forst map state
and/or other namespaces (#25394)
---
.../org/apache/flink/state/forst/ContextKey.java | 12 +++--
.../flink/state/forst/ForStDBMapCheckRequest.java | 4 +-
.../apache/flink/state/forst/ForStListState.java | 25 +++++----
.../apache/flink/state/forst/ForStMapState.java | 25 ++++-----
.../flink/state/forst/ForStReducingState.java | 25 +++++----
.../flink/state/forst/ForStSerializerUtils.java | 63 ++++++++++++++++++++++
.../apache/flink/state/forst/ForStValueState.java | 25 +++++----
7 files changed, 131 insertions(+), 48 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
index 53e7573a118..f85927c4d26 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
@@ -87,16 +87,18 @@ public class ContextKey<K, N> {
public byte[] getOrCreateSerializedKey(
FunctionWithException<ContextKey<K, N>, byte[], IOException>
serializeKeyFunc)
throws IOException {
- if (recordContext.getExtra() != null) {
- return (byte[]) recordContext.getExtra();
+ byte[] serializedKey = (byte[]) recordContext.getExtra();
+ if (serializedKey != null) {
+ return serializedKey;
}
synchronized (recordContext) {
- if (recordContext.getExtra() == null) {
- byte[] serializedKey = serializeKeyFunc.apply(this);
+ serializedKey = (byte[]) recordContext.getExtra();
+ if (serializedKey == null) {
+ serializedKey = serializeKeyFunc.apply(this);
recordContext.setExtra(serializedKey);
}
}
- return (byte[]) recordContext.getExtra();
+ return serializedKey;
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
index 003fad1010b..1c799253c2f 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java
@@ -37,6 +37,8 @@ import static
org.apache.flink.state.forst.ForStDBIterRequest.startWithKeyPrefix
*/
public class ForStDBMapCheckRequest<K, N, V> extends ForStDBGetRequest<K, N,
V, Boolean> {
+ private static final byte[] VALID_PLACEHOLDER = new byte[0];
+
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -60,7 +62,7 @@ public class ForStDBMapCheckRequest<K, N, V> extends
ForStDBGetRequest<K, N, V,
try (RocksIterator iter = db.newIterator(getColumnFamilyHandle()))
{
iter.seek(key);
if (iter.isValid() && startWithKeyPrefix(key, iter.key(),
keyGroupPrefixBytes)) {
- completeStateFuture(new byte[0]);
+ completeStateFuture(VALID_PLACEHOLDER);
} else {
completeStateFuture(null);
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
index 7b5da10e960..d1ddfa9db54 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.AbstractListState;
import org.apache.flink.runtime.state.v2.ListStateDescriptor;
import org.apache.flink.util.Preconditions;
@@ -70,6 +72,9 @@ public class ForStListState<K, N, V> extends
AbstractListState<K, N, V>
/** The data inputStream used for value deserializer, which should be
thread-safe. */
private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+ /** Whether to enable reuse of the serialized key (and namespace). */
+ private final boolean enableKeyReuse;
+
public ForStListState(
StateRequestHandler stateRequestHandler,
ColumnFamilyHandle columnFamily,
@@ -86,6 +91,11 @@ public class ForStListState<K, N, V> extends
AbstractListState<K, N, V>
this.namespaceSerializer =
ThreadLocal.withInitial(namespaceSerializerInitializer);
this.valueSerializerView =
ThreadLocal.withInitial(valueSerializerViewInitializer);
this.valueDeserializerView =
ThreadLocal.withInitial(valueDeserializerViewInitializer);
+ // We only enable key reuse for the most common namespace across all
states.
+ this.enableKeyReuse =
+ (defaultNamespace instanceof VoidNamespace)
+ && (namespaceSerializerInitializer.get()
+ instanceof VoidNamespaceSerializer);
}
@Override
@@ -95,15 +105,12 @@ public class ForStListState<K, N, V> extends
AbstractListState<K, N, V>
@Override
public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException
{
- return contextKey.getOrCreateSerializedKey(
- ctxKey -> {
- SerializedCompositeKeyBuilder<K> builder =
serializedKeyBuilder.get();
- builder.setKeyAndKeyGroup(ctxKey.getRawKey(),
ctxKey.getKeyGroup());
- N namespace = contextKey.getNamespace();
- return builder.buildCompositeKeyNamespace(
- namespace == null ? defaultNamespace : namespace,
- namespaceSerializer.get());
- });
+ return ForStSerializerUtils.serializeKeyAndNamespace(
+ contextKey,
+ serializedKeyBuilder.get(),
+ defaultNamespace,
+ namespaceSerializer.get(),
+ enableKeyReuse);
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
index 2a2595cdfd6..05b448bc743 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
@@ -114,21 +114,16 @@ public class ForStMapState<K, N, UK, UV> extends
AbstractMapState<K, N, UK, UV>
@Override
public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException
{
- contextKey.resetExtra();
- return contextKey.getOrCreateSerializedKey(
- ctxKey -> {
- SerializedCompositeKeyBuilder<K> builder =
serializedKeyBuilder.get();
- builder.setKeyAndKeyGroup(ctxKey.getRawKey(),
ctxKey.getKeyGroup());
- N namespace = contextKey.getNamespace();
- builder.setNamespace(
- namespace == null ? defaultNamespace : namespace,
- namespaceSerializer.get());
- if (contextKey.getUserKey() == null) { // value get
- return builder.build();
- }
- UK userKey = (UK) contextKey.getUserKey(); // map get
- return builder.buildCompositeKeyUserKey(userKey,
userKeySerializer);
- });
+ SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get();
+ builder.setKeyAndKeyGroup(contextKey.getRawKey(),
contextKey.getKeyGroup());
+ N namespace = contextKey.getNamespace();
+ builder.setNamespace(
+ namespace == null ? defaultNamespace : namespace,
namespaceSerializer.get());
+ if (contextKey.getUserKey() == null) { // value get
+ return builder.build();
+ }
+ UK userKey = (UK) contextKey.getUserKey(); // map get
+ return builder.buildCompositeKeyUserKey(userKey, userKeySerializer);
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
index 9861a2b451c..f640da7f9c1 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.AbstractReducingState;
import org.apache.flink.runtime.state.v2.ReducingStateDescriptor;
import org.apache.flink.util.Preconditions;
@@ -64,6 +66,9 @@ public class ForStReducingState<K, N, V> extends
AbstractReducingState<K, N, V>
/** The data inputStream used for value deserializer, which should be
thread-safe. */
private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+ /** Whether to enable reuse of the serialized key (and namespace). */
+ private final boolean enableKeyReuse;
+
public ForStReducingState(
StateRequestHandler stateRequestHandler,
ColumnFamilyHandle columnFamily,
@@ -80,6 +85,11 @@ public class ForStReducingState<K, N, V> extends
AbstractReducingState<K, N, V>
this.namespaceSerializer =
ThreadLocal.withInitial(namespaceSerializerInitializer);
this.valueSerializerView =
ThreadLocal.withInitial(valueSerializerViewInitializer);
this.valueDeserializerView =
ThreadLocal.withInitial(valueDeserializerViewInitializer);
+ // We only enable key reuse for the most common namespace across all
states.
+ this.enableKeyReuse =
+ (defaultNamespace instanceof VoidNamespace)
+ && (namespaceSerializerInitializer.get()
+ instanceof VoidNamespaceSerializer);
}
@Override
@@ -89,15 +99,12 @@ public class ForStReducingState<K, N, V> extends
AbstractReducingState<K, N, V>
@Override
public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException
{
- return contextKey.getOrCreateSerializedKey(
- ctxKey -> {
- SerializedCompositeKeyBuilder<K> builder =
serializedKeyBuilder.get();
- builder.setKeyAndKeyGroup(ctxKey.getRawKey(),
ctxKey.getKeyGroup());
- N namespace = ctxKey.getNamespace();
- return builder.buildCompositeKeyNamespace(
- namespace == null ? defaultNamespace : namespace,
- namespaceSerializer.get());
- });
+ return ForStSerializerUtils.serializeKeyAndNamespace(
+ contextKey,
+ serializedKeyBuilder.get(),
+ defaultNamespace,
+ namespaceSerializer.get(),
+ enableKeyReuse);
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java
new file mode 100644
index 00000000000..d600b9b324d
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+
+import java.io.IOException;
+
+/** Utilities for serializing keys in ForSt. */
+public class ForStSerializerUtils {
+
+ /**
+ * Serializes a key and namespace, without a user key.
+ *
+ * @param contextKey the context key of current request
+ * @param builder key builder
+ * @param defaultNamespace default namespace of the state
+ * @param namespaceSerializer the namespace serializer
+ * @param enableKeyReuse whether to enable key reuse
+ */
+ public static <K, N> byte[] serializeKeyAndNamespace(
+ ContextKey<K, N> contextKey,
+ SerializedCompositeKeyBuilder<K> builder,
+ N defaultNamespace,
+ TypeSerializer<N> namespaceSerializer,
+ boolean enableKeyReuse)
+ throws IOException {
+ N namespace = contextKey.getNamespace();
+ namespace = (namespace == null ? defaultNamespace : namespace);
+ if (enableKeyReuse && namespace == defaultNamespace) {
+ // key reuse.
+ return contextKey.getOrCreateSerializedKey(
+ ctxKey -> {
+ builder.setKeyAndKeyGroup(ctxKey.getRawKey(),
ctxKey.getKeyGroup());
+ return builder.buildCompositeKeyNamespace(
+ defaultNamespace, namespaceSerializer);
+ });
+ } else {
+ // no key reuse, serialize again.
+ builder.setKeyAndKeyGroup(contextKey.getRawKey(),
contextKey.getKeyGroup());
+ return builder.buildCompositeKeyNamespace(namespace,
namespaceSerializer);
+ }
+ }
+
+ private ForStSerializerUtils() {}
+}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
index 519f4634686..15dcd72cec6 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.AbstractValueState;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.util.Preconditions;
@@ -64,6 +66,9 @@ public class ForStValueState<K, N, V> extends
AbstractValueState<K, N, V>
/** The data inputStream used for value deserializer, which should be
thread-safe. */
private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+ /** Whether to enable reuse of the serialized key (and namespace). */
+ private final boolean enableKeyReuse;
+
public ForStValueState(
StateRequestHandler stateRequestHandler,
ColumnFamilyHandle columnFamily,
@@ -80,6 +85,11 @@ public class ForStValueState<K, N, V> extends
AbstractValueState<K, N, V>
this.namespaceSerializer =
ThreadLocal.withInitial(namespaceSerializerInitializer);
this.valueSerializerView =
ThreadLocal.withInitial(valueSerializerViewInitializer);
this.valueDeserializerView =
ThreadLocal.withInitial(valueDeserializerViewInitializer);
+ // We only enable key reuse for the most common namespace across all
states.
+ this.enableKeyReuse =
+ (defaultNamespace instanceof VoidNamespace)
+ && (namespaceSerializerInitializer.get()
+ instanceof VoidNamespaceSerializer);
}
@Override
@@ -89,15 +99,12 @@ public class ForStValueState<K, N, V> extends
AbstractValueState<K, N, V>
@Override
public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException
{
- return contextKey.getOrCreateSerializedKey(
- ctxKey -> {
- SerializedCompositeKeyBuilder<K> builder =
serializedKeyBuilder.get();
- builder.setKeyAndKeyGroup(ctxKey.getRawKey(),
ctxKey.getKeyGroup());
- N namespace = contextKey.getNamespace();
- return builder.buildCompositeKeyNamespace(
- namespace == null ? defaultNamespace : namespace,
- namespaceSerializer.get());
- });
+ return ForStSerializerUtils.serializeKeyAndNamespace(
+ contextKey,
+ serializedKeyBuilder.get(),
+ defaultNamespace,
+ namespaceSerializer.get(),
+ enableKeyReuse);
}
@Override