This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 7cfa6ed160053d0b0524d74d4cf64b71a2c2e7f4
Author: WenjinXie <[email protected]>
AuthorDate: Thu Jan 29 14:40:35 2026 +0800

    [runtime] Support configure the thread pool size for async executor.
    
    fix
---
 .../agents/api/agents/AgentExecutionOptions.java    |  7 ++++++-
 docs/content/docs/operations/configuration.md       |  1 +
 python/flink_agents/api/core_options.py             |  7 +++++++
 python/flink_agents/runtime/flink_runner_context.py |  5 +++--
 .../runtime/async/ContinuationActionExecutor.java   |  2 +-
 .../runtime/operator/ActionExecutionOperator.java   | 14 ++++++++++----
 .../runtime/python/utils/PythonActionExecutor.java  | 21 +++++++++++++++------
 .../runtime/async/ContinuationActionExecutor.java   |  5 +++--
 8 files changed, 46 insertions(+), 16 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
 
b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
index 26991b60..69ba2a02 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
@@ -30,7 +30,12 @@ public class AgentExecutionOptions {
     public static final ConfigOption<Integer> MAX_RETRIES =
             new ConfigOption<>("max-retries", Integer.class, 3);
 
-    // Async execution is supported on jdk >= 21, so set default false here.
+    public static final ConfigOption<Integer> NUM_ASYNC_THREADS =
+            new ConfigOption<>(
+                    "num-async-threads",
+                    Integer.class,
+                    Runtime.getRuntime().availableProcessors() * 2);
+
     public static final ConfigOption<Boolean> CHAT_ASYNC =
             new ConfigOption<>("chat.async", Boolean.class, true);
 
diff --git a/docs/content/docs/operations/configuration.md 
b/docs/content/docs/operations/configuration.md
index 1ee4df0f..556a0e6a 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -131,6 +131,7 @@ Here is the list of all built-in core configuration options.
 | `chat.async`              | true                       | boolean             
  | Whether chat asynchronously for built-in chat action.                       
                                                                                
                                                                                
                    |
 | `tool-call.async`         | true                       | boolean             
  | Whether process tool call for built-in tool call action.                    
                                                                                
                                                                                
                    |
 | `rag.async`               | true                       | boolean             
  | Whether retrieve context asynchronously for built-in context retrieval 
action.                                                                         
                                                                                
                         |
+| `num-async-threads`       | os cpu count * 2           | int                 
  | The thread pool size for async executor.                                    
                                                                                
                                                                                
                    |
 | `job-identifier`          | none                       | String              
  | The unique identifier of job, remaining consistent after restoring from a 
savepoint. If not set, uses flink job id.                                       
                                                                                
                      |
 
 
diff --git a/python/flink_agents/api/core_options.py 
b/python/flink_agents/api/core_options.py
index f904174a..eb503de2 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import os
 from enum import Enum
 from typing import Any
 
@@ -106,6 +107,12 @@ class AgentExecutionOptions:
         default=3,
     )
 
