This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e8fdc09c6e Convert `VariancePopulation` to UDAF (#10836)
e8fdc09c6e is described below

commit e8fdc09c6e05be5803a136e2b828f90175fc645c
Author: Matt Nawara <[email protected]>
AuthorDate: Sun Jun 9 22:30:23 2024 -0400

    Convert `VariancePopulation` to UDAF (#10836)
---
 datafusion/expr/src/aggregate_function.rs          |  11 +-
 datafusion/expr/src/type_coercion/aggregates.rs    |  10 --
 datafusion/functions-aggregate/src/lib.rs          |   2 +
 datafusion/functions-aggregate/src/variance.rs     |  85 +++++++++-
 datafusion/physical-expr/src/aggregate/build_in.rs |  45 -----
 datafusion/physical-expr/src/aggregate/variance.rs | 184 +--------------------
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/proto/proto/datafusion.proto            |   2 +-
 datafusion/proto/src/generated/pbjson.rs           |   3 -
 datafusion/proto/src/generated/prost.rs            |   4 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   1 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   4 -
 datafusion/proto/src/physical_plan/to_proto.rs     |   5 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   4 +-
 datafusion/sqllogictest/test_files/aggregate.slt   |   9 +
 15 files changed, 105 insertions(+), 265 deletions(-)

diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index edefd0f3ed..9e4f7a50ac 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,8 +47,6 @@ pub enum AggregateFunction {
     ArrayAgg,
     /// N'th value in a group according to some ordering
     NthValue,
-    /// Variance (Population)
-    VariancePop,
     /// Correlation
     Correlation,
     /// Slope from linear regression
@@ -102,7 +100,6 @@ impl AggregateFunction {
             ApproxDistinct => "APPROX_DISTINCT",
             ArrayAgg => "ARRAY_AGG",
             NthValue => "NTH_VALUE",
-            VariancePop => "VAR_POP",
             Correlation => "CORR",
             RegrSlope => "REGR_SLOPE",
             RegrIntercept => "REGR_INTERCEPT",
@@ -153,7 +150,6 @@ impl FromStr for AggregateFunction {
             "string_agg" => AggregateFunction::StringAgg,
             // statistical
             "corr" => AggregateFunction::Correlation,
-            "var_pop" => AggregateFunction::VariancePop,
             "regr_slope" => AggregateFunction::RegrSlope,
             "regr_intercept" => AggregateFunction::RegrIntercept,
             "regr_count" => AggregateFunction::RegrCount,
@@ -216,9 +212,6 @@ impl AggregateFunction {
             AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
                 Ok(DataType::Boolean)
             }
-            AggregateFunction::VariancePop => {
-                variance_return_type(&coerced_data_types[0])
-            }
             AggregateFunction::Correlation => {
                 correlation_return_type(&coerced_data_types[0])
             }
@@ -291,9 +284,7 @@ impl AggregateFunction {
             AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
                 Signature::uniform(1, vec![DataType::Boolean], 
Volatility::Immutable)
             }
-            AggregateFunction::Avg
-            | AggregateFunction::VariancePop
-            | AggregateFunction::ApproxMedian => {
+            AggregateFunction::Avg | AggregateFunction::ApproxMedian => {
                 Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
             }
             AggregateFunction::NthValue => Signature::any(2, 
Volatility::Immutable),
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs
index 3ac6fa41d9..4b4d526532 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -151,16 +151,6 @@ pub fn coerce_types(
             }
             Ok(input_types.to_vec())
         }
-        AggregateFunction::VariancePop => {
-            if !is_variance_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(vec![Float64, Float64])
-        }
         AggregateFunction::Correlation => {
             if !is_correlation_support_arg_type(&input_types[0]) {
                 return plan_err!(
diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs
index 2f58b9afac..b8a2e7032a 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -78,6 +78,7 @@ pub mod expr_fn {
     pub use super::stddev::stddev;
     pub use super::stddev::stddev_pop;
     pub use super::sum::sum;
+    pub use super::variance::var_pop;
     pub use super::variance::var_sample;
 }
 
@@ -91,6 +92,7 @@ pub fn all_default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
         covariance::covar_pop_udaf(),
         median::median_udaf(),
         variance::var_samp_udaf(),
+        variance::var_pop_udaf(),
         stddev::stddev_udaf(),
         stddev::stddev_pop_udaf(),
     ]
diff --git a/datafusion/functions-aggregate/src/variance.rs b/datafusion/functions-aggregate/src/variance.rs
index b5d467d0e7..b9b11c186f 100644
--- a/datafusion/functions-aggregate/src/variance.rs
+++ b/datafusion/functions-aggregate/src/variance.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`VarianceSample`]: covariance sample aggregations.
+//! [`VarianceSample`]: variance sample aggregations.
+//! [`VariancePopulation`]: variance population aggregations.
 
 use std::fmt::Debug;
 
@@ -43,6 +44,14 @@ make_udaf_expr_and_func!(
     var_samp_udaf
 );
 
+make_udaf_expr_and_func!(
+    VariancePopulation,
+    var_pop,
+    expression,
+    "Computes the population variance.",
+    var_pop_udaf
+);
+
 pub struct VarianceSample {
     signature: Signature,
     aliases: Vec<String>,
@@ -115,6 +124,80 @@ impl AggregateUDFImpl for VarianceSample {
     }
 }
 
+pub struct VariancePopulation {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Debug for VariancePopulation {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("VariancePopulation")
+            .field("name", &self.name())
+            .field("signature", &self.signature)
+            .finish()
+    }
+}
+
+impl Default for VariancePopulation {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl VariancePopulation {
+    pub fn new() -> Self {
+        Self {
+            aliases: vec![String::from("var_population")],
+            signature: Signature::numeric(1, Volatility::Immutable),
+        }
+    }
+}
+
+impl AggregateUDFImpl for VariancePopulation {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "var_pop"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if !arg_types[0].is_numeric() {
+            return plan_err!("Variance requires numeric input types");
+        }
+
+        Ok(DataType::Float64)
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        let name = args.name;
+        Ok(vec![
+            Field::new(format_state_name(name, "count"), DataType::UInt64, 
true),
+            Field::new(format_state_name(name, "mean"), DataType::Float64, 
true),
+            Field::new(format_state_name(name, "m2"), DataType::Float64, true),
+        ])
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        if acc_args.is_distinct {
+            return not_impl_err!("VAR_POP(DISTINCT) aggregations are not 
available");
+        }
+
+        Ok(Box::new(VarianceAccumulator::try_new(
+            StatsType::Population,
+        )?))
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
 /// An accumulator to compute variance
 /// The algrithm used is an online implementation and numerically stable. It 
is based on this paper:
 /// Welford, B. P. (1962). "Note on a method for calculating corrected sums of 
squares and products".
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index d92d0cd61c..f0cff53fb3 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -157,12 +157,6 @@ pub fn create_aggregate_expr(
         (AggregateFunction::Avg, true) => {
             return not_impl_err!("AVG(DISTINCT) aggregations are not 
available");
         }
-        (AggregateFunction::VariancePop, false) => Arc::new(
-            expressions::VariancePop::new(input_phy_exprs[0].clone(), name, 
data_type),
-        ),
-        (AggregateFunction::VariancePop, true) => {
-            return not_impl_err!("VAR_POP(DISTINCT) aggregations are not 
available");
-        }
         (AggregateFunction::Correlation, false) => {
             Arc::new(expressions::Correlation::new(
                 input_phy_exprs[0].clone(),
@@ -340,7 +334,6 @@ pub fn create_aggregate_expr(
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field};
-    use expressions::VariancePop;
 
     use super::*;
     use crate::expressions::{
@@ -693,44 +686,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_var_pop_expr() -> Result<()> {
-        let funcs = vec![AggregateFunction::VariancePop];
-        let data_types = vec![
-            DataType::UInt32,
-            DataType::UInt64,
-            DataType::Int32,
-            DataType::Int64,
-            DataType::Float32,
-            DataType::Float64,
-        ];
-        for fun in funcs {
-            for data_type in &data_types {
-                let input_schema =
-                    Schema::new(vec![Field::new("c1", data_type.clone(), 
true)]);
-                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = 
vec![Arc::new(
-                    expressions::Column::new_with_schema("c1", 
&input_schema).unwrap(),
-                )];
-                let result_agg_phy_exprs = create_physical_agg_expr_for_test(
-                    &fun,
-                    false,
-                    &input_phy_exprs[0..1],
-                    &input_schema,
-                    "c1",
-                )?;
-                if fun == AggregateFunction::VariancePop {
-                    assert!(result_agg_phy_exprs.as_any().is::<VariancePop>());
-                    assert_eq!("c1", result_agg_phy_exprs.name());
-                    assert_eq!(
-                        Field::new("c1", DataType::Float64, true),
-                        result_agg_phy_exprs.field().unwrap()
-                    )
-                }
-            }
-        }
-        Ok(())
-    }
-
     #[test]
     fn test_median_expr() -> Result<()> {
         let funcs = vec![AggregateFunction::ApproxMedian];
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
index 3db3c0e3ae..27c67a2f9c 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -17,102 +17,20 @@
 
 //! Defines physical expressions that can evaluated at runtime during query 
execution
 
-use std::any::Any;
-use std::sync::Arc;
-
 use crate::aggregate::stats::StatsType;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::Float64Array;
 use arrow::{
     array::{ArrayRef, UInt64Array},
     compute::cast,
     datatypes::DataType,
-    datatypes::Field,
 };
 use datafusion_common::downcast_value;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
 
-/// VAR_POP aggregate expression
-#[derive(Debug)]
-pub struct VariancePop {
-    name: String,
-    expr: Arc<dyn PhysicalExpr>,
-}
-
-impl VariancePop {
-    /// Create a new VAR_POP aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        // the result of variance just support FLOAT64 data type.
-        assert!(matches!(data_type, DataType::Float64));
-        Self {
-            name: name.into(),
-            expr,
-        }
-    }
-}
-
-impl AggregateExpr for VariancePop {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, DataType::Float64, true))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(VarianceAccumulator::try_new(
-            StatsType::Population,
-        )?))
-    }
-
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(VarianceAccumulator::try_new(
-            StatsType::Population,
-        )?))
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![
-            Field::new(
-                format_state_name(&self.name, "count"),
-                DataType::UInt64,
-                true,
-            ),
-            Field::new(
-                format_state_name(&self.name, "mean"),
-                DataType::Float64,
-                true,
-            ),
-            Field::new(format_state_name(&self.name, "m2"), DataType::Float64, 
true),
-        ])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-}
-
-impl PartialEq<dyn Any> for VariancePop {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| self.name == x.name && self.expr.eq(&x.expr))
-            .unwrap_or(false)
-    }
-}
+// TODO only holds the definition of `VarianceAccumulator` for use by 
`StddevAccumulator` in `physical-expr`,
+// which in turn only has it there for legacy `CorrelationAccumulator`, but 
this whole file should go away
+// once the latter is moved to `functions-aggregate`.
 
 /// An accumulator to compute variance
 /// The algrithm used is an online implementation and numerically stable. It 
is based on this paper:
@@ -256,99 +174,3 @@ impl Accumulator for VarianceAccumulator {
         std::mem::size_of_val(self)
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::aggregate::utils::get_accum_scalar_values_as_arrays;
-    use crate::expressions::col;
-    use arrow::{array::*, datatypes::*};
-
-    #[test]
-    fn variance_f64_merge_1() -> Result<()> {
-        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
-        let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64]));
-
-        let schema = Schema::new(vec![Field::new("a", DataType::Float64, 
false)]);
-
-        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
-        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
-
-        let agg1 = Arc::new(VariancePop::new(
-            col("a", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let agg2 = Arc::new(VariancePop::new(
-            col("a", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let actual = merge(&batch1, &batch2, agg1, agg2)?;
-        assert!(actual == ScalarValue::from(2_f64));
-
-        Ok(())
-    }
-
-    #[test]
-    fn variance_f64_merge_2() -> Result<()> {
-        let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 
5_f64]));
-        let b = Arc::new(Float64Array::from(vec![None]));
-
-        let schema = Schema::new(vec![Field::new("a", DataType::Float64, 
true)]);
-
-        let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
-        let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
-
-        let agg1 = Arc::new(VariancePop::new(
-            col("a", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let agg2 = Arc::new(VariancePop::new(
-            col("a", &schema)?,
-            "bla".to_string(),
-            DataType::Float64,
-        ));
-
-        let actual = merge(&batch1, &batch2, agg1, agg2)?;
-        assert!(actual == ScalarValue::from(2_f64));
-
-        Ok(())
-    }
-
-    fn merge(
-        batch1: &RecordBatch,
-        batch2: &RecordBatch,
-        agg1: Arc<dyn AggregateExpr>,
-        agg2: Arc<dyn AggregateExpr>,
-    ) -> Result<ScalarValue> {
-        let mut accum1 = agg1.create_accumulator()?;
-        let mut accum2 = agg2.create_accumulator()?;
-        let expr1 = agg1.expressions();
-        let expr2 = agg2.expressions();
-
-        let values1 = expr1
-            .iter()
-            .map(|e| {
-                e.evaluate(batch1)
-                    .and_then(|v| v.into_array(batch1.num_rows()))
-            })
-            .collect::<Result<Vec<_>>>()?;
-        let values2 = expr2
-            .iter()
-            .map(|e| {
-                e.evaluate(batch2)
-                    .and_then(|v| v.into_array(batch2.num_rows()))
-            })
-            .collect::<Result<Vec<_>>>()?;
-        accum1.update_batch(&values1)?;
-        accum2.update_batch(&values2)?;
-        let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
-        accum1.merge_batch(&state2)?;
-        accum1.evaluate()
-    }
-}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index a6133eeb25..476cbe3907 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -57,7 +57,6 @@ pub use crate::aggregate::nth_value::NthValueAgg;
 pub use crate::aggregate::regr::{Regr, RegrType};
 pub use crate::aggregate::stats::StatsType;
 pub use crate::aggregate::string_agg::StringAgg;
-pub use crate::aggregate::variance::VariancePop;
 pub use crate::window::cume_dist::{cume_dist, CumeDist};
 pub use crate::window::lead_lag::{lag, lead, WindowShift};
 pub use crate::window::nth_value::NthValue;
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 5aaae07f4d..0071a43bbe 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -480,7 +480,7 @@ enum AggregateFunction {
   APPROX_DISTINCT = 5;
   ARRAY_AGG = 6;
   // VARIANCE = 7;
-  VARIANCE_POP = 8;
+  // VARIANCE_POP = 8;
   // COVARIANCE = 9;
   // COVARIANCE_POP = 10;
   // STDDEV = 11;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index cd754e4d9f..e6aded8901 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -538,7 +538,6 @@ impl serde::Serialize for AggregateFunction {
             Self::Count => "COUNT",
             Self::ApproxDistinct => "APPROX_DISTINCT",
             Self::ArrayAgg => "ARRAY_AGG",
-            Self::VariancePop => "VARIANCE_POP",
             Self::Correlation => "CORRELATION",
             Self::ApproxPercentileCont => "APPROX_PERCENTILE_CONT",
             Self::ApproxMedian => "APPROX_MEDIAN",
@@ -577,7 +576,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
             "COUNT",
             "APPROX_DISTINCT",
             "ARRAY_AGG",
-            "VARIANCE_POP",
             "CORRELATION",
             "APPROX_PERCENTILE_CONT",
             "APPROX_MEDIAN",
@@ -645,7 +643,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                     "COUNT" => Ok(AggregateFunction::Count),
                     "APPROX_DISTINCT" => Ok(AggregateFunction::ApproxDistinct),
                     "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
-                    "VARIANCE_POP" => Ok(AggregateFunction::VariancePop),
                     "CORRELATION" => Ok(AggregateFunction::Correlation),
                     "APPROX_PERCENTILE_CONT" => 
Ok(AggregateFunction::ApproxPercentileCont),
                     "APPROX_MEDIAN" => Ok(AggregateFunction::ApproxMedian),
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 1b38168ba1..7ec9187491 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1924,7 +1924,7 @@ pub enum AggregateFunction {
     ApproxDistinct = 5,
     ArrayAgg = 6,
     /// VARIANCE = 7;
-    VariancePop = 8,
+    /// VARIANCE_POP = 8;
     /// COVARIANCE = 9;
     /// COVARIANCE_POP = 10;
     /// STDDEV = 11;
@@ -1965,7 +1965,6 @@ impl AggregateFunction {
             AggregateFunction::Count => "COUNT",
             AggregateFunction::ApproxDistinct => "APPROX_DISTINCT",
             AggregateFunction::ArrayAgg => "ARRAY_AGG",
-            AggregateFunction::VariancePop => "VARIANCE_POP",
             AggregateFunction::Correlation => "CORRELATION",
             AggregateFunction::ApproxPercentileCont => 
"APPROX_PERCENTILE_CONT",
             AggregateFunction::ApproxMedian => "APPROX_MEDIAN",
@@ -2000,7 +1999,6 @@ impl AggregateFunction {
             "COUNT" => Some(Self::Count),
             "APPROX_DISTINCT" => Some(Self::ApproxDistinct),
             "ARRAY_AGG" => Some(Self::ArrayAgg),
-            "VARIANCE_POP" => Some(Self::VariancePop),
             "CORRELATION" => Some(Self::Correlation),
             "APPROX_PERCENTILE_CONT" => Some(Self::ApproxPercentileCont),
             "APPROX_MEDIAN" => Some(Self::ApproxMedian),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 0ebf3f7117..a77d361983 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -148,7 +148,6 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
             protobuf::AggregateFunction::Count => Self::Count,
             protobuf::AggregateFunction::ApproxDistinct => 
Self::ApproxDistinct,
             protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
-            protobuf::AggregateFunction::VariancePop => Self::VariancePop,
             protobuf::AggregateFunction::Correlation => Self::Correlation,
             protobuf::AggregateFunction::RegrSlope => Self::RegrSlope,
             protobuf::AggregateFunction::RegrIntercept => Self::RegrIntercept,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 490ef9a4f2..9c4c7685b3 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -119,7 +119,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
             AggregateFunction::Count => Self::Count,
             AggregateFunction::ApproxDistinct => Self::ApproxDistinct,
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
-            AggregateFunction::VariancePop => Self::VariancePop,
             AggregateFunction::Correlation => Self::Correlation,
             AggregateFunction::RegrSlope => Self::RegrSlope,
             AggregateFunction::RegrIntercept => Self::RegrIntercept,
@@ -413,9 +412,6 @@ pub fn serialize_expr(
                     AggregateFunction::BoolOr => 
protobuf::AggregateFunction::BoolOr,
                     AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
                     AggregateFunction::Count => 
protobuf::AggregateFunction::Count,
-                    AggregateFunction::VariancePop => {
-                        protobuf::AggregateFunction::VariancePop
-                    }
                     AggregateFunction::Correlation => {
                         protobuf::AggregateFunction::Correlation
                     }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 66405d4b9a..5d07d5c0fa 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -28,8 +28,7 @@ use datafusion::physical_plan::expressions::{
     CastExpr, Column, Correlation, Count, CumeDist, DistinctArrayAgg, 
DistinctBitXor,
     DistinctCount, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, 
Max, Min,
     NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, 
OrderSensitiveArrayAgg, Rank,
-    RankType, Regr, RegrType, RowNumber, StringAgg, TryCastExpr, VariancePop,
-    WindowShift,
+    RankType, Regr, RegrType, RowNumber, StringAgg, TryCastExpr, WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, 
PlainAggregateWindowExpr};
@@ -276,8 +275,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
         protobuf::AggregateFunction::Max
     } else if aggr_expr.downcast_ref::<Avg>().is_some() {
         protobuf::AggregateFunction::Avg
-    } else if aggr_expr.downcast_ref::<VariancePop>().is_some() {
-        protobuf::AggregateFunction::VariancePop
     } else if aggr_expr.downcast_ref::<Correlation>().is_some() {
         protobuf::AggregateFunction::Correlation
     } else if let Some(regr_expr) = aggr_expr.downcast_ref::<Regr>() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index a6889633d2..b1cad69b14 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -34,7 +34,8 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::execution::FunctionRegistry;
 use datafusion::functions_aggregate::expr_fn::{
-    covar_pop, covar_samp, first_value, median, stddev, stddev_pop, sum, 
var_sample,
+    covar_pop, covar_samp, first_value, median, stddev, stddev_pop, sum, 
var_pop,
+    var_sample,
 };
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -654,6 +655,7 @@ async fn roundtrip_expr_api() -> Result<()> {
         sum(lit(1)),
         median(lit(2)),
         var_sample(lit(2.2)),
+        var_pop(lit(2.2)),
         stddev(lit(2.2)),
         stddev_pop(lit(2.2)),
     ];
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 0233f46d08..9958f8ac38 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -449,6 +449,15 @@ SELECT var(distinct c2) FROM aggregate_test_100
 statement error DataFusion error: This feature is not implemented: 
VAR\(DISTINCT\) aggregations are not available
 SELECT var(c2), var(distinct c2) FROM aggregate_test_100
 
+# csv_query_distinct_variance_population
+query R
+SELECT var_pop(distinct c2) FROM aggregate_test_100
+----
+2
+
+statement error DataFusion error: This feature is not implemented: 
VAR_POP\(DISTINCT\) aggregations are not available
+SELECT var_pop(c2), var_pop(distinct c2) FROM aggregate_test_100
+
 # csv_query_variance_5
 query R
 SELECT var_samp(c2) FROM aggregate_test_100


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to