This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f56006a89 docs: Update contributor guide for adding a new expression 
(#2704)
f56006a89 is described below

commit f56006a897a970a95721974b384677c60f0d8ba6
Author: Andy Grove <[email protected]>
AuthorDate: Fri Nov 7 14:01:32 2025 -0700

    docs: Update contributor guide for adding a new expression (#2704)
---
 .../contributor-guide/adding_a_new_expression.md   | 267 ++++++++++++++++-----
 1 file changed, 210 insertions(+), 57 deletions(-)

diff --git a/docs/source/contributor-guide/adding_a_new_expression.md 
b/docs/source/contributor-guide/adding_a_new_expression.md
index 6d906c662..480ef1818 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -41,26 +41,172 @@ Once you know what you want to add, you'll need to update 
the query planner to r
 
 ### Adding the Expression in Scala
 
-The `QueryPlanSerde` object has a method `exprToProto`, which is responsible 
for converting a Spark expression to a protobuf expression. Within that method 
is an `exprToProtoInternal` method that contains a large match statement for 
each expression type. You'll need to add a new case to this match statement for 
your new expression.
+DataFusion Comet uses a framework based on the `CometExpressionSerde` trait 
for converting Spark expressions to protobuf. Instead of a large match 
statement, each expression type has its own serialization handler. For 
aggregate expressions, use the `CometAggregateExpressionSerde` trait instead.
+
+#### Creating a CometExpressionSerde Implementation
+
+First, create an object that extends `CometExpressionSerde[T]` where `T` is 
the Spark expression type. This is typically added to one of the serde files in 
`spark/src/main/scala/org/apache/comet/serde/` (e.g., `math.scala`, 
`strings.scala`, `arrays.scala`, etc.).
 
 For example, the `unhex` function looks like this:
 
 ```scala
-case e: Unhex =>
-  val unHex = unhexSerde(e)
+object CometUnhex extends CometExpressionSerde[Unhex] {
+  override def convert(
+      expr: Unhex,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+    val failOnErrorExpr = exprToProtoInternal(Literal(expr.failOnError), 
inputs, binding)
+
+    val optExpr =
+      scalarFunctionExprToProtoWithReturnType(
+        "unhex",
+        expr.dataType,
+        false,
+        childExpr,
+        failOnErrorExpr)
+    optExprWithInfo(optExpr, expr, expr.child)
+  }
+}
+```
+
+The `CometExpressionSerde` trait provides three methods you can override:
+
+* `convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr]` - 
**Required**. Converts the Spark expression to protobuf. Return `None` if the 
expression cannot be converted.
+* `getSupportLevel(expr: T): SupportLevel` - Optional. Returns the level of 
support for the expression. See "Using getSupportLevel" section below for 
details.
+* `getExprConfigName(expr: T): String` - Optional. Returns a short name for 
configuration keys. Defaults to the Spark class name.
+
+For simple scalar functions that map directly to a DataFusion function, you 
can use the built-in `CometScalarFunction` implementation:
+
+```scala
+classOf[Cos] -> CometScalarFunction("cos")
+```
+
+#### Registering the Expression Handler
+
+Once you've created your `CometExpressionSerde` implementation, register it in 
`QueryPlanSerde.scala` by adding it to the appropriate expression map (e.g., 
`mathExpressions`, `stringExpressions`, `predicateExpressions`, etc.):
+
+```scala
+private val mathExpressions: Map[Class[_ <: Expression], 
CometExpressionSerde[_]] = Map(
+  // ... other expressions ...
+  classOf[Unhex] -> CometUnhex,
+  classOf[Hex] -> CometHex)
+```
+
+The `exprToProtoInternal` method will automatically use this mapping to find 
and invoke your handler when it encounters the corresponding Spark expression 
type.
+
+A few things to note:
+
+* The `convert` method is recursively called on child expressions using 
`exprToProtoInternal`, so you'll need to make sure that the child expressions 
are also converted to protobuf.
+* `scalarFunctionExprToProtoWithReturnType` is for scalar functions that need 
to return type information. Your expression may use a different method 
depending on the type of expression.
+* Use helper methods like `createBinaryExpr` and `createUnaryExpr` from 
`QueryPlanSerde` for common expression patterns.
+
+#### Using getSupportLevel
+
+The `getSupportLevel` method allows you to control whether an expression 
should be executed by Comet based on various conditions such as data types, 
parameter values, or other expression-specific constraints. This is 
particularly useful when:
+
+1. Your expression only supports specific data types
+2. Your expression has known incompatibilities with Spark's behavior
+3. Your expression has edge cases that aren't yet supported
+
+The method returns one of three `SupportLevel` values:
+
+* **`Compatible(notes: Option[String] = None)`** - Comet supports this 
expression with full compatibility with Spark, or may have known differences in 
specific edge cases that are unlikely to be an issue for most users. This is 
the default if you don't override `getSupportLevel`.
+* **`Incompatible(notes: Option[String] = None)`** - Comet supports this 
expression but results can be different from Spark. The expression will only be 
used if `spark.comet.expr.allowIncompatible=true` or the expression-specific 
config `spark.comet.expr.<exprName>.allowIncompatible=true` is set.
+* **`Unsupported(notes: Option[String] = None)`** - Comet does not support 
this expression under the current conditions. The expression will not be used 
and Spark will fall back to its native execution.
+
+All three support levels accept an optional `notes` parameter to provide 
additional context about the support level.
+
+##### Examples
+
+**Example 1: Restricting to specific data types**
+
+The `Abs` expression only supports numeric types:
+
+```scala
+object CometAbs extends CometExpressionSerde[Abs] {
+  override def getSupportLevel(expr: Abs): SupportLevel = {
+    expr.child.dataType match {
+      case _: NumericType =>
+        Compatible()
+      case _ =>
+        // Spark supports NumericType, DayTimeIntervalType, and 
YearMonthIntervalType
+        Unsupported(Some("Only integral, floating-point, and decimal types are 
supported"))
+    }
+  }
+
+  override def convert(
+      expr: Abs,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    // ... conversion logic ...
+  }
+}
+```
+
+**Example 2: Validating parameter values**
+
+The `TruncDate` expression only supports specific format strings:
+
+```scala
+object CometTruncDate extends CometExpressionSerde[TruncDate] {
+  val supportedFormats: Seq[String] =
+    Seq("year", "yyyy", "yy", "quarter", "mon", "month", "mm", "week")
+
+  override def getSupportLevel(expr: TruncDate): SupportLevel = {
+    expr.format match {
+      case Literal(fmt: UTF8String, _) =>
+        if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) {
+          Compatible()
+        } else {
+          Unsupported(Some(s"Format $fmt is not supported"))
+        }
+      case _ =>
+        Incompatible(
+          Some("Invalid format strings will throw an exception instead of 
returning NULL"))
+    }
+  }
+
+  override def convert(
+      expr: TruncDate,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    // ... conversion logic ...
+  }
+}
+```
+
+**Example 3: Marking known incompatibilities**
 
-  val childExpr = exprToProtoInternal(unHex._1, inputs)
-  val failOnErrorExpr = exprToProtoInternal(unHex._2, inputs)
+The `ArrayAppend` expression has known behavioral differences from Spark:
 
-  val optExpr =
-    scalarExprToProtoWithReturnType("unhex", e.dataType, childExpr, 
failOnErrorExpr)
-  optExprWithInfo(optExpr, expr, unHex._1)
+```scala
+object CometArrayAppend extends CometExpressionSerde[ArrayAppend] {
+  override def getSupportLevel(expr: ArrayAppend): SupportLevel = 
Incompatible(None)
+
+  override def convert(
+      expr: ArrayAppend,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    // ... conversion logic ...
+  }
+}
 ```
 
-A few things to note here:
+This expression will only be used when users explicitly enable incompatible 
expressions via configuration.
+
+##### How getSupportLevel Affects Execution
+
+When the query planner encounters an expression:
 
-* The function is recursively called on child expressions, so you'll need to 
make sure that the child expressions are also converted to protobuf.
-* `scalarExprToProtoWithReturnType` is for scalar functions that need return 
type information. Your expression may use a different method depending on the 
type of expression.
+1. It first checks if the expression is explicitly disabled via 
`spark.comet.expr.<exprName>.enabled=false`
+2. It then calls `getSupportLevel` on the expression handler
+3. Based on the result:
+   - `Compatible()`: Expression proceeds to conversion
+   - `Incompatible()`: Expression is skipped unless 
`spark.comet.expr.allowIncompatible=true` or expression-specific allow config 
is set
+   - `Unsupported()`: Expression is skipped and a fallback to Spark occurs
+
+Any notes provided will be logged to help with debugging and understanding why 
an expression was not used.
 
 #### Adding Spark-side Tests for the New Expression
 
@@ -92,9 +238,9 @@ test("unhex") {
 
 ### Adding the Expression To the Protobuf Definition
 
-Once you have the expression implemented in Scala, you might need to update 
the protobuf definition to include the new expression. You may not need to do 
this if the expression is already covered by the existing protobuf definition 
(e.g. you're adding a new scalar function).
+Once you have the expression implemented in Scala, you might need to update 
the protobuf definition to include the new expression. You may not need to do 
this if the expression is already covered by the existing protobuf definition 
(e.g. you're adding a new scalar function that uses the `ScalarFunc` message).
 
-You can find the protobuf definition in `expr.proto`, and in particular the 
`Expr` or potentially the `AggExpr`. These are similar in theory to the large 
case statement in `QueryPlanSerde`, but in protobuf format. So if you were to 
add a new expression called `Add2`, you would add a new case to the `Expr` 
message like so:
+You can find the protobuf definition in `native/proto/src/proto/expr.proto`, 
and in particular the `Expr` or potentially the `AggExpr` messages. If you were 
to add a new expression called `Add2`, you would add a new case to the `Expr` 
message like so:
 
 ```proto
 message Expr {
@@ -118,51 +264,58 @@ message Add2 {
 
 With the serialization complete, the next step is to implement the expression 
in Rust and ensure that the incoming plan can make use of it.
 
-How this works, is somewhat dependent on the type of expression you're adding, 
so see the `core/src/execution/datafusion/expressions` directory for examples 
of how to implement different types of expressions.
+How this works is somewhat dependent on the type of expression you're adding. 
Expression implementations live in the `native/spark-expr/src/` directory, 
organized by category (e.g., `math_funcs/`, `string_funcs/`, `array_funcs/`).
 
 #### Generally Adding a New Expression
 
-If you're adding a new expression, you'll need to review `create_plan` and 
`create_expr`. `create_plan` is responsible for translating the incoming plan 
into a DataFusion plan, and may delegate to `create_expr` to create the 
physical expressions for the plan.
+If you're adding a new expression that requires custom protobuf serialization, 
you may need to:
 
-If you added a new message to the protobuf definition, you'll add a new match 
case to the `create_expr` method to handle the new expression. For example, if 
you added an `Add2` expression, you would add a new case like so:
+1. Add a new message to the protobuf definition in 
`native/proto/src/proto/expr.proto`
+2. Update the Rust deserialization code to handle the new protobuf message type
 
-```rust
-match spark_expr.expr_struct.as_ref().unwrap() {
-  ...
-  ExprStruct::Add2(add2) => self.create_binary_expr(...)
-}
-```
-
-`self.create_binary_expr` is for a binary expression, but if something out of 
the box is needed, you can create a new `PhysicalExpr` implementation. For 
example, see `if_expr.rs` for an example of an implementation that doesn't fit 
the `create_binary_expr` mold.
+For most expressions, you can skip this step if you're using the existing 
scalar function infrastructure.
 
 #### Adding a New Scalar Function Expression
 
-For a new scalar function, you can reuse a lot of code by updating the 
`create_comet_physical_fun` method to match on the function name and make the 
scalar UDF to be called. For example, the diff to add the `unhex` function is:
-
-```diff
-macro_rules! make_comet_scalar_udf {
-    ($name:expr, $func:ident, $data_type:ident) => {{
-
-+       "unhex" => {
-+           let func = Arc::new(spark_unhex);
-+           make_comet_scalar_udf!("unhex", func, without data_type)
-+       }
+For a new scalar function, you can reuse a lot of code by updating the 
`create_comet_physical_fun` method in 
`native/spark-expr/src/comet_scalar_funcs.rs`. Add a match case for your 
function name:
 
-    }}
+```rust
+match fun_name {
+    // ... other functions ...
+    "unhex" => {
+        let func = Arc::new(spark_unhex);
+        make_comet_scalar_udf!("unhex", func, without data_type)
+    }
+    // ... more functions ...
 }
 ```
 
-With that addition, you can now implement the spark function in Rust. This 
function will look very similar to DataFusion code. For examples, see the 
`core/src/execution/datafusion/expressions/scalar_funcs` directory.
+The `make_comet_scalar_udf!` macro has several variants depending on whether 
your function needs:
+- A data type parameter: `make_comet_scalar_udf!("ceil", spark_ceil, 
data_type)`
+- No data type parameter: `make_comet_scalar_udf!("unhex", func, without 
data_type)`
+- An eval mode: `make_comet_scalar_udf!("decimal_div", spark_decimal_div, 
data_type, eval_mode)`
+- A fail_on_error flag: `make_comet_scalar_udf!("spark_modulo", func, without 
data_type, fail_on_error)`
 
-Without getting into the internals, the function signature will look like:
+#### Implementing the Function
+
+Then implement your function in an appropriate module under 
`native/spark-expr/src/`. The function signature will look like:
 
 ```rust
-pub(super) fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue, 
DataFusionError> {
+pub fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue, 
DataFusionError> {
     // Do the work here
 }
 ```
 
-> **_NOTE:_**  If you call the `make_comet_scalar_udf` macro with the data 
type, the function signature will look include the data type as a second 
argument.
+If your function uses the data type or eval mode, the signature will include 
those as additional parameters:
+
+```rust
+pub fn spark_ceil(
+    args: &[ColumnarValue],
+    data_type: &DataType
+) -> Result<ColumnarValue, DataFusionError> {
+    // Implementation
+}
+```
 
 ### API Differences Between Spark Versions
 
@@ -173,33 +326,33 @@ If the expression you're adding has different behavior 
across different Spark ve
 
 ## Shimming to Support Different Spark Versions
 
-By adding shims for each Spark version, you can provide a consistent interface 
for the expression across different Spark versions. For example, `unhex` added 
a new optional parameter is Spark 3.4, for if it should `failOnError` or not. 
So for version 3.3, the shim is:
+If the expression you're adding has different behavior across different Spark 
versions, you can use the shim system located in 
`spark/src/main/spark-$SPARK_VERSION/org/apache/comet/shims/CometExprShim.scala`
 for each Spark version.
 
-```scala
-trait CometExprShim {
-    /**
-      * Returns a tuple of expressions for the `unhex` function.
-      */
-    def unhexSerde(unhex: Unhex): (Expression, Expression) = {
-        (unhex.child, Literal(false))
-    }
-}
-```
+The `CometExprShim` trait provides several mechanisms for handling version 
differences:
+
+1. **Version-specific methods** - Override methods in the trait to provide 
version-specific behavior
+2. **Version-specific expression handling** - Use 
`versionSpecificExprToProtoInternal` to handle expressions that only exist in 
certain Spark versions
 
-And for version 3.4, the shim is:
+For example, the `StringDecode` expression only exists in certain Spark 
versions. The shim handles this:
 
 ```scala
 trait CometExprShim {
-    /**
-      * Returns a tuple of expressions for the `unhex` function.
-      */
-    def unhexSerde(unhex: Unhex): (Expression, Expression) = {
-        (unhex.child, unhex.failOnError)
+  def versionSpecificExprToProtoInternal(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[Expr] = {
+    expr match {
+      case s: StringDecode =>
+        stringDecode(expr, s.charset, s.bin, inputs, binding)
+      case _ => None
     }
+  }
 }
 ```
 
-Then when `unhexSerde` is called in the `QueryPlanSerde` object, it will use 
the correct shim for the Spark version.
+The `QueryPlanSerde.exprToProtoInternal` method calls 
`versionSpecificExprToProtoInternal` first, allowing shims to intercept and 
handle version-specific expressions before falling back to the standard 
expression maps.
+
+Your `CometExpressionSerde` implementation can also access shim methods by 
mixing in the `CometExprShim` trait, though in most cases you can directly 
access the expression properties if they're available across all supported 
Spark versions.
 
 ## Resources
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to