alamb commented on issue #21650:
URL: https://github.com/apache/datafusion/issues/21650#issuecomment-4254887454
Here are some examples (found with codex, reviewed by me) of this shared
state:
<details><summary>Details</summary>
<p>
## Shared recursive CTE work table
This seems like the clearest example of explicit shared mutable
operator/runtime state.
- [`WorkTable`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/work_table.rs#L60-L87) itself is mutable shared state (`Mutex<Option<ReservedBatches>>`)
- [`WorkTableExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/work_table.rs#L100-L113) stores `Arc<WorkTable>`
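
A minimal sketch of this pattern using only the standard library. `SketchWorkTable` and `SketchWorkTableExec` are hypothetical stand-ins (not the DataFusion types), and `Vec<i64>` stands in for `ReservedBatches`; the point is only to show a plan node holding an `Arc` to a mutex-guarded slot that another operator refills between iterations.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `WorkTable`: a slot that the recursive
/// side fills and the scan side drains.
#[derive(Debug, Default)]
struct SketchWorkTable {
    batches: Mutex<Option<Vec<i64>>>,
}

impl SketchWorkTable {
    /// Replace the contents of the slot (assumed to be called once per iteration).
    fn update(&self, batches: Vec<i64>) {
        *self.batches.lock().unwrap() = Some(batches);
    }

    /// Take the current contents, leaving the slot empty.
    fn take(&self) -> Option<Vec<i64>> {
        self.batches.lock().unwrap().take()
    }
}

/// Hypothetical stand-in for `WorkTableExec`: the plan node only holds
/// a shared handle, so the state lives outside any single execution.
struct SketchWorkTableExec {
    work_table: Arc<SketchWorkTable>,
}

impl SketchWorkTableExec {
    fn execute(&self) -> Vec<i64> {
        self.work_table.take().unwrap_or_default()
    }
}

/// Drive two iterations through the same shared table.
fn run_two_iterations() -> (Vec<i64>, Vec<i64>, Vec<i64>) {
    let table = Arc::new(SketchWorkTable::default());
    let scan = SketchWorkTableExec { work_table: Arc::clone(&table) };

    table.update(vec![1, 2, 3]);
    let first = scan.execute();
    // The slot was drained, so a second execute without an update sees nothing.
    let drained = scan.execute();

    table.update(vec![4]);
    let second = scan.execute();
    (first, drained, second)
}
```

In this sketch, what `execute` returns depends on what was last written through the shared handle, not on the plan node itself, which is why the state counts as runtime state attached to the plan.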
## Hash join shared build-side state, especially `CollectLeft`
- [`HashJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L720-L770) stores a shared `left_fut: Arc<OnceAsync<JoinLeftData>>` and optional `dynamic_filter`
- [`JoinLeftData`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L189-L220) contains shared runtime artifacts: hash map, visited bitmap, bounds, membership, and atomics
- In [`execute`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1319-L1339), `PartitionMode::CollectLeft` shares one `left_fut` across all probe partitions
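
As a rough synchronous analogue of "one build, many probes", the sketch below uses `std::sync::OnceLock` and threads in place of `OnceAsync` and async streams. `SketchJoin`, its fields, and the toy build data are all hypothetical; this is not how `HashJoinExec` is implemented, only an illustration of several partitions waiting on a single shared build result.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::thread;

/// Hypothetical plan node: the build-side result is cached on the node
/// and shared by every probe partition.
struct SketchJoin {
    left: OnceLock<Arc<Vec<i64>>>,
    /// Counts how many times the build side actually ran.
    build_count: AtomicUsize,
}

impl SketchJoin {
    fn new() -> Self {
        Self {
            left: OnceLock::new(),
            build_count: AtomicUsize::new(0),
        }
    }

    /// Execute one probe partition. Whichever partition arrives first
    /// runs the build; the rest block until it finishes and reuse it.
    fn execute(&self, partition: usize) -> i64 {
        let left = self.left.get_or_init(|| {
            self.build_count.fetch_add(1, Ordering::SeqCst);
            Arc::new(vec![10, 20, 30]) // toy build-side data
        });
        left.iter().sum::<i64>() + partition as i64
    }
}

/// Run `n` probe partitions concurrently against one plan node.
fn run_probe_partitions(n: usize) -> (Vec<i64>, usize) {
    let join = Arc::new(SketchJoin::new());
    let handles: Vec<_> = (0..n)
        .map(|p| {
            let join = Arc::clone(&join);
            thread::spawn(move || join.execute(p))
        })
        .collect();
    let results = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (results, join.build_count.load(Ordering::SeqCst))
}
```

However many partitions run, the build closure executes once, and the cached result stays on the node afterwards.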
## Dynamic filter information on joins
- [`HashJoinExecDynamicFilter`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L763-L770) holds the shared `DynamicFilterPhysicalExpr` plus `OnceLock<Arc<SharedBuildAccumulator>>`
- The dynamic filter is also exposed as part of the plan expressions in [`apply_expressions`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1250-L1253)
- In [`execute`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1371-L1393), the dynamic filter path lazily creates a shared `SharedBuildAccumulator`
- [`SharedBuildAccumulator`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs#L216-L341) is the cross-partition shared state used to accumulate bounds / membership and update the filter
- Filter updates happen in [`SharedBuildAccumulator`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs#L350-L456) when all build partitions report in
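
The "accumulate per partition, publish once everyone has reported" shape can be sketched as follows. Everything here is hypothetical (`SketchAccumulator`, `Bounds`, and a plain `Mutex<Option<Bounds>>` standing in for the dynamic filter); the real `SharedBuildAccumulator` tracks more than min/max bounds and this sketch assumes each partition reports exactly once.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq)]
struct Bounds {
    min: i64,
    max: i64,
}

struct Inner {
    /// Build partitions that have not reported yet.
    remaining: usize,
    /// Bounds merged from the partitions seen so far.
    merged: Option<Bounds>,
}

/// Hypothetical cross-partition accumulator.
struct SketchAccumulator {
    inner: Mutex<Inner>,
    /// Stand-in for the shared dynamic filter that consumers read.
    filter: Arc<Mutex<Option<Bounds>>>,
}

impl SketchAccumulator {
    fn new(partitions: usize, filter: Arc<Mutex<Option<Bounds>>>) -> Self {
        Self {
            inner: Mutex::new(Inner { remaining: partitions, merged: None }),
            filter,
        }
    }

    /// Called once per build partition (an assumption of this sketch).
    fn report(&self, b: Bounds) {
        let mut inner = self.inner.lock().unwrap();
        let merged = match inner.merged {
            Some(m) => Bounds { min: m.min.min(b.min), max: m.max.max(b.max) },
            None => b,
        };
        inner.merged = Some(merged);
        inner.remaining -= 1;
        // Only the last reporter publishes, so readers never observe
        // bounds that cover just a subset of the build side.
        if inner.remaining == 0 {
            *self.filter.lock().unwrap() = inner.merged;
        }
    }
}

/// Report three partitions and observe the filter before and after the last one.
fn report_three_partitions() -> (Option<Bounds>, Option<Bounds>) {
    let filter = Arc::new(Mutex::new(None));
    let acc = SketchAccumulator::new(3, Arc::clone(&filter));
    acc.report(Bounds { min: 3, max: 9 });
    acc.report(Bounds { min: -5, max: 2 });
    let before = *filter.lock().unwrap();
    acc.report(Bounds { min: 10, max: 40 });
    let after = *filter.lock().unwrap();
    (before, after)
}
```

Both the accumulator and the filter outlive any single partition's stream in this sketch, which is the property the list above is pointing at.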
## Other join operators with shared build-side state
These follow the same general pattern: the plan owns a `OnceAsync<_>` that
multiple output partitions/streams wait on.
### `CrossJoinExec`
- [`CrossJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/cross_join.rs#L76-L99) has shared `left_fut: OnceAsync<JoinLeftData>`
- [`reset_state`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/cross_join.rs#L305-L314) explicitly recreates `left_fut`
- [`execute`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/cross_join.rs#L348-L355) shares the buffered left side through `left_fut.try_once(...)`
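
One way to see why a `reset_state` that recreates the cached future matters: if the cache lives on the plan node, re-executing the same node replays the old build side. The sketch below is a hypothetical std-only analogue (`SketchCrossJoin`, a `OnceLock` in place of `OnceAsync`, and a mutable `input` standing in for the left child), not the actual `CrossJoinExec` code.

```rust
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical plan node that caches its buffered left side.
struct SketchCrossJoin {
    /// Stand-in for the left child; its contents can change between runs.
    input: Arc<Mutex<Vec<i64>>>,
    /// Stand-in for `left_fut`: filled on first execute, then reused.
    left: OnceLock<Vec<i64>>,
}

impl SketchCrossJoin {
    fn new(input: Arc<Mutex<Vec<i64>>>) -> Self {
        Self { input, left: OnceLock::new() }
    }

    fn execute(&self) -> Vec<i64> {
        self.left
            .get_or_init(|| self.input.lock().unwrap().clone())
            .clone()
    }

    /// Return a copy of the node with a fresh, empty cache.
    fn reset_state(&self) -> Self {
        Self { input: Arc::clone(&self.input), left: OnceLock::new() }
    }
}

/// Execute, change the input, then compare re-execution with and without reset.
fn stale_then_fresh() -> (Vec<i64>, Vec<i64>, Vec<i64>) {
    let input = Arc::new(Mutex::new(vec![1, 2]));
    let plan = SketchCrossJoin::new(Arc::clone(&input));

    let first = plan.execute();
    *input.lock().unwrap() = vec![7];

    // Same node, same cache: the old build side is replayed.
    let stale = plan.execute();
    // Fresh cache: the new input is picked up.
    let fresh = plan.reset_state().execute();
    (first, stale, fresh)
}
```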
### `NestedLoopJoinExec`
- [`NestedLoopJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/nested_loop_join.rs#L170-L203) has shared `build_side_data: OnceAsync<JoinLeftData>`
### `PiecewiseMergeJoinExec`
- [`PiecewiseMergeJoinExec`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs#L255-L280) has shared `buffered_fut: OnceAsync<BufferedSideData>`
## Non-join operators with plan-owned dynamic filter state
### `SortExec`
- [`SortExec::reset_state`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/sorts/sort.rs#L1241-L1253) recreates the TopK dynamic filter and resets metrics
- The TopK self-filter is pushed down from operator-owned state in [`gather_filters_for_pushdown`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/sorts/sort.rs#L1419-L1423)
### `AggregateExec`
- [`AggregateExec::init_dynamic_filter`](https://github.com/apache/datafusion/blob/5287210ff44c2a1d1372c6badc641659511fe8c2/datafusion/physical-plan/src/aggregates/mod.rs#L1228-L1284) can attach a `dynamic_filter` owned by the plan
</p>
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]