+    NUM_ASYNC_THREADS = ConfigOption(
+        key="num-async-threads",
+        config_type=int,
+        default=os.cpu_count() * 2,
+    )
+
     CHAT_ASYNC = ConfigOption(
         key="chat.async",
         config_type=bool,
diff --git a/python/flink_agents/runtime/flink_runner_context.py 
b/python/flink_agents/runtime/flink_runner_context.py
index 3592d21e..c4b06c10 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -558,11 +558,12 @@ def close_flink_runner_context(
     ctx.close()
 
 
-def create_async_thread_pool() -> ThreadPoolExecutor:
+def create_async_thread_pool(max_workers: int | None) -> ThreadPoolExecutor:
     """Used to create a thread pool to execute asynchronous
     code block in action.
     """
-    return ThreadPoolExecutor(max_workers=os.cpu_count() * 2)
+    logging.info(f"Initialize fixed thread pool for async task with 
{max_workers} threads")
+    return ThreadPoolExecutor(max_workers=max_workers or os.cpu_count() * 2)
 
 
 def close_async_thread_pool(executor: ThreadPoolExecutor) -> None:
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
index c0b4cee8..7c1a526d 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
@@ -28,7 +28,7 @@ import java.util.function.Supplier;
 public class ContinuationActionExecutor {
 
     /** Creates a new ContinuationActionExecutor. */
-    public ContinuationActionExecutor() {}
+    public ContinuationActionExecutor(int numAsyncThreads) {}
 
     /**
      * Executes the action. In JDK 11, this simply runs the action 
synchronously.
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 41b31e69..c83af977 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.agents.runtime.operator;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.EventContext;
 import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
 import org.apache.flink.agents.api.context.MemoryUpdate;
 import org.apache.flink.agents.api.listener.EventListener;
 import org.apache.flink.agents.api.logger.EventLogger;
@@ -315,7 +315,9 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         initPythonEnvironment();
 
         // init executor for Java async execution
-        continuationActionExecutor = new ContinuationActionExecutor();
+        continuationActionExecutor =
+                new ContinuationActionExecutor(
+                        
agentPlan.getConfig().get(AgentExecutionOptions.NUM_ASYNC_THREADS));
 
         mailboxProcessor = getMailboxProcessor();
 
@@ -652,7 +654,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         pythonActionExecutor =
                 new PythonActionExecutor(
                         pythonInterpreter,
-                        new ObjectMapper().writeValueAsString(agentPlan),
+                        agentPlan,
                         javaResourceAdapter,
                         pythonRunnerContext,
                         jobIdentifier);
@@ -1055,7 +1057,11 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         if (isJava) {
             if (runnerContext == null) {
                 if (continuationActionExecutor == null) {
-                    continuationActionExecutor = new 
ContinuationActionExecutor();
+                    continuationActionExecutor =
+                            new ContinuationActionExecutor(
+                                    agentPlan
+                                            .getConfig()
+                                            
.get(AgentExecutionOptions.NUM_ASYNC_THREADS));
                 }
                 runnerContext =
                         new JavaRunnerContextImpl(
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index d47d8f58..89306eec 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.agents.runtime.python.utils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
+import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
@@ -65,7 +69,7 @@ public class PythonActionExecutor {
             "python_java_utils.get_output_from_output_event";
 
     private final PythonInterpreter interpreter;
-    private final String agentPlanJson;
+    private final AgentPlan agentPlan;
     private final PythonRunnerContextImpl runnerContext;
     private final JavaResourceAdapter javaResourceAdapter;
     private final String jobIdentifier;
@@ -74,12 +78,13 @@ public class PythonActionExecutor {
 
     public PythonActionExecutor(
             PythonInterpreter interpreter,
-            String agentPlanJson,
+            AgentPlan agentPlan,
             JavaResourceAdapter javaResourceAdapter,
             PythonRunnerContextImpl runnerContext,
-            String jobIdentifier) {
+            String jobIdentifier)
+            throws JsonProcessingException {
         this.interpreter = interpreter;
-        this.agentPlanJson = agentPlanJson;
+        this.agentPlan = agentPlan;
         this.runnerContext = runnerContext;
         this.javaResourceAdapter = javaResourceAdapter;
         this.jobIdentifier = jobIdentifier;
@@ -88,14 +93,18 @@ public class PythonActionExecutor {
     public void open() throws Exception {
         interpreter.exec(PYTHON_IMPORTS);
 
-        pythonAsyncThreadPool = (PyObject) 
interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
+        pythonAsyncThreadPool =
+                (PyObject)
+                        interpreter.invoke(
+                                CREATE_ASYNC_THREAD_POOL,
+                                
agentPlan.getConfig().get(AgentExecutionOptions.NUM_ASYNC_THREADS));
 
         pythonRunnerContext =
                 (PyObject)
                         interpreter.invoke(
                                 CREATE_FLINK_RUNNER_CONTEXT,
                                 runnerContext,
-                                agentPlanJson,
+                                new 
ObjectMapper().writeValueAsString(agentPlan),
                                 pythonAsyncThreadPool,
                                 javaResourceAdapter,
                                 jobIdentifier);
diff --git 
a/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
 
b/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
index eda859d6..4dc30d81 100644
--- 
a/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
+++ 
b/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java
@@ -40,9 +40,10 @@ public class ContinuationActionExecutor {
 
     private final ExecutorService asyncExecutor;
 
-    public ContinuationActionExecutor() {
+    public ContinuationActionExecutor(int numAsyncThreads) {
+        LOG.info("Initialize fixed thread pool for async task with {} 
threads", numAsyncThreads);
         this.asyncExecutor =
-                
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+                Executors.newFixedThreadPool(numAsyncThreads);
     }
 
     /**

Reply via email to