This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7323af79e Chore: implement bit_not as ScalarUDFImpl (#1825)
7323af79e is described below

commit 7323af79ebb633a8f8da4f66d2b6488d800cb20d
Author: Kazantsev Maksim <kazantsev....@yandex.ru>
AuthorDate: Mon Jun 2 18:12:11 2025 +0400

    Chore: implement bit_not as ScalarUDFImpl (#1825)
    
    * implement bit_not as ScalarUDFImpl
    
    * Revert expr.proto
    
    ---------
    
    Co-authored-by: Kazantsev Maksim <mn.kazant...@gmail.com>
---
 native/core/src/execution/planner.rs               |  14 +-
 native/proto/src/proto/expr.proto                  |   1 -
 native/spark-expr/src/bitwise_funcs/bitwise_not.rs | 170 +++++++++------------
 native/spark-expr/src/bitwise_funcs/mod.rs         |   2 +-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  10 +-
 5 files changed, 84 insertions(+), 113 deletions(-)

diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index 90601d19d..bbfdc4f35 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -64,7 +64,7 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
-use datafusion_comet_spark_expr::{create_comet_physical_fun, 
create_negate_expr};
+use datafusion_comet_spark_expr::{create_comet_physical_fun, 
create_negate_expr, SparkBitwiseNot};
 
 use crate::execution::operators::ExecutionError::GeneralError;
 use crate::execution::shuffle::CompressionCodec;
@@ -102,9 +102,9 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as 
SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, 
Contains, Correlation,
-    Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, 
GetArrayStructFields, GetStructField,
-    HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, 
RLike, SecondExpr,
+    ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Contains, Correlation, 
Covariance,
+    CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, 
GetStructField, HourExpr,
+    IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, 
SecondExpr,
     SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, 
SumDecimal,
     TimestampTruncExpr, ToJson, UnboundColumn, Variance,
 };
@@ -150,6 +150,7 @@ impl Default for PhysicalPlanner {
 
         // register UDFs from datafusion-spark crate
         
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
+        
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default()));
 
         Self {
             exec_context_id: TEST_EXEC_CONTEXT_ID,
@@ -162,6 +163,7 @@ impl PhysicalPlanner {
     pub fn new(session_ctx: Arc<SessionContext>) -> Self {
         // register UDFs from datafusion-spark crate
         
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
+        
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default()));
         Self {
             exec_context_id: TEST_EXEC_CONTEXT_ID,
             session_ctx,
@@ -586,10 +588,6 @@ impl PhysicalPlanner {
                 let op = DataFusionOperator::BitwiseAnd;
                 Ok(Arc::new(BinaryExpr::new(left, op, right)))
             }
-            ExprStruct::BitwiseNot(expr) => {
-                let child = self.create_expr(expr.child.as_ref().unwrap(), 
input_schema)?;
-                Ok(Arc::new(BitwiseNotExpr::new(child)))
-            }
             ExprStruct::BitwiseOr(expr) => {
                 let left =
                     self.create_expr(expr.left.as_ref().unwrap(), 
Arc::clone(&input_schema))?;
diff --git a/native/proto/src/proto/expr.proto 
b/native/proto/src/proto/expr.proto
index 90fd08948..d74e675f7 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -72,7 +72,6 @@ message Expr {
     NormalizeNaNAndZero normalize_nan_and_zero = 45;
     TruncDate truncDate = 46;
     TruncTimestamp truncTimestamp = 47;
-    UnaryExpr bitwiseNot = 48;
     Abs abs = 49;
     Subquery subquery = 50;
     UnboundReference unbound = 51;
diff --git a/native/spark-expr/src/bitwise_funcs/bitwise_not.rs 
b/native/spark-expr/src/bitwise_funcs/bitwise_not.rs
index 8720c4312..d3e5d29df 100644
--- a/native/spark-expr/src/bitwise_funcs/bitwise_not.rs
+++ b/native/spark-expr/src/bitwise_funcs/bitwise_not.rs
@@ -15,138 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::{
-    array::*,
-    datatypes::{DataType, Schema},
-    record_batch::RecordBatch,
+use arrow::{array::*, datatypes::DataType};
+use datafusion::common::{
+    exec_err, internal_datafusion_err, internal_err, DataFusionError, Result,
 };
-use datafusion::common::Result;
-use datafusion::physical_expr::PhysicalExpr;
-use datafusion::{error::DataFusionError, logical_expr::ColumnarValue};
-use std::fmt::Formatter;
-use std::hash::Hash;
+use datafusion::logical_expr::{ColumnarValue, Volatility};
+use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature};
 use std::{any::Any, sync::Arc};
 
-macro_rules! compute_op {
-    ($OPERAND:expr, $DT:ident) => {{
-        let operand = $OPERAND
-            .as_any()
-            .downcast_ref::<$DT>()
-            .expect("compute_op failed to downcast array");
-        let result: $DT = operand.iter().map(|x| x.map(|y| !y)).collect();
-        Ok(Arc::new(result))
-    }};
-}
-
-/// BitwiseNot expression
-#[derive(Debug, Eq)]
-pub struct BitwiseNotExpr {
-    /// Input expression
-    arg: Arc<dyn PhysicalExpr>,
+#[derive(Debug)]
+pub struct SparkBitwiseNot {
+    signature: Signature,
+    aliases: Vec<String>,
 }
 
-impl Hash for BitwiseNotExpr {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.arg.hash(state);
-    }
-}
-
-impl PartialEq for BitwiseNotExpr {
-    fn eq(&self, other: &Self) -> bool {
-        self.arg.eq(&other.arg)
-    }
-}
-
-impl BitwiseNotExpr {
-    /// Create new bitwise not expression
-    pub fn new(arg: Arc<dyn PhysicalExpr>) -> Self {
-        Self { arg }
-    }
-
-    /// Get the input expression
-    pub fn arg(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.arg
+impl Default for SparkBitwiseNot {
+    fn default() -> Self {
+        Self::new()
     }
 }
 
-impl std::fmt::Display for BitwiseNotExpr {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "(~ {})", self.arg)
+impl SparkBitwiseNot {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+            aliases: vec![],
+        }
     }
 }
 
-impl PhysicalExpr for BitwiseNotExpr {
-    /// Return a reference to Any that can be used for downcasting
+impl ScalarUDFImpl for SparkBitwiseNot {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        self.arg.data_type(input_schema)
+    fn name(&self) -> &str {
+        "bit_not"
     }
 
-    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
-        self.arg.nullable(input_schema)
+    fn signature(&self) -> &Signature {
+        &self.signature
     }
 
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        let arg = self.arg.evaluate(batch)?;
-        match arg {
-            ColumnarValue::Array(array) => {
-                let result: Result<ArrayRef> = match array.data_type() {
-                    DataType::Int8 => compute_op!(array, Int8Array),
-                    DataType::Int16 => compute_op!(array, Int16Array),
-                    DataType::Int32 => compute_op!(array, Int32Array),
-                    DataType::Int64 => compute_op!(array, Int64Array),
-                    _ => Err(DataFusionError::Execution(format!(
-                        "(- '{:?}') can't be evaluated because the 
expression's type is {:?}, not signed int",
-                        self,
-                        array.data_type(),
-                    ))),
-                };
-                result.map(ColumnarValue::Array)
-            }
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
-                "shouldn't go to bitwise not scalar path".to_string(),
-            )),
-        }
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(match arg_types[0] {
+            DataType::Int8 => DataType::Int8,
+            DataType::Int16 => DataType::Int16,
+            DataType::Int32 => DataType::Int32,
+            DataType::Int64 => DataType::Int64,
+            DataType::Null => DataType::Null,
+            _ => return exec_err!("{} function can only accept integral 
arrays", self.name()),
+        })
     }
 
-    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
-        vec![&self.arg]
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let args: [ColumnarValue; 1] = args
+            .args
+            .try_into()
+            .map_err(|_| internal_datafusion_err!("bit_not expects exactly one 
argument"))?;
+        bitwise_not(args)
     }
 
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn PhysicalExpr>>,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        Ok(Arc::new(BitwiseNotExpr::new(Arc::clone(&children[0]))))
+    fn aliases(&self) -> &[String] {
+        &self.aliases
     }
+}
 
-    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
-        unimplemented!()
-    }
+macro_rules! compute_op {
+    ($OPERAND:expr, $DT:ident) => {{
+        let operand = $OPERAND.as_any().downcast_ref::<$DT>().ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "compute_op failed to downcast array to: {:?}",
+                stringify!($DT)
+            ))
+        })?;
+        let result: $DT = operand.iter().map(|x| x.map(|y| !y)).collect();
+        Ok(Arc::new(result))
+    }};
 }
 
