Zakelly commented on code in PR #24740:
URL: https://github.com/apache/flink/pull/24740#discussion_r1590642587


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -104,15 +105,21 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler {
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
-            StateExecutor stateExecutor,
+            AsyncKeyedStateBackend asyncKeyedStateBackend,
             int maxParallelism,
             int batchSize,
             long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
-        this.stateExecutor = stateExecutor;
+        if (asyncKeyedStateBackend != null) {

Review Comment:
   Well, I have different opinions... I suggest keeping `AEC` and `AKSB` simple 
by only keeping the necessary knowledge. It seems they work well even though 
they don't know each other. So I'd prefer do the binding within the operator. 
The UT is more easy to build if the `AEC` only takes `StateExecutor` in 
constructor. WDTY?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java:
##########
@@ -176,15 +180,31 @@ public StreamOperatorStateContext 
streamOperatorStateContext(
         try {
 
             // -------------- Keyed State Backend --------------
-            keyedStatedBackend =
-                    keyedStatedBackend(
-                            keySerializer,
-                            operatorIdentifierText,
-                            prioritizedOperatorSubtaskStates,
-                            streamTaskCloseableRegistry,
-                            metricGroup,
-                            managedMemoryFraction,
-                            statsCollector);
+            // TODO: Only init async keyed state backend if supported, 
consider to init different
+            // state backend by the operator type.
+            if (stateBackend.supportsAsyncKeyedStateBackend()) {
+                asyncKeyedStateBackend =
+                        keyedStatedBackend(
+                                keySerializer,
+                                operatorIdentifierText,
+                                prioritizedOperatorSubtaskStates,
+                                streamTaskCloseableRegistry,
+                                metricGroup,
+                                managedMemoryFraction,
+                                statsCollector,
+                                StateBackend::createAsyncKeyedStateBackend);
+            } else {

Review Comment:
   Do you mean only create one type of state backend (sync/async). I'd suggest 
splitting this up, like 
https://github.com/apache/flink/pull/24697/files#diff-60f97b32726f78481f7852ea82daa2f3c9919933c93c234a049442195ccf0e2b
 does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to