[ https://issues.apache.org/jira/browse/CAMEL-23078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Federico Mariani updated CAMEL-23078:
-------------------------------------
Description:
Implementing https://issues.apache.org/jira/browse/CAMEL-23076 I noticed the
following improvements:
* Parallel tool execution - mcpToolExecutorThreads config, CompletableFuture
for batch calls
When the model returns multiple tool calls in a single response (e.g., "get
weather for London" and "get weather for Paris" simultaneously), they are
executed sequentially in {{OpenAIProducer.processNonStreamingAgentic()}}. Since
tool calls in a batch are independent by design (the model produces them all
from the same context, before seeing any result), they can safely run in
parallel.
*Impact:* If the model requests 5 tool calls that each take 2 seconds,
sequential execution takes 10 seconds, while parallel execution (with at least 5
pool threads available) takes ~2 seconds, i.e. the duration of the slowest call.
*Proposed changes:*
- Add a {{parallelToolExecution}} boolean parameter (default: {{false}}).
- When enabled, use Camel's {{ExecutorServiceManager}} to obtain a managed
thread pool (not a raw {{Executors.newFixedThreadPool}}) so it participates in
Camel's lifecycle and graceful shutdown.
- Collect results and feed them back to the model in the *same order* as the
original tool calls — the OpenAI API requires tool result messages to match the
tool call IDs.
- Add a {{parallelToolTimeout}} parameter (default: the value of {{mcpTimeout}}) to
prevent a single slow tool from blocking the entire batch. Tools that exceed
the timeout should return an error message to the model rather than failing the
exchange.
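A minimal sketch of the proposed batch dispatch, assuming hypothetical {{ToolCall}}/{{ToolResult}} records and a {{Function}} standing in for the MCP {{callTool()}} invocation. It uses a plain {{ExecutorService}} to stay self-contained; in the component the pool would presumably come from Camel's {{ExecutorServiceManager}}. Results are collected by iterating the futures in call order (not completion order), and both failures and timeouts become {{"Error: ..."}} results instead of failing the batch.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ParallelToolExecutionSketch {

    /** Hypothetical stand-in for one tool call returned by the model. */
    public record ToolCall(String id, String name, String arguments) {}

    /** Hypothetical tool result, keyed by the originating tool call id. */
    public record ToolResult(String toolCallId, String content) {}

    /**
     * Runs all calls of one batch concurrently and returns results in the same
     * order as {@code calls}. A failing or slow tool yields an "Error: ..." result
     * for that call only; the other calls are unaffected.
     */
    public static List<ToolResult> executeBatch(List<ToolCall> calls,
                                                Function<ToolCall, String> invoker,
                                                ExecutorService pool,
                                                long timeoutMillis) {
        List<CompletableFuture<ToolResult>> futures = new ArrayList<>();
        for (ToolCall call : calls) {
            futures.add(CompletableFuture
                    .supplyAsync(() -> new ToolResult(call.id(), invoker.apply(call)), pool)
                    // Completes the future with a TimeoutException; it does NOT interrupt the worker.
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    // Per-call recovery: this call's error never cancels the other futures.
                    .exceptionally(ex -> new ToolResult(call.id(), errorMessage(ex, timeoutMillis))));
        }
        // Wait for the whole batch; join() cannot throw because every future recovers above.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Iterating the list keeps results aligned with the original tool call ids.
        List<ToolResult> results = new ArrayList<>();
        for (CompletableFuture<ToolResult> f : futures) {
            results.add(f.join());
        }
        return results;
    }

    private static String errorMessage(Throwable ex, long timeoutMillis) {
        // supplyAsync failures arrive wrapped in CompletionException; timeouts arrive unwrapped.
        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
        if (cause instanceof TimeoutException) {
            return "Error: tool did not complete within " + timeoutMillis + " ms";
        }
        return "Error: " + cause.getMessage();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<ToolCall> calls = List.of(
                    new ToolCall("call_1", "get_weather", "London"),
                    new ToolCall("call_2", "get_weather", "Paris"),
                    new ToolCall("call_3", "broken_tool", "{}"));
            Function<ToolCall, String> invoker = call -> {
                if (call.name().equals("broken_tool")) {
                    throw new IllegalStateException("server unavailable");
                }
                return "Sunny in " + call.arguments();
            };
            for (ToolResult r : executeBatch(calls, invoker, pool, 1_000)) {
                System.out.println(r.toolCallId() + " -> " + r.content());
            }
        } finally {
            pool.shutdown();
        }
    }
}
{code}

One design caveat: {{orTimeout}} only completes the future exceptionally; it does not interrupt the worker thread, so a timed-out call presumably keeps a pool thread busy until the underlying MCP request returns or hits the client's own {{requestTimeout}}.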
*Edge cases to handle:*
- *returnDirect*: the current loop short-circuits when all tools in a batch
have {{returnDirect=true}}. This logic must work identically in parallel mode:
wait for all tools to complete, then check {{allReturnDirect}}.
- *Error handling*: currently tool errors are wrapped as {{"Error: ..."}}
strings and fed back to the model. In parallel mode, one tool's failure must
not cancel other in-flight tools.
- *Interaction with toolCallFilter*: if a tool call filter/interceptor is added
in the future, filtering should happen *before* dispatching to the thread pool.
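The edge cases above imply an ordering for one agentic iteration: filter, dispatch, wait for the whole batch, then evaluate {{allReturnDirect}}. A sketch under these assumptions: {{toolCallFilter}} is the hypothetical future filter (modelled as a {{Predicate}}), a rejected call is answered with an error string rather than dropped so every tool call id still gets a result, and {{ToolCall}}/{{StepOutcome}} are illustrative types, not the component's API.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;

public class AgenticBatchStepSketch {

    /** Hypothetical stand-in for one tool call returned by the model. */
    public record ToolCall(String id, String name) {}

    /** Hypothetical outcome of one iteration: results in call order plus the short-circuit decision. */
    public record StepOutcome(List<String> results, boolean allReturnDirect) {}

    public static StepOutcome runBatch(List<ToolCall> calls,
                                       Predicate<ToolCall> toolCallFilter, // hypothetical filter/interceptor
                                       Set<String> returnDirectTools,
                                       Function<ToolCall, String> invoker,
                                       ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (ToolCall call : calls) {
            // 1. Filtering runs on the caller thread, before anything reaches the pool.
            if (!toolCallFilter.test(call)) {
                // Rejected calls are never dispatched but still answer their tool call id.
                futures.add(CompletableFuture.completedFuture("Error: tool call rejected by filter"));
                continue;
            }
            // 2. Dispatch; a failure becomes this call's own "Error: ..." result.
            futures.add(CompletableFuture.supplyAsync(() -> invoker.apply(call), pool)
                    .exceptionally(ex -> "Error: " + rootMessage(ex)));
        }

        // 3. Wait for the whole batch before deciding anything about returnDirect.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        List<String> results = futures.stream().map(CompletableFuture::join).toList();

        // 4. Same rule as the sequential loop: short-circuit only if every tool in the batch is returnDirect.
        boolean allReturnDirect = !calls.isEmpty()
                && calls.stream().allMatch(c -> returnDirectTools.contains(c.name()));
        return new StepOutcome(results, allReturnDirect);
    }

    private static String rootMessage(Throwable ex) {
        // Unwrap CompletionException (and any further nesting) to the original failure.
        Throwable t = ex;
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t.getMessage();
    }
}
{code}

Answering rejected calls with an error string is a design choice of this sketch; it keeps the one-result-per-tool-call-id contract intact whatever the filter decides.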
* Tool list refresh - mcpToolRefreshInterval or SDK toolsChangeConsumer callback
Tools are listed once during {{OpenAIEndpoint.initializeMcpServers()}} (called
from {{doStart()}}) and cached in {{cachedMcpTools}} for the lifetime of the
route. If an MCP server adds, removes, or updates tools at runtime, the cached
list becomes stale.
The MCP Java SDK provides a {{toolsChangeConsumer}} callback on the
{{McpClient}} builder. When an MCP server's tool set changes, the server sends
a {{notifications/tools/list_changed}} JSON-RPC notification through the
existing transport connection. The SDK then automatically calls {{listTools()}}
to fetch the updated list and invokes the registered consumer with the complete
{{List<McpSchema.Tool>}}. This is event-driven, requires no polling, and works
with all transport types (stdio, SSE, streamableHttp) since the notification
handling sits at the {{McpClientSession}} layer above the transport.
*Proposed changes:*
- Register a {{toolsChangeConsumer}} callback during MCP client initialization
in {{initializeMcpServers()}}. When fired, update {{cachedMcpTools}},
{{toolClientMap}}, and {{returnDirectTools}} for the affected server.
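{code:java}
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;

// transport: this server's McpClientTransport; timeout: a java.time.Duration (e.g. from mcpTimeout)
McpSyncClient mcpClient = McpClient.sync(transport)
        .requestTimeout(timeout)
        .initializationTimeout(timeout)
        .toolsChangeConsumer(updatedTools -> {
            // updatedTools is the full current List<McpSchema.Tool> for this server
            // rebuild cachedMcpTools, toolClientMap, returnDirectTools
        })
        .build();
{code}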
- Add a {{mcpToolRefresh}} boolean parameter (default: {{true}}) to
enable/disable dynamic refresh. Some deployments may prefer the deterministic
behavior of a fixed tool set.
- Ensure thread safety: the agentic loop reads {{cachedMcpTools}} and
{{toolClientMap}} during execution. Updates from the callback must not corrupt
in-flight iterations. Use {{CopyOnWriteArrayList}} / {{ConcurrentHashMap}} or
swap references atomically.
- The reconnection logic already in {{OpenAIEndpoint.reconnectMcpServer()}}
re-lists tools after reconnecting. The refresh callback should reuse this same
update logic to avoid duplication.
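One way to meet the thread-safety requirement (the "swap references atomically" option) is to keep the three structures in a single immutable snapshot and replace it atomically, with one update method shared by the refresh callback and the reconnect path. A sketch, assuming a hypothetical {{McpToolRegistrySketch}} class and a simplified {{Tool}} record in place of {{McpSchema.Tool}}; where the per-tool {{returnDirect}} flag actually comes from is also an assumption.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class McpToolRegistrySketch {

    /** Hypothetical stand-in for McpSchema.Tool plus a per-tool returnDirect setting. */
    public record Tool(String name, boolean returnDirect) {}

    /** Immutable view of all MCP tools; replaced as a whole, never mutated in place. */
    public record Snapshot(Map<String, List<Tool>> toolsByServer,
                           Map<String, String> toolClientMap, // tool name -> owning server
                           Set<String> returnDirectTools) {

        /** Flattened tool list across all servers (the role cachedMcpTools plays today). */
        public List<Tool> cachedMcpTools() {
            return toolsByServer.values().stream().flatMap(List::stream).toList();
        }
    }

    private final AtomicReference<Snapshot> current =
            new AtomicReference<>(new Snapshot(Map.of(), Map.of(), Set.of()));

    /** The agentic loop reads one snapshot per iteration and uses it throughout. */
    public Snapshot snapshot() {
        return current.get();
    }

    /** Single update path for both the toolsChangeConsumer callback and post-reconnect re-listing. */
    public void applyTools(String serverName, List<Tool> updatedTools) {
        // updateAndGet may re-run this function under contention, so it must stay side-effect free.
        current.updateAndGet(old -> {
            Map<String, List<Tool>> byServer = new HashMap<>(old.toolsByServer());
            byServer.put(serverName, List.copyOf(updatedTools)); // replace only the affected server

            // Derive the lookup structures from scratch so they can never drift from the tool lists.
            Map<String, String> clientMap = new HashMap<>();
            Set<String> direct = new HashSet<>();
            byServer.forEach((server, tools) -> {
                for (Tool t : tools) {
                    clientMap.put(t.name(), server);
                    if (t.returnDirect()) {
                        direct.add(t.name());
                    }
                }
            });
            return new Snapshot(Map.copyOf(byServer), Map.copyOf(clientMap), Set.copyOf(direct));
        });
    }
}
{code}

Wiring it up would look roughly like {{.toolsChangeConsumer(tools -> registry.applyTools(serverName, convert(tools)))}}, where {{convert}} is a hypothetical mapping from {{McpSchema.Tool}}. Because readers hold an immutable snapshot, an update arriving mid-iteration only takes effect on the next iteration.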
*Related:* if the MCP server management is extracted to a dedicated class (see
McpServerManager extraction), the refresh callback and reconnection logic would
live together in that class.
was:
Implementing https://issues.apache.org/jira/browse/CAMEL-23076 I noticed the
following improvements:
* Parallel tool execution - mcpToolExecutorThreads config, CompletableFuture
for batch calls
When the model returns multiple tool calls in a single response (e.g., "get
weather for London" and "get weather for Paris" simultaneously), they're
currently executed sequentially. This is a per-exchange latency optimization —
with 3 tools at 500ms each, sequential = 1.5s, parallel = ~500ms. It does not
address thread exhaustion under concurrent load; the exchange thread remains
blocked for the full agentic loop regardless. A configurable
mcpToolExecutorThreads (default 1 = sequential, the current behavior) would create
a fixed thread pool at endpoint startup, used only for concurrent callTool()
invocations within a single model response.
* Tool list refresh - mcpToolRefreshInterval or SDK toolsChangeConsumer callback
Tools are listed once at doStart() and cached for the lifetime of the route. If
an MCP server adds, removes, or updates tools at runtime, the cached list
becomes stale — the model may try to call tools that no longer exist, or miss
new ones.
> camel-openai: MCP improvements — parallel tool execution and runtime tool
> refresh
> ---------------------------------------------------------------------------------
>
> Key: CAMEL-23078
> URL: https://issues.apache.org/jira/browse/CAMEL-23078
> Project: Camel
> Issue Type: Improvement
> Components: camel-openai
> Affects Versions: 4.18.0
> Reporter: Federico Mariani
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)