This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c22c299aac Support for median(distinct) aggregation function (#10226)
c22c299aac is described below

commit c22c299aac458474f99cbfeb9586b22b270191be
Author: Jeffrey Vo <[email protected]>
AuthorDate: Fri Apr 26 21:57:17 2024 +1000

    Support for median(distinct) aggregation function (#10226)
    
    * Support for median(distinct) aggregation function
    
    * Reduce duplication
---
 datafusion/core/benches/aggregate_query_sql.rs     |  10 +
 datafusion/physical-expr/src/aggregate/build_in.rs |   6 +-
 datafusion/physical-expr/src/aggregate/median.rs   | 425 +++++++++++++++++++--
 datafusion/physical-expr/src/expressions/mod.rs    |  34 ++
 datafusion/physical-plan/src/aggregates/mod.rs     |   1 +
 datafusion/sqllogictest/test_files/aggregate.slt   |   4 +-
 6 files changed, 437 insertions(+), 43 deletions(-)

diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs
index 3734cfbe31..1d8d87ada7 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -163,6 +163,16 @@ fn criterion_benchmark(c: &mut Criterion) {
             )
         })
     });
+
+    c.bench_function("aggregate_query_distinct_median", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \
+                 FROM t",
+            )
+        })
+    });
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index c549e62193..57ed35b0b7 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -352,14 +352,12 @@ pub fn create_aggregate_expr(
                 "APPROX_MEDIAN(DISTINCT) aggregations are not available"
             );
         }
-        (AggregateFunction::Median, false) => Arc::new(expressions::Median::new(
+        (AggregateFunction::Median, distinct) => Arc::new(expressions::Median::new(
             input_phy_exprs[0].clone(),
             name,
             data_type,
+            distinct,
         )),
-        (AggregateFunction::Median, true) => {
-            return not_impl_err!("MEDIAN(DISTINCT) aggregations are not available");
-        }
         (AggregateFunction::FirstValue, _) => Arc::new(
             expressions::FirstValue::new(
                 input_phy_exprs[0].clone(),
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index ed373ba13d..f4f56fa46e 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -17,7 +17,7 @@
 
 //! # Median
 
-use crate::aggregate::utils::down_cast_any_ref;
+use crate::aggregate::utils::{down_cast_any_ref, Hashable};
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::{Array, ArrayRef};
@@ -28,17 +28,24 @@ use arrow_buffer::ArrowNativeType;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use std::any::Any;
+use std::collections::HashSet;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
-/// stored in memory before a result can be computed. If an approximation is sufficient
-/// then APPROX_MEDIAN provides a much more efficient solution.
+/// MEDIAN aggregate expression. If using the non-distinct variation, then this uses a
+/// lot of memory because all values need to be stored in memory before a result can be
+/// computed. If an approximation is sufficient then APPROX_MEDIAN provides a much more
+/// efficient solution.
+///
+/// If using the distinct variation, the memory usage will be similarly high if the
+/// cardinality is high as it stores all distinct values in memory before computing the
+/// result, but if cardinality is low then memory usage will also be lower.
 #[derive(Debug)]
 pub struct Median {
     name: String,
     expr: Arc<dyn PhysicalExpr>,
     data_type: DataType,
+    distinct: bool,
 }
 
 impl Median {
@@ -47,11 +54,13 @@ impl Median {
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         data_type: DataType,
+        distinct: bool,
     ) -> Self {
         Self {
             name: name.into(),
             expr,
             data_type,
+            distinct,
         }
     }
 }
@@ -70,10 +79,17 @@ impl AggregateExpr for Median {
         use arrow_array::types::*;
         macro_rules! helper {
             ($t:ty, $dt:expr) => {
-                Ok(Box::new(MedianAccumulator::<$t> {
-                    data_type: $dt.clone(),
-                    all_values: vec![],
-                }))
+                if self.distinct {
+                    Ok(Box::new(DistinctMedianAccumulator::<$t> {
+                        data_type: $dt.clone(),
+                        distinct_values: HashSet::new(),
+                    }))
+                } else {
+                    Ok(Box::new(MedianAccumulator::<$t> {
+                        data_type: $dt.clone(),
+                        all_values: vec![],
+                    }))
+                }
             };
         }
         let dt = &self.data_type;
@@ -96,9 +112,14 @@ impl AggregateExpr for Median {
         //Intermediate state is a list of the elements we have collected so far
         let field = Field::new("item", self.data_type.clone(), true);
         let data_type = DataType::List(Arc::new(field));
+        let state_name = if self.distinct {
+            "distinct_median"
+        } else {
+            "median"
+        };
 
         Ok(vec![Field::new(
-            format_state_name(&self.name, "median"),
+            format_state_name(&self.name, state_name),
             data_type,
             true,
         )])
@@ -121,6 +142,7 @@ impl PartialEq<dyn Any> for Median {
                 self.name == x.name
                     && self.data_type == x.data_type
                     && self.expr.eq(&x.expr)
+                    && self.distinct == x.distinct
             })
             .unwrap_or(false)
     }
@@ -172,21 +194,8 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
     }
 
     fn evaluate(&mut self) -> Result<ScalarValue> {
-        let mut d = std::mem::take(&mut self.all_values);
-        let cmp = |x: &T::Native, y: &T::Native| x.compare(*y);
-
-        let len = d.len();
-        let median = if len == 0 {
-            None
-        } else if len % 2 == 0 {
-            let (low, high, _) = d.select_nth_unstable_by(len / 2, cmp);
-            let (_, low, _) = low.select_nth_unstable_by(low.len() - 1, cmp);
-            let median = low.add_wrapping(*high).div_wrapping(T::Native::usize_as(2));
-            Some(median)
-        } else {
-            let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp);
-            Some(*median)
-        };
+        let d = std::mem::take(&mut self.all_values);
+        let median = calculate_median::<T>(d);
         ScalarValue::new_primitive::<T>(median, &self.data_type)
     }
 
@@ -196,12 +205,103 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
     }
 }
 
