xudong963 commented on issue #21650:
URL: https://github.com/apache/datafusion/issues/21650#issuecomment-4258659400

   This sounds like architectural debt that will compound: every new feature that needs shared state (dynamic filters for more join types, CTE optimizations, plan caching) will make the problem worse.
   
   Let me make sure I fully understand the issue (correct me if I'm wrong, @alamb):
   
   - Physical plan — a declarative description of what to compute. Its 
lifecycle is session-level: created once, potentially reused, cached, or 
serialized.
   - Execution state — the runtime artifacts produced while computing: hash 
tables, buffers, coordination signals. Its lifecycle is per-query-run: created 
fresh each time, discarded after.
   
   DataFusion currently binds execution state to plan objects: `HashJoinExec` holds `Arc<OnceAsync<JoinLeftData>>`, `CrossJoinExec` holds `OnceAsync<JoinLeftData>`, `WorkTableExec` holds `Arc<WorkTable>`, and dynamic filters thread `Arc` references between join and scan nodes. This lifecycle mismatch is the root cause of three concrete problems:
   
   1. Plans are not reusable. `OnceAsync` is consumed after one run, so re-execution requires `reset_state()`, which is itself an admission that the state lives in the wrong place.
   2. Plans are not serializable. `Arc` pointers are memory addresses; they cannot be written to protobuf and sent to another machine. This blocks distributed execution (Ballista), FFI wrappers (datafusion-python), and plan caching.
   3. Plans are not safely concurrent. Two concurrent runs of the same plan race on the same `OnceAsync`, so the second run may observe residual state from the first.
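
Problems 1 and 3 can be reduced to a toy model in plain Rust (std only). This is a sketch of the pattern, not DataFusion code: `ToyJoinExec` and `BuildSide` are hypothetical names, and `std::sync::OnceLock` stands in for DataFusion's `OnceAsync`. The point is that the cache lives in the plan node, so it outlives a single run.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical build side of a join (stands in for `JoinLeftData`).
#[derive(Debug, Clone, PartialEq)]
struct BuildSide {
    run_id: u32,
    rows: Vec<i64>,
}

/// Hypothetical plan node that owns its execution state, the way `HashJoinExec`
/// holds `Arc<OnceAsync<JoinLeftData>>`. `OnceLock` stands in for `OnceAsync`.
struct ToyJoinExec {
    build: Arc<OnceLock<BuildSide>>,
}

impl ToyJoinExec {
    fn new() -> Self {
        Self { build: Arc::new(OnceLock::new()) }
    }

    /// One query run. The build side is computed on first use and then cached
    /// inside the plan node itself, not inside the run.
    fn execute(&self, run_id: u32, build_input: &[i64]) -> BuildSide {
        self.build
            .get_or_init(|| BuildSide { run_id, rows: build_input.to_vec() })
            .clone()
    }
}

fn main() {
    let plan = ToyJoinExec::new();
    let first = plan.execute(1, &[1, 2, 3]);
    // Problem 1: re-running with new input silently reuses run 1's build side.
    let second = plan.execute(2, &[4, 5, 6]);
    assert_eq!(second, first);

    // Problem 3: two concurrent runs race on one slot; both observe the winner's state.
    let fresh = ToyJoinExec::new();
    let (a, b) = std::thread::scope(|s| {
        let a = s.spawn(|| fresh.execute(10, &[10]));
        let b = s.spawn(|| fresh.execute(20, &[20]));
        (a.join().unwrap(), b.join().unwrap())
    });
    assert_eq!(a, b);
    println!("second run saw run_id={}, concurrent runs both saw run_id={}", second.run_id, a.run_id);
}
```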
   
   --- 
   If so, I agree with the `TaskContext` direction:
   
   1. Add a `SharedStateRegistry` to `TaskContext` — a `HashMap<SharedStateId, 
Arc<dyn Any + Send + Sync>>` where `SharedStateId` is a lightweight identifier 
(probably a `u64` or a newtype around `usize`).
   
   2. Plan nodes carry `SharedStateId` instead of `Arc<State>` — this is 
serializable, cloneable, and doesn't carry execution state. The physical 
planner assigns IDs when creating plans.
   
   3. State initialization happens in a new `prepare_execution` phase — before 
the first `execute()` call, walk the plan tree and let each node register its 
shared state in the `TaskContext`. This replaces the current pattern where 
state is lazily initialized in `execute()`.
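
A minimal sketch of steps 1 to 3, again std-only and hypothetical: `ToyTaskContext` stands in for `TaskContext`, `ToyPlan` for the physical plan, and the registry's `register`/`get` methods are assumptions rather than an existing DataFusion API. The plan carries only a `SharedStateId`; each run builds a fresh context, `prepare_execution` registers an empty build-side slot, and `execute` looks that slot up by id.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

/// Hypothetical lightweight identifier: a newtype around `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct SharedStateId(u64);

/// Hypothetical per-run registry: `SharedStateId -> Arc<dyn Any + Send + Sync>`.
#[derive(Default)]
struct SharedStateRegistry {
    states: HashMap<SharedStateId, Arc<dyn Any + Send + Sync>>,
}

impl SharedStateRegistry {
    fn register(&mut self, id: SharedStateId, state: Arc<dyn Any + Send + Sync>) {
        self.states.insert(id, state);
    }

    /// Looks up a state by id and downcasts it; a wrong type yields `None`.
    fn get<T: Any + Send + Sync>(&self, id: SharedStateId) -> Option<Arc<T>> {
        self.states.get(&id)?.clone().downcast::<T>().ok()
    }
}

/// Hypothetical stand-in for `TaskContext`: one instance per query run.
#[derive(Default)]
struct ToyTaskContext {
    tables: HashMap<String, Vec<i64>>,
    shared: SharedStateRegistry,
}

/// Hypothetical plan: only names and ids, so it can be cloned, cached, or serialized.
#[derive(Debug, Clone)]
enum ToyPlan {
    Scan { table: String },
    Join { state_id: SharedStateId, build: Box<ToyPlan>, probe: Box<ToyPlan> },
}

/// Hypothetical `prepare_execution` phase: walk the tree and register fresh state.
fn prepare_execution(plan: &ToyPlan, ctx: &mut ToyTaskContext) {
    if let ToyPlan::Join { state_id, build, probe } = plan {
        // One empty build-side slot per run; `OnceLock` stands in for `OnceAsync`.
        ctx.shared.register(*state_id, Arc::new(OnceLock::<Vec<i64>>::new()));
        prepare_execution(build, ctx);
        prepare_execution(probe, ctx);
    }
}

fn execute(plan: &ToyPlan, ctx: &ToyTaskContext) -> Vec<i64> {
    match plan {
        ToyPlan::Scan { table } => ctx.tables.get(table).cloned().unwrap_or_default(),
        ToyPlan::Join { state_id, build, probe } => {
            let slot = ctx
                .shared
                .get::<OnceLock<Vec<i64>>>(*state_id)
                .expect("prepare_execution must run before execute");
            let build_rows = slot.get_or_init(|| execute(build, ctx));
            // Toy semi-join: keep probe rows that also appear on the build side.
            execute(probe, ctx).into_iter().filter(|r| build_rows.contains(r)).collect()
        }
    }
}

/// One query run: fresh context, prepare, then execute.
fn run(plan: &ToyPlan, build_rows: Vec<i64>) -> Vec<i64> {
    let mut ctx = ToyTaskContext::default();
    ctx.tables.insert("build".to_string(), build_rows);
    ctx.tables.insert("probe".to_string(), (1..=6).collect());
    prepare_execution(plan, &mut ctx);
    execute(plan, &ctx)
}

fn main() {
    let plan = ToyPlan::Join {
        state_id: SharedStateId(0),
        build: Box::new(ToyPlan::Scan { table: "build".into() }),
        probe: Box::new(ToyPlan::Scan { table: "probe".into() }),
    };
    // Two concurrent runs of the same plan, each with its own context and state.
    std::thread::scope(|s| {
        let a = s.spawn(|| run(&plan, vec![1, 2, 3]));
        let b = s.spawn(|| run(&plan, vec![4, 5, 6]));
        // prints "[1, 2, 3] [4, 5, 6]"
        println!("{:?} {:?}", a.join().unwrap(), b.join().unwrap());
    });
}
```

Because the registry lives in the per-run context rather than in the plan, concurrent runs get independent build sides, and the plan itself is plain data. One trade-off of the type-erased `dyn Any` map is that a type mismatch at lookup surfaces as `None` at runtime rather than as a compile error.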
   