-pub fn bitwise_not(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {
-    Ok(Arc::new(BitwiseNotExpr::new(arg)))
+pub fn bitwise_not(args: [ColumnarValue; 1]) -> Result<ColumnarValue> {
+    match args {
+        [ColumnarValue::Array(array)] => {
+            let result: Result<ArrayRef> = match array.data_type() {
+                DataType::Int8 => compute_op!(array, Int8Array),
+                DataType::Int16 => compute_op!(array, Int16Array),
+                DataType::Int32 => compute_op!(array, Int32Array),
+                DataType::Int64 => compute_op!(array, Int64Array),
+                _ => exec_err!("bit_not can't be evaluated because the 
expression's type is {:?}, not signed int", array.data_type()),
+            };
+            result.map(ColumnarValue::Array)
+        }
+        [ColumnarValue::Scalar(_)] => internal_err!("shouldn't go to bitwise 
not scalar path"),
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::*;
     use datafusion::common::{cast::as_int32_array, Result};
-    use datafusion::physical_expr::expressions::col;
 
     use super::*;
 
     #[test]
     fn bitwise_not_op() -> Result<()> {
-        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
-
-        let expr = bitwise_not(col("a", &schema)?)?;
-
-        let input = Int32Array::from(vec![
+        let int_array = Int32Array::from(vec![
             Some(1),
             Some(2),
             None,
@@ -163,9 +135,13 @@ mod tests {
             Some(3455),
         ]);
 
-        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(input)])?;
+        let columnar_value = ColumnarValue::Array(Arc::new(int_array));
 
-        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = bitwise_not([columnar_value])?;
+        let result = match result {
+            ColumnarValue::Array(array) => array,
+            _ => panic!("Expected array"),
+        };
         let result = as_int32_array(&result).expect("failed to downcast to 
Int32Array");
         assert_eq!(result, expected);
 
diff --git a/native/spark-expr/src/bitwise_funcs/mod.rs 
b/native/spark-expr/src/bitwise_funcs/mod.rs
index 718cfc7ca..47267d852 100644
--- a/native/spark-expr/src/bitwise_funcs/mod.rs
+++ b/native/spark-expr/src/bitwise_funcs/mod.rs
@@ -19,4 +19,4 @@ mod bitwise_count;
 mod bitwise_not;
 
 pub use bitwise_count::spark_bit_count;
-pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
+pub use bitwise_not::SparkBitwiseNot;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 32918677e..7267852b9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1609,12 +1609,10 @@ object QueryPlanSerde extends Logging with 
CometExprShim {
           (builder, binaryExpr) => builder.setBitwiseAnd(binaryExpr))
 
       case BitwiseNot(child) =>
-        createUnaryExpr(
-          expr,
-          child,
-          inputs,
-          binding,
-          (builder, unaryExpr) => builder.setBitwiseNot(unaryExpr))
+        val childProto = exprToProto(child, inputs, binding)
+        val bitNotScalarExpr =
+          scalarFunctionExprToProto("bit_not", childProto)
+        optExprWithInfo(bitNotScalarExpr, expr, expr.children: _*)
 
       case BitwiseOr(left, right) =>
         createBinaryExpr(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to