This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a25529166 Combine limit and offset to `fetch` and `skip` and implement 
physical plan support (#2694)
a25529166 is described below

commit a25529166f82e4f52d56eba6448a5a32e126698e
Author: Hu Ming <[email protected]>
AuthorDate: Thu Jun 9 18:28:53 2022 +0800

    Combine limit and offset to `fetch` and `skip` and implement physical plan 
support (#2694)
    
    * 1. combine "limit" and "offset" together in api/logical plan/physical 
plan.
    2. implement "offset" in physical plan.
    
    * add several tests and fix some comments
---
 datafusion/core/src/dataframe.rs                   |  27 +-
 datafusion/core/src/datasource/empty.rs            |  16 +-
 datafusion/core/src/execution/context.rs           |   2 +-
 datafusion/core/src/lib.rs                         |   2 +-
 .../core/src/physical_optimizer/repartition.rs     |  75 +++-
 datafusion/core/src/physical_plan/empty.rs         |  38 +-
 datafusion/core/src/physical_plan/limit.rs         | 275 ++++++++----
 datafusion/core/src/physical_plan/planner.rs       |  58 ++-
 datafusion/core/src/test_util.rs                   |  16 +
 datafusion/core/tests/dataframe_functions.rs       |   2 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   6 +-
 datafusion/core/tests/sql/limit.rs                 |  81 ++++
 datafusion/core/tests/user_defined_plan.rs         |   9 +-
 datafusion/expr/src/logical_plan/builder.rs        |  40 +-
 datafusion/expr/src/logical_plan/plan.rs           |  39 +-
 datafusion/expr/src/utils.rs                       |  13 +-
 .../optimizer/src/common_subexpr_eliminate.rs      |   1 -
 datafusion/optimizer/src/eliminate_limit.rs        |  18 +-
 datafusion/optimizer/src/filter_push_down.rs       |  32 +-
 datafusion/optimizer/src/limit_push_down.rs        | 462 +++++++++++----------
 datafusion/optimizer/src/projection_push_down.rs   |   9 +-
 datafusion/optimizer/src/simplify_expressions.rs   |   4 +-
 datafusion/proto/proto/datafusion.proto            |  11 +-
 datafusion/proto/src/logical_plan.rs               |  46 +-
 datafusion/sql/src/planner.rs                      | 107 +++--
 25 files changed, 864 insertions(+), 525 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5a1330bf9..02b9116be 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -62,7 +62,7 @@ use std::sync::Arc;
 /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
 /// let df = df.filter(col("a").lt_eq(col("b")))?
 ///            .aggregate(vec![col("a")], vec![min(col("b"))])?
-///            .limit(100)?;
+///            .limit(None, Some(100))?;
 /// let results = df.collect();
 /// # Ok(())
 /// # }
@@ -190,6 +190,9 @@ impl DataFrame {
 
     /// Limit the number of rows returned from this DataFrame.
     ///
+    /// `skip` - Number of rows to skip before fetching any rows.
+    ///
+    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows.
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -197,13 +200,17 @@ impl DataFrame {
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", 
CsvReadOptions::new()).await?;
-    /// let df = df.limit(100)?;
+    /// let df = df.limit(None, Some(100))?;
     /// # Ok(())
     /// # }
     /// ```
-    pub fn limit(&self, n: usize) -> Result<Arc<DataFrame>> {
+    pub fn limit(
+        &self,
+        skip: Option<usize>,
+        fetch: Option<usize>,
+    ) -> Result<Arc<DataFrame>> {
         let plan = LogicalPlanBuilder::from(self.plan.clone())
-            .limit(n)?
+            .limit(skip, fetch)?
             .build()?;
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
     }
@@ -414,7 +421,7 @@ impl DataFrame {
     /// # }
     /// ```
     pub async fn show_limit(&self, num: usize) -> Result<()> {
-        let results = self.limit(num)?.collect().await?;
+        let results = self.limit(None, Some(num))?.collect().await?;
         Ok(pretty::print_batches(&results)?)
     }
 
@@ -514,7 +521,7 @@ impl DataFrame {
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", 
CsvReadOptions::new()).await?;
-    /// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
+    /// let batches = df.limit(None, Some(100))?.explain(false, 
false)?.collect().await?;
     /// # Ok(())
     /// # }
     /// ```
@@ -665,7 +672,7 @@ impl TableProvider for DataFrame {
         Self::new(
             self.session_state.clone(),
             &limit
-                .map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
+                .map_or_else(|| Ok(expr.clone()), |n| expr.limit(None, 
Some(n)))?
                 .plan
                 .clone(),
         )
@@ -799,7 +806,9 @@ mod tests {
     async fn limit() -> Result<()> {
         // build query using Table API
         let t = test_table().await?;
-        let t2 = t.select_columns(&["c1", "c2", "c11"])?.limit(10)?;
+        let t2 = t
+            .select_columns(&["c1", "c2", "c11"])?
+            .limit(None, Some(10))?;
         let plan = t2.plan.clone();
 
         // build query using SQL
@@ -818,7 +827,7 @@ mod tests {
         let df = test_table().await?;
         let df = df
             .select_columns(&["c1", "c2", "c11"])?
-            .limit(10)?
+            .limit(None, Some(10))?
             .explain(false, false)?;
         let plan = df.plan.clone();
 
diff --git a/datafusion/core/src/datasource/empty.rs 
b/datafusion/core/src/datasource/empty.rs
index 3bc7a958c..b8bdb5fd8 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -33,12 +33,22 @@ use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};
 /// A table with a schema but no data.
 pub struct EmptyTable {
     schema: SchemaRef,
+    partitions: usize,
 }
 
 impl EmptyTable {
     /// Initialize a new `EmptyTable` from a schema.
     pub fn new(schema: SchemaRef) -> Self {
-        Self { schema }
+        Self {
+            schema,
+            partitions: 1,
+        }
+    }
+
+    /// Creates a new EmptyTable with specified partition number.
+    pub fn with_partitions(mut self, partitions: usize) -> Self {
+        self.partitions = partitions;
+        self
     }
 }
 
@@ -65,6 +75,8 @@ impl TableProvider for EmptyTable {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // even though there is no data, projections apply
         let projected_schema = project_schema(&self.schema, 
projection.as_ref())?;
-        Ok(Arc::new(EmptyExec::new(false, projected_schema)))
+        Ok(Arc::new(
+            EmptyExec::new(false, 
projected_schema).with_partitions(self.partitions),
+        ))
     }
 }
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index db435e399..ac7727f64 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -129,7 +129,7 @@ const DEFAULT_SCHEMA: &str = "public";
 /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
 /// let df = df.filter(col("a").lt_eq(col("b")))?
 ///            .aggregate(vec![col("a")], vec![min(col("b"))])?
-///            .limit(100)?;
+///            .limit(None, Some(100))?;
 /// let results = df.collect();
 /// # Ok(())
 /// # }
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index df863ebd1..7a0cf475d 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -42,7 +42,7 @@
 //! // create a plan
 //! let df = df.filter(col("a").lt_eq(col("b")))?
 //!            .aggregate(vec![col("a")], vec![min(col("b"))])?
-//!            .limit(100)?;
+//!            .limit(None, Some(100))?;
 //!
 //! // execute the plan
 //! let results: Vec<RecordBatch> = df.collect().await?;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index bdb01e205..b3b7ba948 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Repartition optimizer that introduces repartition nodes to increase the 
level of parallism available
+//! Repartition optimizer that introduces repartition nodes to increase the 
level of parallelism available
 use std::sync::Arc;
 
 use super::optimizer::PhysicalOptimizerRule;
@@ -326,7 +326,16 @@ mod tests {
     fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
         Arc::new(GlobalLimitExec::new(
             Arc::new(LocalLimitExec::new(input, 100)),
-            100,
+            None,
+            Some(100),
+        ))
+    }
+
+    fn limit_exec_with_skip(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(GlobalLimitExec::new(
+            Arc::new(LocalLimitExec::new(input, 100)),
+            Some(5),
+            Some(100),
         ))
     }
 
@@ -395,8 +404,25 @@ mod tests {
         let plan = limit_exec(filter_exec(parquet_exec()));
 
         let expected = &[
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
+            "FilterExec: c1@0",
+            // nothing sorts the data, so the local limit doesn't require 
sorted data either
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan);
+        Ok(())
+    }
+
+    #[test]
+    fn repartition_unsorted_limit_with_skip() -> Result<()> {
+        let plan = limit_exec_with_skip(filter_exec(parquet_exec()));
+
+        let expected = &[
+            "GlobalLimitExec: skip=5, fetch=100",
+            "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require 
sorted data either
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
@@ -412,8 +438,8 @@ mod tests {
         let plan = limit_exec(sort_exec(parquet_exec()));
 
         let expected = &[
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
             // data is sorted so can't repartition here
             "SortExec: [c1@0 ASC]",
             "ParquetExec: limit=None, partitions=[x], projection=[c1]",
@@ -428,8 +454,8 @@ mod tests {
         let plan = limit_exec(filter_exec(sort_exec(parquet_exec())));
 
         let expected = &[
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // data is sorted so can't repartition here even though
             // filter would benefit from parallelism, the answers might be 
wrong
@@ -449,13 +475,38 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
+            "FilterExec: c1@0",
+            // repartition should happen prior to the filter to maximize 
parallelism
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
+            // Expect no repartition to happen for local limit
+            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan);
+        Ok(())
+    }
+
+    #[test]
+    fn repartition_ignores_limit_with_skip() -> Result<()> {
+        let plan = aggregate(limit_exec_with_skip(filter_exec(limit_exec(
+            parquet_exec(),
+        ))));
+
+        let expected = &[
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "GlobalLimitExec: skip=5, fetch=100",
+            "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // repartition should happen prior to the filter to maximize 
parallelism
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "GlobalLimitExec: limit=100",
-            "LocalLimitExec: limit=100",
+            "GlobalLimitExec: skip=None, fetch=100",
+            "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
             "ParquetExec: limit=None, partitions=[x], projection=[c1]",
         ];
diff --git a/datafusion/core/src/physical_plan/empty.rs 
b/datafusion/core/src/physical_plan/empty.rs
index bba87e190..c693764c8 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -41,6 +41,8 @@ pub struct EmptyExec {
     produce_one_row: bool,
     /// The schema for the produced row
     schema: SchemaRef,
+    /// Number of partitions
+    partitions: usize,
 }
 
 impl EmptyExec {
@@ -49,9 +51,16 @@ impl EmptyExec {
         EmptyExec {
             produce_one_row,
             schema,
+            partitions: 1,
         }
     }
 
+    /// Create a new EmptyExec with specified partition number
+    pub fn with_partitions(mut self, partitions: usize) -> Self {
+        self.partitions = partitions;
+        self
+    }
+
     /// Specifies whether this exec produces a row or not
     pub fn produce_one_row(&self) -> bool {
         self.produce_one_row
@@ -95,7 +104,7 @@ impl ExecutionPlan for EmptyExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
+        Partitioning::UnknownPartitioning(self.partitions)
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -118,11 +127,11 @@ impl ExecutionPlan for EmptyExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         debug!("Start EmptyExec::execute for partition {} of context 
session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
-        // GlobalLimitExec has a single output partition
-        if 0 != partition {
+
+        if partition >= self.partitions {
             return Err(DataFusionError::Internal(format!(
-                "EmptyExec invalid partition {} (expected 0)",
-                partition
+                "EmptyExec invalid partition {} (expected less than {})",
+                partition, self.partitions
             )));
         }
 
@@ -226,4 +235,23 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn produce_one_row_multiple_partition() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let schema = test_util::aggr_test_schema();
+        let partitions = 3;
+        let empty = EmptyExec::new(true, schema).with_partitions(partitions);
+
+        for n in 0..partitions {
+            let iter = empty.execute(n, task_ctx.clone())?;
+            let batches = common::collect(iter).await?;
+
+            // should have one item
+            assert_eq!(batches.len(), 1);
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/limit.rs 
b/datafusion/core/src/physical_plan/limit.rs
index 6ad93d3e4..73566b823 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -17,15 +17,14 @@
 
 //! Defines the LIMIT plan
 
+use futures::stream::Stream;
+use futures::stream::StreamExt;
+use log::debug;
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use futures::stream::Stream;
-use futures::stream::StreamExt;
-use log::debug;
-
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
@@ -49,31 +48,28 @@ use crate::execution::context::TaskContext;
 pub struct GlobalLimitExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Maximum number of rows to return
-    limit: usize,
+    /// Number of rows to skip before fetch
+    skip: Option<usize>,
+    /// Maximum number of rows to fetch
+    fetch: Option<usize>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
 
 impl GlobalLimitExec {
     /// Create a new GlobalLimitExec
-    pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
+    pub fn new(
+        input: Arc<dyn ExecutionPlan>,
+        skip: Option<usize>,
+        fetch: Option<usize>,
+    ) -> Self {
         GlobalLimitExec {
             input,
-            limit,
+            skip,
+            fetch,
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
-
-    /// Input execution plan
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    /// Maximum number of rows to return
-    pub fn limit(&self) -> usize {
-        self.limit
-    }
 }
 
 impl ExecutionPlan for GlobalLimitExec {
@@ -121,7 +117,8 @@ impl ExecutionPlan for GlobalLimitExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(GlobalLimitExec::new(
             children[0].clone(),
-            self.limit,
+            self.skip,
+            self.fetch,
         )))
     }
 
@@ -153,7 +150,8 @@ impl ExecutionPlan for GlobalLimitExec {
         let stream = self.input.execute(0, context)?;
         Ok(Box::pin(LimitStream::new(
             stream,
-            self.limit,
+            self.skip,
+            self.fetch,
             baseline_metrics,
         )))
     }
@@ -165,7 +163,12 @@ impl ExecutionPlan for GlobalLimitExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(f, "GlobalLimitExec: limit={}", self.limit)
+                write!(
+                    f,
+                    "GlobalLimitExec: skip={}, fetch={}",
+                    self.skip.map_or("None".to_string(), |x| x.to_string()),
+                    self.fetch.map_or("None".to_string(), |x| x.to_string())
+                )
             }
         }
     }
@@ -176,21 +179,33 @@ impl ExecutionPlan for GlobalLimitExec {
 
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
+        let skip = self.skip.unwrap_or(0);
         match input_stats {
-            // if the input does not reach the limit globally, return input 
stats
-            Statistics {
-                num_rows: Some(nr), ..
-            } if nr <= self.limit => input_stats,
-            // if the input is greater than the limit, the num_row will be the 
limit
-            // but we won't be able to predict the other statistics
             Statistics {
                 num_rows: Some(nr), ..
-            } if nr > self.limit => Statistics {
-                num_rows: Some(self.limit),
-                is_exact: input_stats.is_exact,
-                ..Default::default()
-            },
-            // if we don't know the input size, we can't predict the limit's 
behaviour
+            } => {
+                if nr <= skip {
+                    // if all input data will be skipped, return 0
+                    Statistics {
+                        num_rows: Some(0),
+                        is_exact: input_stats.is_exact,
+                        ..Default::default()
+                    }
+                } else if nr - skip <= self.fetch.unwrap_or(usize::MAX) {
+                    // if the input does not reach the "fetch" globally, 
return input stats
+                    input_stats
+                } else if nr - skip > self.fetch.unwrap_or(usize::MAX) {
+                    // if the input is greater than the "fetch", the num_row 
will be the "fetch",
+                    // but we won't be able to predict the other statistics
+                    Statistics {
+                        num_rows: self.fetch,
+                        is_exact: input_stats.is_exact,
+                        ..Default::default()
+                    }
+                } else {
+                    Statistics::default()
+                }
+            }
             _ => Statistics::default(),
         }
     }
@@ -202,30 +217,20 @@ pub struct LocalLimitExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
     /// Maximum number of rows to return
-    limit: usize,
+    fetch: usize,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
 
 impl LocalLimitExec {
     /// Create a new LocalLimitExec partition
-    pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
+    pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
         Self {
             input,
-            limit,
+            fetch,
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
-
-    /// Input execution plan
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    /// Maximum number of rows to return
-    pub fn limit(&self) -> usize {
-        self.limit
-    }
 }
 
 impl ExecutionPlan for LocalLimitExec {
@@ -271,7 +276,7 @@ impl ExecutionPlan for LocalLimitExec {
         match children.len() {
             1 => Ok(Arc::new(LocalLimitExec::new(
                 children[0].clone(),
-                self.limit,
+                self.fetch,
             ))),
             _ => Err(DataFusionError::Internal(
                 "LocalLimitExec wrong number of children".to_string(),
@@ -289,7 +294,8 @@ impl ExecutionPlan for LocalLimitExec {
         let stream = self.input.execute(partition, context)?;
         Ok(Box::pin(LimitStream::new(
             stream,
-            self.limit,
+            None,
+            Some(self.fetch),
             baseline_metrics,
         )))
     }
@@ -301,7 +307,7 @@ impl ExecutionPlan for LocalLimitExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(f, "LocalLimitExec: limit={}", self.limit)
+                write!(f, "LocalLimitExec: fetch={}", self.fetch)
             }
         }
     }
@@ -316,14 +322,14 @@ impl ExecutionPlan for LocalLimitExec {
             // if the input does not reach the limit globally, return input 
stats
             Statistics {
                 num_rows: Some(nr), ..
-            } if nr <= self.limit => input_stats,
+            } if nr <= self.fetch => input_stats,
             // if the input is greater than the limit, the num_row will be 
greater
             // than the limit because the partitions will be limited separatly
             // the statistic
             Statistics {
                 num_rows: Some(nr), ..
-            } if nr > self.limit => Statistics {
-                num_rows: Some(self.limit),
+            } if nr > self.fetch => Statistics {
+                num_rows: Some(self.fetch),
                 // this is not actually exact, but will be when GlobalLimit is 
applied
                 // TODO stats: find a more explicit way to vehiculate this 
information
                 is_exact: input_stats.is_exact,
@@ -344,17 +350,21 @@ pub fn truncate_batch(batch: &RecordBatch, n: usize) -> 
RecordBatch {
     RecordBatch::try_new(batch.schema(), limited_columns).unwrap()
 }
 
-/// A Limit stream limits the stream to up to `limit` rows.
+/// A Limit stream skips `skip` rows, and then fetches up to `fetch` rows.
 struct LimitStream {
-    /// The maximum number of rows to produce
-    limit: usize,
+    /// The number of rows to skip
+    skip: usize,
+    /// The maximum number of rows to produce, after `skip` are skipped
+    fetch: usize,
     /// The input to read from. This is set to None once the limit is
     /// reached to enable early termination
     input: Option<SendableRecordBatchStream>,
     /// Copy of the input schema
     schema: SchemaRef,
-    // the current number of rows which have been produced
-    current_len: usize,
+    /// Number of rows that have already been skipped
+    current_skipped: usize,
+    /// the current number of rows which have been produced
+    current_fetched: usize,
     /// Execution time metrics
     baseline_metrics: BaselineMetrics,
 }
@@ -362,31 +372,69 @@ struct LimitStream {
 impl LimitStream {
     fn new(
         input: SendableRecordBatchStream,
-        limit: usize,
+        skip: Option<usize>,
+        fetch: Option<usize>,
         baseline_metrics: BaselineMetrics,
     ) -> Self {
         let schema = input.schema();
         Self {
-            limit,
+            skip: skip.unwrap_or(0),
+            fetch: fetch.unwrap_or(usize::MAX),
             input: Some(input),
             schema,
-            current_len: 0,
+            current_skipped: 0,
+            current_fetched: 0,
             baseline_metrics,
         }
     }
 
+    fn poll_and_skip(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+        let input = self.input.as_mut().unwrap();
+        loop {
+            let poll = input.poll_next_unpin(cx);
+            let poll = poll.map_ok(|batch| {
+                if batch.num_rows() + self.current_skipped <= self.skip {
+                    self.current_skipped += batch.num_rows();
+                    RecordBatch::new_empty(input.schema())
+                } else {
+                    let offset = self.skip - self.current_skipped;
+                    let new_batch = batch.slice(offset, batch.num_rows() - 
offset);
+                    self.current_skipped = self.skip;
+                    new_batch
+                }
+            });
+
+            match &poll {
+                Poll::Ready(Some(Ok(batch)))
+                    if batch.num_rows() > 0 && self.current_skipped == 
self.skip =>
+                {
+                    break poll
+                }
+                Poll::Ready(Some(Err(_e))) => break poll,
+                Poll::Ready(None) => break poll,
+                Poll::Pending => break poll,
+                _ => {
+                    // continue to poll input stream
+                }
+            }
+        }
+    }
+
     fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
         // records time on drop
         let _timer = self.baseline_metrics.elapsed_compute().timer();
-        if self.current_len == self.limit {
+        if self.current_fetched == self.fetch {
             self.input = None; // clear input so it can be dropped early
             None
-        } else if self.current_len + batch.num_rows() <= self.limit {
-            self.current_len += batch.num_rows();
+        } else if self.current_fetched + batch.num_rows() <= self.fetch {
+            self.current_fetched += batch.num_rows();
             Some(batch)
         } else {
-            let batch_rows = self.limit - self.current_len;
-            self.current_len = self.limit;
+            let batch_rows = self.fetch - self.current_fetched;
+            self.current_fetched = self.fetch;
             self.input = None; // clear input so it can be dropped early
             Some(truncate_batch(&batch, batch_rows))
         }
@@ -400,11 +448,20 @@ impl Stream for LimitStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        let fetch_started = self.current_skipped == self.skip;
         let poll = match &mut self.input {
-            Some(input) => input.poll_next_unpin(cx).map(|x| match x {
-                Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
-                other => other,
-            }),
+            Some(input) => {
+                let poll = if fetch_started {
+                    input.poll_next_unpin(cx)
+                } else {
+                    self.poll_and_skip(cx)
+                };
+
+                poll.map(|x| match x {
+                    Some(Ok(batch)) => 
Ok(self.stream_limit(batch)).transpose(),
+                    other => other,
+                })
+            }
             // input has been cleared
             None => Poll::Ready(None),
         };
@@ -442,7 +499,11 @@ mod tests {
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
 
-        let limit = 
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 7);
+        let limit = GlobalLimitExec::new(
+            Arc::new(CoalescePartitionsExec::new(csv)),
+            None,
+            Some(7),
+        );
 
         // the result should contain 4 batches (one per input partition)
         let iter = limit.execute(0, task_ctx)?;
@@ -472,7 +533,8 @@ mod tests {
         // limit of six needs to consume the entire first record batch
         // (5 rows) and 1 row from the second (1 row)
         let baseline_metrics = 
BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
-        let limit_stream = LimitStream::new(Box::pin(input), 6, 
baseline_metrics);
+        let limit_stream =
+            LimitStream::new(Box::pin(input), None, Some(6), baseline_metrics);
         assert_eq!(index.value(), 0);
 
         let results = collect(Box::pin(limit_stream)).await.unwrap();
@@ -485,4 +547,75 @@ mod tests {
 
         Ok(())
     }
+
+    // test cases for "skip"
+    async fn skip_and_fetch(skip: Option<usize>, fetch: Option<usize>) -> 
Result<usize> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let num_partitions = 4;
+        let csv = test::scan_partitioned_csv(num_partitions)?;
+
+        assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
+
+        let offset =
+            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 
skip, fetch);
+
+        // execute the single output partition and sum the rows of all returned batches
+        let iter = offset.execute(0, task_ctx)?;
+        let batches = common::collect(iter).await?;
+        Ok(batches.iter().map(|batch| batch.num_rows()).sum())
+    }
+
+    #[tokio::test]
+    async fn skip_none_fetch_none() -> Result<()> {
+        let row_count = skip_and_fetch(None, None).await?;
+        assert_eq!(row_count, 100);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn skip_none_fetch_50() -> Result<()> {
+        let row_count = skip_and_fetch(None, Some(50)).await?;
+        assert_eq!(row_count, 50);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn skip_3_fetch_none() -> Result<()> {
+        // there are total of 100 rows, we skipped 3 rows (offset = 3)
+        let row_count = skip_and_fetch(Some(3), None).await?;
+        assert_eq!(row_count, 97);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn skip_3_fetch_10() -> Result<()> {
+        // there are total of 100 rows, we skipped 3 rows (offset = 3)
+        let row_count = skip_and_fetch(Some(3), Some(10)).await?;
+        assert_eq!(row_count, 10);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn skip_100_fetch_none() -> Result<()> {
+        let row_count = skip_and_fetch(Some(100), None).await?;
+        assert_eq!(row_count, 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn skip_100_fetch_1() -> Result<()> {
+        let row_count = skip_and_fetch(Some(100), Some(1)).await?;
+        assert_eq!(row_count, 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn skip_101_fetch_none() -> Result<()> {
+        // there are total of 100 rows, we skipped 101 rows (offset = 101)
+        let row_count = skip_and_fetch(Some(101), None).await?;
+        assert_eq!(row_count, 0);
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 743ed7e5d..642d9ef28 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -375,7 +375,7 @@ impl DefaultPhysicalPlanner {
                     source,
                     projection,
                     filters,
-                    limit,
+                    fetch,
                     ..
                 }) => {
                     let source = source_as_provider(source)?;
@@ -384,7 +384,7 @@ impl DefaultPhysicalPlanner {
                     // referred to in the query
                     let filters = unnormalize_cols(filters.iter().cloned());
                     let unaliased: Vec<Expr> = 
filters.into_iter().map(unalias).collect();
-                    source.scan(session_state, projection, &unaliased, 
*limit).await
+                    source.scan(session_state, projection, &unaliased, 
*fetch).await
                 }
                 LogicalPlan::Values(Values {
                     values,
@@ -897,8 +897,7 @@ impl DefaultPhysicalPlanner {
                         _ => Err(DataFusionError::Plan("SubqueryAlias should 
only wrap TableScan".to_string()))
                     }
                 }
-                LogicalPlan::Limit(Limit { input, n, .. }) => {
-                    let limit = *n;
+                LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => {
                     let input = self.create_initial_plan(input, 
session_state).await?;
 
                     // GlobalLimitExec requires a single partition for input
@@ -907,15 +906,14 @@ impl DefaultPhysicalPlanner {
                     } else {
                         // Apply a LocalLimitExec to each partition. The 
optimizer will also insert
                         // a CoalescePartitionsExec between the 
GlobalLimitExec and LocalLimitExec
-                        Arc::new(LocalLimitExec::new(input, limit))
+                        if let Some(fetch) = fetch {
+                            Arc::new(LocalLimitExec::new(input, *fetch + 
skip.unwrap_or(0)))
+                        } else {
+                            input
+                        }
                     };
 
-                    Ok(Arc::new(GlobalLimitExec::new(input, limit)))
-                }
-                LogicalPlan::Offset(_) => {
-                    Err(DataFusionError::Internal(
-                        "Unsupported logical plan: OFFSET".to_string(),
-                    ))
+                    Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
                 }
                 LogicalPlan::CreateExternalTable(_) => {
                     // There is no default plan for "CREATE EXTERNAL
@@ -1298,7 +1296,7 @@ mod tests {
     };
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::scalar::ScalarValue;
-    use crate::test_util::scan_empty;
+    use crate::test_util::{scan_empty, scan_empty_with_partitions};
     use crate::{
         logical_plan::LogicalPlanBuilder, 
physical_plan::SendableRecordBatchStream,
     };
@@ -1334,7 +1332,7 @@ mod tests {
             .project(vec![col("c1"), col("c2")])?
             .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
             .sort(vec![col("c1").sort(true, true)])?
-            .limit(10)?
+            .limit(Some(3), Some(10))?
             .build()?;
 
         let plan = plan(&logical_plan).await?;
@@ -1372,14 +1370,44 @@ mod tests {
         let logical_plan = test_csv_scan()
             .await?
             .filter(col("c7").lt(col("c12")))?
+            .limit(Some(3), None)?
             .build()?;
 
         let plan = plan(&logical_plan).await?;
 
         // c12 is f64, c7 is u8 -> cast c7 to f64
         // the cast here is implicit so has CastOptions with safe=true
-        let expected = "predicate: BinaryExpr { left: TryCastExpr { expr: 
Column { name: \"c7\", index: 6 }, cast_type: Float64 }, op: Lt, right: Column 
{ name: \"c12\", index: 11 } }";
-        assert!(format!("{:?}", plan).contains(expected));
+        let _expected = "predicate: BinaryExpr { left: TryCastExpr { expr: 
Column { name: \"c7\", index: 6 }, cast_type: Float64 }, op: Lt, right: Column 
{ name: \"c12\", index: 11 } }";
+        let plan_debug_str = format!("{:?}", plan);
+        assert!(plan_debug_str.contains("GlobalLimitExec"));
+        assert!(plan_debug_str.contains("skip: Some(3)"));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_zero_offset_plan() -> Result<()> {
+        let logical_plan = test_csv_scan().await?.limit(Some(0), 
None)?.build()?;
+        let plan = plan(&logical_plan).await?;
+        assert!(format!("{:?}", plan).contains("GlobalLimitExec"));
+        assert!(format!("{:?}", plan).contains("skip: Some(0)"));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_limit_with_partitions() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("id", DataType::Int32, 
false)]);
+
+        let logical_plan = scan_empty_with_partitions(Some("test"), &schema, 
None, 2)?
+            .limit(Some(3), Some(5))?
+            .build()?;
+        let plan = plan(&logical_plan).await?;
+
+        assert!(format!("{:?}", plan).contains("GlobalLimitExec"));
+        assert!(format!("{:?}", plan).contains("skip: Some(3), fetch: 
Some(5)"));
+
+        // LocalLimitExec adjusts the `fetch`
+        assert!(format!("{:?}", plan).contains("LocalLimitExec"));
+        assert!(format!("{:?}", plan).contains("fetch: 8"));
         Ok(())
     }
 
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 1a6a5028e..ee53a424c 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -250,6 +250,22 @@ pub fn scan_empty(
     )
 }
 
+/// Scan an empty data source with the given number of partitions, mainly used in tests.
+pub fn scan_empty_with_partitions(
+    name: Option<&str>,
+    table_schema: &Schema,
+    projection: Option<Vec<usize>>,
+    partitions: usize,
+) -> Result<LogicalPlanBuilder, DataFusionError> {
+    let table_schema = Arc::new(table_schema.clone());
+    let provider = 
Arc::new(EmptyTable::new(table_schema).with_partitions(partitions));
+    LogicalPlanBuilder::scan(
+        name.unwrap_or(UNNAMED_TABLE),
+        provider_as_source(provider),
+        projection,
+    )
+}
+
 /// Get the schema for the aggregate_test_* csv files
 pub fn aggr_test_schema() -> SchemaRef {
     let mut f1 = Field::new("c1", DataType::Utf8, false);
diff --git a/datafusion/core/tests/dataframe_functions.rs 
b/datafusion/core/tests/dataframe_functions.rs
index 48e39f370..b126d010c 100644
--- a/datafusion/core/tests/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe_functions.rs
@@ -72,7 +72,7 @@ macro_rules! assert_fn_batches {
     };
     ($EXPR:expr, $EXPECTED: expr, $LIMIT: expr) => {
         let df = create_test_table()?;
-        let df = df.select(vec![$EXPR])?.limit($LIMIT)?;
+        let df = df.select(vec![$EXPR])?.limit(None, Some($LIMIT))?;
         let batches = df.collect().await?;
 
         assert_batches_eq!($EXPECTED, &batches);
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index a25543b34..c9719aad6 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -71,12 +71,12 @@ async fn explain_analyze_baseline_metrics() {
     );
     assert_metrics!(
         &formatted,
-        "GlobalLimitExec: limit=3, ",
+        "GlobalLimitExec: skip=None, fetch=3, ",
         "metrics=[output_rows=1, elapsed_compute="
     );
     assert_metrics!(
         &formatted,
-        "LocalLimitExec: limit=3",
+        "LocalLimitExec: fetch=3",
         "metrics=[output_rows=3, elapsed_compute="
     );
     assert_metrics!(
@@ -656,7 +656,7 @@ async fn test_physical_plan_display_indent() {
 
     let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let expected = vec![
-        "GlobalLimitExec: limit=10",
+        "GlobalLimitExec: skip=None, fetch=10",
         "  SortExec: [the_min@2 DESC]",
         "    CoalescePartitionsExec",
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
diff --git a/datafusion/core/tests/sql/limit.rs 
b/datafusion/core/tests/sql/limit.rs
index 6cfc423de..646074209 100644
--- a/datafusion/core/tests/sql/limit.rs
+++ b/datafusion/core/tests/sql/limit.rs
@@ -184,3 +184,84 @@ async fn limit_multi_partitions() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn csv_offset_without_limit_99() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 99";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    #[rustfmt::skip]
+        let expected = vec![
+        "+----+",
+        "| c1 |",
+        "+----+",
+        "| e  |",
+        "+----+"];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_offset_without_limit_100() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 100";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec!["++", "++"];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_offset_without_limit_101() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 101";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec!["++", "++"];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_offset() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 2 LIMIT 2";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    #[rustfmt::skip]
+        let expected = vec![
+        "+----+",
+        "| c1 |",
+        "+----+",
+        "| b  |",
+        "| a  |",
+        "+----+"];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_offset_the_same_as_nbr_of_rows() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 1 OFFSET 100";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec!["++", "++"];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_offset_bigger_than_nbr_of_rows() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 1 OFFSET 101";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec!["++", "++"];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
diff --git a/datafusion/core/tests/user_defined_plan.rs 
b/datafusion/core/tests/user_defined_plan.rs
index de94c63b5..33cfd1f56 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -290,7 +290,12 @@ impl OptimizerRule for TopKOptimizerRule {
         // Note: this code simply looks for the pattern of a Limit followed by 
a
         // Sort and replaces it by a TopK node. It does not handle many
         // edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
-        if let LogicalPlan::Limit(Limit { ref n, ref input }) = plan {
+        if let LogicalPlan::Limit(Limit {
+            fetch: Some(fetch),
+            input,
+            ..
+        }) = plan
+        {
             if let LogicalPlan::Sort(Sort {
                 ref expr,
                 ref input,
@@ -300,7 +305,7 @@ impl OptimizerRule for TopKOptimizerRule {
                     // we found a sort with a single sort expr, replace with a 
a TopK
                     return Ok(LogicalPlan::Extension(Extension {
                         node: Arc::new(TopKPlanNode {
-                            k: *n,
+                            k: *fetch,
                             input: self.optimize(input.as_ref(), 
optimizer_config)?,
                             expr: expr[0].clone(),
                         }),
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 917606144..d0309b95f 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -22,9 +22,9 @@ use crate::utils::{columnize_expr, exprlist_to_fields, 
from_plan};
 use crate::{
     logical_plan::{
         Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
-        JoinConstraint, JoinType, Limit, LogicalPlan, Offset, Partitioning, 
PlanType,
-        Projection, Repartition, Sort, SubqueryAlias, TableScan, 
ToStringifiedPlan,
-        Union, Values, Window,
+        JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, 
Projection,
+        Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union, 
Values,
+        Window,
     },
     utils::{
         expand_qualified_wildcard, expand_wildcard, expr_to_columns,
@@ -233,7 +233,7 @@ impl LogicalPlanBuilder {
             projected_schema: Arc::new(projected_schema),
             projection,
             filters,
-            limit: None,
+            fetch: None,
         });
         Ok(Self::from(table_scan))
     }
@@ -291,18 +291,16 @@ impl LogicalPlanBuilder {
         })))
     }
 
-    /// Apply a limit
-    pub fn limit(&self, n: usize) -> Result<Self> {
+    /// Limit the number of rows returned
+    ///
+    /// `skip` - Number of rows to skip before fetching any row.
+    ///
+    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
+    ///          if specified.
+    pub fn limit(&self, skip: Option<usize>, fetch: Option<usize>) -> 
Result<Self> {
         Ok(Self::from(LogicalPlan::Limit(Limit {
-            n,
-            input: Arc::new(self.plan.clone()),
-        })))
-    }
-
-    /// Apply an offset
-    pub fn offset(&self, offset: usize) -> Result<Self> {
-        Ok(Self::from(LogicalPlan::Offset(Offset {
-            offset,
+            skip,
+            fetch,
             input: Arc::new(self.plan.clone()),
         })))
     }
@@ -1026,15 +1024,13 @@ mod tests {
                     vec![sum(col("salary")).alias("total_salary")],
                 )?
                 .project(vec![col("state"), col("total_salary")])?
-                .limit(10)?
-                .offset(2)?
+                .limit(Some(2), Some(10))?
                 .build()?;
 
-        let expected = "Offset: 2\
-        \n  Limit: 10\
-        \n    Projection: #employee_csv.state, #total_salary\
-        \n      Aggregate: groupBy=[[#employee_csv.state]], 
aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
-        \n        TableScan: employee_csv projection=Some([state, salary])";
+        let expected = "Limit: skip=2, fetch=10\
+        \n  Projection: #employee_csv.state, #total_salary\
+        \n    Aggregate: groupBy=[[#employee_csv.state]], 
aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
+        \n      TableScan: employee_csv projection=Some([state, salary])";
 
         assert_eq!(expected, format!("{:?}", plan));
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 022f50ea3..c476300f1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -71,10 +71,8 @@ pub enum LogicalPlan {
     Subquery(Subquery),
     /// Aliased relation provides, or changes, the name of a relation.
     SubqueryAlias(SubqueryAlias),
-    /// Produces the first `n` tuples from its input and discards the rest.
+    /// Skip some number of rows, and then fetch some number of rows.
     Limit(Limit),
-    /// Adjusts the starting point at which the rest of the expressions begin 
to effect
-    Offset(Offset),
     /// Creates an external table.
     CreateExternalTable(CreateExternalTable),
     /// Creates an in memory table.
@@ -119,7 +117,6 @@ impl LogicalPlan {
             LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
             LogicalPlan::Repartition(Repartition { input, .. }) => 
input.schema(),
             LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
-            LogicalPlan::Offset(Offset { input, .. }) => input.schema(),
             LogicalPlan::Subquery(Subquery { subquery, .. }) => 
subquery.schema(),
             LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. 
}) => {
@@ -190,7 +187,6 @@ impl LogicalPlan {
             | LogicalPlan::Sort(Sort { input, .. })
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::CreateView(CreateView { input, .. })
-            | LogicalPlan::Offset(Offset { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::DropTable(_) => vec![],
         }
@@ -245,7 +241,6 @@ impl LogicalPlan {
             | LogicalPlan::Subquery(_)
             | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Limit(_)
-            | LogicalPlan::Offset(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::CreateView(_)
@@ -274,7 +269,6 @@ impl LogicalPlan {
             LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
             LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => 
vec![left, right],
             LogicalPlan::Limit(Limit { input, .. }) => vec![input],
-            LogicalPlan::Offset(Offset { input, .. }) => vec![input],
             LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => 
vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
@@ -415,7 +409,6 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
-            LogicalPlan::Offset(Offset { input, .. }) => 
input.accept(visitor)?,
             LogicalPlan::Subquery(Subquery { subquery, .. }) => {
                 subquery.accept(visitor)?
             }
@@ -653,7 +646,7 @@ impl LogicalPlan {
                         ref table_name,
                         ref projection,
                         ref filters,
-                        ref limit,
+                        ref fetch,
                         ..
                     }) => {
                         let projected_fields = match projection {
@@ -710,8 +703,8 @@ impl LogicalPlan {
                             }
                         }
 
-                        if let Some(n) = limit {
-                            write!(f, ", limit={}", n)?;
+                        if let Some(n) = fetch {
+                            write!(f, ", fetch={}", n)?;
                         }
 
                         Ok(())
@@ -816,9 +809,17 @@ impl LogicalPlan {
                             )
                         }
                     },
-                    LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, 
"Limit: {}", n),
-                    LogicalPlan::Offset(Offset { ref offset, .. }) => {
-                        write!(f, "Offset: {}", offset)
+                    LogicalPlan::Limit(Limit {
+                        ref skip,
+                        ref fetch,
+                        ..
+                    }) => {
+                        write!(
+                            f,
+                            "Limit: skip={}, fetch={}",
+                            skip.map_or("None".to_string(), |x| x.to_string()),
+                            fetch.map_or_else(|| "None".to_string(), |x| 
x.to_string())
+                        )
                     }
                     LogicalPlan::Subquery(Subquery { subquery, .. }) => {
                         write!(f, "Subquery: {:?}", subquery)
@@ -1037,8 +1038,8 @@ pub struct TableScan {
     pub projected_schema: DFSchemaRef,
     /// Optional expressions to be used as filters by the table provider
     pub filters: Vec<Expr>,
-    /// Optional limit to skip reading
-    pub limit: Option<usize>,
+    /// Optional number of rows to read
+    pub fetch: Option<usize>,
 }
 
 /// Apply Cross Join to two logical plans
@@ -1166,8 +1167,10 @@ pub struct Extension {
 /// Produces the first `n` tuples from its input and discards the rest.
 #[derive(Clone)]
 pub struct Limit {
-    /// The limit
-    pub n: usize,
+    /// Number of rows to skip before fetch
+    pub skip: Option<usize>,
+    /// Maximum number of rows to fetch
+    pub fetch: Option<usize>,
     /// The logical plan
     pub input: Arc<LogicalPlan>,
 }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3986eb3e6..065e6120b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -21,8 +21,8 @@ use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, 
Recursion};
 use crate::logical_plan::builder::build_join_schema;
 use crate::logical_plan::{
     Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter, 
Join, Limit,
-    Offset, Partitioning, Projection, Repartition, Sort, Subquery, 
SubqueryAlias, Union,
-    Values, Window,
+    Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias, 
Union, Values,
+    Window,
 };
 use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
 use datafusion_common::{
@@ -427,12 +427,9 @@ pub fn from_plan(
                 schema,
             }))
         }
-        LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
-            n: *n,
-            input: Arc::new(inputs[0].clone()),
-        })),
-        LogicalPlan::Offset(Offset { offset, .. }) => 
Ok(LogicalPlan::Offset(Offset {
-            offset: *offset,
+        LogicalPlan::Limit(Limit { skip, fetch, .. }) => 
Ok(LogicalPlan::Limit(Limit {
+            skip: *skip,
+            fetch: *fetch,
             input: Arc::new(inputs[0].clone()),
         })),
         LogicalPlan::CreateMemoryTable(CreateMemoryTable {
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index bc635c215..458714bd7 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -222,7 +222,6 @@ fn optimize(
         | LogicalPlan::Subquery(_)
         | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Limit(_)
-        | LogicalPlan::Offset(_)
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::Explain { .. }
         | LogicalPlan::Analyze { .. }
diff --git a/datafusion/optimizer/src/eliminate_limit.rs 
b/datafusion/optimizer/src/eliminate_limit.rs
index f6d3b8472..822b03a4a 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -42,12 +42,14 @@ impl OptimizerRule for EliminateLimit {
         optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
-            LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
-                Ok(LogicalPlan::EmptyRelation(EmptyRelation {
-                    produce_one_row: false,
-                    schema: input.schema().clone(),
-                }))
-            }
+            LogicalPlan::Limit(Limit {
+                fetch: Some(0),
+                input,
+                ..
+            }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema: input.schema().clone(),
+            })),
             // Rest: recurse and find possible LIMIT 0 nodes
             _ => {
                 let expr = plan.expressions();
@@ -91,7 +93,7 @@ mod tests {
         let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![sum(col("b"))])
             .unwrap()
-            .limit(0)
+            .limit(None, Some(0))
             .unwrap()
             .build()
             .unwrap();
@@ -112,7 +114,7 @@ mod tests {
         let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![sum(col("b"))])
             .unwrap()
-            .limit(0)
+            .limit(None, Some(0))
             .unwrap()
             .union(plan1)
             .unwrap()
diff --git a/datafusion/optimizer/src/filter_push_down.rs 
b/datafusion/optimizer/src/filter_push_down.rs
index 8d2d9c06d..7b3b6a21f 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -558,7 +558,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
             filters,
             projection,
             table_name,
-            limit,
+            fetch,
         }) => {
             let mut used_columns = HashSet::new();
             let mut new_filters = filters.clone();
@@ -594,7 +594,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
                     projected_schema: projected_schema.clone(),
                     table_name: table_name.clone(),
                     filters: new_filters,
-                    limit: *limit,
+                    fetch: *fetch,
                 }),
             )
         }
@@ -693,13 +693,13 @@ mod tests {
         let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
-            .limit(10)?
+            .limit(None, Some(10))?
             .filter(col("a").eq(lit(1i64)))?
             .build()?;
         // filter is before single projection
         let expected = "\
             Filter: #test.a = Int64(1)\
-            \n  Limit: 10\
+            \n  Limit: skip=None, fetch=10\
             \n    Projection: #test.a, #test.b\
             \n      TableScan: test projection=None";
         assert_optimized_plan_eq(&plan, expected);
@@ -945,8 +945,8 @@ mod tests {
         let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a"), col("b")])?
-            .limit(20)?
-            .limit(10)?
+            .limit(None, Some(20))?
+            .limit(None, Some(10))?
             .project(vec![col("a"), col("b")])?
             .filter(col("a").eq(lit(1i64)))?
             .build()?;
@@ -954,8 +954,8 @@ mod tests {
         let expected = "\
             Projection: #test.a, #test.b\
             \n  Filter: #test.a = Int64(1)\
-            \n    Limit: 10\
-            \n      Limit: 20\
+            \n    Limit: skip=None, fetch=10\
+            \n      Limit: skip=None, fetch=20\
             \n        Projection: #test.a, #test.b\
             \n          TableScan: test projection=None";
         assert_optimized_plan_eq(&plan, expected);
@@ -1008,7 +1008,7 @@ mod tests {
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
             .filter(col("a").lt_eq(lit(1i64)))?
-            .limit(1)?
+            .limit(None, Some(1))?
             .project(vec![col("a")])?
             .filter(col("a").gt_eq(lit(1i64)))?
             .build()?;
@@ -1019,7 +1019,7 @@ mod tests {
             format!("{:?}", plan),
             "Filter: #test.a >= Int64(1)\
              \n  Projection: #test.a\
-             \n    Limit: 1\
+             \n    Limit: skip=None, fetch=1\
              \n      Filter: #test.a <= Int64(1)\
              \n        Projection: #test.a\
              \n          TableScan: test projection=None"
@@ -1028,7 +1028,7 @@ mod tests {
         let expected = "\
         Projection: #test.a\
         \n  Filter: #test.a >= Int64(1)\
-        \n    Limit: 1\
+        \n    Limit: skip=None, fetch=1\
         \n      Projection: #test.a\
         \n        Filter: #test.a <= Int64(1)\
         \n          TableScan: test projection=None";
@@ -1042,7 +1042,7 @@ mod tests {
     fn two_filters_on_same_depth() -> Result<()> {
         let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .limit(1)?
+            .limit(None, Some(1))?
             .filter(col("a").lt_eq(lit(1i64)))?
             .filter(col("a").gt_eq(lit(1i64)))?
             .project(vec![col("a")])?
@@ -1054,14 +1054,14 @@ mod tests {
             "Projection: #test.a\
             \n  Filter: #test.a >= Int64(1)\
             \n    Filter: #test.a <= Int64(1)\
-            \n      Limit: 1\
+            \n      Limit: skip=None, fetch=1\
             \n        TableScan: test projection=None"
         );
 
         let expected = "\
         Projection: #test.a\
         \n  Filter: #test.a >= Int64(1) AND #test.a <= Int64(1)\
-        \n    Limit: 1\
+        \n    Limit: skip=None, fetch=1\
         \n      TableScan: test projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -1742,7 +1742,7 @@ mod tests {
             )?),
             projection: None,
             source: Arc::new(test_provider),
-            limit: None,
+            fetch: None,
         });
 
         LogicalPlanBuilder::from(table_scan)
@@ -1815,7 +1815,7 @@ mod tests {
             )?),
             projection: Some(vec![0]),
             source: Arc::new(test_provider),
-            limit: None,
+            fetch: None,
         });
 
         let plan = LogicalPlanBuilder::from(table_scan)
diff --git a/datafusion/optimizer/src/limit_push_down.rs 
b/datafusion/optimizer/src/limit_push_down.rs
index 91f976001..8e47048db 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -20,9 +20,7 @@
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{
-    logical_plan::{
-        Join, JoinType, Limit, LogicalPlan, Offset, Projection, TableScan, 
Union,
-    },
+    logical_plan::{Join, JoinType, Limit, LogicalPlan, Projection, TableScan, 
Union},
     utils::from_plan,
 };
 use std::sync::Arc;
@@ -43,61 +41,84 @@ impl LimitPushDown {
 /// when traversing down related to "limit push down".
 enum Ancestor {
     /// Limit
-    FromLimit,
-    /// Offset
-    FromOffset,
+    FromLimit {
+        skip: Option<usize>,
+        fetch: Option<usize>,
+    },
     /// Other nodes that don't affect the adjustment of "Limit"
     NotRelevant,
 }
 
 ///
-/// When doing limit push down with "offset" and "limit" during traversal,
-/// the "limit" should be adjusted.
-/// limit_push_down is a recursive function that tracks three important 
information
-/// to make the adjustment.
+/// When doing limit push down with "skip" and "fetch" during traversal,
+/// the "fetch" should be adjusted.
+/// "Ancestor" is pushed down the plan tree, so that the current node
+/// can adjust its own "fetch".
+///
+/// If the current node is a Limit, its "fetch" is updated by:
+/// 1. extended_fetch = ancestor's "fetch" extended by ancestor's "skip".
+/// 2. min(extended_fetch, current node's fetch)
+///
+/// Current node's "skip" is never updated, it is
+/// just a hint for the child to extend its "fetch".
 ///
-/// 1. ancestor: the kind of Ancestor.
-/// 2. ancestor_offset: ancestor's offset value
-/// 3. ancestor_limit: ancestor's limit value
+/// When building a new Limit in Union, the "fetch" is calculated
+/// by using ancestor's "fetch" and "skip".
+///
+/// When finally assigning "fetch" in TableScan, the "fetch" is calculated
+/// by using ancestor's "fetch" and "skip".
 ///
-/// (ancestor_offset, ancestor_limit) is updated in the following cases
-/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
-///    When the ancestor is a "Limit" and the current node is a "Limit",
-///    it is updated to (None, min(n1, n2))).
-/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
-///    it is updated to (m1, n1 + m1).
-/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
-///    it is updated to (m2, None).
-/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
-///    it is updated to (None, n1). Note that this won't happen when we
-///    generate the plan from SQL, it can happen when we build the plan
-///    using LogicalPlanBuilder.
 fn limit_push_down(
     _optimizer: &LimitPushDown,
     ancestor: Ancestor,
-    ancestor_offset: Option<usize>,
-    ancestor_limit: Option<usize>,
     plan: &LogicalPlan,
     _optimizer_config: &OptimizerConfig,
 ) -> Result<LogicalPlan> {
-    match (plan, ancestor_limit) {
-        (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
-            let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
-                Ancestor::FromLimit | Ancestor::FromOffset => (
-                    None,
-                    Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
-                ),
-                Ancestor::NotRelevant => (None, Some(*n)),
+    match (plan, ancestor) {
+        (
+            LogicalPlan::Limit(Limit {
+                skip: current_skip,
+                fetch: current_fetch,
+                input,
+            }),
+            ancestor,
+        ) => {
+            let new_current_fetch = match ancestor {
+                Ancestor::FromLimit {
+                    skip: ancestor_skip,
+                    fetch: ancestor_fetch,
+                } => {
+                    if let Some(fetch) = current_fetch {
+                        // extend ancestor's fetch
+                        let ancestor_fetch =
+                            ancestor_fetch.map(|f| f + 
ancestor_skip.unwrap_or(0));
+
+                        let new_current_fetch =
+                            ancestor_fetch.map_or(*fetch, |x| std::cmp::min(x, 
*fetch));
+
+                        Some(new_current_fetch)
+                    } else {
+                        // we don't have a "fetch", and we can push down our 
parent's "fetch"
+                        // extend ancestor's fetch
+                        ancestor_fetch.map(|f| f + ancestor_skip.unwrap_or(0))
+                    }
+                }
+                _ => *current_fetch,
             };
 
             Ok(LogicalPlan::Limit(Limit {
-                n: new_ancestor_limit.unwrap_or(*n),
-                // push down limit to plan (minimum of upper limit and current 
limit)
+                // current node's "skip" is not updated, updating
+                // this value would violate the semantics of Limit operator
+                skip: *current_skip,
+                fetch: new_current_fetch,
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    Ancestor::FromLimit,
-                    new_ancestor_offset,
-                    new_ancestor_limit,
+                    Ancestor::FromLimit {
+                        // current node's "skip" is passed to the subtree
+                        // so that the child can extend the "fetch"
+                        skip: *current_skip,
+                        fetch: new_current_fetch,
+                    },
                     input.as_ref(),
                     _optimizer_config,
                 )?),
@@ -109,20 +130,28 @@ fn limit_push_down(
                 source,
                 projection,
                 filters,
-                limit,
+                fetch,
                 projected_schema,
             }),
-            Some(ancestor_limit),
-        ) => Ok(LogicalPlan::TableScan(TableScan {
-            table_name: table_name.clone(),
-            source: source.clone(),
-            projection: projection.clone(),
-            filters: filters.clone(),
-            limit: limit
-                .map(|x| std::cmp::min(x, ancestor_limit))
-                .or(Some(ancestor_limit)),
-            projected_schema: projected_schema.clone(),
-        })),
+            Ancestor::FromLimit {
+                skip: ancestor_skip,
+                fetch: Some(ancestor_fetch),
+                ..
+            },
+        ) => {
+            let ancestor_fetch =
+                ancestor_skip.map_or(ancestor_fetch, |x| x + ancestor_fetch);
+            Ok(LogicalPlan::TableScan(TableScan {
+                table_name: table_name.clone(),
+                source: source.clone(),
+                projection: projection.clone(),
+                filters: filters.clone(),
+                fetch: fetch
+                    .map(|x| std::cmp::min(x, ancestor_fetch))
+                    .or(Some(ancestor_fetch)),
+                projected_schema: projected_schema.clone(),
+            }))
+        }
         (
             LogicalPlan::Projection(Projection {
                 expr,
@@ -130,7 +159,7 @@ fn limit_push_down(
                 schema,
                 alias,
             }),
-            ancestor_limit,
+            ancestor,
         ) => {
             // Push down limit directly (projection doesn't change number of 
rows)
             Ok(LogicalPlan::Projection(Projection {
@@ -138,8 +167,6 @@ fn limit_push_down(
                 input: Arc::new(limit_push_down(
                     _optimizer,
                     ancestor,
-                    ancestor_offset,
-                    ancestor_limit,
                     input.as_ref(),
                     _optimizer_config,
                 )?),
@@ -153,19 +180,27 @@ fn limit_push_down(
                 alias,
                 schema,
             }),
-            Some(ancestor_limit),
+            Ancestor::FromLimit {
+                skip: ancestor_skip,
+                fetch: Some(ancestor_fetch),
+                ..
+            },
         ) => {
             // Push down limit through UNION
+            let ancestor_fetch =
+                ancestor_skip.map_or(ancestor_fetch, |x| x + ancestor_fetch);
             let new_inputs = inputs
                 .iter()
                 .map(|x| {
                     Ok(LogicalPlan::Limit(Limit {
-                        n: ancestor_limit,
+                        skip: None,
+                        fetch: Some(ancestor_fetch),
                         input: Arc::new(limit_push_down(
                             _optimizer,
-                            Ancestor::FromLimit,
-                            None,
-                            Some(ancestor_limit),
+                            Ancestor::FromLimit {
+                                skip: None,
+                                fetch: Some(ancestor_fetch),
+                            },
                             x,
                             _optimizer_config,
                         )?),
@@ -178,52 +213,47 @@ fn limit_push_down(
                 schema: schema.clone(),
             }))
         }
-        // offset 5 limit 10 then push limit 15 (5 + 10)
-        (LogicalPlan::Offset(Offset { offset, input }), ancestor_limit) => {
-            let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
-                Ancestor::FromLimit => {
-                    (Some(*offset), ancestor_limit.map(|x| x + *offset))
+        (
+            LogicalPlan::Join(Join { join_type, .. }),
+            Ancestor::FromLimit {
+                skip: ancestor_skip,
+                fetch: Some(ancestor_fetch),
+                ..
+            },
+        ) => {
+            let ancestor_fetch =
+                ancestor_skip.map_or(ancestor_fetch, |x| x + ancestor_fetch);
+            match join_type {
+                JoinType::Left => {
+                    //if LeftOuter join push limit to left
+                    generate_push_down_join(
+                        _optimizer,
+                        _optimizer_config,
+                        plan,
+                        Some(ancestor_fetch),
+                        None,
+                    )
                 }
-                Ancestor::FromOffset => (Some(*offset), None),
-                Ancestor::NotRelevant => (Some(*offset), None),
-            };
-
-            Ok(LogicalPlan::Offset(Offset {
-                offset: *offset,
-                input: Arc::new(limit_push_down(
-                    _optimizer,
-                    Ancestor::FromOffset,
-                    new_ancestor_offset,
-                    new_ancestor_limit,
-                    input.as_ref(),
-                    _optimizer_config,
-                )?),
-            }))
-        }
-        (LogicalPlan::Join(Join { join_type, .. }), upper_limit) => match 
join_type {
-            JoinType::Left => {
-                //if LeftOuter join push limit to left
-                generate_push_down_join(
+                JoinType::Right =>
+                // If RightOuter join  push limit to right
+                {
+                    generate_push_down_join(
+                        _optimizer,
+                        _optimizer_config,
+                        plan,
+                        None,
+                        Some(ancestor_fetch),
+                    )
+                }
+                _ => generate_push_down_join(
                     _optimizer,
                     _optimizer_config,
                     plan,
-                    upper_limit,
                     None,
-                )
-            }
-            JoinType::Right =>
-            // If RightOuter join  push limit to right
-            {
-                generate_push_down_join(
-                    _optimizer,
-                    _optimizer_config,
-                    plan,
                     None,
-                    upper_limit,
-                )
+                ),
             }
-            _ => generate_push_down_join(_optimizer, _optimizer_config, plan, 
None, None),
-        },
+        }
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
         _ => push_down_children_limit(_optimizer, _optimizer_config, plan),
@@ -251,17 +281,19 @@ fn generate_push_down_join(
         return Ok(LogicalPlan::Join(Join {
             left: Arc::new(limit_push_down(
                 _optimizer,
-                Ancestor::FromLimit,
-                None,
-                left_limit,
+                Ancestor::FromLimit {
+                    skip: None,
+                    fetch: left_limit,
+                },
                 left.as_ref(),
                 _optimizer_config,
             )?),
             right: Arc::new(limit_push_down(
                 _optimizer,
-                Ancestor::FromLimit,
-                None,
-                right_limit,
+                Ancestor::FromLimit {
+                    skip: None,
+                    fetch: right_limit,
+                },
                 right.as_ref(),
                 _optimizer_config,
             )?),
@@ -292,14 +324,7 @@ fn push_down_children_limit(
     let new_inputs = inputs
         .iter()
         .map(|plan| {
-            limit_push_down(
-                _optimizer,
-                Ancestor::NotRelevant,
-                None,
-                None,
-                plan,
-                _optimizer_config,
-            )
+            limit_push_down(_optimizer, Ancestor::NotRelevant, plan, 
_optimizer_config)
         })
         .collect::<Result<Vec<_>>>()?;
 
@@ -312,14 +337,7 @@ impl OptimizerRule for LimitPushDown {
         plan: &LogicalPlan,
         optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        limit_push_down(
-            self,
-            Ancestor::NotRelevant,
-            None,
-            None,
-            plan,
-            optimizer_config,
-        )
+        limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
     }
 
     fn name(&self) -> &str {
@@ -352,14 +370,14 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
-            .limit(1000)?
+            .limit(None, Some(1000))?
             .build()?;
 
         // Should push the limit down to table provider
         // When it has a select
-        let expected = "Limit: 1000\
+        let expected = "Limit: skip=None, fetch=1000\
         \n  Projection: #test.a\
-        \n    TableScan: test projection=None, limit=1000";
+        \n    TableScan: test projection=None, fetch=1000";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -370,16 +388,16 @@ mod test {
         let table_scan = test_table_scan()?;
 
         let plan = LogicalPlanBuilder::from(table_scan)
-            .limit(1000)?
-            .limit(10)?
+            .limit(None, Some(1000))?
+            .limit(None, Some(10))?
             .build()?;
 
         // Should push down the smallest limit
         // Towards table scan
         // This rule doesn't replace multiple limits
-        let expected = "Limit: 10\
-        \n  Limit: 10\
-        \n    TableScan: test projection=None, limit=10";
+        let expected = "Limit: skip=None, fetch=10\
+        \n  Limit: skip=None, fetch=10\
+        \n    TableScan: test projection=None, fetch=10";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -392,11 +410,11 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![max(col("b"))])?
-            .limit(1000)?
+            .limit(None, Some(1000))?
             .build()?;
 
         // Limit should *not* push down aggregate node
-        let expected = "Limit: 1000\
+        let expected = "Limit: skip=None, fetch=1000\
         \n  Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
         \n    TableScan: test projection=None";
 
@@ -411,16 +429,16 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan.clone())
             .union(LogicalPlanBuilder::from(table_scan).build()?)?
-            .limit(1000)?
+            .limit(None, Some(1000))?
             .build()?;
 
         // Limit should push down through union
-        let expected = "Limit: 1000\
+        let expected = "Limit: skip=None, fetch=1000\
         \n  Union\
-        \n    Limit: 1000\
-        \n      TableScan: test projection=None, limit=1000\
-        \n    Limit: 1000\
-        \n      TableScan: test projection=None, limit=1000";
+        \n    Limit: skip=None, fetch=1000\
+        \n      TableScan: test projection=None, fetch=1000\
+        \n    Limit: skip=None, fetch=1000\
+        \n      TableScan: test projection=None, fetch=1000";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -432,16 +450,16 @@ mod test {
         let table_scan = test_table_scan()?;
 
         let plan = LogicalPlanBuilder::from(table_scan)
-            .limit(1000)?
+            .limit(None, Some(1000))?
             .aggregate(vec![col("a")], vec![max(col("b"))])?
-            .limit(10)?
+            .limit(None, Some(10))?
             .build()?;
 
         // Limit should use deeper LIMIT 1000, but Limit 10 shouldn't push 
down aggregation
-        let expected = "Limit: 10\
+        let expected = "Limit: skip=None, fetch=10\
         \n  Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
-        \n    Limit: 1000\
-        \n      TableScan: test projection=None, limit=1000";
+        \n    Limit: skip=None, fetch=1000\
+        \n      TableScan: test projection=None, fetch=1000";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -451,11 +469,13 @@ mod test {
     #[test]
     fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> 
Result<()> {
         let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan).offset(10)?.build()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .limit(Some(10), None)?
+            .build()?;
 
         // Should not push any limit down to table provider
         // When it has a select
-        let expected = "Offset: 10\
+        let expected = "Limit: skip=10, fetch=None\
         \n  TableScan: test projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -468,16 +488,14 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Should push the limit down to table provider
         // When it has a select
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Projection: #test.a\
-        \n      TableScan: test projection=None, limit=1010";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Projection: #test.a\
+        \n    TableScan: test projection=None, fetch=1010";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -490,14 +508,14 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
-            .limit(1000)?
-            .offset(10)?
+            .limit(None, Some(1000))?
+            .limit(Some(10), None)?
             .build()?;
 
-        let expected = "Offset: 10\
-        \n  Limit: 1000\
+        let expected = "Limit: skip=10, fetch=None\
+        \n  Limit: skip=None, fetch=1000\
         \n    Projection: #test.a\
-        \n      TableScan: test projection=None, limit=1000";
+        \n      TableScan: test projection=None, fetch=1000";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -510,14 +528,14 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("a")])?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), None)?
+            .limit(None, Some(1000))?
             .build()?;
 
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
+        let expected = "Limit: skip=None, fetch=1000\
+        \n  Limit: skip=10, fetch=1000\
         \n    Projection: #test.a\
-        \n      TableScan: test projection=None, limit=1010";
+        \n      TableScan: test projection=None, fetch=1010";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -529,18 +547,18 @@ mod test {
         let table_scan = test_table_scan()?;
 
         let plan = LogicalPlanBuilder::from(table_scan)
-            .offset(10)?
-            .limit(1000)?
-            .limit(10)?
+            .limit(Some(10), None)?
+            .limit(None, Some(1000))?
+            .limit(None, Some(10))?
             .build()?;
 
         // Should push down the smallest limit
         // Towards table scan
         // This rule doesn't replace multiple limits
-        let expected = "Limit: 10\
-        \n  Limit: 10\
-        \n    Offset: 10\
-        \n      TableScan: test projection=None, limit=20";
+        let expected = "Limit: skip=None, fetch=10\
+        \n  Limit: skip=None, fetch=10\
+        \n    Limit: skip=10, fetch=10\
+        \n      TableScan: test projection=None, fetch=20";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -553,15 +571,13 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .aggregate(vec![col("a")], vec![max(col("b"))])?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Limit should *not* push down aggregate node
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
-        \n      TableScan: test projection=None";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
+        \n    TableScan: test projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -574,18 +590,16 @@ mod test {
 
         let plan = LogicalPlanBuilder::from(table_scan.clone())
             .union(LogicalPlanBuilder::from(table_scan).build()?)?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Limit should push down through union
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Union\
-        \n      Limit: 1010\
-        \n        TableScan: test projection=None, limit=1010\
-        \n      Limit: 1010\
-        \n        TableScan: test projection=None, limit=1010";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Union\
+        \n    Limit: skip=None, fetch=1010\
+        \n      TableScan: test projection=None, fetch=1010\
+        \n    Limit: skip=None, fetch=1010\
+        \n      TableScan: test projection=None, fetch=1010";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -604,16 +618,14 @@ mod test {
                 (vec!["a"], vec!["a"]),
                 None,
             )?
-            .limit(1000)?
-            .offset(10)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Limit pushdown Not supported in Join
-        let expected = "Offset: 10\
-        \n  Limit: 1000\
-        \n    Inner Join: #test.a = #test2.a\
-        \n      TableScan: test projection=None\
-        \n      TableScan: test2 projection=None";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Inner Join: #test.a = #test2.a\
+        \n    TableScan: test projection=None\
+        \n    TableScan: test2 projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -632,16 +644,14 @@ mod test {
                 (vec!["a"], vec!["a"]),
                 None,
             )?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Limit pushdown Not supported in Join
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Inner Join: #test.a = #test2.a\
-        \n      TableScan: test projection=None\
-        \n      TableScan: test2 projection=None";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Inner Join: #test.a = #test2.a\
+        \n    TableScan: test projection=None\
+        \n    TableScan: test2 projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -661,18 +671,16 @@ mod test {
         let outer_query = LogicalPlanBuilder::from(table_scan_2)
             .project(vec![col("a")])?
             .filter(exists(Arc::new(subquery)))?
-            .limit(100)?
-            .offset(10)?
+            .limit(Some(10), Some(100))?
             .build()?;
 
         // Limit pushdown Not supported in sub_query
-        let expected = "Offset: 10\
-        \n  Limit: 100\
-        \n    Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+        let expected = "Limit: skip=10, fetch=100\
+        \n  Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
         \n  Projection: #test1.a\
         \n    TableScan: test1 projection=None)\
-        \n      Projection: #test2.a\
-        \n        TableScan: test2 projection=None";
+        \n    Projection: #test2.a\
+        \n      TableScan: test2 projection=None";
 
         assert_optimized_plan_eq(&outer_query, expected);
 
@@ -692,18 +700,16 @@ mod test {
         let outer_query = LogicalPlanBuilder::from(table_scan_2)
             .project(vec![col("a")])?
             .filter(exists(Arc::new(subquery)))?
-            .offset(10)?
-            .limit(100)?
+            .limit(Some(10), Some(100))?
             .build()?;
 
         // Limit pushdown Not supported in sub_query
-        let expected = "Limit: 100\
-        \n  Offset: 10\
-        \n    Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+        let expected = "Limit: skip=10, fetch=100\
+        \n  Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
         \n  Projection: #test1.a\
         \n    TableScan: test1 projection=None)\
-        \n      Projection: #test2.a\
-        \n        TableScan: test2 projection=None";
+        \n    Projection: #test2.a\
+        \n      TableScan: test2 projection=None";
 
         assert_optimized_plan_eq(&outer_query, expected);
 
@@ -722,13 +728,13 @@ mod test {
                 (vec!["a"], vec!["a"]),
                 None,
             )?
-            .limit(1000)?
+            .limit(None, Some(1000))?
             .build()?;
 
         // Limit pushdown Not supported in Join
-        let expected = "Limit: 1000\
+        let expected = "Limit: skip=None, fetch=1000\
         \n  Left Join: #test.a = #test2.a\
-        \n    TableScan: test projection=None, limit=1000\
+        \n    TableScan: test projection=None, fetch=1000\
         \n    TableScan: test2 projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
@@ -748,16 +754,14 @@ mod test {
                 (vec!["a"], vec!["a"]),
                 None,
             )?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Limit pushdown Not supported in Join
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Left Join: #test.a = #test2.a\
-        \n      TableScan: test projection=None, limit=1010\
-        \n      TableScan: test2 projection=None";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Left Join: #test.a = #test2.a\
+        \n    TableScan: test projection=None, fetch=1010\
+        \n    TableScan: test2 projection=None";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -776,14 +780,14 @@ mod test {
                 (vec!["a"], vec!["a"]),
                 None,
             )?
-            .limit(1000)?
+            .limit(None, Some(1000))?
             .build()?;
 
         // Limit pushdown Not supported in Join
-        let expected = "Limit: 1000\
+        let expected = "Limit: skip=None, fetch=1000\
         \n  Right Join: #test.a = #test2.a\
         \n    TableScan: test projection=None\
-        \n    TableScan: test2 projection=None, limit=1000";
+        \n    TableScan: test2 projection=None, fetch=1000";
 
         assert_optimized_plan_eq(&plan, expected);
 
@@ -802,16 +806,14 @@ mod test {
                 (vec!["a"], vec!["a"]),
                 None,
             )?
-            .offset(10)?
-            .limit(1000)?
+            .limit(Some(10), Some(1000))?
             .build()?;
 
         // Limit pushdown with offset supported in right outer join
-        let expected = "Limit: 1000\
-        \n  Offset: 10\
-        \n    Right Join: #test.a = #test2.a\
-        \n      TableScan: test projection=None\
-        \n      TableScan: test2 projection=None, limit=1010";
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  Right Join: #test.a = #test2.a\
+        \n    TableScan: test projection=None\
+        \n    TableScan: test2 projection=None, fetch=1010";
 
         assert_optimized_plan_eq(&plan, expected);
 
diff --git a/datafusion/optimizer/src/projection_push_down.rs 
b/datafusion/optimizer/src/projection_push_down.rs
index c6c81fd1f..c9aee1e03 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -362,7 +362,7 @@ fn optimize_plan(
             table_name,
             source,
             filters,
-            limit,
+            fetch: limit,
             ..
         }) => {
             let (projection, projected_schema) = get_projected_schema(
@@ -378,7 +378,7 @@ fn optimize_plan(
                 projection: Some(projection),
                 projected_schema,
                 filters: filters.clone(),
-                limit: *limit,
+                fetch: *limit,
             }))
         }
         LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
@@ -486,7 +486,6 @@ fn optimize_plan(
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
         LogicalPlan::Limit(_)
-        | LogicalPlan::Offset(_)
         | LogicalPlan::Filter { .. }
         | LogicalPlan::Repartition(_)
         | LogicalPlan::EmptyRelation(_)
@@ -866,12 +865,12 @@ mod tests {
 
         let plan = LogicalPlanBuilder::from(table_scan)
             .project(vec![col("c"), col("a")])?
-            .limit(5)?
+            .limit(None, Some(5))?
             .build()?;
 
         assert_fields_eq(&plan, vec!["c", "a"]);
 
-        let expected = "Limit: 5\
+        let expected = "Limit: skip=None, fetch=5\
         \n  Projection: #test.c, #test.a\
         \n    TableScan: test projection=Some([a, c])";
 
diff --git a/datafusion/optimizer/src/simplify_expressions.rs 
b/datafusion/optimizer/src/simplify_expressions.rs
index 9be9bfde8..e40180f69 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -1630,7 +1630,7 @@ mod tests {
             .unwrap()
             .filter(col("c").not_eq(lit(false)))
             .unwrap()
-            .limit(1)
+            .limit(None, Some(1))
             .unwrap()
             .project(vec![col("a")])
             .unwrap()
@@ -1639,7 +1639,7 @@ mod tests {
 
         let expected = "\
         Projection: #test.a\
-        \n  Limit: 1\
+        \n  Limit: skip=None, fetch=1\
         \n    Filter: #test.c AS test.c != Boolean(false)\
         \n      Filter: NOT #test.b AS test.b != Boolean(true)\
         \n        TableScan: test projection=None";
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index d17bad849..4522cd63e 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -68,7 +68,6 @@ message LogicalPlanNode {
     CreateCatalogNode create_catalog = 20;
     SubqueryAliasNode subquery_alias = 21;
     CreateViewNode create_view = 22;
-    OffsetNode offset = 23;
   }
 }
 
@@ -244,12 +243,10 @@ message CrossJoinNode {
 
 message LimitNode {
   LogicalPlanNode input = 1;
-  uint32 limit = 2;
-}
-
-message OffsetNode {
-  LogicalPlanNode input = 1;
-  uint32 offset = 2;
+  // The number of rows to skip before fetch; non-positive means don't skip any
+  int64 skip = 2;
+  // Maximum number of rows to fetch; negative means no limit
+  int64 fetch = 3;
 }
 
 message SelectionExecNode {
diff --git a/datafusion/proto/src/logical_plan.rs 
b/datafusion/proto/src/logical_plan.rs
index c6ab11458..c1ee0ab05 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -39,8 +39,7 @@ use datafusion_expr::{
     logical_plan::{
         Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
         CrossJoin, EmptyRelation, Extension, Filter, Join, JoinConstraint, JoinType,
-        Limit, Offset, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values,
-        Window,
+        Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
     },
     Expr, LogicalPlan, LogicalPlanBuilder,
 };
@@ -566,16 +565,19 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlanType::Limit(limit) => {
                 let input: LogicalPlan =
                     into_logical_plan!(limit.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .limit(limit.limit as usize)?
-                    .build()
-            }
-            LogicalPlanType::Offset(offset) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(offset.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .offset(offset.offset as usize)?
-                    .build()
+                let skip = if limit.skip <= 0 {
+                    None
+                } else {
+                    Some(limit.skip as usize)
+                };
+
+                let fetch = if limit.fetch < 0 {
+                    None
+                } else {
+                    Some(limit.fetch as usize)
+                };
+
+                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
             }
             LogicalPlanType::Join(join) => {
                 let left_keys: Vec<Column> =
@@ -920,7 +922,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))),
                 })
             }
-            LogicalPlan::Limit(Limit { input, n }) => {
+            LogicalPlan::Limit(Limit { input, skip, fetch }) => {
                 let input: protobuf::LogicalPlanNode =
                     protobuf::LogicalPlanNode::try_from_logical_plan(
                         input.as_ref(),
@@ -930,22 +932,8 @@ impl AsLogicalPlan for LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
                         protobuf::LimitNode {
                             input: Some(Box::new(input)),
-                            limit: *n as u32,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Offset(Offset { input, offset }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Offset(Box::new(
-                        protobuf::OffsetNode {
-                            input: Some(Box::new(input)),
-                            offset: *offset as u32,
+                            skip: skip.unwrap_or(0) as i64,
+                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
                         },
                     ))),
                 })
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 9f028e7e5..2a33be32d 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -296,13 +296,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?;
 
         let plan = self.order_by(plan, query.order_by)?;
-
-        // Offset is the parent of Limit.
-        // If both OFFSET and LIMIT appear,
-        // then OFFSET rows are skipped before starting to count the LIMIT rows that are returned.
-        // see https://www.postgresql.org/docs/current/queries-limit.html
-        let plan = self.offset(plan, query.offset)?;
-        self.limit(plan, query.limit)
+        self.limit(plan, query.offset, query.limit)
     }
 
     fn set_expr_to_plan(
@@ -1212,57 +1206,59 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     }
 
     /// Wrap a plan in a limit
-    fn limit(&self, input: LogicalPlan, limit: Option<SQLExpr>) -> Result<LogicalPlan> {
-        match limit {
-            Some(limit_expr) => {
-                let n = match self.sql_to_rex(
-                    limit_expr,
-                    input.schema(),
-                    &mut HashMap::new(),
-                )? {
-                    Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
-                    _ => Err(DataFusionError::Plan(
-                        "Unexpected expression for LIMIT clause".to_string(),
-                    )),
-                }?;
-
-                LogicalPlanBuilder::from(input).limit(n)?.build()
-            }
-            _ => Ok(input),
-        }
-    }
-
-    /// Wrap a plan in a offset
-    fn offset(
+    fn limit(
         &self,
         input: LogicalPlan,
-        offset: Option<SQLOffset>,
+        skip: Option<SQLOffset>,
+        fetch: Option<SQLExpr>,
     ) -> Result<LogicalPlan> {
-        match offset {
-            Some(offset_expr) => {
-                let offset = match self.sql_to_rex(
-                    offset_expr.value,
+        if skip.is_none() && fetch.is_none() {
+            return Ok(input);
+        }
+
+        let skip = match skip {
+            Some(skip_expr) => {
+                let skip = match self.sql_to_rex(
+                    skip_expr.value,
                     input.schema(),
                     &mut HashMap::new(),
                 )? {
-                    Expr::Literal(ScalarValue::Int64(Some(offset))) => {
-                        if offset < 0 {
+                    Expr::Literal(ScalarValue::Int64(Some(s))) => {
+                        if s < 0 {
                             return Err(DataFusionError::Plan(format!(
                                 "Offset must be >= 0, '{}' was provided.",
-                                offset
+                                s
                             )));
                         }
-                        Ok(offset as usize)
+                        Ok(s as usize)
                     }
                     _ => Err(DataFusionError::Plan(
                         "Unexpected expression in OFFSET clause".to_string(),
                     )),
                 }?;
+                Some(skip)
+            }
+            _ => None,
+        };
 
-                LogicalPlanBuilder::from(input).offset(offset)?.build()
+        let fetch = match fetch {
+            Some(limit_expr) => {
+                let n = match self.sql_to_rex(
+                    limit_expr,
+                    input.schema(),
+                    &mut HashMap::new(),
+                )? {
+                    Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
+                    _ => Err(DataFusionError::Plan(
+                        "Unexpected expression for LIMIT clause".to_string(),
+                    )),
+                }?;
+                Some(n)
             }
-            _ => Ok(input),
-        }
+            _ => None,
+        };
+
+        LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
     }
 
     /// Wrap the logical in a sort
@@ -4802,11 +4798,10 @@ mod tests {
     #[test]
     fn test_zero_offset_with_limit() {
         let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
-        let expected = "Limit: 5\
-                                    \n  Offset: 0\
-                                    \n    Projection: #person.id\
-                                    \n      Filter: #person.id > Int64(100)\
-                                    \n        TableScan: person projection=None";
+        let expected = "Limit: skip=0, fetch=5\
+                                    \n  Projection: #person.id\
+                                    \n    Filter: #person.id > Int64(100)\
+                                    \n      TableScan: person projection=None";
         quick_test(sql, expected);
 
         // Flip the order of LIMIT and OFFSET in the query. Plan should remain the same.
@@ -4817,7 +4812,7 @@ mod tests {
     #[test]
     fn test_offset_no_limit() {
         let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 5;";
-        let expected = "Offset: 5\
+        let expected = "Limit: skip=5, fetch=None\
         \n  Projection: #person.id\
         \n    Filter: #person.id > Int64(100)\
         \n      TableScan: person projection=None";
@@ -4827,22 +4822,20 @@ mod tests {
     #[test]
     fn test_offset_after_limit() {
         let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
-        let expected = "Limit: 5\
-        \n  Offset: 3\
-        \n    Projection: #person.id\
-        \n      Filter: #person.id > Int64(100)\
-        \n        TableScan: person projection=None";
+        let expected = "Limit: skip=3, fetch=5\
+        \n  Projection: #person.id\
+        \n    Filter: #person.id > Int64(100)\
+        \n      TableScan: person projection=None";
         quick_test(sql, expected);
     }
 
     #[test]
     fn test_offset_before_limit() {
         let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
-        let expected = "Limit: 5\
-        \n  Offset: 3\
-        \n    Projection: #person.id\
-        \n      Filter: #person.id > Int64(100)\
-        \n        TableScan: person projection=None";
+        let expected = "Limit: skip=3, fetch=5\
+        \n  Projection: #person.id\
+        \n    Filter: #person.id > Int64(100)\
+        \n      TableScan: person projection=None";
         quick_test(sql, expected);
     }
 

Reply via email to