yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1572344539
##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,62 @@ public class ExecutionOptions {
+ " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
+ SORT_INPUTS.key()
+ " to be enabled.");
+
+ // ------------------------- Async State Execution --------------------------
+
+ /**
+ * The max limit of in-flight records number in async state execution, 'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the newly records entering
+ * will be blocked until the in-flight records number drops below the limit.
+ */
+ @Experimental
+ @Documentation.ExcludeFromDocumentation(
+ "This is an experimental option, internal use only for now.")
+ public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+ ConfigOptions.key("execution.async-state.in-flight-records-limit")
+ .intType()
+ .defaultValue(6000)
+ .withDescription(
+ "The max limit of in-flight records number in async state execution, 'in-flight' refers"
+ + " to the records that have entered the operator but have not yet been processed and"
+ + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+ + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+ /**
+ * The size of buffer under async state execution. Async state execution provides a buffer
+ * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+ * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
+ * control the frequency of triggering.
+ */
+ @Experimental
+ @Documentation.ExcludeFromDocumentation(
+ "This is an experimental option, internal use only for now.")
+ public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_SIZE =
+ ConfigOptions.key("execution.async-state.buffer-size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
+ + " When the number of state requests in the active buffer exceeds the batch size,"
+ + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+ + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+ /**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+ * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
+ * actively.
+ */
+ @Experimental
+ @Documentation.ExcludeFromDocumentation(
+ "This is an experimental option, internal use only for now.")
+ public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_TIMEOUT =
Review Comment:
It might be better to declare time configurations as `long` rather than `int`.
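For illustration only, a small sketch of why the wider type is safer: a millisecond value held in an `int` overflows after roughly 24.8 days, and JDK scheduling APIs such as `ScheduledExecutorService#schedule` take a `long` delay anyway. `TimeoutWidthSketch` and `fitsInInt` are hypothetical names, not part of Flink.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not Flink code. */
class TimeoutWidthSketch {

    /** Returns true if a millisecond value can be stored in an int without overflow. */
    static boolean fitsInInt(long millis) {
        return millis >= Integer.MIN_VALUE && millis <= Integer.MAX_VALUE;
    }

    /** A 30-day timeout expressed in milliseconds; needs a long. */
    static long thirtyDaysMillis() {
        return TimeUnit.DAYS.toMillis(30);
    }
}
```

With these helpers, `fitsInInt(1000L)` is true, while `fitsInInt(thirtyDaysMillis())` is false, and a narrowing cast of that value to `int` silently yields a negative number.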
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -50,15 +50,24 @@ public class AsyncExecutionController<K> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
- public static final int DEFAULT_BATCH_SIZE = 1000;
- public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+ private static final int DEFAULT_BATCH_SIZE = 1000;
+
+ private static final int DEFAULT_BUFFER_TIMEOUT = 1000;
+ private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
/**
* The batch size. When the number of state requests in the active buffer exceeds the batch
* size, a batched state execution would be triggered.
*/
private final int batchSize;
+ /**
+ * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
+ * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
+ * trigger will perform actively.
+ */
+ private final int bufferTimeOut;
Review Comment:
There is still a TODO in AsyncExecutionController#triggerIfNeeded that says
```
// TODO: introduce a timeout mechanism for triggering.
```
Given that this PR introduces the `bufferTimeOut` configuration, shall we
implement the timeout mechanism in this PR as well?
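As a rough sketch of what such a timeout trigger could look like (plain JDK, not Flink code): `TimedBatchBuffer`, `onTrigger`, and `batchSeq` are hypothetical names, and a JDK `ScheduledExecutorService` stands in for whatever timer the controller would actually use. In the real `AsyncExecutionController` the timeout callback would presumably have to be handed to the `MailboxExecutor` so that triggering stays on the task thread; that part is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical buffer (not Flink's StateRequestBuffer): triggers on size or on timeout. */
class TimedBatchBuffer<T> implements AutoCloseable {
    private final int batchSize;
    private final long timeoutMillis;
    private final Consumer<List<T>> onTrigger;
    // Daemon timer thread so a forgotten close() does not keep the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "buffer-timeout");
                        t.setDaemon(true);
                        return t;
                    });
    private List<T> active = new ArrayList<>();
    // Incremented on every trigger; lets stale timeout tasks detect they are outdated.
    private long batchSeq = 0;

    TimedBatchBuffer(int batchSize, long timeoutMillis, Consumer<List<T>> onTrigger) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.onTrigger = onTrigger;
    }

    synchronized void add(T request) {
        active.add(request);
        if (active.size() >= batchSize) {
            // Size-based trigger: the batch is full.
            trigger();
        } else if (active.size() == 1) {
            // First request of a new batch: start the timeout clock for this batch only.
            final long seq = batchSeq;
            timer.schedule(() -> onTimeout(seq), timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    private synchronized void onTimeout(long seq) {
        // Skip if the batch this timer belonged to has already been triggered.
        if (seq == batchSeq) {
            trigger();
        }
    }

    // Caller must hold the monitor lock.
    private void trigger() {
        List<T> batch = active;
        active = new ArrayList<>();
        batchSeq++;
        onTrigger.accept(batch);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

The sequence number avoids cancelling timers explicitly: a timer whose batch was already flushed by the size check simply becomes a no-op.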
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,19 +103,26 @@ public class AsyncExecutionController<K> {
final AtomicInteger inFlightRecordNum;
public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
- this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+ this(
+ mailboxExecutor,
+ stateExecutor,
+ DEFAULT_BATCH_SIZE,
+ DEFAULT_BUFFER_TIMEOUT,
+ DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
Review Comment:
How about removing the constants and using
`ASYNC_STATE_BUFFER_SIZE.defaultValue()` from `ExecutionOptions` instead? This
avoids maintaining the same value in two places.
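A minimal sketch of the idea, with hypothetical stand-ins so it compiles on its own: `SketchOption` mimics only the `defaultValue()` accessor of Flink's `ConfigOption`, and `SketchOptions` and `SketchController` are illustrative names rather than the actual classes in this PR.

```java
/** Hypothetical stand-in for a config option, reduced to a key and a default value. */
final class SketchOption<T> {
    private final String key;
    private final T defaultValue;

    SketchOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    T defaultValue() {
        return defaultValue;
    }
}

/** The single place where default values are declared. */
final class SketchOptions {
    static final SketchOption<Integer> BUFFER_SIZE =
            new SketchOption<>("execution.async-state.buffer-size", 1000);
    static final SketchOption<Integer> IN_FLIGHT_RECORDS_LIMIT =
            new SketchOption<>("execution.async-state.in-flight-records-limit", 6000);
}

/** Controller without its own DEFAULT_* constants. */
final class SketchController {
    final int batchSize;
    final int maxInFlightRecords;

    // Convenience constructor reads the defaults from the option definitions.
    SketchController() {
        this(
                SketchOptions.BUFFER_SIZE.defaultValue(),
                SketchOptions.IN_FLIGHT_RECORDS_LIMIT.defaultValue());
    }

    SketchController(int batchSize, int maxInFlightRecords) {
        this.batchSize = batchSize;
        this.maxInFlightRecords = maxInFlightRecords;
    }
}
```

Changing a default in the options class then automatically changes what the no-argument constructor uses.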
##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
}
+ // --------------------------------------------------------------------------------------------
+ // Asynchronous execution configurations
+ // --------------------------------------------------------------------------------------------
+
+ @Internal
Review Comment:
Hmm, it seems these are still `@Internal` rather than `@Experimental` in
this version.