Jimexist commented on a change in pull request #520: URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651358846
########## File path: datafusion/src/physical_plan/windows.rs ########## @@ -156,31 +162,72 @@ impl WindowExpr for BuiltInWindowExpr { self.window.expressions() } - fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> { - self.window.create_accumulator() + fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] { + &self.partition_by + } + + fn order_by(&self) -> &[PhysicalSortExpr] { + &self.order_by + } + + fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { + // FIXME, for now we assume all the rows belong to the same partition, which will not be the + // case when partition_by is supported, in which case we'll parallelize the calls. + // See https://github.com/apache/arrow-datafusion/issues/299 + let values = self.evaluate_args(batch)?; + self.window.evaluate(batch.num_rows(), &values) } } /// A window expr that takes the form of an aggregate function #[derive(Debug)] pub struct AggregateWindowExpr { aggregate: Arc<dyn AggregateExpr>, + partition_by: Vec<Arc<dyn PhysicalExpr>>, + order_by: Vec<PhysicalSortExpr>, + window_frame: Option<WindowFrame>, } -#[derive(Debug)] -struct AggregateWindowAccumulator { - accumulator: Box<dyn Accumulator>, -} +impl AggregateWindowExpr { + /// the aggregate window function operates based on window frame, and by default the mode is + /// "range". + fn evaluation_mode(&self) -> WindowFrameUnits { + self.window_frame.unwrap_or_default().units + } -impl WindowAccumulator for AggregateWindowAccumulator { - fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> { - self.accumulator.update(values)?; - Ok(None) + /// create a new accumulator based on the underlying aggregation function + fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> { + let accumulator = self.aggregate.create_accumulator()?; + Ok(AggregateWindowAccumulator { accumulator }) } - /// returns its value based on its current state. 
- fn evaluate(&self) -> Result<Option<ScalarValue>> { - Ok(Some(self.accumulator.evaluate()?)) + /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns + /// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same + /// results for peers) and concatenate the results. + fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { Review comment: I may change this naming when implementing #361, but for the moment, `range` and `groups` both evaluate with peers, whereas `rows` evaluates one row at a time on each `scan`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org