goldmedal commented on code in PR #10834:
URL: https://github.com/apache/datafusion/pull/10834#discussion_r1632069853
##########
datafusion/physical-expr/src/aggregate/stddev.rs:
##########
@@ -17,168 +17,14 @@
//! Defines physical expressions that can evaluated at runtime during query execution
-use std::any::Any;
-use std::sync::Arc;
+use arrow::array::ArrayRef;
-use crate::aggregate::stats::StatsType;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::aggregate::variance::VarianceAccumulator;
-use crate::expressions::format_state_name;
-use crate::{AggregateExpr, PhysicalExpr};
-use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
use datafusion_common::ScalarValue;
use datafusion_common::{internal_err, Result};
use datafusion_expr::Accumulator;
-/// STDDEV and STDDEV_SAMP (standard deviation) aggregate expression
-#[derive(Debug)]
-pub struct Stddev {
- name: String,
- expr: Arc<dyn PhysicalExpr>,
-}
-
-/// STDDEV_POP population aggregate expression
-#[derive(Debug)]
-pub struct StddevPop {
- name: String,
- expr: Arc<dyn PhysicalExpr>,
-}
-
-impl Stddev {
- /// Create a new STDDEV aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- // the result of stddev just support FLOAT64 and Decimal data type.
- assert!(matches!(data_type, DataType::Float64));
- Self {
- name: name.into(),
- expr,
- }
- }
-}
-
-impl AggregateExpr for Stddev {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, DataType::Float64, true))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(StddevAccumulator::try_new(StatsType::Sample)?))
- }
-
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(StddevAccumulator::try_new(StatsType::Sample)?))
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![
- Field::new(
- format_state_name(&self.name, "count"),
- DataType::UInt64,
- true,
- ),
- Field::new(
- format_state_name(&self.name, "mean"),
- DataType::Float64,
- true,
- ),
- Field::new(format_state_name(&self.name, "m2"), DataType::Float64, true),
- ])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-}
-
-impl PartialEq<dyn Any> for Stddev {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| self.name == x.name && self.expr.eq(&x.expr))
- .unwrap_or(false)
- }
-}
-
-impl StddevPop {
- /// Create a new STDDEV aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- // the result of stddev just support FLOAT64 and Decimal data type.
- assert!(matches!(data_type, DataType::Float64));
- Self {
- name: name.into(),
- expr,
- }
- }
-}
-
-impl AggregateExpr for StddevPop {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, DataType::Float64, true))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(StddevAccumulator::try_new(StatsType::Population)?))
- }
-
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(StddevAccumulator::try_new(StatsType::Population)?))
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![
- Field::new(
- format_state_name(&self.name, "count"),
- DataType::UInt64,
- true,
- ),
- Field::new(
- format_state_name(&self.name, "mean"),
- DataType::Float64,
- true,
- ),
- Field::new(format_state_name(&self.name, "m2"), DataType::Float64, true),
- ])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-}
-
-impl PartialEq<dyn Any> for StddevPop {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| self.name == x.name && self.expr.eq(&x.expr))
- .unwrap_or(false)
- }
-}
+use crate::aggregate::stats::StatsType;
+use crate::aggregate::variance::VarianceAccumulator;
/// An accumulator to compute the average
#[derive(Debug)]
Review Comment:
`StddevAccumulator` is still used by `correlation`, which is why I kept it.
We should remove it once `correlation` has been converted to a UDAF.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]