This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 180d587717b [FLINK-37164][Runtime] Speed up state v2 synchronous methods execution (#26005) 180d587717b is described below commit 180d587717ba0997c35f89e080974851eea7a938 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Sat Jan 18 11:56:51 2025 +0800 [FLINK-37164][Runtime] Speed up state v2 synchronous methods execution (#26005) --- .../asyncprocessing/AsyncExecutionController.java | 55 ++++++++++------ .../runtime/asyncprocessing/StateExecutor.java | 7 ++ .../runtime/asyncprocessing/StateRequest.java | 8 +++ .../asyncprocessing/StateRequestBuffer.java | 11 +--- .../asyncprocessing/AbstractStateIteratorTest.java | 74 ++++++++++++---------- .../AsyncExecutionControllerTest.java | 46 ++++++++------ .../runtime/asyncprocessing/MockStateExecutor.java | 7 +- .../state/v2/AbstractAggregatingStateTest.java | 38 ++++++----- .../state/v2/AbstractKeyedStateTestBase.java | 23 ++++--- .../state/v2/AbstractReducingStateTest.java | 5 ++ .../flink/state/forst/ForStStateExecutor.java | 60 ++++++++++++++++-- .../state/forst/ForStStateRequestClassifier.java | 40 ++++++------ .../flink/state/forst/ForStStateExecutorTest.java | 6 +- 13 files changed, 248 insertions(+), 132 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index e4b0f6f9c8e..cc7eed91d83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -305,7 +305,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab @Override public <IN, OUT> InternalStateFuture<OUT> handleRequest( @Nullable State state, StateRequestType type, @Nullable IN payload) { - return handleRequest(state, type, payload, false); + return handleRequest(state, type, false, payload, false); } /** @@ -314,6 +314,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * @param state the state to request. Could be {@code null} if the type is {@link * StateRequestType#SYNC_POINT}. * @param type the type of this request. + * @param sync whether to trigger the request synchronously once it's ready. * @param payload the payload input for this request. * @param allowOverdraft whether to allow overdraft. * @return the state future. @@ -321,12 +322,19 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab public <IN, OUT> InternalStateFuture<OUT> handleRequest( @Nullable State state, StateRequestType type, + boolean sync, @Nullable IN payload, boolean allowOverdraft) { // Step 1: build state future & assign context. InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext); StateRequest<K, ?, IN, OUT> request = - new StateRequest<>(state, type, payload, stateFuture, currentContext); + new StateRequest<>( + state, + type, + sync || type == StateRequestType.SYNC_POINT, + payload, + stateFuture, + currentContext); // Step 2: try to seize the capacity, if the current in-flight records exceeds the limit, // block the current state request from entering until some buffered requests are processed. @@ -346,22 +354,25 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab @Override public <IN, OUT> OUT handleRequestSync( State state, StateRequestType type, @Nullable IN payload) { - InternalStateFuture<OUT> stateFuture = handleRequest(state, type, payload); - // Trigger since we are waiting the result. - triggerIfNeeded(true); - try { - while (!stateFuture.isDone()) { - if (!mailboxExecutor.tryYield()) { - // We force trigger the buffer if the executor is not fully loaded. - if (!stateExecutor.fullyLoaded()) { - triggerIfNeeded(true); + InternalStateFuture<OUT> stateFuture = handleRequest(state, type, true, payload, false); + if (!stateFuture.isDone()) { + // Trigger since we are waiting the result. + triggerIfNeeded(true); + try { + while (!stateFuture.isDone()) { + if (!mailboxExecutor.tryYield()) { + // We force trigger the buffer if the executor is not fully loaded. + if (!stateExecutor.fullyLoaded()) { + triggerIfNeeded(true); + } + waitForNewMails(); } - waitForNewMails(); } + } catch (InterruptedException ignored) { + // ignore the interrupted exception to avoid throwing fatal error when the task + // cancel + // or exit. } - } catch (InterruptedException ignored) { - // ignore the interrupted exception to avoid throwing fatal error when the task cancel - // or exit. } return stateFuture.get(); } @@ -373,7 +384,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab } <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) { - stateRequestsBuffer.enqueueToActive(request); + if (request.isSync()) { + if (request.getRequestType() == StateRequestType.SYNC_POINT) { + request.getFuture().complete(null); + } else { + stateExecutor.executeRequestSync(request); + } + } else { + stateRequestsBuffer.enqueueToActive(request); + } } <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) { @@ -441,7 +460,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab */ public StateFuture<Void> syncPointRequestWithCallback( ThrowingRunnable<Exception> callback, boolean allowOverdraft) { - return handleRequest(null, StateRequestType.SYNC_POINT, null, allowOverdraft) + return handleRequest(null, StateRequestType.SYNC_POINT, true, null, allowOverdraft) .thenAccept(v -> callback.run()); } @@ -464,7 +483,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * only drain in best efforts and return when no progress is made. */ private void drainInflightRecords(int targetNum, boolean forceToWait) { - if (!forceToWait && drainDepth > 0) { + if (!forceToWait && drainDepth > 5) { // We don't allow recursive call of drain if we are not forced to wait here. // This is to avoid stack overflow, since the yield will pick up another processing, // which may cause another drain. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java index 924d2557416..34803a5a5f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java @@ -45,6 +45,13 @@ public interface StateExecutor { */ StateRequestContainer createStateRequestContainer(); + /** + * Execute a single state request *synchronously*. This is for synchronous APIs. + * + * @param stateRequest the request to run. + */ + void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest); + /** * Check if this executor is fully loaded. Will be invoked to determine whether to give more * requests to run or wait for a while. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java index dea08422de6..d7e4ddec1a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java @@ -44,6 +44,8 @@ public class StateRequest<K, N, IN, OUT> implements Serializable { /** The type of this request. */ private final StateRequestType type; + private final boolean sync; + /** The payload(input) of this request. */ @Nullable private final IN payload; @@ -58,11 +60,13 @@ public class StateRequest<K, N, IN, OUT> implements Serializable { public StateRequest( @Nullable State state, StateRequestType type, + boolean sync, @Nullable IN payload, InternalStateFuture<OUT> stateFuture, RecordContext<K> context) { this.state = state; this.type = type; + this.sync = sync; this.payload = payload; this.stateFuture = stateFuture; this.context = context; @@ -79,6 +83,10 @@ public class StateRequest<K, N, IN, OUT> implements Serializable { return payload; } + public boolean isSync() { + return sync; + } + @Nullable public State getState() { return state; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index bcaf1f24501..fe469d90012 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -148,14 +148,9 @@ public class StateRequestBuffer<K> implements Closeable { } void enqueueToActive(StateRequest<K, ?, ?, ?> request) { - if (request.getRequestType() == StateRequestType.SYNC_POINT) { - request.getFuture().complete(null); - } else { - activeQueue.add(request); - if (bufferTimeout > 0 && seqAndTimeout == null) { - seqAndTimeout = - Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout); - } + activeQueue.add(request); + if (bufferTimeout > 0 && seqAndTimeout == null) { + seqAndTimeout = Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java index a05e443d3bc..68ea31ca0ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; @@ -196,40 +197,7 @@ public class AbstractStateIteratorTest { CompletableFuture<Void> future = new CompletableFuture<>(); for (StateRequest request : ((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) { - if (request.getRequestType() == StateRequestType.MAP_ITER) { - ArrayList<Integer> results = new ArrayList<>(step); - for (int i = 0; current < limit && i < step; i++) { - results.add(current++); - } - request.getFuture() - .complete( - new TestIterator( - request.getState(), - request.getRequestType(), - aec, - results, - current, - limit)); - } else if (request.getRequestType() == StateRequestType.ITERATOR_LOADING) { - assertThat(request.getPayload()).isInstanceOf(TestIterator.class); - assertThat(((TestIterator) request.getPayload()).current).isEqualTo(current); - ArrayList<Integer> results = new ArrayList<>(step); - for (int i = 0; current < limit && i < step; i++) { - results.add(current++); - } - request.getFuture() - .complete( - new TestIterator( - request.getState(), - ((TestIterator) request.getPayload()).getRequestType(), - aec, - results, - current, - limit)); - } else { - fail("Unsupported request type " + request.getRequestType()); - } - processedCount.incrementAndGet(); + executeRequestSync(request); } future.complete(null); return future; @@ -240,6 +208,44 @@ public class AbstractStateIteratorTest { return new MockStateRequestContainer(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> request) { + if (request.getRequestType() == StateRequestType.MAP_ITER) { + ArrayList<Integer> results = new ArrayList<>(step); + for (int i = 0; current < limit && i < step; i++) { + results.add(current++); + } + ((InternalStateFuture<StateIterator<Integer>>) request.getFuture()) + .complete( + new TestIterator( + request.getState(), + request.getRequestType(), + aec, + results, + current, + limit)); + } else if (request.getRequestType() == StateRequestType.ITERATOR_LOADING) { + assertThat(request.getPayload()).isInstanceOf(TestIterator.class); + assertThat(((TestIterator) request.getPayload()).current).isEqualTo(current); + ArrayList<Integer> results = new ArrayList<>(step); + for (int i = 0; current < limit && i < step; i++) { + results.add(current++); + } + ((InternalStateFuture<StateIterator<Integer>>) request.getFuture()) + .complete( + new TestIterator( + request.getState(), + ((TestIterator) request.getPayload()).getRequestType(), + aec, + results, + current, + limit)); + } else { + fail("Unsupported request type " + request.getRequestType()); + } + processedCount.incrementAndGet(); + } + @Override public boolean fullyLoaded() { return false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 64e78bd853e..cf4ed16b1fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; @@ -847,26 +848,7 @@ class AsyncExecutionControllerTest { CompletableFuture<Void> future = new CompletableFuture<>(); for (StateRequest request : ((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) { - if (request.getRequestType() == StateRequestType.VALUE_GET) { - Preconditions.checkState(request.getState() != null); - TestValueState state = (TestValueState) request.getState(); - Integer val = - state.underlyingState.get( - (String) request.getRecordContext().getKey(), - (String) request.getRecordContext().getNamespace(state)); - request.getFuture().complete(val); - } else if (request.getRequestType() == StateRequestType.VALUE_UPDATE) { - Preconditions.checkState(request.getState() != null); - TestValueState state = (TestValueState) request.getState(); - - state.underlyingState.update( - (String) request.getRecordContext().getKey(), - (String) request.getRecordContext().getNamespace(state), - (Integer) request.getPayload()); - request.getFuture().complete(null); - } else { - throw new UnsupportedOperationException("Unsupported request type"); - } + executeRequestSync(request); } future.complete(null); return future; @@ -877,6 +859,30 @@ class AsyncExecutionControllerTest { return new MockStateRequestContainer(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> request) { + if (request.getRequestType() == StateRequestType.VALUE_GET) { + Preconditions.checkState(request.getState() != null); + TestValueState state = (TestValueState) request.getState(); + Integer val = + state.underlyingState.get( + (String) request.getRecordContext().getKey(), + (String) request.getRecordContext().getNamespace(state)); + ((InternalStateFuture<Integer>) request.getFuture()).complete(val); + } else if (request.getRequestType() == StateRequestType.VALUE_UPDATE) { + Preconditions.checkState(request.getState() != null); + TestValueState state = (TestValueState) request.getState(); + + state.underlyingState.update( + (String) request.getRecordContext().getKey(), + (String) request.getRecordContext().getNamespace(state), + (Integer) request.getPayload()); + request.getFuture().complete(null); + } else { + throw new UnsupportedOperationException("Unsupported request type"); + } + } + @Override public boolean fullyLoaded() { return false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java index 51cfbb3b265..474bdb558a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java @@ -31,7 +31,7 @@ public class MockStateExecutor implements StateExecutor { Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer); for (StateRequest<?, ?, ?, ?> request : ((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) { - request.getFuture().complete(null); + executeRequestSync(request); } return CompletableFuture.completedFuture(null); } @@ -41,6 +41,11 @@ public class MockStateExecutor implements StateExecutor { return new MockStateRequestContainer(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) { + stateRequest.getFuture().complete(null); + } + @Override public boolean fullyLoaded() { return false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java index 1e8dc088ede..705a2c50d16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer; import org.apache.flink.runtime.asyncprocessing.StateExecutor; @@ -233,22 +234,7 @@ class AbstractAggregatingStateTest extends AbstractKeyedStateTestBase { StateRequestContainer stateRequestContainer) { for (StateRequest stateRequest : ((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) { - String key = (String) stateRequest.getRecordContext().getKey(); - String namespace = (String) stateRequest.getNamespace(); - if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_ADD) { - if (stateRequest.getPayload() == null) { - hashMap.remove(Tuple2.of(key, namespace)); - stateRequest.getFuture().complete(null); - } else { - hashMap.put(Tuple2.of(key, namespace), (Integer) stateRequest.getPayload()); - stateRequest.getFuture().complete(null); - } - } else if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_GET) { - Integer val = hashMap.get(Tuple2.of(key, namespace)); - stateRequest.getFuture().complete(val); - } else { - throw new UnsupportedOperationException("Unsupported type"); - } + executeRequestSync(stateRequest); } CompletableFuture<Void> future = new CompletableFuture<>(); future.complete(null); @@ -260,6 +246,26 @@ class AbstractAggregatingStateTest extends AbstractKeyedStateTestBase { return new MockStateRequestContainer(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) { + String key = (String) stateRequest.getRecordContext().getKey(); + String namespace = (String) stateRequest.getNamespace(); + if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_ADD) { + if (stateRequest.getPayload() == null) { + hashMap.remove(Tuple2.of(key, namespace)); + stateRequest.getFuture().complete(null); + } else { + hashMap.put(Tuple2.of(key, namespace), (Integer) stateRequest.getPayload()); + stateRequest.getFuture().complete(null); + } + } else if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_GET) { + Integer val = hashMap.get(Tuple2.of(key, namespace)); + ((InternalStateFuture<Integer>) stateRequest.getFuture()).complete(val); + } else { + throw new UnsupportedOperationException("Unsupported type"); + } + } + @Override public boolean fullyLoaded() { return false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java index 19c408025b5..00ca94d6041 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequest; @@ -236,14 +237,9 @@ public class AbstractKeyedStateTestBase { @Override public CompletableFuture<Void> executeBatchRequests( StateRequestContainer stateRequestContainer) { - receivedRequest.addAll(((TestStateRequestContainer) stateRequestContainer).requests); - for (StateRequest request : receivedRequest) { - if (request.getRequestType() == StateRequestType.MAP_CONTAINS - || request.getRequestType() == StateRequestType.MAP_IS_EMPTY) { - request.getFuture().complete(true); - } else { - request.getFuture().complete(null); - } + for (StateRequest request : + ((TestStateRequestContainer) stateRequestContainer).requests) { + executeRequestSync(request); } CompletableFuture<Void> future = new CompletableFuture<>(); future.complete(null); @@ -255,6 +251,17 @@ public class AbstractKeyedStateTestBase { return new TestStateRequestContainer(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> request) { + receivedRequest.add(request); + if (request.getRequestType() == StateRequestType.MAP_CONTAINS + || request.getRequestType() == StateRequestType.MAP_IS_EMPTY) { + ((InternalStateFuture<Boolean>) request.getFuture()).complete(true); + } else { + request.getFuture().complete(null); + } + } + @Override public boolean fullyLoaded() { return false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java index 9c4de234c41..b6a3eb7144b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java @@ -186,6 +186,11 @@ public class AbstractReducingStateTest extends AbstractKeyedStateTestBase { return new MockStateRequestContainer(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) { + throw new UnsupportedOperationException("Unsupported synchronous execution"); + } + @Override public boolean fullyLoaded() { return false; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index 6640181eb50..44814efb496 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -39,6 +40,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.flink.state.forst.ForStStateRequestClassifier.convertRequests; + /** * The {@link StateExecutor} implementation which executing batch {@link StateRequest}s for * ForStStateBackend. @@ -74,6 +77,9 @@ public class ForStStateExecutor implements StateExecutor { /** The ongoing sub-processes count. */ private final AtomicLong ongoing; + private final ExecutorService directExecutor = + org.apache.flink.util.concurrent.Executors.newDirectExecutorService(); + public ForStStateExecutor( boolean coordinatorInline, boolean isWriteInline, @@ -85,7 +91,7 @@ public class ForStStateExecutor implements StateExecutor { Preconditions.checkState(readIoParallelism > 0); this.coordinatorThread = coordinatorInline - ? org.apache.flink.util.concurrent.Executors.newDirectExecutorService() + ? directExecutor : Executors.newSingleThreadExecutor( new ExecutorThreadFactory( "ForSt-StateExecutor-Coordinator-And-Write")); @@ -94,14 +100,13 @@ public class ForStStateExecutor implements StateExecutor { Executors.newFixedThreadPool( readIoParallelism, new ExecutorThreadFactory("ForSt-StateExecutor-read-IO")); - this.writeThreads = - org.apache.flink.util.concurrent.Executors.newDirectExecutorService(); + this.writeThreads = directExecutor; this.sharedWriteThread = true; } else { Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0); this.coordinatorThread = coordinatorInline - ? org.apache.flink.util.concurrent.Executors.newDirectExecutorService() + ? directExecutor : Executors.newSingleThreadExecutor( new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator")); if (readIoParallelism <= 0 || writeIoParallelism <= 0) { @@ -223,6 +228,53 @@ public class ForStStateExecutor implements StateExecutor { return new ForStStateRequestClassifier(); } + @Override + public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) { + checkState(); + Object forstRequest = convertRequests(stateRequest); + try { + ForStDBOperation operation; + if (forstRequest instanceof ForStDBGetRequest) { + operation = + new ForStGeneralMultiGetOperation( + db, + Collections.singletonList( + (ForStDBGetRequest<?, ?, ?, ?>) forstRequest), + directExecutor, + 1, + null); + } else if (forstRequest instanceof ForStDBIterRequest) { + operation = + new ForStIterateOperation( + db, + Collections.singletonList( + (ForStDBIterRequest<?, ?, ?, ?, ?>) forstRequest), + directExecutor, + null); + } else if (forstRequest instanceof ForStDBPutRequest) { + operation = + new ForStWriteBatchOperation( + db, + Collections.singletonList( + (ForStDBPutRequest<?, ?, ?>) forstRequest), + writeOptions, + directExecutor); + } else { + throw new IllegalArgumentException("Unknown request type: " + forstRequest); + } + operation + .process() + .exceptionally( + throwable -> { + executionError = throwable; + return null; + }); + } catch (Exception e) { + executionError = e; + } + checkState(); + } + @Override public boolean fullyLoaded() { return ongoing.get() >= readThreadCount; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java index 98b3a6438e3..e2ad8d7d1a2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java @@ -46,7 +46,14 @@ public class ForStStateRequestClassifier implements StateRequestContainer { @Override public void offer(StateRequest<?, ?, ?, ?> stateRequest) { - convertStateRequestsToForStDBRequests(stateRequest); + Object forstDbRequest = convertRequests(stateRequest); + if (forstDbRequest instanceof ForStDBGetRequest) { + dbGetRequests.add((ForStDBGetRequest<?, ?, ?, ?>) forstDbRequest); + } else if (forstDbRequest instanceof ForStDBPutRequest) { + dbPutRequests.add((ForStDBPutRequest<?, ?, ?>) forstDbRequest); + } else { + dbIterRequests.add((ForStDBIterRequest<?, ?, ?, ?, ?>) forstDbRequest); + } } @Override @@ -55,7 +62,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { } @SuppressWarnings("ConstantConditions") - private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?, ?> stateRequest) { + public static Object convertRequests(StateRequest<?, ?, ?, ?> stateRequest) { StateRequestType stateRequestType = stateRequest.getRequestType(); switch (stateRequestType) { case VALUE_GET: @@ -68,8 +75,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { { ForStInnerTable<?, ?, ?> innerTable = (ForStInnerTable<?, ?, ?>) stateRequest.getState(); - dbGetRequests.add(innerTable.buildDBGetRequest(stateRequest)); - return; + return innerTable.buildDBGetRequest(stateRequest); } case VALUE_UPDATE: case LIST_UPDATE: @@ -82,8 +88,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { { ForStInnerTable<?, ?, ?> innerTable = (ForStInnerTable<?, ?, ?>) stateRequest.getState(); - dbPutRequests.add(innerTable.buildDBPutRequest(stateRequest)); - return; + return innerTable.buildDBPutRequest(stateRequest); } case MAP_ITER: case MAP_ITER_KEY: @@ -92,28 +97,24 @@ public class ForStStateRequestClassifier implements StateRequestContainer { { ForStMapState<?, ?, ?, ?> forStMapState = (ForStMapState<?, ?, ?, ?>) stateRequest.getState(); - dbIterRequests.add(forStMapState.buildDBIterRequest(stateRequest)); - return; + return forStMapState.buildDBIterRequest(stateRequest); } case MAP_PUT_ALL: { ForStMapState<?, ?, ?, ?> forStMapState = (ForStMapState<?, ?, ?, ?>) stateRequest.getState(); - dbPutRequests.add(forStMapState.buildDBBunchPutRequest(stateRequest)); - return; + return forStMapState.buildDBBunchPutRequest(stateRequest); } case CLEAR: { if (stateRequest.getState() instanceof ForStMapState) { ForStMapState<?, ?, ?, ?> forStMapState = (ForStMapState<?, ?, ?, ?>) stateRequest.getState(); - dbPutRequests.add(forStMapState.buildDBBunchPutRequest(stateRequest)); - return; + return forStMapState.buildDBBunchPutRequest(stateRequest); } else if (stateRequest.getState() instanceof ForStInnerTable) { ForStInnerTable<?, ?, ?> innerTable = (ForStInnerTable<?, ?, ?>) stateRequest.getState(); - dbPutRequests.add(innerTable.buildDBPutRequest(stateRequest)); - return; + return innerTable.buildDBPutRequest(stateRequest); } else { throw new UnsupportedOperationException( "The State " @@ -123,8 +124,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { } case CUSTOMIZED: { - handleCustomizedStateRequests(stateRequest); - return; + return handleCustomizedStateRequests(stateRequest); } default: throw new UnsupportedOperationException( @@ -133,7 +133,7 @@ public class ForStStateRequestClassifier implements StateRequestContainer { } @SuppressWarnings("unchecked") - private void handleCustomizedStateRequests(StateRequest<?, ?, ?, ?> stateRequest) { + private static Object handleCustomizedStateRequests(StateRequest<?, ?, ?, ?> stateRequest) { Tuple2<ForStStateRequestType, ?> payload = (Tuple2<ForStStateRequestType, ?>) stateRequest.getPayload(); ForStStateRequestType requestType = payload.f0; @@ -142,15 +142,13 @@ public class ForStStateRequestClassifier implements StateRequestContainer { { ForStListState<?, ?, ?> forStListState = (ForStListState<?, ?, ?>) stateRequest.getState(); - dbGetRequests.add(forStListState.buildDBGetRequest(stateRequest)); - return; + return forStListState.buildDBGetRequest(stateRequest); } case MERGE_ALL_RAW: { ForStListState<?, ?, ?> forStListState = (ForStListState<?, ?, ?>) stateRequest.getState(); - dbPutRequests.add(forStListState.buildDBPutRequest(stateRequest)); - return; + return forStListState.buildDBPutRequest(stateRequest); } default: throw new UnsupportedOperationException( diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index f31e27dbe2d..600a2f7a740 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -349,7 +349,8 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { RecordContext<K> recordContext = new RecordContext<>(record, key, t -> {}, keyGroup, new Epoch(0), 0); TestStateFuture stateFuture = new TestStateFuture<>(); - return new StateRequest<>(innerTable, requestType, value, stateFuture, recordContext); + return new StateRequest<>( + innerTable, requestType, false, value, stateFuture, recordContext); } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -363,6 +364,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { RecordContext<K> recordContext = new RecordContext<>(record, key, t -> {}, keyGroup, new Epoch(0), 0); TestStateFuture stateFuture = new TestStateFuture<>(); - return new StateRequest<>(innerTable, requestType, value, stateFuture, recordContext); + return new StateRequest<>( + innerTable, requestType, false, value, stateFuture, recordContext); } }