This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c6d5520716 feat: support distinct for window (#16925)
c6d5520716 is described below

commit c6d55207161e400e53645d5ee7d7bf16cd024c2f
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Mon Jul 28 17:26:49 2025 +0800

    feat: support distinct for window (#16925)
    
    * feat: support distinct for window
    
    * fix
    
    * fix
    
    * fix
    
    * fix unparse
    
    * fix test
    
    * fix test
    
    * easy way
    
    * add test
    
    * add comments
---
 datafusion-examples/examples/advanced_udwf.rs      |   1 +
 datafusion/core/src/physical_planner.rs            |   2 +
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |   3 +
 .../tests/physical_optimizer/enforce_sorting.rs    |   1 +
 .../core/tests/physical_optimizer/test_utils.rs    |   1 +
 datafusion/expr/src/expr.rs                        |  27 +++-
 datafusion/expr/src/expr_fn.rs                     |   1 +
 datafusion/expr/src/planner.rs                     |   1 +
 datafusion/expr/src/tree_node.rs                   |  12 ++
 datafusion/expr/src/udaf.rs                        |  42 ++++--
 datafusion/functions-aggregate/src/count.rs        | 163 ++++++++++++++++++++-
 datafusion/functions-window/src/planner.rs         |  15 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs |  29 +++-
 .../src/windows/bounded_window_agg_exec.rs         |   1 +
 datafusion/physical-plan/src/windows/mod.rs        |  42 ++++--
 datafusion/proto/src/logical_plan/to_proto.rs      |   1 +
 datafusion/proto/src/physical_plan/from_proto.rs   |   1 +
 datafusion/sql/src/expr/function.rs                |  12 ++
 datafusion/sql/src/unparser/expr.rs                |  13 +-
 datafusion/sqllogictest/test_files/window.slt      |  79 ++++++++++
 .../logical_plan/consumer/expr/window_function.rs  |   1 +
 .../logical_plan/producer/expr/window_function.rs  |   1 +
 22 files changed, 407 insertions(+), 42 deletions(-)
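
For context, a rough sketch (not part of this commit) of what the DISTINCT flag
enables on the logical-expression side. It mirrors the builder chain used
throughout the diff below; the ExprFunctionExt and count_udaf imports are
assumptions about the surrounding DataFusion API rather than changes made here:

    use datafusion_common::Result;
    use datafusion_expr::expr::WindowFunction;
    use datafusion_expr::{col, Expr, ExprFunctionExt, WindowFrame};
    use datafusion_functions_aggregate::count::count_udaf;

    // Roughly COUNT(DISTINCT v) OVER (PARTITION BY k ORDER BY time)
    fn distinct_count_window_expr() -> Result<Expr> {
        Expr::from(WindowFunction::new(count_udaf(), vec![col("v")]))
            .partition_by(vec![col("k")])
            .order_by(vec![col("time").sort(true, false)])
            .window_frame(WindowFrame::new(Some(false)))
            .distinct()
            .build()
    }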

diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs
index f7316ddc1b..e0fab7ee9f 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -199,6 +199,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
                     order_by: window_function.params.order_by,
                     window_frame: window_function.params.window_frame,
                     null_treatment: window_function.params.null_treatment,
+                    distinct: window_function.params.distinct,
                 },
             }))
         };
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e1f4154324..1021abc9e4 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1649,6 +1649,7 @@ pub fn create_window_expr_with_name(
                         order_by,
                         window_frame,
                         null_treatment,
+                        distinct,
                     },
             } = window_fun.as_ref();
             let physical_args =
@@ -1677,6 +1678,7 @@ pub fn create_window_expr_with_name(
                 window_frame,
                 physical_schema,
                 ignore_nulls,
+                *distinct,
             )
         }
         other => plan_err!("Invalid window expression '{other:?}'"),
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 316d3ba5a9..23e3281cf3 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -288,6 +288,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
                     Arc::new(window_frame),
                     &extended_schema,
                     false,
+                    false,
                 )?;
                 let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
                     vec![window_expr],
@@ -660,6 +661,7 @@ async fn run_window_test(
             Arc::new(window_frame.clone()),
             &extended_schema,
             false,
+            false,
         )?],
         exec1,
         false,
@@ -678,6 +680,7 @@ async fn run_window_test(
             Arc::new(window_frame.clone()),
             &extended_schema,
             false,
+            false,
         )?],
         exec2,
         search_mode.clone(),
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 9d8e0fcfa6..d10459ce86 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -3685,6 +3685,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
             case.window_frame,
             input_schema.as_ref(),
             false,
+            false,
         )?;
         let window_exec = if window_expr.uses_bounded_memory() {
             Arc::new(BoundedWindowAggExec::try_new(
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 7fb0f795f2..7f7926060e 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -265,6 +265,7 @@ pub fn bounded_window_exec_with_partition(
         Arc::new(WindowFrame::new(Some(false))),
         schema.as_ref(),
         false,
+        false,
     )
     .unwrap();
 
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 0749ff0e98..efe8a63908 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1131,6 +1131,8 @@ pub struct WindowFunctionParams {
     pub window_frame: WindowFrame,
     /// Specifies how NULL value is treated: ignore or respect
     pub null_treatment: Option<NullTreatment>,
+    /// Distinct flag
+    pub distinct: bool,
 }
 
 impl WindowFunction {
@@ -1145,6 +1147,7 @@ impl WindowFunction {
                 order_by: Vec::default(),
                 window_frame: WindowFrame::new(None),
                 null_treatment: None,
+                distinct: false,
             },
         }
     }
@@ -2291,6 +2294,7 @@ impl NormalizeEq for Expr {
                             partition_by: self_partition_by,
                             order_by: self_order_by,
                             null_treatment: self_null_treatment,
+                            distinct: self_distinct,
                         },
                 } = left.as_ref();
                 let WindowFunction {
@@ -2302,6 +2306,7 @@ impl NormalizeEq for Expr {
                             partition_by: other_partition_by,
                             order_by: other_order_by,
                             null_treatment: other_null_treatment,
+                            distinct: other_distinct,
                         },
                 } = other.as_ref();
 
@@ -2325,6 +2330,7 @@ impl NormalizeEq for Expr {
                                 && a.nulls_first == b.nulls_first
                                 && a.expr.normalize_eq(&b.expr)
                         })
+                    && self_distinct == other_distinct
             }
             (
                 Expr::Exists(Exists {
@@ -2558,11 +2564,13 @@ impl HashNode for Expr {
                             order_by: _,
                             window_frame,
                             null_treatment,
+                            distinct,
                         },
                 } = window_fun.as_ref();
                 fun.hash(state);
                 window_frame.hash(state);
                 null_treatment.hash(state);
+                distinct.hash(state);
             }
             Expr::InList(InList {
                 expr: _expr,
@@ -2865,15 +2873,27 @@ impl Display for SchemaDisplay<'_> {
                             order_by,
                             window_frame,
                             null_treatment,
+                            distinct,
                         } = params;
 
+                        // Write function name and open parenthesis
+                        write!(f, "{fun}(")?;
+
+                        // If DISTINCT, emit the keyword
+                        if *distinct {
+                            write!(f, "DISTINCT ")?;
+                        }
+
+                        // Write the comma-separated argument list
                         write!(
                             f,
-                            "{}({})",
-                            fun,
+                            "{}",
                             
schema_name_from_exprs_comma_separated_without_space(args)?
                         )?;
 
+                        // Close the argument parenthesis
+                        write!(f, ")")?;
+
                         if let Some(null_treatment) = null_treatment {
                             write!(f, " {null_treatment}")?;
                         }
@@ -3260,9 +3280,10 @@ impl Display for Expr {
                             order_by,
                             window_frame,
                             null_treatment,
+                            distinct,
                         } = params;
 
-                        fmt_function(f, &fun.to_string(), false, args, true)?;
+                        fmt_function(f, &fun.to_string(), *distinct, args, true)?;
 
                         if let Some(nt) = null_treatment {
                             write!(f, "{nt}")?;
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 1d8d183807..a511d082c0 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -946,6 +946,7 @@ impl ExprFuncBuilder {
                         window_frame: window_frame
                             .unwrap_or_else(|| WindowFrame::new(has_order_by)),
                         null_treatment,
+                        distinct,
                     },
                 })
             }
diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs
index 067c7a9427..b04fe32d37 100644
--- a/datafusion/expr/src/planner.rs
+++ b/datafusion/expr/src/planner.rs
@@ -308,6 +308,7 @@ pub struct RawWindowExpr {
     pub order_by: Vec<SortExpr>,
     pub window_frame: WindowFrame,
     pub null_treatment: Option<NullTreatment>,
+    pub distinct: bool,
 }
 
 /// Result of planning a raw expr with [`ExprPlanner`]
diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs
index f953aec5a1..b6f583ca4c 100644
--- a/datafusion/expr/src/tree_node.rs
+++ b/datafusion/expr/src/tree_node.rs
@@ -242,10 +242,22 @@ impl TreeNode for Expr {
                             order_by,
                             window_frame,
                             null_treatment,
+                            distinct,
                         },
                 } = *window_fun;
                 (args, partition_by, order_by).map_elements(f)?.update_data(
                     |(new_args, new_partition_by, new_order_by)| {
+                        if distinct {
+                            return Expr::from(WindowFunction::new(fun, new_args))
+                                .partition_by(new_partition_by)
+                                .order_by(new_order_by)
+                                .window_frame(window_frame)
+                                .null_treatment(null_treatment)
+                                .distinct()
+                                .build()
+                                .unwrap();
+                        }
+
                         Expr::from(WindowFunction::new(fun, new_args))
                             .partition_by(new_partition_by)
                             .order_by(new_order_by)
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index b6c8eb627c..15c0dd57ad 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -554,14 +554,25 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
             order_by,
             window_frame,
             null_treatment,
+            distinct,
         } = params;
 
         let mut schema_name = String::new();
-        schema_name.write_fmt(format_args!(
-            "{}({})",
-            self.name(),
-            schema_name_from_exprs(args)?
-        ))?;
+
+        // Inject DISTINCT into the schema name when requested
+        if *distinct {
+            schema_name.write_fmt(format_args!(
+                "{}(DISTINCT {})",
+                self.name(),
+                schema_name_from_exprs(args)?
+            ))?;
+        } else {
+            schema_name.write_fmt(format_args!(
+                "{}({})",
+                self.name(),
+                schema_name_from_exprs(args)?
+            ))?;
+        }
 
         if let Some(null_treatment) = null_treatment {
             schema_name.write_fmt(format_args!(" {null_treatment}"))?;
@@ -579,7 +590,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
                 " ORDER BY [{}]",
                 schema_name_from_sorts(order_by)?
             ))?;
-        };
+        }
 
         schema_name.write_fmt(format_args!(" {window_frame}"))?;
 
@@ -648,15 +659,24 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
             order_by,
             window_frame,
             null_treatment,
+            distinct,
         } = params;
 
         let mut display_name = String::new();
 
-        display_name.write_fmt(format_args!(
-            "{}({})",
-            self.name(),
-            expr_vec_fmt!(args)
-        ))?;
+        if *distinct {
+            display_name.write_fmt(format_args!(
+                "{}(DISTINCT {})",
+                self.name(),
+                expr_vec_fmt!(args)
+            ))?;
+        } else {
+            display_name.write_fmt(format_args!(
+                "{}({})",
+                self.name(),
+                expr_vec_fmt!(args)
+            ))?;
+        }
 
         if let Some(null_treatment) = null_treatment {
             display_name.write_fmt(format_args!(" {null_treatment}"))?;
diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index 09904bbad6..7a7c2879aa 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -31,7 +31,7 @@ use arrow::{
 };
 use datafusion_common::{
     downcast_value, internal_err, not_impl_err, stats::Precision,
-    utils::expr::COUNT_STAR_EXPANSION, Result, ScalarValue,
+    utils::expr::COUNT_STAR_EXPANSION, HashMap, Result, ScalarValue,
 };
 use datafusion_expr::{
     expr::WindowFunction,
@@ -59,6 +59,7 @@ use std::{
     ops::BitAnd,
     sync::Arc,
 };
+
 make_udaf_expr_and_func!(
     Count,
     count,
@@ -406,6 +407,98 @@ impl AggregateUDFImpl for Count {
         // the same as new values are seen.
         SetMonotonicity::Increasing
     }
+
+    fn create_sliding_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn Accumulator>> {
+        if args.is_distinct {
+            let acc =
+                SlidingDistinctCountAccumulator::try_new(args.return_field.data_type())?;
+            Ok(Box::new(acc))
+        } else {
+            let acc = CountAccumulator::new();
+            Ok(Box::new(acc))
+        }
+    }
+}
+
+// DistinctCountAccumulator does not support retract_batch and sliding window
+// this is a specialized accumulator for distinct count that supports retract_batch
+// and sliding window.
+#[derive(Debug)]
+pub struct SlidingDistinctCountAccumulator {
+    counts: HashMap<ScalarValue, usize, RandomState>,
+    data_type: DataType,
+}
+
+impl SlidingDistinctCountAccumulator {
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            counts: HashMap::default(),
+            data_type: data_type.clone(),
+        })
+    }
+}
+
+impl Accumulator for SlidingDistinctCountAccumulator {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let keys = self.counts.keys().cloned().collect::<Vec<_>>();
+        Ok(vec![ScalarValue::List(ScalarValue::new_list_nullable(
+            keys.as_slice(),
+            &self.data_type,
+        ))])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let arr = &values[0];
+        for i in 0..arr.len() {
+            let v = ScalarValue::try_from_array(arr, i)?;
+            if !v.is_null() {
+                *self.counts.entry(v).or_default() += 1;
+            }
+        }
+        Ok(())
+    }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let arr = &values[0];
+        for i in 0..arr.len() {
+            let v = ScalarValue::try_from_array(arr, i)?;
+            if !v.is_null() {
+                if let Some(cnt) = self.counts.get_mut(&v) {
+                    *cnt -= 1;
+                    if *cnt == 0 {
+                        self.counts.remove(&v);
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        let list_arr = states[0].as_list::<i32>();
+        for inner in list_arr.iter().flatten() {
+            for j in 0..inner.len() {
+                let v = ScalarValue::try_from_array(&*inner, j)?;
+                *self.counts.entry(v).or_default() += 1;
+            }
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.counts.len() as i64)))
+    }
+
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
+    fn size(&self) -> usize {
+        size_of_val(self)
+    }
 }
 
 #[derive(Debug)]
@@ -878,4 +971,72 @@ mod tests {
         assert_eq!(accumulator.evaluate()?, ScalarValue::Int64(Some(0)));
         Ok(())
     }
+
+    #[test]
+    fn sliding_distinct_count_accumulator_basic() -> Result<()> {
+        // Basic update_batch + evaluate functionality
+        let mut acc = SlidingDistinctCountAccumulator::try_new(&DataType::Int32)?;
+        // Create an Int32Array: [1, 2, 2, 3, null]
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            Some(2),
+            Some(2),
+            Some(3),
+            None,
+        ]));
+        acc.update_batch(&[values])?;
+        // Expect distinct values {1,2,3} → count = 3
+        assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(3)));
+        Ok(())
+    }
+
+    #[test]
+    fn sliding_distinct_count_accumulator_retract() -> Result<()> {
+        // Test that retract_batch properly decrements counts
+        let mut acc = SlidingDistinctCountAccumulator::try_new(&DataType::Utf8)?;
+        // Initial batch: ["a", "b", "a"]
+        let arr1 = Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("a")]))
+            as ArrayRef;
+        acc.update_batch(&[arr1])?;
+        assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(2))); // {"a","b"}
+
+        // Retract batch: ["a", null, "b"]
+        let arr2 =
+            Arc::new(StringArray::from(vec![Some("a"), None, Some("b")])) as ArrayRef;
+        acc.retract_batch(&[arr2])?;
+        // Before: a→2, b→1; after retract a→1, b→0 → b removed; remaining {"a"}
+        assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(1)));
+        Ok(())
+    }
+
+    #[test]
+    fn sliding_distinct_count_accumulator_merge_states() -> Result<()> {
+        // Test merging multiple accumulator states with merge_batch
+        let mut acc1 = SlidingDistinctCountAccumulator::try_new(&DataType::Int32)?;
+        let mut acc2 = SlidingDistinctCountAccumulator::try_new(&DataType::Int32)?;
+        // acc1 sees [1, 2]
+        acc1.update_batch(&[Arc::new(Int32Array::from(vec![Some(1), Some(2)]))])?;
+        // acc2 sees [2, 3]
+        acc2.update_batch(&[Arc::new(Int32Array::from(vec![Some(2), Some(3)]))])?;
+        // Extract their states as Vec<ScalarValue>
+        let state_sv1 = acc1.state()?;
+        let state_sv2 = acc2.state()?;
+        // Convert ScalarValue states into Vec<ArrayRef>, propagating errors
+        // NOTE: each ScalarValue::List state converts to a one-row ListArray via to_array
+        let state_arr1: Vec<ArrayRef> = state_sv1
+            .into_iter()
+            .map(|sv| sv.to_array())
+            .collect::<Result<_>>()?;
+        let state_arr2: Vec<ArrayRef> = state_sv2
+            .into_iter()
+            .map(|sv| sv.to_array())
+            .collect::<Result<_>>()?;
+        // Merge both states into a fresh accumulator
+        let mut merged = SlidingDistinctCountAccumulator::try_new(&DataType::Int32)?;
+        merged.merge_batch(&state_arr1)?;
+        merged.merge_batch(&state_arr2)?;
+        // Expect distinct {1,2,3} → count = 3
+        assert_eq!(merged.evaluate()?, ScalarValue::Int64(Some(3)));
+        Ok(())
+    }
 }
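
For illustration, a minimal sketch (not part of the patch) of how a sliding window
frame can drive the new accumulator: rows enter through update_batch and leave
through retract_batch as the frame moves, so COUNT(DISTINCT ...) stays correct
without rescanning the partition. The import path for
SlidingDistinctCountAccumulator is an assumption based on the module shown above:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::DataType;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::Accumulator;
    use datafusion_functions_aggregate::count::SlidingDistinctCountAccumulator;

    fn sliding_frame_demo() -> Result<()> {
        let mut acc = SlidingDistinctCountAccumulator::try_new(&DataType::Int32)?;

        // Frame covers rows [1, 1, 2] -> two distinct values
        let entering: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
        acc.update_batch(&[entering])?;
        assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(2)));

        // Frame slides: one 1 leaves, a 3 enters -> distinct set {1, 2, 3}
        let leaving: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let entering: ArrayRef = Arc::new(Int32Array::from(vec![3]));
        acc.retract_batch(&[leaving])?;
        acc.update_batch(&[entering])?;
        assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(3)));
        Ok(())
    }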
diff --git a/datafusion/functions-window/src/planner.rs b/datafusion/functions-window/src/planner.rs
index 091737bb9c..5e3a6bc633 100644
--- a/datafusion/functions-window/src/planner.rs
+++ b/datafusion/functions-window/src/planner.rs
@@ -41,6 +41,7 @@ impl ExprPlanner for WindowFunctionPlanner {
             order_by,
             window_frame,
             null_treatment,
+            distinct,
         } = raw_expr;
 
         let origin_expr = Expr::from(WindowFunction {
@@ -51,6 +52,7 @@ impl ExprPlanner for WindowFunctionPlanner {
                 order_by,
                 window_frame,
                 null_treatment,
+                distinct,
             },
         });
 
@@ -68,6 +70,7 @@ impl ExprPlanner for WindowFunctionPlanner {
                     order_by,
                     window_frame,
                     null_treatment,
+                    distinct,
                 },
         } = *window_fun;
         let raw_expr = RawWindowExpr {
@@ -77,6 +80,7 @@ impl ExprPlanner for WindowFunctionPlanner {
             order_by,
             window_frame,
             null_treatment,
+            distinct,
         };
 
         // TODO: remove the next line after `Expr::Wildcard` is removed
@@ -93,18 +97,23 @@ impl ExprPlanner for WindowFunctionPlanner {
                 order_by,
                 window_frame,
                 null_treatment,
+                distinct,
             } = raw_expr;
 
-            let new_expr = Expr::from(WindowFunction::new(
+            let mut new_expr_before_build = Expr::from(WindowFunction::new(
                 func_def,
                 vec![Expr::Literal(COUNT_STAR_EXPANSION, None)],
             ))
             .partition_by(partition_by)
             .order_by(order_by)
             .window_frame(window_frame)
-            .null_treatment(null_treatment)
-            .build()?;
+            .null_treatment(null_treatment);
 
+            if distinct {
+                new_expr_before_build = new_expr_before_build.distinct();
+            }
+
+            let new_expr = new_expr_before_build.build()?;
             let new_expr = saved_name.restore(new_expr);
 
             return Ok(PlannerResult::Planned(new_expr));
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index a98b0fdcc3..e6fc006cb2 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -549,6 +549,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                             order_by,
                             window_frame,
                             null_treatment,
+                            distinct,
                         },
                 } = *window_fun;
                 let window_frame =
@@ -565,14 +566,26 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                     _ => args,
                 };
 
-                Ok(Transformed::yes(
-                    Expr::from(WindowFunction::new(fun, args))
-                        .partition_by(partition_by)
-                        .order_by(order_by)
-                        .window_frame(window_frame)
-                        .null_treatment(null_treatment)
-                        .build()?,
-                ))
+                if distinct {
+                    Ok(Transformed::yes(
+                        Expr::from(WindowFunction::new(fun, args))
+                            .partition_by(partition_by)
+                            .order_by(order_by)
+                            .window_frame(window_frame)
+                            .null_treatment(null_treatment)
+                            .distinct()
+                            .build()?,
+                    ))
+                } else {
+                    Ok(Transformed::yes(
+                        Expr::from(WindowFunction::new(fun, args))
+                            .partition_by(partition_by)
+                            .order_by(order_by)
+                            .window_frame(window_frame)
+                            .null_treatment(null_treatment)
+                            .build()?,
+                    ))
+                }
             }
             // TODO: remove the next line after `Expr::Wildcard` is removed
             #[expect(deprecated)]
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index d3335c0e7f..4c991544f8 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1377,6 +1377,7 @@ mod tests {
                 Arc::new(window_frame),
                 &input.schema(),
                 false,
+                false,
             )?],
             input,
             input_order_mode,
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 5583abfd72..085b17cab9 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -103,21 +103,38 @@ pub fn create_window_expr(
     window_frame: Arc<WindowFrame>,
     input_schema: &Schema,
     ignore_nulls: bool,
+    distinct: bool,
 ) -> Result<Arc<dyn WindowExpr>> {
     Ok(match fun {
         WindowFunctionDefinition::AggregateUDF(fun) => {
-            let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
-                .schema(Arc::new(input_schema.clone()))
-                .alias(name)
-                .with_ignore_nulls(ignore_nulls)
-                .build()
-                .map(Arc::new)?;
-            window_expr_from_aggregate_expr(
-                partition_by,
-                order_by,
-                window_frame,
-                aggregate,
-            )
+            if distinct {
+                let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
+                    .schema(Arc::new(input_schema.clone()))
+                    .alias(name)
+                    .with_ignore_nulls(ignore_nulls)
+                    .distinct()
+                    .build()
+                    .map(Arc::new)?;
+                window_expr_from_aggregate_expr(
+                    partition_by,
+                    order_by,
+                    window_frame,
+                    aggregate,
+                )
+            } else {
+                let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
+                    .schema(Arc::new(input_schema.clone()))
+                    .alias(name)
+                    .with_ignore_nulls(ignore_nulls)
+                    .build()
+                    .map(Arc::new)?;
+                window_expr_from_aggregate_expr(
+                    partition_by,
+                    order_by,
+                    window_frame,
+                    aggregate,
+                )
+            }
         }
         WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new(
             create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
@@ -805,6 +822,7 @@ mod tests {
                 Arc::new(WindowFrame::new(None)),
                 schema.as_ref(),
                 false,
+                false,
             )?],
             blocking_exec,
             false,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 43afaa0fbe..f59e97df0d 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -316,6 +316,7 @@ pub fn serialize_expr(
                         ref window_frame,
                         // TODO: support null treatment in proto
                         null_treatment: _,
+                        distinct: _,
                     },
             } = window_fun.as_ref();
             let mut buf = Vec::new();
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index a01b121af6..a24e9e10e4 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -179,6 +179,7 @@ pub fn parse_physical_window_expr(
         Arc::new(window_frame),
         &extended_schema,
         false,
+        false,
     )
 }
 
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index e63ca75d01..1c2e50560e 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -352,6 +352,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     order_by,
                     window_frame,
                     null_treatment,
+                    distinct: function_args.distinct,
                 };
 
                 for planner in self.context_provider.get_expr_planners().iter() {
@@ -368,8 +369,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     order_by,
                     window_frame,
                     null_treatment,
+                    distinct,
                 } = window_expr;
 
+                if distinct {
+                    return Expr::from(expr::WindowFunction::new(func_def, args))
+                        .partition_by(partition_by)
+                        .order_by(order_by)
+                        .window_frame(window_frame)
+                        .null_treatment(null_treatment)
+                        .distinct()
+                        .build();
+                }
+
                 return Expr::from(expr::WindowFunction::new(func_def, args))
                     .partition_by(partition_by)
                     .order_by(order_by)
diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs
index 4ddd5ccccb..4c0dc31661 100644
--- a/datafusion/sql/src/unparser/expr.rs
+++ b/datafusion/sql/src/unparser/expr.rs
@@ -18,8 +18,9 @@
 use datafusion_expr::expr::{AggregateFunctionParams, Unnest, WindowFunctionParams};
 use sqlparser::ast::Value::SingleQuotedString;
 use sqlparser::ast::{
-    self, Array, BinaryOperator, CaseWhen, Expr as AstExpr, Function, Ident, Interval,
-    ObjectName, OrderByOptions, Subscript, TimezoneInfo, UnaryOperator, ValueWithSpan,
+    self, Array, BinaryOperator, CaseWhen, DuplicateTreatment, Expr as AstExpr, Function,
+    Ident, Interval, ObjectName, OrderByOptions, Subscript, TimezoneInfo, UnaryOperator,
+    ValueWithSpan,
 };
 use std::sync::Arc;
 use std::vec;
@@ -198,6 +199,7 @@ impl Unparser<'_> {
                             partition_by,
                             order_by,
                             window_frame,
+                            distinct,
                             ..
                         },
                 } = window_fun.as_ref();
@@ -256,7 +258,8 @@ impl Unparser<'_> {
                         span: Span::empty(),
                     }]),
                     args: ast::FunctionArguments::List(ast::FunctionArgumentList {
-                        duplicate_treatment: None,
+                        duplicate_treatment: distinct
+                            .then_some(DuplicateTreatment::Distinct),
                         args,
                         clauses: vec![],
                     }),
@@ -339,7 +342,7 @@ impl Unparser<'_> {
                     }]),
                     args: ast::FunctionArguments::List(ast::FunctionArgumentList {
                         duplicate_treatment: distinct
-                            .then_some(ast::DuplicateTreatment::Distinct),
+                            .then_some(DuplicateTreatment::Distinct),
                         args,
                         clauses: vec![],
                     }),
@@ -2051,6 +2054,7 @@ mod tests {
                         order_by: vec![],
                         window_frame: WindowFrame::new(None),
                         null_treatment: None,
+                        distinct: false,
                     },
                 }),
                 r#"row_number(col) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)"#,
@@ -2076,6 +2080,7 @@ mod tests {
                             ),
                         ),
                         null_treatment: None,
+                        distinct: false,
                     },
                 }),
                 r#"count(*) OVER (ORDER BY a DESC NULLS FIRST RANGE BETWEEN 6 
PRECEDING AND 2 FOLLOWING)"#,
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 82de113028..bed9121eec 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -5650,3 +5650,82 @@ WINDOW
 3 7
 4 11
 5 16
+
+
+# window with distinct operation
+statement ok
+CREATE TABLE table_test_distinct_count (
+    k VARCHAR,
+    v Int,
+    time TIMESTAMP WITH TIME ZONE
+);
+
+statement ok
+INSERT INTO table_test_distinct_count (k, v, time) VALUES
+    ('a', 1, '1970-01-01T00:01:00.00Z'),
+    ('a', 1, '1970-01-01T00:02:00.00Z'),
+    ('a', 1, '1970-01-01T00:03:00.00Z'),
+    ('a', 2, '1970-01-01T00:03:00.00Z'),
+    ('a', 1, '1970-01-01T00:04:00.00Z'),
+    ('b', 3, '1970-01-01T00:01:00.00Z'),
+    ('b', 3, '1970-01-01T00:02:00.00Z'),
+    ('b', 4, '1970-01-01T00:03:00.00Z'),
+    ('b', 4, '1970-01-01T00:03:00.00Z');
+
+query TPII
+SELECT
+    k,
+    time,
+    COUNT(v) OVER (
+        PARTITION BY k
+        ORDER BY time
+        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+    ) AS normal_count,
+    COUNT(DISTINCT v) OVER (
+        PARTITION BY k
+        ORDER BY time
+        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+    ) AS distinct_count
+FROM table_test_distinct_count
+ORDER BY k, time;
+----
+a 1970-01-01T00:01:00Z 1 1
+a 1970-01-01T00:02:00Z 2 1
+a 1970-01-01T00:03:00Z 4 2
+a 1970-01-01T00:03:00Z 4 2
+a 1970-01-01T00:04:00Z 4 2
+b 1970-01-01T00:01:00Z 1 1
+b 1970-01-01T00:02:00Z 2 1
+b 1970-01-01T00:03:00Z 4 2
+b 1970-01-01T00:03:00Z 4 2
+
+
+query TT
+EXPLAIN SELECT
+    k,
+    time,
+    COUNT(v) OVER (
+        PARTITION BY k
+        ORDER BY time
+        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+    ) AS normal_count,
+    COUNT(DISTINCT v) OVER (
+        PARTITION BY k
+        ORDER BY time
+        RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+    ) AS distinct_count
+FROM table_test_distinct_count
+ODER BY k, time;
+----
+logical_plan
+01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS distinct_count
+02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 12 [...]
+03)----SubqueryAlias: oder
+04)------TableScan: table_test_distinct_count projection=[k, v, time]
+physical_plan
+01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as distinct_count]
+02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRE [...]
+03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=1
+05)--------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
+06)----------DataSourceExec: partitions=2, partition_sizes=[5, 4]
diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs b/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs
index 80b643a547..27f0de84b7 100644
--- a/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/expr/window_function.rs
@@ -112,6 +112,7 @@ pub async fn from_window_function(
             order_by,
             window_frame,
             null_treatment: None,
+            distinct: false,
         },
     }))
 }
diff --git a/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs b/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs
index 17e71f2d7c..94a39e930f 100644
--- a/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs
+++ b/datafusion/substrait/src/logical_plan/producer/expr/window_function.rs
@@ -42,6 +42,7 @@ pub fn from_window_function(
                 order_by,
                 window_frame,
                 null_treatment: _,
+                distinct: _,
             },
     } = window_fn;
     // function reference


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

