wiedld commented on code in PR #7379:
URL: https://github.com/apache/arrow-datafusion/pull/7379#discussion_r1326939934


##########
datafusion/core/src/physical_plan/sorts/cascade.rs:
##########
@@ -0,0 +1,305 @@
+use crate::physical_plan::metrics::BaselineMetrics;
+use crate::physical_plan::sorts::builder::SortOrder;
+use crate::physical_plan::sorts::cursor::Cursor;
+use crate::physical_plan::sorts::merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::stream::{
+    BatchCursorStream, BatchTrackingStream, MergeStream, OffsetCursorStream,
+    YieldedCursorStream,
+};
+use crate::physical_plan::stream::ReceiverStream;
+use crate::physical_plan::RecordBatchStream;
+
+use super::batch_cursor::BatchId;
+
+use arrow::compute::interleave;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+use futures::{Stream, StreamExt};
+use std::collections::{HashMap, VecDeque};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+/// Sort preserving cascade stream
+///
+/// The cascade works as a tree of sort-preserving-merges, where each merge has
+/// a limited fan-in (number of inputs) and a limited size yielded (batch size) per poll.
+/// The poll is called from the root merge, which will poll its children, and so on.
+///
+/// ```text
+/// ┌─────┐                ┌─────┐
+/// │  2  │                │  1  │
+/// │  3  │                │  2  │
+/// │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ─ ─ ┐
+/// │  4  │                │  3  │
+/// │  2  │                │  4  │                │
+/// └─────┘                └─────┘
+/// ┌─────┐                ┌─────┐                ▼
+/// │  1  │                │  1  │
+/// │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ─ ─ ─ ▶ merge  ─ ─ ─ ─
+/// │  1  │                │  4  │                           │
+/// └─────┘                └─────┘
+///   ...                   ...                ...           ▼
+///
+///                                                       merge  ─ ─ ─ ─ ─ ─ ▶ sorted output
+///                                                                               stream
+///                                                          ▲
+///   ...                   ...                ...           │
+/// ┌─────┐                ┌─────┐
+/// │  3  │                │  3  │                           │
+/// │  1  │─ ▶  sort  ─ ─ ▶│  1  │─ ─ ─ ─ ─ ─▶ merge  ─ ─ ─ ─
+/// └─────┘                └─────┘
+/// ┌─────┐                ┌─────┐                ▲
+/// │  4  │                │  3  │
+/// │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ─ ─ ─ ┘
+/// └─────┘                └─────┘
+///
+/// in_mem_batches                   do a series of merges that
+///                                  each has a limited fan-in
+///                                  (number of inputs)
+/// ```
+///
+/// The cascade is built using a series of streams, each with a different purpose:
+///   * Streams leading into the leaf nodes:
+///      1. [`BatchCursorStream`] yields the initial cursors and batches (e.g. a RowCursorStream).
+///      2. [`BatchTrackingStream`] collects the batches, to avoid passing those around. Yields a [`CursorStream`](super::stream::CursorStream).
+///      3. This initial CursorStream is for a number of partitions (e.g. 100).
+///      4. The initial single CursorStream is shared across multiple leaf nodes, using [`OffsetCursorStream`].

Review Comment:
   ~Not in the notes: the reason a single input CursorStream is shared across leaves is so that they share the same RowConverter.~ See [updated comment](https://github.com/apache/arrow-datafusion/pull/7379#discussion_r1330506718).
   
   After the arrow-rs version bump, I will try to slightly change this design. The goal is to remove the mutex around the `BatchTrackingStream` and instead have the lock only on a `BatchTracker` consumed by the `OffsetCursorStream` (and of course also consumed in the final interleave at the cascade stream root).
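   
   For reference, the merge-tree shape described in the doc comment (a series of merges, each with a limited fan-in) can be sketched in plain Rust. This is a simplified illustration only, with hypothetical helper names and sorted integer runs standing in for the PR's cursor streams; it is not the actual `SortPreservingCascadeStream` API:
   
   ```rust
   /// Hypothetical fan-in limit; the real cascade chooses this per node.
   const FAN_IN: usize = 2;
   
   /// k-way merge of already-sorted runs (naive selection-based merge,
   /// standing in for one sort-preserving-merge node in the tree).
   fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
       let mut idx = vec![0usize; runs.len()];
       let total: usize = runs.iter().map(|r| r.len()).sum();
       let mut out = Vec::with_capacity(total);
       while out.len() < total {
           // pick the run whose current head element is smallest
           let (best, _) = idx
               .iter()
               .enumerate()
               .filter(|&(i, &p)| p < runs[i].len())
               .map(|(i, &p)| (i, runs[i][p]))
               .min_by_key(|&(_, v)| v)
               .unwrap();
           out.push(runs[best][idx[best]]);
           idx[best] += 1;
       }
       out
   }
   
   /// Build the merge tree bottom-up: each level merges at most FAN_IN
   /// runs at a time, until a single sorted run remains at the root.
   fn cascade_merge(mut runs: Vec<Vec<i32>>) -> Vec<i32> {
       while runs.len() > 1 {
           runs = runs.chunks(FAN_IN).map(merge_runs).collect();
       }
       runs.pop().unwrap_or_default()
   }
   
   fn main() {
       // the four sorted runs from the doc-comment diagram's left column
       let runs = vec![vec![1, 2, 2, 3, 4], vec![1, 1, 4], vec![1, 3], vec![3, 3, 4]];
       println!("{:?}", cascade_merge(runs));
   }
   ```
   
   The real implementation differs in that each merge node yields incrementally per poll (bounded batch size) rather than materializing whole runs, which is what keeps memory bounded.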


