This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e8fdc09c6e Convert `VariancePopulation` to UDAF (#10836)
e8fdc09c6e is described below
commit e8fdc09c6e05be5803a136e2b828f90175fc645c
Author: Matt Nawara <[email protected]>
AuthorDate: Sun Jun 9 22:30:23 2024 -0400
Convert `VariancePopulation` to UDAF (#10836)
---
datafusion/expr/src/aggregate_function.rs | 11 +-
datafusion/expr/src/type_coercion/aggregates.rs | 10 --
datafusion/functions-aggregate/src/lib.rs | 2 +
datafusion/functions-aggregate/src/variance.rs | 85 +++++++++-
datafusion/physical-expr/src/aggregate/build_in.rs | 45 -----
datafusion/physical-expr/src/aggregate/variance.rs | 184 +--------------------
datafusion/physical-expr/src/expressions/mod.rs | 1 -
datafusion/proto/proto/datafusion.proto | 2 +-
datafusion/proto/src/generated/pbjson.rs | 3 -
datafusion/proto/src/generated/prost.rs | 4 +-
datafusion/proto/src/logical_plan/from_proto.rs | 1 -
datafusion/proto/src/logical_plan/to_proto.rs | 4 -
datafusion/proto/src/physical_plan/to_proto.rs | 5 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 4 +-
datafusion/sqllogictest/test_files/aggregate.slt | 9 +
15 files changed, 105 insertions(+), 265 deletions(-)
diff --git a/datafusion/expr/src/aggregate_function.rs
b/datafusion/expr/src/aggregate_function.rs
index edefd0f3ed..9e4f7a50ac 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,8 +47,6 @@ pub enum AggregateFunction {
ArrayAgg,
/// N'th value in a group according to some ordering
NthValue,
- /// Variance (Population)
- VariancePop,
/// Correlation
Correlation,
/// Slope from linear regression
@@ -102,7 +100,6 @@ impl AggregateFunction {
ApproxDistinct => "APPROX_DISTINCT",
ArrayAgg => "ARRAY_AGG",
NthValue => "NTH_VALUE",
- VariancePop => "VAR_POP",
Correlation => "CORR",
RegrSlope => "REGR_SLOPE",
RegrIntercept => "REGR_INTERCEPT",
@@ -153,7 +150,6 @@ impl FromStr for AggregateFunction {
"string_agg" => AggregateFunction::StringAgg,
// statistical
"corr" => AggregateFunction::Correlation,
- "var_pop" => AggregateFunction::VariancePop,
"regr_slope" => AggregateFunction::RegrSlope,
"regr_intercept" => AggregateFunction::RegrIntercept,
"regr_count" => AggregateFunction::RegrCount,
@@ -216,9 +212,6 @@ impl AggregateFunction {
AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
Ok(DataType::Boolean)
}
- AggregateFunction::VariancePop => {
- variance_return_type(&coerced_data_types[0])
- }
AggregateFunction::Correlation => {
correlation_return_type(&coerced_data_types[0])
}
@@ -291,9 +284,7 @@ impl AggregateFunction {
AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
Signature::uniform(1, vec![DataType::Boolean],
Volatility::Immutable)
}
- AggregateFunction::Avg
- | AggregateFunction::VariancePop
- | AggregateFunction::ApproxMedian => {
+ AggregateFunction::Avg | AggregateFunction::ApproxMedian => {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
AggregateFunction::NthValue => Signature::any(2,
Volatility::Immutable),
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs
b/datafusion/expr/src/type_coercion/aggregates.rs
index 3ac6fa41d9..4b4d526532 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -151,16 +151,6 @@ pub fn coerce_types(
}
Ok(input_types.to_vec())
}
- AggregateFunction::VariancePop => {
- if !is_variance_support_arg_type(&input_types[0]) {
- return plan_err!(
- "The function {:?} does not support inputs of type {:?}.",
- agg_fun,
- input_types[0]
- );
- }
- Ok(vec![Float64, Float64])
- }
AggregateFunction::Correlation => {
if !is_correlation_support_arg_type(&input_types[0]) {
return plan_err!(
diff --git a/datafusion/functions-aggregate/src/lib.rs
b/datafusion/functions-aggregate/src/lib.rs
index 2f58b9afac..b8a2e7032a 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -78,6 +78,7 @@ pub mod expr_fn {
pub use super::stddev::stddev;
pub use super::stddev::stddev_pop;
pub use super::sum::sum;
+ pub use super::variance::var_pop;
pub use super::variance::var_sample;
}
@@ -91,6 +92,7 @@ pub fn all_default_aggregate_functions() ->
Vec<Arc<AggregateUDF>> {
covariance::covar_pop_udaf(),
median::median_udaf(),
variance::var_samp_udaf(),
+ variance::var_pop_udaf(),
stddev::stddev_udaf(),
stddev::stddev_pop_udaf(),
]
diff --git a/datafusion/functions-aggregate/src/variance.rs
b/datafusion/functions-aggregate/src/variance.rs
index b5d467d0e7..b9b11c186f 100644
--- a/datafusion/functions-aggregate/src/variance.rs
+++ b/datafusion/functions-aggregate/src/variance.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-//! [`VarianceSample`]: covariance sample aggregations.
+//! [`VarianceSample`]: variance sample aggregations.
+//! [`VariancePopulation`]: variance population aggregations.
use std::fmt::Debug;
@@ -43,6 +44,14 @@ make_udaf_expr_and_func!(
var_samp_udaf
);
+make_udaf_expr_and_func!(
+ VariancePopulation,
+ var_pop,
+ expression,
+ "Computes the population variance.",
+ var_pop_udaf
+);
+
pub struct VarianceSample {
signature: Signature,
aliases: Vec<String>,
@@ -115,6 +124,80 @@ impl AggregateUDFImpl for VarianceSample {
}
}
+pub struct VariancePopulation {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+
+impl Debug for VariancePopulation {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("VariancePopulation")
+ .field("name", &self.name())
+ .field("signature", &self.signature)
+ .finish()
+ }
+}
+
+impl Default for VariancePopulation {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl VariancePopulation {
+ pub fn new() -> Self {
+ Self {
+ aliases: vec![String::from("var_population")],
+ signature: Signature::numeric(1, Volatility::Immutable),
+ }
+ }
+}
+
+impl AggregateUDFImpl for VariancePopulation {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "var_pop"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ if !arg_types[0].is_numeric() {
+ return plan_err!("Variance requires numeric input types");
+ }
+
+ Ok(DataType::Float64)
+ }
+
+ fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+ let name = args.name;
+ Ok(vec![
+ Field::new(format_state_name(name, "count"), DataType::UInt64,
true),
+ Field::new(format_state_name(name, "mean"), DataType::Float64,
true),
+ Field::new(format_state_name(name, "m2"), DataType::Float64, true),
+ ])
+ }
+
+ fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn
Accumulator>> {
+ if acc_args.is_distinct {
+ return not_impl_err!("VAR_POP(DISTINCT) aggregations are not
available");
+ }
+
+ Ok(Box::new(VarianceAccumulator::try_new(
+ StatsType::Population,
+ )?))
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
+
/// An accumulator to compute variance
/// The algorithm used is an online implementation and numerically stable. It
is based on this paper:
/// Welford, B. P. (1962). "Note on a method for calculating corrected sums of
squares and products".
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs
b/datafusion/physical-expr/src/aggregate/build_in.rs
index d92d0cd61c..f0cff53fb3 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -157,12 +157,6 @@ pub fn create_aggregate_expr(
(AggregateFunction::Avg, true) => {
return not_impl_err!("AVG(DISTINCT) aggregations are not
available");
}
- (AggregateFunction::VariancePop, false) => Arc::new(
- expressions::VariancePop::new(input_phy_exprs[0].clone(), name,
data_type),
- ),
- (AggregateFunction::VariancePop, true) => {
- return not_impl_err!("VAR_POP(DISTINCT) aggregations are not
available");
- }
(AggregateFunction::Correlation, false) => {
Arc::new(expressions::Correlation::new(
input_phy_exprs[0].clone(),
@@ -340,7 +334,6 @@ pub fn create_aggregate_expr(
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
- use expressions::VariancePop;
use super::*;
use crate::expressions::{
@@ -693,44 +686,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_var_pop_expr() -> Result<()> {
- let funcs = vec![AggregateFunction::VariancePop];
- let data_types = vec![
- DataType::UInt32,
- DataType::UInt64,
- DataType::Int32,
- DataType::Int64,
- DataType::Float32,
- DataType::Float64,
- ];
- for fun in funcs {
- for data_type in &data_types {
- let input_schema =
- Schema::new(vec![Field::new("c1", data_type.clone(),
true)]);
- let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> =
vec![Arc::new(
- expressions::Column::new_with_schema("c1",
&input_schema).unwrap(),
- )];
- let result_agg_phy_exprs = create_physical_agg_expr_for_test(
- &fun,
- false,
- &input_phy_exprs[0..1],
- &input_schema,
- "c1",
- )?;
- if fun == AggregateFunction::VariancePop {
- assert!(result_agg_phy_exprs.as_any().is::<VariancePop>());
- assert_eq!("c1", result_agg_phy_exprs.name());
- assert_eq!(
- Field::new("c1", DataType::Float64, true),
- result_agg_phy_exprs.field().unwrap()
- )
- }
- }
- }
- Ok(())
- }
-
#[test]
fn test_median_expr() -> Result<()> {
let funcs = vec![AggregateFunction::ApproxMedian];
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs
b/datafusion/physical-expr/src/aggregate/variance.rs
index 3db3c0e3ae..27c67a2f9c 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -17,102 +17,20 @@
//! Defines physical expressions that can be evaluated at runtime during query
execution
-use std::any::Any;
-use std::sync::Arc;
-
use crate::aggregate::stats::StatsType;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use crate::{AggregateExpr, PhysicalExpr};
use arrow::array::Float64Array;
use arrow::{
array::{ArrayRef, UInt64Array},
compute::cast,
datatypes::DataType,
- datatypes::Field,
};
use datafusion_common::downcast_value;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
-/// VAR_POP aggregate expression
-#[derive(Debug)]
-pub struct VariancePop {
- name: String,
- expr: Arc<dyn PhysicalExpr>,
-}
-
-impl VariancePop {
- /// Create a new VAR_POP aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- // the result of variance just support FLOAT64 data type.
- assert!(matches!(data_type, DataType::Float64));
- Self {
- name: name.into(),
- expr,
- }
- }
-}
-
-impl AggregateExpr for VariancePop {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, DataType::Float64, true))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(VarianceAccumulator::try_new(
- StatsType::Population,
- )?))
- }
-
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(VarianceAccumulator::try_new(
- StatsType::Population,
- )?))
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![
- Field::new(
- format_state_name(&self.name, "count"),
- DataType::UInt64,
- true,
- ),
- Field::new(
- format_state_name(&self.name, "mean"),
- DataType::Float64,
- true,
- ),
- Field::new(format_state_name(&self.name, "m2"), DataType::Float64,
true),
- ])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-}
-
-impl PartialEq<dyn Any> for VariancePop {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| self.name == x.name && self.expr.eq(&x.expr))
- .unwrap_or(false)
- }
-}
+// TODO only holds the definition of `VarianceAccumulator` for use by
`StddevAccumulator` in `physical-expr`,
+// which in turn only has it there for legacy `CorrelationAccumulator`, but
this whole file should go away
+// once the latter is moved to `functions-aggregate`.
/// An accumulator to compute variance
/// The algorithm used is an online implementation and numerically stable. It
is based on this paper:
@@ -256,99 +174,3 @@ impl Accumulator for VarianceAccumulator {
std::mem::size_of_val(self)
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::aggregate::utils::get_accum_scalar_values_as_arrays;
- use crate::expressions::col;
- use arrow::{array::*, datatypes::*};
-
- #[test]
- fn variance_f64_merge_1() -> Result<()> {
- let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64]));
- let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64]));
-
- let schema = Schema::new(vec![Field::new("a", DataType::Float64,
false)]);
-
- let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
- let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
-
- let agg1 = Arc::new(VariancePop::new(
- col("a", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
-
- let agg2 = Arc::new(VariancePop::new(
- col("a", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
-
- let actual = merge(&batch1, &batch2, agg1, agg2)?;
- assert!(actual == ScalarValue::from(2_f64));
-
- Ok(())
- }
-
- #[test]
- fn variance_f64_merge_2() -> Result<()> {
- let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64,
5_f64]));
- let b = Arc::new(Float64Array::from(vec![None]));
-
- let schema = Schema::new(vec![Field::new("a", DataType::Float64,
true)]);
-
- let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
- let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?;
-
- let agg1 = Arc::new(VariancePop::new(
- col("a", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
-
- let agg2 = Arc::new(VariancePop::new(
- col("a", &schema)?,
- "bla".to_string(),
- DataType::Float64,
- ));
-
- let actual = merge(&batch1, &batch2, agg1, agg2)?;
- assert!(actual == ScalarValue::from(2_f64));
-
- Ok(())
- }
-
- fn merge(
- batch1: &RecordBatch,
- batch2: &RecordBatch,
- agg1: Arc<dyn AggregateExpr>,
- agg2: Arc<dyn AggregateExpr>,
- ) -> Result<ScalarValue> {
- let mut accum1 = agg1.create_accumulator()?;
- let mut accum2 = agg2.create_accumulator()?;
- let expr1 = agg1.expressions();
- let expr2 = agg2.expressions();
-
- let values1 = expr1
- .iter()
- .map(|e| {
- e.evaluate(batch1)
- .and_then(|v| v.into_array(batch1.num_rows()))
- })
- .collect::<Result<Vec<_>>>()?;
- let values2 = expr2
- .iter()
- .map(|e| {
- e.evaluate(batch2)
- .and_then(|v| v.into_array(batch2.num_rows()))
- })
- .collect::<Result<Vec<_>>>()?;
- accum1.update_batch(&values1)?;
- accum2.update_batch(&values2)?;
- let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
- accum1.merge_batch(&state2)?;
- accum1.evaluate()
- }
-}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index a6133eeb25..476cbe3907 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -57,7 +57,6 @@ pub use crate::aggregate::nth_value::NthValueAgg;
pub use crate::aggregate::regr::{Regr, RegrType};
pub use crate::aggregate::stats::StatsType;
pub use crate::aggregate::string_agg::StringAgg;
-pub use crate::aggregate::variance::VariancePop;
pub use crate::window::cume_dist::{cume_dist, CumeDist};
pub use crate::window::lead_lag::{lag, lead, WindowShift};
pub use crate::window::nth_value::NthValue;
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 5aaae07f4d..0071a43bbe 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -480,7 +480,7 @@ enum AggregateFunction {
APPROX_DISTINCT = 5;
ARRAY_AGG = 6;
// VARIANCE = 7;
- VARIANCE_POP = 8;
+ // VARIANCE_POP = 8;
// COVARIANCE = 9;
// COVARIANCE_POP = 10;
// STDDEV = 11;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index cd754e4d9f..e6aded8901 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -538,7 +538,6 @@ impl serde::Serialize for AggregateFunction {
Self::Count => "COUNT",
Self::ApproxDistinct => "APPROX_DISTINCT",
Self::ArrayAgg => "ARRAY_AGG",
- Self::VariancePop => "VARIANCE_POP",
Self::Correlation => "CORRELATION",
Self::ApproxPercentileCont => "APPROX_PERCENTILE_CONT",
Self::ApproxMedian => "APPROX_MEDIAN",
@@ -577,7 +576,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"COUNT",
"APPROX_DISTINCT",
"ARRAY_AGG",
- "VARIANCE_POP",
"CORRELATION",
"APPROX_PERCENTILE_CONT",
"APPROX_MEDIAN",
@@ -645,7 +643,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"COUNT" => Ok(AggregateFunction::Count),
"APPROX_DISTINCT" => Ok(AggregateFunction::ApproxDistinct),
"ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
- "VARIANCE_POP" => Ok(AggregateFunction::VariancePop),
"CORRELATION" => Ok(AggregateFunction::Correlation),
"APPROX_PERCENTILE_CONT" =>
Ok(AggregateFunction::ApproxPercentileCont),
"APPROX_MEDIAN" => Ok(AggregateFunction::ApproxMedian),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 1b38168ba1..7ec9187491 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1924,7 +1924,7 @@ pub enum AggregateFunction {
ApproxDistinct = 5,
ArrayAgg = 6,
/// VARIANCE = 7;
- VariancePop = 8,
+ /// VARIANCE_POP = 8;
/// COVARIANCE = 9;
/// COVARIANCE_POP = 10;
/// STDDEV = 11;
@@ -1965,7 +1965,6 @@ impl AggregateFunction {
AggregateFunction::Count => "COUNT",
AggregateFunction::ApproxDistinct => "APPROX_DISTINCT",
AggregateFunction::ArrayAgg => "ARRAY_AGG",
- AggregateFunction::VariancePop => "VARIANCE_POP",
AggregateFunction::Correlation => "CORRELATION",
AggregateFunction::ApproxPercentileCont =>
"APPROX_PERCENTILE_CONT",
AggregateFunction::ApproxMedian => "APPROX_MEDIAN",
@@ -2000,7 +1999,6 @@ impl AggregateFunction {
"COUNT" => Some(Self::Count),
"APPROX_DISTINCT" => Some(Self::ApproxDistinct),
"ARRAY_AGG" => Some(Self::ArrayAgg),
- "VARIANCE_POP" => Some(Self::VariancePop),
"CORRELATION" => Some(Self::Correlation),
"APPROX_PERCENTILE_CONT" => Some(Self::ApproxPercentileCont),
"APPROX_MEDIAN" => Some(Self::ApproxMedian),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 0ebf3f7117..a77d361983 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -148,7 +148,6 @@ impl From<protobuf::AggregateFunction> for
AggregateFunction {
protobuf::AggregateFunction::Count => Self::Count,
protobuf::AggregateFunction::ApproxDistinct =>
Self::ApproxDistinct,
protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
- protobuf::AggregateFunction::VariancePop => Self::VariancePop,
protobuf::AggregateFunction::Correlation => Self::Correlation,
protobuf::AggregateFunction::RegrSlope => Self::RegrSlope,
protobuf::AggregateFunction::RegrIntercept => Self::RegrIntercept,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 490ef9a4f2..9c4c7685b3 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -119,7 +119,6 @@ impl From<&AggregateFunction> for
protobuf::AggregateFunction {
AggregateFunction::Count => Self::Count,
AggregateFunction::ApproxDistinct => Self::ApproxDistinct,
AggregateFunction::ArrayAgg => Self::ArrayAgg,
- AggregateFunction::VariancePop => Self::VariancePop,
AggregateFunction::Correlation => Self::Correlation,
AggregateFunction::RegrSlope => Self::RegrSlope,
AggregateFunction::RegrIntercept => Self::RegrIntercept,
@@ -413,9 +412,6 @@ pub fn serialize_expr(
AggregateFunction::BoolOr =>
protobuf::AggregateFunction::BoolOr,
AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
AggregateFunction::Count =>
protobuf::AggregateFunction::Count,
- AggregateFunction::VariancePop => {
- protobuf::AggregateFunction::VariancePop
- }
AggregateFunction::Correlation => {
protobuf::AggregateFunction::Correlation
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 66405d4b9a..5d07d5c0fa 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -28,8 +28,7 @@ use datafusion::physical_plan::expressions::{
CastExpr, Column, Correlation, Count, CumeDist, DistinctArrayAgg,
DistinctBitXor,
DistinctCount, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal,
Max, Min,
NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
OrderSensitiveArrayAgg, Rank,
- RankType, Regr, RegrType, RowNumber, StringAgg, TryCastExpr, VariancePop,
- WindowShift,
+ RankType, Regr, RegrType, RowNumber, StringAgg, TryCastExpr, WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -276,8 +275,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
protobuf::AggregateFunction::Max
} else if aggr_expr.downcast_ref::<Avg>().is_some() {
protobuf::AggregateFunction::Avg
- } else if aggr_expr.downcast_ref::<VariancePop>().is_some() {
- protobuf::AggregateFunction::VariancePop
} else if aggr_expr.downcast_ref::<Correlation>().is_some() {
protobuf::AggregateFunction::Correlation
} else if let Some(regr_expr) = aggr_expr.downcast_ref::<Regr>() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index a6889633d2..b1cad69b14 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -34,7 +34,8 @@ use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::expr_fn::{
- covar_pop, covar_samp, first_value, median, stddev, stddev_pop, sum,
var_sample,
+ covar_pop, covar_samp, first_value, median, stddev, stddev_pop, sum,
var_pop,
+ var_sample,
};
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -654,6 +655,7 @@ async fn roundtrip_expr_api() -> Result<()> {
sum(lit(1)),
median(lit(2)),
var_sample(lit(2.2)),
+ var_pop(lit(2.2)),
stddev(lit(2.2)),
stddev_pop(lit(2.2)),
];
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index 0233f46d08..9958f8ac38 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -449,6 +449,15 @@ SELECT var(distinct c2) FROM aggregate_test_100
statement error DataFusion error: This feature is not implemented:
VAR\(DISTINCT\) aggregations are not available
SELECT var(c2), var(distinct c2) FROM aggregate_test_100
+# csv_query_distinct_variance_population
+query R
+SELECT var_pop(distinct c2) FROM aggregate_test_100
+----
+2
+
+statement error DataFusion error: This feature is not implemented:
VAR_POP\(DISTINCT\) aggregations are not available
+SELECT var_pop(c2), var_pop(distinct c2) FROM aggregate_test_100
+
# csv_query_variance_5
query R
SELECT var_samp(c2) FROM aggregate_test_100
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]