zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809034811

   
   
   ```text
   /bench.sh  compare main concat_batches_for_sort
   Comparing main and concat_batches_for_sort
   Note: Skipping 
/Users/zhuqi/arrow-datafusion/benchmarks/results/main/clickbench_1.json as 
/Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/clickbench_1.json
 does not exist
   Note: Skipping 
/Users/zhuqi/arrow-datafusion/benchmarks/results/main/h2o_join.json as 
/Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/h2o_join.json
 does not exist
   Note: Skipping 
/Users/zhuqi/arrow-datafusion/benchmarks/results/main/sort_tpch.json as 
/Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/sort_tpch.json
 does not exist
   --------------------
   Benchmark sort_tpch1.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃     main ┃ concat_batches_for_sort ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │ 153.49ms │                142.66ms │ +1.08x faster │
   │ Q2           │ 131.29ms │                110.46ms │ +1.19x faster │
   │ Q3           │ 980.57ms │                959.09ms │     no change │
   │ Q4           │ 252.25ms │                207.46ms │ +1.22x faster │
   │ Q5           │ 464.81ms │                561.74ms │  1.21x slower │
   │ Q6           │ 481.44ms │                582.54ms │  1.21x slower │
   │ Q7           │ 810.73ms │                910.22ms │  1.12x slower │
   │ Q8           │ 498.10ms │                473.39ms │     no change │
   │ Q9           │ 503.80ms │                492.94ms │     no change │
   │ Q10          │ 789.02ms │                833.81ms │  1.06x slower │
   │ Q11          │ 417.39ms │                401.60ms │     no change │
   └──────────────┴──────────┴─────────────────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
   ┃ Benchmark Summary                      ┃           ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
   │ Total Time (main)                      │ 5482.89ms │
   │ Total Time (concat_batches_for_sort)   │ 5675.92ms │
   │ Average Time (main)                    │  498.44ms │
   │ Average Time (concat_batches_for_sort) │  515.99ms │
   │ Queries Faster                         │         3 │
   │ Queries Slower                         │         4 │
   │ Queries with No Change                 │         4 │
   └────────────────────────────────────────┴───────────┘
   --------------------
   Benchmark sort_tpch10.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃       main ┃ concat_batches_for_sort ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │  2243.52ms │               1412.73ms │ +1.59x faster │
   │ Q2           │  1842.11ms │               1113.81ms │ +1.65x faster │
   │ Q3           │ 12446.31ms │              12687.14ms │     no change │
   │ Q4           │  4047.55ms │               1891.10ms │ +2.14x faster │
   │ Q5           │  4364.46ms │               6024.94ms │  1.38x slower │
   │ Q6           │  4561.01ms │               6281.17ms │  1.38x slower │
   │ Q7           │  8158.01ms │              13184.06ms │  1.62x slower │
   │ Q8           │  6077.40ms │               5277.44ms │ +1.15x faster │
   │ Q9           │  6347.21ms │               5308.08ms │ +1.20x faster │
   │ Q10          │ 11561.03ms │              22213.97ms │  1.92x slower │
   │ Q11          │  6069.42ms │               4524.58ms │ +1.34x faster │
   └──────────────┴────────────┴─────────────────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary                      ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (main)                      │ 67718.04ms │
   │ Total Time (concat_batches_for_sort)   │ 79918.99ms │
   │ Average Time (main)                    │  6156.19ms │
   │ Average Time (concat_batches_for_sort) │  7265.36ms │
   │ Queries Faster                         │          6 │
   │ Queries Slower                         │          4 │
   │ Queries with No Change                 │          1 │
   └────────────────────────────────────────┴────────────┘
   ```
   
   > I wonder if we can make a more "simple" change for now:
   > 
   > * `concat` regresses because it copies _all columns_ of the 
record batch before sorting.
   > * We can concat only the sorting columns instead, generate a Vec of 
`(batch_id, row_id)`, and map the output of `lexsort_to_indices` back to the 
original values.
   > 
   > I think doing this will benefit existing sorting without (too many) 
