This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-0.2 in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 7cfa6ed160053d0b0524d74d4cf64b71a2c2e7f4 Author: WenjinXie <[email protected]> AuthorDate: Thu Jan 29 14:40:35 2026 +0800 [runtime] Support configure the thread pool size for async executor. fix --- .../agents/api/agents/AgentExecutionOptions.java | 7 ++++++- docs/content/docs/operations/configuration.md | 1 + python/flink_agents/api/core_options.py | 7 +++++++ python/flink_agents/runtime/flink_runner_context.py | 5 +++-- .../runtime/async/ContinuationActionExecutor.java | 2 +- .../runtime/operator/ActionExecutionOperator.java | 14 ++++++++++---- .../runtime/python/utils/PythonActionExecutor.java | 21 +++++++++++++++------ .../runtime/async/ContinuationActionExecutor.java | 5 +++-- 8 files changed, 46 insertions(+), 16 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java index 26991b60..69ba2a02 100644 --- a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java +++ b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java @@ -30,7 +30,12 @@ public class AgentExecutionOptions { public static final ConfigOption<Integer> MAX_RETRIES = new ConfigOption<>("max-retries", Integer.class, 3); - // Async execution is supported on jdk >= 21, so set default false here. + public static final ConfigOption<Integer> NUM_ASYNC_THREADS = + new ConfigOption<>( + "num-async-threads", + Integer.class, + Runtime.getRuntime().availableProcessors() * 2); + public static final ConfigOption<Boolean> CHAT_ASYNC = new ConfigOption<>("chat.async", Boolean.class, true); diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md index 1ee4df0f..556a0e6a 100644 --- a/docs/content/docs/operations/configuration.md +++ b/docs/content/docs/operations/configuration.md @@ -131,6 +131,7 @@ Here is the list of all built-in core configuration options. | `chat.async` | true | boolean | Whether chat asynchronously for built-in chat action. | | `tool-call.async` | true | boolean | Whether process tool call for built-in tool call action. | | `rag.async` | true | boolean | Whether retrieve context asynchronously for built-in context retrieval action. | +| `num-async-threads` | os cpu count * 2 | int | The thread pool size for async executor. | | `job-identifier` | none | String | The unique identifier of job, remaining consistent after restoring from a savepoint. If not set, uses flink job id. | diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py index f904174a..eb503de2 100644 --- a/python/flink_agents/api/core_options.py +++ b/python/flink_agents/api/core_options.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# +import os from enum import Enum from typing import Any @@ -106,6 +107,12 @@ class AgentExecutionOptions: default=3, ) + NUM_ASYNC_THREADS = ConfigOption( + key="num-async-threads", + config_type=int, + default=os.cpu_count() * 2, + ) + CHAT_ASYNC = ConfigOption( key="chat.async", config_type=bool, diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py index 3592d21e..c4b06c10 100644 --- a/python/flink_agents/runtime/flink_runner_context.py +++ b/python/flink_agents/runtime/flink_runner_context.py @@ -558,11 +558,12 @@ def close_flink_runner_context( ctx.close() -def create_async_thread_pool() -> ThreadPoolExecutor: +def create_async_thread_pool(max_workers: int | None) -> ThreadPoolExecutor: """Used to create a thread pool to execute asynchronous code block in action. """ - return ThreadPoolExecutor(max_workers=os.cpu_count() * 2) + logging.info(f"Initialize fixed thread pool for async task with {max_workers} threads") + return ThreadPoolExecutor(max_workers=max_workers or os.cpu_count() * 2) def close_async_thread_pool(executor: ThreadPoolExecutor) -> None: diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java index c0b4cee8..7c1a526d 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java @@ -28,7 +28,7 @@ import java.util.function.Supplier; public class ContinuationActionExecutor { /** Creates a new ContinuationActionExecutor. */ - public ContinuationActionExecutor() {} + public ContinuationActionExecutor(int numAsyncThreads) {} /** * Executes the action. In JDK 11, this simply runs the action synchronously. diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 41b31e69..c83af977 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -17,11 +17,11 @@ */ package org.apache.flink.agents.runtime.operator; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.AgentExecutionOptions; import org.apache.flink.agents.api.context.MemoryUpdate; import org.apache.flink.agents.api.listener.EventListener; import org.apache.flink.agents.api.logger.EventLogger; @@ -315,7 +315,9 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT initPythonEnvironment(); // init executor for Java async execution - continuationActionExecutor = new ContinuationActionExecutor(); + continuationActionExecutor = + new ContinuationActionExecutor( + agentPlan.getConfig().get(AgentExecutionOptions.NUM_ASYNC_THREADS)); mailboxProcessor = getMailboxProcessor(); @@ -652,7 +654,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT pythonActionExecutor = new PythonActionExecutor( pythonInterpreter, - new ObjectMapper().writeValueAsString(agentPlan), + agentPlan, javaResourceAdapter, pythonRunnerContext, jobIdentifier); @@ -1055,7 +1057,11 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT if (isJava) { if (runnerContext == null) { if (continuationActionExecutor == null) { - continuationActionExecutor = new ContinuationActionExecutor(); + continuationActionExecutor = + new ContinuationActionExecutor( + agentPlan + .getConfig() + .get(AgentExecutionOptions.NUM_ASYNC_THREADS)); } runnerContext = new JavaRunnerContextImpl( diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java index d47d8f58..89306eec 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java @@ -17,6 +17,10 @@ */ package org.apache.flink.agents.runtime.python.utils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.agents.api.agents.AgentExecutionOptions; +import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.plan.PythonFunction; import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; @@ -65,7 +69,7 @@ public class PythonActionExecutor { "python_java_utils.get_output_from_output_event"; private final PythonInterpreter interpreter; - private final String agentPlanJson; + private final AgentPlan agentPlan; private final PythonRunnerContextImpl runnerContext; private final JavaResourceAdapter javaResourceAdapter; private final String jobIdentifier; @@ -74,12 +78,13 @@ public class PythonActionExecutor { public PythonActionExecutor( PythonInterpreter interpreter, - String agentPlanJson, + AgentPlan agentPlan, JavaResourceAdapter javaResourceAdapter, PythonRunnerContextImpl runnerContext, - String jobIdentifier) { + String jobIdentifier) + throws JsonProcessingException { this.interpreter = interpreter; - this.agentPlanJson = agentPlanJson; + this.agentPlan = agentPlan; this.runnerContext = runnerContext; this.javaResourceAdapter = javaResourceAdapter; this.jobIdentifier = jobIdentifier; @@ -88,14 +93,18 @@ public class PythonActionExecutor { public void open() throws Exception { interpreter.exec(PYTHON_IMPORTS); - pythonAsyncThreadPool = (PyObject) interpreter.invoke(CREATE_ASYNC_THREAD_POOL); + pythonAsyncThreadPool = + (PyObject) + interpreter.invoke( + CREATE_ASYNC_THREAD_POOL, + agentPlan.getConfig().get(AgentExecutionOptions.NUM_ASYNC_THREADS)); pythonRunnerContext = (PyObject) interpreter.invoke( CREATE_FLINK_RUNNER_CONTEXT, runnerContext, - agentPlanJson, + new ObjectMapper().writeValueAsString(agentPlan), pythonAsyncThreadPool, javaResourceAdapter, jobIdentifier); diff --git a/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java b/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java index eda859d6..4dc30d81 100644 --- a/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java +++ b/runtime/src/main/java21/org/apache/flink/agents/runtime/async/ContinuationActionExecutor.java @@ -40,9 +40,10 @@ public class ContinuationActionExecutor { private final ExecutorService asyncExecutor; - public ContinuationActionExecutor() { + public ContinuationActionExecutor(int numAsyncThreads) { + LOG.info("Initialize fixed thread pool for async task with {} threads", numAsyncThreads); this.asyncExecutor = - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + Executors.newFixedThreadPool(numAsyncThreads); } /**
