alamb commented on code in PR #2694:
URL: https://github.com/apache/arrow-datafusion/pull/2694#discussion_r892781039
##########
datafusion/core/src/physical_plan/empty.rs:
##########
@@ -226,4 +235,23 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn produce_one_row_multiple_partition() -> Result<()> {
Review Comment:
👍
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -408,6 +416,23 @@ mod tests {
Ok(())
}
+ #[test]
+ fn repartition_unsorted_limit_with_skip() -> Result<()> {
+ let plan = limit_exec_with_skip(filter_exec(parquet_exec()));
+
+ let expected = &[
+ "GlobalLimitExec: skip=5, fetch=100",
+ "LocalLimitExec: fetch=100",
Review Comment:
Don't we have to fetch at least `105` rows (`skip` + `fetch`) from each local
partition to be sure that enough rows reach the global limit?
Thus I would expect this to be
`LocalLimitExec: fetch=105`
However, when I tried an actual query, it seems to do the right thing, so 👍
```
❯ explain select * from test limit 1 offset 5;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                        |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=5, fetch=1                                                                                                                      |
|               |   Projection: #test.column_1, #test.column_2                                                                                                |
|               |     TableScan: test projection=Some([column_1, column_2]), fetch=6                                                                          |
| physical_plan | GlobalLimitExec: skip=5, fetch=1                                                                                                            |
|               |   CoalescePartitionsExec                                                                                                                    |
|               |     LocalLimitExec: fetch=6                                                                                                                 |
|               |       ProjectionExec: expr=[column_1@0 as column_1, column_2@1 as column_2]                                                                 |
|               |         RepartitionExec: partitioning=RoundRobinBatch(16)                                                                                   |
|               |           CsvExec: files=[/private/tmp/csv/1.csv, /private/tmp/csv/2.csv], has_header=false, limit=Some(6), projection=[column_1, column_2] |
|               |                                                                                                                                             |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
```
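To illustrate the `skip` + `fetch` reasoning, here is a minimal, self-contained sketch. It is not DataFusion code: `local_limit`, `global_limit`, and `run` are hypothetical helpers that model partitions as plain vectors, and the skewed input (all rows in one partition) is an assumed worst case.

```rust
/// Keep at most `fetch` rows of one partition (models a per-partition limit).
fn local_limit(partition: &[u32], fetch: usize) -> Vec<u32> {
    partition.iter().copied().take(fetch).collect()
}

/// Concatenate all partitions, drop the first `skip` rows, keep at most `fetch`.
fn global_limit(partitions: &[Vec<u32>], skip: usize, fetch: usize) -> Vec<u32> {
    partitions
        .iter()
        .flatten()
        .copied()
        .skip(skip)
        .take(fetch)
        .collect()
}

/// Apply the local limit to every partition, then the global limit on top.
fn run(partitions: &[Vec<u32>], local_fetch: usize, skip: usize, fetch: usize) -> Vec<u32> {
    let limited: Vec<Vec<u32>> = partitions
        .iter()
        .map(|p| local_limit(p, local_fetch))
        .collect();
    global_limit(&limited, skip, fetch)
}

fn main() {
    // Assumed worst case: one partition holds every row, the other is empty.
    let partitions = vec![(0..10).collect::<Vec<u32>>(), vec![]];
    let (skip, fetch) = (5, 1);

    // Local fetch = `fetch` only: the skipped rows use up everything that was kept.
    let too_small = run(&partitions, fetch, skip, fetch);
    // Local fetch = `skip + fetch`: enough rows survive to satisfy the global limit.
    let enough = run(&partitions, skip + fetch, skip, fetch);

    println!("local fetch={}: {:?}", fetch, too_small);
    println!("local fetch={}: {:?}", skip + fetch, enough);
}
```

With `skip=5, fetch=1`, the second call uses a local fetch of 6, which matches the `LocalLimitExec: fetch=6` shown in the plan above.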
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -186,20 +181,31 @@ impl ExecutionPlan for GlobalLimitExec {
let input_stats = self.input.statistics();
let skip = self.skip.unwrap_or(0);
match input_stats {
- // if the input does not reach the limit globally, return input stats
- Statistics {
- num_rows: Some(nr), ..
- } if nr - skip <= self.fetch.unwrap_or(usize::MAX) => input_stats,
- // if the input is greater than the limit, the num_row will be the limit
- // but we won't be able to predict the other statistics
Statistics {
num_rows: Some(nr), ..
- } if nr - skip > self.fetch.unwrap_or(usize::MAX) => Statistics {
- num_rows: self.fetch,
- is_exact: input_stats.is_exact,
- ..Default::default()
- },
- // if we don't know the input size, we can't predict the limit's behaviour
+ } => {
Review Comment:
Thank you, I think this is easier to read now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]