This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a25529166 Combine limit and offset to `fetch` and `skip` and implement
physical plan support (#2694)
a25529166 is described below
commit a25529166f82e4f52d56eba6448a5a32e126698e
Author: Hu Ming <[email protected]>
AuthorDate: Thu Jun 9 18:28:53 2022 +0800
Combine limit and offset to `fetch` and `skip` and implement physical plan
support (#2694)
* 1. combine "limit" and "offset" together in api/logical plan/physical
plan.
2. implement "offset" in physical plan.
* add several test and fix some comments
---
datafusion/core/src/dataframe.rs | 27 +-
datafusion/core/src/datasource/empty.rs | 16 +-
datafusion/core/src/execution/context.rs | 2 +-
datafusion/core/src/lib.rs | 2 +-
.../core/src/physical_optimizer/repartition.rs | 75 +++-
datafusion/core/src/physical_plan/empty.rs | 38 +-
datafusion/core/src/physical_plan/limit.rs | 275 ++++++++----
datafusion/core/src/physical_plan/planner.rs | 58 ++-
datafusion/core/src/test_util.rs | 16 +
datafusion/core/tests/dataframe_functions.rs | 2 +-
datafusion/core/tests/sql/explain_analyze.rs | 6 +-
datafusion/core/tests/sql/limit.rs | 81 ++++
datafusion/core/tests/user_defined_plan.rs | 9 +-
datafusion/expr/src/logical_plan/builder.rs | 40 +-
datafusion/expr/src/logical_plan/plan.rs | 39 +-
datafusion/expr/src/utils.rs | 13 +-
.../optimizer/src/common_subexpr_eliminate.rs | 1 -
datafusion/optimizer/src/eliminate_limit.rs | 18 +-
datafusion/optimizer/src/filter_push_down.rs | 32 +-
datafusion/optimizer/src/limit_push_down.rs | 462 +++++++++++----------
datafusion/optimizer/src/projection_push_down.rs | 9 +-
datafusion/optimizer/src/simplify_expressions.rs | 4 +-
datafusion/proto/proto/datafusion.proto | 11 +-
datafusion/proto/src/logical_plan.rs | 46 +-
datafusion/sql/src/planner.rs | 107 +++--
25 files changed, 864 insertions(+), 525 deletions(-)
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5a1330bf9..02b9116be 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -62,7 +62,7 @@ use std::sync::Arc;
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
-/// .limit(100)?;
+/// .limit(None, Some(100))?;
/// let results = df.collect();
/// # Ok(())
/// # }
@@ -190,6 +190,9 @@ impl DataFrame {
/// Limit the number of rows returned from this DataFrame.
///
+ /// `skip` - Number of rows to skip before fetching any rows
+ ///
+ /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows.
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
@@ -197,13 +200,17 @@ impl DataFrame {
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/example.csv",
CsvReadOptions::new()).await?;
- /// let df = df.limit(100)?;
+ /// let df = df.limit(None, Some(100))?;
/// # Ok(())
/// # }
/// ```
- pub fn limit(&self, n: usize) -> Result<Arc<DataFrame>> {
+ pub fn limit(
+ &self,
+ skip: Option<usize>,
+ fetch: Option<usize>,
+ ) -> Result<Arc<DataFrame>> {
let plan = LogicalPlanBuilder::from(self.plan.clone())
- .limit(n)?
+ .limit(skip, fetch)?
.build()?;
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
}
@@ -414,7 +421,7 @@ impl DataFrame {
/// # }
/// ```
pub async fn show_limit(&self, num: usize) -> Result<()> {
- let results = self.limit(num)?.collect().await?;
+ let results = self.limit(None, Some(num))?.collect().await?;
Ok(pretty::print_batches(&results)?)
}
@@ -514,7 +521,7 @@ impl DataFrame {
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/example.csv",
CsvReadOptions::new()).await?;
- /// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
+ /// let batches = df.limit(None, Some(100))?.explain(false,
false)?.collect().await?;
/// # Ok(())
/// # }
/// ```
@@ -665,7 +672,7 @@ impl TableProvider for DataFrame {
Self::new(
self.session_state.clone(),
&limit
- .map_or_else(|| Ok(expr.clone()), |n| expr.limit(n))?
+ .map_or_else(|| Ok(expr.clone()), |n| expr.limit(None,
Some(n)))?
.plan
.clone(),
)
@@ -799,7 +806,9 @@ mod tests {
async fn limit() -> Result<()> {
// build query using Table API
let t = test_table().await?;
- let t2 = t.select_columns(&["c1", "c2", "c11"])?.limit(10)?;
+ let t2 = t
+ .select_columns(&["c1", "c2", "c11"])?
+ .limit(None, Some(10))?;
let plan = t2.plan.clone();
// build query using SQL
@@ -818,7 +827,7 @@ mod tests {
let df = test_table().await?;
let df = df
.select_columns(&["c1", "c2", "c11"])?
- .limit(10)?
+ .limit(None, Some(10))?
.explain(false, false)?;
let plan = df.plan.clone();
diff --git a/datafusion/core/src/datasource/empty.rs
b/datafusion/core/src/datasource/empty.rs
index 3bc7a958c..b8bdb5fd8 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -33,12 +33,22 @@ use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};
/// A table with a schema but no data.
pub struct EmptyTable {
schema: SchemaRef,
+ partitions: usize,
}
impl EmptyTable {
/// Initialize a new `EmptyTable` from a schema.
pub fn new(schema: SchemaRef) -> Self {
- Self { schema }
+ Self {
+ schema,
+ partitions: 1,
+ }
+ }
+
+ /// Creates a new EmptyTable with specified partition number.
+ pub fn with_partitions(mut self, partitions: usize) -> Self {
+ self.partitions = partitions;
+ self
}
}
@@ -65,6 +75,8 @@ impl TableProvider for EmptyTable {
) -> Result<Arc<dyn ExecutionPlan>> {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema,
projection.as_ref())?;
- Ok(Arc::new(EmptyExec::new(false, projected_schema)))
+ Ok(Arc::new(
+ EmptyExec::new(false,
projected_schema).with_partitions(self.partitions),
+ ))
}
}
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index db435e399..ac7727f64 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -129,7 +129,7 @@ const DEFAULT_SCHEMA: &str = "public";
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
-/// .limit(100)?;
+/// .limit(None, Some(100))?;
/// let results = df.collect();
/// # Ok(())
/// # }
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index df863ebd1..7a0cf475d 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -42,7 +42,7 @@
//! // create a plan
//! let df = df.filter(col("a").lt_eq(col("b")))?
//! .aggregate(vec![col("a")], vec![min(col("b"))])?
-//! .limit(100)?;
+//! .limit(None, Some(100))?;
//!
//! // execute the plan
//! let results: Vec<RecordBatch> = df.collect().await?;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index bdb01e205..b3b7ba948 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Repartition optimizer that introduces repartition nodes to increase the
level of parallism available
+//! Repartition optimizer that introduces repartition nodes to increase the
level of parallelism available
use std::sync::Arc;
use super::optimizer::PhysicalOptimizerRule;
@@ -326,7 +326,16 @@ mod tests {
fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(GlobalLimitExec::new(
Arc::new(LocalLimitExec::new(input, 100)),
- 100,
+ None,
+ Some(100),
+ ))
+ }
+
+ fn limit_exec_with_skip(input: Arc<dyn ExecutionPlan>) -> Arc<dyn
ExecutionPlan> {
+ Arc::new(GlobalLimitExec::new(
+ Arc::new(LocalLimitExec::new(input, 100)),
+ Some(5),
+ Some(100),
))
}
@@ -395,8 +404,25 @@ mod tests {
let plan = limit_exec(filter_exec(parquet_exec()));
let expected = &[
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
+ "FilterExec: c1@0",
+ // nothing sorts the data, so the local limit doesn't require
sorted data either
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_unsorted_limit_with_skip() -> Result<()> {
+ let plan = limit_exec_with_skip(filter_exec(parquet_exec()));
+
+ let expected = &[
+ "GlobalLimitExec: skip=5, fetch=100",
+ "LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require
sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10)",
@@ -412,8 +438,8 @@ mod tests {
let plan = limit_exec(sort_exec(parquet_exec()));
let expected = &[
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions=[x], projection=[c1]",
@@ -428,8 +454,8 @@ mod tests {
let plan = limit_exec(filter_exec(sort_exec(parquet_exec())));
let expected = &[
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// data is sorted so can't repartition here even though
// filter would benefit from parallelism, the answers might be
wrong
@@ -449,13 +475,38 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
+ "FilterExec: c1@0",
+ // repartition should happen prior to the filter to maximize
parallelism
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
+ // Expect no repartition to happen for local limit
+ "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_ignores_limit_with_skip() -> Result<()> {
+ let plan = aggregate(limit_exec_with_skip(filter_exec(limit_exec(
+ parquet_exec(),
+ ))));
+
+ let expected = &[
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ "AggregateExec: mode=Partial, gby=[], aggr=[]",
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "GlobalLimitExec: skip=5, fetch=100",
+ "LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize
parallelism
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "GlobalLimitExec: limit=100",
- "LocalLimitExec: limit=100",
+ "GlobalLimitExec: skip=None, fetch=100",
+ "LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
"ParquetExec: limit=None, partitions=[x], projection=[c1]",
];
diff --git a/datafusion/core/src/physical_plan/empty.rs
b/datafusion/core/src/physical_plan/empty.rs
index bba87e190..c693764c8 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -41,6 +41,8 @@ pub struct EmptyExec {
produce_one_row: bool,
/// The schema for the produced row
schema: SchemaRef,
+ /// Number of partitions
+ partitions: usize,
}
impl EmptyExec {
@@ -49,9 +51,16 @@ impl EmptyExec {
EmptyExec {
produce_one_row,
schema,
+ partitions: 1,
}
}
+ /// Create a new EmptyExec with specified partition number
+ pub fn with_partitions(mut self, partitions: usize) -> Self {
+ self.partitions = partitions;
+ self
+ }
+
/// Specifies whether this exec produces a row or not
pub fn produce_one_row(&self) -> bool {
self.produce_one_row
@@ -95,7 +104,7 @@ impl ExecutionPlan for EmptyExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(1)
+ Partitioning::UnknownPartitioning(self.partitions)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -118,11 +127,11 @@ impl ExecutionPlan for EmptyExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
debug!("Start EmptyExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
- // GlobalLimitExec has a single output partition
- if 0 != partition {
+
+ if partition >= self.partitions {
return Err(DataFusionError::Internal(format!(
- "EmptyExec invalid partition {} (expected 0)",
- partition
+ "EmptyExec invalid partition {} (expected less than {})",
+ partition, self.partitions
)));
}
@@ -226,4 +235,23 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn produce_one_row_multiple_partition() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let schema = test_util::aggr_test_schema();
+ let partitions = 3;
+ let empty = EmptyExec::new(true, schema).with_partitions(partitions);
+
+ for n in 0..partitions {
+ let iter = empty.execute(n, task_ctx.clone())?;
+ let batches = common::collect(iter).await?;
+
+ // should have one item
+ assert_eq!(batches.len(), 1);
+ }
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/limit.rs
b/datafusion/core/src/physical_plan/limit.rs
index 6ad93d3e4..73566b823 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -17,15 +17,14 @@
//! Defines the LIMIT plan
+use futures::stream::Stream;
+use futures::stream::StreamExt;
+use log::debug;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use futures::stream::Stream;
-use futures::stream::StreamExt;
-use log::debug;
-
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
@@ -49,31 +48,28 @@ use crate::execution::context::TaskContext;
pub struct GlobalLimitExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
- /// Maximum number of rows to return
- limit: usize,
+ /// Number of rows to skip before fetch
+ skip: Option<usize>,
+ /// Maximum number of rows to fetch
+ fetch: Option<usize>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
impl GlobalLimitExec {
/// Create a new GlobalLimitExec
- pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
+ pub fn new(
+ input: Arc<dyn ExecutionPlan>,
+ skip: Option<usize>,
+ fetch: Option<usize>,
+ ) -> Self {
GlobalLimitExec {
input,
- limit,
+ skip,
+ fetch,
metrics: ExecutionPlanMetricsSet::new(),
}
}
-
- /// Input execution plan
- pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
- &self.input
- }
-
- /// Maximum number of rows to return
- pub fn limit(&self) -> usize {
- self.limit
- }
}
impl ExecutionPlan for GlobalLimitExec {
@@ -121,7 +117,8 @@ impl ExecutionPlan for GlobalLimitExec {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(GlobalLimitExec::new(
children[0].clone(),
- self.limit,
+ self.skip,
+ self.fetch,
)))
}
@@ -153,7 +150,8 @@ impl ExecutionPlan for GlobalLimitExec {
let stream = self.input.execute(0, context)?;
Ok(Box::pin(LimitStream::new(
stream,
- self.limit,
+ self.skip,
+ self.fetch,
baseline_metrics,
)))
}
@@ -165,7 +163,12 @@ impl ExecutionPlan for GlobalLimitExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- write!(f, "GlobalLimitExec: limit={}", self.limit)
+ write!(
+ f,
+ "GlobalLimitExec: skip={}, fetch={}",
+ self.skip.map_or("None".to_string(), |x| x.to_string()),
+ self.fetch.map_or("None".to_string(), |x| x.to_string())
+ )
}
}
}
@@ -176,21 +179,33 @@ impl ExecutionPlan for GlobalLimitExec {
fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
+ let skip = self.skip.unwrap_or(0);
match input_stats {
- // if the input does not reach the limit globally, return input
stats
- Statistics {
- num_rows: Some(nr), ..
- } if nr <= self.limit => input_stats,
- // if the input is greater than the limit, the num_row will be the
limit
- // but we won't be able to predict the other statistics
Statistics {
num_rows: Some(nr), ..
- } if nr > self.limit => Statistics {
- num_rows: Some(self.limit),
- is_exact: input_stats.is_exact,
- ..Default::default()
- },
- // if we don't know the input size, we can't predict the limit's
behaviour
+ } => {
+ if nr <= skip {
+ // if all input data will be skipped, return 0
+ Statistics {
+ num_rows: Some(0),
+ is_exact: input_stats.is_exact,
+ ..Default::default()
+ }
+ } else if nr - skip <= self.fetch.unwrap_or(usize::MAX) {
+ // if the input does not reach the "fetch" globally,
return input stats
+ input_stats
+ } else if nr - skip > self.fetch.unwrap_or(usize::MAX) {
+ // if the input is greater than the "fetch", the num_row
will be the "fetch",
+ // but we won't be able to predict the other statistics
+ Statistics {
+ num_rows: self.fetch,
+ is_exact: input_stats.is_exact,
+ ..Default::default()
+ }
+ } else {
+ Statistics::default()
+ }
+ }
_ => Statistics::default(),
}
}
@@ -202,30 +217,20 @@ pub struct LocalLimitExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// Maximum number of rows to return
- limit: usize,
+ fetch: usize,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
impl LocalLimitExec {
/// Create a new LocalLimitExec partition
- pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
+ pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
Self {
input,
- limit,
+ fetch,
metrics: ExecutionPlanMetricsSet::new(),
}
}
-
- /// Input execution plan
- pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
- &self.input
- }
-
- /// Maximum number of rows to return
- pub fn limit(&self) -> usize {
- self.limit
- }
}
impl ExecutionPlan for LocalLimitExec {
@@ -271,7 +276,7 @@ impl ExecutionPlan for LocalLimitExec {
match children.len() {
1 => Ok(Arc::new(LocalLimitExec::new(
children[0].clone(),
- self.limit,
+ self.fetch,
))),
_ => Err(DataFusionError::Internal(
"LocalLimitExec wrong number of children".to_string(),
@@ -289,7 +294,8 @@ impl ExecutionPlan for LocalLimitExec {
let stream = self.input.execute(partition, context)?;
Ok(Box::pin(LimitStream::new(
stream,
- self.limit,
+ None,
+ Some(self.fetch),
baseline_metrics,
)))
}
@@ -301,7 +307,7 @@ impl ExecutionPlan for LocalLimitExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- write!(f, "LocalLimitExec: limit={}", self.limit)
+ write!(f, "LocalLimitExec: fetch={}", self.fetch)
}
}
}
@@ -316,14 +322,14 @@ impl ExecutionPlan for LocalLimitExec {
// if the input does not reach the limit globally, return input
stats
Statistics {
num_rows: Some(nr), ..
- } if nr <= self.limit => input_stats,
+ } if nr <= self.fetch => input_stats,
// if the input is greater than the limit, the num_row will be
greater
// than the limit because the partitions will be limited separatly
// the statistic
Statistics {
num_rows: Some(nr), ..
- } if nr > self.limit => Statistics {
- num_rows: Some(self.limit),
+ } if nr > self.fetch => Statistics {
+ num_rows: Some(self.fetch),
// this is not actually exact, but will be when GlobalLimit is
applied
// TODO stats: find a more explicit way to vehiculate this
information
is_exact: input_stats.is_exact,
@@ -344,17 +350,21 @@ pub fn truncate_batch(batch: &RecordBatch, n: usize) ->
RecordBatch {
RecordBatch::try_new(batch.schema(), limited_columns).unwrap()
}
-/// A Limit stream limits the stream to up to `limit` rows.
+/// A Limit stream skips `skip` rows, and then fetches up to `fetch` rows.
struct LimitStream {
- /// The maximum number of rows to produce
- limit: usize,
+ /// The number of rows to skip
+ skip: usize,
+ /// The maximum number of rows to produce, after `skip` are skipped
+ fetch: usize,
/// The input to read from. This is set to None once the limit is
/// reached to enable early termination
input: Option<SendableRecordBatchStream>,
/// Copy of the input schema
schema: SchemaRef,
- // the current number of rows which have been produced
- current_len: usize,
+ /// Number of rows already skipped
+ current_skipped: usize,
+ /// the current number of rows which have been produced
+ current_fetched: usize,
/// Execution time metrics
baseline_metrics: BaselineMetrics,
}
@@ -362,31 +372,69 @@ struct LimitStream {
impl LimitStream {
fn new(
input: SendableRecordBatchStream,
- limit: usize,
+ skip: Option<usize>,
+ fetch: Option<usize>,
baseline_metrics: BaselineMetrics,
) -> Self {
let schema = input.schema();
Self {
- limit,
+ skip: skip.unwrap_or(0),
+ fetch: fetch.unwrap_or(usize::MAX),
input: Some(input),
schema,
- current_len: 0,
+ current_skipped: 0,
+ current_fetched: 0,
baseline_metrics,
}
}
+ fn poll_and_skip(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+ let input = self.input.as_mut().unwrap();
+ loop {
+ let poll = input.poll_next_unpin(cx);
+ let poll = poll.map_ok(|batch| {
+ if batch.num_rows() + self.current_skipped <= self.skip {
+ self.current_skipped += batch.num_rows();
+ RecordBatch::new_empty(input.schema())
+ } else {
+ let offset = self.skip - self.current_skipped;
+ let new_batch = batch.slice(offset, batch.num_rows() -
offset);
+ self.current_skipped = self.skip;
+ new_batch
+ }
+ });
+
+ match &poll {
+ Poll::Ready(Some(Ok(batch)))
+ if batch.num_rows() > 0 && self.current_skipped ==
self.skip =>
+ {
+ break poll
+ }
+ Poll::Ready(Some(Err(_e))) => break poll,
+ Poll::Ready(None) => break poll,
+ Poll::Pending => break poll,
+ _ => {
+ // continue to poll input stream
+ }
+ }
+ }
+ }
+
fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
// records time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
- if self.current_len == self.limit {
+ if self.current_fetched == self.fetch {
self.input = None; // clear input so it can be dropped early
None
- } else if self.current_len + batch.num_rows() <= self.limit {
- self.current_len += batch.num_rows();
+ } else if self.current_fetched + batch.num_rows() <= self.fetch {
+ self.current_fetched += batch.num_rows();
Some(batch)
} else {
- let batch_rows = self.limit - self.current_len;
- self.current_len = self.limit;
+ let batch_rows = self.fetch - self.current_fetched;
+ self.current_fetched = self.fetch;
self.input = None; // clear input so it can be dropped early
Some(truncate_batch(&batch, batch_rows))
}
@@ -400,11 +448,20 @@ impl Stream for LimitStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ let fetch_started = self.current_skipped == self.skip;
let poll = match &mut self.input {
- Some(input) => input.poll_next_unpin(cx).map(|x| match x {
- Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
- other => other,
- }),
+ Some(input) => {
+ let poll = if fetch_started {
+ input.poll_next_unpin(cx)
+ } else {
+ self.poll_and_skip(cx)
+ };
+
+ poll.map(|x| match x {
+ Some(Ok(batch)) =>
Ok(self.stream_limit(batch)).transpose(),
+ other => other,
+ })
+ }
// input has been cleared
None => Poll::Ready(None),
};
@@ -442,7 +499,11 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
- let limit =
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 7);
+ let limit = GlobalLimitExec::new(
+ Arc::new(CoalescePartitionsExec::new(csv)),
+ None,
+ Some(7),
+ );
// the result should contain 4 batches (one per input partition)
let iter = limit.execute(0, task_ctx)?;
@@ -472,7 +533,8 @@ mod tests {
// limit of six needs to consume the entire first record batch
// (5 rows) and 1 row from the second (1 row)
let baseline_metrics =
BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
- let limit_stream = LimitStream::new(Box::pin(input), 6,
baseline_metrics);
+ let limit_stream =
+ LimitStream::new(Box::pin(input), None, Some(6), baseline_metrics);
assert_eq!(index.value(), 0);
let results = collect(Box::pin(limit_stream)).await.unwrap();
@@ -485,4 +547,75 @@ mod tests {
Ok(())
}
+
+ // test cases for "skip"
+ async fn skip_and_fetch(skip: Option<usize>, fetch: Option<usize>) ->
Result<usize> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ let num_partitions = 4;
+ let csv = test::scan_partitioned_csv(num_partitions)?;
+
+ assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
+
+ let offset =
+ GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)),
skip, fetch);
+
+ // the result should contain 4 batches (one per input partition)
+ let iter = offset.execute(0, task_ctx)?;
+ let batches = common::collect(iter).await?;
+ Ok(batches.iter().map(|batch| batch.num_rows()).sum())
+ }
+
+ #[tokio::test]
+ async fn skip_none_fetch_none() -> Result<()> {
+ let row_count = skip_and_fetch(None, None).await?;
+ assert_eq!(row_count, 100);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn skip_none_fetch_50() -> Result<()> {
+ let row_count = skip_and_fetch(None, Some(50)).await?;
+ assert_eq!(row_count, 50);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn skip_3_fetch_none() -> Result<()> {
+ // there are a total of 100 rows, we skipped 3 rows (offset = 3)
+ let row_count = skip_and_fetch(Some(3), None).await?;
+ assert_eq!(row_count, 97);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn skip_3_fetch_10() -> Result<()> {
+ // there are a total of 100 rows, we skipped 3 rows (offset = 3)
+ let row_count = skip_and_fetch(Some(3), Some(10)).await?;
+ assert_eq!(row_count, 10);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn skip_100_fetch_none() -> Result<()> {
+ let row_count = skip_and_fetch(Some(100), None).await?;
+ assert_eq!(row_count, 0);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn skip_100_fetch_1() -> Result<()> {
+ let row_count = skip_and_fetch(Some(100), Some(1)).await?;
+ assert_eq!(row_count, 0);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn skip_101_fetch_none() -> Result<()> {
+ // there are a total of 100 rows, we skipped 101 rows (offset = 101)
+ let row_count = skip_and_fetch(Some(101), None).await?;
+ assert_eq!(row_count, 0);
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index 743ed7e5d..642d9ef28 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -375,7 +375,7 @@ impl DefaultPhysicalPlanner {
source,
projection,
filters,
- limit,
+ fetch,
..
}) => {
let source = source_as_provider(source)?;
@@ -384,7 +384,7 @@ impl DefaultPhysicalPlanner {
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let unaliased: Vec<Expr> =
filters.into_iter().map(unalias).collect();
- source.scan(session_state, projection, &unaliased,
*limit).await
+ source.scan(session_state, projection, &unaliased,
*fetch).await
}
LogicalPlan::Values(Values {
values,
@@ -897,8 +897,7 @@ impl DefaultPhysicalPlanner {
_ => Err(DataFusionError::Plan("SubqueryAlias should
only wrap TableScan".to_string()))
}
}
- LogicalPlan::Limit(Limit { input, n, .. }) => {
- let limit = *n;
+ LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => {
let input = self.create_initial_plan(input,
session_state).await?;
// GlobalLimitExec requires a single partition for input
@@ -907,15 +906,14 @@ impl DefaultPhysicalPlanner {
} else {
// Apply a LocalLimitExec to each partition. The
optimizer will also insert
// a CoalescePartitionsExec between the
GlobalLimitExec and LocalLimitExec
- Arc::new(LocalLimitExec::new(input, limit))
+ if let Some(fetch) = fetch {
+ Arc::new(LocalLimitExec::new(input, *fetch +
skip.unwrap_or(0)))
+ } else {
+ input
+ }
};
- Ok(Arc::new(GlobalLimitExec::new(input, limit)))
- }
- LogicalPlan::Offset(_) => {
- Err(DataFusionError::Internal(
- "Unsupported logical plan: OFFSET".to_string(),
- ))
+ Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
@@ -1298,7 +1296,7 @@ mod tests {
};
use crate::prelude::{SessionConfig, SessionContext};
use crate::scalar::ScalarValue;
- use crate::test_util::scan_empty;
+ use crate::test_util::{scan_empty, scan_empty_with_partitions};
use crate::{
logical_plan::LogicalPlanBuilder,
physical_plan::SendableRecordBatchStream,
};
@@ -1334,7 +1332,7 @@ mod tests {
.project(vec![col("c1"), col("c2")])?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.sort(vec![col("c1").sort(true, true)])?
- .limit(10)?
+ .limit(Some(3), Some(10))?
.build()?;
let plan = plan(&logical_plan).await?;
@@ -1372,14 +1370,44 @@ mod tests {
let logical_plan = test_csv_scan()
.await?
.filter(col("c7").lt(col("c12")))?
+ .limit(Some(3), None)?
.build()?;
let plan = plan(&logical_plan).await?;
// c12 is f64, c7 is u8 -> cast c7 to f64
// the cast here is implicit so has CastOptions with safe=true
- let expected = "predicate: BinaryExpr { left: TryCastExpr { expr:
Column { name: \"c7\", index: 6 }, cast_type: Float64 }, op: Lt, right: Column
{ name: \"c12\", index: 11 } }";
- assert!(format!("{:?}", plan).contains(expected));
+ let _expected = "predicate: BinaryExpr { left: TryCastExpr { expr:
Column { name: \"c7\", index: 6 }, cast_type: Float64 }, op: Lt, right: Column
{ name: \"c12\", index: 11 } }";
+ let plan_debug_str = format!("{:?}", plan);
+ assert!(plan_debug_str.contains("GlobalLimitExec"));
+ assert!(plan_debug_str.contains("skip: Some(3)"));
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_with_zero_offset_plan() -> Result<()> {
+ let logical_plan = test_csv_scan().await?.limit(Some(0),
None)?.build()?;
+ let plan = plan(&logical_plan).await?;
+ assert!(format!("{:?}", plan).contains("GlobalLimitExec"));
+ assert!(format!("{:?}", plan).contains("skip: Some(0)"));
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_limit_with_partitions() -> Result<()> {
+ let schema = Schema::new(vec![Field::new("id", DataType::Int32,
false)]);
+
+ let logical_plan = scan_empty_with_partitions(Some("test"), &schema,
None, 2)?
+ .limit(Some(3), Some(5))?
+ .build()?;
+ let plan = plan(&logical_plan).await?;
+
+ assert!(format!("{:?}", plan).contains("GlobalLimitExec"));
+ assert!(format!("{:?}", plan).contains("skip: Some(3), fetch:
Some(5)"));
+
+ // LocalLimitExec adjusts the `fetch`
+ assert!(format!("{:?}", plan).contains("LocalLimitExec"));
+ assert!(format!("{:?}", plan).contains("fetch: 8"));
Ok(())
}
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 1a6a5028e..ee53a424c 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -250,6 +250,22 @@ pub fn scan_empty(
)
}
+/// Scan an empty data source with configured partition, mainly used in tests.
+pub fn scan_empty_with_partitions(
+ name: Option<&str>,
+ table_schema: &Schema,
+ projection: Option<Vec<usize>>,
+ partitions: usize,
+) -> Result<LogicalPlanBuilder, DataFusionError> {
+ let table_schema = Arc::new(table_schema.clone());
+ let provider =
Arc::new(EmptyTable::new(table_schema).with_partitions(partitions));
+ LogicalPlanBuilder::scan(
+ name.unwrap_or(UNNAMED_TABLE),
+ provider_as_source(provider),
+ projection,
+ )
+}
+
/// Get the schema for the aggregate_test_* csv files
pub fn aggr_test_schema() -> SchemaRef {
let mut f1 = Field::new("c1", DataType::Utf8, false);
diff --git a/datafusion/core/tests/dataframe_functions.rs
b/datafusion/core/tests/dataframe_functions.rs
index 48e39f370..b126d010c 100644
--- a/datafusion/core/tests/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe_functions.rs
@@ -72,7 +72,7 @@ macro_rules! assert_fn_batches {
};
($EXPR:expr, $EXPECTED: expr, $LIMIT: expr) => {
let df = create_test_table()?;
- let df = df.select(vec![$EXPR])?.limit($LIMIT)?;
+ let df = df.select(vec![$EXPR])?.limit(None, Some($LIMIT))?;
let batches = df.collect().await?;
assert_batches_eq!($EXPECTED, &batches);
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index a25543b34..c9719aad6 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -71,12 +71,12 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
- "GlobalLimitExec: limit=3, ",
+ "GlobalLimitExec: skip=None, fetch=3, ",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
&formatted,
- "LocalLimitExec: limit=3",
+ "LocalLimitExec: fetch=3",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
@@ -656,7 +656,7 @@ async fn test_physical_plan_display_indent() {
let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let expected = vec![
- "GlobalLimitExec: limit=10",
+ "GlobalLimitExec: skip=None, fetch=10",
" SortExec: [the_min@2 DESC]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
diff --git a/datafusion/core/tests/sql/limit.rs
b/datafusion/core/tests/sql/limit.rs
index 6cfc423de..646074209 100644
--- a/datafusion/core/tests/sql/limit.rs
+++ b/datafusion/core/tests/sql/limit.rs
@@ -184,3 +184,84 @@ async fn limit_multi_partitions() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn csv_offset_without_limit_99() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 99";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "+----+",
+ "| c1 |",
+ "+----+",
+ "| e |",
+ "+----+"];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_offset_without_limit_100() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 100";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec!["++", "++"];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_offset_without_limit_101() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 101";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec!["++", "++"];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_offset() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT c1 FROM aggregate_test_100 OFFSET 2 LIMIT 2";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ #[rustfmt::skip]
+ let expected = vec![
+ "+----+",
+ "| c1 |",
+ "+----+",
+ "| b |",
+ "| a |",
+ "+----+"];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_offset_the_same_as_nbr_of_rows() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 1 OFFSET 100";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec!["++", "++"];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn csv_query_offset_bigger_than_nbr_of_rows() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 1 OFFSET 101";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec!["++", "++"];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
diff --git a/datafusion/core/tests/user_defined_plan.rs
b/datafusion/core/tests/user_defined_plan.rs
index de94c63b5..33cfd1f56 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -290,7 +290,12 @@ impl OptimizerRule for TopKOptimizerRule {
// Note: this code simply looks for the pattern of a Limit followed by
a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
- if let LogicalPlan::Limit(Limit { ref n, ref input }) = plan {
+ if let LogicalPlan::Limit(Limit {
+ fetch: Some(fetch),
+ input,
+ ..
+ }) = plan
+ {
if let LogicalPlan::Sort(Sort {
ref expr,
ref input,
@@ -300,7 +305,7 @@ impl OptimizerRule for TopKOptimizerRule {
// we found a sort with a single sort expr, replace with a
a TopK
return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
- k: *n,
+ k: *fetch,
input: self.optimize(input.as_ref(),
optimizer_config)?,
expr: expr[0].clone(),
}),
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 917606144..d0309b95f 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -22,9 +22,9 @@ use crate::utils::{columnize_expr, exprlist_to_fields,
from_plan};
use crate::{
logical_plan::{
Aggregate, Analyze, CrossJoin, EmptyRelation, Explain, Filter, Join,
- JoinConstraint, JoinType, Limit, LogicalPlan, Offset, Partitioning,
PlanType,
- Projection, Repartition, Sort, SubqueryAlias, TableScan,
ToStringifiedPlan,
- Union, Values, Window,
+ JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection,
+ Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Values,
+ Window,
},
utils::{
expand_qualified_wildcard, expand_wildcard, expr_to_columns,
@@ -233,7 +233,7 @@ impl LogicalPlanBuilder {
projected_schema: Arc::new(projected_schema),
projection,
filters,
- limit: None,
+ fetch: None,
});
Ok(Self::from(table_scan))
}
@@ -291,18 +291,16 @@ impl LogicalPlanBuilder {
})))
}
- /// Apply a limit
- pub fn limit(&self, n: usize) -> Result<Self> {
+ /// Limit the number of rows returned
+ ///
+ /// `skip` - Number of rows to skip before fetching any row.
+ ///
+ /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
+ /// if specified.
+ pub fn limit(&self, skip: Option<usize>, fetch: Option<usize>) ->
Result<Self> {
Ok(Self::from(LogicalPlan::Limit(Limit {
- n,
- input: Arc::new(self.plan.clone()),
- })))
- }
-
- /// Apply an offset
- pub fn offset(&self, offset: usize) -> Result<Self> {
- Ok(Self::from(LogicalPlan::Offset(Offset {
- offset,
+ skip,
+ fetch,
input: Arc::new(self.plan.clone()),
})))
}
@@ -1026,15 +1024,13 @@ mod tests {
vec![sum(col("salary")).alias("total_salary")],
)?
.project(vec![col("state"), col("total_salary")])?
- .limit(10)?
- .offset(2)?
+ .limit(Some(2), Some(10))?
.build()?;
- let expected = "Offset: 2\
- \n Limit: 10\
- \n Projection: #employee_csv.state, #total_salary\
- \n Aggregate: groupBy=[[#employee_csv.state]],
aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
- \n TableScan: employee_csv projection=Some([state, salary])";
+ let expected = "Limit: skip=2, fetch=10\
+ \n Projection: #employee_csv.state, #total_salary\
+ \n Aggregate: groupBy=[[#employee_csv.state]],
aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
+ \n TableScan: employee_csv projection=Some([state, salary])";
assert_eq!(expected, format!("{:?}", plan));
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 022f50ea3..c476300f1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -71,10 +71,8 @@ pub enum LogicalPlan {
Subquery(Subquery),
/// Aliased relation provides, or changes, the name of a relation.
SubqueryAlias(SubqueryAlias),
- /// Produces the first `n` tuples from its input and discards the rest.
+ /// Skip some number of rows, and then fetch some number of rows.
Limit(Limit),
- /// Adjusts the starting point at which the rest of the expressions begin
to effect
- Offset(Offset),
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
@@ -119,7 +117,6 @@ impl LogicalPlan {
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) =>
input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
- LogicalPlan::Offset(Offset { input, .. }) => input.schema(),
LogicalPlan::Subquery(Subquery { subquery, .. }) =>
subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, ..
}) => {
@@ -190,7 +187,6 @@ impl LogicalPlan {
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
- | LogicalPlan::Offset(Offset { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
@@ -245,7 +241,6 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
- | LogicalPlan::Offset(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateView(_)
@@ -274,7 +269,6 @@ impl LogicalPlan {
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) =>
vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
- LogicalPlan::Offset(Offset { input, .. }) => vec![input],
LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) =>
vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
@@ -415,7 +409,6 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Offset(Offset { input, .. }) =>
input.accept(visitor)?,
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
subquery.accept(visitor)?
}
@@ -653,7 +646,7 @@ impl LogicalPlan {
ref table_name,
ref projection,
ref filters,
- ref limit,
+ ref fetch,
..
}) => {
let projected_fields = match projection {
@@ -710,8 +703,8 @@ impl LogicalPlan {
}
}
- if let Some(n) = limit {
- write!(f, ", limit={}", n)?;
+ if let Some(n) = fetch {
+ write!(f, ", fetch={}", n)?;
}
Ok(())
@@ -816,9 +809,17 @@ impl LogicalPlan {
)
}
},
- LogicalPlan::Limit(Limit { ref n, .. }) => write!(f,
"Limit: {}", n),
- LogicalPlan::Offset(Offset { ref offset, .. }) => {
- write!(f, "Offset: {}", offset)
+ LogicalPlan::Limit(Limit {
+ ref skip,
+ ref fetch,
+ ..
+ }) => {
+ write!(
+ f,
+ "Limit: skip={}, fetch={}",
+ skip.map_or("None".to_string(), |x| x.to_string()),
+ fetch.map_or_else(|| "None".to_string(), |x|
x.to_string())
+ )
}
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
write!(f, "Subquery: {:?}", subquery)
@@ -1037,8 +1038,8 @@ pub struct TableScan {
pub projected_schema: DFSchemaRef,
/// Optional expressions to be used as filters by the table provider
pub filters: Vec<Expr>,
- /// Optional limit to skip reading
- pub limit: Option<usize>,
+ /// Optional number of rows to read
+ pub fetch: Option<usize>,
}
/// Apply Cross Join to two logical plans
@@ -1166,8 +1167,10 @@ pub struct Extension {
/// Skips `skip` rows, then produces at most `fetch` rows from its input and discards the rest.
#[derive(Clone)]
pub struct Limit {
- /// The limit
- pub n: usize,
+ /// Number of rows to skip before fetch
+ pub skip: Option<usize>,
+ /// Maximum number of rows to fetch
+ pub fetch: Option<usize>,
/// The logical plan
pub input: Arc<LogicalPlan>,
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3986eb3e6..065e6120b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -21,8 +21,8 @@ use crate::expr_visitor::{ExprVisitable, ExpressionVisitor,
Recursion};
use crate::logical_plan::builder::build_join_schema;
use crate::logical_plan::{
Aggregate, Analyze, CreateMemoryTable, CreateView, Extension, Filter,
Join, Limit,
- Offset, Partitioning, Projection, Repartition, Sort, Subquery,
SubqueryAlias, Union,
- Values, Window,
+ Partitioning, Projection, Repartition, Sort, Subquery, SubqueryAlias,
Union, Values,
+ Window,
};
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use datafusion_common::{
@@ -427,12 +427,9 @@ pub fn from_plan(
schema,
}))
}
- LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
- n: *n,
- input: Arc::new(inputs[0].clone()),
- })),
- LogicalPlan::Offset(Offset { offset, .. }) =>
Ok(LogicalPlan::Offset(Offset {
- offset: *offset,
+ LogicalPlan::Limit(Limit { skip, fetch, .. }) =>
Ok(LogicalPlan::Limit(Limit {
+ skip: *skip,
+ fetch: *fetch,
input: Arc::new(inputs[0].clone()),
})),
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index bc635c215..458714bd7 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -222,7 +222,6 @@ fn optimize(
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
- | LogicalPlan::Offset(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
diff --git a/datafusion/optimizer/src/eliminate_limit.rs
b/datafusion/optimizer/src/eliminate_limit.rs
index f6d3b8472..822b03a4a 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -42,12 +42,14 @@ impl OptimizerRule for EliminateLimit {
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
- LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
- Ok(LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: false,
- schema: input.schema().clone(),
- }))
- }
+ LogicalPlan::Limit(Limit {
+ fetch: Some(0),
+ input,
+ ..
+ }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: input.schema().clone(),
+ })),
// Rest: recurse and find possible LIMIT 0 nodes
_ => {
let expr = plan.expressions();
@@ -91,7 +93,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
- .limit(0)
+ .limit(None, Some(0))
.unwrap()
.build()
.unwrap();
@@ -112,7 +114,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
- .limit(0)
+ .limit(None, Some(0))
.unwrap()
.union(plan1)
.unwrap()
diff --git a/datafusion/optimizer/src/filter_push_down.rs
b/datafusion/optimizer/src/filter_push_down.rs
index 8d2d9c06d..7b3b6a21f 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -558,7 +558,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
filters,
projection,
table_name,
- limit,
+ fetch,
}) => {
let mut used_columns = HashSet::new();
let mut new_filters = filters.clone();
@@ -594,7 +594,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
projected_schema: projected_schema.clone(),
table_name: table_name.clone(),
filters: new_filters,
- limit: *limit,
+ fetch: *fetch,
}),
)
}
@@ -693,13 +693,13 @@ mod tests {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a"), col("b")])?
- .limit(10)?
+ .limit(None, Some(10))?
.filter(col("a").eq(lit(1i64)))?
.build()?;
// filter is before single projection
let expected = "\
Filter: #test.a = Int64(1)\
- \n Limit: 10\
+ \n Limit: skip=None, fetch=10\
\n Projection: #test.a, #test.b\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -945,8 +945,8 @@ mod tests {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a"), col("b")])?
- .limit(20)?
- .limit(10)?
+ .limit(None, Some(20))?
+ .limit(None, Some(10))?
.project(vec![col("a"), col("b")])?
.filter(col("a").eq(lit(1i64)))?
.build()?;
@@ -954,8 +954,8 @@ mod tests {
let expected = "\
Projection: #test.a, #test.b\
\n Filter: #test.a = Int64(1)\
- \n Limit: 10\
- \n Limit: 20\
+ \n Limit: skip=None, fetch=10\
+ \n Limit: skip=None, fetch=20\
\n Projection: #test.a, #test.b\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -1008,7 +1008,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
.filter(col("a").lt_eq(lit(1i64)))?
- .limit(1)?
+ .limit(None, Some(1))?
.project(vec![col("a")])?
.filter(col("a").gt_eq(lit(1i64)))?
.build()?;
@@ -1019,7 +1019,7 @@ mod tests {
format!("{:?}", plan),
"Filter: #test.a >= Int64(1)\
\n Projection: #test.a\
- \n Limit: 1\
+ \n Limit: skip=None, fetch=1\
\n Filter: #test.a <= Int64(1)\
\n Projection: #test.a\
\n TableScan: test projection=None"
@@ -1028,7 +1028,7 @@ mod tests {
let expected = "\
Projection: #test.a\
\n Filter: #test.a >= Int64(1)\
- \n Limit: 1\
+ \n Limit: skip=None, fetch=1\
\n Projection: #test.a\
\n Filter: #test.a <= Int64(1)\
\n TableScan: test projection=None";
@@ -1042,7 +1042,7 @@ mod tests {
fn two_filters_on_same_depth() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .limit(1)?
+ .limit(None, Some(1))?
.filter(col("a").lt_eq(lit(1i64)))?
.filter(col("a").gt_eq(lit(1i64)))?
.project(vec![col("a")])?
@@ -1054,14 +1054,14 @@ mod tests {
"Projection: #test.a\
\n Filter: #test.a >= Int64(1)\
\n Filter: #test.a <= Int64(1)\
- \n Limit: 1\
+ \n Limit: skip=None, fetch=1\
\n TableScan: test projection=None"
);
let expected = "\
Projection: #test.a\
\n Filter: #test.a >= Int64(1) AND #test.a <= Int64(1)\
- \n Limit: 1\
+ \n Limit: skip=None, fetch=1\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -1742,7 +1742,7 @@ mod tests {
)?),
projection: None,
source: Arc::new(test_provider),
- limit: None,
+ fetch: None,
});
LogicalPlanBuilder::from(table_scan)
@@ -1815,7 +1815,7 @@ mod tests {
)?),
projection: Some(vec![0]),
source: Arc::new(test_provider),
- limit: None,
+ fetch: None,
});
let plan = LogicalPlanBuilder::from(table_scan)
diff --git a/datafusion/optimizer/src/limit_push_down.rs
b/datafusion/optimizer/src/limit_push_down.rs
index 91f976001..8e47048db 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -20,9 +20,7 @@
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
- logical_plan::{
- Join, JoinType, Limit, LogicalPlan, Offset, Projection, TableScan,
Union,
- },
+ logical_plan::{Join, JoinType, Limit, LogicalPlan, Projection, TableScan,
Union},
utils::from_plan,
};
use std::sync::Arc;
@@ -43,61 +41,84 @@ impl LimitPushDown {
/// when traversing down related to "limit push down".
enum Ancestor {
/// Limit
- FromLimit,
- /// Offset
- FromOffset,
+ FromLimit {
+ skip: Option<usize>,
+ fetch: Option<usize>,
+ },
/// Other nodes that don't affect the adjustment of "Limit"
NotRelevant,
}
///
-/// When doing limit push down with "offset" and "limit" during traversal,
-/// the "limit" should be adjusted.
-/// limit_push_down is a recursive function that tracks three important
information
-/// to make the adjustment.
+/// When doing limit push down with "skip" and "fetch" during traversal,
+/// the "fetch" should be adjusted.
+/// "Ancestor" is pushed down the plan tree, so that the current node
+/// can adjust its own "fetch".
+///
+/// If the current node is a Limit, its "fetch" is updated by:
+/// 1. extended_fetch = the ancestor's "fetch" extended by the ancestor's "skip".
+/// 2. min(extended_fetch, current node's fetch)
+///
+/// Current node's "skip" is never updated, it is
+/// just a hint for the child to extend its "fetch".
///
-/// 1. ancestor: the kind of Ancestor.
-/// 2. ancestor_offset: ancestor's offset value
-/// 3. ancestor_limit: ancestor's limit value
+/// When building a new Limit in Union, the "fetch" is calculated
+/// by using ancestor's "fetch" and "skip".
+///
+/// When finally assigning "fetch" in TableScan, the "fetch" is calculated
+/// by using ancestor's "fetch" and "skip".
///
-/// (ancestor_offset, ancestor_limit) is updated in the following cases
-/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
-/// When the ancestor is a "Limit" and the current node is a "Limit",
-/// it is updated to (None, min(n1, n2))).
-/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
-/// it is updated to (m1, n1 + m1).
-/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
-/// it is updated to (m2, None).
-/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
-/// it is updated to (None, n1). Note that this won't happen when we
-/// generate the plan from SQL, it can happen when we build the plan
-/// using LogicalPlanBuilder.
fn limit_push_down(
_optimizer: &LimitPushDown,
ancestor: Ancestor,
- ancestor_offset: Option<usize>,
- ancestor_limit: Option<usize>,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- match (plan, ancestor_limit) {
- (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
- let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
- Ancestor::FromLimit | Ancestor::FromOffset => (
- None,
- Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
- ),
- Ancestor::NotRelevant => (None, Some(*n)),
+ match (plan, ancestor) {
+ (
+ LogicalPlan::Limit(Limit {
+ skip: current_skip,
+ fetch: current_fetch,
+ input,
+ }),
+ ancestor,
+ ) => {
+ let new_current_fetch = match ancestor {
+ Ancestor::FromLimit {
+ skip: ancestor_skip,
+ fetch: ancestor_fetch,
+ } => {
+ if let Some(fetch) = current_fetch {
+ // extend ancestor's fetch
+ let ancestor_fetch =
+ ancestor_fetch.map(|f| f +
ancestor_skip.unwrap_or(0));
+
+ let new_current_fetch =
+ ancestor_fetch.map_or(*fetch, |x| std::cmp::min(x,
*fetch));
+
+ Some(new_current_fetch)
+ } else {
+ // we don't have a "fetch", so we can push down our
parent's "fetch"
+ // extend ancestor's fetch
+ ancestor_fetch.map(|f| f + ancestor_skip.unwrap_or(0))
+ }
+ }
+ _ => *current_fetch,
};
Ok(LogicalPlan::Limit(Limit {
- n: new_ancestor_limit.unwrap_or(*n),
- // push down limit to plan (minimum of upper limit and current
limit)
+ // current node's "skip" is not updated, updating
+ // this value would violate the semantics of Limit operator
+ skip: *current_skip,
+ fetch: new_current_fetch,
input: Arc::new(limit_push_down(
_optimizer,
- Ancestor::FromLimit,
- new_ancestor_offset,
- new_ancestor_limit,
+ Ancestor::FromLimit {
+ // current node's "skip" is passing to the subtree
+ // so that the child can extend the "fetch"
+ skip: *current_skip,
+ fetch: new_current_fetch,
+ },
input.as_ref(),
_optimizer_config,
)?),
@@ -109,20 +130,28 @@ fn limit_push_down(
source,
projection,
filters,
- limit,
+ fetch,
projected_schema,
}),
- Some(ancestor_limit),
- ) => Ok(LogicalPlan::TableScan(TableScan {
- table_name: table_name.clone(),
- source: source.clone(),
- projection: projection.clone(),
- filters: filters.clone(),
- limit: limit
- .map(|x| std::cmp::min(x, ancestor_limit))
- .or(Some(ancestor_limit)),
- projected_schema: projected_schema.clone(),
- })),
+ Ancestor::FromLimit {
+ skip: ancestor_skip,
+ fetch: Some(ancestor_fetch),
+ ..
+ },
+ ) => {
+ let ancestor_fetch =
+ ancestor_skip.map_or(ancestor_fetch, |x| x + ancestor_fetch);
+ Ok(LogicalPlan::TableScan(TableScan {
+ table_name: table_name.clone(),
+ source: source.clone(),
+ projection: projection.clone(),
+ filters: filters.clone(),
+ fetch: fetch
+ .map(|x| std::cmp::min(x, ancestor_fetch))
+ .or(Some(ancestor_fetch)),
+ projected_schema: projected_schema.clone(),
+ }))
+ }
(
LogicalPlan::Projection(Projection {
expr,
@@ -130,7 +159,7 @@ fn limit_push_down(
schema,
alias,
}),
- ancestor_limit,
+ ancestor,
) => {
// Push down limit directly (projection doesn't change number of
rows)
Ok(LogicalPlan::Projection(Projection {
@@ -138,8 +167,6 @@ fn limit_push_down(
input: Arc::new(limit_push_down(
_optimizer,
ancestor,
- ancestor_offset,
- ancestor_limit,
input.as_ref(),
_optimizer_config,
)?),
@@ -153,19 +180,27 @@ fn limit_push_down(
alias,
schema,
}),
- Some(ancestor_limit),
+ Ancestor::FromLimit {
+ skip: ancestor_skip,
+ fetch: Some(ancestor_fetch),
+ ..
+ },
) => {
// Push down limit through UNION
+ let ancestor_fetch =
+ ancestor_skip.map_or(ancestor_fetch, |x| x + ancestor_fetch);
let new_inputs = inputs
.iter()
.map(|x| {
Ok(LogicalPlan::Limit(Limit {
- n: ancestor_limit,
+ skip: None,
+ fetch: Some(ancestor_fetch),
input: Arc::new(limit_push_down(
_optimizer,
- Ancestor::FromLimit,
- None,
- Some(ancestor_limit),
+ Ancestor::FromLimit {
+ skip: None,
+ fetch: Some(ancestor_fetch),
+ },
x,
_optimizer_config,
)?),
@@ -178,52 +213,47 @@ fn limit_push_down(
schema: schema.clone(),
}))
}
- // offset 5 limit 10 then push limit 15 (5 + 10)
- (LogicalPlan::Offset(Offset { offset, input }), ancestor_limit) => {
- let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
- Ancestor::FromLimit => {
- (Some(*offset), ancestor_limit.map(|x| x + *offset))
+ (
+ LogicalPlan::Join(Join { join_type, .. }),
+ Ancestor::FromLimit {
+ skip: ancestor_skip,
+ fetch: Some(ancestor_fetch),
+ ..
+ },
+ ) => {
+ let ancestor_fetch =
+ ancestor_skip.map_or(ancestor_fetch, |x| x + ancestor_fetch);
+ match join_type {
+ JoinType::Left => {
+ //if LeftOuter join push limit to left
+ generate_push_down_join(
+ _optimizer,
+ _optimizer_config,
+ plan,
+ Some(ancestor_fetch),
+ None,
+ )
}
- Ancestor::FromOffset => (Some(*offset), None),
- Ancestor::NotRelevant => (Some(*offset), None),
- };
-
- Ok(LogicalPlan::Offset(Offset {
- offset: *offset,
- input: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromOffset,
- new_ancestor_offset,
- new_ancestor_limit,
- input.as_ref(),
- _optimizer_config,
- )?),
- }))
- }
- (LogicalPlan::Join(Join { join_type, .. }), upper_limit) => match
join_type {
- JoinType::Left => {
- //if LeftOuter join push limit to left
- generate_push_down_join(
+ JoinType::Right =>
+ // If RightOuter join push limit to right
+ {
+ generate_push_down_join(
+ _optimizer,
+ _optimizer_config,
+ plan,
+ None,
+ Some(ancestor_fetch),
+ )
+ }
+ _ => generate_push_down_join(
_optimizer,
_optimizer_config,
plan,
- upper_limit,
None,
- )
- }
- JoinType::Right =>
- // If RightOuter join push limit to right
- {
- generate_push_down_join(
- _optimizer,
- _optimizer_config,
- plan,
None,
- upper_limit,
- )
+ ),
}
- _ => generate_push_down_join(_optimizer, _optimizer_config, plan,
None, None),
- },
+ }
// For other nodes we can't push down the limit
// But try to recurse and find other limit nodes to push down
_ => push_down_children_limit(_optimizer, _optimizer_config, plan),
@@ -251,17 +281,19 @@ fn generate_push_down_join(
return Ok(LogicalPlan::Join(Join {
left: Arc::new(limit_push_down(
_optimizer,
- Ancestor::FromLimit,
- None,
- left_limit,
+ Ancestor::FromLimit {
+ skip: None,
+ fetch: left_limit,
+ },
left.as_ref(),
_optimizer_config,
)?),
right: Arc::new(limit_push_down(
_optimizer,
- Ancestor::FromLimit,
- None,
- right_limit,
+ Ancestor::FromLimit {
+ skip: None,
+ fetch: right_limit,
+ },
right.as_ref(),
_optimizer_config,
)?),
@@ -292,14 +324,7 @@ fn push_down_children_limit(
let new_inputs = inputs
.iter()
.map(|plan| {
- limit_push_down(
- _optimizer,
- Ancestor::NotRelevant,
- None,
- None,
- plan,
- _optimizer_config,
- )
+ limit_push_down(_optimizer, Ancestor::NotRelevant, plan,
_optimizer_config)
})
.collect::<Result<Vec<_>>>()?;
@@ -312,14 +337,7 @@ impl OptimizerRule for LimitPushDown {
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- limit_push_down(
- self,
- Ancestor::NotRelevant,
- None,
- None,
- plan,
- optimizer_config,
- )
+ limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
}
fn name(&self) -> &str {
@@ -352,14 +370,14 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
- .limit(1000)?
+ .limit(None, Some(1000))?
.build()?;
// Should push the limit down to table provider
// When it has a select
- let expected = "Limit: 1000\
+ let expected = "Limit: skip=None, fetch=1000\
\n Projection: #test.a\
- \n TableScan: test projection=None, limit=1000";
+ \n TableScan: test projection=None, fetch=1000";
assert_optimized_plan_eq(&plan, expected);
@@ -370,16 +388,16 @@ mod test {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .limit(1000)?
- .limit(10)?
+ .limit(None, Some(1000))?
+ .limit(None, Some(10))?
.build()?;
// Should push down the smallest limit
// Towards table scan
// This rule doesn't replace multiple limits
- let expected = "Limit: 10\
- \n Limit: 10\
- \n TableScan: test projection=None, limit=10";
+ let expected = "Limit: skip=None, fetch=10\
+ \n Limit: skip=None, fetch=10\
+ \n TableScan: test projection=None, fetch=10";
assert_optimized_plan_eq(&plan, expected);
@@ -392,11 +410,11 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![max(col("b"))])?
- .limit(1000)?
+ .limit(None, Some(1000))?
.build()?;
// Limit should *not* push down aggregate node
- let expected = "Limit: 1000\
+ let expected = "Limit: skip=None, fetch=1000\
\n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
\n TableScan: test projection=None";
@@ -411,16 +429,16 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan.clone())
.union(LogicalPlanBuilder::from(table_scan).build()?)?
- .limit(1000)?
+ .limit(None, Some(1000))?
.build()?;
// Limit should push down through union
- let expected = "Limit: 1000\
+ let expected = "Limit: skip=None, fetch=1000\
\n Union\
- \n Limit: 1000\
- \n TableScan: test projection=None, limit=1000\
- \n Limit: 1000\
- \n TableScan: test projection=None, limit=1000";
+ \n Limit: skip=None, fetch=1000\
+ \n TableScan: test projection=None, fetch=1000\
+ \n Limit: skip=None, fetch=1000\
+ \n TableScan: test projection=None, fetch=1000";
assert_optimized_plan_eq(&plan, expected);
@@ -432,16 +450,16 @@ mod test {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .limit(1000)?
+ .limit(None, Some(1000))?
.aggregate(vec![col("a")], vec![max(col("b"))])?
- .limit(10)?
+ .limit(None, Some(10))?
.build()?;
// Limit should use deeper LIMIT 1000, but Limit 10 shouldn't push
down aggregation
- let expected = "Limit: 10\
+ let expected = "Limit: skip=None, fetch=10\
\n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
- \n Limit: 1000\
- \n TableScan: test projection=None, limit=1000";
+ \n Limit: skip=None, fetch=1000\
+ \n TableScan: test projection=None, fetch=1000";
assert_optimized_plan_eq(&plan, expected);
@@ -451,11 +469,13 @@ mod test {
#[test]
fn limit_pushdown_should_not_pushdown_limit_with_offset_only() ->
Result<()> {
let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan).offset(10)?.build()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .limit(Some(10), None)?
+ .build()?;
// Should not push any limit down to table provider
// When it has a select
- let expected = "Offset: 10\
+ let expected = "Limit: skip=10, fetch=None\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -468,16 +488,14 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Should push the limit down to table provider
// When it has a select
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Projection: #test.a\
- \n TableScan: test projection=None, limit=1010";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Projection: #test.a\
+ \n TableScan: test projection=None, fetch=1010";
assert_optimized_plan_eq(&plan, expected);
@@ -490,14 +508,14 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
- .limit(1000)?
- .offset(10)?
+ .limit(None, Some(1000))?
+ .limit(Some(10), None)?
.build()?;
- let expected = "Offset: 10\
- \n Limit: 1000\
+ let expected = "Limit: skip=10, fetch=None\
+ \n Limit: skip=None, fetch=1000\
\n Projection: #test.a\
- \n TableScan: test projection=None, limit=1000";
+ \n TableScan: test projection=None, fetch=1000";
assert_optimized_plan_eq(&plan, expected);
@@ -510,14 +528,14 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a")])?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), None)?
+ .limit(None, Some(1000))?
.build()?;
- let expected = "Limit: 1000\
- \n Offset: 10\
+ let expected = "Limit: skip=None, fetch=1000\
+ \n Limit: skip=10, fetch=1000\
\n Projection: #test.a\
- \n TableScan: test projection=None, limit=1010";
+ \n TableScan: test projection=None, fetch=1010";
assert_optimized_plan_eq(&plan, expected);
@@ -529,18 +547,18 @@ mod test {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .offset(10)?
- .limit(1000)?
- .limit(10)?
+ .limit(Some(10), None)?
+ .limit(None, Some(1000))?
+ .limit(None, Some(10))?
.build()?;
// Should push down the smallest limit
// Towards table scan
// This rule doesn't replace multiple limits
- let expected = "Limit: 10\
- \n Limit: 10\
- \n Offset: 10\
- \n TableScan: test projection=None, limit=20";
+ let expected = "Limit: skip=None, fetch=10\
+ \n Limit: skip=None, fetch=10\
+ \n Limit: skip=10, fetch=10\
+ \n TableScan: test projection=None, fetch=20";
assert_optimized_plan_eq(&plan, expected);
@@ -553,15 +571,13 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![max(col("b"))])?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Limit should *not* push down aggregate node
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
- \n TableScan: test projection=None";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
+ \n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -574,18 +590,16 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan.clone())
.union(LogicalPlanBuilder::from(table_scan).build()?)?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Limit should push down through union
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Union\
- \n Limit: 1010\
- \n TableScan: test projection=None, limit=1010\
- \n Limit: 1010\
- \n TableScan: test projection=None, limit=1010";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Union\
+ \n Limit: skip=None, fetch=1010\
+ \n TableScan: test projection=None, fetch=1010\
+ \n Limit: skip=None, fetch=1010\
+ \n TableScan: test projection=None, fetch=1010";
assert_optimized_plan_eq(&plan, expected);
@@ -604,16 +618,14 @@ mod test {
(vec!["a"], vec!["a"]),
None,
)?
- .limit(1000)?
- .offset(10)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Limit pushdown Not supported in Join
- let expected = "Offset: 10\
- \n Limit: 1000\
- \n Inner Join: #test.a = #test2.a\
- \n TableScan: test projection=None\
- \n TableScan: test2 projection=None";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Inner Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -632,16 +644,14 @@ mod test {
(vec!["a"], vec!["a"]),
None,
)?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Limit pushdown Not supported in Join
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Inner Join: #test.a = #test2.a\
- \n TableScan: test projection=None\
- \n TableScan: test2 projection=None";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Inner Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -661,18 +671,16 @@ mod test {
let outer_query = LogicalPlanBuilder::from(table_scan_2)
.project(vec![col("a")])?
.filter(exists(Arc::new(subquery)))?
- .limit(100)?
- .offset(10)?
+ .limit(Some(10), Some(100))?
.build()?;
// Limit pushdown Not supported in sub_query
- let expected = "Offset: 10\
- \n Limit: 100\
- \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+ let expected = "Limit: skip=10, fetch=100\
+ \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1 projection=None)\
- \n Projection: #test2.a\
- \n TableScan: test2 projection=None";
+ \n Projection: #test2.a\
+ \n TableScan: test2 projection=None";
assert_optimized_plan_eq(&outer_query, expected);
@@ -692,18 +700,16 @@ mod test {
let outer_query = LogicalPlanBuilder::from(table_scan_2)
.project(vec![col("a")])?
.filter(exists(Arc::new(subquery)))?
- .offset(10)?
- .limit(100)?
+ .limit(Some(10), Some(100))?
.build()?;
// Limit pushdown Not supported in sub_query
- let expected = "Limit: 100\
- \n Offset: 10\
- \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+ let expected = "Limit: skip=10, fetch=100\
+ \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1 projection=None)\
- \n Projection: #test2.a\
- \n TableScan: test2 projection=None";
+ \n Projection: #test2.a\
+ \n TableScan: test2 projection=None";
assert_optimized_plan_eq(&outer_query, expected);
@@ -722,13 +728,13 @@ mod test {
(vec!["a"], vec!["a"]),
None,
)?
- .limit(1000)?
+ .limit(None, Some(1000))?
.build()?;
// Limit pushdown Not supported in Join
- let expected = "Limit: 1000\
+ let expected = "Limit: skip=None, fetch=1000\
\n Left Join: #test.a = #test2.a\
- \n TableScan: test projection=None, limit=1000\
+ \n TableScan: test projection=None, fetch=1000\
\n TableScan: test2 projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -748,16 +754,14 @@ mod test {
(vec!["a"], vec!["a"]),
None,
)?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Limit pushdown Not supported in Join
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Left Join: #test.a = #test2.a\
- \n TableScan: test projection=None, limit=1010\
- \n TableScan: test2 projection=None";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Left Join: #test.a = #test2.a\
+ \n TableScan: test projection=None, fetch=1010\
+ \n TableScan: test2 projection=None";
assert_optimized_plan_eq(&plan, expected);
@@ -776,14 +780,14 @@ mod test {
(vec!["a"], vec!["a"]),
None,
)?
- .limit(1000)?
+ .limit(None, Some(1000))?
.build()?;
// Limit pushdown Not supported in Join
- let expected = "Limit: 1000\
+ let expected = "Limit: skip=None, fetch=1000\
\n Right Join: #test.a = #test2.a\
\n TableScan: test projection=None\
- \n TableScan: test2 projection=None, limit=1000";
+ \n TableScan: test2 projection=None, fetch=1000";
assert_optimized_plan_eq(&plan, expected);
@@ -802,16 +806,14 @@ mod test {
(vec!["a"], vec!["a"]),
None,
)?
- .offset(10)?
- .limit(1000)?
+ .limit(Some(10), Some(1000))?
.build()?;
// Limit pushdown with offset supported in right outer join
- let expected = "Limit: 1000\
- \n Offset: 10\
- \n Right Join: #test.a = #test2.a\
- \n TableScan: test projection=None\
- \n TableScan: test2 projection=None, limit=1010";
+ let expected = "Limit: skip=10, fetch=1000\
+ \n Right Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None, fetch=1010";
assert_optimized_plan_eq(&plan, expected);
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index c6c81fd1f..c9aee1e03 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -362,7 +362,7 @@ fn optimize_plan(
table_name,
source,
filters,
- limit,
+ fetch: limit,
..
}) => {
let (projection, projected_schema) = get_projected_schema(
@@ -378,7 +378,7 @@ fn optimize_plan(
projection: Some(projection),
projected_schema,
filters: filters.clone(),
- limit: *limit,
+ fetch: *limit,
}))
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
@@ -486,7 +486,6 @@ fn optimize_plan(
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
LogicalPlan::Limit(_)
- | LogicalPlan::Offset(_)
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition(_)
| LogicalPlan::EmptyRelation(_)
@@ -866,12 +865,12 @@ mod tests {
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("c"), col("a")])?
- .limit(5)?
+ .limit(None, Some(5))?
.build()?;
assert_fields_eq(&plan, vec!["c", "a"]);
- let expected = "Limit: 5\
+ let expected = "Limit: skip=None, fetch=5\
\n Projection: #test.c, #test.a\
\n TableScan: test projection=Some([a, c])";
diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs
index 9be9bfde8..e40180f69 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -1630,7 +1630,7 @@ mod tests {
.unwrap()
.filter(col("c").not_eq(lit(false)))
.unwrap()
- .limit(1)
+ .limit(None, Some(1))
.unwrap()
.project(vec![col("a")])
.unwrap()
@@ -1639,7 +1639,7 @@ mod tests {
let expected = "\
Projection: #test.a\
- \n Limit: 1\
+ \n Limit: skip=None, fetch=1\
\n Filter: #test.c AS test.c != Boolean(false)\
\n Filter: NOT #test.b AS test.b != Boolean(true)\
\n TableScan: test projection=None";
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d17bad849..4522cd63e 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -68,7 +68,6 @@ message LogicalPlanNode {
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
- OffsetNode offset = 23;
}
}
@@ -244,12 +243,10 @@ message CrossJoinNode {
message LimitNode {
LogicalPlanNode input = 1;
- uint32 limit = 2;
-}
-
-message OffsetNode {
- LogicalPlanNode input = 1;
- uint32 offset = 2;
+ // The number of rows to skip before fetch; non-positive means don't skip any
+ int64 skip = 2;
+ // Maximum number of rows to fetch; negative means no limit
+ int64 fetch = 3;
}
message SelectionExecNode {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index c6ab11458..c1ee0ab05 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -39,8 +39,7 @@ use datafusion_expr::{
logical_plan::{
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateView,
CrossJoin, EmptyRelation, Extension, Filter, Join, JoinConstraint,
JoinType,
- Limit, Offset, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values,
- Window,
+ Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
},
Expr, LogicalPlan, LogicalPlanBuilder,
};
@@ -566,16 +565,19 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Limit(limit) => {
let input: LogicalPlan =
into_logical_plan!(limit.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .limit(limit.limit as usize)?
- .build()
- }
- LogicalPlanType::Offset(offset) => {
- let input: LogicalPlan =
- into_logical_plan!(offset.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .offset(offset.offset as usize)?
- .build()
+ let skip = if limit.skip <= 0 {
+ None
+ } else {
+ Some(limit.skip as usize)
+ };
+
+ let fetch = if limit.fetch < 0 {
+ None
+ } else {
+ Some(limit.fetch as usize)
+ };
+
+ LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
}
LogicalPlanType::Join(join) => {
let left_keys: Vec<Column> =
@@ -920,7 +922,7 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
- LogicalPlan::Limit(Limit { input, n }) => {
+ LogicalPlan::Limit(Limit { input, skip, fetch }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
@@ -930,22 +932,8 @@ impl AsLogicalPlan for LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
protobuf::LimitNode {
input: Some(Box::new(input)),
- limit: *n as u32,
- },
- ))),
- })
- }
- LogicalPlan::Offset(Offset { input, offset }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Offset(Box::new(
- protobuf::OffsetNode {
- input: Some(Box::new(input)),
- offset: *offset as u32,
+ skip: skip.unwrap_or(0) as i64,
+ fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
},
))),
})
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 9f028e7e5..2a33be32d 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -296,13 +296,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?;
let plan = self.order_by(plan, query.order_by)?;
-
- // Offset is the parent of Limit.
- // If both OFFSET and LIMIT appear,
- // then OFFSET rows are skipped before starting to count the LIMIT rows that are returned.
- // see https://www.postgresql.org/docs/current/queries-limit.html
- let plan = self.offset(plan, query.offset)?;
- self.limit(plan, query.limit)
+ self.limit(plan, query.offset, query.limit)
}
fn set_expr_to_plan(
@@ -1212,57 +1206,59 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
/// Wrap a plan in a limit
- fn limit(&self, input: LogicalPlan, limit: Option<SQLExpr>) -> Result<LogicalPlan> {
- match limit {
- Some(limit_expr) => {
- let n = match self.sql_to_rex(
- limit_expr,
- input.schema(),
- &mut HashMap::new(),
- )? {
- Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
- _ => Err(DataFusionError::Plan(
- "Unexpected expression for LIMIT clause".to_string(),
- )),
- }?;
-
- LogicalPlanBuilder::from(input).limit(n)?.build()
- }
- _ => Ok(input),
- }
- }
-
- /// Wrap a plan in a offset
- fn offset(
+ fn limit(
&self,
input: LogicalPlan,
- offset: Option<SQLOffset>,
+ skip: Option<SQLOffset>,
+ fetch: Option<SQLExpr>,
) -> Result<LogicalPlan> {
- match offset {
- Some(offset_expr) => {
- let offset = match self.sql_to_rex(
- offset_expr.value,
+ if skip.is_none() && fetch.is_none() {
+ return Ok(input);
+ }
+
+ let skip = match skip {
+ Some(skip_expr) => {
+ let skip = match self.sql_to_rex(
+ skip_expr.value,
input.schema(),
&mut HashMap::new(),
)? {
- Expr::Literal(ScalarValue::Int64(Some(offset))) => {
- if offset < 0 {
+ Expr::Literal(ScalarValue::Int64(Some(s))) => {
+ if s < 0 {
return Err(DataFusionError::Plan(format!(
"Offset must be >= 0, '{}' was provided.",
- offset
+ s
)));
}
- Ok(offset as usize)
+ Ok(s as usize)
}
_ => Err(DataFusionError::Plan(
"Unexpected expression in OFFSET clause".to_string(),
)),
}?;
+ Some(skip)
+ }
+ _ => None,
+ };
- LogicalPlanBuilder::from(input).offset(offset)?.build()
+ let fetch = match fetch {
+ Some(limit_expr) => {
+ let n = match self.sql_to_rex(
+ limit_expr,
+ input.schema(),
+ &mut HashMap::new(),
+ )? {
+ Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
+ _ => Err(DataFusionError::Plan(
+ "Unexpected expression for LIMIT clause".to_string(),
+ )),
+ }?;
+ Some(n)
}
- _ => Ok(input),
- }
+ _ => None,
+ };
+
+ LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
}
/// Wrap the logical in a sort
@@ -4802,11 +4798,10 @@ mod tests {
#[test]
fn test_zero_offset_with_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
- let expected = "Limit: 5\
- \n Offset: 0\
- \n Projection: #person.id\
- \n Filter: #person.id > Int64(100)\
- \n TableScan: person projection=None";
+ let expected = "Limit: skip=0, fetch=5\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
quick_test(sql, expected);
// Flip the order of LIMIT and OFFSET in the query. Plan should remain the same.
@@ -4817,7 +4812,7 @@ mod tests {
#[test]
fn test_offset_no_limit() {
let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 5;";
- let expected = "Offset: 5\
+ let expected = "Limit: skip=5, fetch=None\
\n Projection: #person.id\
\n Filter: #person.id > Int64(100)\
\n TableScan: person projection=None";
@@ -4827,22 +4822,20 @@ mod tests {
#[test]
fn test_offset_after_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
- let expected = "Limit: 5\
- \n Offset: 3\
- \n Projection: #person.id\
- \n Filter: #person.id > Int64(100)\
- \n TableScan: person projection=None";
+ let expected = "Limit: skip=3, fetch=5\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn test_offset_before_limit() {
let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
- let expected = "Limit: 5\
- \n Offset: 3\
- \n Projection: #person.id\
- \n Filter: #person.id > Int64(100)\
- \n TableScan: person projection=None";
+ let expected = "Limit: skip=3, fetch=5\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
quick_test(sql, expected);
}