GitHub user weiqingy added a comment to the discussion: Parallel Tool Call
Execution
Thanks for putting this together — the fan-out/fan-in shape is the right one,
and I like that the proposal already pins down the degradation matrix
(`async=false` / `parallel=false` / JDK<21) and grounds recovery in the #404
cursor model and #598 reconcilers instead of hand-waving it. @joeyutong already
covered the recovery state machine, collect-all vs fail-fast, duplicate side
effects, the global pool, and tracing, so I won't re-tread those — a few things
from the cross-language and API-surface angle that I didn't see raised yet.
**The Python side and the per-call durable identity.** The design is built
end-to-end on the JDK 21 Continuation model — mailbox-thread yield,
`pendingBatchFuture`, and `functionId = "tool-call-" + toolCallId`. Python gets
a one-line signature, but none of those primitives exist on that side.
`durable_execute_async` submits to a thread pool, and Python derives a call's
identity from the function object itself — `_compute_function_id(func)` →
`module.qualname` (`durable_execution.py:25`) — so there's no place to inject a
caller-supplied `"tool-call-{id}"`. Every call into the same `tool.call`
collides on `functionId` and is disambiguated only by `argsDigest` + cursor
position (`tool_call_action.py:51`). The proposed
`durable_execute_all_async(callables)` signature doesn't carry an id either. So
the deterministic per-call identity that the whole recovery section leans on
(ordered recording, partial-batch failover, call-order-mismatch detection) has
no Python counterpart as
written. CLAUDE.md asks for Java/Python parity and the goal table here lists
"Cross-language: same semantics"; could the design spell out the Python
fan-out/fan-in and recovery-identity model concretely rather than the single
signature line? As it stands it reads as a Java design with a Python stub, and
the two durable-execution models are different enough that parity won't come
for free.
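A minimal Java sketch of the identity distinction. Every name here is hypothetical (`ToolCallSpec`, `perCallFunctionId`, `derivedFunctionId` are not from the proposal), and `"tools." + toolName` is only a stand-in for Python's `module.qualname` scheme. When the id is derived from the function alone, two calls to the same tool get the same `functionId`. A batch entry that carries its tool-call id can instead produce `"tool-call-" + toolCallId` per call, which is the pairing a Python batch signature would also need to carry.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical batch entry: the callable travels with a caller-supplied tool-call id.
record ToolCallSpec<T>(String toolCallId, String toolName, Callable<T> body) {

    // Per-call durable identity, mirroring "tool-call-" + toolCallId in the Java design.
    String perCallFunctionId() {
        return "tool-call-" + toolCallId;
    }

    // Identity derived from the function alone (stand-in for module.qualname):
    // every invocation of the same tool yields the same value.
    String derivedFunctionId() {
        return "tools." + toolName;
    }
}

class ToolCallIdDemo {
    public static void main(String[] args) {
        List<ToolCallSpec<String>> batch = List.of(
                new ToolCallSpec<>("a1", "search", () -> "r1"),
                new ToolCallSpec<>("b2", "search", () -> "r2"));
        for (ToolCallSpec<String> spec : batch) {
            // Same derived id for both calls, distinct per-call ids.
            System.out.println(spec.derivedFunctionId() + " vs " + spec.perCallFunctionId());
        }
    }
}
```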
**Collect-all isn't symmetric across the two languages today, which the
parallel design will inherit.** Picking up the collect-all point from above —
that semantics isn't actually the same on both sides right now, so "preserve
collect-all under parallelism" means different things on each. Java's
`ToolCallAction` wraps every tool in its own try/catch and accumulates
`success` / `error` / `responses` maps, then emits `new ToolResponseEvent(id,
responses, success, error, externalIds)` (`ToolCallAction.java:49-113`). The
Python `process_tool_request` has no per-tool try/catch around execution — only
the missing-tool case lands in the response map; if a `tool.call` raises, it
propagates and fails the whole action with no `ToolResponseEvent` emitted at
all (`tool_call_action.py`). And the Python `ToolResponseEvent` carries only
`responses` + `external_ids`, no `success` / `error` maps (`tool_event.py`), so
it can't express per-tool success/failure to begin with. So before parallel
execution can "preserve collect-all", collect-all has to exist symmetrically;
otherwise one tool failing in an N-tool parallel batch drops the whole batch on
Python while Java returns the other N-1 results plus one error. Worth deciding
the intended cross-language failure contract here, since parallelism makes the
divergence louder (which of N concurrent calls failed?).
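A rough sketch of the collect-all contract in the shape Java's `ToolCallAction` already uses, under stated assumptions: `BatchResult`, `ToolInvocation` and `CollectAll.run` are hypothetical names, the loop is sequential for clarity, and the result maps only loosely mirror the `responses` / `success` / `error` fields of the Java `ToolResponseEvent`. Each call gets its own try/catch, so a failing tool is recorded in `error` while the others still land in `responses`; a parallel version would need the same per-call capture around each future.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical per-batch result with per-tool outcomes keyed by tool-call id.
record BatchResult(Map<String, Object> responses,
                   Map<String, Boolean> success,
                   Map<String, String> error) {}

// Hypothetical tool invocation keyed by tool-call id.
record ToolInvocation(String toolCallId, Callable<Object> body) {}

final class CollectAll {
    private CollectAll() {}

    // Collect-all: each tool runs inside its own try/catch, so one failure is
    // recorded against its id instead of aborting the whole batch.
    static BatchResult run(List<ToolInvocation> calls) {
        Map<String, Object> responses = new LinkedHashMap<>();
        Map<String, Boolean> success = new LinkedHashMap<>();
        Map<String, String> error = new LinkedHashMap<>();
        for (ToolInvocation call : calls) {
            try {
                responses.put(call.toolCallId(), call.body().call());
                success.put(call.toolCallId(), true);
            } catch (Exception e) {
                success.put(call.toolCallId(), false);
                error.put(call.toolCallId(), String.valueOf(e.getMessage()));
            }
        }
        return new BatchResult(responses, success, error);
    }

    public static void main(String[] args) {
        BatchResult result = run(List.of(
                new ToolInvocation("t1", () -> "ok-1"),
                new ToolInvocation("t2", () -> { throw new IllegalStateException("boom"); }),
                new ToolInvocation("t3", () -> "ok-3")));
        System.out.println(result);
    }
}
```

With three calls where the middle one throws, the result holds two responses and one error keyed by the failing call's id, which is the "N-1 results plus one error" shape the Python side currently cannot produce.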
**Does `durableExecuteAllAsync` need to be public API in v1?** It's proposed as
a public `RunnerContext` method on both languages, but the only caller in the
design is the built-in `tool_call_action`. Promoting it to the public interface
now locks the batch recovery semantics — which, per the open questions on this
thread, aren't settled — into two language APIs at once, before a real
implementation has shaken them out. Would it be worth keeping the batch
primitive internal to the tool-call path for the first version (the public knob
stays just `tool-call.parallel`) and exposing a public `RunnerContext` method
only once the recovery model has stabilized? And if it does stay public, the
return type is worth a second look: `List<T>` is structurally fail-fast, since
`futures.stream().map(CompletableFuture::join)` throws at the first failed future
it reaches, whereas the
tool-call path needs the collect-all behavior already flagged above, so the
signature itself would have to carry per-call outcomes rather than
a bare `List<T>`.
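To illustrate the return-type point, here is a hedged sketch assuming the batch primitive hands back one `CompletableFuture` per call; `Outcome`, `BatchJoin`, `joinAll` and `joinAllOutcomes` are hypothetical names, not the proposal's API. `joinAll` has the fail-fast `List<T>` shape, while `joinAllOutcomes` uses `handle` to turn every completion into a per-call `Outcome`, so a single failure does not erase the other results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical per-call outcome: either the call's value or the exception it threw.
sealed interface Outcome<T> permits Outcome.Ok, Outcome.Failed {
    record Ok<T>(T value) implements Outcome<T> {}
    record Failed<T>(Throwable cause) implements Outcome<T> {}
}

final class BatchJoin {
    private BatchJoin() {}

    // Fail-fast shape: join() throws a CompletionException at the first failed
    // future the stream reaches, and the other results are lost to the caller.
    static <T> List<T> joinAll(List<CompletableFuture<T>> futures) {
        return futures.stream().map(CompletableFuture::join).toList();
    }

    // Collect-all shape: handle() maps every completion, normal or exceptional,
    // to an Outcome, so the final join() calls never throw.
    static <T> List<Outcome<T>> joinAllOutcomes(List<CompletableFuture<T>> futures) {
        return futures.stream()
                .map(f -> f.<Outcome<T>>handle((value, ex) -> ex == null
                        ? new Outcome.Ok<T>(value)
                        : new Outcome.Failed<T>(unwrap(ex))))
                .map(CompletableFuture::join)
                .toList();
    }

    // supplyAsync wraps a thrown exception in CompletionException; unwrap it so
    // Failed carries the tool's own exception.
    private static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<String>> futures = List.of(
                    CompletableFuture.supplyAsync(() -> "a", pool),
                    CompletableFuture.<String>supplyAsync(() -> {
                        throw new IllegalStateException("boom");
                    }, pool));
            System.out.println(joinAllOutcomes(futures));
        } finally {
            pool.shutdown();
        }
    }
}
```

A sealed `Outcome` type is only one option; a map keyed by tool-call id, as the Java `ToolResponseEvent` already uses, would carry the same per-call information.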
**One concrete note on the barrier sketch, since it feeds the pool-exhaustion
question.** In `executeAllAsync`, the barrier wraps each tool future in
`CompletableFuture.supplyAsync(() -> { f.join(); return null; })` with no
explicit executor — that runs on the common `ForkJoinPool` and parks one pool
thread per tool blocked on `join()`. So a batch of N tools occupies N threads
on `asyncExecutor` plus N blocking joiners on the common pool, entirely outside
the `num-async-threads` bound — which makes "concurrency is bounded by the
global async pool" harder to actually hold. The submitted futures are already
the barrier; building the wait directly from them (or submitting
`CompletableFuture`s onto `asyncExecutor`) keeps the whole batch inside
`num-async-threads`. Minor relative to the semantics questions, but it's the
difference between the global pool being a real bound and a nominal one.
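A minimal sketch of keeping the barrier inside the pool, assuming `asyncExecutor` is an `ExecutorService` sized by `num-async-threads` (`BoundedBatch`, `executeAll` and `namedPool` are hypothetical names). Each tool is submitted with `CompletableFuture.supplyAsync(task, asyncExecutor)` and `CompletableFuture.allOf` acts as the barrier, so no extra thread is parked on `join()` in the common `ForkJoinPool`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class BoundedBatch {
    private BoundedBatch() {}

    // Submits every task onto asyncExecutor and uses allOf as the barrier: every
    // task runs on the pool's own threads and nothing is parked on the common pool.
    static <T> CompletableFuture<List<T>> executeAll(List<Supplier<T>> tasks,
                                                     ExecutorService asyncExecutor) {
        List<CompletableFuture<T>> futures = tasks.stream()
                .map(task -> CompletableFuture.supplyAsync(task, asyncExecutor))
                .toList();
        // allOf completes once every future has completed. The non-async thenApply
        // then runs on whichever thread completes the barrier (or the caller, if it
        // is already done), and its join() calls return immediately.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }

    // Hypothetical stand-in for a pool sized by num-async-threads; threads are
    // named "async-N" so it is visible which pool ran each task.
    static ExecutorService namedPool(int numAsyncThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "async-" + counter.incrementAndGet());
        return Executors.newFixedThreadPool(numAsyncThreads, factory);
    }

    public static void main(String[] args) {
        ExecutorService asyncExecutor = namedPool(2);
        try {
            List<Supplier<String>> tasks = List.of(
                    () -> "tool-1 on " + Thread.currentThread().getName(),
                    () -> "tool-2 on " + Thread.currentThread().getName(),
                    () -> "tool-3 on " + Thread.currentThread().getName());
            executeAll(tasks, asyncExecutor).join().forEach(System.out::println);
        } finally {
            asyncExecutor.shutdown();
        }
    }
}
```

`allOf` still waits for every future but completes exceptionally if any of them failed, so this barrier is fail-fast on its own; the collect-all path would wrap each future with a per-call handler before the barrier.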
GitHub link:
https://github.com/apache/flink-agents/discussions/855#discussioncomment-17490207