alamb commented on code in PR #3545:
URL: https://github.com/apache/arrow-datafusion/pull/3545#discussion_r976740799


##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -841,8 +842,21 @@ impl DefaultPhysicalPlanner {
                             )),
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, 
*fetch)?))
-                }
+                    Ok(if fetch.is_some() && 
session_state.config.target_partitions > 1 {

Review Comment:
   ```suggestion
                       // If we have a `LIMIT`, we can run sort/limits in parallel (similar to TopK)
                       Ok(if fetch.is_some() && 
session_state.config.target_partitions > 1 {
   ```



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -374,44 +379,38 @@ fn get_sorted_iter(
             })
         })
         .collect::<Result<Vec<_>>>()?;
-    let indices = lexsort_to_indices(&sort_columns, None)?;
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    Ok(SortedIterator::new(indices, row_indices, batch_size))
+    // Calculate composite index based on sorted indices
+    let row_indices = indices
+        .values()
+        .iter()
+        .map(|i| row_indices[*i as usize])
+        .collect();
+
+    Ok(SortedIterator::new(row_indices, batch_size))
 }
 
 struct SortedIterator {
     /// Current logical position in the iterator
     pos: usize,
-    /// Indexes into the input representing the correctly sorted total output
-    indices: UInt32Array,
-    /// Map each each logical input index to where it can be found in the 
sorted input batches
+    /// Sorted composite index of where to find the rows in buffered batches

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -374,44 +379,38 @@ fn get_sorted_iter(
             })
         })
         .collect::<Result<Vec<_>>>()?;
-    let indices = lexsort_to_indices(&sort_columns, None)?;
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;

Review Comment:
   ✨ 👍 



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -273,6 +275,7 @@ impl MemoryConsumer for ExternalSorter {
             &self.expr,
             self.session_config.batch_size(),
             tracking_metrics,
+            self.fetch,

Review Comment:
   To be honest, though, if there is a LIMIT on the query, I would hope we could avoid spilling entirely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to