realno commented on a change in pull request #1539:
URL: https://github.com/apache/arrow-datafusion/pull/1539#discussion_r787338667



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -331,17 +348,28 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
         | AggregateFunction::StddevPop => {
             Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
         }
+        AggregateFunction::ApproxQuantile => Signature::one_of(

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/expressions/approx_quantile.rs
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{
+        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, 
Int64Array,
+        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::{DataType, Field},
+};
+
+use crate::{
+    error::DataFusionError,
+    physical_plan::{tdigest::TDigest, Accumulator, AggregateExpr, 
PhysicalExpr},
+    scalar::ScalarValue,
+};
+
+use crate::error::Result;
+
+use super::{format_state_name, Literal};
+
+/// Return `true` if `arg_type` is of a [`DataType`] that the 
[`ApproxQuantile`]
+/// aggregation can operate on.
+pub fn is_approx_quantile_supported_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+/// APPROX_QUANTILE aggregate expression
+#[derive(Debug)]
+pub struct ApproxQuantile {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    quantile: f64,
+}
+
+impl ApproxQuantile {
+    /// Create a new ApproxQuantile aggregate function.
+    pub fn new(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Result<Self> {
+        // Arguments should be [ColumnExpr, DesiredQuantileLiteral]
+        debug_assert_eq!(expr.len(), 2);
+
+        // Extract the desired quantile literal
+        let lit = expr[1]
+            .as_any()
+            .downcast_ref::<Literal>()
+            .ok_or(DataFusionError::Internal(
+                "desired quantile argument must be float literal".to_string(),
+            ))?
+            .value();
+        let quantile = match lit {
+            ScalarValue::Float32(Some(q)) => *q as f64,
+            ScalarValue::Float64(Some(q)) => *q as f64,
+            got => return Err(DataFusionError::NotImplemented(format!(
+                "Quantile value for 'APPROX_QUANTILE' must be Float32 or 
Float64 literal (got data type {})",
+                got
+            )))
+        };
+
+        Ok(Self {
+            name: name.into(),
+            input_data_type,
+            // The physical expr to evaluate during accumulation
+            expr: expr[0].clone(),
+            quantile,
+        })
+    }
+}
+
+impl AggregateExpr for ApproxQuantile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.input_data_type.clone(), false))
+    }
+
+    /// See [`TDigest::to_scalar_state()`] for a description of the serialised
+    /// state.
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "max_size"),
+                DataType::UInt64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "sum"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "max"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "min"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "centroids"),
+                DataType::List(Box::new(Field::new("item", DataType::Float64, 
true))),
+                false,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            t
+            @
+            (DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64) => {
+                Box::new(ApproxQuantileAccumulator::new(self.quantile, 
t.clone()))
+            }
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'APPROX_QUANTILE' for data type {} is not 
implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+pub struct ApproxQuantileAccumulator {
+    digest: TDigest,
+    quantile: f64,
+    return_type: DataType,
+}
+
+impl ApproxQuantileAccumulator {
+    pub fn new(quantile: f64, return_type: DataType) -> Self {
+        Self {
+            digest: TDigest::new(100),
+            quantile,
+            return_type,
+        }
+    }
+}
+
+impl Accumulator for ApproxQuantileAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(self.digest.to_scalar_state())
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {

Review comment:
       `update` was removed in a recent PR; only `update_batch` is needed.

##########
File path: datafusion/src/physical_plan/expressions/approx_quantile.rs
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{
+        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, 
Int64Array,
+        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::{DataType, Field},
+};
+
+use crate::{
+    error::DataFusionError,
+    physical_plan::{tdigest::TDigest, Accumulator, AggregateExpr, 
PhysicalExpr},
+    scalar::ScalarValue,
+};
+
+use crate::error::Result;
+
+use super::{format_state_name, Literal};
+
+/// Return `true` if `arg_type` is of a [`DataType`] that the 
[`ApproxQuantile`]
+/// aggregation can operate on.
+pub fn is_approx_quantile_supported_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+/// APPROX_QUANTILE aggregate expression
+#[derive(Debug)]
+pub struct ApproxQuantile {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    quantile: f64,
+}
+
+impl ApproxQuantile {
+    /// Create a new ApproxQuantile aggregate function.
+    pub fn new(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Result<Self> {
+        // Arguments should be [ColumnExpr, DesiredQuantileLiteral]
+        debug_assert_eq!(expr.len(), 2);
+
+        // Extract the desired quantile literal
+        let lit = expr[1]
+            .as_any()
+            .downcast_ref::<Literal>()
+            .ok_or(DataFusionError::Internal(
+                "desired quantile argument must be float literal".to_string(),
+            ))?
+            .value();
+        let quantile = match lit {
+            ScalarValue::Float32(Some(q)) => *q as f64,
+            ScalarValue::Float64(Some(q)) => *q as f64,
+            got => return Err(DataFusionError::NotImplemented(format!(
+                "Quantile value for 'APPROX_QUANTILE' must be Float32 or 
Float64 literal (got data type {})",
+                got
+            )))
+        };
+
+        Ok(Self {
+            name: name.into(),
+            input_data_type,
+            // The physical expr to evaluate during accumulation
+            expr: expr[0].clone(),
+            quantile,
+        })
+    }
+}
+
+impl AggregateExpr for ApproxQuantile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.input_data_type.clone(), false))
+    }
+
+    /// See [`TDigest::to_scalar_state()`] for a description of the serialised
+    /// state.
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "max_size"),
+                DataType::UInt64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "sum"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "max"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "min"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "centroids"),
+                DataType::List(Box::new(Field::new("item", DataType::Float64, 
true))),
+                false,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            t
+            @
+            (DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64) => {
+                Box::new(ApproxQuantileAccumulator::new(self.quantile, 
t.clone()))
+            }
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'APPROX_QUANTILE' for data type {} is not 
implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+pub struct ApproxQuantileAccumulator {
+    digest: TDigest,
+    quantile: f64,
+    return_type: DataType,
+}
+
+impl ApproxQuantileAccumulator {
+    pub fn new(quantile: f64, return_type: DataType) -> Self {
+        Self {
+            digest: TDigest::new(100),
+            quantile,
+            return_type,
+        }
+    }
+}
+
+impl Accumulator for ApproxQuantileAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(self.digest.to_scalar_state())
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        debug_assert_eq!(
+            values.len(),
+            1,
+            "invalid number of values in quantile update"
+        );
+
+        self.digest = self.digest.merge_unsorted([values[0].clone()])?;
+        Ok(())
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        debug_assert_eq!(
+            values.len(),
+            1,
+            "invalid number of values in batch quantile update"
+        );
+        let values = &values[0];
+
+        self.digest = match values.data_type() {
+            DataType::Float64 => {
+                let array = 
values.as_any().downcast_ref::<Float64Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Float32 => {
+                let array = 
values.as_any().downcast_ref::<Float32Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int64 => {
+                let array = 
values.as_any().downcast_ref::<Int64Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int32 => {
+                let array = 
values.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int16 => {
+                let array = 
values.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int8 => {
+                let array = 
values.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt64 => {
+                let array = 
values.as_any().downcast_ref::<UInt64Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt32 => {
+                let array = 
values.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt16 => {
+                let array = 
values.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt8 => {
+                let array = 
values.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "APPROX_QUANTILE is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        };
+
+        Ok(())
+    }
+
+    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {

Review comment:
       `merge` was removed; `merge_batch` needs to be implemented instead.

##########
File path: datafusion/tests/sql/aggregates.rs
##########
@@ -316,6 +316,95 @@ async fn csv_query_approx_count() -> Result<()> {
     Ok(())
 }
 
+// This test executes the APPROX_QUANTILE aggregation against the test data,
+// asserting the estimated quantiles are ±5% their actual values.
+//
+// Actual quantiles calculated with:
+//
+// ```r
+// read_csv("./testing/data/csv/aggregate_test_100.csv") |>
+//     select_if(is.numeric) |>
+//     summarise_all(~ quantile(., c(0.1, 0.5, 0.9)))
+// ```
+//
+// Giving:
+//
+// ```text
+//      c2    c3      c4           c5       c6    c7     c8          c9     
c10   c11    c12
+//   <dbl> <dbl>   <dbl>        <dbl>    <dbl> <dbl>  <dbl>       <dbl>   
<dbl> <dbl>  <dbl>
+// 1     1 -95.3 -22925. -1882606710  -7.25e18  18.9  2671.  472608672. 
1.83e18 0.109 0.0714
+// 2     3  15.5   4599    377164262   1.13e18 134.  30634  2365817608. 
9.30e18 0.491 0.551
+// 3     5 102.   25334.  1991374996.  7.37e18 231   57518. 3776538487. 
1.61e19 0.834 0.946
+// ```
+//
+// Column `c12` is omitted due to a large relative error (~10%) due to the 
small
+// float values.
+#[tokio::test]
+async fn csv_query_approx_quantile() -> Result<()> {

Review comment:
       @alamb is this test sufficient for testing `merge` functions?  

##########
File path: datafusion/src/physical_plan/expressions/approx_quantile.rs
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{
+        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, 
Int64Array,
+        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::{DataType, Field},
+};
+
+use crate::{
+    error::DataFusionError,
+    physical_plan::{tdigest::TDigest, Accumulator, AggregateExpr, 
PhysicalExpr},
+    scalar::ScalarValue,
+};
+
+use crate::error::Result;
+
+use super::{format_state_name, Literal};
+
+/// Return `true` if `arg_type` is of a [`DataType`] that the 
[`ApproxQuantile`]
+/// aggregation can operate on.
+pub fn is_approx_quantile_supported_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+/// APPROX_QUANTILE aggregate expression
+#[derive(Debug)]
+pub struct ApproxQuantile {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    quantile: f64,
+}
+
+impl ApproxQuantile {
+    /// Create a new ApproxQuantile aggregate function.
+    pub fn new(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Result<Self> {
+        // Arguments should be [ColumnExpr, DesiredQuantileLiteral]
+        debug_assert_eq!(expr.len(), 2);
+
+        // Extract the desired quantile literal
+        let lit = expr[1]
+            .as_any()
+            .downcast_ref::<Literal>()
+            .ok_or(DataFusionError::Internal(
+                "desired quantile argument must be float literal".to_string(),
+            ))?
+            .value();
+        let quantile = match lit {
+            ScalarValue::Float32(Some(q)) => *q as f64,
+            ScalarValue::Float64(Some(q)) => *q as f64,
+            got => return Err(DataFusionError::NotImplemented(format!(
+                "Quantile value for 'APPROX_QUANTILE' must be Float32 or 
Float64 literal (got data type {})",
+                got
+            )))
+        };
+
+        Ok(Self {
+            name: name.into(),
+            input_data_type,
+            // The physical expr to evaluate during accumulation
+            expr: expr[0].clone(),
+            quantile,
+        })
+    }
+}
+
+impl AggregateExpr for ApproxQuantile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.input_data_type.clone(), false))
+    }
+
+    /// See [`TDigest::to_scalar_state()`] for a description of the serialised
+    /// state.
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "max_size"),
+                DataType::UInt64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "sum"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "max"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "min"),
+                DataType::Float64,
+                false,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "centroids"),
+                DataType::List(Box::new(Field::new("item", DataType::Float64, 
true))),
+                false,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
+            t
+            @
+            (DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64) => {
+                Box::new(ApproxQuantileAccumulator::new(self.quantile, 
t.clone()))
+            }
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Support for 'APPROX_QUANTILE' for data type {} is not 
implemented",
+                    other
+                )))
+            }
+        };
+        Ok(accumulator)
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+pub struct ApproxQuantileAccumulator {
+    digest: TDigest,
+    quantile: f64,
+    return_type: DataType,
+}
+
+impl ApproxQuantileAccumulator {
+    pub fn new(quantile: f64, return_type: DataType) -> Self {
+        Self {
+            digest: TDigest::new(100),
+            quantile,
+            return_type,
+        }
+    }
+}
+
+impl Accumulator for ApproxQuantileAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(self.digest.to_scalar_state())
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        debug_assert_eq!(
+            values.len(),
+            1,
+            "invalid number of values in quantile update"
+        );
+
+        self.digest = self.digest.merge_unsorted([values[0].clone()])?;
+        Ok(())
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        debug_assert_eq!(
+            values.len(),
+            1,
+            "invalid number of values in batch quantile update"
+        );
+        let values = &values[0];
+
+        self.digest = match values.data_type() {
+            DataType::Float64 => {
+                let array = 
values.as_any().downcast_ref::<Float64Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Float32 => {
+                let array = 
values.as_any().downcast_ref::<Float32Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int64 => {
+                let array = 
values.as_any().downcast_ref::<Int64Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int32 => {
+                let array = 
values.as_any().downcast_ref::<Int32Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int16 => {
+                let array = 
values.as_any().downcast_ref::<Int16Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::Int8 => {
+                let array = 
values.as_any().downcast_ref::<Int8Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt64 => {
+                let array = 
values.as_any().downcast_ref::<UInt64Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt32 => {
+                let array = 
values.as_any().downcast_ref::<UInt32Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt16 => {
+                let array = 
values.as_any().downcast_ref::<UInt16Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            DataType::UInt8 => {
+                let array = 
values.as_any().downcast_ref::<UInt8Array>().unwrap();
+                self.digest.merge_unsorted(array.values().iter().cloned())?
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "APPROX_QUANTILE is not expected to receive the type {:?}",
+                    e
+                )));
+            }
+        };
+
+        Ok(())
+    }
+
+    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
+        debug_assert_eq!(
+            states.len(),
+            6,
+            "invalid number of state fields for quantile accumulator"
+        );
+
+        let other = TDigest::from_scalar_state(states);
+        self.digest = TDigest::merge_digests(&[self.digest.clone(), other]);
+
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        let q = self.digest.estimate_quantile(self.quantile);
+
+        // These acceptable return types MUST match the validation in
+        // ApproxQuantile::create_accumulator.
+        Ok(match &self.return_type {
+            DataType::Int8 => ScalarValue::Int8(Some(q as i8)),
+            DataType::Int16 => ScalarValue::Int16(Some(q as i16)),
+            DataType::Int32 => ScalarValue::Int32(Some(q as i32)),
+            DataType::Int64 => ScalarValue::Int64(Some(q as i64)),
+            DataType::UInt8 => ScalarValue::UInt8(Some(q as u8)),
+            DataType::UInt16 => ScalarValue::UInt16(Some(q as u16)),
+            DataType::UInt32 => ScalarValue::UInt32(Some(q as u32)),
+            DataType::UInt64 => ScalarValue::UInt64(Some(q as u64)),
+            DataType::Float32 => ScalarValue::Float32(Some(q as f32)),
+            DataType::Float64 => ScalarValue::Float64(Some(q as f64)),
+            v => unreachable!("unexpected return type {:?}", v),
+        })
+    }
+}

Review comment:
       I think we can use more unit tests here if necessary.

##########
File path: datafusion/src/physical_plan/expressions/approx_quantile.rs
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{
+        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, 
Int64Array,
+        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::{DataType, Field},
+};
+
+use crate::{
+    error::DataFusionError,
+    physical_plan::{tdigest::TDigest, Accumulator, AggregateExpr, 
PhysicalExpr},
+    scalar::ScalarValue,
+};
+
+use crate::error::Result;
+
+use super::{format_state_name, Literal};
+
+/// Return `true` if `arg_type` is of a [`DataType`] that the 
[`ApproxQuantile`]
+/// aggregation can operate on.
+pub fn is_approx_quantile_supported_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+/// APPROX_QUANTILE aggregate expression
+#[derive(Debug)]
+pub struct ApproxQuantile {
+    name: String,
+    input_data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    quantile: f64,
+}
+
+impl ApproxQuantile {
+    /// Create a new ApproxQuantile aggregate function.
+    pub fn new(
+        expr: Vec<Arc<dyn PhysicalExpr>>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Result<Self> {
+        // Arguments should be [ColumnExpr, DesiredQuantileLiteral]
+        debug_assert_eq!(expr.len(), 2);
+
+        // Extract the desired quantile literal
+        let lit = expr[1]
+            .as_any()
+            .downcast_ref::<Literal>()
+            .ok_or(DataFusionError::Internal(
+                "desired quantile argument must be float literal".to_string(),
+            ))?
+            .value();
+        let quantile = match lit {

Review comment:
       We should check that `quantile` is within [0, 1].

##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -331,17 +348,28 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
         | AggregateFunction::StddevPop => {
             Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
         }
+        AggregateFunction::ApproxQuantile => Signature::one_of(
+            // Accept any numeric value paired with a float64 quantile
+            NUMERICS
+                .iter()
+                .map(|t| TypeSignature::Exact(vec![t.clone(), 
DataType::Float64]))

Review comment:
       This is the part I am not fully sure about yet. The second argument is a 
literal for the quantile, whereas the signature here, to my understanding, is for 
checking the input schema type. I did some local tests with the signature and got 
some weird results. I will take another, deeper look and report back. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to