This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 14635dab4e perf: Reuse row converter during sort (#15302)
14635dab4e is described below

commit 14635dab4ea0049bda36c5a9068138ab6fb2ecd2
Author: Yongting You <[email protected]>
AuthorDate: Sun Mar 30 12:14:14 2025 +0800

    perf: Reuse row converter during sort (#15302)
    
    * reuse row converter during sort
    
    * review
    
    * update submodule pin
---
 datafusion/physical-plan/src/sorts/sort.rs | 100 +++++++++++++++++++++++++----
 1 file changed, 88 insertions(+), 12 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index ed35492041..11c3212f53 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -49,8 +49,10 @@ use arrow::array::{
 };
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, 
SortColumn};
 use arrow::datatypes::{DataType, SchemaRef};
-use arrow::row::{RowConverter, SortField};
-use datafusion_common::{internal_datafusion_err, internal_err, Result};
+use arrow::row::{RowConverter, Rows, SortField};
+use datafusion_common::{
+    exec_datafusion_err, internal_datafusion_err, internal_err, Result,
+};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
@@ -203,6 +205,8 @@ struct ExternalSorter {
     schema: SchemaRef,
     /// Sort expressions
     expr: Arc<[PhysicalSortExpr]>,
+    /// RowConverter corresponding to the sort expressions
+    sort_keys_row_converter: Arc<RowConverter>,
     /// If Some, the maximum number of output rows that will be produced
     fetch: Option<usize>,
     /// The target number of rows for output batches
@@ -265,7 +269,7 @@ impl ExternalSorter {
         sort_in_place_threshold_bytes: usize,
         metrics: &ExecutionPlanMetricsSet,
         runtime: Arc<RuntimeEnv>,
-    ) -> Self {
+    ) -> Result<Self> {
         let metrics = ExternalSorterMetrics::new(metrics, partition_id);
         let reservation = 
MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
             .with_can_spill(true)
@@ -275,19 +279,36 @@ impl ExternalSorter {
             MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                 .register(&runtime.memory_pool);
 
+        // Construct RowConverter for sort keys
+        let sort_fields = expr
+            .iter()
+            .map(|e| {
+                let data_type = e
+                    .expr
+                    .data_type(&schema)
+                    .map_err(|e| e.context("Resolving sort expression data 
type"))?;
+                Ok(SortField::new_with_options(data_type, e.options))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let converter = RowConverter::new(sort_fields).map_err(|e| {
+            exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
+        })?;
+
         let spill_manager = SpillManager::new(
             Arc::clone(&runtime),
             metrics.spill_metrics.clone(),
             Arc::clone(&schema),
         );
 
-        Self {
+        Ok(Self {
             schema,
             in_mem_batches: vec![],
             in_mem_batches_sorted: false,
             in_progress_spill_file: None,
             finished_spill_files: vec![],
             expr: expr.into(),
+            sort_keys_row_converter: Arc::new(converter),
             metrics,
             fetch,
             reservation,
@@ -297,7 +318,7 @@ impl ExternalSorter {
             batch_size,
             sort_spill_reservation_bytes,
             sort_in_place_threshold_bytes,
-        }
+        })
     }
 
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
@@ -723,15 +744,29 @@ impl ExternalSorter {
 
         let fetch = self.fetch;
         let expressions: LexOrdering = self.expr.iter().cloned().collect();
-        let stream = futures::stream::once(futures::future::lazy(move |_| {
-            let timer = metrics.elapsed_compute().timer();
-            let sorted = sort_batch(&batch, &expressions, fetch)?;
-            timer.done();
+        let row_converter = Arc::clone(&self.sort_keys_row_converter);
+        let stream = futures::stream::once(async move {
+            let _timer = metrics.elapsed_compute().timer();
+
+            let sort_columns = expressions
+                .iter()
+                .map(|expr| expr.evaluate_to_sort_column(&batch))
+                .collect::<Result<Vec<_>>>()?;
+
+            let sorted = if is_multi_column_with_lists(&sort_columns) {
+                // lexsort_to_indices doesn't support List with more than one column
+                // https://github.com/apache/arrow-rs/issues/5454
+                sort_batch_row_based(&batch, &expressions, row_converter, 
fetch)?
+            } else {
+                sort_batch(&batch, &expressions, fetch)?
+            };
+
             metrics.record_output(sorted.num_rows());
             drop(batch);
             drop(reservation);
             Ok(sorted)
-        }));
+        });
+
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
 
@@ -776,6 +811,45 @@ impl Debug for ExternalSorter {
     }
 }
 
+/// Returns the row indices that order `rows` ascending by row-format comparison.
+/// When `limit` is `Some`, only the first `limit` indices of that order are returned.
+fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
+    let mut sort: Vec<_> = rows.iter().enumerate().collect();
+    sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+
+    let mut len = rows.num_rows();
+    if let Some(limit) = limit {
+        len = limit.min(len);
+    }
+    let indices =
+        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as 
u32));
+    Ok(indices)
+}
+
+/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format with the given `row_converter`, keeping at most `fetch` rows when set.
+fn sort_batch_row_based(
+    batch: &RecordBatch,
+    expressions: &LexOrdering,
+    row_converter: Arc<RowConverter>,
+    fetch: Option<usize>,
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
+        .iter()
+        .map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
+        .collect::<Result<Vec<_>>>()?;
+    let rows = row_converter.convert_columns(&sort_columns)?;
+    let indices = rows_to_indices(rows, fetch)?;
+    let columns = take_arrays(batch.columns(), &indices, None)?;
+
+    let options = 
RecordBatchOptions::new().with_row_count(Some(indices.len()));
+
+    Ok(RecordBatch::try_new_with_options(
+        batch.schema(),
+        columns,
+        &options,
+    )?)
+}
+
 pub fn sort_batch(
     batch: &RecordBatch,
     expressions: &LexOrdering,
@@ -838,7 +912,9 @@ pub(crate) fn lexsort_to_indices_multi_columns(
         },
     );
 
-    // TODO reuse converter and rows, refer to TopK.
+    // Note: the row converter is reused through `sort_batch_row_based()`. This function
+    // is not used during normal sort execution, but it's kept temporarily because
+    // it's inside a public interface `sort_batch()`.
     let converter = RowConverter::new(fields)?;
     let rows = converter.convert_columns(&columns)?;
     let mut sort: Vec<_> = rows.iter().enumerate().collect();
@@ -1154,7 +1230,7 @@ impl ExecutionPlan for SortExec {
                     execution_options.sort_in_place_threshold_bytes,
                     &self.metrics_set,
                     context.runtime_env(),
-                );
+                )?;
                 Ok(Box::pin(RecordBatchStreamAdapter::new(
                     self.schema(),
                     futures::stream::once(async move {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to