andygrove opened a new pull request, #23282: URL: https://github.com/apache/datafusion/pull/23282
I work on both Ballista and datafusion-distributed, and the same thing keeps biting downstream distributed engines: DataFusion's physical plans are written and tested assuming everything runs in one process, with all partitions of an operator polled together by cooperative threads on a shared runtime. Real distributed engines don't run that way: they cut the plan into stages, serialize each stage, and run each task in isolation with its own plan instance and its own task context. When a change quietly assumes the in-process model it still passes DataFusion's own tests, but it breaks Ballista and datafusion-distributed downstream, and we usually don't find out until much later.

This PR adds a small, opt-in crate that models that execution style **in-process**: it breaks a query into stages, serializes each stage, and executes tasks in isolation over an in-memory exchange, without actually building a distributed system (no network, no gRPC, no separate processes).

I'm hoping it's useful for two things:

1. As a **reference implementation / example** of how distributed execution works on top of DataFusion. It's a much smaller thing to read than a full engine, and it shows the parts that matter: breaking a query into stages, serializing plans, and executing tasks in isolation rather than with cooperative threads.
2. As a **regression guard**. Once we run a subset of these tests in CI, they'll catch the class of change that breaks isolated execution before it ships to downstream projects. It already caught one on its first run (a scan that reads the whole table when its partitions are executed in isolation; details in the AI section below).

It's deliberately not trying to be another distributed engine; datafusion-distributed already fills that role well. This is meant to be the lightweight, dependency-free thing that can live in-tree and make life easier for the projects built on top of DataFusion.
Opening as a **draft** to get early feedback on whether this is worth having in the repo and on the overall approach.

---

*The section below was drafted by an AI assistant to describe the change in detail.*

## Which issue does this PR close?

No existing issue; this is a draft to gauge interest. Happy to file a tracking issue if there's appetite for it.

## Rationale for this change

DataFusion operators are developed against a single in-process plan tree: one `Arc<dyn ExecutionPlan>` instance, state shared via `Arc`, and all partitions of an operator polled together on one runtime. Distributed engines built on DataFusion violate every one of those assumptions:

- the plan is cut into **stages** at shuffle boundaries, and each stage is **serialized** (both Ballista and datafusion-distributed use `datafusion-proto`) and shipped elsewhere;
- each task runs **one partition** of a stage, on its **own decoded plan instance**, with **no shared execution state** with sibling tasks;
- data crosses stage boundaries as **serialized batches over a transport**, never passed by reference.

There is currently no test surface in DataFusion that exercises these assumptions, so a change that quietly depends on the in-process model compiles, passes CI, and only breaks once it reaches a downstream engine. This crate provides that surface, in-process and with no transport dependencies, so it can eventually run in DataFusion's own CI.

## What changes are included in this PR?

A new top-level crate, `datafusion-scheduler`, added to the workspace **`exclude`** list so it is **not** a workspace member: neither `cargo build` nor `cargo build --workspace` compiles it, and it has zero impact on normal builds/CI. It builds only when explicitly targeted (`cd datafusion-scheduler && cargo test`). The only change to existing files is adding the crate to `[workspace] exclude` in the root `Cargo.toml`.

How it works:

1. **Stage splitting** (`create_stages`) walks a physical-optimized `ExecutionPlan` and cuts a new stage at each `RepartitionExec(Hash)`, `CoalescePartitionsExec`, and `SortPreservingMergeExec`, producing a `StageGraph` whose stage ids are allocated so producers precede consumers.
2. **Serialization + isolation** (the core invariant): each stage plan is encoded once via `datafusion-proto`, and **every task decodes its own fresh plan instance against its own `SessionContext`/`TaskContext`**. No `Arc` execution state is shared between tasks; this is what reproduces the distributed model.
3. **Streaming in-memory exchange**: `ExchangeSinkExec` hash-partitions its input and pushes **Arrow-IPC-encoded frames** into an `InMemoryExchange` of bounded channels; `ExchangeSourceExec` merges the producer channels and decodes them back. A custom `ExchangeCodec` (`PhysicalExtensionCodec`) serializes these two operators; the exchange itself is injected on decode, not serialized (the in-process analogue of a network endpoint).
4. **Executor**: a spawn-all, no-barrier model. Every task of every stage is spawned concurrently and wired together by the exchange's bounded channels (backpressure, not a barrier), mirroring datafusion-distributed's pipelined streaming, minus gRPC.
5. **Differential harness**: `run_distributed(ctx, plan, config)` plus `assert_distributed_eq`, which asserts the distributed result equals single-node `collect(plan)`.

It follows datafusion-distributed's streaming (no-disk, no-barrier) shape because that is the more general model, but the isolate-and-serialize contract it exercises is identical for Ballista, so a regression it catches applies to both.
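
To make the stage-splitting step concrete, here is a minimal, standalone sketch of the idea on a toy plan tree. It is not the crate's code: `Plan`, `Stage`, `split`, `create_stages_sketch`, and `example_plan` are hypothetical names invented for illustration, and the single `Shuffle` variant stands in for the real boundary operators. The real `create_stages` works on `Arc<dyn ExecutionPlan>` and returns a `StageGraph`. The sketch only shows how cutting at boundaries in post-order yields stage ids where producers precede consumers.

```rust
// Toy stand-in for a physical plan (assumption: NOT DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    Filter(Box<Plan>),
    Aggregate(Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
    // Stand-in for a stage boundary such as RepartitionExec(Hash).
    Shuffle(Box<Plan>),
    // Placeholder left in a consumer stage; holds the producer's stage id.
    StageInput(usize),
}

// One stage: a plan fragment plus the ids of the stages it reads from.
#[derive(Debug)]
struct Stage {
    id: usize,
    plan: Plan,
    inputs: Vec<usize>,
}

// Recursively rewrite `plan`, cutting a new stage at every `Shuffle`.
// `inputs` collects the producer stage ids for the stage currently being built.
fn split(plan: Plan, stages: &mut Vec<Stage>, inputs: &mut Vec<usize>) -> Plan {
    match plan {
        Plan::Shuffle(child) => {
            // Everything below the boundary becomes its own stage.
            let mut child_inputs = Vec::new();
            let child_plan = split(*child, stages, &mut child_inputs);
            // The id is allocated only after all producers below were pushed,
            // so producers always receive smaller ids than their consumers.
            let id = stages.len();
            stages.push(Stage { id, plan: child_plan, inputs: child_inputs });
            inputs.push(id);
            Plan::StageInput(id)
        }
        Plan::Filter(c) => Plan::Filter(Box::new(split(*c, stages, inputs))),
        Plan::Aggregate(c) => Plan::Aggregate(Box::new(split(*c, stages, inputs))),
        Plan::Join(l, r) => {
            let l = split(*l, stages, inputs);
            let r = split(*r, stages, inputs);
            Plan::Join(Box::new(l), Box::new(r))
        }
        // Scan and StageInput are leaves and are kept unchanged.
        leaf => leaf,
    }
}

// Split a whole plan; the last stage pushed is the root (final) stage.
fn create_stages_sketch(root: Plan) -> Vec<Stage> {
    let mut stages = Vec::new();
    let mut inputs = Vec::new();
    let plan = split(root, &mut stages, &mut inputs);
    let id = stages.len();
    stages.push(Stage { id, plan, inputs });
    stages
}

// Example: an aggregate over a shuffled join of two shuffled inputs.
fn example_plan() -> Plan {
    let left = Plan::Shuffle(Box::new(Plan::Scan("a")));
    let right = Plan::Shuffle(Box::new(Plan::Filter(Box::new(Plan::Scan("b")))));
    let join = Plan::Join(Box::new(left), Box::new(right));
    Plan::Aggregate(Box::new(Plan::Shuffle(Box::new(join))))
}
```

Calling `create_stages_sketch(example_plan())` produces four stages: the two scan-side stages, a join stage that reads from both, and a final aggregate stage that reads from the join stage. Because every stage's inputs have smaller ids, an executor can wire stages in id order without a separate topological sort.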
### A regression it already caught

The isolated-scan guard (`regression_isolated_scan_reads_only_its_partition`) reproduces a real behavior: a 4-file CSV scan (100 rows) returns **4× the correct aggregate** (`n=400`, `sum=64800` vs the true `100`/`16200`) under distributed execution, because the scan reads the *whole* table in each isolated per-partition task. It reproduces with a **bare DataFusion plan and a single `execute(0, ..)` call** (no distributed code involved) and traces to the morsel-driven file scan (`datafusion/datasource/src/morsel/`) handing file work from a pool shared across a plan instance's partitions, drained correctly only when all partitions are polled together. This is exactly the class of in-process assumption that breaks any engine running plan partitions as isolated tasks. The test is `#[ignore]`d with its assertion intact rather than weakened, as a caught regression to triage.

## Are these changes tested?

Yes, the crate is almost entirely tests. Unit tests cover stage splitting (boundaries, topological id order, multi-input wiring), the exchange operators (streaming round-trip, tight-backpressure/no-deadlock, codec round-trip with overflow guards and partitioning fidelity), and executor error propagation. Integration tests assert `assert_distributed_eq` for hash join, sort-merge join, sort, and multi-stage aggregate→sort queries, plus a tight-backpressure (`channel_capacity = 1`) end-to-end streaming test. `cargo test` in the crate is green (18 passing, 2 intentionally `#[ignore]`d), with `cargo clippy --all-targets -- -D warnings` and `cargo fmt` clean.

## Are there any user-facing changes?

None. The crate is excluded from the workspace and nothing in DataFusion depends on it, so normal builds, published crates, and CI are unaffected. It is opt-in only.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
