This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f56006a89 docs: Update contributor guide for adding a new expression
(#2704)
f56006a89 is described below
commit f56006a897a970a95721974b384677c60f0d8ba6
Author: Andy Grove <[email protected]>
AuthorDate: Fri Nov 7 14:01:32 2025 -0700
docs: Update contributor guide for adding a new expression (#2704)
---
.../contributor-guide/adding_a_new_expression.md | 267 ++++++++++++++++-----
1 file changed, 210 insertions(+), 57 deletions(-)
diff --git a/docs/source/contributor-guide/adding_a_new_expression.md
b/docs/source/contributor-guide/adding_a_new_expression.md
index 6d906c662..480ef1818 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -41,26 +41,172 @@ Once you know what you want to add, you'll need to update
the query planner to r
### Adding the Expression in Scala
-The `QueryPlanSerde` object has a method `exprToProto`, which is responsible
for converting a Spark expression to a protobuf expression. Within that method
is an `exprToProtoInternal` method that contains a large match statement for
each expression type. You'll need to add a new case to this match statement for
your new expression.
+DataFusion Comet uses a framework based on the `CometExpressionSerde` trait
for converting Spark expressions to protobuf. Instead of a large match
statement, each expression type has its own serialization handler. For
aggregate expressions, use the `CometAggregateExpressionSerde` trait instead.
+
+#### Creating a CometExpressionSerde Implementation
+
+First, create an object that extends `CometExpressionSerde[T]` where `T` is
the Spark expression type. This is typically added to one of the serde files in
`spark/src/main/scala/org/apache/comet/serde/` (e.g., `math.scala`,
`strings.scala`, `arrays.scala`, etc.).
For example, the `unhex` function looks like this:
```scala
-case e: Unhex =>
- val unHex = unhexSerde(e)
+object CometUnhex extends CometExpressionSerde[Unhex] {
+ override def convert(
+ expr: Unhex,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+ val failOnErrorExpr = exprToProtoInternal(Literal(expr.failOnError),
inputs, binding)
+
+ val optExpr =
+ scalarFunctionExprToProtoWithReturnType(
+ "unhex",
+ expr.dataType,
+ false,
+ childExpr,
+ failOnErrorExpr)
+ optExprWithInfo(optExpr, expr, expr.child)
+ }
+}
+```
+
+The `CometExpressionSerde` trait provides three methods you can override:
+
+* `convert(expr: T, inputs: Seq[Attribute], binding: Boolean): Option[Expr]` -
**Required**. Converts the Spark expression to protobuf. Return `None` if the
expression cannot be converted.
+* `getSupportLevel(expr: T): SupportLevel` - Optional. Returns the level of
support for the expression. See "Using getSupportLevel" section below for
details.
+* `getExprConfigName(expr: T): String` - Optional. Returns a short name for
configuration keys. Defaults to the Spark class name.
+
+For simple scalar functions that map directly to a DataFusion function, you
can use the built-in `CometScalarFunction` implementation:
+
+```scala
+classOf[Cos] -> CometScalarFunction("cos")
+```
+
+#### Registering the Expression Handler
+
+Once you've created your `CometExpressionSerde` implementation, register it in
`QueryPlanSerde.scala` by adding it to the appropriate expression map (e.g.,
`mathExpressions`, `stringExpressions`, `predicateExpressions`, etc.):
+
+```scala
+private val mathExpressions: Map[Class[_ <: Expression],
CometExpressionSerde[_]] = Map(
+ // ... other expressions ...
+ classOf[Unhex] -> CometUnhex,
+ classOf[Hex] -> CometHex)
+```
+
+The `exprToProtoInternal` method will automatically use this mapping to find
and invoke your handler when it encounters the corresponding Spark expression
type.
+
+A few things to note:
+
+* The `convert` method is recursively called on child expressions using
`exprToProtoInternal`, so you'll need to make sure that the child expressions
are also converted to protobuf.
+* `scalarFunctionExprToProtoWithReturnType` is for scalar functions that need
to return type information. Your expression may use a different method
depending on the type of expression.
+* Use helper methods like `createBinaryExpr` and `createUnaryExpr` from
`QueryPlanSerde` for common expression patterns.
+
+#### Using getSupportLevel
+
+The `getSupportLevel` method allows you to control whether an expression
should be executed by Comet based on various conditions such as data types,
parameter values, or other expression-specific constraints. This is
particularly useful when:
+
+1. Your expression only supports specific data types
+2. Your expression has known incompatibilities with Spark's behavior
+3. Your expression has edge cases that aren't yet supported
+
+The method returns one of three `SupportLevel` values:
+
+* **`Compatible(notes: Option[String] = None)`** - Comet supports this
expression with full compatibility with Spark, or may have known differences in
specific edge cases that are unlikely to be an issue for most users. This is
the default if you don't override `getSupportLevel`.
+* **`Incompatible(notes: Option[String] = None)`** - Comet supports this
expression but results can be different from Spark. The expression will only be
used if `spark.comet.expr.allowIncompatible=true` or the expression-specific
config `spark.comet.expr.<exprName>.allowIncompatible=true` is set.
+* **`Unsupported(notes: Option[String] = None)`** - Comet does not support
this expression under the current conditions. The expression will not be used
and Spark will fall back to its native execution.
+
+All three support levels accept an optional `notes` parameter to provide
additional context about the support level.
+
+##### Examples
+
+**Example 1: Restricting to specific data types**
+
+The `Abs` expression only supports numeric types:
+
+```scala
+object CometAbs extends CometExpressionSerde[Abs] {
+ override def getSupportLevel(expr: Abs): SupportLevel = {
+ expr.child.dataType match {
+ case _: NumericType =>
+ Compatible()
+ case _ =>
+ // Spark supports NumericType, DayTimeIntervalType, and
YearMonthIntervalType
+ Unsupported(Some("Only integral, floating-point, and decimal types are
supported"))
+ }
+ }
+
+ override def convert(
+ expr: Abs,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ // ... conversion logic ...
+ }
+}
+```
+
+**Example 2: Validating parameter values**
+
+The `TruncDate` expression only supports specific format strings:
+
+```scala
+object CometTruncDate extends CometExpressionSerde[TruncDate] {
+ val supportedFormats: Seq[String] =
+ Seq("year", "yyyy", "yy", "quarter", "mon", "month", "mm", "week")
+
+ override def getSupportLevel(expr: TruncDate): SupportLevel = {
+ expr.format match {
+ case Literal(fmt: UTF8String, _) =>
+ if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) {
+ Compatible()
+ } else {
+ Unsupported(Some(s"Format $fmt is not supported"))
+ }
+ case _ =>
+ Incompatible(
+ Some("Invalid format strings will throw an exception instead of
returning NULL"))
+ }
+ }
+
+ override def convert(
+ expr: TruncDate,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ // ... conversion logic ...
+ }
+}
+```
+
+**Example 3: Marking known incompatibilities**
- val childExpr = exprToProtoInternal(unHex._1, inputs)
- val failOnErrorExpr = exprToProtoInternal(unHex._2, inputs)
+The `ArrayAppend` expression has known behavioral differences from Spark:
- val optExpr =
- scalarExprToProtoWithReturnType("unhex", e.dataType, childExpr,
failOnErrorExpr)
- optExprWithInfo(optExpr, expr, unHex._1)
+```scala
+object CometArrayAppend extends CometExpressionSerde[ArrayAppend] {
+ override def getSupportLevel(expr: ArrayAppend): SupportLevel =
Incompatible(None)
+
+ override def convert(
+ expr: ArrayAppend,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ // ... conversion logic ...
+ }
+}
```
-A few things to note here:
+This expression will only be used when users explicitly enable incompatible
expressions via configuration.
+
+##### How getSupportLevel Affects Execution
+
+When the query planner encounters an expression:
-* The function is recursively called on child expressions, so you'll need to
make sure that the child expressions are also converted to protobuf.
-* `scalarExprToProtoWithReturnType` is for scalar functions that need return
type information. Your expression may use a different method depending on the
type of expression.
+1. It first checks if the expression is explicitly disabled via
`spark.comet.expr.<exprName>.enabled=false`
+2. It then calls `getSupportLevel` on the expression handler
+3. Based on the result:
+ - `Compatible()`: Expression proceeds to conversion
+ - `Incompatible()`: Expression is skipped unless
`spark.comet.expr.allowIncompatible=true` or expression-specific allow config
is set
+ - `Unsupported()`: Expression is skipped and a fallback to Spark occurs
+
+Any notes provided will be logged to help with debugging and understanding why
an expression was not used.
#### Adding Spark-side Tests for the New Expression
@@ -92,9 +238,9 @@ test("unhex") {
### Adding the Expression To the Protobuf Definition
-Once you have the expression implemented in Scala, you might need to update
the protobuf definition to include the new expression. You may not need to do
this if the expression is already covered by the existing protobuf definition
(e.g. you're adding a new scalar function).
+Once you have the expression implemented in Scala, you might need to update
the protobuf definition to include the new expression. You may not need to do
this if the expression is already covered by the existing protobuf definition
(e.g. you're adding a new scalar function that uses the `ScalarFunc` message).
-You can find the protobuf definition in `expr.proto`, and in particular the
`Expr` or potentially the `AggExpr`. These are similar in theory to the large
case statement in `QueryPlanSerde`, but in protobuf format. So if you were to
add a new expression called `Add2`, you would add a new case to the `Expr`
message like so:
+You can find the protobuf definition in `native/proto/src/proto/expr.proto`,
and in particular the `Expr` or potentially the `AggExpr` messages. If you were
to add a new expression called `Add2`, you would add a new case to the `Expr`
message like so:
```proto
message Expr {
@@ -118,51 +264,58 @@ message Add2 {
With the serialization complete, the next step is to implement the expression
in Rust and ensure that the incoming plan can make use of it.
-How this works, is somewhat dependent on the type of expression you're adding,
so see the `core/src/execution/datafusion/expressions` directory for examples
of how to implement different types of expressions.
+How this works is somewhat dependent on the type of expression you're adding.
Expression implementations live in the `native/spark-expr/src/` directory,
organized by category (e.g., `math_funcs/`, `string_funcs/`, `array_funcs/`).
#### Generally Adding a New Expression
-If you're adding a new expression, you'll need to review `create_plan` and
`create_expr`. `create_plan` is responsible for translating the incoming plan
into a DataFusion plan, and may delegate to `create_expr` to create the
physical expressions for the plan.
+If you're adding a new expression that requires custom protobuf serialization,
you may need to:
-If you added a new message to the protobuf definition, you'll add a new match
case to the `create_expr` method to handle the new expression. For example, if
you added an `Add2` expression, you would add a new case like so:
+1. Add a new message to the protobuf definition in
`native/proto/src/proto/expr.proto`
+2. Update the Rust deserialization code to handle the new protobuf message type
-```rust
-match spark_expr.expr_struct.as_ref().unwrap() {
- ...
- ExprStruct::Add2(add2) => self.create_binary_expr(...)
-}
-```
-
-`self.create_binary_expr` is for a binary expression, but if something out of
the box is needed, you can create a new `PhysicalExpr` implementation. For
example, see `if_expr.rs` for an example of an implementation that doesn't fit
the `create_binary_expr` mold.
+For most expressions, you can skip this step if you're using the existing
scalar function infrastructure.
#### Adding a New Scalar Function Expression
-For a new scalar function, you can reuse a lot of code by updating the
`create_comet_physical_fun` method to match on the function name and make the
scalar UDF to be called. For example, the diff to add the `unhex` function is:
-
-```diff
-macro_rules! make_comet_scalar_udf {
- ($name:expr, $func:ident, $data_type:ident) => {{
-
-+ "unhex" => {
-+ let func = Arc::new(spark_unhex);
-+ make_comet_scalar_udf!("unhex", func, without data_type)
-+ }
+For a new scalar function, you can reuse a lot of code by updating the
`create_comet_physical_fun` method in
`native/spark-expr/src/comet_scalar_funcs.rs`. Add a match case for your
function name:
- }}
+```rust
+match fun_name {
+ // ... other functions ...
+ "unhex" => {
+ let func = Arc::new(spark_unhex);
+ make_comet_scalar_udf!("unhex", func, without data_type)
+ }
+ // ... more functions ...
}
```
-With that addition, you can now implement the spark function in Rust. This
function will look very similar to DataFusion code. For examples, see the
`core/src/execution/datafusion/expressions/scalar_funcs` directory.
+The `make_comet_scalar_udf!` macro has several variants depending on whether
your function needs:
+- A data type parameter: `make_comet_scalar_udf!("ceil", spark_ceil,
data_type)`
+- No data type parameter: `make_comet_scalar_udf!("unhex", func, without
data_type)`
+- An eval mode: `make_comet_scalar_udf!("decimal_div", spark_decimal_div,
data_type, eval_mode)`
+- A fail_on_error flag: `make_comet_scalar_udf!("spark_modulo", func, without
data_type, fail_on_error)`
-Without getting into the internals, the function signature will look like:
+#### Implementing the Function
+
+Then implement your function in an appropriate module under
`native/spark-expr/src/`. The function signature will look like:
```rust
-pub(super) fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue,
DataFusionError> {
+pub fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue,
DataFusionError> {
// Do the work here
}
```
-> **_NOTE:_** If you call the `make_comet_scalar_udf` macro with the data
type, the function signature will look include the data type as a second
argument.
+If your function uses the data type or eval mode, the signature will include
those as additional parameters:
+
+```rust
+pub fn spark_ceil(
+ args: &[ColumnarValue],
+ data_type: &DataType
+) -> Result<ColumnarValue, DataFusionError> {
+ // Implementation
+}
+```
### API Differences Between Spark Versions
@@ -173,33 +326,33 @@ If the expression you're adding has different behavior
across different Spark ve
## Shimming to Support Different Spark Versions
-By adding shims for each Spark version, you can provide a consistent interface
for the expression across different Spark versions. For example, `unhex` added
a new optional parameter is Spark 3.4, for if it should `failOnError` or not.
So for version 3.3, the shim is:
+If the expression you're adding has different behavior across different Spark
versions, you can use the shim system located in
`spark/src/main/spark-$SPARK_VERSION/org/apache/comet/shims/CometExprShim.scala`
for each Spark version.
-```scala
-trait CometExprShim {
- /**
- * Returns a tuple of expressions for the `unhex` function.
- */
- def unhexSerde(unhex: Unhex): (Expression, Expression) = {
- (unhex.child, Literal(false))
- }
-}
-```
+The `CometExprShim` trait provides several mechanisms for handling version
differences:
+
+1. **Version-specific methods** - Override methods in the trait to provide
version-specific behavior
+2. **Version-specific expression handling** - Use
`versionSpecificExprToProtoInternal` to handle expressions that only exist in
certain Spark versions
-And for version 3.4, the shim is:
+For example, the `StringDecode` expression only exists in certain Spark
versions. The shim handles this:
```scala
trait CometExprShim {
- /**
- * Returns a tuple of expressions for the `unhex` function.
- */
- def unhexSerde(unhex: Unhex): (Expression, Expression) = {
- (unhex.child, unhex.failOnError)
+ def versionSpecificExprToProtoInternal(
+ expr: Expression,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[Expr] = {
+ expr match {
+ case s: StringDecode =>
+ stringDecode(expr, s.charset, s.bin, inputs, binding)
+ case _ => None
}
+ }
}
```
-Then when `unhexSerde` is called in the `QueryPlanSerde` object, it will use
the correct shim for the Spark version.
+The `QueryPlanSerde.exprToProtoInternal` method calls
`versionSpecificExprToProtoInternal` first, allowing shims to intercept and
handle version-specific expressions before falling back to the standard
expression maps.
+
+Your `CometExpressionSerde` implementation can also access shim methods by
mixing in the `CometExprShim` trait, though in most cases you can directly
access the expression properties if they're available across all supported
Spark versions.
## Resources
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]