This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 7323af79e Chore: implement bit_not as ScalarUDFImpl (#1825) 7323af79e is described below commit 7323af79ebb633a8f8da4f66d2b6488d800cb20d Author: Kazantsev Maksim <kazantsev....@yandex.ru> AuthorDate: Mon Jun 2 18:12:11 2025 +0400 Chore: implement bit_not as ScalarUDFImpl (#1825) * implement bit_not as ScalarUDFImpl * Revert expr.proto --------- Co-authored-by: Kazantsev Maksim <mn.kazant...@gmail.com> --- native/core/src/execution/planner.rs | 14 +- native/proto/src/proto/expr.proto | 1 - native/spark-expr/src/bitwise_funcs/bitwise_not.rs | 170 +++++++++------------ native/spark-expr/src/bitwise_funcs/mod.rs | 2 +- .../org/apache/comet/serde/QueryPlanSerde.scala | 10 +- 5 files changed, 84 insertions(+), 113 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 90601d19d..bbfdc4f35 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -64,7 +64,7 @@ use datafusion::{ }, prelude::SessionContext, }; -use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}; +use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr, SparkBitwiseNot}; use crate::execution::operators::ExecutionError::GeneralError; use crate::execution::shuffle::CompressionCodec; @@ -102,9 +102,9 @@ use datafusion_comet_proto::{ spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; use datafusion_comet_spark_expr::{ - ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, Contains, Correlation, - Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, - HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, + ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Contains, Correlation, Covariance, + CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr, + IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; @@ -150,6 +150,7 @@ impl Default for PhysicalPlanner { // register UDFs from datafusion-spark crate session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); Self { exec_context_id: TEST_EXEC_CONTEXT_ID, @@ -162,6 +163,7 @@ impl PhysicalPlanner { pub fn new(session_ctx: Arc<SessionContext>) -> Self { // register UDFs from datafusion-spark crate session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); Self { exec_context_id: TEST_EXEC_CONTEXT_ID, session_ctx, @@ -586,10 +588,6 @@ impl PhysicalPlanner { let op = DataFusionOperator::BitwiseAnd; Ok(Arc::new(BinaryExpr::new(left, op, right))) } - ExprStruct::BitwiseNot(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(BitwiseNotExpr::new(child))) - } ExprStruct::BitwiseOr(expr) => { let left = self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?; diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 90fd08948..d74e675f7 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -72,7 +72,6 @@ message Expr { NormalizeNaNAndZero normalize_nan_and_zero = 45; TruncDate truncDate = 46; TruncTimestamp truncTimestamp = 47; - UnaryExpr bitwiseNot = 48; Abs abs = 49; Subquery subquery = 50; UnboundReference unbound = 51; diff --git a/native/spark-expr/src/bitwise_funcs/bitwise_not.rs b/native/spark-expr/src/bitwise_funcs/bitwise_not.rs index 8720c4312..d3e5d29df 100644 --- a/native/spark-expr/src/bitwise_funcs/bitwise_not.rs +++ b/native/spark-expr/src/bitwise_funcs/bitwise_not.rs @@ -15,138 +15,110 @@ // specific language governing permissions and limitations // under the License. -use arrow::{ - array::*, - datatypes::{DataType, Schema}, - record_batch::RecordBatch, +use arrow::{array::*, datatypes::DataType}; +use datafusion::common::{ + exec_err, internal_datafusion_err, internal_err, DataFusionError, Result, }; -use datafusion::common::Result; -use datafusion::physical_expr::PhysicalExpr; -use datafusion::{error::DataFusionError, logical_expr::ColumnarValue}; -use std::fmt::Formatter; -use std::hash::Hash; +use datafusion::logical_expr::{ColumnarValue, Volatility}; +use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature}; use std::{any::Any, sync::Arc}; -macro_rules! compute_op { - ($OPERAND:expr, $DT:ident) => {{ - let operand = $OPERAND - .as_any() - .downcast_ref::<$DT>() - .expect("compute_op failed to downcast array"); - let result: $DT = operand.iter().map(|x| x.map(|y| !y)).collect(); - Ok(Arc::new(result)) - }}; -} - -/// BitwiseNot expression -#[derive(Debug, Eq)] -pub struct BitwiseNotExpr { - /// Input expression - arg: Arc<dyn PhysicalExpr>, +#[derive(Debug)] +pub struct SparkBitwiseNot { + signature: Signature, + aliases: Vec<String>, } -impl Hash for BitwiseNotExpr { - fn hash<H: std::hash::Hasher>(&self, state: &mut H) { - self.arg.hash(state); - } -} - -impl PartialEq for BitwiseNotExpr { - fn eq(&self, other: &Self) -> bool { - self.arg.eq(&other.arg) - } -} - -impl BitwiseNotExpr { - /// Create new bitwise not expression - pub fn new(arg: Arc<dyn PhysicalExpr>) -> Self { - Self { arg } - } - - /// Get the input expression - pub fn arg(&self) -> &Arc<dyn PhysicalExpr> { - &self.arg +impl Default for SparkBitwiseNot { + fn default() -> Self { + Self::new() } } -impl std::fmt::Display for BitwiseNotExpr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "(~ {})", self.arg) +impl SparkBitwiseNot { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + aliases: vec![], + } } } -impl PhysicalExpr for BitwiseNotExpr { - /// Return a reference to Any that can be used for downcasting +impl ScalarUDFImpl for SparkBitwiseNot { fn as_any(&self) -> &dyn Any { self } - fn data_type(&self, input_schema: &Schema) -> Result<DataType> { - self.arg.data_type(input_schema) + fn name(&self) -> &str { + "bit_not" } - fn nullable(&self, input_schema: &Schema) -> Result<bool> { - self.arg.nullable(input_schema) + fn signature(&self) -> &Signature { + &self.signature } - fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { - let arg = self.arg.evaluate(batch)?; - match arg { - ColumnarValue::Array(array) => { - let result: Result<ArrayRef> = match array.data_type() { - DataType::Int8 => compute_op!(array, Int8Array), - DataType::Int16 => compute_op!(array, Int16Array), - DataType::Int32 => compute_op!(array, Int32Array), - DataType::Int64 => compute_op!(array, Int64Array), - _ => Err(DataFusionError::Execution(format!( - "(- '{:?}') can't be evaluated because the expression's type is {:?}, not signed int", - self, - array.data_type(), - ))), - }; - result.map(ColumnarValue::Array) - } - ColumnarValue::Scalar(_) => Err(DataFusionError::Internal( - "shouldn't go to bitwise not scalar path".to_string(), - )), - } + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + Ok(match arg_types[0] { + DataType::Int8 => DataType::Int8, + DataType::Int16 => DataType::Int16, + DataType::Int32 => DataType::Int32, + DataType::Int64 => DataType::Int64, + DataType::Null => DataType::Null, + _ => return exec_err!("{} function can only accept integral arrays", self.name()), + }) } - fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { - vec![&self.arg] + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { + let args: [ColumnarValue; 1] = args + .args + .try_into() + .map_err(|_| internal_datafusion_err!("bit_not expects exactly one argument"))?; + bitwise_not(args) } - fn with_new_children( - self: Arc<Self>, - children: Vec<Arc<dyn PhysicalExpr>>, - ) -> Result<Arc<dyn PhysicalExpr>> { - Ok(Arc::new(BitwiseNotExpr::new(Arc::clone(&children[0])))) + fn aliases(&self) -> &[String] { + &self.aliases } +} - fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } +macro_rules! compute_op { + ($OPERAND:expr, $DT:ident) => {{ + let operand = $OPERAND.as_any().downcast_ref::<$DT>().ok_or_else(|| { + DataFusionError::Execution(format!( + "compute_op failed to downcast array to: {:?}", + stringify!($DT) + )) + })?; + let result: $DT = operand.iter().map(|x| x.map(|y| !y)).collect(); + Ok(Arc::new(result)) + }}; } -pub fn bitwise_not(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> { - Ok(Arc::new(BitwiseNotExpr::new(arg))) +pub fn bitwise_not(args: [ColumnarValue; 1]) -> Result<ColumnarValue> { + match args { + [ColumnarValue::Array(array)] => { + let result: Result<ArrayRef> = match array.data_type() { + DataType::Int8 => compute_op!(array, Int8Array), + DataType::Int16 => compute_op!(array, Int16Array), + DataType::Int32 => compute_op!(array, Int32Array), + DataType::Int64 => compute_op!(array, Int64Array), + _ => exec_err!("bit_not can't be evaluated because the expression's type is {:?}, not signed int", array.data_type()), + }; + result.map(ColumnarValue::Array) + } + [ColumnarValue::Scalar(_)] => internal_err!("shouldn't go to bitwise not scalar path"), + } } #[cfg(test)] mod tests { - use arrow::datatypes::*; use datafusion::common::{cast::as_int32_array, Result}; - use datafusion::physical_expr::expressions::col; use super::*; #[test] fn bitwise_not_op() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - - let expr = bitwise_not(col("a", &schema)?)?; - - let input = Int32Array::from(vec![ + let int_array = Int32Array::from(vec![ Some(1), Some(2), None, @@ -163,9 +135,13 @@ mod tests { Some(3455), ]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; + let columnar_value = ColumnarValue::Array(Arc::new(int_array)); - let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = bitwise_not([columnar_value])?; + let result = match result { + ColumnarValue::Array(array) => array, + _ => panic!("Expected array"), + }; let result = as_int32_array(&result).expect("failed to downcast to In32Array"); assert_eq!(result, expected); diff --git a/native/spark-expr/src/bitwise_funcs/mod.rs b/native/spark-expr/src/bitwise_funcs/mod.rs index 718cfc7ca..47267d852 100644 --- a/native/spark-expr/src/bitwise_funcs/mod.rs +++ b/native/spark-expr/src/bitwise_funcs/mod.rs @@ -19,4 +19,4 @@ mod bitwise_count; mod bitwise_not; pub use bitwise_count::spark_bit_count; -pub use bitwise_not::{bitwise_not, BitwiseNotExpr}; +pub use bitwise_not::SparkBitwiseNot; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 32918677e..7267852b9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1609,12 +1609,10 @@ object QueryPlanSerde extends Logging with CometExprShim { (builder, binaryExpr) => builder.setBitwiseAnd(binaryExpr)) case BitwiseNot(child) => - createUnaryExpr( - expr, - child, - inputs, - binding, - (builder, unaryExpr) => builder.setBitwiseNot(unaryExpr)) + val childProto = exprToProto(child, inputs, binding) + val bitNotScalarExpr = + scalarFunctionExprToProto("bit_not", childProto) + optExprWithInfo(bitNotScalarExpr, expr, expr.children: _*) case BitwiseOr(left, right) => createBinaryExpr( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org