yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878
##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
}
+ // --------------------------------------------------------------------------------------------
+ // Asynchronous execution configurations
+ // --------------------------------------------------------------------------------------------
+
+ @Internal
Review Comment:
It might be better to mark these methods as `@Experimental` instead of `@Internal`, since the two annotations promise different things:
- Internal: Annotation to mark methods within **stable**, public APIs as an internal developer API.
- Experimental: Classes with this annotation are neither battle-tested **nor stable**, and may be changed or removed in future versions.
##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
+ " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
+ SORT_INPUTS.key()
+ " to be enabled.");
+
+ /**
+ * A flag to enable or disable async mode related components when tasks initialize. As long as
+ * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+ * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+ * state APIs, the state access is always executed synchronously, enable this option would bring
+ * some overhead.
+ *
+ * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+ */
+ @Experimental
+ public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+ ConfigOptions.key("execution.async-mode.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "A flag to enable or disable async mode related components when tasks initialize."
+ + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+ + " Otherwise, the state access of Async state APIs will be executed synchronously."
+ + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
Review Comment:
It might be better to remove this configuration for now and add it in the future, once concrete use cases for it appear.
- Whether to enable async state access can be inferred automatically by the Flink infrastructure, depending on where the state backend is located and which state API the operators use.
- It is better not to expose implementation details, such as the sync/async modes mentioned here, to end users. As long as the order of same-key records and the order of async state callbacks are guaranteed, that is enough for users.
- In particular, the implementation of the sync mode might be changed in the future to improve performance when the sync state API is used with a remote state backend. In that case, the statement "the state access is always executed synchronously" would cause deprecation issues.
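A minimal Java sketch of the kind of inference the first point suggests, in place of a user-facing flag. The class, the enum and the decision rule are all hypothetical and are not Flink APIs. The rule assumes that async execution is worthwhile only when an operator uses the async state API (for the sync API, the quoted description says enabling async mode only adds overhead) and only when the state backend is remote (an assumption: local access is taken to be cheap enough that async execution brings little benefit).

```java
/**
 * Hypothetical sketch (not a Flink API): decide whether to enable async state
 * execution from the job setup instead of from a user-facing flag.
 */
final class AsyncStateInference {

    /** Assumed classification of where the state backend keeps its state. */
    enum BackendLocation { LOCAL, REMOTE }

    private AsyncStateInference() {}

    static boolean useAsyncExecution(BackendLocation location, boolean usesAsyncStateApi) {
        // Sync state API: async components would only add overhead.
        if (!usesAsyncStateApi) {
            return false;
        }
        // Assumption: with a local backend, state access is cheap enough that
        // asynchronous execution brings little benefit.
        return location == BackendLocation.REMOTE;
    }
}
```

Under this rule, an operator that calls the async state API against a remote backend would get async execution without any configuration, while every other combination stays synchronous; the real criteria would be for the FLIP-425 implementation to decide.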
##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
+ " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
+ SORT_INPUTS.key()
+ " to be enabled.");
+
+ /**
+ * A flag to enable or disable async mode related components when tasks initialize. As long as
+ * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+ * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+ * state APIs, the state access is always executed synchronously, enable this option would bring
+ * some overhead.
+ *
+ * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+ */
+ @Experimental
+ public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+ ConfigOptions.key("execution.async-mode.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "A flag to enable or disable async mode related components when tasks initialize."
+ + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+ + " Otherwise, the state access of Async state APIs will be executed synchronously."
+ + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+ + " Note: This is an experimental feature under evaluation.");
+
+ /**
+ * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the newly records entering
+ * will be blocked until the in-flight records number drops below the limit.
+ */
+ @Experimental
+ public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+ ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+ .intType()
+ .defaultValue(6000)
+ .withDescription(
+ "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+ + " to the records that have entered the operator but have not yet been processed and"
+ + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+ + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+ /**
+ * The size of buffer under async execution mode. Async execution mode provides a buffer
+ * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+ * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+ * the frequency of triggering.
+ */
+ @Experimental
+ public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+ ConfigOptions.key("execution.async-mode.buffer-size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+ + " When the number of state requests in the active buffer exceeds the batch size,"
+ + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+ + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+ /**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+ * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+ */
+ @Experimental
+ public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+ ConfigOptions.key("execution.async-state.buffer-timeout")
Review Comment:
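It might be better to change the `execution.async-mode` prefix used above to `execution.async-state`, so that all of these options share the prefix already used by `execution.async-state.buffer-timeout`.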
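To make the quoted `ASYNC_INFLIGHT_RECORDS_LIMIT`, `ASYNC_BUFFER_SIZE` and `ASYNC_BUFFER_TIMEOUT` descriptions concrete, here is a minimal Java sketch of the two mechanisms they describe: a cap on in-flight records, and a buffer that triggers a batch either when it fills up or when a timeout elapses. The class names are hypothetical and this is not the PR's implementation. It assumes a `java.util.concurrent.Semaphore` is an acceptable stand-in for the blocking behaviour, that the buffer triggers once it reaches the configured size, and that some timer calls `onTimer` with the current time in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of the in-flight records limit: at most `limit` records in flight. */
final class InFlightLimiterSketch {
    private final int limit;
    private final Semaphore permits;

    InFlightLimiterSketch(int limit) {
        this.limit = limit;
        this.permits = new Semaphore(limit);
    }

    /** Blocks the caller until fewer than `limit` records are in flight. */
    void enter() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant: returns false instead of waiting when the limit is reached. */
    boolean tryEnter() {
        return permits.tryAcquire();
    }

    /** Called once a record has been processed and emitted downstream. */
    void complete() {
        permits.release();
    }

    int inFlight() {
        return limit - permits.availablePermits();
    }
}

/** Hypothetical sketch of the size-or-timeout buffer trigger. */
final class BufferTriggerSketch {
    private final int bufferSize;
    private final long bufferTimeoutMs;
    private final List<String> buffer = new ArrayList<>();
    private long oldestRequestMs = -1L;
    private int batchesTriggered = 0;

    BufferTriggerSketch(int bufferSize, long bufferTimeoutMs) {
        this.bufferSize = bufferSize;
        this.bufferTimeoutMs = bufferTimeoutMs;
    }

    /** Buffers a state request and triggers a batch once the buffer is full. */
    void add(String stateRequest, long nowMs) {
        if (buffer.isEmpty()) {
            oldestRequestMs = nowMs;
        }
        buffer.add(stateRequest);
        if (buffer.size() >= bufferSize) {
            trigger();
        }
    }

    /** Periodic check: flushes a partial batch once the oldest request has waited long enough. */
    void onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - oldestRequestMs >= bufferTimeoutMs) {
            trigger();
        }
    }

    private void trigger() {
        // A real implementation would hand the batch to the state backend here.
        batchesTriggered++;
        buffer.clear();
        oldestRequestMs = -1L;
    }

    int batchesTriggered() {
        return batchesTriggered;
    }

    int pending() {
        return buffer.size();
    }
}
```

With `bufferSize` set to the value of `ASYNC_BUFFER_SIZE` (1000 by default in the quoted diff), a busy operator would flush on size while a quiet one would flush on the timer, which is the batching versus latency trade-off the quoted Javadoc describes.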
##########
docs/layouts/shortcodes/generated/execution_configuration.html:
##########
@@ -8,6 +8,31 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>execution.async-mode.buffer-size</h5></td>
Review Comment:
Given that these configurations are only for internal, experimental use, it might be better not to expose them in the documentation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.