jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1579119199


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##########
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
 
     private RecordContext currentProcessingContext;
 
+    private Environment environment;
+
     /** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
     @Override
     public void setup(
             StreamTask<?, ?> containingTask,
             StreamConfig config,
             Output<StreamRecord<OUT>> output) {
         super.setup(containingTask, config, output);
-        // TODO: properly read config and setup
-        final MailboxExecutor mailboxExecutor =
-                containingTask.getEnvironment().getMainMailboxExecutor();
-        this.asyncExecutionController = new 
AsyncExecutionController(mailboxExecutor, null);
+        final Environment environment = containingTask.getEnvironment();
+        final MailboxExecutor mailboxExecutor = 
environment.getMainMailboxExecutor();
+        final int inFlightRecordsLimit =
+                
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+        final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
+        final long asyncBufferTimeout =
+                environment.getExecutionConfig().getAsyncStateBufferTimeout();
+        // TODO: initial state executor and set state executor for aec
+        this.asyncExecutionController =
+                new AsyncExecutionController(
+                        mailboxExecutor,
+                        this::handleStateCallbackException,
+                        null,
+                        asyncBufferSize,
+                        asyncBufferTimeout,
+                        inFlightRecordsLimit);
+    }
+
+    private void handleStateCallbackException(String message, Throwable 
exception) {

Review Comment:
   Sorry my bad, I thought it is AsyncOperator for user



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to