xudong963 commented on issue #21650: URL: https://github.com/apache/datafusion/issues/21650#issuecomment-4258659400
This sounds like architectural debt that will compound: every new feature that needs shared state (dynamic filters for more join types, CTE optimizations, plan caching) will make the problem worse.

Let me make sure I fully understand the issue (correct me if I'm wrong, @alamb):

- Physical plan: a declarative description of what to compute. Its lifecycle is session-level: created once, potentially reused, cached, or serialized.
- Execution state: the runtime artifacts produced while computing, such as hash tables, buffers, and coordination signals. Its lifecycle is per-query-run: created fresh each time, discarded afterwards.

DataFusion currently binds execution state to plan objects. `HashJoinExec` holds `Arc<OnceAsync<JoinLeftData>>`, `CrossJoinExec` holds `OnceAsync<JoinLeftData>`, `WorkTableExec` holds `Arc<WorkTable>`, and dynamic filters thread `Arc` references between join and scan nodes.

This lifecycle mismatch is the root cause of three concrete problems:

1. Plans are not reusable. `OnceAsync` is consumed after one run. Re-execution requires `reset_state()`, which is itself an admission that the state is in the wrong place.
2. Plans are not serializable. `Arc` pointers are memory addresses. They cannot be written to protobuf and sent to another machine. This blocks distributed execution (Ballista), FFI wrappers (datafusion-python), and plan caching.
3. Plans are not safely concurrent. Two concurrent runs of the same plan race on the same `OnceAsync`. The second run may observe residual state from the first.

---

If so, I agree with the `TaskContext` direction:

1. Add a `SharedStateRegistry` to `TaskContext`: a `HashMap<SharedStateId, Arc<dyn Any + Send + Sync>>` where `SharedStateId` is a lightweight identifier (probably a `u64` or a newtype around `usize`).
2. Plan nodes carry a `SharedStateId` instead of an `Arc<State>`. The ID is serializable and cloneable, and it carries no execution state. The physical planner assigns IDs when creating plans.
3. State initialization happens in a new `prepare_execution` phase: before the first `execute()` call, walk the plan tree and let each node register its shared state in the `TaskContext`. This replaces the current pattern where state is lazily initialized in `execute()`.
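To make the three steps above concrete, here is a rough, std-only sketch of how I picture it. None of this is existing DataFusion API: `SharedStateRegistry`, `SharedStateId`, and `prepare_execution` are only the names proposed above, and `JoinNode` / `BuildSide` are hypothetical stand-ins for a real operator and its per-run state. The registry is passed in directly here, where the real thing would presumably be reached through `TaskContext`.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical identifier a plan node carries instead of `Arc<State>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SharedStateId(pub u64);

/// Hypothetical per-run registry; in the proposal it would hang off `TaskContext`.
#[derive(Default)]
pub struct SharedStateRegistry {
    states: Mutex<HashMap<SharedStateId, Arc<dyn Any + Send + Sync>>>,
}

impl SharedStateRegistry {
    /// Registers fresh state for `id`, replacing any previous entry.
    pub fn register<T: Any + Send + Sync>(&self, id: SharedStateId, state: T) {
        let state: Arc<dyn Any + Send + Sync> = Arc::new(state);
        self.states.lock().unwrap().insert(id, state);
    }

    /// Looks up the state for `id`; `None` if it is missing or has another type.
    pub fn get<T: Any + Send + Sync>(&self, id: SharedStateId) -> Option<Arc<T>> {
        let state = self.states.lock().unwrap().get(&id).cloned()?;
        state.downcast::<T>().ok()
    }
}

/// Stand-in for per-run build-side data such as a join hash table.
#[derive(Default)]
pub struct BuildSide {
    pub rows: Mutex<Vec<i64>>,
}

/// Hypothetical join node: it holds only an ID, so it is cheap to clone
/// and carries no runtime state of its own.
#[derive(Debug, Clone)]
pub struct JoinNode {
    pub build_side: SharedStateId,
}

impl JoinNode {
    /// Runs once per query run, before `execute`, and registers fresh state.
    pub fn prepare_execution(&self, registry: &SharedStateRegistry) {
        registry.register(self.build_side, BuildSide::default());
    }

    /// Appends a row to this run's build side and returns its size so far.
    /// Returns `None` if `prepare_execution` was not called for this run.
    pub fn execute(&self, registry: &SharedStateRegistry, row: i64) -> Option<usize> {
        let state = registry.get::<BuildSide>(self.build_side)?;
        let mut rows = state.rows.lock().unwrap();
        rows.push(row);
        Some(rows.len())
    }
}
```

With this shape, the same `JoinNode` value can be prepared against two separate registries and each run sees only its own `BuildSide`, which is the reuse and concurrency property the plan/state split is after. The `Any` downcast is the price of a single untyped map; a typed key would avoid it at the cost of more machinery.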
