Weijun-H commented on code in PR #12211:
URL: https://github.com/apache/datafusion/pull/12211#discussion_r1734412318


##########
datafusion/functions-aggregate/src/distance.rs:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`Distance`]: Euclidean distance aggregations.
+
+use std::any::Any;
+use std::fmt::Debug;
+
+use arrow::compute::kernels::cast;
+use arrow::compute::{and, filter, is_not_null};
+use arrow::{
+    array::{ArrayRef, Float64Array},
+    datatypes::{DataType, Field},
+};
+
+use datafusion_common::{
+    downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, Result,
+    ScalarValue,
+};
+use datafusion_expr::{
+    function::{AccumulatorArgs, StateFieldsArgs},
+    type_coercion::aggregates::NUMERICS,
+    utils::format_state_name,
+    Accumulator, AggregateUDFImpl, Signature, Volatility,
+};
+
+make_udaf_expr_and_func!(
+    Distance,
+    dis,
+    y x,
+    "Distance between two numeric values.",

Review Comment:
   ```suggestion
       "Calculates the Euclidean distance between two numeric values.",
   ```



##########
datafusion/functions-aggregate/src/distance.rs:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`Distance`]: Euclidean distance aggregations.
+
+use std::any::Any;
+use std::fmt::Debug;
+
+use arrow::compute::kernels::cast;
+use arrow::compute::{and, filter, is_not_null};
+use arrow::{
+    array::{ArrayRef, Float64Array},
+    datatypes::{DataType, Field},
+};
+
+use datafusion_common::{
+    downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, Result,
+    ScalarValue,
+};
+use datafusion_expr::{
+    function::{AccumulatorArgs, StateFieldsArgs},
+    type_coercion::aggregates::NUMERICS,
+    utils::format_state_name,
+    Accumulator, AggregateUDFImpl, Signature, Volatility,
+};
+
+make_udaf_expr_and_func!(
+    Distance,
+    dis,
+    y x,
+    "Distance between two numeric values.",
+    dis_udaf
+);
+
+#[derive(Debug)]
+pub struct Distance {
+    signature: Signature,
+}
+
+impl Default for Distance {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Distance {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::uniform(2, NUMERICS.to_vec(), 
Volatility::Immutable),

Review Comment:
   ```suggestion
               signature: Signature::numeric(2, Volatility::Immutable),
   ```



##########
datafusion/functions-aggregate/src/distance.rs:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`Distance`]: Euclidean distance aggregations.
+
+use std::any::Any;
+use std::fmt::Debug;
+
+use arrow::compute::kernels::cast;
+use arrow::compute::{and, filter, is_not_null};
+use arrow::{
+    array::{ArrayRef, Float64Array},
+    datatypes::{DataType, Field},
+};
+
+use datafusion_common::{
+    downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, Result,
+    ScalarValue,
+};
+use datafusion_expr::{
+    function::{AccumulatorArgs, StateFieldsArgs},
+    type_coercion::aggregates::NUMERICS,
+    utils::format_state_name,
+    Accumulator, AggregateUDFImpl, Signature, Volatility,
+};
+
+make_udaf_expr_and_func!(
+    Distance,
+    dis,
+    y x,
+    "Distance between two numeric values.",
+    dis_udaf
+);
+
+#[derive(Debug)]
+pub struct Distance {
+    signature: Signature,
+}
+
+impl Default for Distance {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Distance {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::uniform(2, NUMERICS.to_vec(), 
Volatility::Immutable),
+        }
+    }
+}
+
+impl AggregateUDFImpl for Distance {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "distance"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if !arg_types[0].is_numeric() {
+            return plan_err!("Distance requires numeric input types");
+        }
+
+        Ok(DataType::Float64)
+    }
+    fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        Ok(Box::new(DistanceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        let name = args.name;
+        Ok(vec![Field::new(
+            format_state_name(name, "sum_of_squares"),
+            DataType::Float64,
+            true,
+        )])
+    }
+}
+
+/// An accumulator to compute distance of two numeric columns
+#[derive(Debug)]
+pub struct DistanceAccumulator {
+    sum_of_squares: f64,
+}
+
+impl DistanceAccumulator {
+    /// Creates a new `DistanceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            sum_of_squares: 0_f64,
+        })
+    }
+}
+
+impl Accumulator for DistanceAccumulator {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::from(self.sum_of_squares)])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = if values[0].null_count() != 0 || values[1].null_count() 
!= 0 {
+            let mask = and(&is_not_null(&values[0])?, 
&is_not_null(&values[1])?)?;
+            let values1 = filter(&values[0], &mask)?;
+            let values2 = filter(&values[1], &mask)?;
+
+            vec![values1, values2]
+        } else {
+            values.to_vec()
+        };
+
+        let values1 = &cast(&values[0], &DataType::Float64)?;
+        let values2 = &cast(&values[1], &DataType::Float64)?;
+
+        let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
+        let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();

