This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bf6333cfa22 [FLINK-37978][table] Change buffer-capacity to max-concurrent-operations for ml_predict (#26706)
bf6333cfa22 is described below

commit bf6333cfa228defb1990459d77c775e2cc9f9f5c
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Sun Jun 22 19:21:38 2025 -0700

    [FLINK-37978][table] Change buffer-capacity to max-concurrent-operations for ml_predict (#26706)
---
 .../generated/execution_config_configuration.html           |  2 +-
 .../generated/ml_predict_runtime_config_configuration.html  |  2 +-
 .../flink/table/api/config/ExecutionConfigOptions.java      | 13 +++++++------
 .../table/api/config/MLPredictRuntimeConfigOptions.java     |  6 +++---
 .../table/planner/functions/sql/ml/SqlMLTableFunction.java  |  9 +++++----
 .../nodes/exec/stream/StreamExecMLPredictTableFunction.java |  2 +-
 .../flink/table/planner/plan/utils/MLPredictUtils.java      |  6 +++---
 .../plan/nodes/exec/stream/MLPredictRestoreTest.java        |  2 +-
 .../planner/plan/stream/sql/MLPredictTableFunctionTest.java |  5 ++---
 9 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 0e0753cbab8..8811fa297fa 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -33,7 +33,7 @@
             <td>The async timeout for the asynchronous operation to 
complete.</td>
         </tr>
         <tr>
-            <td><h5>table.exec.async-ml-predict.buffer-capacity</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td><h5>table.exec.async-ml-predict.max-concurrent-operations</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">10</td>
             <td>Integer</td>
             <td>The max number of async i/o operation that the async ml 
predict can trigger.</td>
diff --git a/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html b/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html
index 2c1add88edf..dd935e658c1 100644
--- a/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/ml_predict_runtime_config_configuration.html
@@ -15,7 +15,7 @@
             <td>Value can be 'true' or 'false' to suggest the planner choose 
the corresponding predict function. If the backend predict function provider 
does not support the suggested mode, it will throw exception to notify 
users.</td>
         </tr>
         <tr>
-            <td><h5>capacity</h5></td>
+            <td><h5>max-concurrent-operations</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
             <td>The max number of async i/o operation that the async ml 
predict can trigger.</td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 3675505099e..0f269fd10cf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -461,12 +461,13 @@ public class ExecutionConfigOptions {
     // ------------------------------------------------------------------------
 
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<Integer> 
TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY =
-            key("table.exec.async-ml-predict.buffer-capacity")
-                    .intType()
-                    .defaultValue(10)
-                    .withDescription(
-                            "The max number of async i/o operation that the 
async ml predict can trigger.");
+    public static final ConfigOption<Integer>
+            TABLE_EXEC_ASYNC_ML_PREDICT_MAX_CONCURRENT_OPERATIONS =
+                    
key("table.exec.async-ml-predict.max-concurrent-operations")
+                            .intType()
+                            .defaultValue(10)
+                            .withDescription(
+                                    "The max number of async i/o operation 
that the async ml predict can trigger.");
 
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Duration> 
TABLE_EXEC_ASYNC_ML_PREDICT_TIMEOUT =
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java
index 14d543c0002..bca2d09010b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java
@@ -54,8 +54,8 @@ public class MLPredictRuntimeConfigOptions {
                                     + "If set to ALLOW_UNORDERED, will attempt 
to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
                                     + "affect the correctness of the result, 
otherwise ORDERED will be still used.");
 
-    public static final ConfigOption<Integer> ASYNC_CAPACITY =
-            key("capacity")
+    public static final ConfigOption<Integer> ASYNC_MAX_CONCURRENT_OPERATIONS =
+            key("max-concurrent-operations")
                     .intType()
                     .noDefaultValue()
                     .withDescription(
@@ -74,7 +74,7 @@ public class MLPredictRuntimeConfigOptions {
     static {
         supportedKeys.add(ASYNC);
         supportedKeys.add(ASYNC_OUTPUT_MODE);
-        supportedKeys.add(ASYNC_CAPACITY);
+        supportedKeys.add(ASYNC_MAX_CONCURRENT_OPERATIONS);
         supportedKeys.add(ASYNC_TIMEOUT);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
index ebdb9b05725..d88884d8c74 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
@@ -54,7 +54,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC;
-import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_CAPACITY;
+import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS;
 import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
 import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
 
@@ -247,13 +247,14 @@ public abstract class SqlMLTableFunction extends 
SqlFunction implements SqlTable
         // async options are all optional
         Boolean async = config.get(ASYNC);
         if (Boolean.TRUE.equals(async)) {
-            Integer capacity = config.get(ASYNC_CAPACITY);
-            if (capacity != null && capacity <= 0) {
+            Integer maxConcurrentOperations = 
config.get(ASYNC_MAX_CONCURRENT_OPERATIONS);
+            if (maxConcurrentOperations != null && maxConcurrentOperations <= 
0) {
                 return Optional.of(
                         new ValidationException(
                                 String.format(
                                         "Invalid runtime config option '%s'. 
Its value should be positive integer but was %s.",
-                                        ASYNC_CAPACITY.key(), capacity)));
+                                        ASYNC_MAX_CONCURRENT_OPERATIONS.key(),
+                                        maxConcurrentOperations)));
             }
         }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java
index fde1783f6ca..f649eeb47a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java
@@ -82,7 +82,7 @@ import java.util.Optional;
         name = "stream-exec-ml-predict-table-function",
         version = 1,
         consumedOptions = {
-            "table.exec.async-ml-predict.buffer-capacity",
+            "table.exec.async-ml-predict.max-concurrent-operations",
             "table.exec.async-ml-predict.timeout",
             "table.exec.async-ml-predict.output-mode"
         },
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java
index 2e99f0d0953..5c0770dc7bb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MLPredictUtils.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.connector.ChangelogMode;
 
 import java.util.Map;
 
-import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_CAPACITY;
+import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS;
 import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_OUTPUT_MODE;
 import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_TIMEOUT;
 
@@ -42,10 +42,10 @@ public class MLPredictUtils extends FunctionCallUtils {
 
         return new AsyncOptions(
                 coalesce(
-                        queryConf.get(ASYNC_CAPACITY),
+                        queryConf.get(ASYNC_MAX_CONCURRENT_OPERATIONS),
                         config.get(
                                 ExecutionConfigOptions
-                                        
.TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY)),
+                                        
.TABLE_EXEC_ASYNC_ML_PREDICT_MAX_CONCURRENT_OPERATIONS)),
                 coalesce(
                                 queryConf.get(ASYNC_TIMEOUT),
                                 config.get(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java
index 0a94096c833..6158733ff65 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictRestoreTest.java
@@ -55,7 +55,7 @@ public class MLPredictRestoreTest extends RestoreTestBase {
                 .isEqualTo(
                         Arrays.asList(
                                         ExecutionConfigOptions
-                                                
.TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY,
+                                                
.TABLE_EXEC_ASYNC_ML_PREDICT_MAX_CONCURRENT_OPERATIONS,
                                         
ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_TIMEOUT,
                                         ExecutionConfigOptions
                                                 
.TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
index 418b1136a9c..12e6af852c0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
@@ -261,7 +261,6 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
         String sql =
                 "SELECT *\n"
                         + "FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL 
TypeModel, DESCRIPTOR(col)))";
-
         assertThatThrownBy(() -> util.verifyRelPlan(sql))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("cannot be assigned to model input 
type");
@@ -290,10 +289,10 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
                         () ->
                                 util.verifyRelPlan(
                                         "SELECT *\n"
-                                                + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 'capacity', 
'-1']))"))
+                                                + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 
'max-concurrent-operations', '-1']))"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "SQL validation failed. Invalid runtime config option 
'capacity'. Its value should be positive integer but was -1.");
+                        "SQL validation failed. Invalid runtime config option 
'max-concurrent-operations'. Its value should be positive integer but was -1.");
 
         assertThatThrownBy(
                         () ->

Reply via email to