zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804216535

   Very interesting, firstly i now try merge all memory batch, and single sort, 
some query become crazy fast and some crazy slow, i think because:
   
   1. We sort in memory without merge, it's similar to sort single partition 
without partition parallel ?
   2. Previous some merge will have partition parallel?
   
   So next step, we can try to make the in memory sort with parallel?
   
   ```rust
   --------------------
   Benchmark sort_tpch10.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃       main ┃ concat_batches_for_sort ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │  2243.52ms │               1416.52ms │ +1.58x faster │
   │ Q2           │  1842.11ms │               1096.12ms │ +1.68x faster │
   │ Q3           │ 12446.31ms │              12535.45ms │     no change │
   │ Q4           │  4047.55ms │               1964.73ms │ +2.06x faster │
   │ Q5           │  4364.46ms │               5955.70ms │  1.36x slower │
   │ Q6           │  4561.01ms │               6275.39ms │  1.38x slower │
   │ Q7           │  8158.01ms │              19145.68ms │  2.35x slower │
   │ Q8           │  6077.40ms │               5146.80ms │ +1.18x faster │
   │ Q9           │  6347.21ms │               5544.48ms │ +1.14x faster │
   │ Q10          │ 11561.03ms │              23572.68ms │  2.04x slower │
   │ Q11          │  6069.42ms │               4810.88ms │ +1.26x faster │
   └──────────────┴────────────┴─────────────────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary                      ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (main)                      │ 67718.04ms │
   │ Total Time (concat_batches_for_sort)   │ 87464.44ms │
   │ Average Time (main)                    │  6156.19ms │
   │ Average Time (concat_batches_for_sort) │  7951.31ms │
   │ Queries Faster                         │          6 │
   │ Queries Slower                         │          4 │
   │ Queries with No Change                 │          1 │
   └────────────────────────────────────────┴────────────┘
   ```
   
   
   Patch tried:
   ```rust
   diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
   index 7fd1c2b16..ec3cd89f3 100644
   --- a/datafusion/physical-plan/src/sorts/sort.rs
   +++ b/datafusion/physical-plan/src/sorts/sort.rs
   @@ -671,85 +671,14 @@ impl ExternalSorter {
                return self.sort_batch_stream(batch, metrics, reservation);
            }
   
   -        // If less than sort_in_place_threshold_bytes, concatenate and sort 
in place
   -        if self.reservation.size() < self.sort_in_place_threshold_bytes {
   -            // Concatenate memory batches together and sort
   -            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
   -            self.in_mem_batches.clear();
   -            self.reservation
   -                .try_resize(get_reserved_byte_for_record_batch(&batch))?;
   -            let reservation = self.reservation.take();
   -            return self.sort_batch_stream(batch, metrics, reservation);
   -        }
   -
   -        let mut merged_batches = Vec::new();
   -        let mut current_batches = Vec::new();
   -        let mut current_size = 0;
   -
   -        // Drain in_mem_batches using pop() to release memory earlier.
   -        // This avoids holding onto the entire vector during iteration.
   -        // Note:
   -        // Now we use `sort_in_place_threshold_bytes` to determine, in 
future we can make it more dynamic.
   -        while let Some(batch) = self.in_mem_batches.pop() {
   -            let batch_size = get_reserved_byte_for_record_batch(&batch);
   -
   -            // If adding this batch would exceed the memory threshold, 
merge current_batches.
   -            if current_size + batch_size > 
self.sort_in_place_threshold_bytes
   -                && !current_batches.is_empty()
   -            {
   -                // Merge accumulated batches into one.
   -                let merged = concat_batches(&self.schema, 
&current_batches)?;
   -                current_batches.clear();
   -
   -                // Update memory reservation.
   -                self.reservation.try_shrink(current_size)?;
   -                let merged_size = 
get_reserved_byte_for_record_batch(&merged);
   -                self.reservation.try_grow(merged_size)?;
   -
   -                merged_batches.push(merged);
   -                current_size = 0;
   -            }
   -
   -            current_batches.push(batch);
   -            current_size += batch_size;
   -        }
   -
   -        // Merge any remaining batches after the loop.
   -        if !current_batches.is_empty() {
   -            let merged = concat_batches(&self.schema, &current_batches)?;
   -            self.reservation.try_shrink(current_size)?;
   -            let merged_size = get_reserved_byte_for_record_batch(&merged);
   -            self.reservation.try_grow(merged_size)?;
   -            merged_batches.push(merged);
   -        }
   -
   -        // Create sorted streams directly without using spawn_buffered.
   -        // This allows for sorting to happen inline and enables earlier 
batch drop.
   -        let streams = merged_batches
   -            .into_iter()
   -            .map(|batch| {
   -                let metrics = self.metrics.baseline.intermediate();
   -                let reservation = self
   -                    .reservation
   -                    .split(get_reserved_byte_for_record_batch(&batch));
   -
   -                // Sort the batch inline.
   -                let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
   -                Ok(input)
   -            })
   -            .collect::<Result<_>>()?;
   -
   -        let expressions: LexOrdering = self.expr.iter().cloned().collect();
   -
   -        StreamingMergeBuilder::new()
   -            .with_streams(streams)
   -            .with_schema(Arc::clone(&self.schema))
   -            .with_expressions(expressions.as_ref())
   -            .with_metrics(metrics)
   -            .with_batch_size(self.batch_size)
   -            .with_fetch(None)
   -            .with_reservation(self.merge_reservation.new_empty())
   -            .build()
   +        // Because batches are all in memory, we can sort them in place
   +        // Concatenate memory batches together and sort
   +        let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
   +        self.in_mem_batches.clear();
   +        self.reservation
   +            .try_resize(get_reserved_byte_for_record_batch(&batch))?;
   +        let reservation = self.reservation.take();
   +        self.sort_batch_stream(batch, metrics, reservation)
        }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to