+/// The distinct median accumulator accumulates the distinct raw input values
+/// as native values in a `HashSet`.
+///
+/// The intermediate state is represented as a List of scalar values built from
+/// that set; `merge_batch` feeds each incoming list back through
+/// `update_batch`, so duplicates across merged states collapse before the
+/// median is computed in the final evaluation step.
+struct DistinctMedianAccumulator<T: ArrowNumericType> {
+    data_type: DataType,
+    distinct_values: HashSet<Hashable<T::Native>>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for DistinctMedianAccumulator<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DistinctMedianAccumulator({})", self.data_type)
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for DistinctMedianAccumulator<T> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let all_values = self
+            .distinct_values
+            .iter()
+            .map(|x| ScalarValue::new_primitive::<T>(Some(x.0), &self.data_type))
+            .collect::<Result<Vec<_>>>()?;
+
+        let arr = ScalarValue::new_list(&all_values, &self.data_type);
+        Ok(vec![ScalarValue::List(arr)])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let array = values[0].as_primitive::<T>();
+        match array.nulls().filter(|x| x.null_count() > 0) {
+            Some(n) => {
+                for idx in n.valid_indices() {
+                    self.distinct_values.insert(Hashable(array.value(idx)));
+                }
+            }
+            None => array.values().iter().for_each(|x| {
+                self.distinct_values.insert(Hashable(*x));
+            }),
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        let array = states[0].as_list::<i32>();
+        for v in array.iter().flatten() {
+            self.update_batch(&[v])?
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        let d = std::mem::take(&mut self.distinct_values)
+            .into_iter()
+            .map(|v| v.0)
+            .collect::<Vec<_>>();
+        let median = calculate_median::<T>(d);
+        ScalarValue::new_primitive::<T>(median, &self.data_type)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + self.distinct_values.capacity() * std::mem::size_of::<T::Native>()
+    }
+}
+
+fn calculate_median<T: ArrowNumericType>(
+    mut values: Vec<T::Native>,
+) -> Option<T::Native> {
+    let cmp = |x: &T::Native, y: &T::Native| x.compare(*y);
+
+    let len = values.len();
+    if len == 0 {
+        None
+    } else if len % 2 == 0 {
+        let (low, high, _) = values.select_nth_unstable_by(len / 2, cmp);
+        let (_, low, _) = low.select_nth_unstable_by(low.len() - 1, cmp);
+        let median = low.add_wrapping(*high).div_wrapping(T::Native::usize_as(2));
+        Some(median)
+    } else {
+        let (_, median, _) = values.select_nth_unstable_by(len / 2, cmp);
+        Some(*median)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::expressions::col;
     use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
+    use crate::generic_test_distinct_op;
     use arrow::{array::*, datatypes::*};
 
     #[test]
@@ -214,10 +314,11 @@ mod tests {
                 .with_precision_and_scale(10, 4)?,
         );
 
-        generic_test_op!(
+        generic_test_distinct_op!(
             array,
             DataType::Decimal128(10, 4),
             Median,
+            false,
             ScalarValue::Decimal128(Some(3), 10, 4)
         )
     }
@@ -230,10 +331,11 @@ mod tests {
                 .collect::<Decimal128Array>()
                 .with_precision_and_scale(10, 4)?,
         );
-        generic_test_op!(
+        generic_test_distinct_op!(
             array,
             DataType::Decimal128(10, 4),
             Median,
+            false,
             ScalarValue::Decimal128(Some(3), 10, 4)
         )
     }
@@ -247,10 +349,11 @@ mod tests {
                 .collect::<Decimal128Array>()
                 .with_precision_and_scale(10, 4)?,
         );
-        generic_test_op!(
+        generic_test_distinct_op!(
             array,
             DataType::Decimal128(10, 4),
             Median,
+            false,
             ScalarValue::Decimal128(None, 10, 4)
         )
     }
@@ -258,13 +361,25 @@ mod tests {
     #[test]
     fn median_i32_odd() -> Result<()> {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        generic_test_op!(a, DataType::Int32, Median, ScalarValue::from(3_i32))
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            false,
+            ScalarValue::from(3_i32)
+        )
     }
 
     #[test]
     fn median_i32_even() -> Result<()> {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
-        generic_test_op!(a, DataType::Int32, Median, ScalarValue::from(3_i32))
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            false,
+            ScalarValue::from(3_i32)
+        )
     }
 
     #[test]
@@ -276,20 +391,38 @@ mod tests {
             Some(4),
             Some(5),
         ]));
