Dandandan opened a new pull request, #21717:
URL: https://github.com/apache/datafusion/pull/21717
## Summary
Demo integration of `tokio-uring` into `dfbench` showing a **thread-per-core
+ `io_uring`** execution model for DataFusion.
- `util::tokio_uring_pool::TokioUringPool` — spawns N OS threads, each
hosting its own `tokio_uring::start` runtime (a current-thread Tokio reactor
driven by `io_uring`). `pool.spawn(|| async { ... })` ships a `Send` closure to
a round-robin worker which constructs the (possibly-`!Send`) future locally, so
tokio-uring's `!Send` `fs::File` can live inside it. A thread-local `IN_WORKER`
flag lets callers detect when they are already on a pool reactor.
- `util::tokio_uring_store::TokioUringObjectStore` — a local `ObjectStore`
that services `get_opts` / `get_ranges` via `tokio_uring::fs::File::read_at`.
If the caller is already on a pool worker (the common case), the read stays
local via `tokio_uring::spawn` on the current ring — no cross-thread hop.
Otherwise it falls back to round-robin `pool.spawn`.
- `util::tokio_uring_dispatch::PoolDispatchExec` — transparent
`ExecutionPlan` wrapper. `wrap_leaves_with_pool_dispatch` walks the plan
bottom-up and inserts it around every leaf (typically the `DataSourceExec`
scan) so `RepartitionExec`'s lazily-spawned input fetchers pull from streams
running on *different* pool workers. Without this, all `M` fetchers pile onto
the worker that won the lazy-spawn race, serializing I/O + decode.
- ClickBench uses the pool by default on Linux (one worker per CPU). Each
output partition of the physical plan is `pool.spawn`-ed, so plan execution
itself happens on the io_uring runtimes. Top-level `CoalescePartitionsExec` /
`SortPreservingMergeExec` are stripped so their N-partition children fan out
across workers; for SPM the merge is rebuilt over a `MemorySourceConfig` and
re-executed on one worker.
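
The leaf-wrapping pass described for `wrap_leaves_with_pool_dispatch` can be pictured with a toy plan tree. This is only a sketch: `Plan`, `wrap_leaves`, and `example_plan` are hypothetical names, the enum stands in for DataFusion's `ExecutionPlan` trait objects, and leaving already-wrapped nodes alone is an assumption rather than something the PR states.

```rust
/// Toy stand-in for a physical plan tree (NOT DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A leaf operator, e.g. a scan.
    Leaf(String),
    /// An interior operator with its children.
    Node(String, Vec<Plan>),
    /// Hypothetical wrapper playing the role of `PoolDispatchExec`.
    Dispatch(Box<Plan>),
}

/// Rewrites children first, then rebuilds the parent (bottom-up),
/// wrapping every leaf in `Dispatch`.
fn wrap_leaves(plan: Plan) -> Plan {
    match plan {
        Plan::Leaf(name) => Plan::Dispatch(Box::new(Plan::Leaf(name))),
        Plan::Node(name, children) => {
            Plan::Node(name, children.into_iter().map(wrap_leaves).collect())
        }
        // Assumption: an already-wrapped subtree is left as-is,
        // so applying the pass twice changes nothing.
        Plan::Dispatch(inner) => Plan::Dispatch(inner),
    }
}

/// Small example input: a repartition over two scans.
fn example_plan() -> Plan {
    Plan::Node(
        "RepartitionExec".to_string(),
        vec![
            Plan::Leaf("DataSourceExec#0".to_string()),
            Plan::Leaf("DataSourceExec#1".to_string()),
        ],
    )
}
```

Calling `wrap_leaves(example_plan())` keeps the `RepartitionExec` node in place and wraps each scan leaf, which is the shape that lets each fetcher pull from a stream started on a different worker.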
The pool is enabled via `--tokio-uring-workers`. The default is
`std::thread::available_parallelism()` on Linux and disabled elsewhere; passing
`0` forces the legacy multi-threaded Tokio path.
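
The flag semantics above could be resolved roughly as follows. This is a sketch under stated assumptions, not the PR's code: `resolve_uring_workers` is a hypothetical helper, the fallback to one worker when `available_parallelism()` fails is an assumption, and so is treating an explicit non-zero value on non-Linux targets as disabled (`tokio-uring` is Linux-only).

```rust
use std::thread;

/// Returns `Some(n)` to run on an `n`-worker io_uring pool, or `None`
/// to use the regular multi-threaded Tokio path.
/// `flag` is the parsed value of `--tokio-uring-workers`, if given.
fn resolve_uring_workers(flag: Option<usize>) -> Option<usize> {
    if !cfg!(target_os = "linux") {
        // Assumption: without io_uring support the pool is always disabled.
        return None;
    }
    match flag {
        // `0` explicitly forces the legacy path.
        Some(0) => None,
        Some(n) => Some(n),
        // Default on Linux: one worker per available CPU.
        // Assumption: fall back to a single worker if the query fails.
        None => Some(
            thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1),
        ),
    }
}
```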
## Design: thread-per-core with tokio-uring
```
          ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
main tokio│ worker thread │ │ worker thread │ │ worker thread │  ← N OS threads
runtime   │ tokio_uring:: │ │ tokio_uring:: │ │ tokio_uring:: │    spawned with
(planning)│ start() loop  │ │ start() loop  │ │ start() loop  │    tokio_uring::start
          │ + io_uring sq │ │ + io_uring sq │ │ + io_uring sq │
          └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
                  │                 │                 │
                  └─── PoolDispatchExec + TokioUringObjectStore ─────
```
- **Parallel plan execution.** Each output partition of the stripped plan is
`pool.spawn`-ed; DataFusion's `SendableRecordBatchStream` is polled to
completion on one worker.
- **Parallel scan I/O.** `PoolDispatchExec` redirects each
`DataSourceExec.execute(partition, ...)` onto a pool worker, so Parquet reads +
decode parallelize even when the caller is a `RepartitionExec` fetcher
concentrated on one worker.
- **Locality-preserving reads.** `TokioUringObjectStore` stays local to the
caller's ring when already on a pool worker — no mpsc+oneshot round-trip for
every `get_ranges`.
- **`!Send`-safe.** `pool.spawn` takes a builder closure that runs on the
target worker, so `tokio_uring::fs::File` never crosses threads.
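
The builder-closure pattern behind `pool.spawn` can be illustrated with the standard library alone. The sketch below is not `TokioUringPool`: `MiniPool`, `block_on`, `Unpark`, and `demo` are hypothetical names, a tiny park-based executor stands in for `tokio_uring::start`, each worker runs jobs one at a time instead of multiplexing tasks on a ring, and an `Rc` plays the role of the `!Send` `fs::File`.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

thread_local! {
    // Set on pool threads so callers can detect they are already on a worker.
    static IN_WORKER: Cell<bool> = Cell::new(false);
}

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Unpark(thread::Thread);
impl Wake for Unpark {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-future executor; a stand-in for `tokio_uring::start`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(Unpark(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

struct MiniPool {
    senders: Vec<mpsc::Sender<Job>>,
    next: AtomicUsize,
}

impl MiniPool {
    fn new(workers: usize) -> Self {
        let senders = (0..workers)
            .map(|_| {
                let (tx, rx) = mpsc::channel::<Job>();
                thread::spawn(move || {
                    IN_WORKER.with(|f| f.set(true));
                    // Simplification: jobs run sequentially on this thread.
                    for job in rx {
                        job();
                    }
                });
                tx
            })
            .collect();
        MiniPool { senders, next: AtomicUsize::new(0) }
    }

    // Only the builder `B` must be `Send`; the future it creates is built
    // and driven on the worker, so `Fut` carries no `Send` bound.
    fn spawn<B, Fut, T>(&self, build: B) -> mpsc::Receiver<T>
    where
        B: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = T> + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        // Round-robin worker selection.
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.senders.len();
        let job: Job = Box::new(move || {
            let _ = tx.send(block_on(build()));
        });
        self.senders[idx].send(job).expect("worker thread exited");
        rx
    }
}

fn demo() -> (usize, bool) {
    let pool = MiniPool::new(2);
    let rx = pool.spawn(|| async {
        let local = Rc::new(vec![1u8, 2, 3]); // `Rc` is `!Send`
        std::future::ready(()).await; // held across an await: the future is `!Send`
        (local.len(), IN_WORKER.with(|f| f.get()))
    });
    rx.recv().expect("worker dropped the result")
}
```

Because the `Rc` is created inside the future after the closure has already moved to the worker, nothing `!Send` ever crosses a thread boundary; only the `Send` result travels back over the channel.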
## Known limitations
- **Shuffle CPU work.** `RepartitionExec`'s hash/round-robin shuffling still
runs on whichever worker first polled it. `PoolDispatchExec` distributes the
*scan* CPU, not the shuffle. A cleaner fix would need DataFusion to expose a
spawner hook.
- **Cross-partition operator state.** Operators that share state across
partitions (some `JoinExec` variants, etc.) behave as they do on the MT path;
the only difference is that pool workers drive them.
- **Writes remain on `LocalFileSystem`** — the demo focuses on reads.
## Test plan
- [x] `cargo check` on macOS (non-Linux path, LocalFS fallback).
- [x] `cargo zigbuild --target x86_64-unknown-linux-gnu
--no-default-features --lib --tests` (Linux `tokio-uring` path).
- [x] `cargo clippy -p datafusion-benchmarks --all-targets -- -D warnings`.
- [ ] Run `./target/release/dfbench clickbench -p /path/to/hits.parquet` on
Linux and compare wall time against `--tokio-uring-workers 0`.
- [ ] `perf stat -e io_uring:*` to confirm ring submissions during a
ClickBench run.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]