alamb commented on code in PR #6276:
URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192445618


##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {

Review Comment:
   TIL https://docs.rs/num-traits/latest/num_traits/ops/wrapping/trait.WrappingNeg.html 👍
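   For context: `neg_wrapping` on one produces a value with every bit set (-1 in two's complement), which is the identity for bitwise AND, so the fold above starts from the right place. A tiny sketch using the std equivalent `wrapping_neg`:

```rust
// Sketch only (plain std Rust, not arrow): 1.wrapping_neg() == -1 has all
// bits set, so AND-ing it with any value leaves that value unchanged.
fn main() {
    let identity: i8 = 1i8.wrapping_neg();
    assert_eq!(identity, -1);
    assert_eq!(identity as u8, 0xFF); // every bit set
    assert_eq!(identity & 0x53, 0x53); // AND with the identity is a no-op
}
```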



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {

Review Comment:
   This loop and the ones in `bit_and` are *very* similar (specifically the iteration over data chunks, the bit mask, and the remainder). The only differences seem to be the initial value and how each value updates the accumulator.
   
   Could you please try to refactor them to avoid the duplication?
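   As a rough sketch of what I mean (illustrative only, not code from this PR): a single null-aware fold could take the identity value and a combining closure, and the three aggregates would each pass in their own operator:

```rust
use arrow::array::{Array, PrimitiveArray};
use arrow::datatypes::ArrowNumericType;

// Sketch only: one shared helper instead of three copies of the null-handling
// loop. The chunked bitmask iteration from the PR would live here once; the
// simpler (less optimized) iterator form is shown to keep the sketch short.
fn null_aware_fold<T, F>(
    array: &PrimitiveArray<T>,
    init: T::Native,
    op: F,
) -> Option<T::Native>
where
    T: ArrowNumericType,
    F: Fn(T::Native, T::Native) -> T::Native,
{
    if array.null_count() == array.len() {
        return None;
    }
    // `iter()` yields Option<T::Native>; flatten() skips the nulls.
    Some(array.iter().flatten().fold(init, |acc, v| op(acc, v)))
}

// Each aggregate then becomes a one-liner, e.g.:
// bit_and: null_aware_fold(array, T::Native::ONE.neg_wrapping(), |a, b| a & b)
// bit_or:  null_aware_fold(array, T::default_value(),            |a, b| a | b)
// bit_xor: null_aware_fold(array, T::default_value(),            |a, b| a ^ b)
```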



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one
+// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {

Review Comment:
   I think you could probably use https://docs.rs/arrow/latest/arrow/macro.downcast_primitive_array.html to avoid some of this repetition
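   For reference, the documented shape of that macro is roughly the following (sketch only, based on the linked docs; the bitwise aggregates would still need fallback arms, since floats and decimals are primitive types too but don't support `&`/`|`/`^`):

```rust
use arrow::array::Array;
use arrow::downcast_primitive_array;

// Sketch of the macro's documented usage, not a drop-in replacement for the
// match above: `values` is re-bound to the concrete PrimitiveArray<T> inside
// the first arm, so one arm can service every primitive type.
fn non_null_count(values: &dyn Array) -> usize {
    downcast_primitive_array!(
        values => {
            values.len() - values.null_count()
        }
        t => panic!("unsupported type {t:?}")
    )
}
```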



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one
+// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Int64 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int64Array, Int64, $OP)
+            }
+            DataType::Int32 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int32Array, Int32, $OP)
+            }
+            DataType::Int16 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int16Array, Int16, $OP)
+            }
+            DataType::Int8 => typed_bit_and_or_xor_batch!($VALUES, Int8Array, Int8, $OP),
+            DataType::UInt64 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt64Array, UInt64, $OP)
+            }
+            DataType::UInt32 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt32Array, UInt32, $OP)
+            }
+            DataType::UInt16 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt16Array, UInt16, $OP)
+            }
+            DataType::UInt8 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt8Array, UInt8, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bit and/Bit or/Bit xor is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bit_and(array) -> ScalarValue
+fn bit_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_and)
+}
+
+/// dynamically-typed bit_or(array) -> ScalarValue
+fn bit_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_or)
+}
+
+/// dynamically-typed bit_xor(array) -> ScalarValue
+fn bit_xor_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_xor)
+}
+
+// bit_and/bit_or/bit_xor of two scalar values.
+macro_rules! typed_bit_and_or_xor {
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
+        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
+            (None, None) => None,
+            (Some(a), None) => Some(*a),
+            (None, Some(b)) => Some(*b),
+            (Some(a), Some(b)) => Some((*a).$OP(*b)),
+        })
+    }};
+}
+
+// bit_and/bit_or/bit_xor of two scalar values.
+macro_rules! typed_bit_and_or_xor_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $TYPE:ident, $OP:ident) => {{
+        paste::item! {
+            match $SCALAR {
+                None => {}
+                Some(v) => $ACC.[<$OP _ $TYPE>]($INDEX, *v as $TYPE)
+            }
+        }
+    }};
+}
+
+// bit_and/bit_or/bit_xor of two scalar values of the same type
+macro_rules! bit_and_or_xor {
+    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+        Ok(match ($VALUE, $DELTA) {
+            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt64, $OP)
+            }
+            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt32, $OP)
+            }
+            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt16, $OP)
+            }
+            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, UInt8, $OP)
+            }
+            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int64, $OP)
+            }
+            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int32, $OP)
+            }
+            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int16, $OP)
+            }
+            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
+                typed_bit_and_or_xor!(lhs, rhs, Int8, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+macro_rules! bit_and_or_xor_v2 {
+    ($INDEX:ident, $ACC:ident, $SCALAR:expr, $OP:ident) => {{
+        Ok(match $SCALAR {
+            ScalarValue::UInt64(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u64, $OP)
+            }
+            ScalarValue::UInt32(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u32, $OP)
+            }
+            ScalarValue::UInt16(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u16, $OP)
+            }
+            ScalarValue::UInt8(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, u8, $OP)
+            }
+            ScalarValue::Int64(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i64, $OP)
+            }
+            ScalarValue::Int32(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i32, $OP)
+            }
+            ScalarValue::Int16(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i16, $OP)
+            }
+            ScalarValue::Int8(rhs) => {
+                typed_bit_and_or_xor_v2!($INDEX, $ACC, rhs, i8, $OP)
+            }
+            ScalarValue::Null => {
+                // do nothing
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "BIT AND/BIT OR/BIT XOR is not expected to receive scalars of incompatible types {:?}",
+                    e
+                )))
+            }
+        })
+    }};
+}
+
+/// the bit_and of two scalar values
+pub fn compute_bit_and(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bit_and_or_xor!(lhs, rhs, bitand)
+}
+
+pub fn bit_and_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitand)
+}
+
+/// the bit_or of two scalar values
+pub fn compute_bit_or(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bit_and_or_xor!(lhs, rhs, bitor)
+}
+
+pub fn bit_or_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitor)
+}
+
+/// the bit_xor of two scalar values
+pub fn compute_bit_xor(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    bit_and_or_xor!(lhs, rhs, bitxor)
+}
+
+pub fn bit_xor_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    bit_and_or_xor_v2!(index, accessor, s, bitxor)
+}
+
+/// BIT_AND aggregate expression
+#[derive(Debug, Clone)]
+pub struct BitAnd {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
+}
+
+impl BitAnd {
+    /// Create a new BIT_AND aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type: bit_and_or_xor_aggregate_data_type(data_type),
+            nullable: true,
+        }
+    }
+}
+
+impl AggregateExpr for BitAnd {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(BitAndAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "bit_and"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn row_accumulator_supported(&self) -> bool {
+        is_row_accumulator_support_dtype(&self.data_type)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn create_row_accumulator(
+        &self,
+        start_index: usize,
+    ) -> Result<Box<dyn RowAccumulator>> {
+        Ok(Box::new(BitAndRowAccumulator::new(
+            start_index,
+            self.data_type.clone(),
+        )))
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(self.clone()))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {

Review Comment:
   I don't think the BitAnd and other accumulators in this file will correctly implement sliding accumulators (this is another place where `supports_bounded_execution` should return false, as suggested by @mustafasrepo).
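   To expand on why (sketch only): a sliding accumulator has to retract rows as the window advances, and unlike SUM there is no inverse for bitwise AND/OR, so once a row has been folded in its contribution cannot be undone without rescanning the window:

```rust
// Illustrative only: after AND-ing a row in, no operation on the accumulator
// alone can recover the previous state when that row later leaves the window.
fn main() {
    let acc_before: u8 = 0b1111_1111;
    let retracted_row: u8 = 0b1111_0000;
    let acc_after = acc_before & retracted_row;
    assert_eq!(acc_after, 0b1111_0000);
    // For SUM, `acc_after - retracted_row` restores acc_before; no such
    // inverse exists for `&` or `|`, so retract_batch cannot be implemented
    // from the accumulator state alone.
}
```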



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one
+// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {

Review Comment:
   Again, not sure if this is the case or if it is left-over copy/paste. I didn't see any tests for dictionary-encoded values.
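   If the dictionary support is intentional, something like the following (hypothetical, not in the PR) would at least cover the type-unpacking helper:

```rust
// Hypothetical unit test for bit_and_or_xor_aggregate_data_type quoted above;
// a full test would also run the aggregates over a DictionaryArray input.
#[test]
fn bit_and_output_type_unpacks_dictionary() {
    let dict = DataType::Dictionary(
        Box::new(DataType::Int8),  // key type
        Box::new(DataType::Int32), // value type
    );
    assert_eq!(bit_and_or_xor_aggregate_data_type(dict), DataType::Int32);
    assert_eq!(
        bit_and_or_xor_aggregate_data_type(DataType::UInt64),
        DataType::UInt64
    );
}
```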



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -0,0 +1,1103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{
+        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
+        UInt32Array, UInt64Array, UInt8Array,
+    },
+    datatypes::Field,
+};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use crate::aggregate::row_accumulator::{
+    is_row_accumulator_support_dtype, RowAccumulator,
+};
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use arrow::array::Array;
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::ArrowNativeTypeOp;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_row::accessor::RowAccessor;
+use std::ops::BitAnd as BitAndImplementation;
+use std::ops::BitOr as BitOrImplementation;
+use std::ops::BitXor as BitXorImplementation;
+
+fn bit_and<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_and = data
+                .iter()
+                .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| {
+                    accumulator & *value
+                });
+
+            Some(bit_and)
+        }
+        Some(nulls) => {
+            let mut bit_and = T::Native::ONE.neg_wrapping();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_and = bit_and & *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_and = bit_and & *value;
+                }
+            });
+
+            Some(bit_and)
+        }
+    }
+}
+
+fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_or = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator | *value
+            });
+
+            Some(bit_or)
+        }
+        Some(nulls) => {
+            let mut bit_or = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_or = bit_or | *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_or = bit_or | *value;
+                }
+            });
+
+            Some(bit_or)
+        }
+    }
+}
+
+fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator ^ *value
+            });
+
+            Some(bit_xor)
+        }
+        Some(nulls) => {
+            let mut bit_xor = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = nulls.inner().bit_chunks();
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    chunk.iter().for_each(|value| {
+                        if (mask & index_mask) != 0 {
+                            bit_xor = bit_xor ^ *value;
+                        }
+                        index_mask <<= 1;
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    bit_xor = bit_xor ^ *value;
+                }
+            });
+
+            Some(bit_xor)
+        }
+    }
+}
+
+// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked
+// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one
+// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode
+fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        *value_type
+    } else {
+        input_type
+    }
+}
+
+// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account
+macro_rules! typed_bit_and_or_xor_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type.
+macro_rules! bit_and_or_xor_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Int64 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int64Array, Int64, $OP)
+            }
+            DataType::Int32 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int32Array, Int32, $OP)
+            }
+            DataType::Int16 => {
+                typed_bit_and_or_xor_batch!($VALUES, Int16Array, Int16, $OP)
+            }
+            DataType::Int8 => typed_bit_and_or_xor_batch!($VALUES, Int8Array, Int8, $OP),
+            DataType::UInt64 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt64Array, UInt64, $OP)
+            }
+            DataType::UInt32 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt32Array, UInt32, $OP)
+            }
+            DataType::UInt16 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt16Array, UInt16, $OP)
+            }
+            DataType::UInt8 => {
+                typed_bit_and_or_xor_batch!($VALUES, UInt8Array, UInt8, $OP)
+            }
+            e => {
+                return Err(DataFusionError::Internal(format!(
+                    "Bit and/Bit or/Bit xor is not expected to receive the type {e:?}"
+                )));
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bit_and(array) -> ScalarValue
+fn bit_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_and)
+}
+
+/// dynamically-typed bit_or(array) -> ScalarValue
+fn bit_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_or)
+}
+
+/// dynamically-typed bit_xor(array) -> ScalarValue
+fn bit_xor_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bit_and_or_xor_batch!(values, bit_xor)
+}
+
+// bit_and/bit_or/bit_xor of two scalar values.
+macro_rules! typed_bit_and_or_xor {

Review Comment:
   I recommend we make this a method on `ScalarValue` so that it is more discoverable.
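   Roughly this shape (illustrative only; since `ScalarValue` lives in `datafusion_common`, the macro logic would need to move there, and the method name is just a placeholder):

```rust
// Hypothetical sketch of the suggested API, written as if it lived next to
// ScalarValue in datafusion_common.
impl ScalarValue {
    /// Bitwise AND of two integer scalars of the same type.
    pub fn bit_and(&self, rhs: &ScalarValue) -> Result<ScalarValue> {
        bit_and_or_xor!(self, rhs, bitand)
    }
}

// Call sites like compute_bit_and(lhs, rhs) then become lhs.bit_and(rhs),
// which shows up in ScalarValue's docs and autocomplete.
```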



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
