metesynnada commented on code in PR #3570:
URL: https://github.com/apache/arrow-datafusion/pull/3570#discussion_r982267741
##########
datafusion/physical-expr/src/window/aggregate.rs:
##########
@@ -98,70 +125,238 @@ impl AggregateWindowExpr {
concat(&results).map_err(DataFusionError::ArrowError)
}
- fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> {
- Err(DataFusionError::NotImplemented(format!(
- "Group based evaluation for {} is not yet implemented",
- self.name()
- )))
- }
-
- fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> {
- Err(DataFusionError::NotImplemented(format!(
- "Row based evaluation for {} is not yet implemented",
- self.name()
- )))
- }
-}
-
-impl WindowExpr for AggregateWindowExpr {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn name(&self) -> &str {
- self.aggregate.name()
- }
-
- fn field(&self) -> Result<Field> {
- self.aggregate.field()
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- self.aggregate.expressions()
- }
-
fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.partition_by
}
fn order_by(&self) -> &[PhysicalSortExpr] {
&self.order_by
}
+}
- /// evaluate the window function values against the batch
- fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
- match self.evaluation_mode() {
- WindowFrameUnits::Range => self.peer_based_evaluate(batch),
- WindowFrameUnits::Rows => self.row_based_evaluate(batch),
- WindowFrameUnits::Groups => self.group_based_evaluate(batch),
+fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+ range_columns: &[ArrayRef],
+ idx: usize,
+ delta: u64,
+) -> Result<usize> {
+ let current_row_values = range_columns
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, idx))
+ .collect::<Result<Vec<ScalarValue>>>()?;
+ let end_range: Result<Vec<ScalarValue>> = current_row_values
+ .iter()
+ .map(|value| {
+ let offset = ScalarValue::try_from_value(&value.get_datatype(),
delta)?;
+ Ok(if SEARCH_SIDE {
+ if value.is_unsigned() && value < &offset {
+ ScalarValue::try_from(&value.get_datatype())?
+ } else {
+ value.sub(&offset)?
+ }
+ } else {
+ value.add(&offset)?
+ })
+ })
+ .collect();
+ // true means left, false means right
+ bisect::<BISECT_SIDE>(range_columns, &end_range?)
+}
+
+/// We use start and end bounds to calculate current row's starting and ending
range. This function
+/// supports different modes.
+/// Currently we do not support window calculation for GROUPS inside window
frames
+fn calculate_current_window(
+ window_frame: WindowFrame,
+ range_columns: &[ArrayRef],
+ len: usize,
+ idx: usize,
+) -> Result<(usize, usize)> {
+ match window_frame.units {
+ WindowFrameUnits::Range => {
+ let start = match window_frame.start_bound {
+ // UNBOUNDED PRECEDING case
+ WindowFrameBound::Preceding(None) => Ok(0),
+ WindowFrameBound::Preceding(Some(n)) => {
+ calculate_index_of_row::<true, true>(range_columns, idx, n)
+ }
+ WindowFrameBound::CurrentRow => {
+ calculate_index_of_row::<true, true>(range_columns, idx, 0)
+ }
+ WindowFrameBound::Following(Some(n)) => {
+ calculate_index_of_row::<true, false>(range_columns, idx,
n)
+ }
+ _ => Err(DataFusionError::Internal(format!(
+ "Error during parsing arguments of '{:?}'",
+ window_frame
+ ))),
+ };
+ let end = match window_frame.end_bound {
+ WindowFrameBound::Preceding(Some(n)) => {
+ calculate_index_of_row::<false, true>(range_columns, idx,
n)
+ }
+ WindowFrameBound::Following(Some(n)) => {
+ calculate_index_of_row::<false, false>(range_columns, idx,
n)
+ }
+ WindowFrameBound::CurrentRow => {
+ calculate_index_of_row::<false, false>(range_columns, idx,
0)
+ }
+ // UNBOUNDED FOLLOWING
+ WindowFrameBound::Following(None) => Ok(len),
+ _ => Err(DataFusionError::Internal(format!(
Review Comment:
As per your suggestion, we have explicitly written out all variants in the
match. The variants that return an error correspond to invalid window frames.
Such frames should be rejected during logical planning (and they are indeed
rejected there), so reaching one of these variants here indicates an internal
bug; hence they return DataFusionError::Internal. Thanks for the suggestion:
listing the variants explicitly makes the error cases much clearer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]