jayzhan211 commented on code in PR #10917: URL: https://github.com/apache/datafusion/pull/10917#discussion_r1640559776
########## datafusion/functions-aggregate/src/approx_percentile_cont.rs: ########## @@ -15,19 +15,254 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::fmt::{Debug, Formatter}; + use arrow::{ array::{ ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, datatypes::DataType, }; +use arrow_schema::Field; -use datafusion_common::{downcast_value, internal_err, DataFusionError, ScalarValue}; -use datafusion_expr::Accumulator; +use datafusion_common::{ + downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, ScalarValue, +}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS}; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, Expr, Signature, TypeSignature, Volatility, +}; use datafusion_physical_expr_common::aggregate::tdigest::{ TDigest, TryIntoF64, DEFAULT_MAX_SIZE, }; +use datafusion_physical_expr_common::aggregate::utils::down_cast_any_ref; + +make_udaf_expr_and_func!( + ApproxPercentileCont, + approx_percentile_cont, + expression percentile, + "Computes the approximate percentile continuous of a set of numbers", + approx_percentile_cont_udaf +); + +pub struct ApproxPercentileCont { + signature: Signature, +} + +impl Debug for ApproxPercentileCont { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("ApproxPercentileCont") + .field("name", &self.name()) + .field("signature", &self.signature) + .finish() + } +} + +impl Default for ApproxPercentileCont { + fn default() -> Self { + Self::new() + } +} + +impl ApproxPercentileCont { + /// Create a new [`ApproxPercentileCont`] aggregate function. 
+ pub fn new() -> Self { + let mut variants = Vec::with_capacity(NUMERICS.len() * (INTEGERS.len() + 1)); + // Accept any numeric value paired with a float64 percentile + for num in NUMERICS { + variants.push(TypeSignature::Exact(vec![num.clone(), DataType::Float64])); + // Additionally accept an integer number of centroids for T-Digest + for int in INTEGERS { + variants.push(TypeSignature::Exact(vec![ + num.clone(), + DataType::Float64, + int.clone(), + ])) + } + } + Self { + signature: Signature::one_of(variants, Volatility::Immutable), + } + } + + pub(crate) fn create_accumulator( + &self, + args: AccumulatorArgs, + ) -> datafusion_common::Result<ApproxPercentileAccumulator> { + let percentile = validate_input_percentile_expr(&args.args[1])?; Review Comment: could we rename `args.args` to `args.input_exprs`? I think it would be more clear and consistent with the name of `args.input_type` ########## datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs: ########## @@ -15,91 +15,139 @@ // specific language governing permissions and limitations // under the License. 
-use crate::expressions::ApproxPercentileCont; -use crate::{AggregateExpr, PhysicalExpr}; +use std::any::Any; +use std::fmt::{Debug, Formatter}; + use arrow::{ array::ArrayRef, datatypes::{DataType, Field}, }; -use datafusion_functions_aggregate::approx_percentile_cont::ApproxPercentileAccumulator; + +use datafusion_common::ScalarValue; +use datafusion_common::{not_impl_err, plan_err, Result}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::NUMERICS; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, Signature, TypeSignature, Volatility, +}; use datafusion_physical_expr_common::aggregate::tdigest::{ Centroid, TDigest, DEFAULT_MAX_SIZE, }; +use datafusion_physical_expr_common::physical_expr::down_cast_any_ref; -use datafusion_common::Result; -use datafusion_common::ScalarValue; -use datafusion_expr::Accumulator; +use crate::approx_percentile_cont::{ApproxPercentileAccumulator, ApproxPercentileCont}; -use crate::aggregate::utils::down_cast_any_ref; -use std::{any::Any, sync::Arc}; +make_udaf_expr_and_func!( + ApproxPercentileContWithWeight, + approx_percentile_cont_with_weight, + expression weight percentile, + "Computes the approximate percentile continuous with weight of a set of numbers", + approx_percentile_cont_with_weight_udaf +); /// APPROX_PERCENTILE_CONT_WITH_WEIGTH aggregate expression -#[derive(Debug)] pub struct ApproxPercentileContWithWeight { + signature: Signature, approx_percentile_cont: ApproxPercentileCont, - column_expr: Arc<dyn PhysicalExpr>, - weight_expr: Arc<dyn PhysicalExpr>, - percentile_expr: Arc<dyn PhysicalExpr>, +} + +impl Debug for ApproxPercentileContWithWeight { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ApproxPercentileContWithWeight") + .field("signature", &self.signature) + .finish() + } +} + +impl Default for ApproxPercentileContWithWeight { + fn default() -> Self { + Self::new() + } } impl ApproxPercentileContWithWeight { 
/// Create a new [`ApproxPercentileContWithWeight`] aggregate function. - pub fn new( - expr: Vec<Arc<dyn PhysicalExpr>>, - name: impl Into<String>, - return_type: DataType, - ) -> Result<Self> { - // Arguments should be [ColumnExpr, WeightExpr, DesiredPercentileLiteral] - debug_assert_eq!(expr.len(), 3); - - let sub_expr = vec![expr[0].clone(), expr[2].clone()]; - let approx_percentile_cont = - ApproxPercentileCont::new(sub_expr, name, return_type)?; - - Ok(Self { - approx_percentile_cont, - column_expr: expr[0].clone(), - weight_expr: expr[1].clone(), - percentile_expr: expr[2].clone(), - }) + pub fn new() -> Self { + Self { + signature: Signature::one_of( + // Accept any numeric value paired with a float64 percentile + NUMERICS + .iter() + .map(|t| { + TypeSignature::Exact(vec![ + t.clone(), + t.clone(), + DataType::Float64, + ]) + }) + .collect(), + Volatility::Immutable, + ), + approx_percentile_cont: ApproxPercentileCont::new(), + } } } -impl AggregateExpr for ApproxPercentileContWithWeight { +impl AggregateUDFImpl for ApproxPercentileContWithWeight { fn as_any(&self) -> &dyn Any { self } - fn field(&self) -> Result<Field> { - self.approx_percentile_cont.field() + fn name(&self) -> &str { + "approx_percentile_cont_with_weight" } - #[allow(rustdoc::private_intra_doc_links)] - /// See [`TDigest::to_scalar_state()`] for a description of the serialised - /// state. 
- fn state_fields(&self) -> Result<Vec<Field>> { - self.approx_percentile_cont.state_fields() + fn signature(&self) -> &Signature { + &self.signature } - fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> { - vec![ - self.column_expr.clone(), - self.weight_expr.clone(), - self.percentile_expr.clone(), - ] + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + if !arg_types[0].is_numeric() { + return plan_err!( + "approx_percentile_cont_with_weight requires numeric input types" + ); + } + if !arg_types[1].is_numeric() { + return plan_err!( + "approx_percentile_cont_with_weight requires numeric weight input types" + ); + } + if !arg_types[2].is_floating() { Review Comment: with signature::numeric(3), we can just check if the third arg is f64 ########## datafusion/functions-aggregate/src/approx_percentile_cont.rs: ########## @@ -15,19 +15,254 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::fmt::{Debug, Formatter}; + use arrow::{ array::{ ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, datatypes::DataType, }; +use arrow_schema::Field; -use datafusion_common::{downcast_value, internal_err, DataFusionError, ScalarValue}; -use datafusion_expr::Accumulator; +use datafusion_common::{ + downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, ScalarValue, +}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS}; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, Expr, Signature, TypeSignature, Volatility, +}; use datafusion_physical_expr_common::aggregate::tdigest::{ TDigest, TryIntoF64, DEFAULT_MAX_SIZE, }; +use datafusion_physical_expr_common::aggregate::utils::down_cast_any_ref; + +make_udaf_expr_and_func!( + ApproxPercentileCont, + 
approx_percentile_cont, + expression percentile, + "Computes the approximate percentile continuous of a set of numbers", + approx_percentile_cont_udaf +); + +pub struct ApproxPercentileCont { + signature: Signature, +} + +impl Debug for ApproxPercentileCont { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("ApproxPercentileCont") + .field("name", &self.name()) + .field("signature", &self.signature) + .finish() + } +} + +impl Default for ApproxPercentileCont { + fn default() -> Self { + Self::new() + } +} + +impl ApproxPercentileCont { + /// Create a new [`ApproxPercentileCont`] aggregate function. + pub fn new() -> Self { + let mut variants = Vec::with_capacity(NUMERICS.len() * (INTEGERS.len() + 1)); + // Accept any numeric value paired with a float64 percentile + for num in NUMERICS { + variants.push(TypeSignature::Exact(vec![num.clone(), DataType::Float64])); + // Additionally accept an integer number of centroids for T-Digest + for int in INTEGERS { + variants.push(TypeSignature::Exact(vec![ + num.clone(), + DataType::Float64, + int.clone(), + ])) + } + } + Self { + signature: Signature::one_of(variants, Volatility::Immutable), Review Comment: Is it better to have `Signature::one_of(vec![Numeric::(2), Numeric::(3)])`? ########## datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs: ########## @@ -15,91 +15,139 @@ // specific language governing permissions and limitations // under the License. 
-use crate::expressions::ApproxPercentileCont; -use crate::{AggregateExpr, PhysicalExpr}; +use std::any::Any; +use std::fmt::{Debug, Formatter}; + use arrow::{ array::ArrayRef, datatypes::{DataType, Field}, }; -use datafusion_functions_aggregate::approx_percentile_cont::ApproxPercentileAccumulator; + +use datafusion_common::ScalarValue; +use datafusion_common::{not_impl_err, plan_err, Result}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::NUMERICS; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, Signature, TypeSignature, Volatility, +}; use datafusion_physical_expr_common::aggregate::tdigest::{ Centroid, TDigest, DEFAULT_MAX_SIZE, }; +use datafusion_physical_expr_common::physical_expr::down_cast_any_ref; -use datafusion_common::Result; -use datafusion_common::ScalarValue; -use datafusion_expr::Accumulator; +use crate::approx_percentile_cont::{ApproxPercentileAccumulator, ApproxPercentileCont}; -use crate::aggregate::utils::down_cast_any_ref; -use std::{any::Any, sync::Arc}; +make_udaf_expr_and_func!( + ApproxPercentileContWithWeight, + approx_percentile_cont_with_weight, + expression weight percentile, + "Computes the approximate percentile continuous with weight of a set of numbers", + approx_percentile_cont_with_weight_udaf +); /// APPROX_PERCENTILE_CONT_WITH_WEIGTH aggregate expression -#[derive(Debug)] pub struct ApproxPercentileContWithWeight { + signature: Signature, approx_percentile_cont: ApproxPercentileCont, - column_expr: Arc<dyn PhysicalExpr>, - weight_expr: Arc<dyn PhysicalExpr>, - percentile_expr: Arc<dyn PhysicalExpr>, +} + +impl Debug for ApproxPercentileContWithWeight { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ApproxPercentileContWithWeight") + .field("signature", &self.signature) + .finish() + } +} + +impl Default for ApproxPercentileContWithWeight { + fn default() -> Self { + Self::new() + } } impl ApproxPercentileContWithWeight { 
/// Create a new [`ApproxPercentileContWithWeight`] aggregate function. - pub fn new( - expr: Vec<Arc<dyn PhysicalExpr>>, - name: impl Into<String>, - return_type: DataType, - ) -> Result<Self> { - // Arguments should be [ColumnExpr, WeightExpr, DesiredPercentileLiteral] - debug_assert_eq!(expr.len(), 3); - - let sub_expr = vec![expr[0].clone(), expr[2].clone()]; - let approx_percentile_cont = - ApproxPercentileCont::new(sub_expr, name, return_type)?; - - Ok(Self { - approx_percentile_cont, - column_expr: expr[0].clone(), - weight_expr: expr[1].clone(), - percentile_expr: expr[2].clone(), - }) + pub fn new() -> Self { + Self { + signature: Signature::one_of( Review Comment: Maybe use Signature::Numeric(3)? ########## datafusion/functions-aggregate/src/approx_percentile_cont.rs: ########## @@ -15,19 +15,265 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use arrow::array::RecordBatch; use arrow::{ array::{ ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, datatypes::DataType, }; +use arrow_schema::{Field, Schema}; -use datafusion_common::{downcast_value, internal_err, DataFusionError, ScalarValue}; -use datafusion_expr::Accumulator; +use datafusion_common::{ + downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, ScalarValue, +}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS}; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, ColumnarValue, Expr, Signature, TypeSignature, + Volatility, +}; use datafusion_physical_expr_common::aggregate::tdigest::{ TDigest, TryIntoF64, DEFAULT_MAX_SIZE, }; +use datafusion_physical_expr_common::aggregate::utils::down_cast_any_ref; +use 
datafusion_physical_expr_common::utils::limited_convert_logical_expr_to_physical_expr; + +make_udaf_expr_and_func!( + ApproxPercentileCont, + approx_percentile_cont, + expression percentile, + "Computes the approximate percentile continuous of a set of numbers", + approx_percentile_cont_udaf +); + +pub struct ApproxPercentileCont { + signature: Signature, +} + +impl Debug for ApproxPercentileCont { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("ApproxPercentileCont") + .field("name", &self.name()) + .field("signature", &self.signature) + .finish() + } +} + +impl Default for ApproxPercentileCont { + fn default() -> Self { + Self::new() + } +} + +impl ApproxPercentileCont { + /// Create a new [`ApproxPercentileCont`] aggregate function. + pub fn new() -> Self { + let mut variants = Vec::with_capacity(NUMERICS.len() * (INTEGERS.len() + 1)); + // Accept any numeric value paired with a float64 percentile + for num in NUMERICS { + variants.push(TypeSignature::Exact(vec![num.clone(), DataType::Float64])); + // Additionally accept an integer number of centroids for T-Digest + for int in INTEGERS { + variants.push(TypeSignature::Exact(vec![ + num.clone(), + DataType::Float64, + int.clone(), + ])) + } + } + Self { + signature: Signature::one_of(variants, Volatility::Immutable), + } + } + + pub(crate) fn create_accumulator( + &self, + args: AccumulatorArgs, + ) -> datafusion_common::Result<ApproxPercentileAccumulator> { + let percentile = validate_input_percentile_expr(&args.args[1])?; + let tdigest_max_size = if args.args.len() == 3 { + Some(validate_input_max_size_expr(&args.args[2])?) 
+ } else { + None + }; + + let accumulator: ApproxPercentileAccumulator = match args.input_type { + t @ (DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64) => { + if let Some(max_size) = tdigest_max_size { + ApproxPercentileAccumulator::new_with_max_size(percentile, t.clone(), max_size) + }else{ + ApproxPercentileAccumulator::new(percentile, t.clone()) + + } + } + other => { + return not_impl_err!( + "Support for 'APPROX_PERCENTILE_CONT' for data type {other} is not implemented" + ) + } + }; + + Ok(accumulator) + } +} + +impl PartialEq for ApproxPercentileCont { + fn eq(&self, other: &ApproxPercentileCont) -> bool { + self.signature == other.signature + } +} + +impl PartialEq<dyn Any> for ApproxPercentileCont { + fn eq(&self, other: &dyn Any) -> bool { + down_cast_any_ref(other) + .downcast_ref::<Self>() + .map(|x| self.eq(x)) + .unwrap_or(false) + } +} + +fn get_lit_value(expr: &Expr) -> datafusion_common::Result<ScalarValue> { + let empty_schema = Arc::new(Schema::empty()); + let empty_batch = RecordBatch::new_empty(Arc::clone(&empty_schema)); + let expr = limited_convert_logical_expr_to_physical_expr(expr, &empty_schema)?; + let result = expr.evaluate(&empty_batch)?; + match result { + ColumnarValue::Array(_) => Err(DataFusionError::Internal(format!( + "The expr {:?} can't be evaluated to scalar value", + expr + ))), + ColumnarValue::Scalar(scalar_value) => Ok(scalar_value), + } +} + +fn validate_input_percentile_expr(expr: &Expr) -> datafusion_common::Result<f64> { + let lit = get_lit_value(expr)?; + let percentile = match &lit { + ScalarValue::Float32(Some(q)) => *q as f64, + ScalarValue::Float64(Some(q)) => *q, + got => return not_impl_err!( + "Percentile value for 'APPROX_PERCENTILE_CONT' must be Float32 or Float64 literal (got data type {})", + got.data_type() + ) + }; + + // Ensure the percentile is between 
0 and 1. + if !(0.0..=1.0).contains(&percentile) { + return plan_err!( + "Percentile value must be between 0.0 and 1.0 inclusive, {percentile} is invalid" + ); + } + Ok(percentile) +} + +fn validate_input_max_size_expr(expr: &Expr) -> datafusion_common::Result<usize> { + let lit = get_lit_value(expr)?; + let max_size = match &lit { + ScalarValue::UInt8(Some(q)) => *q as usize, + ScalarValue::UInt16(Some(q)) => *q as usize, + ScalarValue::UInt32(Some(q)) => *q as usize, + ScalarValue::UInt64(Some(q)) => *q as usize, + ScalarValue::Int32(Some(q)) if *q > 0 => *q as usize, + ScalarValue::Int64(Some(q)) if *q > 0 => *q as usize, + ScalarValue::Int16(Some(q)) if *q > 0 => *q as usize, + ScalarValue::Int8(Some(q)) if *q > 0 => *q as usize, + got => return not_impl_err!( + "Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal (got data type {}).", + got.data_type() + ) + }; + Ok(max_size) +} + +impl AggregateUDFImpl for ApproxPercentileCont { + fn as_any(&self) -> &dyn Any { + self + } + + #[allow(rustdoc::private_intra_doc_links)] + /// See [`datafusion_physical_expr_common::aggregate::tdigest::TDigest::to_scalar_state()`] for a description of the serialised + /// state. 
+ fn state_fields( + &self, + args: StateFieldsArgs, + ) -> datafusion_common::Result<Vec<Field>> { + Ok(vec![ + Field::new( + format_state_name(args.name, "max_size"), + DataType::UInt64, + false, + ), + Field::new( + format_state_name(args.name, "sum"), + DataType::Float64, + false, + ), + Field::new( + format_state_name(args.name, "count"), + DataType::Float64, + false, + ), + Field::new( + format_state_name(args.name, "max"), + DataType::Float64, + false, + ), + Field::new( + format_state_name(args.name, "min"), + DataType::Float64, + false, + ), + Field::new_list( + format_state_name(args.name, "centroids"), + Field::new("item", DataType::Float64, true), + false, + ), + ]) + } + + fn name(&self) -> &str { + "approx_percentile_cont" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn accumulator( + &self, + acc_args: AccumulatorArgs, + ) -> datafusion_common::Result<Box<dyn Accumulator>> { + Ok(Box::new(self.create_accumulator(acc_args)?)) Review Comment: We can inline the code of `create_accumulator` here ########## datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs: ########## @@ -108,10 +156,8 @@ impl PartialEq<dyn Any> for ApproxPercentileContWithWeight { down_cast_any_ref(other) .downcast_ref::<Self>() .map(|x| { - self.approx_percentile_cont == x.approx_percentile_cont - && self.column_expr.eq(&x.column_expr) - && self.weight_expr.eq(&x.weight_expr) - && self.percentile_expr.eq(&x.percentile_expr) + self.signature == x.signature Review Comment: I don't think we need this, we should get `AggregateFunctionExpr`, and it has partialEq implemented already ########## datafusion/proto/src/physical_plan/mod.rs: ########## @@ -496,11 +496,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } AggregateFunction::UserDefinedAggrFunction(udaf_name) => { let agg_udf = registry.udaf(udaf_name)?; + // TODO: 'logical_exprs' is not supported for UDAF yet. 
+ // approx_percentile_cont and approx_percentile_cont_weight are not supported for UDAF from protobuf yet. + let logical_exprs = &[]; Review Comment: I think we don't need to care about logical plan conversion in physical plan roundtrip, but I'm not entirely sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org