Dandandan commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809508850
To apply this to your diff (I didn't update the documentation): I would like to keep the original `StreamingMergeBuilder` path and the `if self.reservation.size() < self.sort_in_place_threshold_bytes` check, so that only the "avoid concat for non-sort columns" optimization is in place, and then see whether this improves all sort queries. ```diff - // If less than sort_in_place_threshold_bytes, concatenate and sort in place - if self.reservation.size() < self.sort_in_place_threshold_bytes { - // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; - self.in_mem_batches.clear(); - self.reservation - .try_resize(get_reserved_byte_for_record_batch(&batch)) - .map_err(Self::err_with_oom_context)?; - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); + let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; self.expr.len()]; + for batch in &self.in_mem_batches { + for (i, expr) in self.expr.iter().enumerate() { + let col = expr.evaluate_to_sort_column(batch)?.values; + columns_by_expr[i].push(col); + } } - let streams = std::mem::take(&mut self.in_mem_batches) - .into_iter() - .map(|batch| { - let metrics = self.metrics.baseline.intermediate(); - let reservation = self - .reservation - .split(get_reserved_byte_for_record_batch(&batch)); - let input = self.sort_batch_stream(batch, metrics, reservation)?; - Ok(spawn_buffered(input, 1)) - }) - .collect::<Result<_>>()?; + // For each sort expression, concatenate arrays from all batches into one global array + let mut sort_columns = Vec::with_capacity(self.expr.len()); + for (arrays, expr) in columns_by_expr.into_iter().zip(self.expr.iter()) { + let array = concat( + &arrays + .iter() + .map(|a| a.as_ref()) + .collect::<Vec<&dyn Array>>(), + )?; + sort_columns.push(SortColumn { + values: array, + options: expr.options.into(), + }); + } - let expressions: LexOrdering = self.expr.iter().cloned().collect(); + // 
===== Phase 2: Compute global sorted indices ===== + // Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated) + let indices = lexsort_to_indices(&sort_columns, None)?; - StreamingMergeBuilder::new() - .with_streams(streams) - .with_schema(Arc::clone(&self.schema)) - .with_expressions(expressions.as_ref()) - .with_metrics(metrics) - .with_batch_size(self.batch_size) - .with_fetch(None) - .with_reservation(self.merge_reservation.new_empty()) - .build() + // ===== Phase 3: Reorder each column using the global sorted indices ===== + let num_columns = self.schema.fields().len(); + + let batch_indices: Vec<(usize, usize)> = self + .in_mem_batches + .iter() + .enumerate() + .map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| (batch_id, i))) + .flatten() + .collect(); + + // For each column: + // 1. Concatenate all batch arrays for this column (in the same order as assumed by `lexsort_to_indices`) + // 2. Use Arrow's `take` function to reorder the column by sorted indices + let interleave_indices: Vec<(usize, usize)> = indices + .values() + .iter() + .map(|x| batch_indices[*x as usize]) + .collect(); + // Build a RecordBatch from the sorted columns + + let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect(); + + let sorted_batch = + interleave_record_batch(batches.as_ref(), &interleave_indices)?; + // Clear in-memory batches and update reservation + self.in_mem_batches.clear(); + self.reservation + .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?; + let reservation = self.reservation.take(); + + // ===== Phase 4: Construct the resulting stream ===== + let stream = futures::stream::once(async move { + let _timer = metrics.elapsed_compute().timer(); + metrics.record_output(sorted_batch.num_rows()); + drop(reservation); + Ok(sorted_batch) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + stream, + ))) ``` -- This is an automated message from the Apache Git 
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org