alamb commented on code in PR #2694:
URL: https://github.com/apache/arrow-datafusion/pull/2694#discussion_r890475619
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -906,15 +905,14 @@ impl DefaultPhysicalPlanner {
} else {
// Apply a LocalLimitExec to each partition. The
optimizer will also insert
// a CoalescePartitionsExec between the
GlobalLimitExec and LocalLimitExec
- Arc::new(LocalLimitExec::new(input, limit))
+ if let Some(fetch) = fetch {
+ Arc::new(LocalLimitExec::new(input, *fetch))
Review Comment:
Shouldn't this be fetching `*fetch + *skip` rows so that there are at least
`fetch` rows after skipping `skip`?
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -449,13 +450,13 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize
parallelism
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
"ParquetExec: limit=None, partitions=[x], projection=[c1]",
];
Review Comment:
I recommend adding a test to this file with an offset other than `None`,
especially given the issue I note below with multi-partition plans.
##########
datafusion/core/tests/sql/limit.rs:
##########
@@ -184,3 +184,84 @@ async fn limit_multi_partitions() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
Review Comment:
👍 thank you for this coverage
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -346,47 +356,89 @@ pub fn truncate_batch(batch: &RecordBatch, n: usize) ->
RecordBatch {
/// A Limit stream limits the stream to up to `limit` rows.
struct LimitStream {
+ /// The number of rows to skip
+ skip: usize,
/// The maximum number of rows to produce
- limit: usize,
+ fetch: usize,
/// The input to read from. This is set to None once the limit is
/// reached to enable early termination
input: Option<SendableRecordBatchStream>,
/// Copy of the input schema
schema: SchemaRef,
+ /// Number of rows have already skipped
+ current_skipped: usize,
// the current number of rows which have been produced
- current_len: usize,
+ current_fetched: usize,
/// Execution time metrics
baseline_metrics: BaselineMetrics,
}
impl LimitStream {
fn new(
input: SendableRecordBatchStream,
- limit: usize,
+ skip: Option<usize>,
+ fetch: Option<usize>,
baseline_metrics: BaselineMetrics,
) -> Self {
let schema = input.schema();
Self {
- limit,
+ skip: skip.unwrap_or(0),
+ fetch: fetch.unwrap_or(usize::MAX),
input: Some(input),
schema,
- current_len: 0,
+ current_skipped: 0,
+ current_fetched: 0,
baseline_metrics,
}
}
+ fn poll_and_skip(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+ let input = self.input.as_mut().unwrap();
+ loop {
+ let poll = input.poll_next_unpin(cx);
+ let poll = poll.map_ok(|batch| {
+ if batch.num_rows() + self.current_skipped <= self.skip {
Review Comment:
👍
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -292,17 +292,10 @@ impl LogicalPlanBuilder {
}
/// Apply a limit
Review Comment:
```suggestion
/// Limit the number of rows returned
///
/// `skip` - Number of rows to skip before fetching any rows. `None` means
/// do not skip any rows.
///
/// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
/// if specified. `None` means no limit.
```
##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -43,61 +41,73 @@ impl LimitPushDown {
/// when traversing down related to "limit push down".
enum Ancestor {
/// Limit
- FromLimit,
- /// Offset
- FromOffset,
+ FromLimit {
+ skip: Option<usize>,
+ fetch: Option<usize>,
+ },
/// Other nodes that don't affect the adjustment of "Limit"
NotRelevant,
}
///
-/// When doing limit push down with "offset" and "limit" during traversal,
-/// the "limit" should be adjusted.
-/// limit_push_down is a recursive function that tracks three important
information
-/// to make the adjustment.
+/// When doing limit push down with "skip" and "fetch" during traversal,
+/// the "fetch" should be adjusted.
+/// "Ancestor" is pushed down the plan tree, so that the current node
+/// can adjust it's own "fetch".
///
-/// 1. ancestor: the kind of Ancestor.
-/// 2. ancestor_offset: ancestor's offset value
-/// 3. ancestor_limit: ancestor's limit value
+/// Ancestor's real "fetch" is extended with ancestor's "skip".
+/// If the current node is a Limit, then the adjusted ancestor "fetch" will
+/// replace the its own "fetch", and push down this "fetch" down the tree.
+/// ancestor's "skip" is always replaced by the current "skip" when the current
+/// node is a Limit.
///
-/// (ancestor_offset, ancestor_limit) is updated in the following cases
-/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
-/// When the ancestor is a "Limit" and the current node is a "Limit",
-/// it is updated to (None, min(n1, n2))).
-/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
-/// it is updated to (m1, n1 + m1).
-/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
-/// it is updated to (m2, None).
-/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
-/// it is updated to (None, n1). Note that this won't happen when we
-/// generate the plan from SQL, it can happen when we build the plan
-/// using LogicalPlanBuilder.
fn limit_push_down(
_optimizer: &LimitPushDown,
ancestor: Ancestor,
- ancestor_offset: Option<usize>,
- ancestor_limit: Option<usize>,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- match (plan, ancestor_limit) {
- (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
- let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
- Ancestor::FromLimit | Ancestor::FromOffset => (
- None,
- Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
- ),
- Ancestor::NotRelevant => (None, Some(*n)),
+ match (plan, ancestor) {
+ (
+ LogicalPlan::Limit(Limit {
+ skip: current_skip,
+ fetch: current_fetch,
+ input,
+ }),
+ ancestor,
+ ) => {
+ let new_current_fetch = match ancestor {
+ Ancestor::FromLimit {
+ skip: ancestor_skip,
+ fetch: ancestor_fetch,
+ } => {
+ if let Some(fetch) = current_fetch {
+ // extend ancestor's fetch
+ let ancestor_fetch =
+ ancestor_fetch.map(|f| f +
ancestor_skip.unwrap_or(0));
+
+ let new_current_fetch =
+ ancestor_fetch.map_or(*fetch, |x| std::cmp::min(x,
*fetch));
+
+ Some(new_current_fetch)
+ } else {
+ // we dont have a "fetch", and we can push down our
parent's "fetch"
+ // extend ancestor's fetch
+ ancestor_fetch.map(|f| f + ancestor_skip.unwrap_or(0))
+ }
+ }
+ _ => *current_fetch,
};
Ok(LogicalPlan::Limit(Limit {
- n: new_ancestor_limit.unwrap_or(*n),
- // push down limit to plan (minimum of upper limit and current
limit)
+ skip: *current_skip,
+ fetch: new_current_fetch,
input: Arc::new(limit_push_down(
_optimizer,
- Ancestor::FromLimit,
- new_ancestor_offset,
- new_ancestor_limit,
+ Ancestor::FromLimit {
+ skip: *current_skip,
Review Comment:
Should `current_skip` be updated to the minimum of the ancestor's skip and this
node's skip?
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -176,17 +184,18 @@ impl ExecutionPlan for GlobalLimitExec {
fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
+ let skip = self.skip.unwrap_or(0);
match input_stats {
// if the input does not reach the limit globally, return input
stats
Statistics {
num_rows: Some(nr), ..
- } if nr <= self.limit => input_stats,
+ } if nr - skip <= self.fetch.unwrap_or(usize::MAX) => input_stats,
Review Comment:
Also, is there any guarantee that `nr >= skip`? If not, `nr - skip` would
underflow (panicking in debug builds and wrapping in release builds).
Perhaps we should use something like `saturating_sub`:
```rust
if nr.saturating_sub(skip) <= self.fetch.unwrap_or(usize::MAX)
```
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -176,17 +184,18 @@ impl ExecutionPlan for GlobalLimitExec {
fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
+ let skip = self.skip.unwrap_or(0);
match input_stats {
// if the input does not reach the limit globally, return input
stats
Statistics {
num_rows: Some(nr), ..
- } if nr <= self.limit => input_stats,
+ } if nr - skip <= self.fetch.unwrap_or(usize::MAX) => input_stats,
Review Comment:
This also applies to the similar code below.
##########
datafusion/core/src/dataframe.rs:
##########
@@ -190,20 +190,27 @@ impl DataFrame {
/// Limit the number of rows returned from this DataFrame.
///
+ /// `skip` - Number of rows to skip before fetch any row, `None` means do
not skip any rows
+ ///
+ /// `fetch` - Maximum number of rows to fetch, `None` means no limit
Review Comment:
```suggestion
/// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
/// if specified. `None` means no limit.
```
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -346,47 +356,89 @@ pub fn truncate_batch(batch: &RecordBatch, n: usize) ->
RecordBatch {
/// A Limit stream limits the stream to up to `limit` rows.
struct LimitStream {
+ /// The number of rows to skip
+ skip: usize,
/// The maximum number of rows to produce
Review Comment:
```suggestion
/// The maximum number of rows to produce, after `skip` are skipped
```
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -224,7 +233,7 @@ impl LocalLimitExec {
/// Maximum number of rows to return
pub fn limit(&self) -> usize {
Review Comment:
Should we also rename this to `pub fn fetch` for consistency?
##########
datafusion/sql/src/planner.rs:
##########
@@ -1212,57 +1206,59 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
/// Wrap a plan in a limit
- fn limit(&self, input: LogicalPlan, limit: Option<SQLExpr>) ->
Result<LogicalPlan> {
- match limit {
- Some(limit_expr) => {
- let n = match self.sql_to_rex(
- limit_expr,
- input.schema(),
- &mut HashMap::new(),
- )? {
- Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as
usize),
- _ => Err(DataFusionError::Plan(
- "Unexpected expression for LIMIT clause".to_string(),
- )),
- }?;
-
- LogicalPlanBuilder::from(input).limit(n)?.build()
- }
- _ => Ok(input),
- }
- }
-
- /// Wrap a plan in a offset
- fn offset(
+ fn limit(
&self,
input: LogicalPlan,
- offset: Option<SQLOffset>,
+ skip: Option<SQLOffset>,
+ fetch: Option<SQLExpr>,
) -> Result<LogicalPlan> {
- match offset {
- Some(offset_expr) => {
- let offset = match self.sql_to_rex(
- offset_expr.value,
+ if skip.is_none() && fetch.is_none() {
+ return Ok(input);
+ }
+
+ let skip = match skip {
Review Comment:
FYI, you can also write this kind of `Option`-to-`Option` translation as
follows, which I think is a little more common in the Rust world:
let skip = skip
.map(|skip_expr| {
...
});
```
The same applies to `fetch`.
##########
datafusion/optimizer/src/eliminate_limit.rs:
##########
@@ -42,7 +42,9 @@ impl OptimizerRule for EliminateLimit {
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
- LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
+ LogicalPlan::Limit(Limit { fetch, input, .. })
+ if fetch.is_some() && fetch.unwrap() == 0 =>
+ {
Review Comment:
You might be able to rewrite this using a more specific pattern, if you want it
to be more idiomatic:
```suggestion
LogicalPlan::Limit(Limit { fetch: Some(0), input, .. }) => {
```
##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -43,61 +41,73 @@ impl LimitPushDown {
/// when traversing down related to "limit push down".
enum Ancestor {
/// Limit
- FromLimit,
- /// Offset
- FromOffset,
+ FromLimit {
+ skip: Option<usize>,
+ fetch: Option<usize>,
+ },
/// Other nodes that don't affect the adjustment of "Limit"
NotRelevant,
}
///
-/// When doing limit push down with "offset" and "limit" during traversal,
-/// the "limit" should be adjusted.
-/// limit_push_down is a recursive function that tracks three important
information
-/// to make the adjustment.
+/// When doing limit push down with "skip" and "fetch" during traversal,
+/// the "fetch" should be adjusted.
+/// "Ancestor" is pushed down the plan tree, so that the current node
+/// can adjust it's own "fetch".
///
-/// 1. ancestor: the kind of Ancestor.
-/// 2. ancestor_offset: ancestor's offset value
-/// 3. ancestor_limit: ancestor's limit value
+/// Ancestor's real "fetch" is extended with ancestor's "skip".
+/// If the current node is a Limit, then the adjusted ancestor "fetch" will
+/// replace the its own "fetch", and push down this "fetch" down the tree.
+/// ancestor's "skip" is always replaced by the current "skip" when the current
+/// node is a Limit.
///
-/// (ancestor_offset, ancestor_limit) is updated in the following cases
-/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
-/// When the ancestor is a "Limit" and the current node is a "Limit",
-/// it is updated to (None, min(n1, n2))).
-/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
-/// it is updated to (m1, n1 + m1).
-/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
-/// it is updated to (m2, None).
-/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
-/// it is updated to (None, n1). Note that this won't happen when we
-/// generate the plan from SQL, it can happen when we build the plan
-/// using LogicalPlanBuilder.
fn limit_push_down(
_optimizer: &LimitPushDown,
ancestor: Ancestor,
- ancestor_offset: Option<usize>,
- ancestor_limit: Option<usize>,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- match (plan, ancestor_limit) {
- (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
- let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
- Ancestor::FromLimit | Ancestor::FromOffset => (
- None,
- Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
- ),
- Ancestor::NotRelevant => (None, Some(*n)),
+ match (plan, ancestor) {
+ (
+ LogicalPlan::Limit(Limit {
+ skip: current_skip,
+ fetch: current_fetch,
+ input,
+ }),
+ ancestor,
+ ) => {
+ let new_current_fetch = match ancestor {
+ Ancestor::FromLimit {
+ skip: ancestor_skip,
+ fetch: ancestor_fetch,
+ } => {
+ if let Some(fetch) = current_fetch {
+ // extend ancestor's fetch
+ let ancestor_fetch =
Review Comment:
Does this do the right thing if `ancestor_skip` is 0? As written, I think it
will only be included if `ancestor_fetch` is `Some(..)` 🤔
##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -485,4 +551,42 @@ mod tests {
Ok(())
}
+
+ // test cases for "skip"
+ async fn offset_with_value(skip: usize) -> Result<usize> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ let num_partitions = 4;
+ let csv = test::scan_partitioned_csv(num_partitions)?;
+
+ assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
+
+ let offset = GlobalLimitExec::new(
+ Arc::new(CoalescePartitionsExec::new(csv)),
+ Some(skip),
+ None,
+ );
+
+ // the result should contain 4 batches (one per input partition)
+ let iter = offset.execute(0, task_ctx)?;
+ let batches = common::collect(iter).await?;
+ Ok(batches.iter().map(|batch| batch.num_rows()).sum())
+ }
+
+ #[tokio::test]
+ async fn enough_to_skip() -> Result<()> {
+ // there are total of 100 rows, we skipped 3 rows (offset = 3)
+ let row_count = offset_with_value(3).await?;
+ assert_eq!(row_count, 97);
+ Ok(())
+ }
Review Comment:
I recommend adding a test with `skip=2, fetch=2`, as well as tests with
`skip=<number_of_rows_in_batch>`, to cover the boundary conditions.
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -906,15 +905,14 @@ impl DefaultPhysicalPlanner {
} else {
// Apply a LocalLimitExec to each partition. The
optimizer will also insert
// a CoalescePartitionsExec between the
GlobalLimitExec and LocalLimitExec
- Arc::new(LocalLimitExec::new(input, limit))
+ if let Some(fetch) = fetch {
+ Arc::new(LocalLimitExec::new(input, *fetch))
Review Comment:
Here is a reproducer showing the problem (we need multiple input files/partitions
so that the `LocalLimitExec` is pushed down):
```shell
echo "a,b">/tmpa
mkdir /tmp/csv
echo "a,b">/tmp/csv/1.csv
echo "1,2">>/tmp/csv/1.csv
echo "a,b">/tmp/csv/2.csv
echo "3,4">>/tmp/csv/2.csv
```
Then run the query:
```sql
(arrow_dev) alamb@MacBook-Pro-6:~/Software/arrow-datafusion/datafusion-cli$
cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.27s
Running `target/debug/datafusion-cli`
DataFusion CLI v8.0.0
❯ create external table test stored as csv location
'datafusion/core/tests/aggregate_simple.csv';
IoError(Os { code: 2, kind: NotFound, message: "No such file or directory" })
❯ create external table test stored as csv location '/tmp/csv'
;
0 rows in set. Query took 0.036 seconds.
❯ select * from test;
+----------+----------+
| column_1 | column_2 |
+----------+----------+
| a | b |
| 3 | 4 |
| a | b |
| 1 | 2 |
+----------+----------+
4 rows in set. Query took 0.035 seconds.
❯ select * from test limit 2 offset 1;
+----------+----------+
| column_1 | column_2 |
+----------+----------+
| 3 | 4 |
+----------+----------+
1 row in set. Query took 0.011 seconds.
❯
```
I believe the output should have 2 rows, but it has only one.
##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -490,14 +497,14 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
- .limit(1000)?
- .offset(10)?
+ .limit(None, Some(1000))?
+ .limit(Some(10), None)?
.build()?;
- let expected = "Offset: 10\
- \n Limit: 1000\
+ let expected = "Limit: skip=10, fetch=None\
Review Comment:
Perhaps a future optimization could consolidate these into a single `Limit`.
To be clear, I don't think this is particularly important.
##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -468,16 +477,14 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Should push the limit down to table provider
// When it has a select
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Projection: #test.a\
- \n TableScan: test projection=None, limit=1010";
+ let expected = "Limit: skip=10, fetch=1000\
Review Comment:
I reviewed the tests carefully in this file and they look good to me 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]