mustafasrepo commented on code in PR #4777:
URL: https://github.com/apache/arrow-datafusion/pull/4777#discussion_r1061236413


##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -0,0 +1,705 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream and channel implementations for window function expressions.
+//! The executor given here uses bounded memory (does not maintain all
+//! the input data seen so far), which makes it appropriate when processing
+//! infinite inputs.
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
+use crate::physical_plan::{
+    ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+};
+use arrow::array::Array;
+use arrow::compute::{concat, lexicographical_partition_ranges, SortColumn};
+use arrow::{
+    array::ArrayRef,
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use datafusion_common::{DataFusionError, ScalarValue};
+use futures::stream::Stream;
+use futures::{ready, StreamExt};
+use std::any::Any;
+use std::cmp::min;
+use std::collections::HashMap;
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::physical_plan::common::merge_batches;
+use datafusion_physical_expr::window::{
+    PartitionBatchState, PartitionBatches, PartitionKey, 
PartitionWindowAggStates,
+    WindowAggState, WindowState,
+};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use indexmap::IndexMap;
+use log::debug;
+
+/// Window execution plan
+#[derive(Debug)]
+pub struct BoundedWindowAggExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Window function expression
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    /// Schema after the window is run
+    schema: SchemaRef,
+    /// Schema before the window
+    input_schema: SchemaRef,
+    /// Partition Keys
+    pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+    /// Sort Keys
+    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl BoundedWindowAggExec {
+    /// Create a new execution plan for window aggregates
+    pub fn try_new(
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+        input_schema: SchemaRef,
+        partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+        sort_keys: Option<Vec<PhysicalSortExpr>>,
+    ) -> Result<Self> {
+        let schema = create_schema(&input_schema, &window_expr)?;
+        let schema = Arc::new(schema);
+        Ok(Self {
+            input,
+            window_expr,
+            schema,
+            input_schema,
+            partition_keys,
+            sort_keys,
+            metrics: ExecutionPlanMetricsSet::new(),
+        })
+    }
+
+    /// Window expressions
+    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
+        &self.window_expr
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Get the input schema before any window functions are applied
+    pub fn input_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+
+    /// Return the output sort order of partition keys: For example
+    /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
+    // We are sure that partition by columns are always at the beginning of 
sort_keys
+    // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` 
columns can be used safely
+    // to calculate partition separation points
+    pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
+        let mut result = vec![];
+        // All window exprs have the same partition by, so we just use the 
first one:
+        let partition_by = self.window_expr()[0].partition_by();
+        let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+        for item in partition_by {
+            if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
+                result.push(a.clone());
+            } else {
+                return Err(DataFusionError::Execution(
+                    "Partition key not found in sort keys".to_string(),
+                ));
+            }
+        }
+        Ok(result)
+    }
+}
+
+impl ExecutionPlan for BoundedWindowAggExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // As we can have repartitioning using the partition keys, this can
+        // be either one or more than one, depending on the presence of
+        // repartitioning.
+        self.input.output_partitioning()
+    }
+
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input().output_ordering()
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            debug!("No partition defined for BoundedWindowAggExec!!!");
+            vec![Distribution::SinglePartition]
+        } else {
+            //TODO support PartitionCollections if there is no common 
partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]
+        }
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input().equivalence_properties()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        true
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(BoundedWindowAggExec::try_new(
+            self.window_expr.clone(),
+            children[0].clone(),
+            self.input_schema.clone(),
+            self.partition_keys.clone(),
+            self.sort_keys.clone(),
+        )?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition, context)?;
+        let stream = Box::pin(SortedPartitionByBoundedWindowStream::new(
+            self.schema.clone(),
+            self.window_expr.clone(),
+            input,
+            BaselineMetrics::new(&self.metrics, partition),
+            self.partition_by_sort_keys()?,
+        ));
+        Ok(stream)
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "BoundedWindowAggExec: ")?;
+                let g: Vec<String> = self
+                    .window_expr
+                    .iter()
+                    .map(|e| {
+                        format!(
+                            "{}: {:?}, frame: {:?}",
+                            e.name().to_owned(),
+                            e.field(),
+                            e.get_window_frame()
+                        )
+                    })
+                    .collect();
+                write!(f, "wdw=[{}]", g.join(", "))?;
+            }
+        }
+        Ok(())
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        let input_stat = self.input.statistics();
+        let win_cols = self.window_expr.len();
+        let input_cols = self.input_schema.fields().len();
+        // TODO stats: some windowing function will maintain invariants such 
as min, max...
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
+        if let Some(input_col_stats) = input_stat.column_statistics {
+            column_statistics.extend(input_col_stats);
+        } else {
+            column_statistics.extend(vec![ColumnStatistics::default(); 
input_cols]);
+        }
+        column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
+        Statistics {
+            is_exact: input_stat.is_exact,
+            num_rows: input_stat.num_rows,
+            column_statistics: Some(column_statistics),
+            total_byte_size: None,
+        }
+    }
+}
+
+fn create_schema(
+    input_schema: &Schema,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Schema> {
+    let mut fields = Vec::with_capacity(input_schema.fields().len() + 
window_expr.len());
+    fields.extend_from_slice(input_schema.fields());
+    // append results to the schema
+    for expr in window_expr {
+        fields.push(expr.field()?);
+    }
+    Ok(Schema::new(fields))
+}
+
+/// This trait defines the interface for updating the state and calculating
+/// results for window functions. Depending on the partitioning scheme, one
+/// may have different implementations for the functions within.

Review Comment:
   Yes, it is correct. Sorting according to `PARTITION BY` column is not a 
strict requirement to calculate window results. Hence, in the future there may 
be a variant where table is not sorted according to `PARTITION BY` columns. By 
this way even if expressions used for `PARTITON BY` are not sorted. We could 
produce window results without blocking pipeline (given `ORDER BY` expressions 
are sorted)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to