fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569864750
########## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ########## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + + /** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * <p>Note: This is an experimental feature(FLIP-425) under evaluation. + */ + @Experimental + public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED = + ConfigOptions.key("execution.async-mode.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "A flag to enable or disable async mode related components when tasks initialize." + + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." + + " Otherwise, the state access of Async state APIs will be executed synchronously." + + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" + + " Note: This is an experimental feature under evaluation."); + + /** + * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. + */ + @Experimental + public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT = + ConfigOptions.key("execution.async-mode.in-flight-records-limit") + .intType() + .defaultValue(6000) + .withDescription( + "The max limit of in-flight records number in async execution mode, 'in-flight' refers" + + " to the records that have entered the operator but have not yet been processed and" + + " emitted to the downstream. If the in-flight records number exceeds the limit," + + " the newly records entering will be blocked until the in-flight records number drops below the limit."); + + /** + * The size of buffer under async execution mode. Async execution mode provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control + * the frequency of triggering. + */ + @Experimental + public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE = + ConfigOptions.key("execution.async-mode.buffer-size") + .intType() + .defaultValue(1000) + .withDescription( + "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access." + + " When the number of state requests in the active buffer exceeds the batch size," + + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," + + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + + /** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively. + */ + @Experimental + public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT = + ConfigOptions.key("execution.async-state.buffer-timeout") Review Comment: Good point👍, changed it to `execution.async-mode.state-buffer-timeout`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org