alamb commented on code in PR #2694:
URL: https://github.com/apache/arrow-datafusion/pull/2694#discussion_r890475619


##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -906,15 +905,14 @@ impl DefaultPhysicalPlanner {
                     } else {
                         // Apply a LocalLimitExec to each partition. The optimizer will also insert
                         // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
-                        Arc::new(LocalLimitExec::new(input, limit))
+                        if let Some(fetch) = fetch {
+                            Arc::new(LocalLimitExec::new(input, *fetch))

Review Comment:
   Shouldn't this be fetching `*fetch + *skip` rows so that there are at least `fetch` rows after skipping `skip`?
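   
   For example, with `skip=1, fetch=2` each partition must surface up to 3 rows so that the `GlobalLimitExec` can discard 1 and still return 2. A minimal sketch of that idea (assuming the logical plan's `skip: Option<usize>` is in scope here, mirroring `fetch`):
   
   ```rust
   // fetch enough rows per partition to survive the global skip
   let local_fetch = *fetch + skip.unwrap_or(0);
   Arc::new(LocalLimitExec::new(input, local_fetch))
   ```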



##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -449,13 +450,13 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // repartition should happen prior to the filter to maximize parallelism
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
             "ParquetExec: limit=None, partitions=[x], projection=[c1]",
         ];

Review Comment:
   I recommend adding a test to this file with an offset that is something other than `None`, especially given the issue I note below with multi partition plans
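   
   A hypothetical shape for such a test, reusing this PR's `GlobalLimitExec::new(input, skip, fetch)` and `LocalLimitExec::new(input, fetch)` signatures (the surrounding test scaffolding and the exact display string are assumptions):
   
   ```rust
   // a limit with a non-None offset: skip 10 rows, then fetch at most 100
   let plan = GlobalLimitExec::new(
       Arc::new(LocalLimitExec::new(input, 110)),
       Some(10),
       Some(100),
   );
   // expected display, assuming `skip` prints like the `None` case above:
   // "GlobalLimitExec: skip=Some(10), fetch=100"
   ```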



##########
datafusion/core/tests/sql/limit.rs:
##########
@@ -184,3 +184,84 @@ async fn limit_multi_partitions() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]

Review Comment:
   👍  thank you for this coverage



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -346,47 +356,89 @@ pub fn truncate_batch(batch: &RecordBatch, n: usize) -> RecordBatch {
 
 /// A Limit stream limits the stream to up to `limit` rows.
 struct LimitStream {
+    /// The number of rows to skip
+    skip: usize,
     /// The maximum number of rows to produce
-    limit: usize,
+    fetch: usize,
     /// The input to read from. This is set to None once the limit is
     /// reached to enable early termination
     input: Option<SendableRecordBatchStream>,
     /// Copy of the input schema
     schema: SchemaRef,
+    /// Number of rows have already skipped
+    current_skipped: usize,
     // the current number of rows which have been produced
-    current_len: usize,
+    current_fetched: usize,
     /// Execution time metrics
     baseline_metrics: BaselineMetrics,
 }
 
 impl LimitStream {
     fn new(
         input: SendableRecordBatchStream,
-        limit: usize,
+        skip: Option<usize>,
+        fetch: Option<usize>,
         baseline_metrics: BaselineMetrics,
     ) -> Self {
         let schema = input.schema();
         Self {
-            limit,
+            skip: skip.unwrap_or(0),
+            fetch: fetch.unwrap_or(usize::MAX),
             input: Some(input),
             schema,
-            current_len: 0,
+            current_skipped: 0,
+            current_fetched: 0,
             baseline_metrics,
         }
     }
 
+    fn poll_and_skip(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+        let input = self.input.as_mut().unwrap();
+        loop {
+            let poll = input.poll_next_unpin(cx);
+            let poll = poll.map_ok(|batch| {
+                if batch.num_rows() + self.current_skipped <= self.skip {

Review Comment:
   👍 



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -292,17 +292,10 @@ impl LogicalPlanBuilder {
     }
 
     /// Apply a limit

Review Comment:
   ```suggestion
       /// Limit the number of rows returned
       ///
       /// `skip` - Number of rows to skip before fetching any row, `None` means do not skip any rows
       ///
       /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
       ///          if specified. `None` means no limit.
   ```



##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -43,61 +41,73 @@ impl LimitPushDown {
 /// when traversing down related to "limit push down".
 enum Ancestor {
     /// Limit
-    FromLimit,
-    /// Offset
-    FromOffset,
+    FromLimit {
+        skip: Option<usize>,
+        fetch: Option<usize>,
+    },
     /// Other nodes that don't affect the adjustment of "Limit"
     NotRelevant,
 }
 
 ///
-/// When doing limit push down with "offset" and "limit" during traversal,
-/// the "limit" should be adjusted.
-/// limit_push_down is a recursive function that tracks three important information
-/// to make the adjustment.
+/// When doing limit push down with "skip" and "fetch" during traversal,
+/// the "fetch" should be adjusted.
+/// "Ancestor" is pushed down the plan tree, so that the current node
+/// can adjust it's own "fetch".
 ///
-/// 1. ancestor: the kind of Ancestor.
-/// 2. ancestor_offset: ancestor's offset value
-/// 3. ancestor_limit: ancestor's limit value
+/// Ancestor's real "fetch" is extended with ancestor's "skip".
+/// If the current node is a Limit, then the adjusted ancestor "fetch" will
+/// replace the its own "fetch", and push down this "fetch" down the tree.
+/// ancestor's "skip" is always replaced by the current "skip" when the current
+/// node is a Limit.
 ///
-/// (ancestor_offset, ancestor_limit) is updated in the following cases
-/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
-///    When the ancestor is a "Limit" and the current node is a "Limit",
-///    it is updated to (None, min(n1, n2))).
-/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
-///    it is updated to (m1, n1 + m1).
-/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
-///    it is updated to (m2, None).
-/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
-///    it is updated to (None, n1). Note that this won't happen when we
-///    generate the plan from SQL, it can happen when we build the plan
-///    using LogicalPlanBuilder.
 fn limit_push_down(
     _optimizer: &LimitPushDown,
     ancestor: Ancestor,
-    ancestor_offset: Option<usize>,
-    ancestor_limit: Option<usize>,
     plan: &LogicalPlan,
     _optimizer_config: &OptimizerConfig,
 ) -> Result<LogicalPlan> {
-    match (plan, ancestor_limit) {
-        (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
-            let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
-                Ancestor::FromLimit | Ancestor::FromOffset => (
-                    None,
-                    Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
-                ),
-                Ancestor::NotRelevant => (None, Some(*n)),
+    match (plan, ancestor) {
+        (
+            LogicalPlan::Limit(Limit {
+                skip: current_skip,
+                fetch: current_fetch,
+                input,
+            }),
+            ancestor,
+        ) => {
+            let new_current_fetch = match ancestor {
+                Ancestor::FromLimit {
+                    skip: ancestor_skip,
+                    fetch: ancestor_fetch,
+                } => {
+                    if let Some(fetch) = current_fetch {
+                        // extend ancestor's fetch
+                        let ancestor_fetch =
+                            ancestor_fetch.map(|f| f + ancestor_skip.unwrap_or(0));
+
+                        let new_current_fetch =
+                            ancestor_fetch.map_or(*fetch, |x| std::cmp::min(x, *fetch));
+
+                        Some(new_current_fetch)
+                    } else {
+                        // we dont have a "fetch", and we can push down our parent's "fetch"
+                        // extend ancestor's fetch
+                        ancestor_fetch.map(|f| f + ancestor_skip.unwrap_or(0))
+                    }
+                }
+                _ => *current_fetch,
             };
 
             Ok(LogicalPlan::Limit(Limit {
-                n: new_ancestor_limit.unwrap_or(*n),
-                // push down limit to plan (minimum of upper limit and current limit)
+                skip: *current_skip,
+                fetch: new_current_fetch,
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    Ancestor::FromLimit,
-                    new_ancestor_offset,
-                    new_ancestor_limit,
+                    Ancestor::FromLimit {
+                        skip: *current_skip,

Review Comment:
   should `current_skip` be updated to min of the ancestor skip and this node's skip?



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -176,17 +184,18 @@ impl ExecutionPlan for GlobalLimitExec {
 
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
+        let skip = self.skip.unwrap_or(0);
         match input_stats {
             // if the input does not reach the limit globally, return input stats
             Statistics {
                 num_rows: Some(nr), ..
-            } if nr <= self.limit => input_stats,
+            } if nr - skip <= self.fetch.unwrap_or(usize::MAX) => input_stats,

Review Comment:
   Also, is there any guarantee that `nr > skip`? It seems like in this case we would get an underflow.
   
   Perhaps we should use something like `saturating_sub`
   
   ```rust
   if nr.saturating_sub(skip) <= self.fetch.unwrap_or(usize::MAX)
   ```
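   
   For instance, `saturating_sub` clamps at zero instead of underflowing (which would panic in debug builds):
   
   ```rust
   assert_eq!(10usize.saturating_sub(3), 7);
   assert_eq!(3usize.saturating_sub(10), 0); // plain `3 - 10` would underflow
   ```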



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -176,17 +184,18 @@ impl ExecutionPlan for GlobalLimitExec {
 
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
+        let skip = self.skip.unwrap_or(0);
         match input_stats {
             // if the input does not reach the limit globally, return input stats
             Statistics {
                 num_rows: Some(nr), ..
-            } if nr <= self.limit => input_stats,
+            } if nr - skip <= self.fetch.unwrap_or(usize::MAX) => input_stats,

Review Comment:
   This also applies below



##########
datafusion/core/src/dataframe.rs:
##########
@@ -190,20 +190,27 @@ impl DataFrame {
 
     /// Limit the number of rows returned from this DataFrame.
     ///
+    /// `skip` - Number of rows to skip before fetch any row, `None` means do not skip any rows
+    ///
+    /// `fetch` - Maximum number of rows to fetch, `None` means no limit

Review Comment:
   ```suggestion
       /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
       ///          if specified. `None` means no limit.
   ```



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -346,47 +356,89 @@ pub fn truncate_batch(batch: &RecordBatch, n: usize) -> RecordBatch {
 
 /// A Limit stream limits the stream to up to `limit` rows.
 struct LimitStream {
+    /// The number of rows to skip
+    skip: usize,
     /// The maximum number of rows to produce

Review Comment:
   ```suggestion
       /// The maximum number of rows to produce, after `skip` are skipped
   ```



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -224,7 +233,7 @@ impl LocalLimitExec {
 
     /// Maximum number of rows to return
     pub fn limit(&self) -> usize {

Review Comment:
   Should we also rename this to `pub fn fetch` for consistency?
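   
   A minimal sketch of the rename (assuming the accessor simply returns the renamed field):
   
   ```rust
   /// Maximum number of rows to fetch
   pub fn fetch(&self) -> usize {
       self.fetch
   }
   ```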



##########
datafusion/sql/src/planner.rs:
##########
@@ -1212,57 +1206,59 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
 
     /// Wrap a plan in a limit
-    fn limit(&self, input: LogicalPlan, limit: Option<SQLExpr>) -> Result<LogicalPlan> {
-        match limit {
-            Some(limit_expr) => {
-                let n = match self.sql_to_rex(
-                    limit_expr,
-                    input.schema(),
-                    &mut HashMap::new(),
-                )? {
-                    Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
-                    _ => Err(DataFusionError::Plan(
-                        "Unexpected expression for LIMIT clause".to_string(),
-                    )),
-                }?;
-
-                LogicalPlanBuilder::from(input).limit(n)?.build()
-            }
-            _ => Ok(input),
-        }
-    }
-
-    /// Wrap a plan in a offset
-    fn offset(
+    fn limit(
         &self,
         input: LogicalPlan,
-        offset: Option<SQLOffset>,
+        skip: Option<SQLOffset>,
+        fetch: Option<SQLExpr>,
     ) -> Result<LogicalPlan> {
-        match offset {
-            Some(offset_expr) => {
-                let offset = match self.sql_to_rex(
-                    offset_expr.value,
+        if skip.is_none() && fetch.is_none() {
+            return Ok(input);
+        }
+
+        let skip = match skip {

Review Comment:
   FYI you can write this kind of `Option` to `Option` translation in the following way too, which is a little more common in the Rust world I think:
   
   ```rust
   let skip = skip
    .map(|skip_expr| {
        ...
      });
   ```
   
   Same for fetch
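   
   Because the closure body returns a `Result`, a `.transpose()?` converts the resulting `Option<Result<_>>` back into an `Option`. A hedged sketch, with the body assumed from the removed `limit` code above (the OFFSET error message is hypothetical):
   
   ```rust
   let skip = skip
       .map(|skip_expr| {
           match self.sql_to_rex(skip_expr.value, input.schema(), &mut HashMap::new())? {
               Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
               _ => Err(DataFusionError::Plan(
                   "Unexpected expression for OFFSET clause".to_string(),
               )),
           }
       })
       .transpose()?;
   ```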



##########
datafusion/optimizer/src/eliminate_limit.rs:
##########
@@ -42,7 +42,9 @@ impl OptimizerRule for EliminateLimit {
         optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
+            LogicalPlan::Limit(Limit { fetch, input, .. })
+                if fetch.is_some() && fetch.unwrap() == 0 =>
+            {

Review Comment:
   You might be able to rewrite using a more specific match, if you wanted to try to be more idiomatic:
   
   ```suggestion
               LogicalPlan::Limit(Limit { fetch: Some(fetch), input, .. }) if *fetch == 0 =>
               {
   ```



##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -43,61 +41,73 @@ impl LimitPushDown {
 /// when traversing down related to "limit push down".
 enum Ancestor {
     /// Limit
-    FromLimit,
-    /// Offset
-    FromOffset,
+    FromLimit {
+        skip: Option<usize>,
+        fetch: Option<usize>,
+    },
     /// Other nodes that don't affect the adjustment of "Limit"
     NotRelevant,
 }
 
 ///
-/// When doing limit push down with "offset" and "limit" during traversal,
-/// the "limit" should be adjusted.
-/// limit_push_down is a recursive function that tracks three important information
-/// to make the adjustment.
+/// When doing limit push down with "skip" and "fetch" during traversal,
+/// the "fetch" should be adjusted.
+/// "Ancestor" is pushed down the plan tree, so that the current node
+/// can adjust it's own "fetch".
 ///
-/// 1. ancestor: the kind of Ancestor.
-/// 2. ancestor_offset: ancestor's offset value
-/// 3. ancestor_limit: ancestor's limit value
+/// Ancestor's real "fetch" is extended with ancestor's "skip".
+/// If the current node is a Limit, then the adjusted ancestor "fetch" will
+/// replace the its own "fetch", and push down this "fetch" down the tree.
+/// ancestor's "skip" is always replaced by the current "skip" when the current
+/// node is a Limit.
 ///
-/// (ancestor_offset, ancestor_limit) is updated in the following cases
-/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
-///    When the ancestor is a "Limit" and the current node is a "Limit",
-///    it is updated to (None, min(n1, n2))).
-/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
-///    it is updated to (m1, n1 + m1).
-/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
-///    it is updated to (m2, None).
-/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
-///    it is updated to (None, n1). Note that this won't happen when we
-///    generate the plan from SQL, it can happen when we build the plan
-///    using LogicalPlanBuilder.
 fn limit_push_down(
     _optimizer: &LimitPushDown,
     ancestor: Ancestor,
-    ancestor_offset: Option<usize>,
-    ancestor_limit: Option<usize>,
     plan: &LogicalPlan,
     _optimizer_config: &OptimizerConfig,
 ) -> Result<LogicalPlan> {
-    match (plan, ancestor_limit) {
-        (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
-            let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
-                Ancestor::FromLimit | Ancestor::FromOffset => (
-                    None,
-                    Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
-                ),
-                Ancestor::NotRelevant => (None, Some(*n)),
+    match (plan, ancestor) {
+        (
+            LogicalPlan::Limit(Limit {
+                skip: current_skip,
+                fetch: current_fetch,
+                input,
+            }),
+            ancestor,
+        ) => {
+            let new_current_fetch = match ancestor {
+                Ancestor::FromLimit {
+                    skip: ancestor_skip,
+                    fetch: ancestor_fetch,
+                } => {
+                    if let Some(fetch) = current_fetch {
+                        // extend ancestor's fetch
+                        let ancestor_fetch =

Review Comment:
   Does this do the right thing if `ancester_skip` is 0? As written I think it 
will only be included if `ancestor_fetch` is `Some(..)` 🤔 



##########
datafusion/core/src/physical_plan/limit.rs:
##########
@@ -485,4 +551,42 @@ mod tests {
 
         Ok(())
     }
+
+    // test cases for "skip"
+    async fn offset_with_value(skip: usize) -> Result<usize> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let num_partitions = 4;
+        let csv = test::scan_partitioned_csv(num_partitions)?;
+
+        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
+
+        let offset = GlobalLimitExec::new(
+            Arc::new(CoalescePartitionsExec::new(csv)),
+            Some(skip),
+            None,
+        );
+
+        // the result should contain 4 batches (one per input partition)
+        let iter = offset.execute(0, task_ctx)?;
+        let batches = common::collect(iter).await?;
+        Ok(batches.iter().map(|batch| batch.num_rows()).sum())
+    }
+
+    #[tokio::test]
+    async fn enough_to_skip() -> Result<()> {
+        // there are total of 100 rows, we skipped 3 rows (offset = 3)
+        let row_count = offset_with_value(3).await?;
+        assert_eq!(row_count, 97);
+        Ok(())
+    }

Review Comment:
   I recommend a test with `skip=2 fetch=2` as well as testing with `skip=<number_of_rows_in_batch>` to test the boundary conditions
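   
   A hedged sketch of the `skip=2 fetch=2` case, modeled on `offset_with_value` above (helper names and the 100-row input are taken from this test module):
   
   ```rust
   #[tokio::test]
   async fn skip_and_fetch() -> Result<()> {
       let session_ctx = SessionContext::new();
       let task_ctx = session_ctx.task_ctx();
       let csv = test::scan_partitioned_csv(4)?;
   
       // skip 2 of the 100 input rows, then fetch at most 2 of the remainder
       let limit = GlobalLimitExec::new(
           Arc::new(CoalescePartitionsExec::new(csv)),
           Some(2),
           Some(2),
       );
   
       let batches = common::collect(limit.execute(0, task_ctx)?).await?;
       let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
       assert_eq!(row_count, 2);
       Ok(())
   }
   ```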



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -906,15 +905,14 @@ impl DefaultPhysicalPlanner {
                     } else {
                         // Apply a LocalLimitExec to each partition. The optimizer will also insert
                         // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
-                        Arc::new(LocalLimitExec::new(input, limit))
+                        if let Some(fetch) = fetch {
+                            Arc::new(LocalLimitExec::new(input, *fetch))

Review Comment:
   Here is a reproducer showing the problem (we need multiple input files / partitions so the `LocalLimitExec` is pushed down):
   
   
   ```shell
   echo "a,b">/tmpa
   mkdir /tmp/csv
   echo "a,b">/tmp/csv/1.csv
   echo "1,2">>/tmp/csv/1.csv
   echo "a,b">/tmp/csv/2.csv
   echo "3,4">>/tmp/csv/2.csv
   ```
   
   Then run the query 
   
   ```sql
   (arrow_dev) alamb@MacBook-Pro-6:~/Software/arrow-datafusion/datafusion-cli$ cargo run
       Finished dev [unoptimized + debuginfo] target(s) in 0.27s
        Running `target/debug/datafusion-cli`
   DataFusion CLI v8.0.0
   ❯ create external table test stored as csv location '/tmp/csv';
   0 rows in set. Query took 0.036 seconds.
   ❯ select * from test;
   +----------+----------+
   | column_1 | column_2 |
   +----------+----------+
   | a        | b        |
   | 3        | 4        |
   | a        | b        |
   | 1        | 2        |
   +----------+----------+
   4 rows in set. Query took 0.035 seconds.
   ❯ select * from test limit 2 offset 1;
   +----------+----------+
   | column_1 | column_2 |
   +----------+----------+
   | 3        | 4        |
   +----------+----------+
   1 row in set. Query took 0.011 seconds.
   ❯ 
   ```
   
   I believe the output should have 2 rows, but it has only one



##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -490,14 +497,14 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
-            .limit(1000)?
-            .offset(10)?
+            .limit(None, Some(1000))?
+            .limit(Some(10), None)?
             .build()?;
 
-        let expected = "Offset: 10\
-        \n  Limit: 1000\
+        let expected = "Limit: skip=10, fetch=None\

Review Comment:
   Perhaps a future optimization could consolidate these together into a single `Limit` (e.g., `Limit: skip=10, fetch=None` over `Limit: skip=None, fetch=1000` is equivalent to a single `Limit: skip=10, fetch=990`) -- I don't think it is particularly important, just to be clear



##########
datafusion/optimizer/src/limit_push_down.rs:
##########
@@ -468,16 +477,14 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Should push the limit down to table provider
         // When it has a select
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Projection: #test.a\
-        \n      TableScan: test projection=None, limit=1010";
+        let expected = "Limit: skip=10, fetch=1000\

Review Comment:
   I reviewed the tests carefully in this file and they look good to me 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
