Dandandan opened a new pull request, #21717:
URL: https://github.com/apache/datafusion/pull/21717

   ## Summary
   
   Demo integration of `tokio-uring` into `dfbench` showing a **thread-per-core + `io_uring`** execution model for DataFusion.
   
   - `util::tokio_uring_pool::TokioUringPool` — spawns N OS threads, each hosting its own `tokio_uring::start` runtime (a current-thread Tokio reactor driven by `io_uring`). `pool.spawn(|| async { ... })` ships a `Send` builder closure to the next worker in round-robin order; the closure runs there and constructs the (possibly `!Send`) future locally, so tokio-uring's `!Send` `fs::File` can live inside it. A thread-local `IN_WORKER` flag lets callers detect when they are already on a pool reactor.
   - `util::tokio_uring_store::TokioUringObjectStore` — a local `ObjectStore` 
that services `get_opts` / `get_ranges` via `tokio_uring::fs::File::read_at`. 
If the caller is already on a pool worker (the common case), the read stays 
local via `tokio_uring::spawn` on the current ring — no cross-thread hop. 
Otherwise it falls back to round-robin `pool.spawn`.
   - `util::tokio_uring_dispatch::PoolDispatchExec` — transparent 
`ExecutionPlan` wrapper. `wrap_leaves_with_pool_dispatch` walks the plan 
bottom-up and inserts it around every leaf (typically the `DataSourceExec` 
scan) so `RepartitionExec`'s lazily-spawned input fetchers pull from streams 
running on *different* pool workers. Without this, all `M` fetchers pile onto 
the worker that won the lazy-spawn race, serializing I/O + decode.
   - ClickBench uses the pool by default on Linux (one worker per CPU). Each 
output partition of the physical plan is `pool.spawn`-ed, so plan execution 
itself happens on the io_uring runtimes. Top-level `CoalescePartitionsExec` / 
`SortPreservingMergeExec` are stripped so their N-partition children fan out 
across workers; for SPM the merge is rebuilt over a `MemorySourceConfig` and 
re-executed on one worker.
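   A minimal, std-only sketch of the pool's shape follows. It assumes nothing about the PR's actual code: `Pool`, `in_worker` and `block_on` are hypothetical names, and `block_on` (one future at a time, woken by thread unpark) stands in for the `tokio_uring::start` reactor, which in the real pool drives many tasks concurrently on one ring. It shows the two properties described above: only the builder closure must be `Send`, so the future it builds may hold `!Send` values, and a thread-local flag lets code detect that it is already on a worker (the check a store like `TokioUringObjectStore` would make before choosing a local spawn over `pool.spawn`).

   ```rust
   use std::cell::Cell;
   use std::future::Future;
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::{mpsc, Arc};
   use std::task::{Context, Poll, Wake, Waker};
   use std::thread::{self, JoinHandle, Thread};

   thread_local! {
       // Set to true on pool worker threads only (hypothetical stand-in for IN_WORKER).
       static IN_WORKER: Cell<bool> = Cell::new(false);
   }

   /// True when the calling thread is one of the pool's workers.
   pub fn in_worker() -> bool {
       IN_WORKER.with(|w| w.get())
   }

   /// Waker that unparks the worker thread polling the future.
   struct ThreadWaker(Thread);

   impl Wake for ThreadWaker {
       fn wake(self: Arc<Self>) {
           self.0.unpark();
       }
   }

   /// Minimal single-future executor; a stand-in for the `tokio_uring::start` reactor.
   fn block_on<F: Future>(fut: F) -> F::Output {
       let mut fut = Box::pin(fut);
       let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
       let mut cx = Context::from_waker(&waker);
       loop {
           match fut.as_mut().poll(&mut cx) {
               Poll::Ready(out) => return out,
               Poll::Pending => thread::park(),
           }
       }
   }

   type Job = Box<dyn FnOnce() + Send + 'static>;

   pub struct Pool {
       senders: Vec<mpsc::Sender<Job>>,
       handles: Vec<JoinHandle<()>>,
       next: AtomicUsize,
   }

   impl Pool {
       /// Spawns `n` worker threads, each draining its own job queue.
       pub fn new(n: usize) -> Self {
           assert!(n > 0, "pool needs at least one worker");
           let mut senders = Vec::with_capacity(n);
           let mut handles = Vec::with_capacity(n);
           for _ in 0..n {
               let (tx, rx) = mpsc::channel::<Job>();
               senders.push(tx);
               handles.push(thread::spawn(move || {
                   IN_WORKER.with(|w| w.set(true));
                   // Runs jobs one after another until every Sender is dropped.
                   for job in rx {
                       job();
                   }
               }));
           }
           Pool { senders, handles, next: AtomicUsize::new(0) }
       }

       /// Ships a `Send` builder to the next worker (round-robin). The builder
       /// creates the possibly-`!Send` future on that worker, so it never moves threads.
       pub fn spawn<B, Fut, T>(&self, builder: B) -> mpsc::Receiver<T>
       where
           B: FnOnce() -> Fut + Send + 'static,
           Fut: Future<Output = T>,
           T: Send + 'static,
       {
           let (tx, rx) = mpsc::channel();
           let i = self.next.fetch_add(1, Ordering::Relaxed) % self.senders.len();
           let job: Job = Box::new(move || {
               let out = block_on(builder());
               let _ = tx.send(out); // the caller may have dropped the receiver
           });
           self.senders[i].send(job).expect("worker thread exited");
           rx
       }

       /// Closes all job queues and waits for the workers to finish.
       pub fn shutdown(self) {
           drop(self.senders);
           for h in self.handles {
               h.join().expect("worker panicked");
           }
       }
   }
   ```

   Usage: `pool.spawn(|| async { ... })` returns a `Receiver` to `recv()` the result from, and a future that holds an `Rc` across an `.await` is accepted because the `Fut` bound omits `Send`. Round-robin via an atomic counter keeps dispatch lock-free but ignores per-worker load.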
   
   Enabled via `--tokio-uring-workers` (default: `std::thread::available_parallelism()` on Linux, disabled elsewhere; `0` forces the legacy multi-threaded Tokio runtime path).
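   The worker-count rule above can be expressed as a small pure function. This is a sketch under assumptions: `ExecMode` and `resolve_exec_mode` are hypothetical names, the flag is modeled as `Option<usize>` (`None` when not passed), and an explicit worker count on a non-Linux host is assumed to fall back to the Tokio multi-threaded path, since tokio-uring only runs on Linux.

   ```rust
   use std::num::NonZeroUsize;
   use std::thread;

   /// Which runtime the benchmark runs on (hypothetical type).
   #[derive(Debug, PartialEq, Eq)]
   pub enum ExecMode {
       /// Legacy multi-threaded Tokio runtime.
       TokioMultiThread,
       /// Thread-per-core pool with this many tokio-uring workers.
       UringPool(NonZeroUsize),
   }

   /// `flag` is the `--tokio-uring-workers` value (`None` when absent); real code
   /// would pass `cfg!(target_os = "linux")` as `is_linux`.
   pub fn resolve_exec_mode(flag: Option<usize>, is_linux: bool) -> ExecMode {
       if !is_linux {
           // Assumption: io_uring is Linux-only, so always fall back elsewhere.
           return ExecMode::TokioMultiThread;
       }
       let workers = flag.unwrap_or_else(|| {
           // Default: one worker per available CPU (1 if it cannot be determined).
           thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
       });
       // `0` maps to `None` here, i.e. "force the legacy path".
       match NonZeroUsize::new(workers) {
           Some(n) => ExecMode::UringPool(n),
           None => ExecMode::TokioMultiThread,
       }
   }
   ```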
   
   ## Design: thread-per-core with tokio-uring
   
   ```
               ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
   main tokio  │ worker thread  │  │ worker thread  │  │ worker thread  │  ← N OS threads
   runtime     │ tokio_uring::  │  │ tokio_uring::  │  │ tokio_uring::  │    spawned with
   (planning)  │   start() loop │  │   start() loop │  │   start() loop │    tokio_uring::start
               │ + io_uring sq  │  │ + io_uring sq  │  │ + io_uring sq  │
               └───────┬────────┘  └───────┬────────┘  └───────┬────────┘
                       │                   │                   │
                       └───────────────────┴───────────────────┘
                        PoolDispatchExec + TokioUringObjectStore
   ```
   
   - **Parallel plan execution.** Each output partition of the stripped plan is `pool.spawn`-ed, and its `SendableRecordBatchStream` is polled to completion on that worker.
   - **Parallel scan I/O.** `PoolDispatchExec` redirects each 
`DataSourceExec.execute(partition, ...)` onto a pool worker, so Parquet reads + 
decode parallelize even when the caller is a `RepartitionExec` fetcher 
concentrated on one worker.
   - **Locality-preserving reads.** `TokioUringObjectStore` stays local to the 
caller's ring when already on a pool worker — no mpsc+oneshot round-trip for 
every `get_ranges`.
   - **`!Send`-safe.** `pool.spawn` takes a builder closure that runs on the 
target worker, so `tokio_uring::fs::File` never crosses threads.
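   The two plan rewrites (wrapping every leaf in a dispatch node, then stripping a top-level merge so the remaining partitions can be spawned) can be illustrated on a toy plan type. This is not DataFusion's API: `Plan`, `wrap_leaves` and `strip_top_level_merge` are hypothetical stand-ins for rewrites over `Arc<dyn ExecutionPlan>`, and the SPM case only records that a final sorted merge must be re-applied, rather than rebuilding it over a `MemorySourceConfig` as described above.

   ```rust
   /// Toy physical plan (hypothetical; real code rewrites `Arc<dyn ExecutionPlan>`).
   #[derive(Debug, Clone, PartialEq)]
   pub enum Plan {
       Scan { partitions: usize },
       Repartition { input: Box<Plan>, partitions: usize },
       Coalesce { input: Box<Plan> },
       SortPreservingMerge { input: Box<Plan> },
       PoolDispatch { input: Box<Plan> },
   }

   impl Plan {
       pub fn output_partitions(&self) -> usize {
           match self {
               Plan::Scan { partitions } | Plan::Repartition { partitions, .. } => *partitions,
               Plan::Coalesce { .. } | Plan::SortPreservingMerge { .. } => 1,
               // A transparent wrapper keeps its child's partitioning.
               Plan::PoolDispatch { input } => input.output_partitions(),
           }
       }
   }

   /// Rebuilds the tree children-first, wrapping each leaf scan in `PoolDispatch`.
   pub fn wrap_leaves(plan: Plan) -> Plan {
       match plan {
           leaf @ Plan::Scan { .. } => Plan::PoolDispatch { input: Box::new(leaf) },
           // Already wrapped: leave as-is so the rewrite is idempotent.
           wrapped @ Plan::PoolDispatch { .. } => wrapped,
           Plan::Repartition { input, partitions } => Plan::Repartition {
               input: Box::new(wrap_leaves(*input)),
               partitions,
           },
           Plan::Coalesce { input } => Plan::Coalesce { input: Box::new(wrap_leaves(*input)) },
           Plan::SortPreservingMerge { input } => Plan::SortPreservingMerge {
               input: Box::new(wrap_leaves(*input)),
           },
       }
   }

   /// Removes a top-level Coalesce/SPM so its child's partitions can be spawned
   /// individually; the flag says whether a sorted merge must be re-applied.
   pub fn strip_top_level_merge(plan: Plan) -> (Plan, bool) {
       match plan {
           Plan::Coalesce { input } => (*input, false),
           Plan::SortPreservingMerge { input } => (*input, true),
           other => (other, false),
       }
   }
   ```

   Leaving an existing `PoolDispatch` node untouched keeps `wrap_leaves` idempotent, so applying it twice does not stack wrappers.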
   
   ## Known limitations
   
   - **Shuffle CPU work.** `RepartitionExec`'s hash/round-robin shuffling still 
runs on whichever worker first polled it. `PoolDispatchExec` distributes the 
*scan* CPU, not the shuffle. A cleaner fix would need DataFusion to expose a 
spawner hook.
   - **Cross-partition operator state.** Operators that share state across partitions (some `JoinExec` variants, etc.) get no special handling: they behave exactly as on the multi-threaded path, except that pool workers drive them.
   - **Writes remain on `LocalFileSystem`** — the demo focuses on reads.
   
   ## Test plan
   
   - [x] `cargo check` on macOS (non-Linux path, LocalFS fallback).
   - [x] `cargo zigbuild --target x86_64-unknown-linux-gnu --no-default-features --lib --tests` (Linux `tokio-uring` path).
   - [x] `cargo clippy -p datafusion-benchmarks --all-targets -- -D warnings`.
   - [ ] Run `./target/release/dfbench clickbench -p /path/to/hits.parquet` on 
Linux and compare wall time against `--tokio-uring-workers 0`.
   - [ ] `perf stat -e io_uring:*` to confirm ring submissions during a 
ClickBench run.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