regressions. We can probably try increasing the in-memory threshold value 
(`sort_in_place_threshold_bytes`).
   
   I did a POC of this approach:
   
   ```diff
   diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
   index 7fd1c2b16..4cebbb443 100644
   --- a/datafusion/physical-plan/src/sorts/sort.rs
   +++ b/datafusion/physical-plan/src/sorts/sort.rs
   @@ -43,10 +43,8 @@ use crate::{
        Statistics,
    };
   
   -use arrow::array::{
   -    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
   -};
   -use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, 
SortColumn};
   +use arrow::array::{Array, ArrayRef, PrimitiveArray, RecordBatch, 
RecordBatchOptions, StringViewArray, UInt32Array};
   +use arrow::compute::{concat, concat_batches, lexsort_to_indices, take, 
take_arrays, SortColumn};
    use arrow::datatypes::{DataType, SchemaRef};
    use arrow::row::{RowConverter, Rows, SortField};
    use datafusion_common::{
   @@ -61,6 +59,7 @@ use 
datafusion_physical_expr_common::sort_expr::LexRequirement;
   
    use futures::{StreamExt, TryStreamExt};
    use log::{debug, trace};
   +use crate::memory::MemoryStream;
   
    struct ExternalSorterMetrics {
        /// metrics
   @@ -644,112 +643,98 @@ impl ExternalSorter {
        ///
        ///   in_mem_batches
        /// ```
   -    fn in_mem_sort_stream(
   +    pub fn in_mem_sort_stream(
            &mut self,
            metrics: BaselineMetrics,
        ) -> Result<SendableRecordBatchStream> {
   +        // If there are no in-memory batches, return an empty stream
            if self.in_mem_batches.is_empty() {
   -            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
   -                &self.schema,
   -            ))));
   +            return 
Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema))));
            }
   
   -        // The elapsed compute timer is updated when the value is dropped.
   -        // There is no need for an explicit call to drop.
   +        // Timer: automatically records elapsed_compute on drop
            let elapsed_compute = metrics.elapsed_compute().clone();
            let _timer = elapsed_compute.timer();
   
   -        // Please pay attention that any operation inside of 
`in_mem_sort_stream` will
   -        // not perform any memory reservation. This is for avoiding the 
need of handling
   -        // reservation failure and spilling in the middle of the 
sort/merge. The memory
   -        // space for batches produced by the resulting stream will be 
reserved by the
   -        // consumer of the stream.
   -
   -        if self.in_mem_batches.len() == 1 {
   -            let batch = self.in_mem_batches.swap_remove(0);
   -            let reservation = self.reservation.take();
   -            return self.sort_batch_stream(batch, metrics, reservation);
   +        // // If only one batch exists, delegate to sort_batch_stream 
directly
   +        // if self.in_mem_batches.len() == 1 {
   +        //     let batch = self.in_mem_batches.swap_remove(0);
   +        //     let reservation = self.reservation.take();
   +        //     return self.sort_batch_stream(batch, metrics, reservation);
   +        // }
   +
   +        // ===== Phase 1: Build global sort columns for each sort 
expression =====
   +        // For each sort expression, evaluate and collect the corresponding 
sort column from each in-memory batch
   +        // Here, `self.expr` is a list of sort expressions, each providing 
`evaluate_to_sort_column()`,
   +        // which returns an ArrayRef (in `.values`) and sort options 
(`options`)
   +        let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; 
self.expr.len()];
   +        let mut total_rows = 0;
   +        for batch in &self.in_mem_batches {
   +            for (i, expr) in self.expr.iter().enumerate() {
   +                let col = expr.evaluate_to_sort_column(batch)?.values;
   +                columns_by_expr[i].push(col);
   +            }
   +            total_rows += batch.num_rows();
            }
   
   -        // If less than sort_in_place_threshold_bytes, concatenate and sort 
in place
   -        if self.reservation.size() < self.sort_in_place_threshold_bytes {
   -            // Concatenate memory batches together and sort
   -            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
   -            self.in_mem_batches.clear();
   -            self.reservation
   -                .try_resize(get_reserved_byte_for_record_batch(&batch))?;
   -            let reservation = self.reservation.take();
   -            return self.sort_batch_stream(batch, metrics, reservation);
   +        // For each sort expression, concatenate arrays from all batches 
into one global array
   +        let mut sort_columns = Vec::with_capacity(self.expr.len());
   +        for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
   +            let array = concat(&arrays.iter().map(|a| 
a.as_ref()).collect::<Vec<&dyn Array>>())?;
   +            sort_columns.push(SortColumn {
   +                values: array,
   +                options: expr.options.into(),
   +            });
            }
   
   -        let mut merged_batches = Vec::new();
   -        let mut current_batches = Vec::new();
   -        let mut current_size = 0;
   -
   -        // Drain in_mem_batches using pop() to release memory earlier.
   -        // This avoids holding onto the entire vector during iteration.
   -        // Note:
   -        // Now we use `sort_in_place_threshold_bytes` to determine, in 
