alamb commented on code in PR #3570: URL: https://github.com/apache/arrow-datafusion/pull/3570#discussion_r978678261
########## integration-tests/sqls/simple_ordered_row.sql: ########## @@ -0,0 +1,26 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at + +-- http://www.apache.org/licenses/LICENSE-2.0 + +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SELECT Review Comment: This is great to test against Postgres 👍 ########## datafusion/common/src/scalar.rs: ########## @@ -306,6 +309,192 @@ impl PartialOrd for ScalarValue { impl Eq for ScalarValue {} +// TODO implement this in arrow-rs with simd +// https://github.com/apache/arrow-rs/issues/1010 +macro_rules!
decimal_op { + ($LHS:expr, $RHS:expr, $PRECISION:expr, $LHS_SCALE:expr, $RHS_SCALE:expr, $OPERATION:tt ) => {{ + let (difference, side) = if $LHS_SCALE > $RHS_SCALE { + ($LHS_SCALE - $RHS_SCALE, true) + } else { + ($RHS_SCALE - $LHS_SCALE, false) + }; + let scale = max($LHS_SCALE, $RHS_SCALE); + match ($LHS, $RHS, difference) { + (None, None, _) => ScalarValue::Decimal128(None, $PRECISION, scale), + (None, Some(rhs_value), 0) => ScalarValue::Decimal128(Some((0 as i128) $OPERATION rhs_value), $PRECISION, scale), + (None, Some(rhs_value), _) => { + let mut new_value = ((0 as i128) $OPERATION rhs_value); + if side { + new_value *= 10_i128.pow((difference) as u32) + }; + ScalarValue::Decimal128(Some(new_value), $PRECISION, scale) + } + (Some(lhs_value), None, 0) => ScalarValue::Decimal128(Some(lhs_value $OPERATION (0 as i128)), $PRECISION, scale), + (Some(lhs_value), None, _) => { + let mut new_value = (lhs_value $OPERATION (0 as i128)); + if !!!side { + new_value *= 10_i128.pow((difference) as u32) + } + ScalarValue::Decimal128(Some(new_value), $PRECISION, scale) + } + (Some(lhs_value), Some(rhs_value), 0) => { + ScalarValue::Decimal128(Some(lhs_value $OPERATION rhs_value), $PRECISION, scale) + } + (Some(lhs_value), Some(rhs_value), _) => { + let new_value = if side { + rhs_value * 10_i128.pow((difference) as u32) $OPERATION lhs_value + } else { + lhs_value * 10_i128.pow((difference) as u32) $OPERATION rhs_value + }; + ScalarValue::Decimal128(Some(new_value), $PRECISION, scale) + } + }} + + } +} + +// Returns the result of applying operation to two scalar values, including coercion into $TYPE. +macro_rules! 
typed_op { + ($LEFT:expr, $RIGHT:expr, $SCALAR:ident, $TYPE:ident, $OPERATION:tt) => { + Some(ScalarValue::$SCALAR(match ($LEFT, $RIGHT) { + (None, None) => None, + (Some(a), None) => Some((*a as $TYPE) $OPERATION (0 as $TYPE)), + (None, Some(b)) => Some((0 as $TYPE) $OPERATION (*b as $TYPE)), + (Some(a), Some(b)) => Some((*a as $TYPE) $OPERATION (*b as $TYPE)), + })) + }; +} + +macro_rules! impl_common_symmetic_cases_op { + ($LHS:expr, $RHS:expr, $OPERATION:tt, [$([$L_TYPE:ident, $R_TYPE:ident, $O_TYPE:ident, $O_PRIM:ident]),+]) => { + match ($LHS, $RHS) { + $( + (ScalarValue::$L_TYPE(lhs), ScalarValue::$R_TYPE(rhs)) => { + typed_op!(lhs, rhs, $O_TYPE, $O_PRIM, $OPERATION) + } + (ScalarValue::$R_TYPE(lhs), ScalarValue::$L_TYPE(rhs)) => { + typed_op!(lhs, rhs, $O_TYPE, $O_PRIM, $OPERATION) + } + )+ + _ => None + } + } +} + +macro_rules! impl_common_cases_op { + ($LHS:expr, $RHS:expr, $OPERATION:tt) => { + match ($LHS, $RHS) { + ( + ScalarValue::Decimal128(v1, p1, s1), + ScalarValue::Decimal128(v2, p2, s2), + ) => { + let max_precision = *p1.max(p2); + Some(decimal_op!(v1, v2, max_precision, *s1, *s2, $OPERATION)) + } + (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => { + typed_op!(lhs, rhs, Float64, f64, $OPERATION) + } + (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => { + typed_op!(lhs, rhs, Float32, f32, $OPERATION) + } + (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => { + typed_op!(lhs, rhs, UInt64, u64, $OPERATION) + } + (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => { + typed_op!(lhs, rhs, Int64, i64, $OPERATION) + } + (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => { + typed_op!(lhs, rhs, UInt32, u32, $OPERATION) + } + (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => { + typed_op!(lhs, rhs, Int32, i32, $OPERATION) + } + (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => { + typed_op!(lhs, rhs, UInt16, u16, $OPERATION) + } + (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => { + typed_op!(lhs, rhs, Int16, 
i16, $OPERATION) + } + _ => impl_common_symmetic_cases_op!( Review Comment: I don't think we should be doing coercion in `ScalarValue` -- by the time the execution plan runs, the coercion should have already been done elsewhere. In other words, I would expect `ScalarValue::Float64` + `ScalarValue::Float64` to work, but `ScalarValue::Float32` + `ScalarValue::Float64` to fail. I fear that by automatically coercing in `ScalarValue` we will hide errors / failures to coerce elsewhere, thus masking bugs. @liukun4515 has been working on improving coercion and moving it earlier in the plan (https://github.com/apache/arrow-datafusion/pull/3396), which is perhaps related. ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows =>
self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. 
+/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + 
WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. #[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. - fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. 
Review Comment: ```suggestion /// It calculates the aggregation on all rows in `value_slice` and returns an array of size `len` ``` ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = 
ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. +/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match 
window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. #[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. 
- fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. + fn calculate_whole_table( + &mut self, + value_slice: &[ArrayRef], + len: usize, + ) -> Result<ArrayRef> { + self.accumulator.update_batch(value_slice)?; + let value = self.accumulator.evaluate()?; + Ok(value.to_array_of_size(len)) + } + + /// It calculates the running window logic. + /// We iterate each row to calculate its corresponding window. It is a running + /// window calculation. First, cur_range is calculated, then it is compared with last_range. + /// We increment the accumulator by update and retract. + /// Note that not all aggregators implement retract_batch just yet. + fn calculate_running_window( + &mut self, + value_slice: &[ArrayRef], + order_bys: &[&ArrayRef], + value_range: &Range<usize>, + ) -> Result<ArrayRef> { + let len = value_range.end - value_range.start; + let slice_order_columns = order_bys + .iter() + .map(|v| v.slice(value_range.start, value_range.end - value_range.start)) + .collect::<Vec<_>>(); + + let updated_zero_offset_value_range = Range { + start: 0, + end: value_range.end - value_range.start, + }; + let mut row_wise_results = vec![]; + let mut last_range: (usize, usize) = ( + updated_zero_offset_value_range.start, + updated_zero_offset_value_range.start, + ); + + for i in 0..len { + let window_frame = self.window_frame.ok_or_else(|| { + DataFusionError::Internal( + "Window frame cannot be empty to calculate window ranges".to_string(), + ) + })?; + let cur_range = + calculate_current_window(window_frame, &slice_order_columns, len, i)?; + + if cur_range.1 - cur_range.0 == 0 { + // We produce None if the window is empty. 
+ row_wise_results.push(ScalarValue::try_from(self.field.data_type())) + } else { + let update: Vec<ArrayRef> = value_slice + .iter() + .map(|v| v.slice(last_range.1, cur_range.1 - last_range.1)) + .collect(); + let retract: Vec<ArrayRef> = value_slice + .iter() + .map(|v| v.slice(last_range.0, cur_range.0 - last_range.0)) + .collect(); + self.accumulator.update_batch(&update)?; + // Prevents error raising if retract is not implemented. Review Comment: I don't understand this comment -- how does it prevent an error? It seems like this code ensures a slice is bounded by the beginning of the range ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - 
WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. +/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + 
WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. 
#[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. - fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. + fn calculate_whole_table( + &mut self, + value_slice: &[ArrayRef], + len: usize, + ) -> Result<ArrayRef> { + self.accumulator.update_batch(value_slice)?; + let value = self.accumulator.evaluate()?; + Ok(value.to_array_of_size(len)) + } + + /// It calculates the running window logic. + /// We iterate each row to calculate its corresponding window. It is a running + /// window calculation. First, cur_range is calculated, then it is compared with last_range. + /// We increment the accumulator by update and retract. + /// Note that not all aggregators implement retract_batch just yet. 
+ fn calculate_running_window( + &mut self, + value_slice: &[ArrayRef], + order_bys: &[&ArrayRef], + value_range: &Range<usize>, + ) -> Result<ArrayRef> { + let len = value_range.end - value_range.start; + let slice_order_columns = order_bys + .iter() + .map(|v| v.slice(value_range.start, value_range.end - value_range.start)) + .collect::<Vec<_>>(); + + let updated_zero_offset_value_range = Range { + start: 0, + end: value_range.end - value_range.start, + }; + let mut row_wise_results = vec![]; + let mut last_range: (usize, usize) = ( + updated_zero_offset_value_range.start, + updated_zero_offset_value_range.start, + ); + + for i in 0..len { + let window_frame = self.window_frame.ok_or_else(|| { + DataFusionError::Internal( + "Window frame cannot be empty to calculate window ranges".to_string(), + ) + })?; + let cur_range = + calculate_current_window(window_frame, &slice_order_columns, len, i)?; + + if cur_range.1 - cur_range.0 == 0 { + // We produce None if the window is empty. + row_wise_results.push(ScalarValue::try_from(self.field.data_type())) + } else { + let update: Vec<ArrayRef> = value_slice + .iter() + .map(|v| v.slice(last_range.1, cur_range.1 - last_range.1)) + .collect(); + let retract: Vec<ArrayRef> = value_slice + .iter() + .map(|v| v.slice(last_range.0, cur_range.0 - last_range.0)) + .collect(); + self.accumulator.update_batch(&update)?; + // Prevents error raising if retract is not implemented. + if cur_range.0 - last_range.0 > 0 { + self.accumulator.retract_batch(&retract)? + } + match self.accumulator.evaluate() { + Err(_e) => row_wise_results + .push(ScalarValue::try_from(self.field.data_type())), + value => row_wise_results.push(value), + } + } + last_range = cur_range; + } + let row_wise_results: Result<Vec<ScalarValue>> = Review Comment: since we are iterating through the vec anyways above, rather than incrementally gathering `Vec<Result<_>>` you could use `?` and only push to `row_wise_results` on success. 
It might make this code slightly simpler ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if 
value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. +/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => 
Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. #[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. 
- fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. + fn calculate_whole_table( + &mut self, + value_slice: &[ArrayRef], + len: usize, + ) -> Result<ArrayRef> { + self.accumulator.update_batch(value_slice)?; + let value = self.accumulator.evaluate()?; + Ok(value.to_array_of_size(len)) + } + + /// It calculates the running window logic. Review Comment: ```suggestion /// It calculates the running window logic for the rows in `value_range` of `value_slice` /// /// Returns an array of length of the `value_range` /// ``` ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - 
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. 
+/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( Review Comment: I recommend explicitly listing out the cases here (the `_` arm only covers `WindowFrameBound::Following(None)`). It seems like the error should be `Unsupported` rather than `Internal` -- perhaps a test case could show that. ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn 
partition_by(&self) ->
&[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. 
+/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + 
WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. #[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. - fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. + fn calculate_whole_table( + &mut self, + value_slice: &[ArrayRef], + len: usize, + ) -> Result<ArrayRef> { + self.accumulator.update_batch(value_slice)?; + let value = self.accumulator.evaluate()?; + Ok(value.to_array_of_size(len)) + } + + /// It calculates the running window logic. + /// We iterate each row to calculate its corresponding window. 
It is a running + /// window calculation. First, cur_range is calculated, then it is compared with last_range. + /// We increment the accumulator by update and retract. + /// Note that not all aggregators implement retract_batch just yet. + fn calculate_running_window( + &mut self, + value_slice: &[ArrayRef], + order_bys: &[&ArrayRef], + value_range: &Range<usize>, + ) -> Result<ArrayRef> { + let len = value_range.end - value_range.start; + let slice_order_columns = order_bys + .iter() + .map(|v| v.slice(value_range.start, value_range.end - value_range.start)) + .collect::<Vec<_>>(); + + let updated_zero_offset_value_range = Range { + start: 0, + end: value_range.end - value_range.start, + }; + let mut row_wise_results = vec![]; + let mut last_range: (usize, usize) = ( + updated_zero_offset_value_range.start, + updated_zero_offset_value_range.start, + ); + + for i in 0..len { + let window_frame = self.window_frame.ok_or_else(|| { + DataFusionError::Internal( + "Window frame cannot be empty to calculate window ranges".to_string(), + ) + })?; + let cur_range = + calculate_current_window(window_frame, &slice_order_columns, len, i)?; + + if cur_range.1 - cur_range.0 == 0 { + // We produce None if the window is empty. 
+ row_wise_results.push(ScalarValue::try_from(self.field.data_type())) + } else { + let update: Vec<ArrayRef> = value_slice Review Comment: ```suggestion // Accumulate any new rows that have entered the window let update: Vec<ArrayRef> = value_slice ``` ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + 
.collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. +/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, Review Comment: It may not matter, but it might be better to try and avoid copying the `WindowFrame` for each output row ```suggestion window_frame: &WindowFrame, ``` ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn 
order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. 
+/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + 
WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. #[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. - fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. + fn calculate_whole_table( + &mut self, + value_slice: &[ArrayRef], + len: usize, + ) -> Result<ArrayRef> { + self.accumulator.update_batch(value_slice)?; + let value = self.accumulator.evaluate()?; + Ok(value.to_array_of_size(len)) + } + + /// It calculates the running window logic. + /// We iterate each row to calculate its corresponding window. 
It is a running + /// window calculation. First, cur_range is calculated, then it is compared with last_range. + /// We increment the accumulator by update and retract. + /// Note that not all aggregators implement retract_batch just yet. + fn calculate_running_window( + &mut self, + value_slice: &[ArrayRef], + order_bys: &[&ArrayRef], + value_range: &Range<usize>, + ) -> Result<ArrayRef> { + let len = value_range.end - value_range.start; + let slice_order_columns = order_bys + .iter() + .map(|v| v.slice(value_range.start, value_range.end - value_range.start)) + .collect::<Vec<_>>(); + + let updated_zero_offset_value_range = Range { + start: 0, + end: value_range.end - value_range.start, + }; + let mut row_wise_results = vec![]; + let mut last_range: (usize, usize) = ( + updated_zero_offset_value_range.start, + updated_zero_offset_value_range.start, + ); + + for i in 0..len { + let window_frame = self.window_frame.ok_or_else(|| { + DataFusionError::Internal( + "Window frame cannot be empty to calculate window ranges".to_string(), + ) + })?; + let cur_range = + calculate_current_window(window_frame, &slice_order_columns, len, i)?; + + if cur_range.1 - cur_range.0 == 0 { + // We produce None if the window is empty. 
+ row_wise_results.push(ScalarValue::try_from(self.field.data_type())) + } else { + let update: Vec<ArrayRef> = value_slice + .iter() + .map(|v| v.slice(last_range.1, cur_range.1 - last_range.1)) + .collect(); + let retract: Vec<ArrayRef> = value_slice Review Comment: ```suggestion // Remove rows that have now left the window let retract: Vec<ArrayRef> = value_slice ``` ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let 
current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. +/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + 
WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) + } + WindowFrameUnits::Rows => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx), + WindowFrameBound::Following(Some(n)) => Ok(min(idx + n as usize, len)), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + if idx >= n as usize { + Ok(idx - n as usize + 1) + } else { + Ok(0) + } + } + WindowFrameBound::CurrentRow => Ok(idx + 1), + WindowFrameBound::Following(Some(n)) => { + Ok(min(idx + n as usize + 1, len)) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + Ok((start?, end?)) } + WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented( + "Window frame for groups is not implemented".to_string(), + )), } } /// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum -/// across evaluation arguments based on peer equivalences. +/// across evaluation arguments based on peer equivalences. It uses many information to calculate +/// correct running window. 
#[derive(Debug)] struct AggregateWindowAccumulator { accumulator: Box<dyn Accumulator>, + window_frame: Option<WindowFrame>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + field: Field, } impl AggregateWindowAccumulator { - /// scan one peer group of values (as arguments to window function) given by the value_range - /// and return evaluation result that are of the same number of rows. - fn scan_peers( + /// An ORDER BY is + fn implicit_order_by_window() -> WindowFrame { + // OVER(ORDER BY <field>) case + WindowFrame { + units: WindowFrameUnits::Range, + start_bound: WindowFrameBound::Preceding(None), + end_bound: WindowFrameBound::Following(Some(0)), + } + } + /// It calculates the whole aggregation result and copy into an array of table size. + fn calculate_whole_table( + &mut self, + value_slice: &[ArrayRef], + len: usize, + ) -> Result<ArrayRef> { + self.accumulator.update_batch(value_slice)?; + let value = self.accumulator.evaluate()?; + Ok(value.to_array_of_size(len)) + } + + /// It calculates the running window logic. + /// We iterate each row to calculate its corresponding window. It is a running + /// window calculation. First, cur_range is calculated, then it is compared with last_range. + /// We increment the accumulator by update and retract. + /// Note that not all aggregators implement retract_batch just yet. 
+ fn calculate_running_window( + &mut self, + value_slice: &[ArrayRef], + order_bys: &[&ArrayRef], + value_range: &Range<usize>, + ) -> Result<ArrayRef> { + let len = value_range.end - value_range.start; + let slice_order_columns = order_bys + .iter() + .map(|v| v.slice(value_range.start, value_range.end - value_range.start)) + .collect::<Vec<_>>(); + + let updated_zero_offset_value_range = Range { + start: 0, + end: value_range.end - value_range.start, + }; + let mut row_wise_results = vec![]; + let mut last_range: (usize, usize) = ( + updated_zero_offset_value_range.start, + updated_zero_offset_value_range.start, + ); + + for i in 0..len { + let window_frame = self.window_frame.ok_or_else(|| { Review Comment: You could probably avoid a copy of `window_frame` here via `self.window_frame.as_ref().ok_or_else()`... ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -98,70 +125,238 @@ impl AggregateWindowExpr { concat(&results).map_err(DataFusionError::ArrowError) } - fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Group based evaluation for {} is not yet implemented", - self.name() - ))) - } - - fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> { - Err(DataFusionError::NotImplemented(format!( - "Row based evaluation for {} is not yet implemented", - self.name() - ))) - } -} - -impl WindowExpr for AggregateWindowExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - self.aggregate.name() - } - - fn field(&self) -> Result<Field> { - self.aggregate.field() - } - - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - self.aggregate.expressions() - } - fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { &self.partition_by } fn order_by(&self) -> &[PhysicalSortExpr] { &self.order_by } +} - /// evaluate the window function values against the batch - fn 
evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { - match self.evaluation_mode() { - WindowFrameUnits::Range => self.peer_based_evaluate(batch), - WindowFrameUnits::Rows => self.row_based_evaluate(batch), - WindowFrameUnits::Groups => self.group_based_evaluate(batch), +fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( + range_columns: &[ArrayRef], + idx: usize, + delta: u64, +) -> Result<usize> { + let current_row_values = range_columns + .iter() + .map(|col| ScalarValue::try_from_array(col, idx)) + .collect::<Result<Vec<ScalarValue>>>()?; + let end_range: Result<Vec<ScalarValue>> = current_row_values + .iter() + .map(|value| { + let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + Ok(if SEARCH_SIDE { + if value.is_unsigned() && value < &offset { + ScalarValue::try_from(&value.get_datatype())? + } else { + value.sub(&offset)? + } + } else { + value.add(&offset)? + }) + }) + .collect(); + // true means left, false means right + bisect::<BISECT_SIDE>(range_columns, &end_range?) +} + +/// We use start and end bounds to calculate current row's starting and ending range. This function +/// supports different modes. 
+/// Currently we do not support window calculation for GROUPS inside window frames +fn calculate_current_window( + window_frame: WindowFrame, + range_columns: &[ArrayRef], + len: usize, + idx: usize, +) -> Result<(usize, usize)> { + match window_frame.units { + WindowFrameUnits::Range => { + let start = match window_frame.start_bound { + // UNBOUNDED PRECEDING case + WindowFrameBound::Preceding(None) => Ok(0), + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<true, true>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<true, true>(range_columns, idx, 0) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<true, false>(range_columns, idx, n) + } + _ => Err(DataFusionError::Internal(format!( + "Error during parsing arguments of '{:?}'", + window_frame + ))), + }; + let end = match window_frame.end_bound { + WindowFrameBound::Preceding(Some(n)) => { + calculate_index_of_row::<false, true>(range_columns, idx, n) + } + WindowFrameBound::Following(Some(n)) => { + calculate_index_of_row::<false, false>(range_columns, idx, n) + } + WindowFrameBound::CurrentRow => { + calculate_index_of_row::<false, false>(range_columns, idx, 0) + } + // UNBOUNDED FOLLOWING + WindowFrameBound::Following(None) => Ok(len), + _ => Err(DataFusionError::Internal(format!( Review Comment: As above, I recommend explicitly listing out the variants rather than using `_` so that when someone wants to add that support it is easier to find the necessary places in the code. Also, this error may be confusing to users -- doesn't it really mean the syntax is not supported? It would be great to make this error message more specific ```suggestion _ => Err(DataFusionError::NotImplemented(format!( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
