Jimexist commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651358846



##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -156,31 +162,72 @@ impl WindowExpr for BuiltInWindowExpr {
         self.window.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        self.window.create_accumulator()
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, 
which will not be the
+        // case when partition_by is supported, in which case we'll 
parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        let values = self.evaluate_args(batch)?;
+        self.window.evaluate(batch.num_rows(), &values)
     }
 }
 
 /// A window expr that takes the form of an aggregate function
 #[derive(Debug)]
 pub struct AggregateWindowExpr {
     aggregate: Arc<dyn AggregateExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
 }
 
-#[derive(Debug)]
-struct AggregateWindowAccumulator {
-    accumulator: Box<dyn Accumulator>,
-}
+impl AggregateWindowExpr {
+    /// the aggregate window function operates based on window frame, and by 
default the mode is
+    /// "range".
+    fn evaluation_mode(&self) -> WindowFrameUnits {
+        self.window_frame.unwrap_or_default().units
+    }
 
-impl WindowAccumulator for AggregateWindowAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.accumulator.update(values)?;
-        Ok(None)
+    /// create a new accumulator based on the underlying aggregation function
+    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
+        let accumulator = self.aggregate.create_accumulator()?;
+        Ok(AggregateWindowAccumulator { accumulator })
     }
 
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.accumulator.evaluate()?))
+    /// peer based evaluation based on the fact that batch is pre-sorted given 
the sort columns
+    /// and then per partition point we'll evaluate the peer group (e.g. SUM 
or MAX gives the same
+    /// results for peers) and concatenate the results.
+    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {

Review comment:
       i will possibly change this naming in implementing #361 but for the 
moment, `range` and `groups` both evaluates with peers but `rows` evaluates 
based on rows on each `scan`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to