andygrove opened a new pull request, #23282:
URL: https://github.com/apache/datafusion/pull/23282

   🧑 I work on both Ballista and datafusion-distributed, and the same thing 
keeps biting downstream distributed engines: DataFusion's physical plans are 
written and tested assuming everything runs in one process, with all partitions 
of an operator polled together by cooperative threads on a shared runtime. Real 
distributed engines don't run that way: they cut the plan into stages, 
serialize each stage, and run each task in isolation with its own plan instance 
and its own task context. When a change quietly assumes the in-process model it 
still passes DataFusion's own tests, but it breaks Ballista and 
datafusion-distributed downstream, and we usually don't find out until much 
later.
   
   This PR adds a small, opt-in crate that models that execution style 
**in-process**: it breaks a query into stages, serializes each stage, and 
executes tasks in isolation over an in-memory exchange, without actually 
building a distributed system (no network, no gRPC, no separate processes). I'm 
hoping it's useful for two things:
   
   1. As a **reference implementation / example** of how distributed execution 
works on top of DataFusion. It's a much smaller thing to read than a full 
engine, and it shows the parts that matter: breaking a query into stages, 
serializing plans, and executing tasks in isolation rather than with 
cooperative threads.
   2. As a **regression guard**. Once we run a subset of these tests in CI, 
they'll catch the class of change that breaks isolated execution before it 
ships to downstream projects. It already caught one on its first run (a scan 
that reads the whole table when its partitions are executed in isolation; 
details in the AI section below).
   
   It's deliberately not trying to be another distributed engine; 
datafusion-distributed already fills that role well. This is meant to be the 
lightweight, dependency-free thing that can live in-tree and make life easier 
for the projects built on top of DataFusion.
   
   Opening as a **draft** to get early feedback on whether this is worth having 
in the repo and on the overall approach.
   
   ---
   
   🤖 *The section below was drafted by an AI assistant to describe the change 
in detail.*
   
   ## Which issue does this PR close?
   
   No existing issue; this is a draft to gauge interest. Happy to file a 
tracking issue if there's appetite for it.
   
   ## Rationale for this change
   
   DataFusion operators are developed against a single in-process plan tree: 
one `Arc<dyn ExecutionPlan>` instance, state shared via `Arc`, and all 
partitions of an operator polled together on one runtime. Distributed engines 
built on DataFusion violate every one of those assumptions:
   
   - the plan is cut into **stages** at shuffle boundaries, and each stage is 
**serialized** (both Ballista and datafusion-distributed use 
`datafusion-proto`) and shipped elsewhere;
   - each task runs **one partition** of a stage, on its **own decoded plan 
instance**, with **no shared execution state** with sibling tasks;
   - data crosses stage boundaries as **serialized batches over a transport**, 
never passed by reference.
   
   There is currently no test surface in DataFusion that exercises these 
assumptions, so a change that quietly depends on the in-process model compiles, 
passes CI, and only breaks once it reaches a downstream engine. This crate 
provides that surface, in-process and with no transport dependencies, so it can 
eventually run in DataFusion's own CI.
   
   ## What changes are included in this PR?
   
   A new top-level crate, `datafusion-scheduler`, added to the workspace 
**`exclude`** list so it is **not** a workspace member: neither `cargo build` 
nor `cargo build --workspace` compiles it, and it has zero impact on normal 
builds/CI. It builds only when explicitly targeted (`cd datafusion-scheduler && 
cargo test`). The only change to existing files is adding the crate to 
`[workspace] exclude` in the root `Cargo.toml`.
   
   How it works:
   
   1. **Stage splitting** (`create_stages`) walks a physical-optimized 
`ExecutionPlan` and cuts a new stage at each `RepartitionExec(Hash)`, 
`CoalescePartitionsExec`, and `SortPreservingMergeExec`, producing a 
`StageGraph` whose stage ids are allocated so producers precede consumers.
   2. **Serialization + isolation** (the core invariant): each stage plan is 
encoded once via `datafusion-proto`, and **every task decodes its own fresh 
plan instance against its own `SessionContext`/`TaskContext`**. No `Arc` 
execution state is shared between tasks; this is what reproduces the 
distributed model.
   3. **Streaming in-memory exchange**: `ExchangeSinkExec` hash-partitions its 
input and pushes **Arrow-IPC-encoded frames** into an `InMemoryExchange` of 
bounded channels; `ExchangeSourceExec` merges the producer channels and decodes 
them back. A custom `ExchangeCodec` (`PhysicalExtensionCodec`) serializes these 
two operators; the exchange itself is injected on decode, not serialized (the 
in-process analogue of a network endpoint).
   4. **Executor**: a spawn-all, no-barrier model: every task of every stage 
is spawned concurrently and wired together by the exchange's bounded channels 
(backpressure, not a barrier), mirroring datafusion-distributed's pipelined 
streaming, minus gRPC.
   5. **Differential harness**: `run_distributed(ctx, plan, config)` plus 
`assert_distributed_eq`, which asserts the distributed result equals 
single-node `collect(plan)`.
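
   To make the stage-splitting step concrete, here is a minimal, std-only sketch on a toy plan tree. It is not the crate's `create_stages`: `Node`, `Stage`, `cut`, and `split_stages` are hypothetical names, and `Boundary` merely stands in for the three operators listed in step 1. It only illustrates the idea of cutting at boundaries in post-order so that producer stages receive lower ids than their consumers.

```rust
// Toy model only. `Node`, `Stage`, `cut`, and `split_stages` are hypothetical
// names for illustration, not the types or functions in the PR's crate.
#[derive(Debug, PartialEq)]
enum Node {
    Scan(&'static str),
    Filter(Box<Node>),
    Aggregate(Box<Node>),
    // Stand-in for RepartitionExec(Hash), CoalescePartitionsExec, or
    // SortPreservingMergeExec: a new stage is cut here.
    Boundary(Box<Node>),
    // Placeholder left in the consumer stage where a producer was cut away.
    ReadStage(usize),
}

#[derive(Debug, PartialEq)]
struct Stage {
    id: usize,
    plan: Node,
}

// Rewrites `node`, moving everything below each boundary into its own stage.
// Children are processed before the stage id is allocated (post-order), so a
// producer always gets a smaller id than the stage that reads from it.
fn cut(node: Node, stages: &mut Vec<Stage>) -> Node {
    match node {
        Node::Scan(table) => Node::Scan(table),
        Node::ReadStage(id) => Node::ReadStage(id),
        Node::Filter(child) => Node::Filter(Box::new(cut(*child, stages))),
        Node::Aggregate(child) => Node::Aggregate(Box::new(cut(*child, stages))),
        Node::Boundary(child) => {
            let producer = cut(*child, stages);
            let id = stages.len();
            stages.push(Stage { id, plan: producer });
            Node::ReadStage(id)
        }
    }
}

// Splits a whole plan; the final (root) stage is pushed last.
fn split_stages(root: Node) -> Vec<Stage> {
    let mut stages = Vec::new();
    let final_plan = cut(root, &mut stages);
    let id = stages.len();
    stages.push(Stage { id, plan: final_plan });
    stages
}
```

   For a toy plan of the form `Aggregate(Boundary(Aggregate(Filter(Scan))))`, this yields two stages: the partial aggregate over the scan as stage 0, and the final aggregate reading from stage 0 as stage 1.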
   
   It follows datafusion-distributed's streaming (no-disk, no-barrier) shape 
because that is the more general model, but the isolate-and-serialize contract 
it exercises is identical for Ballista, so a regression it catches applies to 
both.
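
   The spawn-all, backpressure-not-barrier shape can be sketched with nothing but the standard library. This is a rough analogue under stated assumptions, not the crate's `InMemoryExchange`: `partition_of` and `run_exchange` are hypothetical names, rows are plain `u64` values rather than Arrow-IPC frames, OS threads stand in for async tasks, and the number of consumers is assumed to be greater than zero.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical helper: picks the consumer partition for a key by hashing it.
// Assumes `partitions > 0`.
fn partition_of(key: u64, partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partitions as u64) as usize
}

// Spawns every consumer and every producer up front. Each consumer owns one
// bounded channel; all producers hold a sender to every channel. A full
// channel blocks the sending producer (backpressure) instead of the stages
// waiting on each other at a barrier.
fn run_exchange(producers: Vec<Vec<u64>>, consumers: usize, capacity: usize) -> Vec<Vec<u64>> {
    let mut senders = Vec::new();
    let mut receivers = Vec::new();
    for _ in 0..consumers {
        let (tx, rx) = sync_channel::<u64>(capacity);
        senders.push(tx);
        receivers.push(rx);
    }

    // Consumer tasks: drain their channel until every sender is dropped.
    let consumer_handles: Vec<_> = receivers
        .into_iter()
        .map(|rx| thread::spawn(move || rx.into_iter().collect::<Vec<u64>>()))
        .collect();

    // Producer tasks: hash-partition each row to the matching consumer.
    let producer_handles: Vec<_> = producers
        .into_iter()
        .map(|rows| {
            let txs = senders.clone();
            thread::spawn(move || {
                for row in rows {
                    let p = partition_of(row, txs.len());
                    txs[p].send(row).expect("consumer hung up");
                }
            })
        })
        .collect();

    // Drop the original senders so consumers see end-of-stream once the
    // producers finish and drop their clones.
    drop(senders);
    for handle in producer_handles {
        handle.join().expect("producer panicked");
    }
    consumer_handles
        .into_iter()
        .map(|handle| handle.join().expect("consumer panicked"))
        .collect()
}
```

   Calling `run_exchange` with a capacity of 1 mimics the tight-backpressure configuration: producers make progress only as fast as consumers drain, yet nothing deadlocks because both sides are already running.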
   
   ### A regression it already caught
   
   The isolated-scan guard 
(`regression_isolated_scan_reads_only_its_partition`) reproduces a real 
behavior: a 4-file CSV scan (100 rows) returns **4× the correct aggregate** 
(`n=400`, `sum=64800` vs the true `100`/`16200`) under distributed execution, 
because the scan reads the *whole* table in each isolated per-partition task. 
It reproduces with a **bare DataFusion plan and a single `execute(0, ..)` 
call** (no distributed code involved) and traces to the morsel-driven file 
scan (`datafusion/datasource/src/morsel/`) handing file work from a pool shared 
across a plan instance's partitions, drained correctly only when all partitions 
are polled together. This is exactly the class of in-process assumption that 
breaks any engine running plan partitions as isolated tasks. The test is 
`#[ignore]`d with its assertion intact rather than weakened, as a caught 
regression to triage.
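
   The failure mode is easy to model in isolation. The sketch below is a toy, not DataFusion's morsel code: `ScanPlan`, `run_in_process`, and `run_isolated` are hypothetical names, and the even 25-rows-per-file split is an assumption made only so that the totals line up with the 100-row table described above. It shows why a work pool shared across one plan instance's partitions gives the right total when all partitions run on the same instance, and 4× the rows when each task decodes its own instance and executes a single partition.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical stand-in for a scan whose partitions pull files from one pool
// shared across the plan instance. Each pool entry is a file's row count.
struct ScanPlan {
    pool: Mutex<VecDeque<usize>>,
}

impl ScanPlan {
    fn new(file_rows: &[usize]) -> Self {
        ScanPlan {
            pool: Mutex::new(file_rows.iter().copied().collect()),
        }
    }

    // Executes one partition: keeps taking files from the shared pool until
    // it is empty, regardless of which partition was asked for. Returns the
    // number of rows this partition read.
    fn execute(&self, _partition: usize) -> usize {
        let mut rows = 0;
        loop {
            let next = self.pool.lock().unwrap().pop_front();
            match next {
                Some(n) => rows += n,
                None => break,
            }
        }
        rows
    }
}

// In-process model: one plan instance, all partitions executed against it.
// The pool is drained exactly once, so the total is correct.
fn run_in_process(file_rows: &[usize], partitions: usize) -> usize {
    let plan = ScanPlan::new(file_rows);
    (0..partitions).map(|p| plan.execute(p)).sum()
}

// Isolated model: every task builds (decodes) a fresh plan instance and runs
// only its own partition. Each instance has a full pool, so each task reads
// the whole table.
fn run_isolated(file_rows: &[usize], partitions: usize) -> usize {
    (0..partitions)
        .map(|p| ScanPlan::new(file_rows).execute(p))
        .sum()
}
```

   With four files of 25 rows and four partitions, `run_in_process` reads 100 rows while `run_isolated` reads 400, which is the same 4× inflation the regression test observes.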
   
   ## Are these changes tested?
   
   Yes. The crate is almost entirely tests. Unit tests cover stage splitting 
(boundaries, topological id order, multi-input wiring), the exchange operators 
(streaming round-trip, tight-backpressure/no-deadlock, codec round-trip with 
overflow guards and partitioning fidelity), and executor error propagation. 
Integration tests assert `assert_distributed_eq` for hash join, sort-merge 
join, sort, and multi-stage aggregate→sort queries, plus a tight-backpressure 
(`channel_capacity = 1`) end-to-end streaming test. `cargo test` in the crate 
is green (18 passing, 2 intentionally `#[ignore]`d), with `cargo clippy 
--all-targets -- -D warnings` and `cargo fmt` clean.
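
   The differential idea behind `assert_distributed_eq` can be illustrated without DataFusion. The following is a sketch under assumptions, not the crate's harness: `Row`, `single_node`, `partitioned`, and `assert_partitioned_eq` are hypothetical names, the query is a fixed group-by sum, partitioning uses the key modulo the partition count rather than a real hash, and results are compared as sorted maps (how the real harness treats row order is not stated here).

```rust
use std::collections::BTreeMap;

// Hypothetical row shape: (group key, value).
type Row = (u32, i64);

// Reference path: group-by sum over all rows in one pass.
fn single_node(rows: &[Row]) -> BTreeMap<u32, i64> {
    let mut out = BTreeMap::new();
    for &(key, value) in rows {
        *out.entry(key).or_insert(0) += value;
    }
    out
}

// Partitioned path: route rows to partitions by key, aggregate each
// partition independently, then merge the per-partition results.
// Assumes `partitions > 0`.
fn partitioned(rows: &[Row], partitions: usize) -> BTreeMap<u32, i64> {
    let mut buckets: Vec<Vec<Row>> = vec![Vec::new(); partitions];
    for &row in rows {
        buckets[row.0 as usize % partitions].push(row);
    }
    let mut merged = BTreeMap::new();
    for bucket in &buckets {
        for (key, sum) in single_node(bucket) {
            *merged.entry(key).or_insert(0) += sum;
        }
    }
    merged
}

// Differential assertion: the partitioned result must equal the reference.
fn assert_partitioned_eq(rows: &[Row], partitions: usize) {
    assert_eq!(
        partitioned(rows, partitions),
        single_node(rows),
        "partitioned result diverged from single-node result"
    );
}
```

   The value of this pattern is that the test never encodes expected output by hand: any divergence between the two execution paths fails the assertion.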
   
   ## Are there any user-facing changes?
   
   None. The crate is excluded from the workspace and nothing in DataFusion 
depends on it, so normal builds, published crates, and CI are unaffected. It is 
opt-in only.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

