alamb commented on code in PR #13423: URL: https://github.com/apache/datafusion/pull/13423#discussion_r1847221528
########## datafusion/core/src/lib.rs: ########## @@ -403,22 +403,183 @@ //! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html //! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf //! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf -//! [DataFusion paper submitted SIGMOD]: https://github.com/apache/datafusion/files/13874720/DataFusion_Query_Engine___SIGMOD_2024.pdf +//! [DataFusion paper in SIGMOD 2024]: https://github.com/apache/datafusion/files/15149988/DataFusion_Query_Engine___SIGMOD_2024-FINAL-mk4.pdf +//! [such as DuckDB]: https://github.com/duckdb/duckdb/issues/1583 //! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors //! -//! ## Thread Scheduling +//! ## Streaming Execution //! -//! DataFusion incrementally computes output from a [`SendableRecordBatchStream`] -//! with `target_partitions` threads. Parallelism is implementing using multiple -//! [Tokio] [`task`]s, which are executed by threads managed by a tokio Runtime. -//! While tokio is most commonly used -//! for asynchronous network I/O, its combination of an efficient, work-stealing -//! scheduler, first class compiler support for automatic continuation generation, -//! and exceptional performance makes it a compelling choice for CPU intensive -//! applications as well. This is explained in more detail in [Using Rustlang’s Async Tokio -//! Runtime for CPU-Bound Tasks]. +//! DataFusion is a "streaming" query engine which means `ExecutionPlan`s incrementally +//! read from their input(s) and compute output one [`RecordBatch`] at a time +//! by continually polling [`SendableRecordBatchStream`]s. Output and +//! intermediate `RecordBatch`s each have approximately `batch_size` rows, +//! which amortizes per-batch overhead of execution. +//! +//! 
Note that certain operations, sometimes called "pipeline breakers", +//! (for example full sorts or hash aggregations) are fundamentally non streaming and +//! must read their input fully before producing **any** output. As much as possible, +//! other operators read a single [`RecordBatch`] from their input to produce a +//! single `RecordBatch` as output. +//! +//! For example, given this SQL query: +//! +//! ```sql +//! SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30); +//! ``` +//! +//! The diagram below shows the call sequence when a consumer calls [`next()`] to +//! get the next `RecordBatch` of output. While it is possible that some +//! steps run on different threads, typically tokio will use the same thread +//! that called `next()` to read from the input, apply the filter, and +//! return the results without interleaving any other operations. This results +//! in excellent cache locality as the same CPU core that produces the data often +//! consumes it immediately as well. +//! +//! ```text +//! +//! Step 3: FilterExec calls next() Step 2: ProjectionExec calls +//! on input Stream next() on input Stream +//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +//! │ Step 1: Consumer +//! ▼ ▼ │ calls next() +//! ┏━━━━━━━━━━━━━━┓ ┏━━━━━┻━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━┓ +//! ┃ ┃ ┃ ┃ ┃ ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! ┃ DataSource ┃ ┃ ┃ ┃ ┃ +//! ┃ (e.g. ┃ ┃ FilterExec ┃ ┃ ProjectionExec ┃ +//! ┃ ParquetExec) ┃ ┃id IN (10, 20, 30) ┃ ┃date_bin('month', time) ┃ +//! ┃ ┃ ┃ ┃ ┃ ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶ +//! ┃ ┃ ┃ ┃ ┃ ┃ +//! ┗━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━┳━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━┛ +//! │ ▲ ▲ Step 6: ProjectionExec +//! │ │ │ computes date_trunc into a +//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ new RecordBatch returned +//! ┌─────────────────────┐ ┌─────────────┐ from client +//! │ RecordBatch │ │ RecordBatch │ +//! └─────────────────────┘ └─────────────┘ +//! +//! 
Step 4: DataSource returns a Step 5: FilterExec returns a new +//! single RecordBatch RecordBatch with only matching rows +//! ``` +//! +//! [`next()`]: futures::StreamExt::next +//! +//! ## Thread Scheduling, CPU / IO Thread Pools, and [Tokio] [`Runtime`]s +//! +//! DataFusion automatically runs each plan with multiple CPU cores using +//! a [Tokio] [`Runtime`] as a thread pool. While tokio is most commonly used +//! for asynchronous network I/O, the combination of an efficient, work-stealing +//! scheduler and first class compiler support for automatic continuation +//! generation (`async`), also makes it a compelling choice for CPU intensive +//! applications as explained in the [Using Rustlang’s Async Tokio +//! Runtime for CPU-Bound Tasks] blog. Review Comment: yes -- I think it does (line 584) ########## datafusion/core/src/lib.rs: ########## @@ -403,22 +403,183 @@ //! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html //! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf //! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf -//! [DataFusion paper submitted SIGMOD]: https://github.com/apache/datafusion/files/13874720/DataFusion_Query_Engine___SIGMOD_2024.pdf +//! [DataFusion paper in SIGMOD 2024]: https://github.com/apache/datafusion/files/15149988/DataFusion_Query_Engine___SIGMOD_2024-FINAL-mk4.pdf +//! [such as DuckDB]: https://github.com/duckdb/duckdb/issues/1583 //! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors //! -//! ## Thread Scheduling +//! ## Streaming Execution //! -//! DataFusion incrementally computes output from a [`SendableRecordBatchStream`] -//! with `target_partitions` threads. Parallelism is implementing using multiple -//! [Tokio] [`task`]s, which are executed by threads managed by a tokio Runtime. -//! 
While tokio is most commonly used -//! for asynchronous network I/O, its combination of an efficient, work-stealing -//! scheduler, first class compiler support for automatic continuation generation, -//! and exceptional performance makes it a compelling choice for CPU intensive -//! applications as well. This is explained in more detail in [Using Rustlang’s Async Tokio -//! Runtime for CPU-Bound Tasks]. +//! DataFusion is a "streaming" query engine which means `ExecutionPlan`s incrementally +//! read from their input(s) and compute output one [`RecordBatch`] at a time +//! by continually polling [`SendableRecordBatchStream`]s. Output and +//! intermediate `RecordBatch`s each have approximately `batch_size` rows, +//! which amortizes per-batch overhead of execution. +//! +//! Note that certain operations, sometimes called "pipeline breakers", +//! (for example full sorts or hash aggregations) are fundamentally non streaming and +//! must read their input fully before producing **any** output. As much as possible, +//! other operators read a single [`RecordBatch`] from their input to produce a +//! single `RecordBatch` as output. +//! +//! For example, given this SQL query: +//! +//! ```sql +//! SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30); +//! ``` +//! +//! The diagram below shows the call sequence when a consumer calls [`next()`] to +//! get the next `RecordBatch` of output. While it is possible that some +//! steps run on different threads, typically tokio will use the same thread +//! that called `next()` to read from the input, apply the filter, and +//! return the results without interleaving any other operations. This results +//! in excellent cache locality as the same CPU core that produces the data often +//! consumes it immediately as well. +//! +//! ```text +//! +//! Step 3: FilterExec calls next() Step 2: ProjectionExec calls +//! on input Stream next() on input Stream +//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +//! 
│ Step 1: Consumer
+//! ▼ ▼ │ calls next()
+//! ┏━━━━━━━━━━━━━━┓ ┏━━━━━┻━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
+//! ┃ ┃ ┃ ┃ ┃ ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+//! ┃ DataSource ┃ ┃ ┃ ┃ ┃
+//! ┃ (e.g. ┃ ┃ FilterExec ┃ ┃ ProjectionExec ┃
+//! ┃ ParquetExec) ┃ ┃id IN (10, 20, 30) ┃ ┃date_bin('month', time) ┃
+//! ┃ ┃ ┃ ┃ ┃ ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
+//! ┃ ┃ ┃ ┃ ┃ ┃
+//! ┗━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━┳━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
+//! │ ▲ ▲ Step 6: ProjectionExec
+//! │ │ │ computes date_trunc into a
+//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ new RecordBatch returned
+//! ┌─────────────────────┐ ┌─────────────┐ from client
+//! │ RecordBatch │ │ RecordBatch │
+//! └─────────────────────┘ └─────────────┘
+//!
+//! Step 4: DataSource returns a Step 5: FilterExec returns a new
+//! single RecordBatch RecordBatch with only matching rows
+//! ```
+//!
+//! [`next()`]: futures::StreamExt::next
+//!
+//! ## Thread Scheduling, CPU / IO Thread Pools, and [Tokio] [`Runtime`]s
+//!
+//! DataFusion automatically runs each plan with multiple CPU cores using
+//! a [Tokio] [`Runtime`] as a thread pool. While tokio is most commonly used
+//! for asynchronous network I/O, the combination of an efficient, work-stealing
+//! scheduler and first class compiler support for automatic continuation
+//! generation (`async`), also makes it a compelling choice for CPU intensive
+//! applications as explained in the [Using Rustlang’s Async Tokio
+//! Runtime for CPU-Bound Tasks] blog.
+//!
+//! The number of cores used is determined by the `target_partitions`
+//! configuration setting, which defaults to the number of CPU cores.
+//! During execution, DataFusion creates this many distinct `async` [`Stream`]s and
+//! this many distinct [Tokio] [`task`]s, which drive the `Stream`s
Review Comment: I think it is somewhat nuanced -- what happens is that certain operations (like `RepartitionExec`) spawn tasks to read from their inputs. As written I now see this sentence is somewhat misleading. 
I will try and clarify in a follow-on PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
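
[Editor's note] The pull-based streaming model described in the doc changes above (steps 1-6: the consumer's single `next()` call walks the whole `DataSource` → `FilterExec` → `ProjectionExec` pipeline one batch at a time) can be sketched with plain std iterators. This is an illustrative analogy only: the `Batch` type and filter/projection closures below are stand-ins, not DataFusion's actual `RecordBatch`/`SendableRecordBatchStream` async types.

```rust
// Std-only sketch of pull-based streaming execution: each operator pulls
// one batch from its input per call to next(), so a single consumer-driven
// call walks the whole pipeline without materializing all input.

type Batch = Vec<(i64, i64)>; // (id, time) rows; stand-in for RecordBatch

fn main() {
    // "DataSource": yields its data as two batches, one at a time.
    let source = vec![
        vec![(10, 100), (11, 101), (20, 102)],
        vec![(30, 103), (40, 104)],
    ]
    .into_iter();

    // "FilterExec": id IN (10, 20, 30), one input batch -> one output batch.
    let filter = source.map(|batch: Batch| {
        batch
            .into_iter()
            .filter(|(id, _)| [10, 20, 30].contains(id))
            .collect::<Batch>()
    });

    // "ProjectionExec": compute a function of `time` (crude date_trunc stand-in).
    let mut projection = filter.map(|batch: Batch| {
        batch.into_iter().map(|(_, t)| t / 10).collect::<Vec<i64>>()
    });

    // Step 1: the consumer calls next(); steps 2-6 all happen inside this call,
    // typically on the same thread, which is the cache-locality point above.
    let first = projection.next().unwrap();
    assert_eq!(first, vec![10, 10]); // rows with id 10 and 20 survive batch 1
    let second = projection.next().unwrap();
    assert_eq!(second, vec![10]); // only id 30 survives batch 2
    assert!(projection.next().is_none()); // input exhausted, stream ends
    println!("first = {:?}, second = {:?}", first, second);
}
```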
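
[Editor's note] The `target_partitions` paragraph under review (one `Stream` and one task per partition, with the nuance alamb raises that operators such as `RepartitionExec` are what actually spawn the input-driving tasks) can likewise be sketched std-only. Plain threads and channels stand in here for tokio tasks and a tokio `Runtime`; this is an assumption-laden analogy, not DataFusion's scheduler.

```rust
// Hedged sketch of "N partitions -> N independent workers, merged by the
// consumer". Real DataFusion drives async Streams on a tokio Runtime; here
// std threads + an mpsc channel model the same shape.
use std::sync::mpsc;
use std::thread;

fn main() {
    let target_partitions: u64 = 4; // DataFusion defaults this to the CPU core count
    let (tx, rx) = mpsc::channel();

    // One "task" per partition, each producing its share of the output,
    // roughly what a repartitioning operator's spawned reader tasks do.
    for partition in 0..target_partitions {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for driving this partition's stream of batches:
            // sum the rows hash-assigned to this partition.
            let partial_sum: u64 = (0..1000u64)
                .filter(|v| v % target_partitions == partition)
                .sum();
            tx.send(partial_sum).unwrap();
        });
    }
    drop(tx); // close the consumer side once all workers hold a sender

    // The consumer merges per-partition results as they arrive.
    let total: u64 = rx.iter().sum();
    assert_eq!(total, (0..1000u64).sum::<u64>());
    println!("total = {}", total);
}
```

The merge step is the simplest possible one (a sum); the point is only that each partition makes independent progress and the consumer never needs to know how many threads produced the data.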