alamb commented on issue #21650:
URL: https://github.com/apache/datafusion/issues/21650#issuecomment-4254887454

Here are some examples (found with codex, reviewed by me) of this shared state:
   
   <details><summary>Details</summary>
   <p>
   
## Shared recursive CTE work table

This seems like the clearest example of explicit shared mutable operator/runtime state.

- [`WorkTable`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/work_table.rs#L60-L87) itself is mutable shared state (`Mutex<Option<ReservedBatches>>`)
- [`WorkTableExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/work_table.rs#L100-L113) stores `Arc<WorkTable>`
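
A minimal sketch of this pattern using only the Rust standard library. The names `WorkTableSketch`, `WorkTableExecSketch`, and `Batch` are hypothetical (not DataFusion's API), and `Vec<i64>` stands in for Arrow record batches with memory reservations. The point is that the plan node and the writer hold the same `Arc`, so anything one side does to the `Mutex` slot is visible to the other.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for an Arrow `RecordBatch` (hypothetical simplification).
type Batch = Vec<i64>;

/// Sketch of a work table: one mutable slot shared between the operator that
/// writes an iteration's output and the node that reads it back.
#[derive(Default)]
struct WorkTableSketch {
    batches: Mutex<Option<Vec<Batch>>>,
}

impl WorkTableSketch {
    /// Store the batches produced by the current iteration.
    fn update(&self, batches: Vec<Batch>) {
        *self.batches.lock().unwrap() = Some(batches);
    }

    /// Remove and return the stored batches, leaving the slot empty.
    fn take(&self) -> Option<Vec<Batch>> {
        self.batches.lock().unwrap().take()
    }
}

/// Hypothetical plan node that holds the same `Arc` as the writer.
struct WorkTableExecSketch {
    work_table: Arc<WorkTableSketch>,
}

impl WorkTableExecSketch {
    /// Read whatever the writer last stored; empty if nothing is there.
    fn execute(&self) -> Vec<Batch> {
        self.work_table.take().unwrap_or_default()
    }
}
```

In this sketch `take` empties the slot, so a second `execute` without an intervening `update` returns no batches: the node's output depends on state it shares with another operator rather than on its own fields.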
    
## Hash join shared build-side state, especially `CollectLeft`

- [`HashJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L720-L770) stores a shared `left_fut: Arc<OnceAsync<JoinLeftData>>` and optional `dynamic_filter`
- [`JoinLeftData`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L189-L220) contains shared runtime artifacts: hash map, visited bitmap, bounds, membership, and atomics
- In [`execute`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1319-L1339), `PartitionMode::CollectLeft` shares one `left_fut` across all probe partitions
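
The `CollectLeft` sharing can be sketched with `std::sync::OnceLock` in place of DataFusion's async `OnceAsync`. This is a synchronous analogy only, and `CollectLeftJoinSketch`, `BuildSide`, and `run_partitions` are hypothetical names. Every probe partition calls `left_side()`, but the build closure runs once and all partitions receive the same `Arc`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread;

/// Hypothetical build-side result (a real hash join would hold a hash map).
struct BuildSide {
    keys: Vec<i64>,
}

/// Join node that builds its left side at most once and shares it with
/// every probe partition, similar in spirit to `CollectLeft`.
struct CollectLeftJoinSketch {
    left: OnceLock<Arc<BuildSide>>,
    builds: AtomicUsize, // counts how many times the build actually ran
}

impl CollectLeftJoinSketch {
    fn new() -> Self {
        Self { left: OnceLock::new(), builds: AtomicUsize::new(0) }
    }

    /// Called by each probe partition; only the first caller runs the build.
    fn left_side(&self) -> Arc<BuildSide> {
        Arc::clone(self.left.get_or_init(|| {
            self.builds.fetch_add(1, Ordering::SeqCst);
            Arc::new(BuildSide { keys: vec![1, 2, 3] })
        }))
    }

    /// Probe one partition: count rows whose key exists on the build side.
    fn execute_partition(&self, probe: &[i64]) -> usize {
        let left = self.left_side();
        probe.iter().filter(|k| left.keys.contains(*k)).count()
    }
}

/// Run each probe partition on its own thread against one shared node.
fn run_partitions(join: Arc<CollectLeftJoinSketch>, partitions: Vec<Vec<i64>>) -> Vec<usize> {
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|p| {
            let join = Arc::clone(&join);
            thread::spawn(move || join.execute_partition(&p))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The `builds` counter exists only to make the "build once" property observable: however many partitions probe concurrently, it ends at 1.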
    
## Dynamic filter information on joins

- [`HashJoinExecDynamicFilter`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L763-L770) holds the shared `DynamicFilterPhysicalExpr` plus `OnceLock<Arc<SharedBuildAccumulator>>`
- The dynamic filter is also exposed as part of the plan expressions in [`apply_expressions`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1250-L1253)
- In [`execute`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1371-L1393), the dynamic filter path lazily creates a shared `SharedBuildAccumulator`
- [`SharedBuildAccumulator`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs#L216-L341) is the cross-partition shared state used to accumulate bounds / membership and update the filter
- Filter updates happen in [`SharedBuildAccumulator`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs#L350-L456) once all build partitions have reported
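
A simplified sketch of the accumulate-then-publish idea: each build partition reports its local min/max, and the shared filter is written only when the last expected partition reports. `DynamicRangeFilter`, `BoundsAccumulatorSketch`, and the single `i64` key column are assumptions for illustration; the real `SharedBuildAccumulator` also deals with membership and updates a `DynamicFilterPhysicalExpr`.

```rust
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical dynamic filter on one `i64` join key: once bounds are
/// published, probe rows outside `[min, max]` can be skipped.
#[derive(Default)]
struct DynamicRangeFilter {
    bounds: RwLock<Option<(i64, i64)>>,
}

impl DynamicRangeFilter {
    /// `None` means nothing has been published yet, so every row is kept.
    fn keeps(&self, key: i64) -> bool {
        match *self.bounds.read().unwrap() {
            Some((min, max)) => key >= min && key <= max,
            None => true,
        }
    }
}

struct AccState {
    reported: usize,
    bounds: Option<(i64, i64)>,
}

/// Cross-partition accumulator: merges per-partition bounds and publishes
/// the filter only after the last expected build partition reports.
struct BoundsAccumulatorSketch {
    expected: usize,
    state: Mutex<AccState>,
    filter: Arc<DynamicRangeFilter>,
}

impl BoundsAccumulatorSketch {
    fn new(expected: usize, filter: Arc<DynamicRangeFilter>) -> Self {
        let state = Mutex::new(AccState { reported: 0, bounds: None });
        Self { expected, state, filter }
    }

    /// Report one build partition's keys; returns true if this report
    /// completed the set and published the filter.
    fn report(&self, keys: &[i64]) -> bool {
        let mut st = self.state.lock().unwrap();
        if let (Some(&lo), Some(&hi)) = (keys.iter().min(), keys.iter().max()) {
            let merged = match st.bounds {
                Some((min, max)) => (min.min(lo), max.max(hi)),
                None => (lo, hi),
            };
            st.bounds = Some(merged);
        }
        st.reported += 1;
        if st.reported == self.expected {
            // Simplification: if every partition was empty, bounds stay `None`
            // and the filter keeps all rows.
            *self.filter.bounds.write().unwrap() = st.bounds;
            true
        } else {
            false
        }
    }
}
```

Until the final report arrives, probe-side readers of the filter see no bounds, so the filter is conservative while partitions are still building.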
   
## Other join operators with shared build-side state

These follow the same general pattern: plan owns a `OnceAsync<_>` that multiple output partitions/streams wait on.
   
### `CrossJoinExec`

- [`CrossJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/cross_join.rs#L76-L99) has shared `left_fut: OnceAsync<JoinLeftData>`
- [`reset_state`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/cross_join.rs#L305-L314) explicitly recreates `left_fut`
- [`execute`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/cross_join.rs#L348-L355) shares the buffered left side through `left_fut.try_once(...)`
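
Why `reset_state` needs to recreate `left_fut` can be illustrated with a small, hypothetical model. `CrossJoinSketch` caches its left side in a `OnceLock` (standing in for `OnceAsync`), and its left input is an `Arc<Mutex<Vec<i64>>>` whose contents can change between executions, for example when the left child reads a recursive CTE's work table.

```rust
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical cross join node that buffers its left side in the plan.
struct CrossJoinSketch {
    /// Stands in for the left child; its contents may change between
    /// executions (for example, a recursive CTE's work table).
    left_input: Arc<Mutex<Vec<i64>>>,
    /// Buffered left side, built on first use and shared by every partition.
    left_cache: OnceLock<Vec<i64>>,
}

impl CrossJoinSketch {
    fn new(left_input: Arc<Mutex<Vec<i64>>>) -> Self {
        Self { left_input, left_cache: OnceLock::new() }
    }

    /// One output partition: pair each buffered left row with each right row.
    fn execute(&self, right: &[i64]) -> Vec<(i64, i64)> {
        let left = self
            .left_cache
            .get_or_init(|| self.left_input.lock().unwrap().clone());
        let mut out = Vec::new();
        for &l in left {
            for &r in right {
                out.push((l, r));
            }
        }
        out
    }

    /// Same children, fresh (empty) cache: the analogue of recreating
    /// `left_fut` in `reset_state`.
    fn reset_state(&self) -> Self {
        Self::new(Arc::clone(&self.left_input))
    }
}
```

Re-executing the same node after the input changes returns the stale cached rows; the node returned by `reset_state` rebuilds from the current input.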
   
### `NestedLoopJoinExec`

- [`NestedLoopJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/nested_loop_join.rs#L170-L203) has shared `build_side_data: OnceAsync<JoinLeftData>`
   
### `PiecewiseMergeJoinExec`

- [`PiecewiseMergeJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs#L255-L280) has shared `buffered_fut: OnceAsync<BufferedSideData>`
   
## `SortExec`

- [`SortExec::reset_state`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/sorts/sort.rs#L1241-L1253) recreates the TopK dynamic filter and resets metrics
- The TopK self-filter is pushed down from operator-owned state in [`gather_filters_for_pushdown`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/sorts/sort.rs#L1419-L1423)
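
A sketch of how a TopK operator can feed a shared filter to the scan below it, assuming `ORDER BY value LIMIT k` over a single `i64` column. `TopKFilterSketch`, `TopKSketch`, and `scan_with_topk` are hypothetical names, not DataFusion's. Once the heap holds `k` values, its largest element becomes a threshold, and the scan can skip any row at or above it.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, RwLock};

/// Hypothetical filter shared by a TopK operator and the scan feeding it.
#[derive(Default)]
struct TopKFilterSketch {
    threshold: RwLock<Option<i64>>,
}

impl TopKFilterSketch {
    /// A row can only enter the top k if it is below the current threshold.
    fn may_qualify(&self, value: i64) -> bool {
        match *self.threshold.read().unwrap() {
            Some(t) => value < t,
            None => true,
        }
    }
}

/// Keeps the k smallest values (ORDER BY value LIMIT k).
struct TopKSketch {
    k: usize,
    heap: BinaryHeap<i64>, // max-heap: the root is the largest kept value
    filter: Arc<TopKFilterSketch>,
}

impl TopKSketch {
    fn insert(&mut self, value: i64) {
        if self.heap.len() < self.k {
            self.heap.push(value);
        } else if let Some(&top) = self.heap.peek() {
            if value < top {
                self.heap.pop();
                self.heap.push(value);
            }
        }
        if self.heap.len() == self.k {
            // Heap is full: publish its largest value as the new threshold.
            *self.filter.threshold.write().unwrap() = self.heap.peek().copied();
        }
    }
}

/// Simulated scan: rows the shared filter rules out never reach the TopK.
/// Returns the top k values (ascending) and the number of skipped rows.
fn scan_with_topk(rows: &[i64], k: usize) -> (Vec<i64>, usize) {
    let filter = Arc::new(TopKFilterSketch::default());
    let mut topk = TopKSketch { k, heap: BinaryHeap::new(), filter: Arc::clone(&filter) };
    let mut skipped = 0;
    for &v in rows {
        if filter.may_qualify(v) {
            topk.insert(v);
        } else {
            skipped += 1;
        }
    }
    (topk.heap.into_sorted_vec(), skipped)
}
```

`scan_with_topk` creates a fresh filter on every call. A filter stored on a long-lived plan node would instead carry its threshold into the next execution unless something recreates it, which is the role attributed to `SortExec::reset_state` above.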
   
## `AggregateExec`

- [`AggregateExec::init_dynamic_filter`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/aggregates/mod.rs#L1228-L1284) can attach a `dynamic_filter` owned by the plan
    
   
   </p>
   </details> 
   