alamb commented on code in PR #4777:
URL: https://github.com/apache/arrow-datafusion/pull/4777#discussion_r1060640095


##########
datafusion/core/Cargo.toml:
##########
@@ -75,6 +75,7 @@ flate2 = { version = "1.0.24", optional = true }
 futures = "0.3"
 glob = "0.3.0"
 hashbrown = { version = "0.13", features = ["raw"] }
+indexmap = "1.9.2"

Review Comment:
   I would like to point out that indexmap is already a dependency of several 
other datafusion dependencies (such as `arrow-json`, `clap`, etc. Therefore 
this is not a new dependency (though it is a new explicit dependency)



##########
datafusion/core/src/physical_optimizer/pipeline_checker.rs:
##########
@@ -301,7 +301,7 @@ mod sql_tests {
         let case = QueryCase {
             sql: "SELECT
                     c9,
-                    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 
1 PRECEDING AND 5 FOLLOWING) as sum1
+                    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 
1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1

Review Comment:
   why were these tests changed (rather than new tests added)?



##########
datafusion/core/src/execution/context.rs:
##########
@@ -1480,14 +1480,14 @@ impl SessionState {
         // Since the transformations it applies may alter output partitioning 
properties of operators
         // (e.g. by swapping hash join sides), this rule runs before 
BasicEnforcement.
         physical_optimizers.push(Arc::new(PipelineFixer::new()));
-        // It's for adding essential repartition and local sorting operator to 
satisfy the
-        // required distribution and local sort.
+        // BasicEnforcement is for adding essential repartition and local 
sorting operators
+        // to satisfy the required distribution and local sort requirements.
         // Please make sure that the whole plan tree is determined.
         physical_optimizers.push(Arc::new(BasicEnforcement::new()));
-        // `BasicEnforcement` stage conservatively inserts `SortExec`s to 
satisfy ordering requirements.
-        // However, a deeper analysis may sometimes reveal that such a 
`SortExec` is actually unnecessary.
-        // These cases typically arise when we have reversible 
`WindowAggExec`s or deep subqueries. The
-        // rule below performs this analysis and removes unnecessary 
`SortExec`s.
+        // The BasicEnforcement stage conservatively inserts sorts to satisfy 
ordering requirements.

Review Comment:
   👍 



##########
datafusion/physical-expr/src/window/window_expr.rs:
##########
@@ -61,6 +65,17 @@ pub trait WindowExpr: Send + Sync + Debug {
     /// evaluate the window function values against the batch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
 
+    /// evaluate the window function values against the batch
+    fn evaluate_stateful(
+        &self,
+        _partition_batches: &PartitionBatches,
+        _window_agg_state: &mut PartitionWindowAggStates,
+    ) -> Result<()> {
+        Err(DataFusionError::Internal(
+            "evaluate_stateful is not implemented".to_string(),
+        ))

Review Comment:
   ```suggestion
           Err(DataFusionError::Internal(
               format!("evaluate_stateful is not implemented for {}", 
self.name())
           ))
   ```
   
   Might make errors easier to track down during implementation



##########
datafusion/core/tests/window_fuzz.rs:
##########
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array};
+use arrow::compute::{concat_batches, SortOptions};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use hashbrown::HashMap;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::runtime::Builder;
+
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::windows::{
+    create_window_expr, BoundedWindowAggExec, WindowAggExec,
+};
+use datafusion_expr::{
+    AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
+    WindowFrameUnits, WindowFunction,
+};
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::ScalarValue;
+use datafusion_physical_expr::expressions::{col, lit};
+use datafusion_physical_expr::PhysicalSortExpr;
+use test_utils::add_empty_batches;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]

Review Comment:
   I think you can run multiple threads in tokio tests with an annotation 
rather than building the runtime yourself:
   
   
   ```suggestion
       #[tokio::test(threaded_scheduler)]
   ```



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -614,13 +614,28 @@ impl DefaultPhysicalPlanner {
                         })
                         .collect::<Result<Vec<_>>>()?;
 
-                    Ok(Arc::new(WindowAggExec::try_new(
-                        window_expr,
-                        input_exec,
-                        physical_input_schema,
-                        physical_partition_keys,
-                        physical_sort_keys,
-                    )?))
+                    let uses_bounded_memory = window_expr

Review Comment:
   I wonder if it would be cleaner to leave the conversion to 
`BoundedWindowAggExec` in 
`datafusion/core/src/physical_optimizer/optimize_sorts.rs` rather than also 
doing it here in the physical planner.
   
   That would both keep the physical plan simpler as well as ensure all the 
cases you care about are covered in 
datafusion/core/src/physical_optimizer/optimize_sorts.rs



##########
datafusion/physical-expr/src/window/window_expr.rs:
##########
@@ -132,3 +151,129 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) 
-> Vec<PhysicalSortExpr
         })
         .collect()
 }
