This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2daadb7523 Convert BitAnd, BitOr, BitXor to UDAF (#10930)
2daadb7523 is described below

commit 2daadb75230e2c197d2915257a9637913fa2c2e6
Author: Dharan Aditya <[email protected]>
AuthorDate: Mon Jun 17 18:36:16 2024 +0530

    Convert BitAnd, BitOr, BitXor to UDAF (#10930)
    
    * remove bit and or xor from expr
    
    * remove bit and or xor from physical expr and proto
    
    * add proto regen changes
    
    * impl BitAnd, BitOr, BitXor UDAF
    
    * add support for float
    
    * removing support for float
    
    * refactor helper macros
    
    * clippy'fy
    
    * simplify Bitwise operation
    
    * add documentation
    
    * formatting
    
    * fix lint issue
    
    * remove XorDistinct
    
    * update roundtrip_expr_api test
    
    * linting
    
    * support groups accumulator
---
 datafusion/expr/src/aggregate_function.rs          |  20 -
 datafusion/expr/src/type_coercion/aggregates.rs    |  18 -
 .../functions-aggregate/src/bit_and_or_xor.rs      | 458 ++++++++++++++
 datafusion/functions-aggregate/src/lib.rs          |   7 +
 .../physical-expr/src/aggregate/bit_and_or_xor.rs  | 695 ---------------------
 datafusion/physical-expr/src/aggregate/build_in.rs |  78 +--
 datafusion/physical-expr/src/aggregate/mod.rs      |   1 -
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/proto/proto/datafusion.proto            |   6 +-
 datafusion/proto/src/generated/pbjson.rs           |   9 -
 datafusion/proto/src/generated/prost.rs            |  12 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   3 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   6 -
 datafusion/proto/src/physical_plan/to_proto.rs     |  19 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   4 +
 15 files changed, 481 insertions(+), 856 deletions(-)

diff --git a/datafusion/expr/src/aggregate_function.rs 
b/datafusion/expr/src/aggregate_function.rs
index 441e8953df..a7fbf26feb 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,12 +47,6 @@ pub enum AggregateFunction {
     Correlation,
     /// Grouping
     Grouping,
-    /// Bit And
-    BitAnd,
-    /// Bit Or
-    BitOr,
-    /// Bit Xor
-    BitXor,
     /// Bool And
     BoolAnd,
     /// Bool Or
@@ -72,9 +66,6 @@ impl AggregateFunction {
             NthValue => "NTH_VALUE",
             Correlation => "CORR",
             Grouping => "GROUPING",
-            BitAnd => "BIT_AND",
-            BitOr => "BIT_OR",
-            BitXor => "BIT_XOR",
             BoolAnd => "BOOL_AND",
             BoolOr => "BOOL_OR",
             StringAgg => "STRING_AGG",
@@ -94,9 +85,6 @@ impl FromStr for AggregateFunction {
         Ok(match name {
             // general
             "avg" => AggregateFunction::Avg,
-            "bit_and" => AggregateFunction::BitAnd,
-            "bit_or" => AggregateFunction::BitOr,
-            "bit_xor" => AggregateFunction::BitXor,
             "bool_and" => AggregateFunction::BoolAnd,
             "bool_or" => AggregateFunction::BoolOr,
             "max" => AggregateFunction::Max,
@@ -144,9 +132,6 @@ impl AggregateFunction {
                 // The coerced_data_types is same with input_types.
                 Ok(coerced_data_types[0].clone())
             }
-            AggregateFunction::BitAnd
-            | AggregateFunction::BitOr
-            | AggregateFunction::BitXor => Ok(coerced_data_types[0].clone()),
             AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
                 Ok(DataType::Boolean)
             }
@@ -199,11 +184,6 @@ impl AggregateFunction {
                     .collect::<Vec<_>>();
                 Signature::uniform(1, valid, Volatility::Immutable)
             }
-            AggregateFunction::BitAnd
-            | AggregateFunction::BitOr
-            | AggregateFunction::BitXor => {
-                Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable)
-            }
             AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
                 Signature::uniform(1, vec![DataType::Boolean], 
Volatility::Immutable)
             }
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs 
b/datafusion/expr/src/type_coercion/aggregates.rs
index 98324ed612..a216c98899 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -121,20 +121,6 @@ pub fn coerce_types(
             };
             Ok(vec![v])
         }
-        AggregateFunction::BitAnd
-        | AggregateFunction::BitOr
-        | AggregateFunction::BitXor => {
-            // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
-            // smallint, int, bigint, real, double precision, decimal, or 
interval.
-            if !is_bit_and_or_xor_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(input_types.to_vec())
-        }
         AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
             // Refer to 
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
             // smallint, int, bigint, real, double precision, decimal, or 
interval.
@@ -350,10 +336,6 @@ pub fn avg_sum_type(arg_type: &DataType) -> 
Result<DataType> {
     }
 }
 
-pub fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool {
-    NUMERICS.contains(arg_type)
-}
-
 pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool {
     matches!(arg_type, DataType::Boolean)
 }
diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs 
b/datafusion/functions-aggregate/src/bit_and_or_xor.rs
new file mode 100644
index 0000000000..19e24f547d
--- /dev/null
+++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines `BitAnd`, `BitOr`, `BitXor` and `BitXor DISTINCT` aggregate 
accumulators
+
+use std::any::Any;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+
+use ahash::RandomState;
+use arrow::array::{downcast_integer, Array, ArrayRef, AsArray};
+use arrow::datatypes::{
+    ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, 
Int64Type,
+    Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+};
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::type_coercion::aggregates::INTEGERS;
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::{
+    Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, 
Volatility,
+};
+
+use 
datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
+use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign};
+
+/// This macro helps create group accumulators based on bitwise operations.
+/// It is typically used internally and need not be called directly by users.
+macro_rules! group_accumulator_helper {
+    ($t:ty, $dt:expr, $opr:expr) => {
+        match $opr {
+            BitwiseOperationType::And => Ok(Box::new(
+                PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| 
x.bitand_assign(y))
+                    .with_starting_value(!0),
+            )),
+            BitwiseOperationType::Or => Ok(Box::new(
+                PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| 
x.bitor_assign(y)),
+            )),
+            BitwiseOperationType::Xor => Ok(Box::new(
+                PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| 
x.bitxor_assign(y)),
+            )),
+        }
+    };
+}
+
+/// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, 
BitwiseOperationType, bool)
+macro_rules! accumulator_helper {
+    ($t:ty, $opr:expr, $is_distinct: expr) => {
+        match $opr {
+            BitwiseOperationType::And => 
Ok(Box::<BitAndAccumulator<$t>>::default()),
+            BitwiseOperationType::Or => 
Ok(Box::<BitOrAccumulator<$t>>::default()),
+            BitwiseOperationType::Xor => {
+                if $is_distinct {
+                    Ok(Box::<DistinctBitXorAccumulator<$t>>::default())
+                } else {
+                    Ok(Box::<BitXorAccumulator<$t>>::default())
+                }
+            }
+        }
+    };
+}
+
+/// AND, OR and XOR only support a subset of numeric types
+///
+/// `args` is [AccumulatorArgs]
+/// `opr` is [BitwiseOperationType]
+/// `is_distinct` is a boolean value indicating whether the operation is 
distinct or not.
+macro_rules! downcast_bitwise_accumulator {
+    ($args:ident, $opr:expr, $is_distinct: expr) => {
+        match $args.data_type {
+            DataType::Int8 => accumulator_helper!(Int8Type, $opr, 
$is_distinct),
+            DataType::Int16 => accumulator_helper!(Int16Type, $opr, 
$is_distinct),
+            DataType::Int32 => accumulator_helper!(Int32Type, $opr, 
$is_distinct),
+            DataType::Int64 => accumulator_helper!(Int64Type, $opr, 
$is_distinct),
+            DataType::UInt8 => accumulator_helper!(UInt8Type, $opr, 
$is_distinct),
+            DataType::UInt16 => accumulator_helper!(UInt16Type, $opr, 
$is_distinct),
+            DataType::UInt32 => accumulator_helper!(UInt32Type, $opr, 
$is_distinct),
+            DataType::UInt64 => accumulator_helper!(UInt64Type, $opr, 
$is_distinct),
+            _ => {
+                not_impl_err!(
+                    "{} not supported for {}: {}",
+                    stringify!($opr),
+                    $args.name,
+                    $args.data_type
+                )
+            }
+        }
+    };
+}
+
+/// Simplifies the creation of User-Defined Aggregate Functions (UDAFs) for 
performing bitwise operations in a declarative manner.
+///
+/// `EXPR_FN` is an identifier used to name the generated expression function.
+/// `AGGREGATE_UDF_FN` is an identifier used to name the underlying UDAF 
function.
+/// `OPR_TYPE` is an expression that evaluates to the type of bitwise 
operation to be performed.
+macro_rules! make_bitwise_udaf_expr_and_func {
+    ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => {
+        make_udaf_expr!(
+            $EXPR_FN,
+            expr_x,
+            concat!(
+                "Returns the bitwise",
+                stringify!($OPR_TYPE),
+                "of a group of values"
+            ),
+            $AGGREGATE_UDF_FN
+        );
+        create_func!(
+            $EXPR_FN,
+            $AGGREGATE_UDF_FN,
+            BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN))
+        );
+    };
+}
+
+make_bitwise_udaf_expr_and_func!(bit_and, bit_and_udaf, 
BitwiseOperationType::And);
+make_bitwise_udaf_expr_and_func!(bit_or, bit_or_udaf, 
BitwiseOperationType::Or);
+make_bitwise_udaf_expr_and_func!(bit_xor, bit_xor_udaf, 
BitwiseOperationType::Xor);
+
+/// The different types of bitwise operations that can be performed.
+#[derive(Debug, Clone, Eq, PartialEq)]
+enum BitwiseOperationType {
+    And,
+    Or,
+    Xor,
+}
+
+impl Display for BitwiseOperationType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+/// [BitwiseOperation] struct encapsulates information about a bitwise 
operation.
+#[derive(Debug)]
+struct BitwiseOperation {
+    signature: Signature,
+    /// `operation` indicates the type of bitwise operation to be performed.
+    operation: BitwiseOperationType,
+    func_name: &'static str,
+}
+
+impl BitwiseOperation {
+    pub fn new(operator: BitwiseOperationType, func_name: &'static str) -> 
Self {
+        Self {
+            operation: operator,
+            signature: Signature::uniform(1, INTEGERS.to_vec(), 
Volatility::Immutable),
+            func_name,
+        }
+    }
+}
+
+impl AggregateUDFImpl for BitwiseOperation {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        self.func_name
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let arg_type = &arg_types[0];
+        if !arg_type.is_integer() {
+            return exec_err!(
+                "[return_type] {} not supported for {}",
+                self.name(),
+                arg_type
+            );
+        }
+        Ok(arg_type.clone())
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        downcast_bitwise_accumulator!(acc_args, self.operation, 
acc_args.is_distinct)
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        if self.operation == BitwiseOperationType::Xor && args.is_distinct {
+            Ok(vec![Field::new_list(
+                format_state_name(
+                    args.name,
+                    format!("{} distinct", self.name()).as_str(),
+                ),
+                Field::new("item", args.return_type.clone(), true),
+                false,
+            )])
+        } else {
+            Ok(vec![Field::new(
+                format_state_name(args.name, self.name()),
+                args.return_type.clone(),
+                true,
+            )])
+        }
+    }
+
+    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+        true
+    }
+
+    fn create_groups_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        let data_type = args.data_type;
+        let operation = &self.operation;
+        downcast_integer! {
+            data_type => (group_accumulator_helper, data_type, operation),
+            _ => not_impl_err!(
+                "GroupsAccumulator not supported for {} with {}",
+                self.name(),
+                data_type
+            ),
+        }
+    }
+
+    fn reverse_expr(&self) -> ReversedUDAF {
+        ReversedUDAF::Identical
+    }
+}
+
+struct BitAndAccumulator<T: ArrowNumericType> {
+    value: Option<T::Native>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for BitAndAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BitAndAccumulator({})", T::DATA_TYPE)
+    }
+}
+
+impl<T: ArrowNumericType> Default for BitAndAccumulator<T> {
+    fn default() -> Self {
+        Self { value: None }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for BitAndAccumulator<T>
+where
+    T::Native: std::ops::BitAnd<Output = T::Native>,
+{
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if let Some(x) = 
arrow::compute::bit_and(values[0].as_primitive::<T>()) {
+            let v = self.value.get_or_insert(x);
+            *v = *v & x;
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.evaluate()?])
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+}
+
+struct BitOrAccumulator<T: ArrowNumericType> {
+    value: Option<T::Native>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for BitOrAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BitOrAccumulator({})", T::DATA_TYPE)
+    }
+}
+
+impl<T: ArrowNumericType> Default for BitOrAccumulator<T> {
+    fn default() -> Self {
+        Self { value: None }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for BitOrAccumulator<T>
+where
+    T::Native: std::ops::BitOr<Output = T::Native>,
+{
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if let Some(x) = arrow::compute::bit_or(values[0].as_primitive::<T>()) 
{
+            let v = self.value.get_or_insert(T::Native::usize_as(0));
+            *v = *v | x;
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.evaluate()?])
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+}
+
+struct BitXorAccumulator<T: ArrowNumericType> {
+    value: Option<T::Native>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for BitXorAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BitXorAccumulator({})", T::DATA_TYPE)
+    }
+}
+
+impl<T: ArrowNumericType> Default for BitXorAccumulator<T> {
+    fn default() -> Self {
+        Self { value: None }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for BitXorAccumulator<T>
+where
+    T::Native: std::ops::BitXor<Output = T::Native>,
+{
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if let Some(x) = 
arrow::compute::bit_xor(values[0].as_primitive::<T>()) {
+            let v = self.value.get_or_insert(T::Native::usize_as(0));
+            *v = *v ^ x;
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.evaluate()?])
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+}
+
+struct DistinctBitXorAccumulator<T: ArrowNumericType> {
+    values: HashSet<T::Native, RandomState>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for DistinctBitXorAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE)
+    }
+}
+
+impl<T: ArrowNumericType> Default for DistinctBitXorAccumulator<T> {
+    fn default() -> Self {
+        Self {
+            values: HashSet::default(),
+        }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for DistinctBitXorAccumulator<T>
+where
+    T::Native: std::ops::BitXor<Output = T::Native> + std::hash::Hash + Eq,
+{
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let array = values[0].as_primitive::<T>();
+        match array.nulls().filter(|x| x.null_count() > 0) {
+            Some(n) => {
+                for idx in n.valid_indices() {
+                    self.values.insert(array.value(idx));
+                }
+            }
+            None => array.values().iter().for_each(|x| {
+                self.values.insert(*x);
+            }),
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        let mut acc = T::Native::usize_as(0);
+        for distinct_value in self.values.iter() {
+            acc = acc ^ *distinct_value;
+        }
+        let v = (!self.values.is_empty()).then_some(acc);
+        ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + self.values.capacity() * std::mem::size_of::<T::Native>()
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        // 1. Stores aggregate state in `ScalarValue::List`
+        // 2. Constructs `ScalarValue::List` state from the distinct numeric 
values stored in the hash set
+        let state_out = {
+            let values = self
+                .values
+                .iter()
+                .map(|x| ScalarValue::new_primitive::<T>(Some(*x), 
&T::DATA_TYPE))
+                .collect::<Result<Vec<_>>>()?;
+
+            let arr = ScalarValue::new_list(&values, &T::DATA_TYPE);
+            vec![ScalarValue::List(arr)]
+        };
+        Ok(state_out)
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if let Some(state) = states.first() {
+            let list_arr = as_list_array(state)?;
+            for arr in list_arr.iter().flatten() {
+                self.update_batch(&[arr])?;
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/datafusion/functions-aggregate/src/lib.rs 
b/datafusion/functions-aggregate/src/lib.rs
index daddb9d93f..990303bd1d 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -69,6 +69,7 @@ pub mod variance;
 pub mod approx_median;
 pub mod approx_percentile_cont;
 pub mod approx_percentile_cont_with_weight;
+pub mod bit_and_or_xor;
 
 use crate::approx_percentile_cont::approx_percentile_cont_udaf;
 use 
crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
@@ -84,6 +85,9 @@ pub mod expr_fn {
     pub use super::approx_median::approx_median;
     pub use super::approx_percentile_cont::approx_percentile_cont;
     pub use 
super::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight;
+    pub use super::bit_and_or_xor::bit_and;
+    pub use super::bit_and_or_xor::bit_or;
+    pub use super::bit_and_or_xor::bit_xor;
     pub use super::count::count;
     pub use super::count::count_distinct;
     pub use super::covariance::covar_pop;
@@ -134,6 +138,9 @@ pub fn all_default_aggregate_functions() -> 
Vec<Arc<AggregateUDF>> {
         approx_distinct::approx_distinct_udaf(),
         approx_percentile_cont_udaf(),
         approx_percentile_cont_with_weight_udaf(),
+        bit_and_or_xor::bit_and_udaf(),
+        bit_and_or_xor::bit_or_udaf(),
+        bit_and_or_xor::bit_xor_udaf(),
     ]
 }
 
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs 
b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
deleted file mode 100644
index 3fa225c5e4..0000000000
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ /dev/null
@@ -1,695 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators
-
-use ahash::RandomState;
-use datafusion_common::cast::as_list_array;
-use std::any::Any;
-use std::sync::Arc;
-
-use crate::{AggregateExpr, PhysicalExpr};
-use arrow::datatypes::DataType;
-use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
-use datafusion_expr::{Accumulator, GroupsAccumulator};
-use std::collections::HashSet;
-
-use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use arrow::array::Array;
-use arrow::compute::{bit_and, bit_or, bit_xor};
-use arrow_array::cast::AsArray;
-use arrow_array::{downcast_integer, ArrowNumericType};
-use arrow_buffer::ArrowNativeType;
-
-/// BIT_AND aggregate expression
-#[derive(Debug, Clone)]
-pub struct BitAnd {
-    name: String,
-    pub data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    nullable: bool,
-}
-
-impl BitAnd {
-    /// Create a new BIT_AND aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-        }
-    }
-}
-
-impl AggregateExpr for BitAnd {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        macro_rules! helper {
-            ($t:ty) => {
-                Ok(Box::<BitAndAccumulator<$t>>::default())
-            };
-        }
-        downcast_integer! {
-            &self.data_type => (helper),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "BitAndAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ))),
-        }
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
-            format_state_name(&self.name, "bit_and"),
-            self.data_type.clone(),
-            self.nullable,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn groups_accumulator_supported(&self) -> bool {
-        true
-    }
-
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        use std::ops::BitAndAssign;
-
-        // Note the default value for BitAnd should be all set, i.e. `!0`
-        macro_rules! helper {
-            ($t:ty, $dt:expr) => {
-                Ok(Box::new(
-                    PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| {
-                        x.bitand_assign(y)
-                    })
-                    .with_starting_value(!0),
-                ))
-            };
-        }
-
-        let data_type = &self.data_type;
-        downcast_integer! {
-            data_type => (helper, data_type),
-            _ => not_impl_err!(
-                "GroupsAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ),
-        }
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone()))
-    }
-}
-
-impl PartialEq<dyn Any> for BitAnd {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.data_type == x.data_type
-                    && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-struct BitAndAccumulator<T: ArrowNumericType> {
-    value: Option<T::Native>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for BitAndAccumulator<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "BitAndAccumulator({})", T::DATA_TYPE)
-    }
-}
-
-impl<T: ArrowNumericType> Default for BitAndAccumulator<T> {
-    fn default() -> Self {
-        Self { value: None }
-    }
-}
-
-impl<T: ArrowNumericType> Accumulator for BitAndAccumulator<T>
-where
-    T::Native: std::ops::BitAnd<Output = T::Native>,
-{
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if let Some(x) = bit_and(values[0].as_primitive::<T>()) {
-            let v = self.value.get_or_insert(x);
-            *v = *v & x;
-        }
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        self.update_batch(states)
-    }
-
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![self.evaluate()?])
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-    }
-}
-
-/// BIT_OR aggregate expression
-#[derive(Debug, Clone)]
-pub struct BitOr {
-    name: String,
-    pub data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    nullable: bool,
-}
-
-impl BitOr {
-    /// Create a new BIT_OR aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-        }
-    }
-}
-
-impl AggregateExpr for BitOr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        macro_rules! helper {
-            ($t:ty) => {
-                Ok(Box::<BitOrAccumulator<$t>>::default())
-            };
-        }
-        downcast_integer! {
-            &self.data_type => (helper),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "BitOrAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ))),
-        }
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
-            format_state_name(&self.name, "bit_or"),
-            self.data_type.clone(),
-            self.nullable,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn groups_accumulator_supported(&self) -> bool {
-        true
-    }
-
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        use std::ops::BitOrAssign;
-        macro_rules! helper {
-            ($t:ty, $dt:expr) => {
-                Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new(
-                    $dt,
-                    |x, y| x.bitor_assign(y),
-                )))
-            };
-        }
-
-        let data_type = &self.data_type;
-        downcast_integer! {
-            data_type => (helper, data_type),
-            _ => not_impl_err!(
-                "GroupsAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ),
-        }
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone()))
-    }
-}
-
-impl PartialEq<dyn Any> for BitOr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.data_type == x.data_type
-                    && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-struct BitOrAccumulator<T: ArrowNumericType> {
-    value: Option<T::Native>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for BitOrAccumulator<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "BitOrAccumulator({})", T::DATA_TYPE)
-    }
-}
-
-impl<T: ArrowNumericType> Default for BitOrAccumulator<T> {
-    fn default() -> Self {
-        Self { value: None }
-    }
-}
-
-impl<T: ArrowNumericType> Accumulator for BitOrAccumulator<T>
-where
-    T::Native: std::ops::BitOr<Output = T::Native>,
-{
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![self.evaluate()?])
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if let Some(x) = bit_or(values[0].as_primitive::<T>()) {
-            let v = self.value.get_or_insert(T::Native::usize_as(0));
-            *v = *v | x;
-        }
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        self.update_batch(states)
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-    }
-}
-
-/// BIT_XOR aggregate expression
-#[derive(Debug, Clone)]
-pub struct BitXor {
-    name: String,
-    pub data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    nullable: bool,
-}
-
-impl BitXor {
-    /// Create a new BIT_XOR aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-        }
-    }
-}
-
-impl AggregateExpr for BitXor {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        macro_rules! helper {
-            ($t:ty) => {
-                Ok(Box::<BitXorAccumulator<$t>>::default())
-            };
-        }
-        downcast_integer! {
-            &self.data_type => (helper),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "BitXor not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ))),
-        }
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
-            format_state_name(&self.name, "bit_xor"),
-            self.data_type.clone(),
-            self.nullable,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn groups_accumulator_supported(&self) -> bool {
-        true
-    }
-
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        use std::ops::BitXorAssign;
-        macro_rules! helper {
-            ($t:ty, $dt:expr) => {
-                Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new(
-                    $dt,
-                    |x, y| x.bitxor_assign(y),
-                )))
-            };
-        }
-
-        let data_type = &self.data_type;
-        downcast_integer! {
-            data_type => (helper, data_type),
-            _ => not_impl_err!(
-                "GroupsAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ),
-        }
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone()))
-    }
-}
-
-impl PartialEq<dyn Any> for BitXor {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.data_type == x.data_type
-                    && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-struct BitXorAccumulator<T: ArrowNumericType> {
-    value: Option<T::Native>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for BitXorAccumulator<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "BitXorAccumulator({})", T::DATA_TYPE)
-    }
-}
-
-impl<T: ArrowNumericType> Default for BitXorAccumulator<T> {
-    fn default() -> Self {
-        Self { value: None }
-    }
-}
-
-impl<T: ArrowNumericType> Accumulator for BitXorAccumulator<T>
-where
-    T::Native: std::ops::BitXor<Output = T::Native>,
-{
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![self.evaluate()?])
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if let Some(x) = bit_xor(values[0].as_primitive::<T>()) {
-            let v = self.value.get_or_insert(T::Native::usize_as(0));
-            *v = *v ^ x;
-        }
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        self.update_batch(states)
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-    }
-}
-
-/// Expression for a BIT_XOR(DISTINCT) aggregation.
-#[derive(Debug, Clone)]
-pub struct DistinctBitXor {
-    name: String,
-    pub data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    nullable: bool,
-}
-
-impl DistinctBitXor {
-    /// Create a new DistinctBitXor aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-        }
-    }
-}
-
-impl AggregateExpr for DistinctBitXor {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        macro_rules! helper {
-            ($t:ty) => {
-                Ok(Box::<DistinctBitXorAccumulator<$t>>::default())
-            };
-        }
-        downcast_integer! {
-            &self.data_type => (helper),
-            _ => Err(DataFusionError::NotImplemented(format!(
-                "DistinctBitXorAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ))),
-        }
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        // State field is a List which stores items to rebuild hash set.
-        Ok(vec![Field::new_list(
-            format_state_name(&self.name, "bit_xor distinct"),
-            Field::new("item", self.data_type.clone(), true),
-            false,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-}
-
-impl PartialEq<dyn Any> for DistinctBitXor {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.data_type == x.data_type
-                    && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-struct DistinctBitXorAccumulator<T: ArrowNumericType> {
-    values: HashSet<T::Native, RandomState>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for DistinctBitXorAccumulator<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE)
-    }
-}
-
-impl<T: ArrowNumericType> Default for DistinctBitXorAccumulator<T> {
-    fn default() -> Self {
-        Self {
-            values: HashSet::default(),
-        }
-    }
-}
-
-impl<T: ArrowNumericType> Accumulator for DistinctBitXorAccumulator<T>
-where
-    T::Native: std::ops::BitXor<Output = T::Native> + std::hash::Hash + Eq,
-{
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        // 1. Stores aggregate state in `ScalarValue::List`
-        // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
-        let state_out = {
-            let values = self
-                .values
-                .iter()
-                .map(|x| ScalarValue::new_primitive::<T>(Some(*x), &T::DATA_TYPE))
-                .collect::<Result<Vec<_>>>()?;
-
-            let arr = ScalarValue::new_list(&values, &T::DATA_TYPE);
-            vec![ScalarValue::List(arr)]
-        };
-        Ok(state_out)
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if values.is_empty() {
-            return Ok(());
-        }
-
-        let array = values[0].as_primitive::<T>();
-        match array.nulls().filter(|x| x.null_count() > 0) {
-            Some(n) => {
-                for idx in n.valid_indices() {
-                    self.values.insert(array.value(idx));
-                }
-            }
-            None => array.values().iter().for_each(|x| {
-                self.values.insert(*x);
-            }),
-        }
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if let Some(state) = states.first() {
-            let list_arr = as_list_array(state)?;
-            for arr in list_arr.iter().flatten() {
-                self.update_batch(&[arr])?;
-            }
-        }
-        Ok(())
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        let mut acc = T::Native::usize_as(0);
-        for distinct_value in self.values.iter() {
-            acc = acc ^ *distinct_value;
-        }
-        let v = (!self.values.is_empty()).then_some(acc);
-        ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-            + self.values.capacity() * std::mem::size_of::<T::Native>()
-    }
-}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index a1f5f153a9..6c01decdbf 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -66,26 +66,6 @@ pub fn create_aggregate_expr(
             name,
             data_type,
         )),
-        (AggregateFunction::BitAnd, _) => Arc::new(expressions::BitAnd::new(
-            input_phy_exprs[0].clone(),
-            name,
-            data_type,
-        )),
-        (AggregateFunction::BitOr, _) => Arc::new(expressions::BitOr::new(
-            input_phy_exprs[0].clone(),
-            name,
-            data_type,
-        )),
-        (AggregateFunction::BitXor, false) => Arc::new(expressions::BitXor::new(
-            input_phy_exprs[0].clone(),
-            name,
-            data_type,
-        )),
-        (AggregateFunction::BitXor, true) => Arc::new(expressions::DistinctBitXor::new(
-            input_phy_exprs[0].clone(),
-            name,
-            data_type,
-        )),
         (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new(
             input_phy_exprs[0].clone(),
             name,
@@ -202,12 +182,10 @@ mod tests {
     use datafusion_expr::{type_coercion, Signature};
 
     use crate::expressions::{
-        try_cast, ArrayAgg, Avg, BitAnd, BitOr, BitXor, BoolAnd, BoolOr,
-        DistinctArrayAgg, Max, Min,
+        try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min,
     };
 
     use super::*;
-
     #[test]
     fn test_approx_expr() -> Result<()> {
         let funcs = vec![AggregateFunction::ArrayAgg];
@@ -319,60 +297,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_bit_and_or_xor_expr() -> Result<()> {
-        let funcs = vec![
-            AggregateFunction::BitAnd,
-            AggregateFunction::BitOr,
-            AggregateFunction::BitXor,
-        ];
-        let data_types = vec![DataType::UInt64, DataType::Int64];
-        for fun in funcs {
-            for data_type in &data_types {
-                let input_schema =
-                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
-                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
-                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
-                )];
-                let result_agg_phy_exprs = create_physical_agg_expr_for_test(
-                    &fun,
-                    false,
-                    &input_phy_exprs[0..1],
-                    &input_schema,
-                    "c1",
-                )?;
-                match fun {
-                    AggregateFunction::BitAnd => {
-                        assert!(result_agg_phy_exprs.as_any().is::<BitAnd>());
-                        assert_eq!("c1", result_agg_phy_exprs.name());
-                        assert_eq!(
-                            Field::new("c1", data_type.clone(), true),
-                            result_agg_phy_exprs.field().unwrap()
-                        );
-                    }
-                    AggregateFunction::BitOr => {
-                        assert!(result_agg_phy_exprs.as_any().is::<BitOr>());
-                        assert_eq!("c1", result_agg_phy_exprs.name());
-                        assert_eq!(
-                            Field::new("c1", data_type.clone(), true),
-                            result_agg_phy_exprs.field().unwrap()
-                        );
-                    }
-                    AggregateFunction::BitXor => {
-                        assert!(result_agg_phy_exprs.as_any().is::<BitXor>());
-                        assert_eq!("c1", result_agg_phy_exprs.name());
-                        assert_eq!(
-                            Field::new("c1", data_type.clone(), true),
-                            result_agg_phy_exprs.field().unwrap()
-                        );
-                    }
-                    _ => {}
-                };
-            }
-        }
-        Ok(())
-    }
-
     #[test]
     fn test_bool_and_or_expr() -> Result<()> {
         let funcs = vec![AggregateFunction::BoolAnd, AggregateFunction::BoolOr];
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index c20902c11b..0b1f5f5774 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -21,7 +21,6 @@ pub(crate) mod array_agg;
 pub(crate) mod array_agg_distinct;
 pub(crate) mod array_agg_ordered;
 pub(crate) mod average;
-pub(crate) mod bit_and_or_xor;
 pub(crate) mod bool_and_or;
 pub(crate) mod correlation;
 pub(crate) mod covariance;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index b9a159b21e..bffaafd7da 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -40,7 +40,6 @@ pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg;
 pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
 pub use crate::aggregate::average::Avg;
 pub use crate::aggregate::average::AvgAccumulator;
-pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor, DistinctBitXor};
 pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
 pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::correlation::Correlation;
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index e5578ae62f..ae4445eaa8 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -491,9 +491,9 @@ enum AggregateFunction {
   // APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
   GROUPING = 17;
   // MEDIAN = 18;
-  BIT_AND = 19;
-  BIT_OR = 20;
-  BIT_XOR = 21;
+  // BIT_AND = 19;
+  // BIT_OR = 20;
+  // BIT_XOR = 21;
   BOOL_AND = 22;
   BOOL_OR = 23;
   // REGR_SLOPE = 26;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 4a7b9610e5..243c75435f 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -538,9 +538,6 @@ impl serde::Serialize for AggregateFunction {
             Self::ArrayAgg => "ARRAY_AGG",
             Self::Correlation => "CORRELATION",
             Self::Grouping => "GROUPING",
-            Self::BitAnd => "BIT_AND",
-            Self::BitOr => "BIT_OR",
-            Self::BitXor => "BIT_XOR",
             Self::BoolAnd => "BOOL_AND",
             Self::BoolOr => "BOOL_OR",
             Self::StringAgg => "STRING_AGG",
@@ -562,9 +559,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
             "ARRAY_AGG",
             "CORRELATION",
             "GROUPING",
-            "BIT_AND",
-            "BIT_OR",
-            "BIT_XOR",
             "BOOL_AND",
             "BOOL_OR",
             "STRING_AGG",
@@ -615,9 +609,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                     "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
                     "CORRELATION" => Ok(AggregateFunction::Correlation),
                     "GROUPING" => Ok(AggregateFunction::Grouping),
-                    "BIT_AND" => Ok(AggregateFunction::BitAnd),
-                    "BIT_OR" => Ok(AggregateFunction::BitOr),
-                    "BIT_XOR" => Ok(AggregateFunction::BitXor),
                     "BOOL_AND" => Ok(AggregateFunction::BoolAnd),
                     "BOOL_OR" => Ok(AggregateFunction::BoolOr),
                     "STRING_AGG" => Ok(AggregateFunction::StringAgg),
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index ffaef445d6..1172eccb90 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1945,9 +1945,9 @@ pub enum AggregateFunction {
     /// APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
     Grouping = 17,
     /// MEDIAN = 18;
-    BitAnd = 19,
-    BitOr = 20,
-    BitXor = 21,
+    /// BIT_AND = 19;
+    /// BIT_OR = 20;
+    /// BIT_XOR = 21;
     BoolAnd = 22,
     BoolOr = 23,
     /// REGR_SLOPE = 26;
@@ -1975,9 +1975,6 @@ impl AggregateFunction {
             AggregateFunction::ArrayAgg => "ARRAY_AGG",
             AggregateFunction::Correlation => "CORRELATION",
             AggregateFunction::Grouping => "GROUPING",
-            AggregateFunction::BitAnd => "BIT_AND",
-            AggregateFunction::BitOr => "BIT_OR",
-            AggregateFunction::BitXor => "BIT_XOR",
             AggregateFunction::BoolAnd => "BOOL_AND",
             AggregateFunction::BoolOr => "BOOL_OR",
             AggregateFunction::StringAgg => "STRING_AGG",
@@ -1993,9 +1990,6 @@ impl AggregateFunction {
             "ARRAY_AGG" => Some(Self::ArrayAgg),
             "CORRELATION" => Some(Self::Correlation),
             "GROUPING" => Some(Self::Grouping),
-            "BIT_AND" => Some(Self::BitAnd),
-            "BIT_OR" => Some(Self::BitOr),
-            "BIT_XOR" => Some(Self::BitXor),
             "BOOL_AND" => Some(Self::BoolAnd),
             "BOOL_OR" => Some(Self::BoolOr),
             "STRING_AGG" => Some(Self::StringAgg),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 25b7413a98..43cc352f98 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -140,9 +140,6 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
             protobuf::AggregateFunction::Min => Self::Min,
             protobuf::AggregateFunction::Max => Self::Max,
             protobuf::AggregateFunction::Avg => Self::Avg,
-            protobuf::AggregateFunction::BitAnd => Self::BitAnd,
-            protobuf::AggregateFunction::BitOr => Self::BitOr,
-            protobuf::AggregateFunction::BitXor => Self::BitXor,
             protobuf::AggregateFunction::BoolAnd => Self::BoolAnd,
             protobuf::AggregateFunction::BoolOr => Self::BoolOr,
             protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index d9548325da..33a58daeaf 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -111,9 +111,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
             AggregateFunction::Min => Self::Min,
             AggregateFunction::Max => Self::Max,
             AggregateFunction::Avg => Self::Avg,
-            AggregateFunction::BitAnd => Self::BitAnd,
-            AggregateFunction::BitOr => Self::BitOr,
-            AggregateFunction::BitXor => Self::BitXor,
             AggregateFunction::BoolAnd => Self::BoolAnd,
             AggregateFunction::BoolOr => Self::BoolOr,
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
@@ -380,9 +377,6 @@ pub fn serialize_expr(
                     AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
                     AggregateFunction::Min => protobuf::AggregateFunction::Min,
                     AggregateFunction::Max => protobuf::AggregateFunction::Max,
-                    AggregateFunction::BitAnd => protobuf::AggregateFunction::BitAnd,
-                    AggregateFunction::BitOr => protobuf::AggregateFunction::BitOr,
-                    AggregateFunction::BitXor => protobuf::AggregateFunction::BitXor,
                     AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd,
                     AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr,
                     AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 3a4c35a93e..886179bf56 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -23,11 +23,11 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr};
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
-    ArrayAgg, Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, CaseExpr,
-    CastExpr, Column, Correlation, CumeDist, DistinctArrayAgg, DistinctBitXor, Grouping,
-    InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr,
-    NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber,
-    StringAgg, TryCastExpr, WindowShift,
+    ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation,
+    CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal,
+    Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
+    OrderSensitiveArrayAgg, Rank, RankType, RowNumber, StringAgg, TryCastExpr,
+    WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr};
@@ -241,15 +241,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
 
     let inner = if aggr_expr.downcast_ref::<Grouping>().is_some() {
         protobuf::AggregateFunction::Grouping
-    } else if aggr_expr.downcast_ref::<BitAnd>().is_some() {
-        protobuf::AggregateFunction::BitAnd
-    } else if aggr_expr.downcast_ref::<BitOr>().is_some() {
-        protobuf::AggregateFunction::BitOr
-    } else if aggr_expr.downcast_ref::<BitXor>().is_some() {
-        protobuf::AggregateFunction::BitXor
-    } else if aggr_expr.downcast_ref::<DistinctBitXor>().is_some() {
-        distinct = true;
-        protobuf::AggregateFunction::BitXor
     } else if aggr_expr.downcast_ref::<BoolAnd>().is_some() {
         protobuf::AggregateFunction::BoolAnd
     } else if aggr_expr.downcast_ref::<BoolOr>().is_some() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index a496e22685..52696a1061 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -59,6 +59,7 @@ use datafusion_expr::{
     TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
     WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
 };
+use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor};
 use datafusion_proto::bytes::{
     logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
     logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec,
@@ -665,6 +666,9 @@ async fn roundtrip_expr_api() -> Result<()> {
         approx_median(lit(2)),
         approx_percentile_cont(lit(2), lit(0.5)),
         approx_percentile_cont_with_weight(lit(2), lit(1), lit(0.5)),
+        bit_and(lit(2)),
+        bit_or(lit(2)),
+        bit_xor(lit(2)),
     ];
 
     // ensure expressions created with the expr api can be round tripped


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to