This is an automated email from the ASF dual-hosted git repository. ytyou pushed a commit to branch revert-15302-sort-reuse-row-converter in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 433a50a98a4b571e83befb371e15cc23fe3c2799 Author: Yongting You <[email protected]> AuthorDate: Sun Mar 30 15:08:18 2025 +0800 Revert "perf: Reuse row converter during sort (#15302)" This reverts commit 14635dab4ea0049bda36c5a9068138ab6fb2ecd2. --- datafusion/physical-plan/src/sorts/sort.rs | 100 ++++------------------------- 1 file changed, 12 insertions(+), 88 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 11c3212f53..ed35492041 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -49,10 +49,8 @@ use arrow::array::{ }; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; use arrow::datatypes::{DataType, SchemaRef}; -use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_common::{ - exec_datafusion_err, internal_datafusion_err, internal_err, Result, -}; +use arrow::row::{RowConverter, SortField}; +use datafusion_common::{internal_datafusion_err, internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; @@ -205,8 +203,6 @@ struct ExternalSorter { schema: SchemaRef, /// Sort expressions expr: Arc<[PhysicalSortExpr]>, - /// RowConverter corresponding to the sort expressions - sort_keys_row_converter: Arc<RowConverter>, /// If Some, the maximum number of output rows that will be produced fetch: Option<usize>, /// The target number of rows for output batches @@ -269,7 +265,7 @@ impl ExternalSorter { sort_in_place_threshold_bytes: usize, metrics: &ExecutionPlanMetricsSet, runtime: Arc<RuntimeEnv>, - ) -> Result<Self> { + ) -> Self { let metrics = ExternalSorterMetrics::new(metrics, partition_id); let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]")) .with_can_spill(true) @@ -279,36 +275,19 @@ impl ExternalSorter { MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) .register(&runtime.memory_pool); - // Construct RowConverter for sort keys - let sort_fields = expr - .iter() - .map(|e| { - let data_type = e - .expr - .data_type(&schema) - .map_err(|e| e.context("Resolving sort expression data type"))?; - Ok(SortField::new_with_options(data_type, e.options)) - }) - .collect::<Result<Vec<_>>>()?; - - let converter = RowConverter::new(sort_fields).map_err(|e| { - exec_datafusion_err!("Failed to create RowConverter: {:?}", e) - })?; - let spill_manager = SpillManager::new( Arc::clone(&runtime), metrics.spill_metrics.clone(), Arc::clone(&schema), ); - Ok(Self { + Self { schema, in_mem_batches: vec![], in_mem_batches_sorted: false, in_progress_spill_file: None, finished_spill_files: vec![], expr: expr.into(), - sort_keys_row_converter: Arc::new(converter), metrics, fetch, reservation, @@ -318,7 +297,7 @@ impl ExternalSorter { batch_size, sort_spill_reservation_bytes, sort_in_place_threshold_bytes, - }) + } } /// Appends an unsorted [`RecordBatch`] to `in_mem_batches` @@ -744,29 +723,15 @@ impl ExternalSorter { let fetch = self.fetch; let expressions: LexOrdering = self.expr.iter().cloned().collect(); - let row_converter = Arc::clone(&self.sort_keys_row_converter); - let stream = futures::stream::once(async move { - let _timer = metrics.elapsed_compute().timer(); - - let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(&batch)) - .collect::<Result<Vec<_>>>()?; - - let sorted = if is_multi_column_with_lists(&sort_columns) { - // lex_sort_to_indices doesn't support List with more than one column - // https://github.com/apache/arrow-rs/issues/5454 - sort_batch_row_based(&batch, &expressions, row_converter, fetch)? - } else { - sort_batch(&batch, &expressions, fetch)? - }; - + let stream = futures::stream::once(futures::future::lazy(move |_| { + let timer = metrics.elapsed_compute().timer(); + let sorted = sort_batch(&batch, &expressions, fetch)?; + timer.done(); metrics.record_output(sorted.num_rows()); drop(batch); drop(reservation); Ok(sorted) - }); - + })); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } @@ -811,45 +776,6 @@ impl Debug for ExternalSorter { } } -/// Converts rows into a sorted array of indices based on their order. -/// This function returns the indices that represent the sorted order of the rows. -fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> { - let mut sort: Vec<_> = rows.iter().enumerate().collect(); - sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); - - let mut len = rows.num_rows(); - if let Some(limit) = limit { - len = limit.min(len); - } - let indices = - UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32)); - Ok(indices) -} - -/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format for faster comparison. -fn sort_batch_row_based( - batch: &RecordBatch, - expressions: &LexOrdering, - row_converter: Arc<RowConverter>, - fetch: Option<usize>, -) -> Result<RecordBatch> { - let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values)) - .collect::<Result<Vec<_>>>()?; - let rows = row_converter.convert_columns(&sort_columns)?; - let indices = rows_to_indices(rows, fetch)?; - let columns = take_arrays(batch.columns(), &indices, None)?; - - let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); - - Ok(RecordBatch::try_new_with_options( - batch.schema(), - columns, - &options, - )?) -} - pub fn sort_batch( batch: &RecordBatch, expressions: &LexOrdering, @@ -912,9 +838,7 @@ pub(crate) fn lexsort_to_indices_multi_columns( }, ); - // Note: row converter is reused through `sort_batch_row_based()`, this function - // is not used during normal sort execution, but it's kept temporarily because - // it's inside a public interface `sort_batch()`. + // TODO reuse converter and rows, refer to TopK. let converter = RowConverter::new(fields)?; let rows = converter.convert_columns(&columns)?; let mut sort: Vec<_> = rows.iter().enumerate().collect(); @@ -1230,7 +1154,7 @@ impl ExecutionPlan for SortExec { execution_options.sort_in_place_threshold_bytes, &self.metrics_set, context.runtime_env(), - )?; + ); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