future we can make it more dynamic.
   -        while let Some(batch) = self.in_mem_batches.pop() {
   -            let batch_size = get_reserved_byte_for_record_batch(&batch);
   -
   -            // If adding this batch would exceed the memory threshold, 
merge current_batches.
   -            if current_size + batch_size > 
self.sort_in_place_threshold_bytes
   -                && !current_batches.is_empty()
   -            {
   -                // Merge accumulated batches into one.
   -                let merged = concat_batches(&self.schema, 
&current_batches)?;
   -                current_batches.clear();
   -
   -                // Update memory reservation.
   -                self.reservation.try_shrink(current_size)?;
   -                let merged_size = 
get_reserved_byte_for_record_batch(&merged);
   -                self.reservation.try_grow(merged_size)?;
   -
   -                merged_batches.push(merged);
   -                current_size = 0;
   -            }
   +        // ===== Phase 2: Compute global sorted indices =====
   +        // Use `lexsort_to_indices` to get global row indices in sorted 
order (as if all batches were concatenated)
   +        let indices = lexsort_to_indices(&sort_columns, None)?;
   
   -            current_batches.push(batch);
   -            current_size += batch_size;
   -        }
   +        // Convert indices to UInt32Array, assuming all are Some(u32)
   +        let indices_array = {
   +            let indices_nums: Vec<u32> = indices
   +                .iter()
   +                .map(|opt| opt.expect("Sort indices should not be None"))
   +                .collect();
   +            UInt32Array::from(indices_nums)
   +        };
   +
   +        // ===== Phase 3: Reorder each column using the global sorted 
indices =====
   +        let num_columns = self.schema.fields().len();
   +        let mut sorted_columns: Vec<ArrayRef> = 
Vec::with_capacity(num_columns);
   
   -        // Merge any remaining batches after the loop.
   -        if !current_batches.is_empty() {
   -            let merged = concat_batches(&self.schema, &current_batches)?;
   -            self.reservation.try_shrink(current_size)?;
   -            let merged_size = get_reserved_byte_for_record_batch(&merged);
   -            self.reservation.try_grow(merged_size)?;
   -            merged_batches.push(merged);
   +        // For each column:
   +        // 1. Concatenate all batch arrays for this column (in the same 
order as assumed by `lexsort_to_indices`)
   +        // 2. Use Arrow's `take` function to reorder the column by sorted 
indices
   +        for col_idx in 0..num_columns {
   +            let arrays: Vec<&dyn Array> = self
   +                .in_mem_batches
   +                .iter()
   +                .map(|batch| batch.column(col_idx).as_ref())
   +                .collect();
   +            let concatenated = concat(&arrays)?;
   +            let sorted = take(&concatenated, &indices_array, None)?;
   +            sorted_columns.push(sorted);
            }
   
   -        // Create sorted streams directly without using spawn_buffered.
   -        // This allows for sorting to happen inline and enables earlier 
batch drop.
   -        let streams = merged_batches
   -            .into_iter()
   -            .map(|batch| {
   -                let metrics = self.metrics.baseline.intermediate();
   -                let reservation = self
   -                    .reservation
   -                    .split(get_reserved_byte_for_record_batch(&batch));
   -
   -                // Sort the batch inline.
   -                let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
   -                Ok(input)
   -            })
   -            .collect::<Result<_>>()?;
   +        // Build a RecordBatch from the sorted columns
   +        let sorted_batch = RecordBatch::try_new(self.schema.clone(), 
sorted_columns)?;
   
   -        let expressions: LexOrdering = self.expr.iter().cloned().collect();
   +        // Clear in-memory batches and update reservation
   +        self.in_mem_batches.clear();
   +        
self.reservation.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
   +        let reservation = self.reservation.take();
   +
   +        // ===== Phase 4: Construct the resulting stream =====
   +        let stream = futures::stream::once(async move {
   +            let _timer = metrics.elapsed_compute().timer();
   +            metrics.record_output(sorted_batch.num_rows());
   +            drop(reservation);
   +            Ok(sorted_batch)
   +        });
   
   -        StreamingMergeBuilder::new()
   -            .with_streams(streams)
   -            .with_schema(Arc::clone(&self.schema))
   -            .with_expressions(expressions.as_ref())
   -            .with_metrics(metrics)
   -            .with_batch_size(self.batch_size)
   -            .with_fetch(None)
   -            .with_reservation(self.merge_reservation.new_empty())
   -            .build()
   +        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), 
stream)))
        }
   ```
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to