[ 
https://issues.apache.org/jira/browse/CAMEL-23078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Mariani updated CAMEL-23078:
-------------------------------------
    Description: 
While implementing https://issues.apache.org/jira/browse/CAMEL-23076 I noticed 
the following possible improvements:

* Parallel tool execution - mcpToolExecutorThreads config, CompletableFuture 
for batch calls

When the model returns multiple tool calls in a single response (e.g., "get 
weather for London" and "get weather for Paris" simultaneously), they are 
executed sequentially in {{OpenAIProducer.processNonStreamingAgentic()}}. Since 
tool calls in a batch are independent by design (the model produces them all 
from the same context, before seeing any result), they can safely run in 
parallel.

*Impact:* If the model requests 5 tool calls that each take 2 seconds, 
sequential execution takes 10 seconds while parallel execution takes ~2 seconds.

*Proposed changes:*

- Add a {{parallelToolExecution}} boolean parameter (default: {{false}}).
- When enabled, use Camel's {{ExecutorServiceManager}} to obtain a managed 
thread pool (not a raw {{Executors.newFixedThreadPool}}) so it participates in 
Camel's lifecycle and graceful shutdown.
- Collect results and feed them back to the model in the *same order* as the 
original tool calls. The OpenAI API requires each tool result message to carry 
the ID of the tool call it answers.
- Add a {{parallelToolTimeout}} parameter (default: the value of 
{{mcpTimeout}}) to prevent a single slow tool from blocking the entire batch. 
A tool that exceeds the timeout should produce an error message for the model 
rather than fail the exchange.
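
The proposed changes above can be sketched with plain JDK concurrency. This is only an illustration under stated assumptions, not the component's code: {{ToolCall}}, {{ToolResult}}, {{executeAll}} and the {{invoker}} function are hypothetical names, and the {{ExecutorService}} parameter stands in for a pool that would be obtained from Camel's {{ExecutorServiceManager}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class ParallelToolSketch {

    // Hypothetical stand-ins for the component's tool call and tool result types.
    record ToolCall(String id, String name) {}
    record ToolResult(String toolCallId, String content) {}

    /** Runs every call on the pool and returns results in the order of the calls. */
    static List<ToolResult> executeAll(List<ToolCall> calls,
                                       Function<ToolCall, String> invoker,
                                       ExecutorService pool,
                                       long timeoutMillis) {
        // Submit everything first so the calls actually overlap.
        List<CompletableFuture<ToolResult>> futures = new ArrayList<>();
        for (ToolCall call : calls) {
            futures.add(CompletableFuture
                    .supplyAsync(() -> invoker.apply(call), pool)
                    // Completes the future exceptionally after the timeout.
                    // Note: this does not interrupt the task still running on the pool.
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    // A failure or timeout becomes an "Error: ..." result for this call
                    // only, so the other calls in the batch are unaffected.
                    .handle((content, error) -> new ToolResult(call.id(),
                            error == null ? content : "Error: " + describe(error))));
        }
        // Join in submission order: result i always belongs to call i.
        List<ToolResult> results = new ArrayList<>();
        for (CompletableFuture<ToolResult> future : futures) {
            results.add(future.join());
        }
        return results;
    }

    private static String describe(Throwable error) {
        // Exceptions thrown inside supplyAsync arrive wrapped in CompletionException.
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
        if (cause instanceof TimeoutException) {
            return "tool call timed out";
        }
        return String.valueOf(cause.getMessage());
    }
}
```

Joining the futures in submission order keeps each result paired with its tool call ID regardless of which tool finishes first. Because every future converts its own failure into a result, {{join()}} never throws here.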

*Edge cases to handle:*

- *returnDirect*: the current loop short-circuits when all tools in a batch 
have {{returnDirect=true}}. This logic must work identically in parallel mode: 
wait for all tools to complete, then check {{allReturnDirect}}.
- *Error handling*: currently tool errors are wrapped as {{"Error: ..."}} 
strings and fed back to the model. In parallel mode, one tool's failure must 
not cancel other in-flight tools.
- *Interaction with toolCallFilter*: if a tool call filter/interceptor is added 
in the future, filtering should happen *before* dispatching to the thread pool.
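
A minimal sketch of the {{returnDirect}} and error-handling cases, assuming tools are identified by name and that {{returnDirectTools}} is a set of tool names. {{ReturnDirectSketch}}, {{Outcome}}, {{runBatch}} and {{invoker}} are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class ReturnDirectSketch {

    // Hypothetical result holder: the short-circuit decision plus the ordered tool outputs.
    record Outcome(boolean allReturnDirect, List<String> outputs) {}

    static Outcome runBatch(List<String> toolNames,
                            Set<String> returnDirectTools,
                            Function<String, String> invoker,
                            Executor pool) {
        List<CompletableFuture<String>> futures = toolNames.stream()
                .map(name -> CompletableFuture
                        .supplyAsync(() -> invoker.apply(name), pool)
                        // Each future absorbs its own failure, so one failing tool
                        // cannot cancel or fail the others.
                        .exceptionally(e -> "Error: " + rootMessage(e)))
                .toList();

        // Barrier: wait for the whole batch before deciding anything.
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

        List<String> outputs = futures.stream().map(CompletableFuture::join).toList();
        // Same rule as the sequential loop: short-circuit only if every tool is returnDirect.
        // Treating an empty batch as "not returnDirect" is an assumption of this sketch.
        boolean allReturnDirect = !toolNames.isEmpty() && returnDirectTools.containsAll(toolNames);
        return new Outcome(allReturnDirect, outputs);
    }

    private static String rootMessage(Throwable e) {
        Throwable cause = e.getCause() != null ? e.getCause() : e;
        return String.valueOf(cause.getMessage());
    }
}
```

The {{allReturnDirect}} check runs only after the {{allOf(...)}} barrier, so the decision is made on a complete batch exactly as in the sequential loop.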

* Tool list refresh - mcpToolRefreshInterval or SDK toolsChangeConsumer callback

Tools are listed once during {{OpenAIEndpoint.initializeMcpServers()}} (called 
from {{doStart()}}) and cached in {{cachedMcpTools}} for the lifetime of the 
route. If an MCP server adds, removes, or updates tools at runtime, the cached 
list becomes stale.

The MCP Java SDK provides a {{toolsChangeConsumer}} callback on the 
{{McpClient}} builder. When an MCP server's tool set changes, the server sends 
a {{notifications/tools/list_changed}} JSON-RPC notification through the 
existing transport connection. The SDK then automatically calls {{listTools()}} 
to fetch the updated list and invokes the registered consumer with the complete 
{{List<McpSchema.Tool>}}. This is event-driven, requires no polling, and works 
with all transport types (stdio, SSE, streamableHttp) since the notification 
handling sits at the {{McpClientSession}} layer above the transport.

*Proposed changes:*

- Register a {{toolsChangeConsumer}} callback during MCP client initialization 
in {{initializeMcpServers()}}. When fired, update {{cachedMcpTools}}, 
{{toolClientMap}}, and {{returnDirectTools}} for the affected server.

{code:java}
McpSyncClient mcpClient = McpClient.sync(transport)
    .requestTimeout(timeout)
    .initializationTimeout(timeout)
    .toolsChangeConsumer(updatedTools -> {
        // updatedTools is the full current List<McpSchema.Tool> for this server
        // rebuild cachedMcpTools, toolClientMap, returnDirectTools
    })
    .build();
{code}

- Add a {{mcpToolRefresh}} boolean parameter (default: {{true}}) to 
enable/disable dynamic refresh. Some deployments may prefer the deterministic 
behavior of a fixed tool set.
- Ensure thread safety: the agentic loop reads {{cachedMcpTools}} and 
{{toolClientMap}} during execution. Updates from the callback must not corrupt 
in-flight iterations. Use {{CopyOnWriteArrayList}} / {{ConcurrentHashMap}} or 
swap references atomically.
- The reconnection logic already in {{OpenAIEndpoint.reconnectMcpServer()}} 
re-lists tools after reconnecting. The refresh callback should reuse this same 
update logic to avoid duplication.
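
One way to "swap references atomically" is to keep all three structures in a single immutable snapshot behind an {{AtomicReference}}. The sketch below is an assumption-laden illustration: tools and servers are reduced to plain name strings (the real fields would hold SDK tool and client objects), and {{ToolRegistrySketch}}, {{Snapshot}} and {{replaceServerTools}} are hypothetical names. How {{returnDirect}} is configured per server is also assumed here.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

final class ToolRegistrySketch {

    // Immutable view of everything the agentic loop reads; component names mirror the issue text.
    record Snapshot(List<String> cachedMcpTools,
                    Map<String, String> toolClientMap,   // tool name -> owning server name
                    Set<String> returnDirectTools) {
        Snapshot {
            cachedMcpTools = List.copyOf(cachedMcpTools);
            toolClientMap = Map.copyOf(toolClientMap);
            returnDirectTools = Set.copyOf(returnDirectTools);
        }
    }

    private final AtomicReference<Snapshot> current =
            new AtomicReference<>(new Snapshot(List.of(), Map.of(), Set.of()));

    /** The agentic loop takes one snapshot per iteration and reads only from it. */
    Snapshot snapshot() {
        return current.get();
    }

    /** Shared update path for both the refresh callback and the reconnection logic. */
    void replaceServerTools(String server, List<String> tools, boolean returnDirect) {
        // The function is side-effect free, so a retry under contention is harmless.
        current.updateAndGet(old -> {
            Map<String, String> owners = new LinkedHashMap<>();
            for (String tool : old.cachedMcpTools()) {
                String owner = old.toolClientMap().get(tool);
                if (!server.equals(owner)) {
                    owners.put(tool, owner);          // keep other servers' tools
                }
            }
            Set<String> direct = new HashSet<>(old.returnDirectTools());
            direct.retainAll(owners.keySet());        // forget this server's old entries
            for (String tool : tools) {
                owners.put(tool, server);
            }
            if (returnDirect) {
                direct.addAll(tools);
            }
            return new Snapshot(List.copyOf(owners.keySet()), owners, direct);
        });
    }
}
```

An in-flight iteration keeps working on the snapshot it already holds, so it never observes a half-updated combination of the three structures. The next iteration picks up the new tool list.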

*Related:* if MCP server management is extracted to a dedicated class (see 
McpServerManager extraction), the refresh callback and reconnection logic would 
live together in that class.

  was:
Implementing https://issues.apache.org/jira/browse/CAMEL-23076 I noticed the 
following improvements:

* Parallel tool execution - mcpToolExecutorThreads config, CompletableFuture 
for batch calls

When the model returns multiple tool calls in a single response (e.g., "get 
weather for London" and "get weather for Paris" simultaneously), they're 
currently executed sequentially. This is a per-exchange latency optimization — 
with 3 tools at 500ms each, sequential = 1.5s, parallel = ~500ms. It does not 
address thread exhaustion under concurrent load; the exchange thread remains 
blocked for the full agentic loop regardless. A configurable 
mcpToolExecutorThreads (default 1 = sequential, current behavior) would create 
a fixed thread pool at endpoint startup, used only for concurrent callTool() 
within a single model response.

* Tool list refresh - mcpToolRefreshInterval or SDK toolsChangeConsumer callback

Tools are listed once at doStart() and cached for the lifetime of the route. If 
an MCP server adds, removes, or updates tools at runtime, the cached list 
becomes stale — the model may try to call tools that no longer exist, or miss 
new ones.


> camel-openai: MCP improvements — parallel tool execution and runtime tool 
> refresh
> ---------------------------------------------------------------------------------
>
>                 Key: CAMEL-23078
>                 URL: https://issues.apache.org/jira/browse/CAMEL-23078
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-openai
>    Affects Versions: 4.18.0
>            Reporter: Federico Mariani
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
