This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch datafusion-expr-window-ops in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit c0a9f56505694d10eae9ba86cefbe80297a2fa4e Author: Jiayu Liu <[email protected]> AuthorDate: Sun Feb 6 15:31:10 2022 +0800 include window frames and operator into datafusion-expr --- datafusion-expr/Cargo.toml | 1 + datafusion-expr/src/lib.rs | 4 + .../src/operator.rs | 73 +---- .../src/window_frame.rs | 22 +- datafusion/src/logical_plan/operators.rs | 83 +---- datafusion/src/logical_plan/window_frames.rs | 363 +-------------------- 6 files changed, 22 insertions(+), 524 deletions(-) diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml index 3cac735..d8a4f29 100644 --- a/datafusion-expr/Cargo.toml +++ b/datafusion-expr/Cargo.toml @@ -38,3 +38,4 @@ path = "src/lib.rs" [dependencies] datafusion-common = { path = "../datafusion-common" } arrow = { version = "8.0.0", features = ["prettyprint"] } +sqlparser = "0.13" diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs index b6eaaf7..13fa93e 100644 --- a/datafusion-expr/src/lib.rs +++ b/datafusion-expr/src/lib.rs @@ -16,7 +16,11 @@ // under the License. mod aggregate_function; +mod operator; +mod window_frame; mod window_function; pub use aggregate_function::AggregateFunction; +pub use operator::Operator; +pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; pub use window_function::{BuiltInWindowFunction, WindowFunction}; diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion-expr/src/operator.rs similarity index 67% copy from datafusion/src/logical_plan/operators.rs copy to datafusion-expr/src/operator.rs index 14ccab0..e6b7e35 100644 --- a/datafusion/src/logical_plan/operators.rs +++ b/datafusion-expr/src/operator.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt, ops}; - -use super::{binary_expr, Expr}; +use std::fmt; /// Operators applied to expressions #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] @@ -97,72 +95,3 @@ impl fmt::Display for Operator { write!(f, "{}", display) } } - -impl ops::Add for Expr { - type Output = Self; - - fn add(self, rhs: Self) -> Self { - binary_expr(self, Operator::Plus, rhs) - } -} - -impl ops::Sub for Expr { - type Output = Self; - - fn sub(self, rhs: Self) -> Self { - binary_expr(self, Operator::Minus, rhs) - } -} - -impl ops::Mul for Expr { - type Output = Self; - - fn mul(self, rhs: Self) -> Self { - binary_expr(self, Operator::Multiply, rhs) - } -} - -impl ops::Div for Expr { - type Output = Self; - - fn div(self, rhs: Self) -> Self { - binary_expr(self, Operator::Divide, rhs) - } -} - -impl ops::Rem for Expr { - type Output = Self; - - fn rem(self, rhs: Self) -> Self { - binary_expr(self, Operator::Modulo, rhs) - } -} - -#[cfg(test)] -mod tests { - use crate::prelude::lit; - - #[test] - fn test_operators() { - assert_eq!( - format!("{:?}", lit(1u32) + lit(2u32)), - "UInt32(1) + UInt32(2)" - ); - assert_eq!( - format!("{:?}", lit(1u32) - lit(2u32)), - "UInt32(1) - UInt32(2)" - ); - assert_eq!( - format!("{:?}", lit(1u32) * lit(2u32)), - "UInt32(1) * UInt32(2)" - ); - assert_eq!( - format!("{:?}", lit(1u32) / lit(2u32)), - "UInt32(1) / UInt32(2)" - ); - assert_eq!( - format!("{:?}", lit(1u32) % lit(2u32)), - "UInt32(1) % UInt32(2)" - ); - } -} diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion-expr/src/window_frame.rs similarity index 96% copy from datafusion/src/logical_plan/window_frames.rs copy to datafusion-expr/src/window_frame.rs index 42e0a7e..ba65a50 100644 --- a/datafusion/src/logical_plan/window_frames.rs +++ b/datafusion-expr/src/window_frame.rs @@ -23,7 +23,7 @@ //! - An ending frame boundary, //! - An EXCLUDE clause. -use crate::error::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result}; use sqlparser::ast; use std::cmp::Ordering; use std::convert::{From, TryFrom}; @@ -78,9 +78,9 @@ impl TryFrom<ast::WindowFrame> for WindowFrame { )) } else if start_bound > end_bound { Err(DataFusionError::Execution(format!( - "Invalid window frame: start bound ({}) cannot be larger than end bound ({})", - start_bound, end_bound - ))) + "Invalid window frame: start bound ({}) cannot be larger than end bound ({})", + start_bound, end_bound + ))) } else { let units = value.units.into(); if units == WindowFrameUnits::Range { @@ -268,9 +268,10 @@ mod tests { }; let result = WindowFrame::try_from(window_frame); assert_eq!( - result.err().unwrap().to_string(), - "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned() - ); + result.err().unwrap().to_string(), + "Execution error: Invalid window frame: start bound cannot be unbounded following" + .to_owned() + ); let window_frame = ast::WindowFrame { units: ast::WindowFrameUnits::Range, @@ -279,9 +280,10 @@ mod tests { }; let result = WindowFrame::try_from(window_frame); assert_eq!( - result.err().unwrap().to_string(), - "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned() - ); + result.err().unwrap().to_string(), + "Execution error: Invalid window frame: end bound cannot be unbounded preceding" + .to_owned() + ); let window_frame = ast::WindowFrame { units: ast::WindowFrameUnits::Range, diff --git a/datafusion/src/logical_plan/operators.rs b/datafusion/src/logical_plan/operators.rs index 14ccab0..813f7e0 100644 --- a/datafusion/src/logical_plan/operators.rs +++ b/datafusion/src/logical_plan/operators.rs @@ -15,88 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt, ops}; - use super::{binary_expr, Expr}; - -/// Operators applied to expressions -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] -pub enum Operator { - /// Expressions are equal - Eq, - /// Expressions are not equal - NotEq, - /// Left side is smaller than right side - Lt, - /// Left side is smaller or equal to right side - LtEq, - /// Left side is greater than right side - Gt, - /// Left side is greater or equal to right side - GtEq, - /// Addition - Plus, - /// Subtraction - Minus, - /// Multiplication operator, like `*` - Multiply, - /// Division operator, like `/` - Divide, - /// Remainder operator, like `%` - Modulo, - /// Logical AND, like `&&` - And, - /// Logical OR, like `||` - Or, - /// Matches a wildcard pattern - Like, - /// Does not match a wildcard pattern - NotLike, - /// IS DISTINCT FROM - IsDistinctFrom, - /// IS NOT DISTINCT FROM - IsNotDistinctFrom, - /// Case sensitive regex match - RegexMatch, - /// Case insensitive regex match - RegexIMatch, - /// Case sensitive regex not match - RegexNotMatch, - /// Case insensitive regex not match - RegexNotIMatch, - /// Bitwise and, like `&` - BitwiseAnd, -} - -impl fmt::Display for Operator { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let display = match &self { - Operator::Eq => "=", - Operator::NotEq => "!=", - Operator::Lt => "<", - Operator::LtEq => "<=", - Operator::Gt => ">", - Operator::GtEq => ">=", - Operator::Plus => "+", - Operator::Minus => "-", - Operator::Multiply => "*", - Operator::Divide => "/", - Operator::Modulo => "%", - Operator::And => "AND", - Operator::Or => "OR", - Operator::Like => "LIKE", - Operator::NotLike => "NOT LIKE", - Operator::RegexMatch => "~", - Operator::RegexIMatch => "~*", - Operator::RegexNotMatch => "!~", - Operator::RegexNotIMatch => "!~*", - Operator::IsDistinctFrom => "IS DISTINCT FROM", - Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM", - Operator::BitwiseAnd => "&", - }; - write!(f, "{}", display) - } -} +pub use datafusion_expr::Operator; +use std::ops; impl ops::Add for Expr { type Output = Self; diff --git a/datafusion/src/logical_plan/window_frames.rs b/datafusion/src/logical_plan/window_frames.rs index 42e0a7e..5195820 100644 --- a/datafusion/src/logical_plan/window_frames.rs +++ b/datafusion/src/logical_plan/window_frames.rs @@ -15,365 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! Window frame -//! -//! The frame-spec determines which output rows are read by an aggregate window function. The frame-spec consists of four parts: -//! - A frame type - either ROWS, RANGE or GROUPS, -//! - A starting frame boundary, -//! - An ending frame boundary, -//! - An EXCLUDE clause. +//! Window frame types, reimported from datafusion_expr -use crate::error::{DataFusionError, Result}; -use sqlparser::ast; -use std::cmp::Ordering; -use std::convert::{From, TryFrom}; -use std::fmt; -use std::hash::{Hash, Hasher}; - -/// The frame-spec determines which output rows are read by an aggregate window function. -/// -/// The ending frame boundary can be omitted (if the BETWEEN and AND keywords that surround the -/// starting frame boundary are also omitted), in which case the ending frame boundary defaults to -/// CURRENT ROW. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)] -pub struct WindowFrame { - /// A frame type - either ROWS, RANGE or GROUPS - pub units: WindowFrameUnits, - /// A starting frame boundary - pub start_bound: WindowFrameBound, - /// An ending frame boundary - pub end_bound: WindowFrameBound, -} - -impl fmt::Display for WindowFrame { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{} BETWEEN {} AND {}", - self.units, self.start_bound, self.end_bound - )?; - Ok(()) - } -} - -impl TryFrom<ast::WindowFrame> for WindowFrame { - type Error = DataFusionError; - - fn try_from(value: ast::WindowFrame) -> Result<Self> { - let start_bound = value.start_bound.into(); - let end_bound = value - .end_bound - .map(WindowFrameBound::from) - .unwrap_or(WindowFrameBound::CurrentRow); - - if let WindowFrameBound::Following(None) = start_bound { - Err(DataFusionError::Execution( - "Invalid window frame: start bound cannot be unbounded following" - .to_owned(), - )) - } else if let WindowFrameBound::Preceding(None) = end_bound { - Err(DataFusionError::Execution( - "Invalid window frame: end bound cannot be unbounded preceding" - .to_owned(), - )) - } else if start_bound > end_bound { - Err(DataFusionError::Execution(format!( - "Invalid window frame: start bound ({}) cannot be larger than end bound ({})", - start_bound, end_bound - ))) - } else { - let units = value.units.into(); - if units == WindowFrameUnits::Range { - for bound in &[start_bound, end_bound] { - match bound { - WindowFrameBound::Preceding(Some(v)) - | WindowFrameBound::Following(Some(v)) - if *v > 0 => - { - Err(DataFusionError::NotImplemented(format!( - "With WindowFrameUnits={}, the bound cannot be {} PRECEDING or FOLLOWING at the moment", - units, v - ))) - } - _ => Ok(()), - }?; - } - } - Ok(Self { - units, - start_bound, - end_bound, - }) - } - } -} - -impl Default for WindowFrame { - fn default() -> Self { - WindowFrame { - units: WindowFrameUnits::Range, - start_bound: WindowFrameBound::Preceding(None), - end_bound: WindowFrameBound::CurrentRow, - } - } -} - -/// There are five ways to describe starting and ending frame boundaries: -/// -/// 1. UNBOUNDED PRECEDING -/// 2. <expr> PRECEDING -/// 3. CURRENT ROW -/// 4. <expr> FOLLOWING -/// 5. UNBOUNDED FOLLOWING -/// -/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic boundary) -#[derive(Debug, Clone, Copy, Eq)] -pub enum WindowFrameBound { - /// 1. UNBOUNDED PRECEDING - /// The frame boundary is the first row in the partition. - /// - /// 2. <expr> PRECEDING - /// <expr> must be a non-negative constant numeric expression. The boundary is a row that - /// is <expr> "units" prior to the current row. - Preceding(Option<u64>), - /// 3. The current row. - /// - /// For RANGE and GROUPS frame types, peers of the current row are also - /// included in the frame, unless specifically excluded by the EXCLUDE clause. - /// This is true regardless of whether CURRENT ROW is used as the starting or ending frame - /// boundary. - CurrentRow, - /// 4. This is the same as "<expr> PRECEDING" except that the boundary is <expr> units after the - /// current rather than before the current row. - /// - /// 5. UNBOUNDED FOLLOWING - /// The frame boundary is the last row in the partition. - Following(Option<u64>), -} - -impl From<ast::WindowFrameBound> for WindowFrameBound { - fn from(value: ast::WindowFrameBound) -> Self { - match value { - ast::WindowFrameBound::Preceding(v) => Self::Preceding(v), - ast::WindowFrameBound::Following(v) => Self::Following(v), - ast::WindowFrameBound::CurrentRow => Self::CurrentRow, - } - } -} - -impl fmt::Display for WindowFrameBound { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"), - WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"), - WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"), - WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n), - WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n), - } - } -} - -impl PartialEq for WindowFrameBound { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - -impl PartialOrd for WindowFrameBound { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for WindowFrameBound { - fn cmp(&self, other: &Self) -> Ordering { - self.get_rank().cmp(&other.get_rank()) - } -} - -impl Hash for WindowFrameBound { - fn hash<H: Hasher>(&self, state: &mut H) { - self.get_rank().hash(state) - } -} - -impl WindowFrameBound { - /// get the rank of this window frame bound. - /// - /// the rank is a tuple of (u8, u64) because we'll firstly compare the kind and then the value - /// which requires special handling e.g. with preceding the larger the value the smaller the - /// rank and also for 0 preceding / following it is the same as current row - fn get_rank(&self) -> (u8, u64) { - match self { - WindowFrameBound::Preceding(None) => (0, 0), - WindowFrameBound::Following(None) => (4, 0), - WindowFrameBound::Preceding(Some(0)) - | WindowFrameBound::CurrentRow - | WindowFrameBound::Following(Some(0)) => (2, 0), - WindowFrameBound::Preceding(Some(v)) => (1, u64::MAX - *v), - WindowFrameBound::Following(Some(v)) => (3, *v), - } - } -} - -/// There are three frame types: ROWS, GROUPS, and RANGE. The frame type determines how the -/// starting and ending boundaries of the frame are measured. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)] -pub enum WindowFrameUnits { - /// The ROWS frame type means that the starting and ending boundaries for the frame are - /// determined by counting individual rows relative to the current row. - Rows, - /// The RANGE frame type requires that the ORDER BY clause of the window have exactly one - /// term. Call that term "X". With the RANGE frame type, the elements of the frame are - /// determined by computing the value of expression X for all rows in the partition and framing - /// those rows for which the value of X is within a certain range of the value of X for the - /// current row. - Range, - /// The GROUPS frame type means that the starting and ending boundaries are determine - /// by counting "groups" relative to the current group. A "group" is a set of rows that all have - /// equivalent values for all all terms of the window ORDER BY clause. - Groups, -} - -impl fmt::Display for WindowFrameUnits { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str(match self { - WindowFrameUnits::Rows => "ROWS", - WindowFrameUnits::Range => "RANGE", - WindowFrameUnits::Groups => "GROUPS", - }) - } -} - -impl From<ast::WindowFrameUnits> for WindowFrameUnits { - fn from(value: ast::WindowFrameUnits) -> Self { - match value { - ast::WindowFrameUnits::Range => Self::Range, - ast::WindowFrameUnits::Groups => Self::Groups, - ast::WindowFrameUnits::Rows => Self::Rows, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_window_frame_creation() -> Result<()> { - let window_frame = ast::WindowFrame { - units: ast::WindowFrameUnits::Range, - start_bound: ast::WindowFrameBound::Following(None), - end_bound: None, - }; - let result = WindowFrame::try_from(window_frame); - assert_eq!( - result.err().unwrap().to_string(), - "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned() - ); - - let window_frame = ast::WindowFrame { - units: ast::WindowFrameUnits::Range, - start_bound: ast::WindowFrameBound::Preceding(None), - end_bound: Some(ast::WindowFrameBound::Preceding(None)), - }; - let result = WindowFrame::try_from(window_frame); - assert_eq!( - result.err().unwrap().to_string(), - "Execution error: Invalid window frame: end bound cannot be unbounded preceding".to_owned() - ); - - let window_frame = ast::WindowFrame { - units: ast::WindowFrameUnits::Range, - start_bound: ast::WindowFrameBound::Preceding(Some(1)), - end_bound: Some(ast::WindowFrameBound::Preceding(Some(2))), - }; - let result = WindowFrame::try_from(window_frame); - assert_eq!( - result.err().unwrap().to_string(), - "Execution error: Invalid window frame: start bound (1 PRECEDING) cannot be larger than end bound (2 PRECEDING)".to_owned() - ); - - let window_frame = ast::WindowFrame { - units: ast::WindowFrameUnits::Range, - start_bound: ast::WindowFrameBound::Preceding(Some(2)), - end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))), - }; - let result = WindowFrame::try_from(window_frame); - assert_eq!( - result.err().unwrap().to_string(), - "This feature is not implemented: With WindowFrameUnits=RANGE, the bound cannot be 2 PRECEDING or FOLLOWING at the moment".to_owned() - ); - - let window_frame = ast::WindowFrame { - units: ast::WindowFrameUnits::Rows, - start_bound: ast::WindowFrameBound::Preceding(Some(2)), - end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))), - }; - let result = WindowFrame::try_from(window_frame); - assert!(result.is_ok()); - Ok(()) - } - - #[test] - fn test_eq() { - assert_eq!( - WindowFrameBound::Preceding(Some(0)), - WindowFrameBound::CurrentRow - ); - assert_eq!( - WindowFrameBound::CurrentRow, - WindowFrameBound::Following(Some(0)) - ); - assert_eq!( - WindowFrameBound::Following(Some(2)), - WindowFrameBound::Following(Some(2)) - ); - assert_eq!( - WindowFrameBound::Following(None), - WindowFrameBound::Following(None) - ); - assert_eq!( - WindowFrameBound::Preceding(Some(2)), - WindowFrameBound::Preceding(Some(2)) - ); - assert_eq!( - WindowFrameBound::Preceding(None), - WindowFrameBound::Preceding(None) - ); - } - - #[test] - fn test_ord() { - assert!(WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::CurrentRow); - // ! yes this is correct! - assert!( - WindowFrameBound::Preceding(Some(2)) < WindowFrameBound::Preceding(Some(1)) - ); - assert!( - WindowFrameBound::Preceding(Some(u64::MAX)) - < WindowFrameBound::Preceding(Some(u64::MAX - 1)) - ); - assert!( - WindowFrameBound::Preceding(None) - < WindowFrameBound::Preceding(Some(1000000)) - ); - assert!( - WindowFrameBound::Preceding(None) - < WindowFrameBound::Preceding(Some(u64::MAX)) - ); - assert!(WindowFrameBound::Preceding(None) < WindowFrameBound::Following(Some(0))); - assert!( - WindowFrameBound::Preceding(Some(1)) < WindowFrameBound::Following(Some(1)) - ); - assert!(WindowFrameBound::CurrentRow < WindowFrameBound::Following(Some(1))); - assert!( - WindowFrameBound::Following(Some(1)) < WindowFrameBound::Following(Some(2)) - ); - assert!(WindowFrameBound::Following(Some(2)) < WindowFrameBound::Following(None)); - assert!( - WindowFrameBound::Following(Some(u64::MAX)) - < WindowFrameBound::Following(None) - ); - } -} +pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
