This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 79516b6c9 feat: Add dynamic `enabled` and `allowIncompat` configs for all supported expressions (#2329)
79516b6c9 is described below

commit 79516b6c9eab64adb8ce5fcae698b9f03b655d66
Author: Andy Grove <agr...@apache.org>
AuthorDate: Thu Sep 11 16:29:18 2025 -0700

    feat: Add dynamic `enabled` and `allowIncompat` configs for all supported expressions (#2329)
---
 .../main/scala/org/apache/comet/CometConf.scala    | 24 +++++++++++++--
 docs/source/user-guide/latest/configs.md           |  1 -
 docs/source/user-guide/latest/expressions.md       | 11 +++++--
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 31 ++++++++++++++++---
 .../scala/org/apache/comet/serde/strings.scala     | 15 +++++----
 .../org/apache/comet/CometExpressionSuite.scala    | 36 +++++++++++++++++++++-
 .../apache/comet/CometStringExpressionSuite.scala  |  7 +----
 7 files changed, 100 insertions(+), 25 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 968febf69..54c88e9d5 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -65,6 +65,8 @@ object CometConf extends ShimCometConf {
 
   val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
 
+  val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
+
   val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
     .doc(
       "Whether to enable Comet extension for Spark. When this is turned on, 
Spark will use " +
@@ -228,8 +230,6 @@ object CometConf extends ShimCometConf {
     createExecEnabledConfig("window", defaultValue = true)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
-  val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
-    createExecEnabledConfig("initCap", defaultValue = false)
 
   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: 
ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
@@ -664,6 +664,26 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(defaultValue)
   }
+
+  def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
+    getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
+  }
+
+  def getExprEnabledConfigKey(name: String): String = {
+    s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.enabled"
+  }
+
+  def isExprAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean 
= {
+    getBooleanConf(getExprAllowIncompatConfigKey(name), defaultValue = false, 
conf)
+  }
+
+  def getExprAllowIncompatConfigKey(name: String): String = {
+    s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.allowIncompatible"
+  }
+
+  def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): 
Boolean = {
+    conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) 
== "true"
+  }
 }
 
 object ConfigHelpers {
diff --git a/docs/source/user-guide/latest/configs.md 
b/docs/source/user-guide/latest/configs.md
index 0582c2902..c923c5668 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -48,7 +48,6 @@ Comet provides the following configuration settings.
 | spark.comet.exec.filter.enabled | Whether to enable filter by default. | 
true |
 | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by 
default. | true |
 | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | 
true |
-| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | 
false |
 | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by 
default. | true |
 | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet 
native execution. When running Spark in on-heap mode, available pool types are 
'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 
'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in 
off-heap mode, available pool types are 'unified' and `fair_unified`. The 
default pool type is `greedy_task_shared` for on-heap mode and `unified` for 
off-heap mode. For more information, [...]
 | spark.comet.exec.project.enabled | Whether to enable project by default. | 
true |
diff --git a/docs/source/user-guide/latest/expressions.md 
b/docs/source/user-guide/latest/expressions.md
index 2746b02ff..5f7beb42b 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -23,8 +23,15 @@ Comet supports the following Spark expressions. Expressions 
that are marked as S
 natively in Comet and provide the same results as Spark, or will fall back to 
Spark for cases that would not
 be compatible.
 
-Expressions that are not Spark-compatible are disabled by default and can be 
enabled by setting
-`spark.comet.expression.allowIncompatible=true`.
+All expressions are enabled by default, but can be disabled by setting
+`spark.comet.expression.EXPRNAME.enabled=false`, where `EXPRNAME` is the 
expression name as specified in 
+the following tables, such as `Length`, or `StartsWith`.
+
+Expressions that are not Spark-compatible will fall back to Spark by default 
and can be enabled by setting
+`spark.comet.expression.EXPRNAME.allowIncompatible=true`.
+
+It is also possible to specify `spark.comet.expression.allowIncompatible=true` 
to enable all 
+incompatible expressions.
 
 ## Conditional Expressions
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index ff3beb8a4..5dcccdae3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -630,15 +630,24 @@ object QueryPlanSerde extends Logging with CometExprShim {
       expr: Expression,
       inputs: Seq[Attribute],
       binding: Boolean): Option[Expr] = {
-    SQLConf.get
+    val conf = SQLConf.get
 
     def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): 
Option[Expr] = {
+      val exprConfName = handler.getExprConfigName(expr)
+      if (!CometConf.isExprEnabled(exprConfName)) {
+        withInfo(
+          expr,
+          "Expression support is disabled. Set " +
+            s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to 
enable it.")
+        return None
+      }
       handler.getSupportLevel(expr) match {
         case Unsupported(notes) =>
           withInfo(expr, notes.getOrElse(""))
           None
         case Incompatible(notes) =>
-          if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
+          val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
+          if (exprAllowIncompat || 
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
             if (notes.isDefined) {
               logWarning(
                 s"Comet supports $expr when 
${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " +
@@ -650,8 +659,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
             withInfo(
               expr,
               s"$expr is not fully compatible with Spark$optionalNotes. To 
enable it anyway, " +
-                s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " +
-                s"${CometConf.COMPAT_GUIDE}.")
+                s"set 
${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true, or set " +
+                s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to 
enable all " +
+                s"incompatible expressions. ${CometConf.COMPAT_GUIDE}.")
             None
           }
         case Compatible(notes) =>
@@ -669,7 +679,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
         exprToProtoInternal(Literal(value, dataType), inputs, binding)
 
       case UnaryExpression(child) if expr.prettyName == "trycast" =>
-        val timeZoneId = SQLConf.get.sessionLocalTimeZone
+        val timeZoneId = conf.sessionLocalTimeZone
         val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY)
         convert(cast, CometCast)
 
@@ -1850,6 +1860,17 @@ trait CometOperatorSerde[T <: SparkPlan] {
  */
 trait CometExpressionSerde[T <: Expression] {
 
+  /**
+   * Get a short name for the expression that can be used as part of a config 
key related to the
+   * expression, such as enabling or disabling that expression.
+   *
+   * @param expr
+   *   The Spark expression.
+   * @return
+   *   Short name for the expression, defaulting to the Spark class name
+   */
+  def getExprConfigName(expr: T): String = expr.getClass.getSimpleName
+
   /**
    * Determine the support level of the expression based on its attributes.
    *
diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala 
b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index 15046fe09..4d183f6fa 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -68,15 +68,14 @@ object CometLower extends 
CometCaseConversionBase[Lower]("lower")
 
 object CometInitCap extends CometScalarFunction[InitCap]("initcap") {
 
+  override def getSupportLevel(expr: InitCap): SupportLevel = {
+    // Behavior differs from Spark. One example is that for the input "robert 
rose-smith", Spark
+    // will produce "Robert Rose-smith", but Comet will produce "Robert 
Rose-Smith".
+    // https://github.com/apache/datafusion-comet/issues/1052
+    Incompatible(None)
+  }
+
   override def convert(expr: InitCap, inputs: Seq[Attribute], binding: 
Boolean): Option[Expr] = {
-    if (!CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
-      withInfo(
-        expr,
-        "Comet initCap is not compatible with Spark yet. " +
-          "See https://github.com/apache/datafusion-comet/issues/1052 ." +
-          s"Set ${CometConf.COMET_EXEC_INITCAP_ENABLED.key}=true to enable it 
anyway.")
-      return None
-    }
     super.convert(expr, inputs, binding)
   }
 }
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 6db2fb655..ee22c9b97 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
 import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, 
CometWindowExec}
-import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, 
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, 
WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
@@ -1301,6 +1301,40 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  test("disable expression using dynamic config") {
+    def countSparkProjectExec(plan: SparkPlan) = {
+      plan.collect { case _: ProjectExec =>
+        true
+      }.length
+    }
+    withParquetTable(Seq(0, 1, 2).map(n => (n, n)), "tbl") {
+      val sql = "select _1+_2 from tbl"
+      val (_, cometPlan) = checkSparkAnswer(sql)
+      assert(0 == countSparkProjectExec(cometPlan))
+      withSQLConf(CometConf.getExprEnabledConfigKey("Add") -> "false") {
+        val (_, cometPlan) = checkSparkAnswer(sql)
+        assert(1 == countSparkProjectExec(cometPlan))
+      }
+    }
+  }
+
+  test("enable incompat expression using dynamic config") {
+    def countSparkProjectExec(plan: SparkPlan) = {
+      plan.collect { case _: ProjectExec =>
+        true
+      }.length
+    }
+    withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") {
+      val sql = "select initcap(_1) from tbl"
+      val (_, cometPlan) = checkSparkAnswer(sql)
+      assert(1 == countSparkProjectExec(cometPlan))
+      withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> 
"true") {
+        val (_, cometPlan) = checkSparkAnswer(sql)
+        assert(0 == countSparkProjectExec(cometPlan))
+      }
+    }
+  }
+
   test("signum") {
     testDoubleScalarExpr("signum")
   }
diff --git 
a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 6d8fa28b0..4c0f80ddb 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -103,12 +103,7 @@ class CometStringExpressionSuite extends CometTestBase {
         s"insert into $table values(1, 'james smith'), (2, 'michael rose'), " +
           "(3, 'robert williams'), (4, 'rames rose'), (5, 'james smith'), " +
           "(6, 'robert rose-smith'), (7, 'james ähtäri')")
-      if (CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
-        // TODO: remove this if clause 
https://github.com/apache/datafusion-comet/issues/1052
-        checkSparkAnswerAndOperator(s"SELECT initcap(name) FROM $table")
-      } else {
-        checkSparkAnswer(s"SELECT initcap(name) FROM $table")
-      }
+      checkSparkAnswer(s"SELECT initcap(name) FROM $table")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to