-        generic_test_op!(a, DataType::Int32, Median, ScalarValue::from(3i32))
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            false,
+            ScalarValue::from(3i32)
+        )
     }
 
     #[test]
     fn median_i32_all_nulls() -> Result<()> {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        generic_test_op!(a, DataType::Int32, Median, ScalarValue::Int32(None))
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            false,
+            ScalarValue::Int32(None)
+        )
     }
 
     #[test]
     fn median_u32_odd() -> Result<()> {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32]));
-        generic_test_op!(a, DataType::UInt32, Median, ScalarValue::from(3u32))
+        generic_test_distinct_op!(
+            a,
+            DataType::UInt32,
+            Median,
+            false,
+            ScalarValue::from(3u32)
+        )
     }
 
     #[test]
@@ -297,14 +430,26 @@ mod tests {
         let a: ArrayRef = Arc::new(UInt32Array::from(vec![
             1_u32, 2_u32, 3_u32, 4_u32, 5_u32, 6_u32,
         ]));
-        generic_test_op!(a, DataType::UInt32, Median, ScalarValue::from(3u32))
+        generic_test_distinct_op!(
+            a,
+            DataType::UInt32,
+            Median,
+            false,
+            ScalarValue::from(3u32)
+        )
     }
 
     #[test]
     fn median_f32_odd() -> Result<()> {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32]));
