This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2daadb7523 Convert BitAnd, BitOr, BitXor to UDAF (#10930)
2daadb7523 is described below
commit 2daadb75230e2c197d2915257a9637913fa2c2e6
Author: Dharan Aditya <[email protected]>
AuthorDate: Mon Jun 17 18:36:16 2024 +0530
Convert BitAnd, BitOr, BitXor to UDAF (#10930)
* remove bit and or xor from expr
* remove bit and or xor from physical expr and proto
* add proto regen changes
* impl BitAnd, BitOr, BitXor UDAF
* add support for float
* removing support for float
* refactor helper macros
* apply clippy fixes
* simplify Bitwise operation
* add documentation
* formatting
* fix lint issue
* remove XorDistinct
* update roundtrip_expr_api test
* linting
* support groups accumulator
---
datafusion/expr/src/aggregate_function.rs | 20 -
datafusion/expr/src/type_coercion/aggregates.rs | 18 -
.../functions-aggregate/src/bit_and_or_xor.rs | 458 ++++++++++++++
datafusion/functions-aggregate/src/lib.rs | 7 +
.../physical-expr/src/aggregate/bit_and_or_xor.rs | 695 ---------------------
datafusion/physical-expr/src/aggregate/build_in.rs | 78 +--
datafusion/physical-expr/src/aggregate/mod.rs | 1 -
datafusion/physical-expr/src/expressions/mod.rs | 1 -
datafusion/proto/proto/datafusion.proto | 6 +-
datafusion/proto/src/generated/pbjson.rs | 9 -
datafusion/proto/src/generated/prost.rs | 12 +-
datafusion/proto/src/logical_plan/from_proto.rs | 3 -
datafusion/proto/src/logical_plan/to_proto.rs | 6 -
datafusion/proto/src/physical_plan/to_proto.rs | 19 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 4 +
15 files changed, 481 insertions(+), 856 deletions(-)
diff --git a/datafusion/expr/src/aggregate_function.rs
b/datafusion/expr/src/aggregate_function.rs
index 441e8953df..a7fbf26feb 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,12 +47,6 @@ pub enum AggregateFunction {
Correlation,
/// Grouping
Grouping,
- /// Bit And
- BitAnd,
- /// Bit Or
- BitOr,
- /// Bit Xor
- BitXor,
/// Bool And
BoolAnd,
/// Bool Or
@@ -72,9 +66,6 @@ impl AggregateFunction {
NthValue => "NTH_VALUE",
Correlation => "CORR",
Grouping => "GROUPING",
- BitAnd => "BIT_AND",
- BitOr => "BIT_OR",
- BitXor => "BIT_XOR",
BoolAnd => "BOOL_AND",
BoolOr => "BOOL_OR",
StringAgg => "STRING_AGG",
@@ -94,9 +85,6 @@ impl FromStr for AggregateFunction {
Ok(match name {
// general
"avg" => AggregateFunction::Avg,
- "bit_and" => AggregateFunction::BitAnd,
- "bit_or" => AggregateFunction::BitOr,
- "bit_xor" => AggregateFunction::BitXor,
"bool_and" => AggregateFunction::BoolAnd,
"bool_or" => AggregateFunction::BoolOr,
"max" => AggregateFunction::Max,
@@ -144,9 +132,6 @@ impl AggregateFunction {
// The coerced_data_types is same with input_types.
Ok(coerced_data_types[0].clone())
}
- AggregateFunction::BitAnd
- | AggregateFunction::BitOr
- | AggregateFunction::BitXor => Ok(coerced_data_types[0].clone()),
AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
Ok(DataType::Boolean)
}
@@ -199,11 +184,6 @@ impl AggregateFunction {
.collect::<Vec<_>>();
Signature::uniform(1, valid, Volatility::Immutable)
}
- AggregateFunction::BitAnd
- | AggregateFunction::BitOr
- | AggregateFunction::BitXor => {
- Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable)
- }
AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
Signature::uniform(1, vec![DataType::Boolean],
Volatility::Immutable)
}
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs
b/datafusion/expr/src/type_coercion/aggregates.rs
index 98324ed612..a216c98899 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -121,20 +121,6 @@ pub fn coerce_types(
};
Ok(vec![v])
}
- AggregateFunction::BitAnd
- | AggregateFunction::BitOr
- | AggregateFunction::BitXor => {
- // Refer to
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
- // smallint, int, bigint, real, double precision, decimal, or
interval.
- if !is_bit_and_or_xor_support_arg_type(&input_types[0]) {
- return plan_err!(
- "The function {:?} does not support inputs of type {:?}.",
- agg_fun,
- input_types[0]
- );
- }
- Ok(input_types.to_vec())
- }
AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
// Refer to
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
// smallint, int, bigint, real, double precision, decimal, or
interval.
@@ -350,10 +336,6 @@ pub fn avg_sum_type(arg_type: &DataType) ->
Result<DataType> {
}
}
-pub fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool {
- NUMERICS.contains(arg_type)
-}
-
pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool {
matches!(arg_type, DataType::Boolean)
}
diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs
b/datafusion/functions-aggregate/src/bit_and_or_xor.rs
new file mode 100644
index 0000000000..19e24f547d
--- /dev/null
+++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines `BitAnd`, `BitOr`, `BitXor` and `BitXor DISTINCT` aggregate accumulators
+
+use std::any::Any;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+
+use ahash::RandomState;
+use arrow::array::{downcast_integer, Array, ArrayRef, AsArray};
+use arrow::datatypes::{
+ ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type,
Int64Type,
+ Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+};
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::type_coercion::aggregates::INTEGERS;
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::{
+ Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature,
Volatility,
+};
+
+use
datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
+use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign};
+
+/// Builds a `PrimitiveGroupsAccumulator` for primitive type `$t` (Arrow data
+/// type `$dt`) that folds every group with the bitwise operation `$opr`.
+///
+/// Intended for internal use by `create_groups_accumulator`; users are not
+/// expected to invoke it directly.
+macro_rules! group_accumulator_helper {
+    ($t:ty, $dt:expr, $opr:expr) => {
+        match $opr {
+            // AND starts from "all bits set" (`!0`) so the first value ANDed
+            // into a group is kept unchanged.
+            BitwiseOperationType::And => Ok(Box::new(
+                PrimitiveGroupsAccumulator::<$t, _>::new($dt, |acc, v| {
+                    acc.bitand_assign(v)
+                })
+                .with_starting_value(!0),
+            )),
+            BitwiseOperationType::Or => Ok(Box::new(
+                PrimitiveGroupsAccumulator::<$t, _>::new($dt, |acc, v| {
+                    acc.bitor_assign(v)
+                }),
+            )),
+            BitwiseOperationType::Xor => Ok(Box::new(
+                PrimitiveGroupsAccumulator::<$t, _>::new($dt, |acc, v| {
+                    acc.bitxor_assign(v)
+                }),
+            )),
+        }
+    };
+}
+
+/// `accumulator_helper` is a macro accepting (ArrowPrimitiveType,
BitwiseOperationType, bool)
+macro_rules! accumulator_helper {
+ ($t:ty, $opr:expr, $is_distinct: expr) => {
+ match $opr {
+ BitwiseOperationType::And =>
Ok(Box::<BitAndAccumulator<$t>>::default()),
+ BitwiseOperationType::Or =>
Ok(Box::<BitOrAccumulator<$t>>::default()),
+ BitwiseOperationType::Xor => {
+ if $is_distinct {
+ Ok(Box::<DistinctBitXorAccumulator<$t>>::default())
+ } else {
+ Ok(Box::<BitXorAccumulator<$t>>::default())
+ }
+ }
+ }
+ };
+}
+
+/// Dispatches on the input data type to build the row-based accumulator.
+///
+/// AND, OR and XOR only support the integer types matched below.
+///
+/// `args` is [AccumulatorArgs]
+/// `opr` is [BitwiseOperationType]
+/// `is_distinct` is a boolean indicating whether the aggregate is DISTINCT.
+///
+/// Any other data type yields a not-implemented error naming the operation
+/// (via its `Display` impl), the function name and the offending type.
+macro_rules! downcast_bitwise_accumulator {
+    ($args:ident, $opr:expr, $is_distinct: expr) => {
+        match $args.data_type {
+            DataType::Int8 => accumulator_helper!(Int8Type, $opr, $is_distinct),
+            DataType::Int16 => accumulator_helper!(Int16Type, $opr, $is_distinct),
+            DataType::Int32 => accumulator_helper!(Int32Type, $opr, $is_distinct),
+            DataType::Int64 => accumulator_helper!(Int64Type, $opr, $is_distinct),
+            DataType::UInt8 => accumulator_helper!(UInt8Type, $opr, $is_distinct),
+            DataType::UInt16 => accumulator_helper!(UInt16Type, $opr, $is_distinct),
+            DataType::UInt32 => accumulator_helper!(UInt32Type, $opr, $is_distinct),
+            DataType::UInt64 => accumulator_helper!(UInt64Type, $opr, $is_distinct),
+            _ => {
+                // Format the operation *value*. `stringify!($opr)` would print the
+                // caller's source text (e.g. `self.operation`) instead of And/Or/Xor.
+                not_impl_err!(
+                    "{} not supported for {}: {}",
+                    $opr,
+                    $args.name,
+                    $args.data_type
+                )
+            }
+        }
+    };
+}
+
+/// Declaratively generates a bitwise User-Defined Aggregate Function (UDAF)
+/// together with its expression-building helper.
+///
+/// `EXPR_FN` is the identifier used to name the generated expression function;
+/// it is also stringified and used as the UDAF's name.
+/// `AGGREGATE_UDF_FN` is the identifier used to name the underlying UDAF function.
+/// `OPR_TYPE` is an expression that evaluates to the type of bitwise operation
+/// to be performed.
+macro_rules! make_bitwise_udaf_expr_and_func {
+    ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => {
+        make_udaf_expr!(
+            $EXPR_FN,
+            expr_x,
+            // The separating spaces matter: without them the generated doc reads
+            // "Returns the bitwiseBitwiseOperationType::Andof a group of values".
+            concat!(
+                "Returns the bitwise ",
+                stringify!($OPR_TYPE),
+                " of a group of values"
+            ),
+            $AGGREGATE_UDF_FN
+        );
+        create_func!(
+            $EXPR_FN,
+            $AGGREGATE_UDF_FN,
+            BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN))
+        );
+    };
+}
+
+// One UDAF (plus its `Expr` helper) per bitwise operation. The helper's identifier
+// (`bit_and`, `bit_or`, `bit_xor`) is also what the UDAF's `name()` returns.
+make_bitwise_udaf_expr_and_func!(bit_and, bit_and_udaf, BitwiseOperationType::And);
+make_bitwise_udaf_expr_and_func!(bit_or, bit_or_udaf, BitwiseOperationType::Or);
+make_bitwise_udaf_expr_and_func!(bit_xor, bit_xor_udaf, BitwiseOperationType::Xor);
+
+/// Which bitwise reduction a [BitwiseOperation] applies.
+#[derive(Debug, Clone, Eq, PartialEq)]
+enum BitwiseOperationType {
+    And,
+    Or,
+    Xor,
+}
+
+impl Display for BitwiseOperationType {
+    /// Renders the bare variant name (`And`, `Or` or `Xor`), i.e. the same
+    /// text as the derived `Debug` output.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let label = match self {
+            BitwiseOperationType::And => "And",
+            BitwiseOperationType::Or => "Or",
+            BitwiseOperationType::Xor => "Xor",
+        };
+        f.write_str(label)
+    }
+}
+
+/// Describes one bitwise aggregate: its signature, the operation it applies
+/// and the name it is exposed under.
+#[derive(Debug)]
+struct BitwiseOperation {
+    /// One argument, uniformly typed from `INTEGERS`.
+    signature: Signature,
+    /// `operation` indicates the type of bitwise operation to be performed.
+    operation: BitwiseOperationType,
+    /// Returned verbatim by `name()`.
+    func_name: &'static str,
+}
+
+impl BitwiseOperation {
+    /// Creates the aggregate description for `operator`, named `func_name`.
+    pub fn new(operator: BitwiseOperationType, func_name: &'static str) -> Self {
+        let signature = Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable);
+        Self {
+            signature,
+            operation: operator,
+            func_name,
+        }
+    }
+}
+
+impl AggregateUDFImpl for BitwiseOperation {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Name this UDAF was created with (e.g. `bit_and`).
+    fn name(&self) -> &str {
+        self.func_name
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Returns the input type unchanged; non-integer inputs are rejected.
+    ///
+    /// `arg_types` is expected to hold exactly one entry, as required by the
+    /// one-argument signature built in `BitwiseOperation::new`.
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let arg_type = &arg_types[0];
+        if !arg_type.is_integer() {
+            return exec_err!(
+                "[return_type] {} not supported for {}",
+                self.name(),
+                arg_type
+            );
+        }
+        Ok(arg_type.clone())
+    }
+
+    /// Builds the row-based accumulator for the input data type.
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+        downcast_bitwise_accumulator!(acc_args, self.operation, acc_args.is_distinct)
+    }
+
+    /// Intermediate state layout.
+    ///
+    /// `BIT_XOR(DISTINCT ...)` keeps the distinct values seen so far as a list.
+    /// Every other case keeps a single partial result of the return type.
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        if self.operation == BitwiseOperationType::Xor && args.is_distinct {
+            Ok(vec![Field::new_list(
+                format_state_name(
+                    args.name,
+                    format!("{} distinct", self.name()).as_str(),
+                ),
+                Field::new("item", args.return_type.clone(), true),
+                false,
+            )])
+        } else {
+            Ok(vec![Field::new(
+                format_state_name(args.name, self.name()),
+                args.return_type.clone(),
+                true,
+            )])
+        }
+    }
+
+    /// The grouped fast path uses `PrimitiveGroupsAccumulator`, which folds every
+    /// input row and cannot de-duplicate. That is harmless for AND/OR (both are
+    /// idempotent) but yields wrong results for `BIT_XOR(DISTINCT ...)`.
+    /// Returning `false` there makes DataFusion fall back to the row-based
+    /// `accumulator`, which builds a `DistinctBitXorAccumulator`.
+    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
+        !(self.operation == BitwiseOperationType::Xor && args.is_distinct)
+    }
+
+    /// Builds a grouped accumulator for integer inputs; other types are rejected.
+    fn create_groups_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        let data_type = args.data_type;
+        let operation = &self.operation;
+        downcast_integer! {
+            data_type => (group_accumulator_helper, data_type, operation),
+            _ => not_impl_err!(
+                "GroupsAccumulator not supported for {} with {}",
+                self.name(),
+                data_type
+            ),
+        }
+    }
+
+    /// Bitwise AND/OR/XOR do not depend on input order, so the reversed
+    /// aggregate is the aggregate itself.
+    fn reverse_expr(&self) -> ReversedUDAF {
+        ReversedUDAF::Identical
+    }
+}
+
+/// Row-based accumulator for `BIT_AND`. `value` stays `None` until a batch
+/// contributes a non-null result.
+struct BitAndAccumulator<T: ArrowNumericType> {
+    value: Option<T::Native>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for BitAndAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("BitAndAccumulator({})", T::DATA_TYPE))
+    }
+}
+
+impl<T: ArrowNumericType> Default for BitAndAccumulator<T> {
+    fn default() -> Self {
+        BitAndAccumulator { value: None }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for BitAndAccumulator<T>
+where
+    T::Native: std::ops::BitAnd<Output = T::Native>,
+{
+    /// ANDs the batch-level result into the running value; a batch with no
+    /// non-null result leaves the running value untouched.
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let batch = values[0].as_primitive::<T>();
+        if let Some(batch_and) = arrow::compute::bit_and(batch) {
+            self.value = Some(self.value.map_or(batch_and, |acc| acc & batch_and));
+        }
+        Ok(())
+    }
+
+    /// Partial states have the same shape as inputs, so merging is an update.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let current = self.evaluate()?;
+        Ok(vec![current])
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+}
+
+/// Row-based accumulator for `BIT_OR`. `value` stays `None` until a batch
+/// contributes a non-null result.
+struct BitOrAccumulator<T: ArrowNumericType> {
+    value: Option<T::Native>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for BitOrAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("BitOrAccumulator({})", T::DATA_TYPE))
+    }
+}
+
+impl<T: ArrowNumericType> Default for BitOrAccumulator<T> {
+    fn default() -> Self {
+        BitOrAccumulator { value: None }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for BitOrAccumulator<T>
+where
+    T::Native: std::ops::BitOr<Output = T::Native>,
+{
+    /// ORs the batch-level result into the running value (an empty running
+    /// value acts as zero).
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let batch = values[0].as_primitive::<T>();
+        if let Some(batch_or) = arrow::compute::bit_or(batch) {
+            let running = self.value.unwrap_or(T::Native::usize_as(0));
+            self.value = Some(running | batch_or);
+        }
+        Ok(())
+    }
+
+    /// Partial states have the same shape as inputs, so merging is an update.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let current = self.evaluate()?;
+        Ok(vec![current])
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+}
+
+/// Row-based accumulator for (non-distinct) `BIT_XOR`. `value` stays `None`
+/// until a batch contributes a non-null result.
+struct BitXorAccumulator<T: ArrowNumericType> {
+    value: Option<T::Native>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for BitXorAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("BitXorAccumulator({})", T::DATA_TYPE))
+    }
+}
+
+impl<T: ArrowNumericType> Default for BitXorAccumulator<T> {
+    fn default() -> Self {
+        BitXorAccumulator { value: None }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for BitXorAccumulator<T>
+where
+    T::Native: std::ops::BitXor<Output = T::Native>,
+{
+    /// XORs the batch-level result into the running value (an empty running
+    /// value acts as zero).
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let batch = values[0].as_primitive::<T>();
+        if let Some(batch_xor) = arrow::compute::bit_xor(batch) {
+            let running = self.value.unwrap_or(T::Native::usize_as(0));
+            self.value = Some(running ^ batch_xor);
+        }
+        Ok(())
+    }
+
+    /// Partial XORs combine by XOR, so merging is the same as updating.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let current = self.evaluate()?;
+        Ok(vec![current])
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+}
+
+/// Accumulator for `BIT_XOR(DISTINCT ...)`. It collects the distinct non-null
+/// inputs and XORs them together only when evaluated.
+struct DistinctBitXorAccumulator<T: ArrowNumericType> {
+    values: HashSet<T::Native, RandomState>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for DistinctBitXorAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("DistinctBitXorAccumulator({})", T::DATA_TYPE))
+    }
+}
+
+impl<T: ArrowNumericType> Default for DistinctBitXorAccumulator<T> {
+    fn default() -> Self {
+        let values = HashSet::default();
+        DistinctBitXorAccumulator { values }
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for DistinctBitXorAccumulator<T>
+where
+    T::Native: std::ops::BitXor<Output = T::Native> + std::hash::Hash + Eq,
+{
+    /// Adds every non-null value of the first column to the distinct set.
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let array = match values.first() {
+            Some(column) => column.as_primitive::<T>(),
+            None => return Ok(()),
+        };
+
+        // Only walk the validity bitmap when some slot is actually null.
+        match array.nulls().filter(|nulls| nulls.null_count() > 0) {
+            Some(nulls) => {
+                for idx in nulls.valid_indices() {
+                    self.values.insert(array.value(idx));
+                }
+            }
+            None => {
+                for v in array.values().iter() {
+                    self.values.insert(*v);
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// XOR of all distinct values, or a null scalar when none were seen.
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        let folded = if self.values.is_empty() {
+            None
+        } else {
+            Some(
+                self.values
+                    .iter()
+                    .fold(T::Native::usize_as(0), |acc, v| acc ^ *v),
+            )
+        };
+        ScalarValue::new_primitive::<T>(folded, &T::DATA_TYPE)
+    }
+
+    /// Struct size plus an estimate for the hash set's allocated slots.
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + self.values.capacity() * std::mem::size_of::<T::Native>()
+    }
+
+    /// Serialises the distinct set as a single `ScalarValue::List` so that
+    /// `merge_batch` can rebuild it.
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let mut items = Vec::with_capacity(self.values.len());
+        for v in self.values.iter() {
+            items.push(ScalarValue::new_primitive::<T>(Some(*v), &T::DATA_TYPE)?);
+        }
+        let list = ScalarValue::new_list(&items, &T::DATA_TYPE);
+        Ok(vec![ScalarValue::List(list)])
+    }
+
+    /// Re-inserts every element of each non-null list in the first state column.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        let list_state = match states.first() {
+            Some(state) => as_list_array(state)?,
+            None => return Ok(()),
+        };
+        list_state
+            .iter()
+            .flatten()
+            .try_for_each(|inner| self.update_batch(&[inner]))
+    }
+}
diff --git a/datafusion/functions-aggregate/src/lib.rs
b/datafusion/functions-aggregate/src/lib.rs
index daddb9d93f..990303bd1d 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -69,6 +69,7 @@ pub mod variance;
pub mod approx_median;
pub mod approx_percentile_cont;
pub mod approx_percentile_cont_with_weight;
+pub mod bit_and_or_xor;
use crate::approx_percentile_cont::approx_percentile_cont_udaf;
use
crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
@@ -84,6 +85,9 @@ pub mod expr_fn {
pub use super::approx_median::approx_median;
pub use super::approx_percentile_cont::approx_percentile_cont;
pub use
super::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight;
+ pub use super::bit_and_or_xor::bit_and;
+ pub use super::bit_and_or_xor::bit_or;
+ pub use super::bit_and_or_xor::bit_xor;
pub use super::count::count;
pub use super::count::count_distinct;
pub use super::covariance::covar_pop;
@@ -134,6 +138,9 @@ pub fn all_default_aggregate_functions() ->
Vec<Arc<AggregateUDF>> {
approx_distinct::approx_distinct_udaf(),
approx_percentile_cont_udaf(),
approx_percentile_cont_with_weight_udaf(),
+ bit_and_or_xor::bit_and_udaf(),
+ bit_and_or_xor::bit_or_udaf(),
+ bit_and_or_xor::bit_xor_udaf(),
]
}
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
deleted file mode 100644
index 3fa225c5e4..0000000000
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ /dev/null
@@ -1,695 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators
-
-use ahash::RandomState;
-use datafusion_common::cast::as_list_array;
-use std::any::Any;
-use std::sync::Arc;
-
-use crate::{AggregateExpr, PhysicalExpr};
-use arrow::datatypes::DataType;
-use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
-use datafusion_expr::{Accumulator, GroupsAccumulator};
-use std::collections::HashSet;
-
-use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use arrow::array::Array;
-use arrow::compute::{bit_and, bit_or, bit_xor};
-use arrow_array::cast::AsArray;
-use arrow_array::{downcast_integer, ArrowNumericType};
-use arrow_buffer::ArrowNativeType;
-
-/// BIT_AND aggregate expression
-#[derive(Debug, Clone)]
-pub struct BitAnd {
- name: String,
- pub data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- nullable: bool,
-}
-
-impl BitAnd {
- /// Create a new BIT_AND aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- Self {
- name: name.into(),
- expr,
- data_type,
- nullable: true,
- }
- }
-}
-
-impl AggregateExpr for BitAnd {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.nullable,
- ))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- macro_rules! helper {
- ($t:ty) => {
- Ok(Box::<BitAndAccumulator<$t>>::default())
- };
- }
- downcast_integer! {
- &self.data_type => (helper),
- _ => Err(DataFusionError::NotImplemented(format!(
- "BitAndAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ))),
- }
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
- format_state_name(&self.name, "bit_and"),
- self.data_type.clone(),
- self.nullable,
- )])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn groups_accumulator_supported(&self) -> bool {
- true
- }
-
- fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
- use std::ops::BitAndAssign;
-
- // Note the default value for BitAnd should be all set, i.e. `!0`
- macro_rules! helper {
- ($t:ty, $dt:expr) => {
- Ok(Box::new(
- PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| {
- x.bitand_assign(y)
- })
- .with_starting_value(!0),
- ))
- };
- }
-
- let data_type = &self.data_type;
- downcast_integer! {
- data_type => (helper, data_type),
- _ => not_impl_err!(
- "GroupsAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ),
- }
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone()))
- }
-}
-
-impl PartialEq<dyn Any> for BitAnd {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.data_type == x.data_type
- && self.nullable == x.nullable
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-struct BitAndAccumulator<T: ArrowNumericType> {
- value: Option<T::Native>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for BitAndAccumulator<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "BitAndAccumulator({})", T::DATA_TYPE)
- }
-}
-
-impl<T: ArrowNumericType> Default for BitAndAccumulator<T> {
- fn default() -> Self {
- Self { value: None }
- }
-}
-
-impl<T: ArrowNumericType> Accumulator for BitAndAccumulator<T>
-where
- T::Native: std::ops::BitAnd<Output = T::Native>,
-{
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if let Some(x) = bit_and(values[0].as_primitive::<T>()) {
- let v = self.value.get_or_insert(x);
- *v = *v & x;
- }
- Ok(())
- }
-
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- self.update_batch(states)
- }
-
- fn state(&mut self) -> Result<Vec<ScalarValue>> {
- Ok(vec![self.evaluate()?])
- }
-
- fn evaluate(&mut self) -> Result<ScalarValue> {
- ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- }
-}
-
-/// BIT_OR aggregate expression
-#[derive(Debug, Clone)]
-pub struct BitOr {
- name: String,
- pub data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- nullable: bool,
-}
-
-impl BitOr {
- /// Create a new BIT_OR aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- Self {
- name: name.into(),
- expr,
- data_type,
- nullable: true,
- }
- }
-}
-
-impl AggregateExpr for BitOr {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.nullable,
- ))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- macro_rules! helper {
- ($t:ty) => {
- Ok(Box::<BitOrAccumulator<$t>>::default())
- };
- }
- downcast_integer! {
- &self.data_type => (helper),
- _ => Err(DataFusionError::NotImplemented(format!(
- "BitOrAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ))),
- }
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
- format_state_name(&self.name, "bit_or"),
- self.data_type.clone(),
- self.nullable,
- )])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn groups_accumulator_supported(&self) -> bool {
- true
- }
-
- fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
- use std::ops::BitOrAssign;
- macro_rules! helper {
- ($t:ty, $dt:expr) => {
- Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new(
- $dt,
- |x, y| x.bitor_assign(y),
- )))
- };
- }
-
- let data_type = &self.data_type;
- downcast_integer! {
- data_type => (helper, data_type),
- _ => not_impl_err!(
- "GroupsAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ),
- }
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone()))
- }
-}
-
-impl PartialEq<dyn Any> for BitOr {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.data_type == x.data_type
- && self.nullable == x.nullable
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-struct BitOrAccumulator<T: ArrowNumericType> {
- value: Option<T::Native>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for BitOrAccumulator<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "BitOrAccumulator({})", T::DATA_TYPE)
- }
-}
-
-impl<T: ArrowNumericType> Default for BitOrAccumulator<T> {
- fn default() -> Self {
- Self { value: None }
- }
-}
-
-impl<T: ArrowNumericType> Accumulator for BitOrAccumulator<T>
-where
- T::Native: std::ops::BitOr<Output = T::Native>,
-{
- fn state(&mut self) -> Result<Vec<ScalarValue>> {
- Ok(vec![self.evaluate()?])
- }
-
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if let Some(x) = bit_or(values[0].as_primitive::<T>()) {
- let v = self.value.get_or_insert(T::Native::usize_as(0));
- *v = *v | x;
- }
- Ok(())
- }
-
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- self.update_batch(states)
- }
-
- fn evaluate(&mut self) -> Result<ScalarValue> {
- ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- }
-}
-
-/// BIT_XOR aggregate expression
-#[derive(Debug, Clone)]
-pub struct BitXor {
- name: String,
- pub data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- nullable: bool,
-}
-
-impl BitXor {
- /// Create a new BIT_XOR aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- Self {
- name: name.into(),
- expr,
- data_type,
- nullable: true,
- }
- }
-}
-
-impl AggregateExpr for BitXor {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.nullable,
- ))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- macro_rules! helper {
- ($t:ty) => {
- Ok(Box::<BitXorAccumulator<$t>>::default())
- };
- }
- downcast_integer! {
- &self.data_type => (helper),
- _ => Err(DataFusionError::NotImplemented(format!(
- "BitXor not supported for {} with {}",
- self.name(),
- self.data_type
- ))),
- }
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
- format_state_name(&self.name, "bit_xor"),
- self.data_type.clone(),
- self.nullable,
- )])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn groups_accumulator_supported(&self) -> bool {
- true
- }
-
- fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
- use std::ops::BitXorAssign;
- macro_rules! helper {
- ($t:ty, $dt:expr) => {
- Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new(
- $dt,
- |x, y| x.bitxor_assign(y),
- )))
- };
- }
-
- let data_type = &self.data_type;
- downcast_integer! {
- data_type => (helper, data_type),
- _ => not_impl_err!(
- "GroupsAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ),
- }
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone()))
- }
-}
-
-impl PartialEq<dyn Any> for BitXor {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.data_type == x.data_type
- && self.nullable == x.nullable
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-struct BitXorAccumulator<T: ArrowNumericType> {
- value: Option<T::Native>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for BitXorAccumulator<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "BitXorAccumulator({})", T::DATA_TYPE)
- }
-}
-
-impl<T: ArrowNumericType> Default for BitXorAccumulator<T> {
- fn default() -> Self {
- Self { value: None }
- }
-}
-
-impl<T: ArrowNumericType> Accumulator for BitXorAccumulator<T>
-where
- T::Native: std::ops::BitXor<Output = T::Native>,
-{
- fn state(&mut self) -> Result<Vec<ScalarValue>> {
- Ok(vec![self.evaluate()?])
- }
-
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if let Some(x) = bit_xor(values[0].as_primitive::<T>()) {
- let v = self.value.get_or_insert(T::Native::usize_as(0));
- *v = *v ^ x;
- }
- Ok(())
- }
-
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- self.update_batch(states)
- }
-
- fn evaluate(&mut self) -> Result<ScalarValue> {
- ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- }
-}
-
-/// Expression for a BIT_XOR(DISTINCT) aggregation.
-#[derive(Debug, Clone)]
-pub struct DistinctBitXor {
- name: String,
- pub data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- nullable: bool,
-}
-
-impl DistinctBitXor {
- /// Create a new DistinctBitXor aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- Self {
- name: name.into(),
- expr,
- data_type,
- nullable: true,
- }
- }
-}
-
-impl AggregateExpr for DistinctBitXor {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.nullable,
- ))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- macro_rules! helper {
- ($t:ty) => {
- Ok(Box::<DistinctBitXorAccumulator<$t>>::default())
- };
- }
- downcast_integer! {
- &self.data_type => (helper),
- _ => Err(DataFusionError::NotImplemented(format!(
- "DistinctBitXorAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ))),
- }
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- // State field is a List which stores items to rebuild hash set.
- Ok(vec![Field::new_list(
- format_state_name(&self.name, "bit_xor distinct"),
- Field::new("item", self.data_type.clone(), true),
- false,
- )])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-}
-
-impl PartialEq<dyn Any> for DistinctBitXor {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.data_type == x.data_type
- && self.nullable == x.nullable
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-struct DistinctBitXorAccumulator<T: ArrowNumericType> {
- values: HashSet<T::Native, RandomState>,
-}
-
-impl<T: ArrowNumericType> std::fmt::Debug for DistinctBitXorAccumulator<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE)
- }
-}
-
-impl<T: ArrowNumericType> Default for DistinctBitXorAccumulator<T> {
- fn default() -> Self {
- Self {
- values: HashSet::default(),
- }
- }
-}
-
-impl<T: ArrowNumericType> Accumulator for DistinctBitXorAccumulator<T>
-where
- T::Native: std::ops::BitXor<Output = T::Native> + std::hash::Hash + Eq,
-{
- fn state(&mut self) -> Result<Vec<ScalarValue>> {
- // 1. Stores aggregate state in `ScalarValue::List`
- // 2. Constructs `ScalarValue::List` state from distinct numeric
stored in hash set
- let state_out = {
- let values = self
- .values
- .iter()
- .map(|x| ScalarValue::new_primitive::<T>(Some(*x),
&T::DATA_TYPE))
- .collect::<Result<Vec<_>>>()?;
-
- let arr = ScalarValue::new_list(&values, &T::DATA_TYPE);
- vec![ScalarValue::List(arr)]
- };
- Ok(state_out)
- }
-
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if values.is_empty() {
- return Ok(());
- }
-
- let array = values[0].as_primitive::<T>();
- match array.nulls().filter(|x| x.null_count() > 0) {
- Some(n) => {
- for idx in n.valid_indices() {
- self.values.insert(array.value(idx));
- }
- }
- None => array.values().iter().for_each(|x| {
- self.values.insert(*x);
- }),
- }
- Ok(())
- }
-
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- if let Some(state) = states.first() {
- let list_arr = as_list_array(state)?;
- for arr in list_arr.iter().flatten() {
- self.update_batch(&[arr])?;
- }
- }
- Ok(())
- }
-
- fn evaluate(&mut self) -> Result<ScalarValue> {
- let mut acc = T::Native::usize_as(0);
- for distinct_value in self.values.iter() {
- acc = acc ^ *distinct_value;
- }
- let v = (!self.values.is_empty()).then_some(acc);
- ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- + self.values.capacity() * std::mem::size_of::<T::Native>()
- }
-}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs
b/datafusion/physical-expr/src/aggregate/build_in.rs
index a1f5f153a9..6c01decdbf 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -66,26 +66,6 @@ pub fn create_aggregate_expr(
name,
data_type,
)),
- (AggregateFunction::BitAnd, _) => Arc::new(expressions::BitAnd::new(
- input_phy_exprs[0].clone(),
- name,
- data_type,
- )),
- (AggregateFunction::BitOr, _) => Arc::new(expressions::BitOr::new(
- input_phy_exprs[0].clone(),
- name,
- data_type,
- )),
- (AggregateFunction::BitXor, false) =>
Arc::new(expressions::BitXor::new(
- input_phy_exprs[0].clone(),
- name,
- data_type,
- )),
- (AggregateFunction::BitXor, true) =>
Arc::new(expressions::DistinctBitXor::new(
- input_phy_exprs[0].clone(),
- name,
- data_type,
- )),
(AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new(
input_phy_exprs[0].clone(),
name,
@@ -202,12 +182,10 @@ mod tests {
use datafusion_expr::{type_coercion, Signature};
use crate::expressions::{
- try_cast, ArrayAgg, Avg, BitAnd, BitOr, BitXor, BoolAnd, BoolOr,
- DistinctArrayAgg, Max, Min,
+ try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min,
};
use super::*;
-
#[test]
fn test_approx_expr() -> Result<()> {
let funcs = vec![AggregateFunction::ArrayAgg];
@@ -319,60 +297,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_bit_and_or_xor_expr() -> Result<()> {
- let funcs = vec![
- AggregateFunction::BitAnd,
- AggregateFunction::BitOr,
- AggregateFunction::BitXor,
- ];
- let data_types = vec![DataType::UInt64, DataType::Int64];
- for fun in funcs {
- for data_type in &data_types {
- let input_schema =
- Schema::new(vec![Field::new("c1", data_type.clone(),
true)]);
- let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> =
vec![Arc::new(
- expressions::Column::new_with_schema("c1",
&input_schema).unwrap(),
- )];
- let result_agg_phy_exprs = create_physical_agg_expr_for_test(
- &fun,
- false,
- &input_phy_exprs[0..1],
- &input_schema,
- "c1",
- )?;
- match fun {
- AggregateFunction::BitAnd => {
- assert!(result_agg_phy_exprs.as_any().is::<BitAnd>());
- assert_eq!("c1", result_agg_phy_exprs.name());
- assert_eq!(
- Field::new("c1", data_type.clone(), true),
- result_agg_phy_exprs.field().unwrap()
- );
- }
- AggregateFunction::BitOr => {
- assert!(result_agg_phy_exprs.as_any().is::<BitOr>());
- assert_eq!("c1", result_agg_phy_exprs.name());
- assert_eq!(
- Field::new("c1", data_type.clone(), true),
- result_agg_phy_exprs.field().unwrap()
- );
- }
- AggregateFunction::BitXor => {
- assert!(result_agg_phy_exprs.as_any().is::<BitXor>());
- assert_eq!("c1", result_agg_phy_exprs.name());
- assert_eq!(
- Field::new("c1", data_type.clone(), true),
- result_agg_phy_exprs.field().unwrap()
- );
- }
- _ => {}
- };
- }
- }
- Ok(())
- }
-
#[test]
fn test_bool_and_or_expr() -> Result<()> {
let funcs = vec![AggregateFunction::BoolAnd,
AggregateFunction::BoolOr];
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr/src/aggregate/mod.rs
index c20902c11b..0b1f5f5774 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -21,7 +21,6 @@ pub(crate) mod array_agg;
pub(crate) mod array_agg_distinct;
pub(crate) mod array_agg_ordered;
pub(crate) mod average;
-pub(crate) mod bit_and_or_xor;
pub(crate) mod bool_and_or;
pub(crate) mod correlation;
pub(crate) mod covariance;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index b9a159b21e..bffaafd7da 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -40,7 +40,6 @@ pub use
crate::aggregate::array_agg_distinct::DistinctArrayAgg;
pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
pub use crate::aggregate::average::Avg;
pub use crate::aggregate::average::AvgAccumulator;
-pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor,
DistinctBitXor};
pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
pub use crate::aggregate::build_in::create_aggregate_expr;
pub use crate::aggregate::correlation::Correlation;
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index e5578ae62f..ae4445eaa8 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -491,9 +491,9 @@ enum AggregateFunction {
// APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
GROUPING = 17;
// MEDIAN = 18;
- BIT_AND = 19;
- BIT_OR = 20;
- BIT_XOR = 21;
+ // BIT_AND = 19;
+ // BIT_OR = 20;
+ // BIT_XOR = 21;
BOOL_AND = 22;
BOOL_OR = 23;
// REGR_SLOPE = 26;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 4a7b9610e5..243c75435f 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -538,9 +538,6 @@ impl serde::Serialize for AggregateFunction {
Self::ArrayAgg => "ARRAY_AGG",
Self::Correlation => "CORRELATION",
Self::Grouping => "GROUPING",
- Self::BitAnd => "BIT_AND",
- Self::BitOr => "BIT_OR",
- Self::BitXor => "BIT_XOR",
Self::BoolAnd => "BOOL_AND",
Self::BoolOr => "BOOL_OR",
Self::StringAgg => "STRING_AGG",
@@ -562,9 +559,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"ARRAY_AGG",
"CORRELATION",
"GROUPING",
- "BIT_AND",
- "BIT_OR",
- "BIT_XOR",
"BOOL_AND",
"BOOL_OR",
"STRING_AGG",
@@ -615,9 +609,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
"CORRELATION" => Ok(AggregateFunction::Correlation),
"GROUPING" => Ok(AggregateFunction::Grouping),
- "BIT_AND" => Ok(AggregateFunction::BitAnd),
- "BIT_OR" => Ok(AggregateFunction::BitOr),
- "BIT_XOR" => Ok(AggregateFunction::BitXor),
"BOOL_AND" => Ok(AggregateFunction::BoolAnd),
"BOOL_OR" => Ok(AggregateFunction::BoolOr),
"STRING_AGG" => Ok(AggregateFunction::StringAgg),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index ffaef445d6..1172eccb90 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1945,9 +1945,9 @@ pub enum AggregateFunction {
/// APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
Grouping = 17,
/// MEDIAN = 18;
- BitAnd = 19,
- BitOr = 20,
- BitXor = 21,
+ /// BIT_AND = 19;
+ /// BIT_OR = 20;
+ /// BIT_XOR = 21;
BoolAnd = 22,
BoolOr = 23,
/// REGR_SLOPE = 26;
@@ -1975,9 +1975,6 @@ impl AggregateFunction {
AggregateFunction::ArrayAgg => "ARRAY_AGG",
AggregateFunction::Correlation => "CORRELATION",
AggregateFunction::Grouping => "GROUPING",
- AggregateFunction::BitAnd => "BIT_AND",
- AggregateFunction::BitOr => "BIT_OR",
- AggregateFunction::BitXor => "BIT_XOR",
AggregateFunction::BoolAnd => "BOOL_AND",
AggregateFunction::BoolOr => "BOOL_OR",
AggregateFunction::StringAgg => "STRING_AGG",
@@ -1993,9 +1990,6 @@ impl AggregateFunction {
"ARRAY_AGG" => Some(Self::ArrayAgg),
"CORRELATION" => Some(Self::Correlation),
"GROUPING" => Some(Self::Grouping),
- "BIT_AND" => Some(Self::BitAnd),
- "BIT_OR" => Some(Self::BitOr),
- "BIT_XOR" => Some(Self::BitXor),
"BOOL_AND" => Some(Self::BoolAnd),
"BOOL_OR" => Some(Self::BoolOr),
"STRING_AGG" => Some(Self::StringAgg),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 25b7413a98..43cc352f98 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -140,9 +140,6 @@ impl From<protobuf::AggregateFunction> for
AggregateFunction {
protobuf::AggregateFunction::Min => Self::Min,
protobuf::AggregateFunction::Max => Self::Max,
protobuf::AggregateFunction::Avg => Self::Avg,
- protobuf::AggregateFunction::BitAnd => Self::BitAnd,
- protobuf::AggregateFunction::BitOr => Self::BitOr,
- protobuf::AggregateFunction::BitXor => Self::BitXor,
protobuf::AggregateFunction::BoolAnd => Self::BoolAnd,
protobuf::AggregateFunction::BoolOr => Self::BoolOr,
protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index d9548325da..33a58daeaf 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -111,9 +111,6 @@ impl From<&AggregateFunction> for
protobuf::AggregateFunction {
AggregateFunction::Min => Self::Min,
AggregateFunction::Max => Self::Max,
AggregateFunction::Avg => Self::Avg,
- AggregateFunction::BitAnd => Self::BitAnd,
- AggregateFunction::BitOr => Self::BitOr,
- AggregateFunction::BitXor => Self::BitXor,
AggregateFunction::BoolAnd => Self::BoolAnd,
AggregateFunction::BoolOr => Self::BoolOr,
AggregateFunction::ArrayAgg => Self::ArrayAgg,
@@ -380,9 +377,6 @@ pub fn serialize_expr(
AggregateFunction::ArrayAgg =>
protobuf::AggregateFunction::ArrayAgg,
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
- AggregateFunction::BitAnd =>
protobuf::AggregateFunction::BitAnd,
- AggregateFunction::BitOr =>
protobuf::AggregateFunction::BitOr,
- AggregateFunction::BitXor =>
protobuf::AggregateFunction::BitXor,
AggregateFunction::BoolAnd =>
protobuf::AggregateFunction::BoolAnd,
AggregateFunction::BoolOr =>
protobuf::AggregateFunction::BoolOr,
AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 3a4c35a93e..886179bf56 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -23,11 +23,11 @@ use
datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::physical_expr::window::{NthValueKind,
SlidingAggregateWindowExpr};
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
- ArrayAgg, Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr,
CaseExpr,
- CastExpr, Column, Correlation, CumeDist, DistinctArrayAgg, DistinctBitXor,
Grouping,
- InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr,
NotExpr,
- NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType,
RowNumber,
- StringAgg, TryCastExpr, WindowShift,
+ ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column,
Correlation,
+ CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr,
IsNullExpr, Literal,
+ Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
+ OrderSensitiveArrayAgg, Rank, RankType, RowNumber, StringAgg, TryCastExpr,
+ WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -241,15 +241,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
let inner = if aggr_expr.downcast_ref::<Grouping>().is_some() {
protobuf::AggregateFunction::Grouping
- } else if aggr_expr.downcast_ref::<BitAnd>().is_some() {
- protobuf::AggregateFunction::BitAnd
- } else if aggr_expr.downcast_ref::<BitOr>().is_some() {
- protobuf::AggregateFunction::BitOr
- } else if aggr_expr.downcast_ref::<BitXor>().is_some() {
- protobuf::AggregateFunction::BitXor
- } else if aggr_expr.downcast_ref::<DistinctBitXor>().is_some() {
- distinct = true;
- protobuf::AggregateFunction::BitXor
} else if aggr_expr.downcast_ref::<BoolAnd>().is_some() {
protobuf::AggregateFunction::BoolAnd
} else if aggr_expr.downcast_ref::<BoolOr>().is_some() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index a496e22685..52696a1061 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -59,6 +59,7 @@ use datafusion_expr::{
TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
};
+use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor};
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec,
@@ -665,6 +666,9 @@ async fn roundtrip_expr_api() -> Result<()> {
approx_median(lit(2)),
approx_percentile_cont(lit(2), lit(0.5)),
approx_percentile_cont_with_weight(lit(2), lit(1), lit(0.5)),
+ bit_and(lit(2)),
+ bit_or(lit(2)),
+ bit_xor(lit(2)),
];
// ensure expressions created with the expr api can be round tripped
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]