This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/branch-0.10 by this push: new 9cb0cc4e6 feat: Add dynamic `enabled` and `allowIncompat` configs for all supported expressions (#2329) (#2385) 9cb0cc4e6 is described below commit 9cb0cc4e676a49061194efaafaf2e908130f1321 Author: Andy Grove <agr...@apache.org> AuthorDate: Thu Sep 11 18:08:54 2025 -0700 feat: Add dynamic `enabled` and `allowIncompat` configs for all supported expressions (#2329) (#2385) --- .../main/scala/org/apache/comet/CometConf.scala | 24 +++++++++++++-- docs/source/user-guide/latest/configs.md | 1 - docs/source/user-guide/latest/expressions.md | 11 +++++-- .../org/apache/comet/serde/QueryPlanSerde.scala | 31 ++++++++++++++++--- .../scala/org/apache/comet/serde/strings.scala | 15 +++++---- .../org/apache/comet/CometExpressionSuite.scala | 36 +++++++++++++++++++++- .../apache/comet/CometStringExpressionSuite.scala | 7 +---- 7 files changed, 100 insertions(+), 25 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 968febf69..54c88e9d5 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -65,6 +65,8 @@ object CometConf extends ShimCometConf { val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec"; + val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression"; + val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled") .doc( "Whether to enable Comet extension for Spark. When this is turned on, Spark will use " + @@ -228,8 +230,6 @@ object CometConf extends ShimCometConf { createExecEnabledConfig("window", defaultValue = true) val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("takeOrderedAndProject", defaultValue = true) - val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig("initCap", defaultValue = false) val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") @@ -664,6 +664,26 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(defaultValue) } + + def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = { + getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf) + } + + def getExprEnabledConfigKey(name: String): String = { + s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.enabled" + } + + def isExprAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = { + getBooleanConf(getExprAllowIncompatConfigKey(name), defaultValue = false, conf) + } + + def getExprAllowIncompatConfigKey(name: String): String = { + s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.allowIncompatible" + } + + def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = { + conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true" + } } object ConfigHelpers { diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 0582c2902..c923c5668 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -48,7 +48,6 @@ Comet provides the following configuration settings. | spark.comet.exec.filter.enabled | Whether to enable filter by default. | true | | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true | | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | -| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, [...] | spark.comet.exec.project.enabled | Whether to enable project by default. | true | diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 2746b02ff..5f7beb42b 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -23,8 +23,15 @@ Comet supports the following Spark expressions. Expressions that are marked as S natively in Comet and provide the same results as Spark, or will fall back to Spark for cases that would not be compatible. -Expressions that are not Spark-compatible are disabled by default and can be enabled by setting -`spark.comet.expression.allowIncompatible=true`. +All expressions are enabled by default, but can be disabled by setting +`spark.comet.expression.EXPRNAME.enabled=false`, where `EXPRNAME` is the expression name as specified in +the following tables, such as `Length`, or `StartsWith`. + +Expressions that are not Spark-compatible will fall back to Spark by default and can be enabled by setting +`spark.comet.expression.EXPRNAME.allowIncompatible=true`. + +It is also possible to specify `spark.comet.expression.allowIncompatible=true` to enable all +incompatible expressions. ## Conditional Expressions diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 2cfd00e28..b9a9e87fb 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -595,15 +595,24 @@ object QueryPlanSerde extends Logging with CometExprShim { expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { - SQLConf.get + val conf = SQLConf.get def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { + val exprConfName = handler.getExprConfigName(expr) + if (!CometConf.isExprEnabled(exprConfName)) { + withInfo( + expr, + "Expression support is disabled. Set " + + s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to enable it.") + return None + } handler.getSupportLevel(expr) match { case Unsupported(notes) => withInfo(expr, notes.getOrElse("")) None case Incompatible(notes) => - if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { + val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName) + if (exprAllowIncompat || CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { if (notes.isDefined) { logWarning( s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " + @@ -615,8 +624,9 @@ object QueryPlanSerde extends Logging with CometExprShim { withInfo( expr, s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " + - s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " + - s"${CometConf.COMPAT_GUIDE}.") + s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true, or set " + + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to enable all " + + s"incompatible expressions. ${CometConf.COMPAT_GUIDE}.") None } case Compatible(notes) => @@ -634,7 +644,7 @@ object QueryPlanSerde extends Logging with CometExprShim { exprToProtoInternal(Literal(value, dataType), inputs, binding) case UnaryExpression(child) if expr.prettyName == "trycast" => - val timeZoneId = SQLConf.get.sessionLocalTimeZone + val timeZoneId = conf.sessionLocalTimeZone val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY) convert(cast, CometCast) @@ -1988,6 +1998,17 @@ trait CometOperatorSerde[T <: SparkPlan] { */ trait CometExpressionSerde[T <: Expression] { + /** + * Get a short name for the expression that can be used as part of a config key related to the + * expression, such as enabling or disabling that expression. + * + * @param expr + * The Spark expression. + * @return + * Short name for the expression, defaulting to the Spark class name + */ + def getExprConfigName(expr: T): String = expr.getClass.getSimpleName + /** * Determine the support level of the expression based on its attributes. * diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 15046fe09..4d183f6fa 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -68,15 +68,14 @@ object CometLower extends CometCaseConversionBase[Lower]("lower") object CometInitCap extends CometScalarFunction[InitCap]("initcap") { + override def getSupportLevel(expr: InitCap): SupportLevel = { + // Behavior differs from Spark. One example is that for the input "robert rose-smith", Spark + // will produce "Robert Rose-smith", but Comet will produce "Robert Rose-Smith". + // https://github.com/apache/datafusion-comet/issues/1052 + Incompatible(None) + } + override def convert(expr: InitCap, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { - if (!CometConf.COMET_EXEC_INITCAP_ENABLED.get()) { - withInfo( - expr, - "Comet initCap is not compatible with Spark yet. " + - "See https://github.com/apache/datafusion-comet/issues/1052 ." + - s"Set ${CometConf.COMET_EXEC_INITCAP_ENABLED.key}=true to enable it anyway.") - return None - } super.convert(expr, inputs, binding) } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 6db2fb655..ee22c9b97 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec} -import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ @@ -1301,6 +1301,40 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("disable expression using dynamic config") { + def countSparkProjectExec(plan: SparkPlan) = { + plan.collect { case _: ProjectExec => + true + }.length + } + withParquetTable(Seq(0, 1, 2).map(n => (n, n)), "tbl") { + val sql = "select _1+_2 from tbl" + val (_, cometPlan) = checkSparkAnswer(sql) + assert(0 == countSparkProjectExec(cometPlan)) + withSQLConf(CometConf.getExprEnabledConfigKey("Add") -> "false") { + val (_, cometPlan) = checkSparkAnswer(sql) + assert(1 == countSparkProjectExec(cometPlan)) + } + } + } + + test("enable incompat expression using dynamic config") { + def countSparkProjectExec(plan: SparkPlan) = { + plan.collect { case _: ProjectExec => + true + }.length + } + withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") { + val sql = "select initcap(_1) from tbl" + val (_, cometPlan) = checkSparkAnswer(sql) + assert(1 == countSparkProjectExec(cometPlan)) + withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") { + val (_, cometPlan) = checkSparkAnswer(sql) + assert(0 == countSparkProjectExec(cometPlan)) + } + } + } + test("signum") { testDoubleScalarExpr("signum") } diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala index 6d8fa28b0..4c0f80ddb 100644 --- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala @@ -103,12 +103,7 @@ class CometStringExpressionSuite extends CometTestBase { s"insert into $table values(1, 'james smith'), (2, 'michael rose'), " + "(3, 'robert williams'), (4, 'rames rose'), (5, 'james smith'), " + "(6, 'robert rose-smith'), (7, 'james ähtäri')") - if (CometConf.COMET_EXEC_INITCAP_ENABLED.get()) { - // TODO: remove this if clause https://github.com/apache/datafusion-comet/issues/1052 - checkSparkAnswerAndOperator(s"SELECT initcap(name) FROM $table") - } else { - checkSparkAnswer(s"SELECT initcap(name) FROM $table") - } + checkSparkAnswer(s"SELECT initcap(name) FROM $table") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org