This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 180d587717b [FLINK-37164][Runtime] Speed up state v2 synchronous 
methods execution (#26005)
180d587717b is described below

commit 180d587717ba0997c35f89e080974851eea7a938
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Sat Jan 18 11:56:51 2025 +0800

    [FLINK-37164][Runtime] Speed up state v2 synchronous methods execution 
(#26005)
---
 .../asyncprocessing/AsyncExecutionController.java  | 55 ++++++++++------
 .../runtime/asyncprocessing/StateExecutor.java     |  7 ++
 .../runtime/asyncprocessing/StateRequest.java      |  8 +++
 .../asyncprocessing/StateRequestBuffer.java        | 11 +---
 .../asyncprocessing/AbstractStateIteratorTest.java | 74 ++++++++++++----------
 .../AsyncExecutionControllerTest.java              | 46 ++++++++------
 .../runtime/asyncprocessing/MockStateExecutor.java |  7 +-
 .../state/v2/AbstractAggregatingStateTest.java     | 38 ++++++-----
 .../state/v2/AbstractKeyedStateTestBase.java       | 23 ++++---
 .../state/v2/AbstractReducingStateTest.java        |  5 ++
 .../flink/state/forst/ForStStateExecutor.java      | 60 ++++++++++++++++--
 .../state/forst/ForStStateRequestClassifier.java   | 40 ++++++------
 .../flink/state/forst/ForStStateExecutorTest.java  |  6 +-
 13 files changed, 248 insertions(+), 132 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index e4b0f6f9c8e..cc7eed91d83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -305,7 +305,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     @Override
     public <IN, OUT> InternalStateFuture<OUT> handleRequest(
             @Nullable State state, StateRequestType type, @Nullable IN 
payload) {
-        return handleRequest(state, type, payload, false);
+        return handleRequest(state, type, false, payload, false);
     }
 
     /**
@@ -314,6 +314,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
      * @param state the state to request. Could be {@code null} if the type is 
{@link
      *     StateRequestType#SYNC_POINT}.
      * @param type the type of this request.
+     * @param sync whether to trigger the request synchronously once it's 
ready.
      * @param payload the payload input for this request.
      * @param allowOverdraft whether to allow overdraft.
      * @return the state future.
@@ -321,12 +322,19 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     public <IN, OUT> InternalStateFuture<OUT> handleRequest(
             @Nullable State state,
             StateRequestType type,
+            boolean sync,
             @Nullable IN payload,
             boolean allowOverdraft) {
         // Step 1: build state future & assign context.
         InternalStateFuture<OUT> stateFuture = 
stateFutureFactory.create(currentContext);
         StateRequest<K, ?, IN, OUT> request =
-                new StateRequest<>(state, type, payload, stateFuture, 
currentContext);
+                new StateRequest<>(
+                        state,
+                        type,
+                        sync || type == StateRequestType.SYNC_POINT,
+                        payload,
+                        stateFuture,
+                        currentContext);
 
         // Step 2: try to seize the capacity, if the current in-flight records 
exceeds the limit,
         // block the current state request from entering until some buffered 
requests are processed.
@@ -346,22 +354,25 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     @Override
     public <IN, OUT> OUT handleRequestSync(
             State state, StateRequestType type, @Nullable IN payload) {
-        InternalStateFuture<OUT> stateFuture = handleRequest(state, type, 
payload);
-        // Trigger since we are waiting the result.
-        triggerIfNeeded(true);
-        try {
-            while (!stateFuture.isDone()) {
-                if (!mailboxExecutor.tryYield()) {
-                    // We force trigger the buffer if the executor is not 
fully loaded.
-                    if (!stateExecutor.fullyLoaded()) {
-                        triggerIfNeeded(true);
+        InternalStateFuture<OUT> stateFuture = handleRequest(state, type, 
true, payload, false);
+        if (!stateFuture.isDone()) {
+            // Trigger since we are waiting for the result.
+            triggerIfNeeded(true);
+            try {
+                while (!stateFuture.isDone()) {
+                    if (!mailboxExecutor.tryYield()) {
+                        // We force trigger the buffer if the executor is not 
fully loaded.
+                        if (!stateExecutor.fullyLoaded()) {
+                            triggerIfNeeded(true);
+                        }
+                        waitForNewMails();
                     }
-                    waitForNewMails();
                 }
+            } catch (InterruptedException ignored) {
+                // Ignore the interrupted exception to avoid throwing a fatal
+                // error when the task is cancelled
+                // or exits.
             }
-        } catch (InterruptedException ignored) {
-            // ignore the interrupted exception to avoid throwing fatal error 
when the task cancel
-            // or exit.
         }
         return stateFuture.get();
     }
@@ -373,7 +384,15 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     }
 
     <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) {
-        stateRequestsBuffer.enqueueToActive(request);
+        if (request.isSync()) {
+            if (request.getRequestType() == StateRequestType.SYNC_POINT) {
+                request.getFuture().complete(null);
+            } else {
+                stateExecutor.executeRequestSync(request);
+            }
+        } else {
+            stateRequestsBuffer.enqueueToActive(request);
+        }
     }
 
     <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) {
@@ -441,7 +460,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
      */
     public StateFuture<Void> syncPointRequestWithCallback(
             ThrowingRunnable<Exception> callback, boolean allowOverdraft) {
-        return handleRequest(null, StateRequestType.SYNC_POINT, null, 
allowOverdraft)
+        return handleRequest(null, StateRequestType.SYNC_POINT, true, null, 
allowOverdraft)
                 .thenAccept(v -> callback.run());
     }
 
@@ -464,7 +483,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
      *     only drain in best efforts and return when no progress is made.
      */
     private void drainInflightRecords(int targetNum, boolean forceToWait) {
-        if (!forceToWait && drainDepth > 0) {
+        if (!forceToWait && drainDepth > 5) {
             // We don't allow recursive call of drain if we are not forced to 
wait here.
             // This is to avoid stack overflow, since the yield will pick up 
another processing,
             // which may cause another drain.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
index 924d2557416..34803a5a5f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
@@ -45,6 +45,13 @@ public interface StateExecutor {
      */
     StateRequestContainer createStateRequestContainer();
 
+    /**
+     * Execute a single state request *synchronously*. This is for synchronous 
APIs.
+     *
+     * @param stateRequest the request to run.
+     */
+    void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest);
+
     /**
      * Check if this executor is fully loaded. Will be invoked to determine 
whether to give more
      * requests to run or wait for a while.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
index dea08422de6..d7e4ddec1a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java
@@ -44,6 +44,8 @@ public class StateRequest<K, N, IN, OUT> implements 
Serializable {
     /** The type of this request. */
     private final StateRequestType type;
 
+    private final boolean sync;
+
     /** The payload(input) of this request. */
     @Nullable private final IN payload;
 
@@ -58,11 +60,13 @@ public class StateRequest<K, N, IN, OUT> implements 
Serializable {
     public StateRequest(
             @Nullable State state,
             StateRequestType type,
+            boolean sync,
             @Nullable IN payload,
             InternalStateFuture<OUT> stateFuture,
             RecordContext<K> context) {
         this.state = state;
         this.type = type;
+        this.sync = sync;
         this.payload = payload;
         this.stateFuture = stateFuture;
         this.context = context;
@@ -79,6 +83,10 @@ public class StateRequest<K, N, IN, OUT> implements 
Serializable {
         return payload;
     }
 
+    public boolean isSync() {
+        return sync;
+    }
+
     @Nullable
     public State getState() {
         return state;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
index bcaf1f24501..fe469d90012 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
@@ -148,14 +148,9 @@ public class StateRequestBuffer<K> implements Closeable {
     }
 
     void enqueueToActive(StateRequest<K, ?, ?, ?> request) {
-        if (request.getRequestType() == StateRequestType.SYNC_POINT) {
-            request.getFuture().complete(null);
-        } else {
-            activeQueue.add(request);
-            if (bufferTimeout > 0 && seqAndTimeout == null) {
-                seqAndTimeout =
-                        Tuple2.of(currentSeq.get(), System.currentTimeMillis() 
+ bufferTimeout);
-            }
+        activeQueue.add(request);
+        if (bufferTimeout > 0 && seqAndTimeout == null) {
+            seqAndTimeout = Tuple2.of(currentSeq.get(), 
System.currentTimeMillis() + bufferTimeout);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
index a05e443d3bc..68ea31ca0ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing;
 
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.core.state.InternalStateFuture;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
@@ -196,40 +197,7 @@ public class AbstractStateIteratorTest {
             CompletableFuture<Void> future = new CompletableFuture<>();
             for (StateRequest request :
                     ((MockStateRequestContainer) 
stateRequestContainer).getStateRequestList()) {
-                if (request.getRequestType() == StateRequestType.MAP_ITER) {
-                    ArrayList<Integer> results = new ArrayList<>(step);
-                    for (int i = 0; current < limit && i < step; i++) {
-                        results.add(current++);
-                    }
-                    request.getFuture()
-                            .complete(
-                                    new TestIterator(
-                                            request.getState(),
-                                            request.getRequestType(),
-                                            aec,
-                                            results,
-                                            current,
-                                            limit));
-                } else if (request.getRequestType() == 
StateRequestType.ITERATOR_LOADING) {
-                    
assertThat(request.getPayload()).isInstanceOf(TestIterator.class);
-                    assertThat(((TestIterator) 
request.getPayload()).current).isEqualTo(current);
-                    ArrayList<Integer> results = new ArrayList<>(step);
-                    for (int i = 0; current < limit && i < step; i++) {
-                        results.add(current++);
-                    }
-                    request.getFuture()
-                            .complete(
-                                    new TestIterator(
-                                            request.getState(),
-                                            ((TestIterator) 
request.getPayload()).getRequestType(),
-                                            aec,
-                                            results,
-                                            current,
-                                            limit));
-                } else {
-                    fail("Unsupported request type " + 
request.getRequestType());
-                }
-                processedCount.incrementAndGet();
+                executeRequestSync(request);
             }
             future.complete(null);
             return future;
@@ -240,6 +208,44 @@ public class AbstractStateIteratorTest {
             return new MockStateRequestContainer();
         }
 
+        @Override
+        public void executeRequestSync(StateRequest<?, ?, ?, ?> request) {
+            if (request.getRequestType() == StateRequestType.MAP_ITER) {
+                ArrayList<Integer> results = new ArrayList<>(step);
+                for (int i = 0; current < limit && i < step; i++) {
+                    results.add(current++);
+                }
+                ((InternalStateFuture<StateIterator<Integer>>) 
request.getFuture())
+                        .complete(
+                                new TestIterator(
+                                        request.getState(),
+                                        request.getRequestType(),
+                                        aec,
+                                        results,
+                                        current,
+                                        limit));
+            } else if (request.getRequestType() == 
StateRequestType.ITERATOR_LOADING) {
+                
assertThat(request.getPayload()).isInstanceOf(TestIterator.class);
+                assertThat(((TestIterator) 
request.getPayload()).current).isEqualTo(current);
+                ArrayList<Integer> results = new ArrayList<>(step);
+                for (int i = 0; current < limit && i < step; i++) {
+                    results.add(current++);
+                }
+                ((InternalStateFuture<StateIterator<Integer>>) 
request.getFuture())
+                        .complete(
+                                new TestIterator(
+                                        request.getState(),
+                                        ((TestIterator) 
request.getPayload()).getRequestType(),
+                                        aec,
+                                        results,
+                                        current,
+                                        limit));
+            } else {
+                fail("Unsupported request type " + request.getRequestType());
+            }
+            processedCount.incrementAndGet();
+        }
+
         @Override
         public boolean fullyLoaded() {
             return false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index 64e78bd853e..cf4ed16b1fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.state.InternalStateFuture;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
@@ -847,26 +848,7 @@ class AsyncExecutionControllerTest {
             CompletableFuture<Void> future = new CompletableFuture<>();
             for (StateRequest request :
                     ((MockStateRequestContainer) 
stateRequestContainer).getStateRequestList()) {
-                if (request.getRequestType() == StateRequestType.VALUE_GET) {
-                    Preconditions.checkState(request.getState() != null);
-                    TestValueState state = (TestValueState) request.getState();
-                    Integer val =
-                            state.underlyingState.get(
-                                    (String) 
request.getRecordContext().getKey(),
-                                    (String) 
request.getRecordContext().getNamespace(state));
-                    request.getFuture().complete(val);
-                } else if (request.getRequestType() == 
StateRequestType.VALUE_UPDATE) {
-                    Preconditions.checkState(request.getState() != null);
-                    TestValueState state = (TestValueState) request.getState();
-
-                    state.underlyingState.update(
-                            (String) request.getRecordContext().getKey(),
-                            (String) 
request.getRecordContext().getNamespace(state),
-                            (Integer) request.getPayload());
-                    request.getFuture().complete(null);
-                } else {
-                    throw new UnsupportedOperationException("Unsupported 
request type");
-                }
+                executeRequestSync(request);
             }
             future.complete(null);
             return future;
@@ -877,6 +859,30 @@ class AsyncExecutionControllerTest {
             return new MockStateRequestContainer();
         }
 
+        @Override
+        public void executeRequestSync(StateRequest<?, ?, ?, ?> request) {
+            if (request.getRequestType() == StateRequestType.VALUE_GET) {
+                Preconditions.checkState(request.getState() != null);
+                TestValueState state = (TestValueState) request.getState();
+                Integer val =
+                        state.underlyingState.get(
+                                (String) request.getRecordContext().getKey(),
+                                (String) 
request.getRecordContext().getNamespace(state));
+                ((InternalStateFuture<Integer>) 
request.getFuture()).complete(val);
+            } else if (request.getRequestType() == 
StateRequestType.VALUE_UPDATE) {
+                Preconditions.checkState(request.getState() != null);
+                TestValueState state = (TestValueState) request.getState();
+
+                state.underlyingState.update(
+                        (String) request.getRecordContext().getKey(),
+                        (String) 
request.getRecordContext().getNamespace(state),
+                        (Integer) request.getPayload());
+                request.getFuture().complete(null);
+            } else {
+                throw new UnsupportedOperationException("Unsupported request 
type");
+            }
+        }
+
         @Override
         public boolean fullyLoaded() {
             return false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
index 51cfbb3b265..474bdb558a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java
@@ -31,7 +31,7 @@ public class MockStateExecutor implements StateExecutor {
         Preconditions.checkArgument(stateRequestContainer instanceof 
MockStateRequestContainer);
         for (StateRequest<?, ?, ?, ?> request :
                 ((MockStateRequestContainer) 
stateRequestContainer).getStateRequestList()) {
-            request.getFuture().complete(null);
+            executeRequestSync(request);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -41,6 +41,11 @@ public class MockStateExecutor implements StateExecutor {
         return new MockStateRequestContainer();
     }
 
+    @Override
+    public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
+        stateRequest.getFuture().complete(null);
+    }
+
     @Override
     public boolean fullyLoaded() {
         return false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
index 1e8dc088ede..705a2c50d16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.state.InternalStateFuture;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
@@ -233,22 +234,7 @@ class AbstractAggregatingStateTest extends 
AbstractKeyedStateTestBase {
                 StateRequestContainer stateRequestContainer) {
             for (StateRequest stateRequest :
                     ((MockStateRequestContainer) 
stateRequestContainer).getStateRequestList()) {
-                String key = (String) stateRequest.getRecordContext().getKey();
-                String namespace = (String) stateRequest.getNamespace();
-                if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_ADD) {
-                    if (stateRequest.getPayload() == null) {
-                        hashMap.remove(Tuple2.of(key, namespace));
-                        stateRequest.getFuture().complete(null);
-                    } else {
-                        hashMap.put(Tuple2.of(key, namespace), (Integer) 
stateRequest.getPayload());
-                        stateRequest.getFuture().complete(null);
-                    }
-                } else if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_GET) {
-                    Integer val = hashMap.get(Tuple2.of(key, namespace));
-                    stateRequest.getFuture().complete(val);
-                } else {
-                    throw new UnsupportedOperationException("Unsupported 
type");
-                }
+                executeRequestSync(stateRequest);
             }
             CompletableFuture<Void> future = new CompletableFuture<>();
             future.complete(null);
@@ -260,6 +246,26 @@ class AbstractAggregatingStateTest extends 
AbstractKeyedStateTestBase {
             return new MockStateRequestContainer();
         }
 
+        @Override
+        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
+            String key = (String) stateRequest.getRecordContext().getKey();
+            String namespace = (String) stateRequest.getNamespace();
+            if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_ADD) {
+                if (stateRequest.getPayload() == null) {
+                    hashMap.remove(Tuple2.of(key, namespace));
+                    stateRequest.getFuture().complete(null);
+                } else {
+                    hashMap.put(Tuple2.of(key, namespace), (Integer) 
stateRequest.getPayload());
+                    stateRequest.getFuture().complete(null);
+                }
+            } else if (stateRequest.getRequestType() == 
StateRequestType.AGGREGATING_GET) {
+                Integer val = hashMap.get(Tuple2.of(key, namespace));
+                ((InternalStateFuture<Integer>) 
stateRequest.getFuture()).complete(val);
+            } else {
+                throw new UnsupportedOperationException("Unsupported type");
+            }
+        }
+
         @Override
         public boolean fullyLoaded() {
             return false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
index 19c408025b5..00ca94d6041 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.v2;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.state.InternalStateFuture;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
 import org.apache.flink.runtime.asyncprocessing.StateRequest;
@@ -236,14 +237,9 @@ public class AbstractKeyedStateTestBase {
         @Override
         public CompletableFuture<Void> executeBatchRequests(
                 StateRequestContainer stateRequestContainer) {
-            receivedRequest.addAll(((TestStateRequestContainer) 
stateRequestContainer).requests);
-            for (StateRequest request : receivedRequest) {
-                if (request.getRequestType() == StateRequestType.MAP_CONTAINS
-                        || request.getRequestType() == 
StateRequestType.MAP_IS_EMPTY) {
-                    request.getFuture().complete(true);
-                } else {
-                    request.getFuture().complete(null);
-                }
+            for (StateRequest request :
+                    ((TestStateRequestContainer) 
stateRequestContainer).requests) {
+                executeRequestSync(request);
             }
             CompletableFuture<Void> future = new CompletableFuture<>();
             future.complete(null);
@@ -255,6 +251,17 @@ public class AbstractKeyedStateTestBase {
             return new TestStateRequestContainer();
         }
 
+        @Override
+        public void executeRequestSync(StateRequest<?, ?, ?, ?> request) {
+            receivedRequest.add(request);
+            if (request.getRequestType() == StateRequestType.MAP_CONTAINS
+                    || request.getRequestType() == 
StateRequestType.MAP_IS_EMPTY) {
+                ((InternalStateFuture<Boolean>) 
request.getFuture()).complete(true);
+            } else {
+                request.getFuture().complete(null);
+            }
+        }
+
         @Override
         public boolean fullyLoaded() {
             return false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
index 9c4de234c41..b6a3eb7144b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
@@ -186,6 +186,11 @@ public class AbstractReducingStateTest extends 
AbstractKeyedStateTestBase {
             return new MockStateRequestContainer();
         }
 
+        @Override
+        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
+            throw new UnsupportedOperationException("Unsupported synchronous 
execution");
+        }
+
         @Override
         public boolean fullyLoaded() {
             return false;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index 6640181eb50..44814efb496 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -39,6 +40,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static 
org.apache.flink.state.forst.ForStStateRequestClassifier.convertRequests;
+
 /**
  * The {@link StateExecutor} implementation which executing batch {@link 
StateRequest}s for
  * ForStStateBackend.
@@ -74,6 +77,9 @@ public class ForStStateExecutor implements StateExecutor {
     /** The ongoing sub-processes count. */
     private final AtomicLong ongoing;
 
+    private final ExecutorService directExecutor =
+            
org.apache.flink.util.concurrent.Executors.newDirectExecutorService();
+
     public ForStStateExecutor(
             boolean coordinatorInline,
             boolean isWriteInline,
@@ -85,7 +91,7 @@ public class ForStStateExecutor implements StateExecutor {
             Preconditions.checkState(readIoParallelism > 0);
             this.coordinatorThread =
                     coordinatorInline
-                            ? 
org.apache.flink.util.concurrent.Executors.newDirectExecutorService()
+                            ? directExecutor
                             : Executors.newSingleThreadExecutor(
                                     new ExecutorThreadFactory(
                                             
"ForSt-StateExecutor-Coordinator-And-Write"));
@@ -94,14 +100,13 @@ public class ForStStateExecutor implements StateExecutor {
                     Executors.newFixedThreadPool(
                             readIoParallelism,
                             new 
ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
-            this.writeThreads =
-                    
org.apache.flink.util.concurrent.Executors.newDirectExecutorService();
+            this.writeThreads = directExecutor;
             this.sharedWriteThread = true;
         } else {
             Preconditions.checkState(readIoParallelism > 0 || 
writeIoParallelism > 0);
             this.coordinatorThread =
                     coordinatorInline
-                            ? 
org.apache.flink.util.concurrent.Executors.newDirectExecutorService()
+                            ? directExecutor
                             : Executors.newSingleThreadExecutor(
                                     new 
ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
             if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
@@ -223,6 +228,53 @@ public class ForStStateExecutor implements StateExecutor {
         return new ForStStateRequestClassifier();
     }
 
+    @Override
+    public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
+        checkState();
+        Object forstRequest = convertRequests(stateRequest);
+        try {
+            ForStDBOperation operation;
+            if (forstRequest instanceof ForStDBGetRequest) {
+                operation =
+                        new ForStGeneralMultiGetOperation(
+                                db,
+                                Collections.singletonList(
+                                        (ForStDBGetRequest<?, ?, ?, ?>) 
forstRequest),
+                                directExecutor,
+                                1,
+                                null);
+            } else if (forstRequest instanceof ForStDBIterRequest) {
+                operation =
+                        new ForStIterateOperation(
+                                db,
+                                Collections.singletonList(
+                                        (ForStDBIterRequest<?, ?, ?, ?, ?>) 
forstRequest),
+                                directExecutor,
+                                null);
+            } else if (forstRequest instanceof ForStDBPutRequest) {
+                operation =
+                        new ForStWriteBatchOperation(
+                                db,
+                                Collections.singletonList(
+                                        (ForStDBPutRequest<?, ?, ?>) 
forstRequest),
+                                writeOptions,
+                                directExecutor);
+            } else {
+                throw new IllegalArgumentException("Unknown request type: " + 
forstRequest);
+            }
+            operation
+                    .process()
+                    .exceptionally(
+                            throwable -> {
+                                executionError = throwable;
+                                return null;
+                            });
+        } catch (Exception e) {
+            executionError = e;
+        }
+        checkState();
+    }
+
     @Override
     public boolean fullyLoaded() {
         return ongoing.get() >= readThreadCount;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
index 98b3a6438e3..e2ad8d7d1a2 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java
@@ -46,7 +46,14 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
 
     @Override
     public void offer(StateRequest<?, ?, ?, ?> stateRequest) {
-        convertStateRequestsToForStDBRequests(stateRequest);
+        Object forstDbRequest = convertRequests(stateRequest);
+        if (forstDbRequest instanceof ForStDBGetRequest) {
+            dbGetRequests.add((ForStDBGetRequest<?, ?, ?, ?>) forstDbRequest);
+        } else if (forstDbRequest instanceof ForStDBPutRequest) {
+            dbPutRequests.add((ForStDBPutRequest<?, ?, ?>) forstDbRequest);
+        } else {
+            dbIterRequests.add((ForStDBIterRequest<?, ?, ?, ?, ?>) 
forstDbRequest);
+        }
     }
 
     @Override
@@ -55,7 +62,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
     }
 
     @SuppressWarnings("ConstantConditions")
-    private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?, 
?> stateRequest) {
+    public static Object convertRequests(StateRequest<?, ?, ?, ?> 
stateRequest) {
         StateRequestType stateRequestType = stateRequest.getRequestType();
         switch (stateRequestType) {
             case VALUE_GET:
@@ -68,8 +75,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
                 {
                     ForStInnerTable<?, ?, ?> innerTable =
                             (ForStInnerTable<?, ?, ?>) stateRequest.getState();
-                    
dbGetRequests.add(innerTable.buildDBGetRequest(stateRequest));
-                    return;
+                    return innerTable.buildDBGetRequest(stateRequest);
                 }
             case VALUE_UPDATE:
             case LIST_UPDATE:
@@ -82,8 +88,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
                 {
                     ForStInnerTable<?, ?, ?> innerTable =
                             (ForStInnerTable<?, ?, ?>) stateRequest.getState();
-                    
dbPutRequests.add(innerTable.buildDBPutRequest(stateRequest));
-                    return;
+                    return innerTable.buildDBPutRequest(stateRequest);
                 }
             case MAP_ITER:
             case MAP_ITER_KEY:
@@ -92,28 +97,24 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
                 {
                     ForStMapState<?, ?, ?, ?> forStMapState =
                             (ForStMapState<?, ?, ?, ?>) 
stateRequest.getState();
-                    
dbIterRequests.add(forStMapState.buildDBIterRequest(stateRequest));
-                    return;
+                    return forStMapState.buildDBIterRequest(stateRequest);
                 }
             case MAP_PUT_ALL:
                 {
                     ForStMapState<?, ?, ?, ?> forStMapState =
                             (ForStMapState<?, ?, ?, ?>) 
stateRequest.getState();
-                    
dbPutRequests.add(forStMapState.buildDBBunchPutRequest(stateRequest));
-                    return;
+                    return forStMapState.buildDBBunchPutRequest(stateRequest);
                 }
             case CLEAR:
                 {
                     if (stateRequest.getState() instanceof ForStMapState) {
                         ForStMapState<?, ?, ?, ?> forStMapState =
                                 (ForStMapState<?, ?, ?, ?>) 
stateRequest.getState();
-                        
dbPutRequests.add(forStMapState.buildDBBunchPutRequest(stateRequest));
-                        return;
+                        return 
forStMapState.buildDBBunchPutRequest(stateRequest);
                     } else if (stateRequest.getState() instanceof 
ForStInnerTable) {
                         ForStInnerTable<?, ?, ?> innerTable =
                                 (ForStInnerTable<?, ?, ?>) 
stateRequest.getState();
-                        
dbPutRequests.add(innerTable.buildDBPutRequest(stateRequest));
-                        return;
+                        return innerTable.buildDBPutRequest(stateRequest);
                     } else {
                         throw new UnsupportedOperationException(
                                 "The State "
@@ -123,8 +124,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
                 }
             case CUSTOMIZED:
                 {
-                    handleCustomizedStateRequests(stateRequest);
-                    return;
+                    return handleCustomizedStateRequests(stateRequest);
                 }
             default:
                 throw new UnsupportedOperationException(
@@ -133,7 +133,7 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
     }
 
     @SuppressWarnings("unchecked")
-    private void handleCustomizedStateRequests(StateRequest<?, ?, ?, ?> 
stateRequest) {
+    private static Object handleCustomizedStateRequests(StateRequest<?, ?, ?, 
?> stateRequest) {
         Tuple2<ForStStateRequestType, ?> payload =
                 (Tuple2<ForStStateRequestType, ?>) stateRequest.getPayload();
         ForStStateRequestType requestType = payload.f0;
@@ -142,15 +142,13 @@ public class ForStStateRequestClassifier implements 
StateRequestContainer {
                 {
                     ForStListState<?, ?, ?> forStListState =
                             (ForStListState<?, ?, ?>) stateRequest.getState();
-                    
dbGetRequests.add(forStListState.buildDBGetRequest(stateRequest));
-                    return;
+                    return forStListState.buildDBGetRequest(stateRequest);
                 }
             case MERGE_ALL_RAW:
                 {
                     ForStListState<?, ?, ?> forStListState =
                             (ForStListState<?, ?, ?>) stateRequest.getState();
-                    
dbPutRequests.add(forStListState.buildDBPutRequest(stateRequest));
-                    return;
+                    return forStListState.buildDBPutRequest(stateRequest);
                 }
             default:
                 throw new UnsupportedOperationException(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
index f31e27dbe2d..600a2f7a740 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
@@ -349,7 +349,8 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
         RecordContext<K> recordContext =
                 new RecordContext<>(record, key, t -> {}, keyGroup, new 
Epoch(0), 0);
         TestStateFuture stateFuture = new TestStateFuture<>();
-        return new StateRequest<>(innerTable, requestType, value, stateFuture, 
recordContext);
+        return new StateRequest<>(
+                innerTable, requestType, false, value, stateFuture, 
recordContext);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -363,6 +364,7 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
         RecordContext<K> recordContext =
                 new RecordContext<>(record, key, t -> {}, keyGroup, new 
Epoch(0), 0);
         TestStateFuture stateFuture = new TestStateFuture<>();
-        return new StateRequest<>(innerTable, requestType, value, stateFuture, 
recordContext);
+        return new StateRequest<>(
+                innerTable, requestType, false, value, stateFuture, 
recordContext);
     }
 }


Reply via email to