This is an automated email from the ASF dual-hosted git repository.
ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 14635dab4e perf: Reuse row converter during sort (#15302)
14635dab4e is described below
commit 14635dab4ea0049bda36c5a9068138ab6fb2ecd2
Author: Yongting You <[email protected]>
AuthorDate: Sun Mar 30 12:14:14 2025 +0800
perf: Reuse row converter during sort (#15302)
* reuse row converter during sort
* review
* update submodule pin
---
datafusion/physical-plan/src/sorts/sort.rs | 100 +++++++++++++++++++++++++----
1 file changed, 88 insertions(+), 12 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index ed35492041..11c3212f53 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -49,8 +49,10 @@ use arrow::array::{
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
-use arrow::row::{RowConverter, SortField};
-use datafusion_common::{internal_datafusion_err, internal_err, Result};
+use arrow::row::{RowConverter, Rows, SortField};
+use datafusion_common::{
+ exec_datafusion_err, internal_datafusion_err, internal_err, Result,
+};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -203,6 +205,8 @@ struct ExternalSorter {
schema: SchemaRef,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
+ /// RowConverter corresponding to the sort expressions
+ sort_keys_row_converter: Arc<RowConverter>,
/// If Some, the maximum number of output rows that will be produced
fetch: Option<usize>,
/// The target number of rows for output batches
@@ -265,7 +269,7 @@ impl ExternalSorter {
sort_in_place_threshold_bytes: usize,
metrics: &ExecutionPlanMetricsSet,
runtime: Arc<RuntimeEnv>,
- ) -> Self {
+ ) -> Result<Self> {
let metrics = ExternalSorterMetrics::new(metrics, partition_id);
let reservation =
MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
.with_can_spill(true)
@@ -275,19 +279,36 @@ impl ExternalSorter {
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
.register(&runtime.memory_pool);
+ // Construct RowConverter for sort keys
+ let sort_fields = expr
+ .iter()
+ .map(|e| {
+ let data_type = e
+ .expr
+ .data_type(&schema)
+ .map_err(|e| e.context("Resolving sort expression data type"))?;
+ Ok(SortField::new_with_options(data_type, e.options))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let converter = RowConverter::new(sort_fields).map_err(|e| {
+ exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
+ })?;
+
let spill_manager = SpillManager::new(
Arc::clone(&runtime),
metrics.spill_metrics.clone(),
Arc::clone(&schema),
);
- Self {
+ Ok(Self {
schema,
in_mem_batches: vec![],
in_mem_batches_sorted: false,
in_progress_spill_file: None,
finished_spill_files: vec![],
expr: expr.into(),
+ sort_keys_row_converter: Arc::new(converter),
metrics,
fetch,
reservation,
@@ -297,7 +318,7 @@ impl ExternalSorter {
batch_size,
sort_spill_reservation_bytes,
sort_in_place_threshold_bytes,
- }
+ })
}
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
@@ -723,15 +744,29 @@ impl ExternalSorter {
let fetch = self.fetch;
let expressions: LexOrdering = self.expr.iter().cloned().collect();
- let stream = futures::stream::once(futures::future::lazy(move |_| {
- let timer = metrics.elapsed_compute().timer();
- let sorted = sort_batch(&batch, &expressions, fetch)?;
- timer.done();
+ let row_converter = Arc::clone(&self.sort_keys_row_converter);
+ let stream = futures::stream::once(async move {
+ let _timer = metrics.elapsed_compute().timer();
+
+ let sort_columns = expressions
+ .iter()
+ .map(|expr| expr.evaluate_to_sort_column(&batch))
+ .collect::<Result<Vec<_>>>()?;
+
+ let sorted = if is_multi_column_with_lists(&sort_columns) {
+ // lex_sort_to_indices doesn't support List with more than one column
+ // https://github.com/apache/arrow-rs/issues/5454
+ sort_batch_row_based(&batch, &expressions, row_converter, fetch)?
+ } else {
+ sort_batch(&batch, &expressions, fetch)?
+ };
+
metrics.record_output(sorted.num_rows());
drop(batch);
drop(reservation);
Ok(sorted)
- }));
+ });
+
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
@@ -776,6 +811,45 @@ impl Debug for ExternalSorter {
}
}
+/// Converts rows into a sorted array of indices based on their order.
+/// This function returns the indices that represent the sorted order of the rows.
+fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
+ let mut sort: Vec<_> = rows.iter().enumerate().collect();
+ sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+
+ let mut len = rows.num_rows();
+ if let Some(limit) = limit {
+ len = limit.min(len);
+ }
+ let indices =
+ UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
+ Ok(indices)
+}
+
+/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format for faster comparison.
+fn sort_batch_row_based(
+ batch: &RecordBatch,
+ expressions: &LexOrdering,
+ row_converter: Arc<RowConverter>,
+ fetch: Option<usize>,
+) -> Result<RecordBatch> {
+ let sort_columns = expressions
+ .iter()
+ .map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
+ .collect::<Result<Vec<_>>>()?;
+ let rows = row_converter.convert_columns(&sort_columns)?;
+ let indices = rows_to_indices(rows, fetch)?;
+ let columns = take_arrays(batch.columns(), &indices, None)?;
+
+ let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
+
+ Ok(RecordBatch::try_new_with_options(
+ batch.schema(),
+ columns,
+ &options,
+ )?)
+}
+
pub fn sort_batch(
batch: &RecordBatch,
expressions: &LexOrdering,
@@ -838,7 +912,9 @@ pub(crate) fn lexsort_to_indices_multi_columns(
},
);
- // TODO reuse converter and rows, refer to TopK.
+ // Note: row converter is reused through `sort_batch_row_based()`, this function
+ // is not used during normal sort execution, but it's kept temporarily because
+ // it's inside a public interface `sort_batch()`.
let converter = RowConverter::new(fields)?;
let rows = converter.convert_columns(&columns)?;
let mut sort: Vec<_> = rows.iter().enumerate().collect();
@@ -1154,7 +1230,7 @@ impl ExecutionPlan for SortExec {
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
- );
+ )?;
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]