This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 25969c9de1e [FLINK-36243][state/forst] Store namespace in state 
request and  contextKey (#25300)
25969c9de1e is described below

commit 25969c9de1e73c2364200ae3d155aca4862ea036
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Wed Sep 11 19:45:57 2024 +0800

    [FLINK-36243][state/forst] Store namespace in state request and  contextKey 
(#25300)
---
 .../asyncprocessing/AsyncExecutionController.java  |  6 ++---
 .../runtime/asyncprocessing/StateRequest.java      | 12 +++++++++-
 .../asyncprocessing/StateRequestBuffer.java        | 10 ++++----
 .../asyncprocessing/StateRequestContainer.java     |  2 +-
 .../runtime/asyncprocessing/MockStateExecutor.java |  2 +-
 .../asyncprocessing/MockStateRequestContainer.java |  6 ++---
 .../state/v2/InternalKeyedStateTestBase.java       |  8 +++----
 .../org/apache/flink/state/forst/ContextKey.java   | 14 ++++++-----
 .../apache/flink/state/forst/ForStInnerTable.java  |  4 ++--
 .../apache/flink/state/forst/ForStListState.java   | 15 ++++++++----
 .../apache/flink/state/forst/ForStMapState.java    | 28 +++++++++++++++-------
 .../state/forst/ForStStateRequestClassifier.java   |  4 ++--
 .../apache/flink/state/forst/ForStValueState.java  | 14 +++++++----
 .../state/forst/ForStDBOperationTestBase.java      |  4 ++--
 .../flink/state/forst/ForStStateExecutorTest.java  | 20 ++++++++--------
 15 files changed, 90 insertions(+), 59 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 24ae7e9342f..ad767aa340a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -239,7 +239,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler {
             @Nullable State state, StateRequestType type, @Nullable IN 
payload) {
         // Step 1: build state future & assign context.
         InternalStateFuture<OUT> stateFuture = 
stateFutureFactory.create(currentContext);
-        StateRequest<K, IN, OUT> request =
+        StateRequest<K, ?, IN, OUT> request =
                 new StateRequest<>(state, type, payload, stateFuture, 
currentContext);
 
         // Step 2: try to seize the capacity, if the current in-flight records 
exceeds the limit,
@@ -278,11 +278,11 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler {
         currentContext.setNamespace(state, namespace);
     }
 
-    <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
+    <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) {
         stateRequestsBuffer.enqueueToActive(request);
     }
 
-    <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
+    <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) {
         stateRequestsBuffer.enqueueToBlocking(request);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
index 87fd02926f2..32527a2e881 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing;
 
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 
 import javax.annotation.Nullable;
 
@@ -30,9 +31,10 @@ import java.io.Serializable;
  *
  * @param <K> Type of partitioned key.
  * @param <IN> Type of input of this request.
+ * @param <N> Type of namespace.
  * @param <OUT> Type of value that request will return.
  */
-public class StateRequest<K, IN, OUT> implements Serializable {
+public class StateRequest<K, N, IN, OUT> implements Serializable {
 
     /**
      * The underlying state to be accessed, can be empty for {@link 
StateRequestType#SYNC_POINT}.
@@ -51,6 +53,8 @@ public class StateRequest<K, IN, OUT> implements Serializable 
{
     /** The record context of this request. */
     private final RecordContext<K> context;
 
+    @Nullable private final N namespace;
+
     public StateRequest(
             @Nullable State state,
             StateRequestType type,
@@ -62,6 +66,8 @@ public class StateRequest<K, IN, OUT> implements Serializable 
{
         this.payload = payload;
         this.stateFuture = stateFuture;
         this.context = context;
+        this.namespace =
+                state == null ? null : 
context.getNamespace((InternalPartitionedState<N>) state);
     }
 
     public StateRequestType getRequestType() {
@@ -85,4 +91,8 @@ public class StateRequest<K, IN, OUT> implements Serializable 
{
     public RecordContext<K> getRecordContext() {
         return context;
     }
+
+    public @Nullable N getNamespace() {
+        return namespace;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
index 263b7cce092..04379d2d34b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
@@ -60,14 +60,14 @@ public class StateRequestBuffer<K> {
      * The state requests in this buffer could be executed when the buffer is 
full or configured
      * batch size is reached. All operations on this buffer must be invoked in 
task thread.
      */
-    final LinkedList<StateRequest<K, ?, ?>> activeQueue;
+    final LinkedList<StateRequest<K, ?, ?, ?>> activeQueue;
 
     /**
      * The requests in that should wait until all preceding records with 
identical key finishing its
      * execution. After which the queueing requests will move into the active 
buffer. All operations
      * on this buffer must be invoked in task thread.
      */
-    final Map<K, Deque<StateRequest<K, ?, ?>>> blockingQueue;
+    final Map<K, Deque<StateRequest<K, ?, ?, ?>>> blockingQueue;
 
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
@@ -120,7 +120,7 @@ public class StateRequestBuffer<K> {
         return currentSeq.get() == seq;
     }
 
-    void enqueueToActive(StateRequest<K, ?, ?> request) {
+    void enqueueToActive(StateRequest<K, ?, ?, ?> request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
@@ -147,7 +147,7 @@ public class StateRequestBuffer<K> {
         }
     }
 
-    void enqueueToBlocking(StateRequest<K, ?, ?> request) {
+    void enqueueToBlocking(StateRequest<K, ?, ?, ?> request) {
         blockingQueue
                 .computeIfAbsent(request.getRecordContext().getKey(), k -> new 
LinkedList<>())
                 .add(request);
@@ -166,7 +166,7 @@ public class StateRequestBuffer<K> {
             return null;
         }
 
-        StateRequest<K, ?, ?> stateRequest = 
blockingQueue.get(key).removeFirst();
+        StateRequest<K, ?, ?, ?> stateRequest = 
blockingQueue.get(key).removeFirst();
         enqueueToActive(stateRequest);
         if (blockingQueue.get(key).isEmpty()) {
             blockingQueue.remove(key);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java
index 1a389272263..adc93e13995 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestContainer.java
@@ -29,7 +29,7 @@ package org.apache.flink.runtime.asyncprocessing;
 public interface StateRequestContainer {
 
     /** Preserve a stateRequest into the {@code StateRequestContainer}. */
-    void offer(StateRequest<?, ?, ?> stateRequest);
+    void offer(StateRequest<?, ?, ?, ?> stateRequest);
 
     /** Returns whether the container is empty. */
     boolean isEmpty();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
index b36c1da8426..8939dc7210d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
@@ -29,7 +29,7 @@ public class MockStateExecutor implements StateExecutor {
     public CompletableFuture<Void> executeBatchRequests(
             StateRequestContainer stateRequestContainer) {
         Preconditions.checkArgument(stateRequestContainer instanceof 
MockStateRequestContainer);
-        for (StateRequest<?, ?, ?> request :
+        for (StateRequest<?, ?, ?, ?> request :
                 ((MockStateRequestContainer) 
stateRequestContainer).getStateRequestList()) {
             request.getFuture().complete(null);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java
index f2200a55c03..0fa8746e3ae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateRequestContainer.java
@@ -24,10 +24,10 @@ import java.util.List;
 /** The mocked {@link StateRequestContainer} for testing. */
 public class MockStateRequestContainer implements StateRequestContainer {
 
-    private final List<StateRequest<?, ?, ?>> stateRequestList = new 
ArrayList<>();
+    private final List<StateRequest<?, ?, ?, ?>> stateRequestList = new 
ArrayList<>();
 
     @Override
-    public void offer(StateRequest<?, ?, ?> stateRequest) {
+    public void offer(StateRequest<?, ?, ?, ?> stateRequest) {
         stateRequestList.add(stateRequest);
     }
 
@@ -36,7 +36,7 @@ public class MockStateRequestContainer implements 
StateRequestContainer {
         return stateRequestList.isEmpty();
     }
 
-    public List<StateRequest<?, ?, ?>> getStateRequestList() {
+    public List<StateRequest<?, ?, ?, ?>> getStateRequestList() {
         return stateRequestList;
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java
index 22cce5e636d..91923ff7705 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalKeyedStateTestBase.java
@@ -154,7 +154,7 @@ public class InternalKeyedStateTestBase {
      */
     static class TestStateExecutor implements StateExecutor {
 
-        private Deque<StateRequest<?, ?, ?>> receivedRequest;
+        private Deque<StateRequest<?, ?, ?, ?>> receivedRequest;
 
         TestStateExecutor() {
             receivedRequest = new ConcurrentLinkedDeque<>();
@@ -162,7 +162,7 @@ public class InternalKeyedStateTestBase {
 
         <IN> void validate(@Nullable State state, StateRequestType type, 
@Nullable IN payload) {
             assertThat(receivedRequest.isEmpty()).isFalse();
-            StateRequest<?, ?, ?> request = receivedRequest.pop();
+            StateRequest<?, ?, ?, ?> request = receivedRequest.pop();
             assertThat(request.getState()).isEqualTo(state);
             assertThat(request.getRequestType()).isEqualTo(type);
             assertThat(request.getPayload()).isEqualTo(payload);
@@ -194,10 +194,10 @@ public class InternalKeyedStateTestBase {
         public void shutdown() {}
 
         static class TestStateRequestContainer implements 
StateRequestContainer {
-            ArrayList<StateRequest<?, ?, ?>> requests = new ArrayList<>();
+            ArrayList<StateRequest<?, ?, ?, ?>> requests = new ArrayList<>();
 
             @Override
-            public void offer(StateRequest<?, ?, ?> stateRequest) {
+            public void offer(StateRequest<?, ?, ?, ?> stateRequest) {
                 requests.add(stateRequest);
             }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
index 16807b78f10..53e7573a118 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java
@@ -19,7 +19,6 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.runtime.asyncprocessing.RecordContext;
-import org.apache.flink.runtime.state.v2.InternalPartitionedState;
 import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nullable;
@@ -40,12 +39,15 @@ public class ContextKey<K, N> {
 
     @Nullable private Object userKey;
 
-    public ContextKey(RecordContext<K> recordContext) {
-        this.recordContext = recordContext;
+    @Nullable private final N namespace;
+
+    public ContextKey(RecordContext<K> recordContext, @Nullable N namespace) {
+        this(recordContext, namespace, null);
     }
 
-    public ContextKey(RecordContext<K> recordContext, Object userKey) {
+    public ContextKey(RecordContext<K> recordContext, @Nullable N namespace, 
Object userKey) {
         this.recordContext = recordContext;
+        this.namespace = namespace;
         this.userKey = userKey;
     }
 
@@ -57,8 +59,8 @@ public class ContextKey<K, N> {
         return recordContext.getKeyGroup();
     }
 
-    public N getNamespace(InternalPartitionedState<N> state) {
-        return recordContext.getNamespace(state);
+    public N getNamespace() {
+        return namespace;
     }
 
     public Object getUserKey() {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
index 141c63cf1e2..2984a815a20 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStInnerTable.java
@@ -73,7 +73,7 @@ public interface ForStInnerTable<K, N, V> {
      * @param stateRequest The given stateRequest.
      * @return The corresponding ForSt GetRequest.
      */
-    ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, ?> 
stateRequest);
+    ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, ?, ?> 
stateRequest);
 
     /**
      * Build a {@link ForStDBPutRequest} that belong to {@code 
ForStInnerTable} with the given
@@ -82,5 +82,5 @@ public interface ForStInnerTable<K, N, V> {
      * @param stateRequest The given stateRequest.
      * @return The corresponding ForSt PutRequest.
      */
-    ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?> 
stateRequest);
+    ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?, ?> 
stateRequest);
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
index 152bc0d92ea..986dbb11ddc 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java
@@ -99,7 +99,7 @@ public class ForStListState<K, N, V> extends 
InternalListState<K, N, V>
                 ctxKey -> {
                     SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
                     builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = contextKey.getNamespace(this);
+                    N namespace = contextKey.getNamespace();
                     return builder.buildCompositeKeyNamespace(
                             namespace == null ? defaultNamespace : namespace,
                             namespaceSerializer.get());
@@ -123,19 +123,24 @@ public class ForStListState<K, N, V> extends 
InternalListState<K, N, V>
     @SuppressWarnings("unchecked")
     @Override
     public ForStDBGetRequest<K, N, List<V>, StateIterator<V>> 
buildDBGetRequest(
-            StateRequest<?, ?, ?> stateRequest) {
+            StateRequest<?, ?, ?, ?> stateRequest) {
         Preconditions.checkArgument(stateRequest.getRequestType() == 
StateRequestType.LIST_GET);
         ContextKey<K, N> contextKey =
-                new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext());
+                new ContextKey<>(
+                        (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace());
         return new ForStDBListGetRequest<>(
                 contextKey, this, (InternalStateFuture<StateIterator<V>>) 
stateRequest.getFuture());
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public ForStDBPutRequest<K, N, List<V>> buildDBPutRequest(StateRequest<?, 
?, ?> stateRequest) {
+    public ForStDBPutRequest<K, N, List<V>> buildDBPutRequest(
+            StateRequest<?, ?, ?, ?> stateRequest) {
         ContextKey<K, N> contextKey =
-                new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext());
+                new ContextKey<>(
+                        (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace());
         List<V> value;
         boolean merge = false;
         switch (stateRequest.getRequestType()) {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
index 71b30b9ede7..fe23949febb 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java
@@ -119,8 +119,10 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
                 ctxKey -> {
                     SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
                     builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = contextKey.getNamespace(this);
-                    builder.setNamespace(namespace, namespaceSerializer.get());
+                    N namespace = contextKey.getNamespace();
+                    builder.setNamespace(
+                            namespace == null ? defaultNamespace : namespace,
+                            namespaceSerializer.get());
                     if (contextKey.getUserKey() == null) { // value get
                         return builder.build();
                     }
@@ -152,7 +154,7 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
 
     @Override
     @SuppressWarnings("unchecked")
-    public ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, 
?> stateRequest) {
+    public ForStDBGetRequest<?, ?, ?, ?> buildDBGetRequest(StateRequest<?, ?, 
?, ?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.MAP_GET
                         || stateRequest.getRequestType() == 
StateRequestType.MAP_CONTAINS
@@ -160,6 +162,7 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
         ContextKey<K, N> contextKey =
                 new ContextKey<>(
                         (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace(),
                         stateRequest.getPayload());
 
         if (stateRequest.getRequestType() == StateRequestType.MAP_GET) {
@@ -175,13 +178,14 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
 
     @Override
     @SuppressWarnings("unchecked")
-    public ForStDBPutRequest<K, N, UV> buildDBPutRequest(StateRequest<?, ?, ?> 
stateRequest) {
+    public ForStDBPutRequest<K, N, UV> buildDBPutRequest(StateRequest<?, ?, ?, 
?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.MAP_PUT
                         || stateRequest.getRequestType() == 
StateRequestType.MAP_REMOVE);
         ContextKey<K, N> contextKey =
                 new ContextKey<>(
                         (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace(),
                         ((Tuple2<UK, UV>) stateRequest.getPayload()).f0);
         Preconditions.checkNotNull(
                 stateRequest.getPayload(), String.format("payload is null, 
%s", stateRequest));
@@ -203,12 +207,15 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
      */
     @SuppressWarnings("unchecked")
     public ForStDBBunchPutRequest<K, N, UK, UV> buildDBBunchPutRequest(
-            StateRequest<?, ?, ?> stateRequest) {
+            StateRequest<?, ?, ?, ?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.MAP_PUT_ALL
                         || stateRequest.getRequestType() == 
StateRequestType.CLEAR);
         ContextKey<K, N> contextKey =
-                new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext(), null);
+                new ContextKey<>(
+                        (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace(),
+                        null);
         Map<UK, UV> value = (Map<UK, UV>) stateRequest.getPayload();
         return new ForStDBBunchPutRequest(contextKey, value, this, 
stateRequest.getFuture());
     }
@@ -223,7 +230,7 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
      */
     @SuppressWarnings("unchecked")
     public ForStDBIterRequest<K, N, UK, UV, ?> buildDBIterRequest(
-            StateRequest<?, ?, ?> stateRequest) {
+            StateRequest<?, ?, ?, ?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.MAP_ITER
                         || stateRequest.getRequestType() == 
StateRequestType.MAP_ITER_KEY
@@ -242,11 +249,14 @@ public class ForStMapState<K, N, UK, UV> extends 
InternalMapState<K, N, UK, UV>
 
     @SuppressWarnings("unchecked")
     private ForStDBIterRequest<K, N, UK, UV, ?> buildDBIterRequest(
-            StateRequest<?, ?, ?> stateRequest,
+            StateRequest<?, ?, ?, ?> stateRequest,
             StateRequestType requestType,
             RocksIterator rocksIterator) {
         ContextKey<K, N> contextKey =
-                new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext(), null);
+                new ContextKey<>(
+                        (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace(),
+                        null);
         switch (requestType) {
             case MAP_ITER:
                 return new ForStDBMapEntryIterRequest<>(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
index 9e9510e4327..ea6ede88acf 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
@@ -44,7 +44,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
     }
 
     @Override
-    public void offer(StateRequest<?, ?, ?> stateRequest) {
+    public void offer(StateRequest<?, ?, ?, ?> stateRequest) {
         convertStateRequestsToForStDBRequests(stateRequest);
     }
 
@@ -54,7 +54,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
     }
 
     @SuppressWarnings("ConstantConditions")
-    private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?> 
stateRequest) {
+    private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?, 
?> stateRequest) {
         StateRequestType stateRequestType = stateRequest.getRequestType();
         switch (stateRequestType) {
             case VALUE_GET:
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
index cd6968bb12f..31d65f521a6 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java
@@ -93,7 +93,7 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
                 ctxKey -> {
                     SerializedCompositeKeyBuilder<K> builder = 
serializedKeyBuilder.get();
                     builder.setKeyAndKeyGroup(ctxKey.getRawKey(), 
ctxKey.getKeyGroup());
-                    N namespace = contextKey.getNamespace(this);
+                    N namespace = contextKey.getNamespace();
                     return builder.buildCompositeKeyNamespace(
                             namespace == null ? defaultNamespace : namespace,
                             namespaceSerializer.get());
@@ -117,22 +117,26 @@ public class ForStValueState<K, N, V> extends 
InternalValueState<K, N, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    public ForStDBGetRequest<K, N, V, V> buildDBGetRequest(StateRequest<?, ?, 
?> stateRequest) {
+    public ForStDBGetRequest<K, N, V, V> buildDBGetRequest(StateRequest<?, ?, 
?, ?> stateRequest) {
         Preconditions.checkArgument(stateRequest.getRequestType() == 
StateRequestType.VALUE_GET);
         ContextKey<K, N> contextKey =
-                new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext());
+                new ContextKey<>(
+                        (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace());
         return new ForStDBSingleGetRequest<>(
                 contextKey, this, (InternalStateFuture<V>) 
stateRequest.getFuture());
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> 
stateRequest) {
+    public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?, 
?> stateRequest) {
         Preconditions.checkArgument(
                 stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE
                         || stateRequest.getRequestType() == 
StateRequestType.CLEAR);
         ContextKey<K, N> contextKey =
-                new ContextKey<>((RecordContext<K>) 
stateRequest.getRecordContext());
+                new ContextKey<>(
+                        (RecordContext<K>) stateRequest.getRecordContext(),
+                        (N) stateRequest.getNamespace());
         V value =
                 (stateRequest.getRequestType() == StateRequestType.CLEAR)
                         ? null // "Delete(key)" is equivalent to "Put(key, 
null)"
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index dba6254dc55..df8c9ff42c3 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -118,7 +118,7 @@ public class ForStDBOperationTestBase {
         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128);
         RecordContext<Integer> recordContext =
                 new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0));
-        return new ContextKey<>(recordContext);
+        return new ContextKey<>(recordContext, VoidNamespace.INSTANCE, null);
     }
 
     protected ForStValueState<Integer, VoidNamespace, String> 
buildForStValueState(String stateName)
@@ -216,7 +216,7 @@ public class ForStDBOperationTestBase {
 
         @Override
         public void completeExceptionally(String message, Throwable ex) {
-            throw new UnsupportedOperationException();
+            // do nothing
         }
 
         @Override
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
index b7872dc97f9..96f0c30a04f 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
@@ -66,13 +66,13 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
 
         forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
 
-        List<StateRequest<?, ?, ?>> checkList = new ArrayList<>();
+        List<StateRequest<?, ?, ?, ?>> checkList = new ArrayList<>();
         stateRequestContainer = 
forStStateExecutor.createStateRequestContainer();
         // 2. Get value state: keyRange [0, keyNum)
         //    Update value state: keyRange [keyNum, keyNum + 100]
         for (int i = 0; i < keyNum; i++) {
             ForStValueState<Integer, VoidNamespace, String> state = (i % 2 == 
0 ? state1 : state2);
-            StateRequest<?, ?, ?> getRequest =
+            StateRequest<?, ?, ?, ?> getRequest =
                     buildStateRequest(state, StateRequestType.VALUE_GET, i, 
null, i * 2);
             stateRequestContainer.offer(getRequest);
             checkList.add(getRequest);
@@ -85,7 +85,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase 
{
         forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
 
         // 3. Check value state Get result : [0, keyNum)
-        for (StateRequest<?, ?, ?> getRequest : checkList) {
+        for (StateRequest<?, ?, ?, ?> getRequest : checkList) {
             
assertThat(getRequest.getRequestType()).isEqualTo(StateRequestType.VALUE_GET);
             int key = (Integer) getRequest.getRecordContext().getKey();
             
assertThat(getRequest.getRecordContext().getRecord()).isEqualTo(key * 2);
@@ -113,13 +113,13 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
         checkList.clear();
         for (int i = keyNum - 100; i < keyNum + 100; i++) {
             ForStValueState<Integer, VoidNamespace, String> state = (i % 2 == 
0 ? state1 : state2);
-            StateRequest<?, ?, ?> getRequest =
+            StateRequest<?, ?, ?, ?> getRequest =
                     buildStateRequest(state, StateRequestType.VALUE_GET, i, 
null, i * 2);
             stateRequestContainer.offer(getRequest);
             checkList.add(getRequest);
         }
         forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
-        for (StateRequest<?, ?, ?> getRequest : checkList) {
+        for (StateRequest<?, ?, ?, ?> getRequest : checkList) {
             
assertThat(getRequest.getRequestType()).isEqualTo(StateRequestType.VALUE_GET);
             assertThat(((TestStateFuture<String>) 
getRequest.getFuture()).getCompletedResult())
                     .isEqualTo(null);
@@ -157,11 +157,11 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
         forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
 
         stateRequestContainer = 
forStStateExecutor.createStateRequestContainer();
-        List<StateRequest<?, ?, ?>> checkList = new ArrayList<>();
+        List<StateRequest<?, ?, ?, ?>> checkList = new ArrayList<>();
 
         // 2. check the number of user key under primary key is correct
         for (int i = 0; i < 100; i++) {
-            StateRequest<?, ?, ?> iterRequest =
+            StateRequest<?, ?, ?, ?> iterRequest =
                     buildStateRequest(state, StateRequestType.MAP_ITER_KEY, i, 
null, i * 2);
             stateRequestContainer.offer(iterRequest);
             checkList.add(iterRequest);
@@ -208,7 +208,7 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
         checkList.clear();
         // 4. check primary key [75,100) is deleted
         for (int i = 0; i < 100; i++) {
-            StateRequest<?, ?, ?> iterRequest =
+            StateRequest<?, ?, ?, ?> iterRequest =
                     buildStateRequest(state, StateRequestType.MAP_IS_EMPTY, i, 
null, i * 2);
             stateRequestContainer.offer(iterRequest);
             checkList.add(iterRequest);
@@ -229,7 +229,7 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private <K, N, V, R> StateRequest<?, ?, ?> buildStateRequest(
+    private <K, N, V, R> StateRequest<?, ?, ?, ?> buildStateRequest(
             InternalKeyedState<K, N, V> innerTable,
             StateRequestType requestType,
             K key,
@@ -243,7 +243,7 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private <K, N, UK, UV, R> StateRequest<?, ?, ?> buildMapRequest(
+    private <K, N, UK, UV, R> StateRequest<?, ?, ?, ?> buildMapRequest(
             ForStMapState<K, N, UK, UV> innerTable,
             StateRequestType requestType,
             K key,

Reply via email to