+
+pub enum WindowFn {
+    Builtin(Box<dyn PartitionEvaluator>),
+    Aggregate(Box<dyn Accumulator>),
+}
+
+impl fmt::Debug for WindowFn {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFn::Builtin(builtin, ..) => {
+                write!(f, "partition evaluator: {:?}", builtin)
+            }
+            WindowFn::Aggregate(aggregate, ..) => {
+                write!(f, "accumulator: {:?}", aggregate)
+            }
+        }
+    }
+}
+
+/// State for RANK(percent_rank, rank, dense_rank)
+/// builtin window function
+#[derive(Debug, Clone, Default)]
+pub struct RankState {
+    /// The last values for rank as these values change, we increase n_rank
+    pub last_rank_data: Vec<ScalarValue>,
+    /// The index where last_rank_boundary is started
+    pub last_rank_boundary: usize,
+    /// Rank number kept from the start
+    pub n_rank: usize,
+}
+
+/// State for 'ROW_NUMBER' builtin window function
+#[derive(Debug, Clone, Default)]
+pub struct NumRowsState {
+    pub n_rows: usize,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct NthValueState {
+    pub range: Range<usize>,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct LeadLagState {
+    pub idx: usize,
+}
+
+#[derive(Debug, Clone, Default)]
+pub enum BuiltinWindowState {
+    Rank(RankState),
+    NumRows(NumRowsState),
+    NthValue(NthValueState),
+    LeadLag(LeadLagState),
+    #[default]
+    Default,
+}
+#[derive(Debug)]
+pub enum WindowFunctionState {
+    /// Different Aggregate functions may have different state definitions
+    /// In [Accumulator] trait, [fn state(&self) -> Result<Vec<ScalarValue>>] 
implementation
+    /// dictates that.
+    AggregateState(Vec<ScalarValue>),
+    /// BuiltinWindowState
+    BuiltinWindowState(BuiltinWindowState),
+}
+
+#[derive(Debug)]
+pub struct WindowAggState {
+    /// The range that we calculate the window function
+    pub window_frame_range: Range<usize>,
+    /// The index of the last row that its result is calculated inside the 
partition record batch buffer.
+    pub last_calculated_index: usize,
+    /// The offset of the deleted row number
+    pub offset_pruned_rows: usize,
+    ///

Review Comment:
   seems like a comment was missed here



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -0,0 +1,705 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream and channel implementations for window function expressions.
+//! The executor given here uses bounded memory (does not maintain all
+//! the input data seen so far), which makes it appropriate when processing
+//! infinite inputs.
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
+use crate::physical_plan::{
+    ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+};
+use arrow::array::Array;
+use arrow::compute::{concat, lexicographical_partition_ranges, SortColumn};
+use arrow::{
+    array::ArrayRef,
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use datafusion_common::{DataFusionError, ScalarValue};
+use futures::stream::Stream;
+use futures::{ready, StreamExt};
+use std::any::Any;
+use std::cmp::min;
+use std::collections::HashMap;
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::physical_plan::common::merge_batches;
+use datafusion_physical_expr::window::{
+    PartitionBatchState, PartitionBatches, PartitionKey, 
PartitionWindowAggStates,
+    WindowAggState, WindowState,
+};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use indexmap::IndexMap;
+use log::debug;
+
+/// Window execution plan
+#[derive(Debug)]
+pub struct BoundedWindowAggExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Window function expression
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    /// Schema after the window is run
+    schema: SchemaRef,
+    /// Schema before the window
+    input_schema: SchemaRef,
+    /// Partition Keys
+    pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+    /// Sort Keys
+    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl BoundedWindowAggExec {
+    /// Create a new execution plan for window aggregates
+    pub fn try_new(
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+        input_schema: SchemaRef,
+        partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+        sort_keys: Option<Vec<PhysicalSortExpr>>,
+    ) -> Result<Self> {
+        let schema = create_schema(&input_schema, &window_expr)?;
+        let schema = Arc::new(schema);
+        Ok(Self {
+            input,
+            window_expr,
+            schema,
+            input_schema,
+            partition_keys,
+            sort_keys,
+            metrics: ExecutionPlanMetricsSet::new(),
+        })
+    }
+
+    /// Window expressions
+    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
+        &self.window_expr
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Get the input schema before any window functions are applied
+    pub fn input_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+
+    /// Return the output sort order of partition keys: For example
+    /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
+    // We are sure that partition by columns are always at the beginning of 
sort_keys
+    // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` 
columns can be used safely
+    // to calculate partition separation points
+    pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
+        let mut result = vec![];
+        // All window exprs have the same partition by, so we just use the 
first one:
+        let partition_by = self.window_expr()[0].partition_by();
+        let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+        for item in partition_by {
+            if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
+                result.push(a.clone());
+            } else {
+                return Err(DataFusionError::Execution(

Review Comment:
   ```suggestion
                   return Err(DataFusionError::Internal(
   ```



##########
datafusion/physical-expr/src/window/rank.rs:
##########
@@ -98,18 +99,77 @@ impl BuiltInWindowFunctionExpr for Rank {
         &self.name
     }
 
+    fn supports_bounded_execution(&self) -> bool {
+        matches!(self.rank_type, RankType::Basic | RankType::Dense)
+    }
+
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(RankEvaluator {
+            state: RankState::default(),
             rank_type: self.rank_type,
         }))
     }
 }
 
+#[derive(Debug)]
 pub(crate) struct RankEvaluator {
+    state: RankState,
     rank_type: RankType,
 }
 
 impl PartitionEvaluator for RankEvaluator {
+    fn get_range(&self, state: &WindowAggState, _n_rows: usize) -> 
Result<Range<usize>> {
+        Ok(Range {
+            start: state.last_calculated_index,
+            end: state.last_calculated_index + 1,
+        })
+    }
+
+    fn state(&self) -> Result<BuiltinWindowState> {
+        Ok(BuiltinWindowState::Rank(self.state.clone()))
+    }
+
+    fn update_state(

Review Comment:
   While reading through these implementaitons I can't help but wonder if there 
is some way to avoid implementing each PartitionEvaluator multiple times 
(`evaluate_stateful` seems like a more generic version of `evaluate` where the 
state is explicitly tracked rather than in the local variables).
   
   I can't remember why we also needed `evaluate_with_rank`
   
   Maybe the window functions can eventually look like the aggregators where 
they all have some state and there are functions to update that state / merge 
that state. 🤔 
   
   It appears you somewhat did this in 
`datafusion/physical-expr/src/window/sliding_aggregate.rs` where the 
accumulator is used in both `evaluate` and `evaluate_stateful`



##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -88,6 +88,12 @@ pub trait AggregateExpr: Send + Sync + Debug {
         false
     }
 
+    /// Specifies whether this aggregate function can run using bounded memory.

Review Comment:
   👍 



##########
datafusion/physical-expr/src/window/partition_evaluator.rs:
##########
@@ -17,26 +17,54 @@
 
 //! partition evaluation module
 
+use crate::window::window_expr::BuiltinWindowState;
+use crate::window::WindowAggState;
 use arrow::array::ArrayRef;
 use datafusion_common::Result;
 use datafusion_common::{DataFusionError, ScalarValue};
+use std::fmt::Debug;
 use std::ops::Range;
 
 /// Partition evaluator
-pub trait PartitionEvaluator {
+pub trait PartitionEvaluator: Debug + Send + Sync {

Review Comment:
   It probably only needs to be `Send` (rather than `Send` + `Sync`)



##########
datafusion/core/src/physical_plan/common.rs:
##########
@@ -95,6 +96,47 @@ pub async fn collect(stream: SendableRecordBatchStream) -> 
Result<Vec<RecordBatc
         .map_err(DataFusionError::from)
 }
 
+/// Merge two record batch references into a single record batch.
+/// All the record batches inside the slice must have the same schema.
+pub fn merge_batches(

Review Comment:
   Unless I am missing something, I think you can use `concat_batches` here 
instead of these two new functions:
   
   https://docs.rs/arrow/30.0.0/arrow/compute/fn.concat_batches.html



##########
datafusion/physical-expr/src/window/aggregate.rs:
##########
@@ -162,4 +162,12 @@ impl WindowExpr for AggregateWindowExpr {
             }
         })
     }
+
+    fn uses_bounded_memory(&self) -> bool {
+        // NOTE: Currently, groups queries do not support the bounded memory 
variant.

Review Comment:
   In theory I think groups queries can not be bounded, given that degenerate 
cases like all rows having the same grouping key (in other words, I don't think 
we could ever remove the "currently" here)



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2351,3 +2353,251 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
 
     Ok(())
 }
+
+fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {
+    let ts_field = Field::new("ts", DataType::Int32, false);
+    let inc_field = Field::new("inc_col", DataType::Int32, false);
+    let desc_field = Field::new("desc_col", DataType::Int32, false);
+
+    let schema = Arc::new(Schema::new(vec![ts_field, inc_field, desc_field]));
+
+    let batch = RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from_slice([
+                1, 1, 5, 9, 10, 11, 16, 21, 22, 26, 26, 28, 31, 33, 38, 42, 
47, 51, 53,
+                53, 58, 63, 67, 68, 70, 72, 72, 76, 81, 85, 86, 88, 91, 96, 
97, 98, 100,
+                101, 102, 104, 104, 108, 112, 113, 113, 114, 114, 117, 122, 
126, 131,
+                131, 136, 136, 136, 139, 141, 146, 147, 147, 152, 154, 159, 
161, 163,
+                164, 167, 172, 173, 177, 180, 185, 186, 191, 195, 195, 199, 
203, 207,
+                210, 213, 218, 221, 224, 226, 230, 232, 235, 238, 238, 239, 
244, 245,
+                247, 250, 254, 258, 262, 264, 264,
+            ])),
+            Arc::new(Int32Array::from_slice([
+                1, 5, 10, 15, 20, 21, 26, 29, 30, 33, 37, 40, 43, 44, 45, 49, 
51, 53, 58,
+                61, 65, 70, 75, 78, 83, 88, 90, 91, 95, 97, 100, 105, 109, 
111, 115, 119,
+                120, 124, 126, 129, 131, 135, 140, 143, 144, 147, 148, 149, 
151, 155,
+                156, 159, 160, 163, 165, 170, 172, 177, 181, 182, 186, 187, 
192, 196,
+                197, 199, 203, 207, 209, 213, 214, 216, 219, 221, 222, 225, 
226, 231,
+                236, 237, 242, 245, 247, 248, 253, 254, 259, 261, 266, 269, 
272, 275,
+                278, 283, 286, 289, 291, 296, 301, 305,
+            ])),
+            Arc::new(Int32Array::from_slice([
+                100, 98, 93, 91, 86, 84, 81, 77, 75, 71, 70, 69, 64, 62, 59, 
55, 50, 45,
+                41, 40, 39, 36, 31, 28, 23, 22, 17, 13, 10, 6, 5, 2, 1, -1, 
-4, -5, -6,
+                -8, -12, -16, -17, -19, -24, -25, -29, -34, -37, -42, -47, 
-48, -49, -53,
+                -57, -58, -61, -65, -67, -68, -71, -73, -75, -76, -78, -83, 
-87, -91,
+                -95, -98, -101, -105, -106, -111, -114, -116, -120, -125, 
-128, -129,
+                -134, -139, -142, -143, -146, -150, -154, -158, -163, -168, 
-172, -176,
+                -181, -184, -189, -193, -196, -201, -203, -208, -210, -213,
+            ])),
+        ],
+    )?;
+    let n_chunk = batch.num_rows() / n_file;
+    for i in 0..n_file {
+        let target_file = tmpdir.path().join(format!("{}.parquet", i));
+        let file = File::create(target_file).unwrap();
+        // Default writer properties
+        let props = WriterProperties::builder().build();
+        let chunks_start = i * n_chunk;
+        let cur_batch = batch.slice(chunks_start, n_chunk);
+        // let chunks_end = chunks_start + n_chunk;
+        let mut writer =
+            ArrowWriter::try_new(file, cur_batch.schema(), 
Some(props)).unwrap();
+
+        writer.write(&cur_batch).expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+    }
+    Ok(())
+}
+
+async fn get_test_context(tmpdir: &TempDir) -> Result<SessionContext> {
+    let session_config = SessionConfig::new().with_target_partitions(1);
+    let ctx = SessionContext::with_config(session_config);
+
+    let parquet_read_options = ParquetReadOptions::default();
+    // The sort order is specified (not actually correct in this case)
+    let file_sort_order = [col("ts")]
+        .into_iter()
+        .map(|e| {
+            let ascending = true;
+            let nulls_first = false;
+            e.sort(ascending, nulls_first)
+        })
+        .collect::<Vec<_>>();
+
+    let options_sort = parquet_read_options
+        .to_listing_options(&ctx.copied_config())
+        .with_file_sort_order(Some(file_sort_order));
+
+    write_test_data_to_parquet(tmpdir, 1)?;
+    let provided_schema = None;
+    let sql_definition = None;
+    ctx.register_listing_table(
+        "annotated_data",
+        tmpdir.path().to_string_lossy(),
+        options_sort.clone(),
+        provided_schema,
+        sql_definition,
+    )
+    .await
+    .unwrap();
+    Ok(ctx)
+}
+
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_source_sorted_aggregate() -> Result<()> {
+        let tmpdir = TempDir::new().unwrap();
+        let ctx = get_test_context(&tmpdir).await?;
+
+        let sql = "SELECT
+            SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING) as sum1,
+            SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING) as sum2,
+            SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING) as sum3,
+            MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING) as min1,
+            MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING) as min2,
+            MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING) as min3,
+            MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING) as max1,
+            MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING) as max2,
+            MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING) as max3,
+            COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 
FOLLOWING) as cnt1,
+            COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING) as cnt2,
+            SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING) as sumr1,
+            SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 
8 FOLLOWING) as sumr2,
+            SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sumr3,
+            MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 
1 FOLLOWING) as minr1,
+            MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 
1 FOLLOWING) as minr2,
+            MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING) as minr3,
+            MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 
1 FOLLOWING) as maxr1,
+            MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 
1 FOLLOWING) as maxr2,
+            MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING) as maxr3,
+            COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 
FOLLOWING) as cntr1,
+            COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING) as cntr2,
+            SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as 
sum4,
+            COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
+            FROM annotated_data
+            ORDER BY inc_col DESC
+            LIMIT 5
+            ";
+
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let dataframe = ctx.sql(sql).await.expect(&msg);
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = 
displayable(physical_plan.as_ref()).indent().to_string();
+        let expected = {
+            vec![
+                "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 
as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 
as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, 
sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, 
minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, 
cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
+                "  GlobalLimitExec: skip=0, fetch=5",
+                "    SortExec: [inc_col@24 DESC]",
+                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER 
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@14 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts 
ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@15 as sum2, 
SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as sum3, MIN(annotated_data.inc_col) 
ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@17 as min1, MIN(annotated_data.desc_col) ORDER BY [annotated_data.ts 
ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@18 as min2, 
MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@19 as min3, MAX(annotated_data.inc_col) 
ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@20 as max1, MAX(annotated_data.desc_col) ORDER BY [annotated_data.ts 
ASC NULL
 S LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@21 as max2, 
MAX(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as max3, COUNT(UInt8(1)) ORDER BY 
[annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING@23 
as cnt1, COUNT(UInt8(1)) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING@24 as cnt2, SUM(annotated_data.inc_col) 
ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING@3 as sumr1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts 
DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as sumr2, 
SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sumr3, MIN(annotated_data.inc_col) 
ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@6 as minr1, MIN(annotated_data.desc_col) ORDER BY [annotated_data.ts 
DESC NULLS 
 FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@7 as minr2, 
MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as minr3, MAX(annotated_data.inc_col) 
ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@9 as maxr1, MAX(annotated_data.desc_col) ORDER BY [annotated_data.ts 
DESC NULLS FIRST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@10 as maxr2, 
MAX(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@11 as maxr3, COUNT(UInt8(1)) ORDER BY 
[annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 
FOLLOWING@12 as cntr1, COUNT(UInt8(1)) ORDER BY [annotated_data.ts DESC NULLS 
FIRST] ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@13 as cntr2, 
SUM(annotated_data.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@25 as 
sum4, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING@26 as cnt3, 
inc_col@1 as inc_col]",
+                "        BoundedWindowAggExec: 
wdw=[SUM(annotated_data.desc_col): Ok(Field { name: 
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, 
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: 
Following(UInt64(1)) }]",
+                "          BoundedWindowAggExec: 
wdw=[SUM(annotated_data.inc_col): Ok(Field { name: 
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, 
SUM(annotated_data.desc_col): Ok(Field { name: 
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(5)), end_bound: Following(Int32(1)) }, 
SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), 
end_bound: Following(UInt64(10)) }, MIN(annotated_data.inc_col): Ok(Field { 
name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_i
 s_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, 
MIN(annotated_data.desc_col): Ok(Field { name: 
\"MIN(annotated_data.desc_col)\", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(5)), end_bound: Following(Int32(1)) }, 
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), 
end_bound: Following(UInt64(10)) }, MAX(annotated_data.inc_col): Ok(Field { 
name: \"MAX(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) 
}, MAX(annotated_data.desc_col): Ok(Field { name:
  \"MAX(annotated_data.desc_col)\", data_type: Int32, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(5)), end_bound: Following(Int32(1)) }, 
MAX(annotated_data.inc_col): Ok(Field { name: \"MAX(annotated_data.inc_col)\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), 
end_bound: Following(UInt64(10)) }, COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(4)), end_bound: Following(Int32(8)) }, 
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: 
Following(UInt64(1)) }]",
+                "            BoundedWindowAggExec: 
wdw=[SUM(annotated_data.inc_col): Ok(Field { name: 
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, 
SUM(annotated_data.desc_col): Ok(Field { name: 
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(8)), end_bound: Following(Int32(1)) }, 
SUM(annotated_data.desc_col): Ok(Field { name: 
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, 
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", 
data_type: Int32, nullable: true, dict_id: 0, dict
 _is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, 
MIN(annotated_data.desc_col): Ok(Field { name: 
\"MIN(annotated_data.desc_col)\", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(1)), end_bound: Following(Int32(5)) }, 
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), 
end_bound: Following(UInt64(1)) }, MAX(annotated_data.inc_col): Ok(Field { 
name: \"MAX(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) 
}, MAX(annotated_data.desc_col): Ok(Field { nam
 e: \"MAX(annotated_data.desc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(5)) 
}, MAX(annotated_data.inc_col): Ok(Field { name: 
\"MAX(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, 
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int32(2)), end_bound: 
Following(Int32(6)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), 
end_bound: Following(UInt64(8)) }]",
+            ]
+        };
+
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        let actual_len = actual.len();
+        let actual_trim_last = &actual[..actual_len - 1];
+        assert_eq!(
+            expected, actual_trim_last,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            
"+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
+            "| sum1 | sum2 | sum3 | min1 | min2 | min3 | max1 | max2 | max3 | 
cnt1 | cnt2 | sumr1 | sumr2 | sumr3 | minr1 | minr2 | minr3 | maxr1 | maxr2 | 
maxr3 | cntr1 | cntr2 | sum4  | cnt3 |",
+            
"+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
+            "| 1482 | -631 | 606  | 289  | -213 | 301  | 305  | -208 | 305  | 
3    | 9    | 902   | -834  | -1231 | 301   | -213  | 269   | 305   | -210  | 
305   | 3     | 2     | -1797 | 9    |",
+            "| 1482 | -631 | 902  | 289  | -213 | 296  | 305  | -208 | 305  | 
3    | 10   | 902   | -834  | -1424 | 301   | -213  | 266   | 305   | -210  | 
305   | 3     | 3     | -1978 | 10   |",
+            "| 876  | -411 | 1193 | 289  | -208 | 291  | 296  | -203 | 305  | 
4    | 10   | 587   | -612  | -1400 | 296   | -213  | 261   | 305   | -208  | 
301   | 3     | 4     | -1941 | 10   |",
+            "| 866  | -404 | 1482 | 286  | -203 | 289  | 291  | -201 | 305  | 
5    | 10   | 580   | -600  | -1374 | 291   | -208  | 259   | 305   | -203  | 
296   | 4     | 5     | -1903 | 10   |",
+            "| 1411 | -397 | 1768 | 275  | -201 | 286  | 289  | -196 | 305  | 
4    | 10   | 575   | -590  | -1347 | 289   | -203  | 254   | 305   | -201  | 
291   | 2     | 6     | -1863 | 10   |",
+            
"+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_source_sorted_builtin() -> Result<()> {
+        let tmpdir = TempDir::new().unwrap();
+        let ctx = get_test_context(&tmpdir).await?;
+
+        let sql = "SELECT
+            FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as fv1,
+            FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as fv2,
+            LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as lv1,
+            LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 
1 FOLLOWING) as lv2,
+            NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as nv1,
+            NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as nv2,
+            ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 
FOLLOWING) AS rn1,
+            ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as rn2,
+            RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 
FOLLOWING) AS rank1,
+            RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) 
as rank2,
+            DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 
FOLLOWING) AS dense_rank1,
+            DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as dense_rank2,
+            LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING 
and 10 FOLLOWING) AS lag1,
+            LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as lag2,
+            LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING 
and 10 FOLLOWING) AS lead1,
+            LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING 
and 1 FOLLOWING) as lead2,
+            FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 
PRECEDING and 1 FOLLOWING) as fvr1,
+            FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 
PRECEDING and 1 FOLLOWING) as fvr2,
+            LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 
PRECEDING and 1 FOLLOWING) as lvr1,
+            LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 
PRECEDING and 1 FOLLOWING) as lvr2,
+            LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 
PRECEDING and 10 FOLLOWING) AS lagr1,
+            LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 
PRECEDING and 1 FOLLOWING) as lagr2,
+            LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 
PRECEDING and 10 FOLLOWING) AS leadr1,
+            LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 
PRECEDING and 1 FOLLOWING) as leadr2
+            FROM annotated_data
+            ORDER BY ts DESC
+            LIMIT 5
+            ";
+
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let dataframe = ctx.sql(sql).await.expect(&msg);
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = 
displayable(physical_plan.as_ref()).indent().to_string();
+        let expected = {
+            vec![
+                "ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as 
lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, 
rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, 
dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as 
lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, 
lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, 
leadr2@23 as leadr2]",
+                "  GlobalLimitExec: skip=0, fetch=5",
+                "    SortExec: [ts@24 DESC]",
+                "      ProjectionExec: 
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, 
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] 
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] 
RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] 
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv2, 
NTH_VALUE(annotated_data.inc_col,Int64(5)) ORDER BY [annotated_data.ts ASC 
NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as nv1, 
NTH_VALUE(annotated_data.inc_col,Int64(5)) ORDER BY [annotated_data.ts ASC 
NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as nv2, ROW_NUMBER() 
ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 
FOLLOWING@16 as rn1, ROW_NUMBER() ORDER BY [annot
 ated_data.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@17 as 
rn2, RANK() ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 1 
PRECEDING AND 10 FOLLOWING@18 as rank1, RANK() ORDER BY [annotated_data.ts ASC 
NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@19 as rank2, DENSE_RANK() 
ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 
FOLLOWING@20 as dense_rank1, DENSE_RANK() ORDER BY [annotated_data.ts ASC NULLS 
LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@21 as dense_rank2, 
LAG(annotated_data.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data.ts 
ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as lag1, 
LAG(annotated_data.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data.ts 
ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@23 as lag2, 
LEAD(annotated_data.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data.ts 
ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@24 as lead1, 
LEAD(annotated_data
 .inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data.ts ASC NULLS LAST] 
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@25 as lead2, 
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS 
FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@2 as fvr1, 
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS 
FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@3 as fvr2, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS 
FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lvr1, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS 
FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lvr2, 
LAG(annotated_data.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data.ts 
DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@6 as lagr1, 
LAG(annotated_data.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data.ts 
DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as lagr2, 
LEAD(annotated_data.inc_col,
 Int64(-1),Int64(1001)) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as leadr1, 
LEAD(annotated_data.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data.ts 
DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@9 as leadr2, ts@0 
as ts]",
+                "        BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) 
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(10)), end_bound: 
Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) 
}, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"LAST_VALUE(annotated_data.inc_col
 )\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, 
NTH_VALUE(annotated_data.inc_col,Int64(5)): Ok(Field { name: 
\"NTH_VALUE(annotated_data.inc_col,Int64(5))\", data_type: Int32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) 
}, NTH_VALUE(annotated_data.inc_col,Int64(5)): Ok(Field { name: 
\"NTH_VALUE(annotated_data.inc_col,Int64(5))\", data_type: Int32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(10)), end_bound: 
Following(UInt64(1)) }, ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int
 32(1)), end_bound: Following(Int32(10)) }, ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, RANK(): 
Ok(Field { name: \"RANK()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, RANK(): 
Ok(Field { name: \"RANK()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, 
DENSE_RANK(): Ok(Field { name: \"DENSE_RANK()\", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(Int32(1)), end_bound: 
Following(Int32(10)) }, DENSE_RA
 NK(): Ok(Field { name: \"DENSE_RANK()\", data_type: UInt64, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(10)), end_bound: 
Following(UInt64(1)) }, LAG(annotated_data.inc_col,Int64(1),Int64(1001)): 
Ok(Field { name: \"LAG(annotated_data.inc_col,Int64(1),Int64(1001))\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), 
end_bound: Following(Int32(10)) }, 
LAG(annotated_data.inc_col,Int64(2),Int64(1002)): Ok(Field { name: 
\"LAG(annotated_data.inc_col,Int64(2),Int64(1002))\", data_type: Int32, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: 
Following(UInt64(1)) }, LEAD(annotated_data.inc_col,Int64(-1),Int64(1001)): 
Ok(Field { name: \"LEAD(annotated_data.inc_col,Int64(-1),Int64(1001))\", 
data_type: I
 nt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: 
Following(Int32(10)) }, LEAD(annotated_data.inc_col,Int64(4),Int64(1004)): 
Ok(Field { name: \"LEAD(annotated_data.inc_col,Int64(4),Int64(1004))\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), 
end_bound: Following(UInt64(1)) }]",
+                "          BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) 
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(10)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) 
}, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"LAST_VALUE(annotated_data.inc_c
 ol)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, 
LAG(annotated_data.inc_col,Int64(1),Int64(1001)): Ok(Field { name: 
\"LAG(annotated_data.inc_col,Int64(1),Int64(1001))\", data_type: Int32, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: 
Following(Int32(1)) }, LAG(annotated_data.inc_col,Int64(2),Int64(1002)): 
Ok(Field { name: \"LAG(annotated_data.inc_col,Int64(2),Int64(1002))\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), 
end_bound: Following(UInt64(10)) }, 
LEAD(annotated_data.inc_col,Int64(-1),Int64(1001)): Ok(Field { name: 
\"LEAD(annotated_data.inc_col,Int64(-1),Int64(1001))\", data_type: Int32, 
nullable: true, dict_id: 0, di
 ct_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, 
LEAD(annotated_data.inc_col,Int64(4),Int64(1004)): Ok(Field { name: 
\"LEAD(annotated_data.inc_col,Int64(4),Int64(1004))\", data_type: Int32, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(10)) }]",

Review Comment:
   I am surprised I don't see the ParquetExec in this plan (though the lack of 
sort input to the bounded window agg exec looks good to me) 👍 



##########
datafusion/core/tests/window_fuzz.rs:
##########
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array};
+use arrow::compute::{concat_batches, SortOptions};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use hashbrown::HashMap;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::runtime::Builder;
+
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::windows::{
+    create_window_expr, BoundedWindowAggExec, WindowAggExec,
+};
+use datafusion_expr::{
+    AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
+    WindowFrameUnits, WindowFunction,
+};
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::ScalarValue;
+use datafusion_physical_expr::expressions::{col, lit};
+use datafusion_physical_expr::PhysicalSortExpr;
+use test_utils::add_empty_batches;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn single_order_by_test() {
+        let rt = Builder::new_multi_thread()
+            .worker_threads(8)
+            .build()
+            .unwrap();
+        let n = 100;
+        let handles_low_cardinality = (1..n).map(|i| {
+            rt.spawn(run_window_test(
+                make_staggered_batches::<true, 1>(1000, i),
+                i,
+                vec!["a"],
+                vec![],
+            ))
+        });
+        let handles_high_cardinality = (1..n).map(|i| {
+            rt.spawn(run_window_test(
+                make_staggered_batches::<true, 100>(1000, i),
+                i,
+                vec!["a"],
+                vec![],
+            ))
+        });
+        let handles = handles_low_cardinality
+            .into_iter()
+            .chain(handles_high_cardinality.into_iter())
+            .collect::<Vec<tokio::task::JoinHandle<_>>>();
+        rt.block_on(async {
+            for handle in handles {
+                handle.await.unwrap();
+            }
+        });
+    }
+
+    #[test]
+    fn order_by_with_partition_test() {
+        let rt = Builder::new_multi_thread()
+            .worker_threads(8)
+            .build()
+            .unwrap();
+        let n = 100;
+        // since we have sorted pairs (a,b) to not violate per partition soring
+        // partition should be field a, order by should be field b
+        let handles_low_cardinality = (1..n).map(|i| {
+            rt.spawn(run_window_test(
+                make_staggered_batches::<true, 1>(1000, i),
+                i,
+                vec!["b"],
+                vec!["a"],
+            ))
+        });
+        let handles_high_cardinality = (1..n).map(|i| {
+            rt.spawn(run_window_test(
+                make_staggered_batches::<true, 100>(1000, i),
+                i,
+                vec!["b"],
+                vec!["a"],
+            ))
+        });
+        let handles = handles_low_cardinality
+            .into_iter()
+            .chain(handles_high_cardinality.into_iter())
+            .collect::<Vec<tokio::task::JoinHandle<_>>>();
+        rt.block_on(async {
+            for handle in handles {
+                handle.await.unwrap();
+            }
+        });
+    }
+}
+
+/// Perform batch and running window same input
+/// and verify outputs of `WindowAggExec` and `BoundedWindowAggExec` are equal
+async fn run_window_test(
+    input1: Vec<RecordBatch>,
+    random_seed: u64,
+    orderby_columns: Vec<&str>,
+    partition_by_columns: Vec<&str>,
+) {
+    let mut rng = StdRng::seed_from_u64(random_seed);
+    let schema = input1[0].schema();
+    let mut args = vec![col("x", &schema).unwrap()];
+    let mut window_fn_map = HashMap::new();
+    // HashMap values consists of tuple first element is WindowFunction, 
second is additional argument
+    // window function requires if any. For most of the window functions 
additional argument is empty
+    window_fn_map.insert(
+        "sum",
+        (
+            WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "count",
+        (
+            WindowFunction::AggregateFunction(AggregateFunction::Count),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "min",
+        (
+            WindowFunction::AggregateFunction(AggregateFunction::Min),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "max",
+        (
+            WindowFunction::AggregateFunction(AggregateFunction::Max),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "row_number",
+        (
+            
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "rank",
+        (
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "first_value",
+        (
+            
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "last_value",
+        (
+            
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::LastValue),
+            vec![],
+        ),
+    );
+    window_fn_map.insert(
+        "nth_value",
+        (
+            
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::NthValue),
+            vec![lit(ScalarValue::Int64(Some(rng.gen_range(1..10))))],
+        ),
+    );
+    window_fn_map.insert(
+        "lead",
+        (
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lead),
+            vec![
+                lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
+                lit(ScalarValue::Int64(Some(rng.gen_range(1..1000)))),
+            ],
+        ),
+    );
+    window_fn_map.insert(
+        "lag",
+        (
+            WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lag),
+            vec![
+                lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
+                lit(ScalarValue::Int64(Some(rng.gen_range(1..1000)))),
+            ],
+        ),
+    );
+
+    let session_config = SessionConfig::new().with_batch_size(50);
+    let ctx = SessionContext::with_config(session_config);
+    let rand_fn_idx = rng.gen_range(0..window_fn_map.len());
+    let fn_name = window_fn_map.keys().collect::<Vec<_>>()[rand_fn_idx];
+    let (window_fn, new_args) = 
window_fn_map.values().collect::<Vec<_>>()[rand_fn_idx];
+    for new_arg in new_args {
+        args.push(new_arg.clone());
+    }
+    let preceding = rng.gen_range(0..50);
+    let following = rng.gen_range(0..50);
+    let rand_num = rng.gen_range(0..3);
+    let units = if rand_num < 1 {
+        WindowFrameUnits::Range
+    } else if rand_num < 2 {
+        WindowFrameUnits::Rows
+    } else {
+        // For now we do not support GROUPS in BoundedWindowAggExec 
implementation
+        // TODO: once GROUPS handling is available, use 
WindowFrameUnits::GROUPS in randomized tests also.
+        WindowFrameUnits::Range
+    };
+    let window_frame = match units {
+        // In range queries window frame boundaries should match column type
+        WindowFrameUnits::Range => WindowFrame {
+            units,
+            start_bound: 
WindowFrameBound::Preceding(ScalarValue::Int32(Some(preceding))),
+            end_bound: 
WindowFrameBound::Following(ScalarValue::Int32(Some(following))),
+        },
+        // In window queries, window frame boundary should be Uint64
+        WindowFrameUnits::Rows => WindowFrame {
+            units,
+            start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(
+                preceding as u64,
+            ))),
+            end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(
+                following as u64,
+            ))),
+        },
+        // Once GROUPS support is added construct window frame for this case 
also
+        _ => todo!(),
+    };
+    let mut orderby_exprs = vec![];
+    for column in orderby_columns {
+        orderby_exprs.push(PhysicalSortExpr {
+            expr: col(column, &schema).unwrap(),
+            options: SortOptions::default(),
+        })
+    }
+    let mut partitionby_exprs = vec![];
+    for column in partition_by_columns {
+        partitionby_exprs.push(col(column, &schema).unwrap());
+    }
+    let mut sort_keys = vec![];
+    for partition_by_expr in &partitionby_exprs {
+        sort_keys.push(PhysicalSortExpr {
+            expr: partition_by_expr.clone(),
+            options: SortOptions::default(),
+        })
+    }
+    for order_by_expr in &orderby_exprs {
+        sort_keys.push(order_by_expr.clone())
+    }
+
+    let concat_input_record = concat_batches(&schema, &input1).unwrap();
+    let exec1 = Arc::new(
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), 
None).unwrap(),
+    );
+    let usual_window_exec = Arc::new(
+        WindowAggExec::try_new(
+            vec![create_window_expr(
+                window_fn,
+                fn_name.to_string(),
+                &args,
+                &partitionby_exprs,
+                &orderby_exprs,
+                Arc::new(window_frame.clone()),
+                schema.as_ref(),
+            )
+            .unwrap()],
+            exec1,
+            schema.clone(),
+            vec![],
+            Some(sort_keys.clone()),
+        )
+        .unwrap(),
+    );
+    let exec2 =
+        Arc::new(MemoryExec::try_new(&[input1.clone()], schema.clone(), 
None).unwrap());
+    let running_window_exec = Arc::new(
+        BoundedWindowAggExec::try_new(
+            vec![create_window_expr(
+                window_fn,
+                fn_name.to_string(),
+                &args,
+                &partitionby_exprs,
+                &orderby_exprs,
+                Arc::new(window_frame.clone()),
+                schema.as_ref(),
+            )
+            .unwrap()],
+            exec2,
+            schema.clone(),
+            vec![],
+            Some(sort_keys),
+        )
+        .unwrap(),
+    );
+
+    let task_ctx = ctx.task_ctx();
+    let collected_usual = collect(usual_window_exec, 
task_ctx.clone()).await.unwrap();
+
+    let collected_running = collect(running_window_exec, task_ctx.clone())
+        .await
+        .unwrap();
+    // compare

Review Comment:
   this is a very cool test



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -0,0 +1,705 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream and channel implementations for window function expressions.
+//! The executor given here uses bounded memory (does not maintain all
+//! the input data seen so far), which makes it appropriate when processing
+//! infinite inputs.
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
+use crate::physical_plan::{
+    ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+};
+use arrow::array::Array;
+use arrow::compute::{concat, lexicographical_partition_ranges, SortColumn};
+use arrow::{
+    array::ArrayRef,
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+use datafusion_common::{DataFusionError, ScalarValue};
+use futures::stream::Stream;
+use futures::{ready, StreamExt};
+use std::any::Any;
+use std::cmp::min;
+use std::collections::HashMap;
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::physical_plan::common::merge_batches;
+use datafusion_physical_expr::window::{
+    PartitionBatchState, PartitionBatches, PartitionKey, 
PartitionWindowAggStates,
+    WindowAggState, WindowState,
+};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use indexmap::IndexMap;
+use log::debug;
+
+/// Window execution plan
+#[derive(Debug)]
+pub struct BoundedWindowAggExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Window function expression
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    /// Schema after the window is run
+    schema: SchemaRef,
+    /// Schema before the window
+    input_schema: SchemaRef,
+    /// Partition Keys
+    pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+    /// Sort Keys
+    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl BoundedWindowAggExec {
+    /// Create a new execution plan for window aggregates
+    pub fn try_new(
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+        input_schema: SchemaRef,
+        partition_keys: Vec<Arc<dyn PhysicalExpr>>,
+        sort_keys: Option<Vec<PhysicalSortExpr>>,
+    ) -> Result<Self> {
+        let schema = create_schema(&input_schema, &window_expr)?;
+        let schema = Arc::new(schema);
+        Ok(Self {
+            input,
+            window_expr,
+            schema,
+            input_schema,
+            partition_keys,
+            sort_keys,
+            metrics: ExecutionPlanMetricsSet::new(),
+        })
+    }
+
+    /// Window expressions
+    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
+        &self.window_expr
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Get the input schema before any window functions are applied
+    pub fn input_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+
+    /// Return the output sort order of partition keys: For example
+    /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
+    // We are sure that partition by columns are always at the beginning of 
sort_keys
+    // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` 
columns can be used safely
+    // to calculate partition separation points
+    pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
+        let mut result = vec![];
+        // All window exprs have the same partition by, so we just use the 
first one:
+        let partition_by = self.window_expr()[0].partition_by();
+        let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+        for item in partition_by {
+            if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
+                result.push(a.clone());
+            } else {
+                return Err(DataFusionError::Execution(
+                    "Partition key not found in sort keys".to_string(),
+                ));
+            }
+        }
+        Ok(result)
+    }
+}
+
+impl ExecutionPlan for BoundedWindowAggExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // As we can have repartitioning using the partition keys, this can
+        // be either one or more than one, depending on the presence of
+        // repartitioning.
+        self.input.output_partitioning()
+    }
+
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input().output_ordering()
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+        let sort_keys = self.sort_keys.as_deref();
+        vec![sort_keys]
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.partition_keys.is_empty() {
+            debug!("No partition defined for BoundedWindowAggExec!!!");
+            vec![Distribution::SinglePartition]
+        } else {
+            //TODO support PartitionCollections if there is no common 
partition columns in the window_expr
+            vec![Distribution::HashPartitioned(self.partition_keys.clone())]
+        }
+    }
+
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        self.input().equivalence_properties()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        true
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(BoundedWindowAggExec::try_new(
+            self.window_expr.clone(),
+            children[0].clone(),
+            self.input_schema.clone(),
+            self.partition_keys.clone(),
+            self.sort_keys.clone(),
+        )?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition, context)?;
+        let stream = Box::pin(SortedPartitionByBoundedWindowStream::new(
+            self.schema.clone(),
+            self.window_expr.clone(),
+            input,
+            BaselineMetrics::new(&self.metrics, partition),
+            self.partition_by_sort_keys()?,
+        ));
+        Ok(stream)
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "BoundedWindowAggExec: ")?;
+                let g: Vec<String> = self
+                    .window_expr
+                    .iter()
+                    .map(|e| {
+                        format!(
+                            "{}: {:?}, frame: {:?}",
+                            e.name().to_owned(),
+                            e.field(),
+                            e.get_window_frame()
+                        )
+                    })
+                    .collect();
+                write!(f, "wdw=[{}]", g.join(", "))?;
+            }
+        }
+        Ok(())
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        let input_stat = self.input.statistics();
+        let win_cols = self.window_expr.len();
+        let input_cols = self.input_schema.fields().len();
+        // TODO stats: some windowing function will maintain invariants such 
as min, max...
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
+        if let Some(input_col_stats) = input_stat.column_statistics {
+            column_statistics.extend(input_col_stats);
+        } else {
+            column_statistics.extend(vec![ColumnStatistics::default(); 
input_cols]);
+        }
+        column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
+        Statistics {
+            is_exact: input_stat.is_exact,
+            num_rows: input_stat.num_rows,
+            column_statistics: Some(column_statistics),
+            total_byte_size: None,
+        }
+    }
+}
+
+fn create_schema(
+    input_schema: &Schema,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Schema> {
+    let mut fields = Vec::with_capacity(input_schema.fields().len() + 
window_expr.len());
+    fields.extend_from_slice(input_schema.fields());
+    // append results to the schema
+    for expr in window_expr {
+        fields.push(expr.field()?);
+    }
+    Ok(Schema::new(fields))
+}
+
+/// This trait defines the interface for updating the state and calculating
+/// results for window functions. Depending on the partitioning scheme, one
+/// may have different implementations for the functions within.

Review Comment:
   Is this hinting at a future implementation? I see only one `impl 
PartitionByHandler`



##########
datafusion/physical-expr/src/window/built_in.rs:
##########
@@ -122,6 +127,102 @@ impl WindowExpr for BuiltInWindowExpr {
         }
     }
 
+    /// Evaluate the window function against the batch. This function 
facilitates
+    /// stateful, bounded-memory implementations.
+    fn evaluate_stateful(
+        &self,
+        partition_batches: &PartitionBatches,
+        window_agg_state: &mut PartitionWindowAggStates,
+    ) -> Result<()> {
+        let field = self.expr.field()?;
+        let out_type = field.data_type();
+        let sort_options = self.order_by.iter().map(|o| 
o.options).collect::<Vec<_>>();
+        for (partition_row, partition_batch_state) in partition_batches.iter() 
{
+            if !window_agg_state.contains_key(partition_row) {
+                let evaluator = self.expr.create_evaluator()?;
+                window_agg_state.insert(
+                    partition_row.clone(),
+                    WindowState {
+                        state: WindowAggState::new(
+                            out_type,
+                            WindowFunctionState::BuiltinWindowState(
+                                BuiltinWindowState::Default,
+                            ),
+                        )?,
+                        window_fn: WindowFn::Builtin(evaluator),
+                    },
+                );
+            };
+            let window_state =
+                window_agg_state.get_mut(partition_row).ok_or_else(|| {
+                    DataFusionError::Execution("Cannot find state".to_string())
+                })?;
+            let evaluator = match &mut window_state.window_fn {
+                WindowFn::Builtin(evaluator) => evaluator,
+                _ => unreachable!(),

Review Comment:
   Why is this unreachable? I would have thought it could happen with `sum()` 
or `count()` as well that are unbounded 🤔 



##########
.github/workflows/rust.yml:
##########
@@ -64,7 +64,7 @@ jobs:
       - name: Check Cargo.lock for datafusion-cli
         run: |
           # If this test fails, try running `cargo update` in the 
`datafusion-cli` directory
-          cargo check --manifest-path datafusion-cli/Cargo.toml --locked

Review Comment:
   I think you simply need to follow the instruction in the comments above.
   
   Specifically run `cargo update` in the `datafusion-cli` directory and check 
in the resulting `Cargo.lock` file
   
   It seems like the comment doesn't explicitly say "check in `Cargo.lock`" 🤔 



##########
datafusion/core/tests/window_fuzz.rs:
##########
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array};
+use arrow::compute::{concat_batches, SortOptions};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use hashbrown::HashMap;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::runtime::Builder;
+
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::windows::{
+    create_window_expr, BoundedWindowAggExec, WindowAggExec,
+};
+use datafusion_expr::{
+    AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
+    WindowFrameUnits, WindowFunction,
+};
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::ScalarValue;
+use datafusion_physical_expr::expressions::{col, lit};
+use datafusion_physical_expr::PhysicalSortExpr;
+use test_utils::add_empty_batches;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]

Review Comment:
   BTW why is a multi-threaded executor required here?



##########
datafusion/physical-expr/src/window/window_expr.rs:
##########
@@ -132,3 +151,129 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) 
-> Vec<PhysicalSortExpr
         })
         .collect()
 }
+
+pub enum WindowFn {
+    Builtin(Box<dyn PartitionEvaluator>),
+    Aggregate(Box<dyn Accumulator>),
+}
+
+impl fmt::Debug for WindowFn {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFn::Builtin(builtin, ..) => {
+                write!(f, "partition evaluator: {:?}", builtin)

Review Comment:
   I wonder why not use the standard `#[derive(Debug)]` here?
   
   If there is some reason for a custom `Debug` impl, I recommend that it at 
least contain the struct name to be consistent with automatically derived debug 
impls. Something like this perhaps?
   
   ```suggestion
                   write!(f, "WindowFn::BuiltIn(partition evaluator: {:?})", 
builtin)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to