zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2050395194


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -662,53 +665,152 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        // Please pay attention that any operation inside of 
`in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need 
of handling
-        // reservation failure and spilling in the middle of the sort/merge. 
The memory
-        // space for batches produced by the resulting stream will be reserved 
by the
-        // consumer of the stream.
+        // Note: in theory the in-memory batches should be of limited size,
+        // but some test cases exercising the memory limit rely on
+        // `sort_in_place_threshold_bytes`, so we set a larger limit here to
+        // avoid test failures.
+        if self.expr.len() <= 2
+            && self.reservation.size() < 1000 * 
self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+            let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, 
&interleave_indices)?;
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream)
+        } else {
+            // Please pay attention that any operation inside of 
`in_mem_sort_stream` will
+            // not perform any memory reservation. This is for avoiding the 
need of handling
+            // reservation failure and spilling in the middle of the 
sort/merge. The memory
+            // space for batches produced by the resulting stream will be 
reserved by the
+            // consumer of the stream.
+
+            if self.in_mem_batches.len() == 1 {
+                let batch = self.in_mem_batches.swap_remove(0);
+                let reservation = self.reservation.take();
+                return self.sort_batch_stream(batch, metrics, reservation);
+            }
+
+            // If less than sort_in_place_threshold_bytes, concatenate and 
sort in place
+            if self.reservation.size() < self.sort_in_place_threshold_bytes {
+                // Concatenate memory batches together and sort
+                let batch = concat_batches(&self.schema, 
&self.in_mem_batches)?;

Review Comment:
   Attached the latest PR result:
   
    ```text
   --------------------
   Benchmark sort_tpch1.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃     main ┃ concat_batches_for_sort ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │ 153.49ms │                141.31ms │ +1.09x faster │
   │ Q2           │ 131.29ms │                113.30ms │ +1.16x faster │
   │ Q3           │ 980.57ms │                986.14ms │     no change │
   │ Q4           │ 252.25ms │                215.03ms │ +1.17x faster │
   │ Q5           │ 464.81ms │                454.00ms │     no change │
   │ Q6           │ 481.44ms │                467.38ms │     no change │
   │ Q7           │ 810.73ms │                695.84ms │ +1.17x faster │
   │ Q8           │ 498.10ms │                502.47ms │     no change │
   │ Q9           │ 503.80ms │                506.87ms │     no change │
   │ Q10          │ 789.02ms │                709.34ms │ +1.11x faster │
   │ Q11          │ 417.39ms │                411.24ms │     no change │
   └──────────────┴──────────┴─────────────────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
   ┃ Benchmark Summary                      ┃           ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
   │ Total Time (main)                      │ 5482.89ms │
   │ Total Time (concat_batches_for_sort)   │ 5202.89ms │
   │ Average Time (main)                    │  498.44ms │
   │ Average Time (concat_batches_for_sort) │  472.99ms │
   │ Queries Faster                         │         5 │
   │ Queries Slower                         │         0 │
   │ Queries with No Change                 │         6 │
   └────────────────────────────────────────┴───────────┘
   --------------------
   Benchmark sort_tpch10.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃       main ┃ concat_batches_for_sort ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │  2243.52ms │               1602.46ms │ +1.40x faster │
   │ Q2           │  1842.11ms │               1185.03ms │ +1.55x faster │
   │ Q3           │ 12446.31ms │              12203.05ms │     no change │
   │ Q4           │  4047.55ms │               2096.56ms │ +1.93x faster │
   │ Q5           │  4364.46ms │               4446.55ms │     no change │
   │ Q6           │  4561.01ms │               4592.95ms │     no change │
   │ Q7           │  8158.01ms │               7827.94ms │     no change │
   │ Q8           │  6077.40ms │               6379.98ms │     no change │
   │ Q9           │  6347.21ms │               6225.14ms │     no change │
   │ Q10          │ 11561.03ms │               9491.51ms │ +1.22x faster │
   │ Q11          │  6069.42ms │               5676.88ms │ +1.07x faster │
   └──────────────┴────────────┴─────────────────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary                      ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (main)                      │ 67718.04ms │
   │ Total Time (concat_batches_for_sort)   │ 61728.05ms │
   │ Average Time (main)                    │  6156.19ms │
   │ Average Time (concat_batches_for_sort) │  5611.64ms │
   │ Queries Faster                         │          5 │
   │ Queries Slower                         │          0 │
   │ Queries with No Change                 │          6 │
   └────────────────────────────────────────┴────────────┘
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to