alamb commented on code in PR #3530:
URL: https://github.com/apache/arrow-datafusion/pull/3530#discussion_r975174316


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -816,14 +831,14 @@ fn sort_batch(
     batch: RecordBatch,
     schema: SchemaRef,
     expr: &[PhysicalSortExpr],
+    fetch: Option<usize>,
 ) -> ArrowResult<BatchWithSortArray> {
-    // TODO: pushup the limit expression to sort
     let sort_columns = expr
         .iter()
         .map(|e| e.evaluate_to_sort_column(&batch))
         .collect::<Result<Vec<SortColumn>>>()?;
 
-    let indices = lexsort_to_indices(&sort_columns, None)?;
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;

Review Comment:
   nice



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -657,15 +660,18 @@ pub struct SortExec {
     metrics_set: CompositeMetricsSet,
     /// Preserve partitions of input plan
     preserve_partitioning: bool,
+    /// Fetch highest/lowest n results

Review Comment:
   I see -- it looks like the information is now plumbed through to 
`SortExec`, so "TopK" can be implemented within the physical operator itself. 👍 
   
   Very cool



##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -438,6 +461,44 @@ mod test {
         Ok(())
     }
 
+    #[test]
+    fn limit_push_down_sort() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a")])?
+            .limit(0, Some(10))?
+            .build()?;
+
+        // Should push down limit to sort
+        let expected = "Limit: skip=0, fetch=10\
+        \n  Sort: #test.a, fetch=10\
+        \n    TableScan: test";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_push_down_sort_skip() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a")])?
+            .limit(5, Some(10))?
+            .build()?;
+
+        // Should push down limit to sort
+        let expected = "Limit: skip=5, fetch=10\
+        \n  Sort: #test.a, fetch=15\
+        \n    TableScan: test";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+

Review Comment:
   Do we have test coverage showing that the limit isn't pushed to the children 
of the sort?
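   
   To illustrate why that matters, here is a minimal sketch using plain 
`Vec<i64>` in place of record batches. All function names are hypothetical and 
are not DataFusion APIs. It assumes the intended semantics are "sort 
everything, then apply skip/fetch", and shows that a sort with 
`fetch = skip + fetch` preserves them, while limiting the sort's input does not:

```rust
/// Reference semantics: sort the whole input, then apply skip/fetch.
fn sort_then_limit(mut rows: Vec<i64>, skip: usize, fetch: usize) -> Vec<i64> {
    rows.sort();
    rows.into_iter().skip(skip).take(fetch).collect()
}

/// Limit pushed into the sort: the sort keeps only the first `skip + fetch`
/// rows (e.g. `fetch=15` for `skip=5, fetch=10`), and the limit above it
/// still applies the skip.
fn sort_with_fetch(mut rows: Vec<i64>, skip: usize, fetch: usize) -> Vec<i64> {
    rows.sort();
    rows.truncate(skip + fetch);
    rows.into_iter().skip(skip).take(fetch).collect()
}

/// Incorrect rewrite: limit pushed below the sort, onto its input.
fn limit_then_sort(rows: Vec<i64>, skip: usize, fetch: usize) -> Vec<i64> {
    let mut limited: Vec<i64> = rows.into_iter().take(skip + fetch).collect();
    limited.sort();
    limited.into_iter().skip(skip).take(fetch).collect()
}

fn main() {
    // Input arrives in descending order: 99, 98, ..., 0.
    let rows: Vec<i64> = (0..100).rev().collect();
    let expected = sort_then_limit(rows.clone(), 5, 10);

    // Pushing the limit into the sort keeps the result unchanged.
    assert_eq!(sort_with_fetch(rows.clone(), 5, 10), expected);
    // Pushing it below the sort changes the result.
    assert_ne!(limit_then_sort(rows, 5, 10), expected);
}
```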



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -816,14 +831,14 @@ fn sort_batch(
     batch: RecordBatch,
     schema: SchemaRef,
     expr: &[PhysicalSortExpr],
+    fetch: Option<usize>,
 ) -> ArrowResult<BatchWithSortArray> {
-    // TODO: pushup the limit expression to sort
     let sort_columns = expr
         .iter()
         .map(|e| e.evaluate_to_sort_column(&batch))
         .collect::<Result<Vec<SortColumn>>>()?;
 
-    let indices = lexsort_to_indices(&sort_columns, None)?;
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;

Review Comment:
   In fact, I wonder if you could also apply the limit here:
   
   
https://github.com/apache/arrow-datafusion/blob/3a9e0d0/datafusion/core/src/physical_plan/sorts/sort.rs#L123-L124
   
   as part of sorting each batch -- rather than keeping the entire input batch, 
we only need to keep at most `fetch` rows from each batch. 
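   
   A rough sketch of that idea, again with `Vec<i64>` standing in for 
`RecordBatch` and with hypothetical function names (this is not the actual 
`SortExec` code). It relies on the observation that any row in the global 
first `fetch` rows must also be among the first `fetch` rows of its own batch:

```rust
/// Sort one batch and keep at most `fetch` rows from it.
fn sort_batch_with_fetch(mut batch: Vec<i64>, fetch: Option<usize>) -> Vec<i64> {
    batch.sort();
    if let Some(n) = fetch {
        batch.truncate(n);
    }
    batch
}

/// Combine the truncated per-batch results and apply the limit once more.
fn sort_all(batches: Vec<Vec<i64>>, fetch: Option<usize>) -> Vec<i64> {
    let mut merged: Vec<i64> = batches
        .into_iter()
        .flat_map(|b| sort_batch_with_fetch(b, fetch))
        .collect();
    merged.sort();
    if let Some(n) = fetch {
        merged.truncate(n);
    }
    merged
}

fn main() {
    let batches = vec![vec![9, 1, 7, 3], vec![8, 2, 6], vec![5, 4, 0]];
    // With a limit, each batch contributes at most 3 rows to the merge.
    assert_eq!(sort_all(batches.clone(), Some(3)), vec![0, 1, 2]);
    // Without a limit, this is a plain full sort.
    assert_eq!(sort_all(batches, None), (0..10).collect::<Vec<i64>>());
}
```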



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -697,6 +705,11 @@ impl SortExec {
     pub fn expr(&self) -> &[PhysicalSortExpr] {
         &self.expr
     }
+
+    /// Number of items to fetch

Review Comment:
   ```suggestion
       /// If `Some(fetch)`, limits output to only the first "fetch" items
   ```



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -816,14 +831,14 @@ fn sort_batch(
     batch: RecordBatch,
     schema: SchemaRef,
     expr: &[PhysicalSortExpr],
+    fetch: Option<usize>,
 ) -> ArrowResult<BatchWithSortArray> {
-    // TODO: pushup the limit expression to sort
     let sort_columns = expr
         .iter()
         .map(|e| e.evaluate_to_sort_column(&batch))
         .collect::<Result<Vec<SortColumn>>>()?;
 
-    let indices = lexsort_to_indices(&sort_columns, None)?;
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;

Review Comment:
   I wonder if this will effectively get us much of the benefit of a special 
TopK operator, as we don't have to copy the entire input -- we only copy up to 
`fetch` rows, if a limit is specified
   
   Although I suppose `SortExec` still buffers all of its input, whereas a TopK 
could buffer only the top `fetch` rows
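   
   For comparison, a dedicated TopK could look roughly like the sketch below. 
It uses a bounded max-heap over plain `i64` values. The function name is 
hypothetical, and this illustrates the general technique rather than 
anything that exists in DataFusion:

```rust
use std::collections::BinaryHeap;

/// Return the `k` smallest values in ascending order while buffering
/// at most `k` values at any time.
fn top_k_smallest(input: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    // Max-heap: the largest of the retained values sits at the top.
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k);
    for value in input {
        if heap.len() < k {
            heap.push(value);
        } else if let Some(mut largest) = heap.peek_mut() {
            // Replace the current largest retained value if the new one is
            // smaller; the heap restores its order when the guard is dropped.
            if value < *largest {
                *largest = value;
            }
        }
    }
    heap.into_sorted_vec()
}

fn main() {
    let input = vec![9, 1, 7, 3, 8, 2, 6, 5, 4, 0];
    assert_eq!(top_k_smallest(input, 3), vec![0, 1, 2]);
}
```

   Memory use here is bounded by `k` rather than by the size of the input, 
which is the difference from buffering every batch before sorting.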


