fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577782892
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##########
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator<OUT>
extends AbstractStre
private RecordContext currentProcessingContext;
+ private Environment environment;
+
/** Initialize necessary state components for {@link
AbstractStreamOperator}. */
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
- // TODO: properly read config and setup
- final MailboxExecutor mailboxExecutor =
- containingTask.getEnvironment().getMainMailboxExecutor();
- this.asyncExecutionController = new
AsyncExecutionController(mailboxExecutor, null);
+ final Environment environment = containingTask.getEnvironment();
+ final MailboxExecutor mailboxExecutor =
environment.getMainMailboxExecutor();
+ final int inFlightRecordsLimit =
+
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+ final int asyncBufferSize =
environment.getExecutionConfig().getAsyncStateBufferSize();
+ final long asyncBufferTimeout =
+ environment.getExecutionConfig().getAsyncStateBufferTimeout();
+ // TODO: initial state executor and set state executor for aec
+ this.asyncExecutionController =
+ new AsyncExecutionController(
+ mailboxExecutor,
+ this::handleStateCallbackException,
+ null,
+ asyncBufferSize,
+ asyncBufferTimeout,
+ inFlightRecordsLimit);
+ }
+
+ private void handleStateCallbackException(String message, Throwable
exception) {
Review Comment:
AEC is transparent to the user, and users should not be aware of its
internal implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]