alamb commented on code in PR #3530:
URL: https://github.com/apache/arrow-datafusion/pull/3530#discussion_r975174316
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -816,14 +831,14 @@ fn sort_batch(
batch: RecordBatch,
schema: SchemaRef,
expr: &[PhysicalSortExpr],
+ fetch: Option<usize>,
) -> ArrowResult<BatchWithSortArray> {
- // TODO: pushup the limit expression to sort
let sort_columns = expr
.iter()
.map(|e| e.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<SortColumn>>>()?;
- let indices = lexsort_to_indices(&sort_columns, None)?;
+ let indices = lexsort_to_indices(&sort_columns, fetch)?;
Review Comment:
nice
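The change above passes the optional `fetch` limit straight into `lexsort_to_indices`. As a toy illustration of what that buys (a sketch over a single `Vec<i32>` column, not arrow-rs's actual multi-column implementation), the function below computes the indices that would sort the column and keeps only the first `fetch` of them:

```rust
// Rough single-column analogue of `lexsort_to_indices(&sort_columns, fetch)`:
// compute the permutation that sorts `col`, then keep at most `fetch` indices.
fn sort_to_indices(col: &[i32], fetch: Option<usize>) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..col.len()).collect();
    // Sort indices by the value they point at.
    idx.sort_by_key(|&i| col[i]);
    if let Some(n) = fetch {
        // Only the first `fetch` sorted positions are ever needed downstream.
        idx.truncate(n);
    }
    idx
}
```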
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -657,15 +660,18 @@ pub struct SortExec {
metrics_set: CompositeMetricsSet,
/// Preserve partitions of input plan
preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
Review Comment:
I see -- this now has the information plumbed through to the
SortExec to implement "TopK" within the physical operator's implementation. 👍
Very cool
##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -438,6 +461,44 @@ mod test {
Ok(())
}
+ #[test]
+ fn limit_push_down_sort() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .sort(vec![col("a")])?
+ .limit(0, Some(10))?
+ .build()?;
+
+ // Should push down limit to sort
+ let expected = "Limit: skip=0, fetch=10\
+ \n Sort: #test.a, fetch=10\
+ \n TableScan: test";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_push_down_sort_skip() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .sort(vec![col("a")])?
+ .limit(5, Some(10))?
+ .build()?;
+
+ // Should push down limit to sort
+ let expected = "Limit: skip=5, fetch=10\
+ \n Sort: #test.a, fetch=15\
+ \n TableScan: test";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
Review Comment:
Do we have test coverage showing that the limit isn't pushed to the children
of the sort?
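The `fetch=15` in the second test above comes from combining the limit's `skip` and `fetch`: since the limit discards the first `skip` rows, the sort underneath must produce `skip + fetch` rows. A minimal sketch of that arithmetic (hypothetical helper name, not the optimizer's actual code):

```rust
// Hypothetical helper: how many rows must a Sort below
// `Limit: skip=skip, fetch=fetch` produce so the limit can
// still return `fetch` rows after skipping?
fn sort_fetch(skip: usize, fetch: Option<usize>) -> Option<usize> {
    // The skipped rows still have to come out of the sort.
    fetch.map(|f| f + skip)
}
```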
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -816,14 +831,14 @@ fn sort_batch(
batch: RecordBatch,
schema: SchemaRef,
expr: &[PhysicalSortExpr],
+ fetch: Option<usize>,
) -> ArrowResult<BatchWithSortArray> {
- // TODO: pushup the limit expression to sort
let sort_columns = expr
.iter()
.map(|e| e.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<SortColumn>>>()?;
- let indices = lexsort_to_indices(&sort_columns, None)?;
+ let indices = lexsort_to_indices(&sort_columns, fetch)?;
Review Comment:
In fact, I wonder if you could also apply the limit here:
https://github.com/apache/arrow-datafusion/blob/3a9e0d0/datafusion/core/src/physical_plan/sorts/sort.rs#L123-L124
as part of sorting each batch -- rather than keeping the entire input batch,
we only need to keep at most `fetch` rows from each batch.
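A toy sketch of that idea, with a sorted `Vec<i32>` standing in for a `RecordBatch` (illustrative only, not DataFusion's buffering code): any row beyond the first `fetch` rows of a single sorted batch can never appear in the final limited output, so each batch can be truncated before it is buffered.

```rust
// Sort an incoming "batch" and keep at most `fetch` rows before buffering it.
fn truncate_batch(mut batch: Vec<i32>, fetch: Option<usize>) -> Vec<i32> {
    batch.sort();
    if let Some(n) = fetch {
        // Rows past the first `fetch` of this batch cannot make the final cut.
        batch.truncate(n);
    }
    batch
}
```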
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -697,6 +705,11 @@ impl SortExec {
pub fn expr(&self) -> &[PhysicalSortExpr] {
&self.expr
}
+
+ /// Number of items to fetch
Review Comment:
```suggestion
/// If `Some(fetch)`, limits output to only the first "fetch" items
```
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -816,14 +831,14 @@ fn sort_batch(
batch: RecordBatch,
schema: SchemaRef,
expr: &[PhysicalSortExpr],
+ fetch: Option<usize>,
) -> ArrowResult<BatchWithSortArray> {
- // TODO: pushup the limit expression to sort
let sort_columns = expr
.iter()
.map(|e| e.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<SortColumn>>>()?;
- let indices = lexsort_to_indices(&sort_columns, None)?;
+ let indices = lexsort_to_indices(&sort_columns, fetch)?;
Review Comment:
I wonder if this effectively gets us much of the benefit of a special
TopK operator, since we don't have to copy the entire input -- we only copy the
first `fetch` rows, if a limit is specified.
Although I suppose `SortExec` still buffers all of its input, whereas a TopK
operator could buffer only the top `fetch` rows
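As an aside, the memory difference can be sketched in plain std Rust (a toy stand-in over `i32` values, not DataFusion's operator): a TopK keeps only the `k` smallest values seen so far in a max-heap, evicting the current largest when a smaller value arrives, instead of buffering the whole input before sorting.

```rust
use std::collections::BinaryHeap;

// Keep only the k smallest values from a stream, using O(k) memory.
fn top_k_smallest(input: impl IntoIterator<Item = i32>, k: usize) -> Vec<i32> {
    // Max-heap holding the k smallest values seen so far.
    let mut heap: BinaryHeap<i32> = BinaryHeap::new();
    for v in input {
        if heap.len() < k {
            heap.push(v);
        } else if let Some(&top) = heap.peek() {
            if v < top {
                // Evict the largest of the current candidates.
                heap.pop();
                heap.push(v);
            }
        }
    }
    let mut out = heap.into_vec();
    out.sort();
    out
}
```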
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]