This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch revert-15302-sort-reuse-row-converter
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 433a50a98a4b571e83befb371e15cc23fe3c2799
Author: Yongting You <[email protected]>
AuthorDate: Sun Mar 30 15:08:18 2025 +0800

    Revert "perf: Reuse row converter during sort (#15302)"
    
    This reverts commit 14635dab4ea0049bda36c5a9068138ab6fb2ecd2.
---
 datafusion/physical-plan/src/sorts/sort.rs | 100 ++++-------------------------
 1 file changed, 12 insertions(+), 88 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 11c3212f53..ed35492041 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -49,10 +49,8 @@ use arrow::array::{
 };
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, 
SortColumn};
 use arrow::datatypes::{DataType, SchemaRef};
-use arrow::row::{RowConverter, Rows, SortField};
-use datafusion_common::{
-    exec_datafusion_err, internal_datafusion_err, internal_err, Result,
-};
+use arrow::row::{RowConverter, SortField};
+use datafusion_common::{internal_datafusion_err, internal_err, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
@@ -205,8 +203,6 @@ struct ExternalSorter {
     schema: SchemaRef,
     /// Sort expressions
     expr: Arc<[PhysicalSortExpr]>,
-    /// RowConverter corresponding to the sort expressions
-    sort_keys_row_converter: Arc<RowConverter>,
     /// If Some, the maximum number of output rows that will be produced
     fetch: Option<usize>,
     /// The target number of rows for output batches
@@ -269,7 +265,7 @@ impl ExternalSorter {
         sort_in_place_threshold_bytes: usize,
         metrics: &ExecutionPlanMetricsSet,
         runtime: Arc<RuntimeEnv>,
-    ) -> Result<Self> {
+    ) -> Self {
         let metrics = ExternalSorterMetrics::new(metrics, partition_id);
         let reservation = 
MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
             .with_can_spill(true)
@@ -279,36 +275,19 @@ impl ExternalSorter {
             MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                 .register(&runtime.memory_pool);
 
-        // Construct RowConverter for sort keys
-        let sort_fields = expr
-            .iter()
-            .map(|e| {
-                let data_type = e
-                    .expr
-                    .data_type(&schema)
-                    .map_err(|e| e.context("Resolving sort expression data 
type"))?;
-                Ok(SortField::new_with_options(data_type, e.options))
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let converter = RowConverter::new(sort_fields).map_err(|e| {
-            exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
-        })?;
-
         let spill_manager = SpillManager::new(
             Arc::clone(&runtime),
             metrics.spill_metrics.clone(),
             Arc::clone(&schema),
         );
 
-        Ok(Self {
+        Self {
             schema,
             in_mem_batches: vec![],
             in_mem_batches_sorted: false,
             in_progress_spill_file: None,
             finished_spill_files: vec![],
             expr: expr.into(),
-            sort_keys_row_converter: Arc::new(converter),
             metrics,
             fetch,
             reservation,
@@ -318,7 +297,7 @@ impl ExternalSorter {
             batch_size,
             sort_spill_reservation_bytes,
             sort_in_place_threshold_bytes,
-        })
+        }
     }
 
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
@@ -744,29 +723,15 @@ impl ExternalSorter {
 
         let fetch = self.fetch;
         let expressions: LexOrdering = self.expr.iter().cloned().collect();
-        let row_converter = Arc::clone(&self.sort_keys_row_converter);
-        let stream = futures::stream::once(async move {
-            let _timer = metrics.elapsed_compute().timer();
-
-            let sort_columns = expressions
-                .iter()
-                .map(|expr| expr.evaluate_to_sort_column(&batch))
-                .collect::<Result<Vec<_>>>()?;
-
-            let sorted = if is_multi_column_with_lists(&sort_columns) {
-                // lex_sort_to_indices doesn't support List with more than one 
column
-                // https://github.com/apache/arrow-rs/issues/5454
-                sort_batch_row_based(&batch, &expressions, row_converter, 
fetch)?
-            } else {
-                sort_batch(&batch, &expressions, fetch)?
-            };
-
+        let stream = futures::stream::once(futures::future::lazy(move |_| {
+            let timer = metrics.elapsed_compute().timer();
+            let sorted = sort_batch(&batch, &expressions, fetch)?;
+            timer.done();
             metrics.record_output(sorted.num_rows());
             drop(batch);
             drop(reservation);
             Ok(sorted)
-        });
-
+        }));
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
 
@@ -811,45 +776,6 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// Converts rows into a sorted array of indices based on their order.
-/// This function returns the indices that represent the sorted order of the 
rows.
-fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
-    let mut sort: Vec<_> = rows.iter().enumerate().collect();
-    sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
-
-    let mut len = rows.num_rows();
-    if let Some(limit) = limit {
-        len = limit.min(len);
-    }
-    let indices =
-        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as 
u32));
-    Ok(indices)
-}
-
-/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format 
for faster comparison.
-fn sort_batch_row_based(
-    batch: &RecordBatch,
-    expressions: &LexOrdering,
-    row_converter: Arc<RowConverter>,
-    fetch: Option<usize>,
-) -> Result<RecordBatch> {
-    let sort_columns = expressions
-        .iter()
-        .map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
-        .collect::<Result<Vec<_>>>()?;
-    let rows = row_converter.convert_columns(&sort_columns)?;
-    let indices = rows_to_indices(rows, fetch)?;
-    let columns = take_arrays(batch.columns(), &indices, None)?;
-
-    let options = 
RecordBatchOptions::new().with_row_count(Some(indices.len()));
-
-    Ok(RecordBatch::try_new_with_options(
-        batch.schema(),
-        columns,
-        &options,
-    )?)
-}
-
 pub fn sort_batch(
     batch: &RecordBatch,
     expressions: &LexOrdering,
@@ -912,9 +838,7 @@ pub(crate) fn lexsort_to_indices_multi_columns(
         },
     );
 
-    // Note: row converter is reused through `sort_batch_row_based()`, this 
function
-    // is not used during normal sort execution, but it's kept temporarily 
because
-    // it's inside a public interface `sort_batch()`.
+    // TODO reuse converter and rows, refer to TopK.
     let converter = RowConverter::new(fields)?;
     let rows = converter.convert_columns(&columns)?;
     let mut sort: Vec<_> = rows.iter().enumerate().collect();
@@ -1230,7 +1154,7 @@ impl ExecutionPlan for SortExec {
                     execution_options.sort_in_place_threshold_bytes,
                     &self.metrics_set,
                     context.runtime_env(),
-                )?;
+                );
                 Ok(Box::pin(RecordBatchStreamAdapter::new(
                     self.schema(),
                     futures::stream::once(async move {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to