Review Comment:
   ```suggestion
           let mut arr1 = as_float64_array(values1)?.iter().flatten();
           let mut arr2 = as_float64_array(values2)?.iter().flatten();
   ```



##########
datafusion/functions-aggregate/src/distance.rs:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`Distance`]: Euclidean distance aggregations.
+
+use std::any::Any;
+use std::fmt::Debug;
+
+use arrow::compute::kernels::cast;
+use arrow::compute::{and, filter, is_not_null};
+use arrow::{
+    array::{ArrayRef, Float64Array},
+    datatypes::{DataType, Field},
+};
+
+use datafusion_common::{
+    downcast_value, plan_err, unwrap_or_internal_err, DataFusionError, Result,
+    ScalarValue,
+};
+use datafusion_expr::{
+    function::{AccumulatorArgs, StateFieldsArgs},
+    type_coercion::aggregates::NUMERICS,
+    utils::format_state_name,
+    Accumulator, AggregateUDFImpl, Signature, Volatility,
+};
+
+make_udaf_expr_and_func!(
+    Distance,
+    dis,
+    y x,
+    "Distance between two numeric values.",
+    dis_udaf
+);
+
+#[derive(Debug)]
+pub struct Distance {
+    signature: Signature,
+}
+
+impl Default for Distance {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Distance {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::uniform(2, NUMERICS.to_vec(), 
Volatility::Immutable),
+        }
+    }
+}
+
+impl AggregateUDFImpl for Distance {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "distance"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if !arg_types[0].is_numeric() {
+            return plan_err!("Distance requires numeric input types");
+        }

Review Comment:
   ```suggestion
           if !arg_types[0].is_numeric() || !arg_types[1].is_numeric() {
               return exec_err!("Distance requires numeric input types");
           }
   ```



##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -416,6 +416,50 @@ from data
 ----
 1
 
+# csv_query_distance
+query R
+SELECT distance(c2, c12) FROM aggregate_test_100
+----
+27.565541154252
+
+# single_row_query_distance
+query R
+select distance(sq.column1, sq.column2) from (values (1.1, 2.2)) as sq
+----
+1.1
+
+# all_nulls_query_distance
+query R
+with data as (
+  select null::int as f, null::int as b
+  union all
+  select null::int as f, null::int as b
+)
+select distance(f, b)
+from data
+----
+0
+
+# distance_query_with_nulls
+query R
+with data as (
+  select 1 as f,       4 as b
+  union all
+  select null as f,   99 as b
+  union all
+  select 2 as f,       5 as b
+  union all
+  select 98 as f,   null as b
+  union all
+  select 3 as f,       6 as b
+  union all
+  select null as f, null as b
+)
+select distance(f, b)
+from data
+----
+5.196152422707
+

Review Comment:
   ```
   > select distance(sq.column1, sq.column2) from (values (NULL, 2), (0,0)) as sq;
   +---------------------------------+
   | distance(sq.column1,sq.column2) |
   +---------------------------------+
   | 0.0                             |
   +---------------------------------+
   ```
   
   I would prefer this query to return `NULL` rather than `0.0`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to