This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new bf6333cfa22 [FLINK-37978][table] Change buffer-capacity to max-concurrent-operations for ml_predict (#26706) bf6333cfa22 is described below commit bf6333cfa228defb1990459d77c775e2cc9f9f5c Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Sun Jun 22 19:21:38 2025 -0700 [FLINK-37978][table] Change buffer-capacity to max-concurrent-operations for ml_predict (#26706) --- .../generated/execution_config_configuration.html | 2 +- .../generated/ml_predict_runtime_config_configuration.html | 2 +- .../flink/table/api/config/ExecutionConfigOptions.java | 13 +++++++------ .../table/api/config/MLPredictRuntimeConfigOptions.java | 6 +++--- .../table/planner/functions/sql/ml/SqlMLTableFunction.java | 9 +++++---- .../nodes/exec/stream/StreamExecMLPredictTableFunction.java | 2 +- .../flink/table/planner/plan/utils/MLPredictUtils.java | 6 +++--- .../plan/nodes/exec/stream/MLPredictRestoreTest.java | 2 +- .../planner/plan/stream/sql/MLPredictTableFunctionTest.java | 5 ++--- 9 files changed, 24 insertions(+), 23 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index 0e0753cbab8..8811fa297fa 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -33,7 +33,7 @@ <td>The async timeout for the asynchronous operation to complete.</td> </tr> <tr> - <td><h5>table.exec.async-ml-predict.buffer-capacity</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td><h5>table.exec.async-ml-predict.max-concurrent-operations</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> <td>The max number of async i/o 
operation that the async ml predict can trigger.</td> diff --git a/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html b/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html index 2c1add88edf..dd935e658c1 100644 --- a/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html +++ b/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html @@ -15,7 +15,7 @@ <td>Value can be 'true' or 'false' to suggest the planner choose the corresponding predict function. If the backend predict function provider does not support the suggested mode, it will throw exception to notify users.</td> </tr> <tr> - <td><h5>capacity</h5></td> + <td><h5>max-concurrent-operations</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>The max number of async i/o operation that the async ml predict can trigger.</td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 3675505099e..0f269fd10cf 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -461,12 +461,13 @@ public class ExecutionConfigOptions { // ------------------------------------------------------------------------ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) - public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY = - key("table.exec.async-ml-predict.buffer-capacity") - .intType() - .defaultValue(10) - .withDescription( - "The max number of async i/o operation that the async ml predict can trigger."); + public static final ConfigOption<Integer> + 
TABLE_EXEC_ASYNC_ML_PREDICT_MAX_CONCURRENT_OPERATIONS = + key("table.exec.async-ml-predict.max-concurrent-operations") + .intType() + .defaultValue(10) + .withDescription( + "The max number of async i/o operation that the async ml predict can trigger."); @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_ML_PREDICT_TIMEOUT = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java index 14d543c0002..bca2d09010b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java @@ -54,8 +54,8 @@ public class MLPredictRuntimeConfigOptions { + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not " + "affect the correctness of the result, otherwise ORDERED will be still used."); - public static final ConfigOption<Integer> ASYNC_CAPACITY = - key("capacity") + public static final ConfigOption<Integer> ASYNC_MAX_CONCURRENT_OPERATIONS = + key("max-concurrent-operations") .intType() .noDefaultValue() .withDescription( @@ -74,7 +74,7 @@ public class MLPredictRuntimeConfigOptions { static { supportedKeys.add(ASYNC); supportedKeys.add(ASYNC_OUTPUT_MODE); - supportedKeys.add(ASYNC_CAPACITY); + supportedKeys.add(ASYNC_MAX_CONCURRENT_OPERATIONS); supportedKeys.add(ASYNC_TIMEOUT); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java index ebdb9b05725..d88884d8c74 100644 --- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java @@ -54,7 +54,7 @@ import java.util.Map; import java.util.Optional; import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC; -import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_CAPACITY; +import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS; import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType; import static org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING; @@ -247,13 +247,14 @@ public abstract class SqlMLTableFunction extends SqlFunction implements SqlTable // async options are all optional Boolean async = config.get(ASYNC); if (Boolean.TRUE.equals(async)) { - Integer capacity = config.get(ASYNC_CAPACITY); - if (capacity != null && capacity <= 0) { + Integer maxConcurrentOperations = config.get(ASYNC_MAX_CONCURRENT_OPERATIONS); + if (maxConcurrentOperations != null && maxConcurrentOperations <= 0) { return Optional.of( new ValidationException( String.format( "Invalid runtime config option '%s'. 
Its value should be positive integer but was %s.", - ASYNC_CAPACITY.key(), capacity))); + ASYNC_MAX_CONCURRENT_OPERATIONS.key(), + maxConcurrentOperations))); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java index fde1783f6ca..f649eeb47a7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java @@ -82,7 +82,7 @@ import java.util.Optional; name = "stream-exec-ml-predict-table-function", version = 1, consumedOptions = { - "table.exec.async-ml-predict.buffer-capacity", + "table.exec.async-ml-predict.max-concurrent-operations", "table.exec.async-ml-predict.timeout", "table.exec.async-ml-predict.output-mode" }, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java index 2e99f0d0953..5c0770dc7bb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java @@ -25,7 +25,7 @@ import org.apache.flink.table.connector.ChangelogMode; import java.util.Map; -import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_CAPACITY; +import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS; import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_OUTPUT_MODE; 
import static org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_TIMEOUT; @@ -42,10 +42,10 @@ public class MLPredictUtils extends FunctionCallUtils { return new AsyncOptions( coalesce( - queryConf.get(ASYNC_CAPACITY), + queryConf.get(ASYNC_MAX_CONCURRENT_OPERATIONS), config.get( ExecutionConfigOptions - .TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY)), + .TABLE_EXEC_ASYNC_ML_PREDICT_MAX_CONCURRENT_OPERATIONS)), coalesce( queryConf.get(ASYNC_TIMEOUT), config.get( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java index 0a94096c833..6158733ff65 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java @@ -55,7 +55,7 @@ public class MLPredictRestoreTest extends RestoreTestBase { .isEqualTo( Arrays.asList( ExecutionConfigOptions - .TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY, + .TABLE_EXEC_ASYNC_ML_PREDICT_MAX_CONCURRENT_OPERATIONS, ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_TIMEOUT, ExecutionConfigOptions .TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java index 418b1136a9c..12e6af852c0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java @@ -261,7 +261,6 @@ 
public class MLPredictTableFunctionTest extends TableTestBase { String sql = "SELECT *\n" + "FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))"; - assertThatThrownBy(() -> util.verifyRelPlan(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining("cannot be assigned to model input type"); @@ -290,10 +289,10 @@ public class MLPredictTableFunctionTest extends TableTestBase { () -> util.verifyRelPlan( "SELECT *\n" - + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 'capacity', '-1']))")) + + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 'max-concurrent-operations', '-1']))")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "SQL validation failed. Invalid runtime config option 'capacity'. Its value should be positive integer but was -1."); + "SQL validation failed. Invalid runtime config option 'max-concurrent-operations'. Its value should be positive integer but was -1."); assertThatThrownBy( () ->