matthewmturner commented on code in PR #13423:
URL: https://github.com/apache/datafusion/pull/13423#discussion_r1846635429


##########
datafusion/core/src/lib.rs:
##########
@@ -403,22 +403,183 @@
 //! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
 //! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
 //! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
-//! [DataFusion paper submitted SIGMOD]: https://github.com/apache/datafusion/files/13874720/DataFusion_Query_Engine___SIGMOD_2024.pdf
+//! [DataFusion paper in SIGMOD 2024]: https://github.com/apache/datafusion/files/15149988/DataFusion_Query_Engine___SIGMOD_2024-FINAL-mk4.pdf
+//! [such as DuckDB]: https://github.com/duckdb/duckdb/issues/1583
 //! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
 //!
-//! ## Thread Scheduling
+//! ## Streaming Execution
 //!
-//! DataFusion incrementally computes output from a [`SendableRecordBatchStream`]
-//! with `target_partitions` threads. Parallelism is implementing using multiple
-//! [Tokio] [`task`]s, which are executed by threads managed by a tokio Runtime.
-//! While tokio is most commonly used
-//! for asynchronous network I/O, its combination of an efficient, work-stealing
-//! scheduler, first class compiler support for automatic continuation generation,
-//! and exceptional performance makes it a compelling choice for CPU intensive
-//! applications as well. This is explained in more detail in [Using Rustlang’s Async Tokio
-//! Runtime for CPU-Bound Tasks].
+//! DataFusion is a "streaming" query engine which means `ExecutionPlan`s incrementally
+//! read from their input(s) and compute output one [`RecordBatch`] at a time
+//! by continually polling [`SendableRecordBatchStream`]s. Output and
+//! intermediate `RecordBatch`s each have approximately `batch_size` rows,
+//! which amortizes the per-batch overhead of execution.
+//!
+//! Note that certain operations, sometimes called "pipeline breakers"
+//! (for example, full sorts or hash aggregations), are fundamentally non-streaming and
+//! must read their input fully before producing **any** output. As much as possible,
+//! other operators read a single [`RecordBatch`] from their input to produce a
+//! single `RecordBatch` as output.
+//!
+//! For example, given this SQL query:
+//!
+//! ```sql
+//! SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30);
+//! ```
+//!
+//! The diagram below shows the call sequence when a consumer calls [`next()`] to
+//! get the next `RecordBatch` of output. While it is possible that some
+//! steps run on different threads, typically tokio will use the same thread
+//! that called `next()` to read from the input, apply the filter, and
+//! return the results without interleaving any other operations. This results
+//! in excellent cache locality as the same CPU core that produces the data often
+//! consumes it immediately as well.
+//!
+//! ```text
+//!
+//! Step 3: FilterExec calls next()       Step 2: ProjectionExec calls
+//!         on input Stream                  next() on input Stream
+//!         ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+//!                            │                                          Step 1: Consumer
+//!         ▼                        ▼                           │           calls next()
+//! ┏━━━━━━━━━━━━━━┓     ┏━━━━━┻━━━━━━━━━━━━━┓      ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
+//! ┃              ┃     ┃                   ┃      ┃                        ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+//! ┃  DataSource  ┃     ┃                   ┃      ┃                        ┃
+//! ┃    (e.g.     ┃     ┃    FilterExec     ┃      ┃     ProjectionExec     ┃
+//! ┃ ParquetExec) ┃     ┃id IN (10, 20, 30) ┃      ┃date_bin('month', time) ┃
+//! ┃              ┃     ┃                   ┃      ┃                        ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
+//! ┃              ┃     ┃                   ┃      ┃                        ┃
+//! ┗━━━━━━━━━━━━━━┛     ┗━━━━━━━━━━━┳━━━━━━━┛      ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
+//!         │                  ▲                                 ▲          Step 6: ProjectionExec
+//!                            │     │                           │        computes date_trunc into a
+//!         └ ─ ─ ─ ─ ─ ─ ─ ─ ─       ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─          new RecordBatch returned
+//!              ┌─────────────────────┐                ┌─────────────┐          from client
+//!              │     RecordBatch     │                │ RecordBatch │
+//!              └─────────────────────┘                └─────────────┘
+//!
+//!           Step 4: DataSource returns a        Step 5: FilterExec returns a new
+//!                single RecordBatch            RecordBatch with only matching rows
+//! ```
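+//!
+//! As a minimal, hypothetical sketch of the consumer side (the `plan` and
+//! `task_ctx` variables are assumed here, not part of this example), each
+//! call to [`next()`] pulls one `RecordBatch` through the whole pipeline:
+//!
+//! ```rust,ignore
+//! use futures::StreamExt;
+//! // execute() returns a SendableRecordBatchStream for one partition
+//! let mut stream = plan.execute(0, task_ctx)?;
+//! while let Some(batch) = stream.next().await {
+//!     let batch = batch?; // each item is a Result<RecordBatch>
+//!     println!("received {} rows", batch.num_rows());
+//! }
+//! ```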
+//!
+//! [`next()`]: futures::StreamExt::next
+//!
+//! ## Thread Scheduling, CPU / IO Thread Pools, and [Tokio] [`Runtime`]s
+//!
+//! DataFusion automatically runs each plan with multiple CPU cores using
+//! a [Tokio] [`Runtime`] as a thread pool. While tokio is most commonly used
+//! for asynchronous network I/O, the combination of an efficient, work-stealing
+//! scheduler and first-class compiler support for automatic continuation
+//! generation (`async`) also makes it a compelling choice for CPU-intensive
+//! applications, as explained in the [Using Rustlang’s Async Tokio
+//! Runtime for CPU-Bound Tasks] blog.
+//!
+//! The number of cores used is determined by the `target_partitions`
+//! configuration setting, which defaults to the number of CPU cores.
+//! During execution, DataFusion creates this many distinct `async` [`Stream`]s and
+//! this many distinct [Tokio] [`task`]s, which drive the `Stream`s

Review Comment:
   I was aware that `target_partitions` and the number of `Stream`s matched the
   number of CPU cores, but I was not aware that we cap the number of Tokio
   tasks as well. Can you expand on that point? Are we getting all the benefits
   of the tokio runtime (work stealing, etc.) by keeping that number
   (relatively) low? For context, the local task queue size per thread is
   [256 tasks](https://docs.rs/tokio/latest/tokio/runtime/index.html#multi-threaded-runtime-behavior-at-the-time-of-writing)
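
   For reference, here is a minimal sketch (illustrative only, not DataFusion's
   actual scheduler code) of the one-task-per-partition pattern the docs
   describe, where the number of spawned tasks tracks `target_partitions`:

   ```rust
   use datafusion::physical_plan::SendableRecordBatchStream;
   use futures::StreamExt;

   /// Spawn one tokio task per partition stream; tokio's work-stealing
   /// scheduler balances these tasks across the runtime's worker threads.
   async fn drive_partitions(streams: Vec<SendableRecordBatchStream>) {
       let handles: Vec<_> = streams
           .into_iter()
           .map(|mut stream| {
               tokio::spawn(async move {
                   while let Some(batch) = stream.next().await {
                       let _ = batch; // consume (or forward) each RecordBatch
                   }
               })
           })
           .collect();
       for handle in handles {
           handle.await.expect("partition task panicked");
       }
   }
   ```

   With roughly one task per worker thread, work stealing would mostly kick in
   when partitions finish unevenly, which may be part of the answer here.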



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