-        generic_test_op!(a, DataType::Float32, Median, ScalarValue::from(3_f32))
+        generic_test_distinct_op!(
+            a,
+            DataType::Float32,
+            Median,
+            false,
+            ScalarValue::from(3_f32)
+        )
     }
 
     #[test]
@@ -312,14 +457,26 @@ mod tests {
         let a: ArrayRef = Arc::new(Float32Array::from(vec![
             1_f32, 2_f32, 3_f32, 4_f32, 5_f32, 6_f32,
         ]));
-        generic_test_op!(a, DataType::Float32, Median, ScalarValue::from(3.5_f32))
+        generic_test_distinct_op!(
+            a,
+            DataType::Float32,
+            Median,
+            false,
+            ScalarValue::from(3.5_f32)
+        )
     }
 
     #[test]
     fn median_f64_odd() -> Result<()> {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
-        generic_test_op!(a, DataType::Float64, Median, ScalarValue::from(3_f64))
+        generic_test_distinct_op!(
+            a,
+            DataType::Float64,
+            Median,
+            false,
+            ScalarValue::from(3_f64)
+        )
     }
 
     #[test]
@@ -327,6 +484,198 @@ mod tests {
         let a: ArrayRef = Arc::new(Float64Array::from(vec![
             1_f64, 2_f64, 3_f64, 4_f64, 5_f64, 6_f64,
         ]));
-        generic_test_op!(a, DataType::Float64, Median, ScalarValue::from(3.5_f64))
+        generic_test_distinct_op!(
+            a,
+            DataType::Float64,
+            Median,
+            false,
+            ScalarValue::from(3.5_f64)
+        )
+    }
+
+    #[test]
+    fn distinct_median_decimal() -> Result<()> {
+        let array: ArrayRef = Arc::new(
+            vec![1, 1, 1, 1, 2, 3, 1, 1, 3]
+                .into_iter()
+                .map(Some)
+                .collect::<Decimal128Array>()
+                .with_precision_and_scale(10, 4)?,
+        );
+
+        generic_test_distinct_op!(
+            array,
+            DataType::Decimal128(10, 4),
+            Median,
+            true,
+            ScalarValue::Decimal128(Some(2), 10, 4)
+        )
+    }
+
+    #[test]
+    fn distinct_median_decimal_with_nulls() -> Result<()> {
+        let array: ArrayRef = Arc::new(
+            vec![Some(3), Some(1), None, Some(3), Some(2), Some(3), Some(3)]
+                .into_iter()
+                .collect::<Decimal128Array>()
+                .with_precision_and_scale(10, 4)?,
+        );
+        generic_test_distinct_op!(
+            array,
+            DataType::Decimal128(10, 4),
+            Median,
+            true,
+            ScalarValue::Decimal128(Some(2), 10, 4)
+        )
+    }
+
+    #[test]
+    fn distinct_median_decimal_all_nulls() -> Result<()> {
+        let array: ArrayRef = Arc::new(
+            std::iter::repeat::<Option<i128>>(None)
+                .take(6)
+                .collect::<Decimal128Array>()
+                .with_precision_and_scale(10, 4)?,
+        );
+        generic_test_distinct_op!(
+            array,
+            DataType::Decimal128(10, 4),
+            Median,
+            true,
+            ScalarValue::Decimal128(None, 10, 4)
+        )
+    }
+
+    #[test]
+    fn distinct_median_i32_odd() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![2, 1, 1, 2, 1, 3]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            true,
+            ScalarValue::from(2_i32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_i32_even() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 3, 1, 1]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            true,
+            ScalarValue::from(2_i32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_i32_with_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(1),
+            Some(1),
+            Some(3),
+        ]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            true,
+            ScalarValue::from(2i32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_i32_all_nulls() -> Result<()> {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Int32,
+            Median,
+            true,
+            ScalarValue::Int32(None)
+        )
+    }
+
+    #[test]
+    fn distinct_median_u32_odd() -> Result<()> {
+        let a: ArrayRef =
+            Arc::new(UInt32Array::from(vec![1_u32, 1_u32, 2_u32, 1_u32, 3_u32]));
+        generic_test_distinct_op!(
+            a,
+            DataType::UInt32,
+            Median,
+            true,
+            ScalarValue::from(2u32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_u32_even() -> Result<()> {
+        let a: ArrayRef = Arc::new(UInt32Array::from(vec![
+            1_u32, 1_u32, 1_u32, 1_u32, 3_u32, 3_u32,
+        ]));
+        generic_test_distinct_op!(
+            a,
+            DataType::UInt32,
+            Median,
+            true,
+            ScalarValue::from(2u32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_f32_odd() -> Result<()> {
+        let a: ArrayRef =
+            Arc::new(Float32Array::from(vec![3_f32, 2_f32, 1_f32, 1_f32, 1_f32]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Float32,
+            Median,
+            true,
+            ScalarValue::from(2_f32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_f32_even() -> Result<()> {
+        let a: ArrayRef =
+            Arc::new(Float32Array::from(vec![1_f32, 1_f32, 1_f32, 1_f32, 2_f32]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Float32,
+            Median,
+            true,
+            ScalarValue::from(1.5_f32)
+        )
+    }
+
+    #[test]
+    fn distinct_median_f64_odd() -> Result<()> {
+        let a: ArrayRef =
+            Arc::new(Float64Array::from(vec![1_f64, 1_f64, 1_f64, 2_f64, 3_f64]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Float64,
+            Median,
+            true,
+            ScalarValue::from(2_f64)
+        )
+    }
+
+    #[test]
+    fn distinct_median_f64_even() -> Result<()> {
+        let a: ArrayRef =
+            Arc::new(Float64Array::from(vec![1_f64, 1_f64, 1_f64, 1_f64, 2_f64]));
+        generic_test_distinct_op!(
+            a,
+            DataType::Float64,
+            Median,
+            true,
+            ScalarValue::from(1.5_f64)
+        )
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 688d5ce6ea..55ebd9ed8c 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -134,6 +134,40 @@ pub(crate) mod tests {
         }};
     }
 
+    /// Same as [`generic_test_op`] but with support for providing a 4th argument, usually
+    /// a boolean to indicate if using the distinct version of the op.
+    #[macro_export]
+    macro_rules! generic_test_distinct_op {
+        ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $DISTINCT:expr, $EXPECTED:expr) => {
+            generic_test_distinct_op!(
+                $ARRAY,
+                $DATATYPE,
+                $OP,
+                $DISTINCT,
+                $EXPECTED,
+                $EXPECTED.data_type()
+            )
+        };
+        ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $DISTINCT:expr, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{
+            let schema = Schema::new(vec![Field::new("a", $DATATYPE, true)]);
+
+            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY])?;
+
+            let agg = Arc::new(<$OP>::new(
+                col("a", &schema)?,
+                "bla".to_string(),
+                $EXPECTED_DATATYPE,
+                $DISTINCT,
+            ));
+            let actual = aggregate(&batch, agg)?;
+            let expected = ScalarValue::from($EXPECTED);
+
+            assert_eq!(expected, actual);
+
+            Ok(()) as Result<(), ::datafusion_common::DataFusionError>
+        }};
+    }
+
     /// macro to perform an aggregation using [`crate::GroupsAccumulator`] and verify the result.
     ///
     /// The difference between this and the above `generic_test_op` is that the former checks
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 14485c8337..25f5508365 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1800,6 +1800,7 @@ mod tests {
             col("a", &input_schema)?,
             "MEDIAN(a)".to_string(),
             DataType::UInt32,
+            false,
         ))];
 
         // use slow-path in `hash.rs`
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 8b5b84e766..1c10d2c0e5 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -472,8 +472,10 @@ SELECT median(distinct col_i8) FROM median_table
 ----
 100
 
-statement error DataFusion error: This feature is not implemented: MEDIAN\(DISTINCT\) aggregations are not available
+query II
 SELECT median(col_i8), median(distinct col_i8) FROM median_table
+----
+-14 100
 
 # approx_distinct_median_i8
 query I


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to