Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809508850

   So, to change it in your diff (note: I didn't update the documentation):
   
   I would like to keep the original `StreamingMergeBuilder` case and the
`if self.reservation.size() < self.sort_in_place_threshold_bytes` condition, so
that the only change in place is the "avoid concat for non-sort columns"
optimization, and then see whether this improves all sort queries.
   
   ```diff
   
   -        // If less than sort_in_place_threshold_bytes, concatenate and sort 
in place
   -        if self.reservation.size() < self.sort_in_place_threshold_bytes {
   -            // Concatenate memory batches together and sort
   -            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
   -            self.in_mem_batches.clear();
   -            self.reservation
   -                .try_resize(get_reserved_byte_for_record_batch(&batch))
   -                .map_err(Self::err_with_oom_context)?;
   -            let reservation = self.reservation.take();
   -            return self.sort_batch_stream(batch, metrics, reservation);
   +        let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; 
self.expr.len()];
   +        for batch in &self.in_mem_batches {
   +            for (i, expr) in self.expr.iter().enumerate() {
   +                let col = expr.evaluate_to_sort_column(batch)?.values;
   +                columns_by_expr[i].push(col);
   +            }
            }
    
   -        let streams = std::mem::take(&mut self.in_mem_batches)
   -            .into_iter()
   -            .map(|batch| {
   -                let metrics = self.metrics.baseline.intermediate();
   -                let reservation = self
   -                    .reservation
   -                    .split(get_reserved_byte_for_record_batch(&batch));
   -                let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
   -                Ok(spawn_buffered(input, 1))
   -            })
   -            .collect::<Result<_>>()?;
   +        // For each sort expression, concatenate arrays from all batches 
into one global array
   +        let mut sort_columns = Vec::with_capacity(self.expr.len());
   +        for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
   +            let array = concat(
   +                &arrays
   +                    .iter()
   +                    .map(|a| a.as_ref())
   +                    .collect::<Vec<&dyn Array>>(),
   +            )?;
   +            sort_columns.push(SortColumn {
   +                values: array,
   +                options: expr.options.into(),
   +            });
   +        }
    
   -        let expressions: LexOrdering = self.expr.iter().cloned().collect();
   +        // ===== Phase 2: Compute global sorted indices =====
   +        // Use `lexsort_to_indices` to get global row indices in sorted 
order (as if all batches were concatenated)
   +        let indices = lexsort_to_indices(&sort_columns, None)?;
    
   -        StreamingMergeBuilder::new()
   -            .with_streams(streams)
   -            .with_schema(Arc::clone(&self.schema))
   -            .with_expressions(expressions.as_ref())
   -            .with_metrics(metrics)
   -            .with_batch_size(self.batch_size)
   -            .with_fetch(None)
   -            .with_reservation(self.merge_reservation.new_empty())
   -            .build()
   +        // ===== Phase 3: Reorder each column using the global sorted 
indices =====
   +        let num_columns = self.schema.fields().len();
   +
   +        let batch_indices: Vec<(usize, usize)> = self
   +            .in_mem_batches
   +            .iter()
   +            .enumerate()
   +            .map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| 
(batch_id, i)))
   +            .flatten()
   +            .collect();
   +
   +        // For each column:
   +        // 1. Concatenate all batch arrays for this column (in the same 
order as assumed by `lexsort_to_indices`)
   +        // 2. Use Arrow's `take` function to reorder the column by sorted 
indices
   +        let interleave_indices: Vec<(usize, usize)> = indices
   +            .values()
   +            .iter()
   +            .map(|x| batch_indices[*x as usize])
   +            .collect();
   +        // Build a RecordBatch from the sorted columns
   +
   +        let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
   +
   +        let sorted_batch =
   +            interleave_record_batch(batches.as_ref(), &interleave_indices)?;
   +        // Clear in-memory batches and update reservation
   +        self.in_mem_batches.clear();
   +        self.reservation
   +            .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
   +        let reservation = self.reservation.take();
   +
   +        // ===== Phase 4: Construct the resulting stream =====
   +        let stream = futures::stream::once(async move {
   +            let _timer = metrics.elapsed_compute().timer();
   +            metrics.record_output(sorted_batch.num_rows());
   +            drop(reservation);
   +            Ok(sorted_batch)
   +        });
   +
   +        Ok(Box::pin(RecordBatchStreamAdapter::new(
   +            self.schema.clone(),
   +            stream,
   +        )))
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to