izveigor commented on code in PR #6276: URL: https://github.com/apache/arrow-datafusion/pull/6276#discussion_r1192751625
########## datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs: ########## @@ -0,0 +1,1103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expressions that can evaluated at runtime during query execution + +use std::any::Any; +use std::convert::TryFrom; +use std::sync::Arc; + +use crate::{AggregateExpr, PhysicalExpr}; +use arrow::datatypes::DataType; +use arrow::{ + array::{ + ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, + UInt32Array, UInt64Array, UInt8Array, + }, + datatypes::Field, +}; +use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; +use datafusion_expr::Accumulator; + +use crate::aggregate::row_accumulator::{ + is_row_accumulator_support_dtype, RowAccumulator, +}; +use crate::aggregate::utils::down_cast_any_ref; +use crate::expressions::format_state_name; +use arrow::array::Array; +use arrow::array::PrimitiveArray; +use arrow::datatypes::ArrowNativeTypeOp; +use arrow::datatypes::ArrowNumericType; +use datafusion_row::accessor::RowAccessor; +use std::ops::BitAnd as BitAndImplementation; +use std::ops::BitOr as BitOrImplementation; +use std::ops::BitXor as BitXorImplementation; + +fn bit_and<T>(array: &PrimitiveArray<T>) -> 
Option<T::Native> +where + T: ArrowNumericType, + T::Native: BitAndImplementation<Output = T::Native> + ArrowNativeTypeOp, +{ + let null_count = array.null_count(); + + if null_count == array.len() { + return None; + } + + let data: &[T::Native] = array.values(); + + match array.nulls() { + None => { + let bit_and = data + .iter() + .fold(T::Native::ONE.neg_wrapping(), |accumulator, value| { + accumulator & *value + }); + + Some(bit_and) + } + Some(nulls) => { + let mut bit_and = T::Native::ONE.neg_wrapping(); + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); + + let bit_chunks = nulls.inner().bit_chunks(); + data_chunks + .zip(bit_chunks.iter()) + .for_each(|(chunk, mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + chunk.iter().for_each(|value| { + if (mask & index_mask) != 0 { + bit_and = bit_and & *value; + } + index_mask <<= 1; + }); + }); + + let remainder_bits = bit_chunks.remainder_bits(); + + remainder.iter().enumerate().for_each(|(i, value)| { + if remainder_bits & (1 << i) != 0 { + bit_and = bit_and & *value; + } + }); + + Some(bit_and) + } + } +} + +fn bit_or<T>(array: &PrimitiveArray<T>) -> Option<T::Native> +where + T: ArrowNumericType, + T::Native: BitOrImplementation<Output = T::Native> + ArrowNativeTypeOp, +{ + let null_count = array.null_count(); + + if null_count == array.len() { + return None; + } + + let data: &[T::Native] = array.values(); + + match array.nulls() { + None => { + let bit_or = data.iter().fold(T::default_value(), |accumulator, value| { + accumulator | *value + }); + + Some(bit_or) + } + Some(nulls) => { + let mut bit_or = T::default_value(); + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); + + let bit_chunks = nulls.inner().bit_chunks(); + data_chunks + .zip(bit_chunks.iter()) + .for_each(|(chunk, mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + chunk.iter().for_each(|value| { + if (mask & 
index_mask) != 0 { + bit_or = bit_or | *value; + } + index_mask <<= 1; + }); + }); + + let remainder_bits = bit_chunks.remainder_bits(); + + remainder.iter().enumerate().for_each(|(i, value)| { + if remainder_bits & (1 << i) != 0 { + bit_or = bit_or | *value; + } + }); + + Some(bit_or) + } + } +} + +fn bit_xor<T>(array: &PrimitiveArray<T>) -> Option<T::Native> +where + T: ArrowNumericType, + T::Native: BitXorImplementation<Output = T::Native> + ArrowNativeTypeOp, +{ + let null_count = array.null_count(); + + if null_count == array.len() { + return None; + } + + let data: &[T::Native] = array.values(); + + match array.nulls() { + None => { + let bit_xor = data.iter().fold(T::default_value(), |accumulator, value| { + accumulator ^ *value + }); + + Some(bit_xor) + } + Some(nulls) => { + let mut bit_xor = T::default_value(); + let data_chunks = data.chunks_exact(64); + let remainder = data_chunks.remainder(); + + let bit_chunks = nulls.inner().bit_chunks(); + data_chunks + .zip(bit_chunks.iter()) + .for_each(|(chunk, mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + chunk.iter().for_each(|value| { + if (mask & index_mask) != 0 { + bit_xor = bit_xor ^ *value; + } + index_mask <<= 1; + }); + }); + + let remainder_bits = bit_chunks.remainder_bits(); + + remainder.iter().enumerate().for_each(|(i, value)| { + if remainder_bits & (1 << i) != 0 { + bit_xor = bit_xor ^ *value; + } + }); + + Some(bit_xor) + } + } +} + +// Bit and/Bit or/Bit xor aggregation can take Dictionary encode input but always produces unpacked +// (aka non Dictionary) output. We need to adjust the output data type to reflect this. 
+// The reason bit and/bit or/bit xor aggregate produces unpacked output because there is only one +// bit and/bit or/bit xor value per group; there is no needs to keep them Dictionary encode +fn bit_and_or_xor_aggregate_data_type(input_type: DataType) -> DataType { + if let DataType::Dictionary(_, value_type) = input_type { + *value_type + } else { + input_type + } +} + +// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account +macro_rules! typed_bit_and_or_xor_batch { + ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ + let array = downcast_value!($VALUES, $ARRAYTYPE); + let delta = $OP(array); + Ok(ScalarValue::$SCALAR(delta)) + }}; +} + +// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type. +macro_rules! bit_and_or_xor_batch { + ($VALUES:expr, $OP:ident) => {{ + match $VALUES.data_type() { Review Comment: I think the usual way is preferable (because other aggregate functions use this method). But I think the idea can be continued in a separate PR for all aggregate functions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
