This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new ca6f11322 chore: Improve reporting of fallback reasons for CollectLimit (#1694)
ca6f11322 is described below

commit ca6f11322baf9dfb9a6a7ebf26e719ebf3e5252e
Author: Andy Grove <agr...@apache.org>
AuthorDate: Mon Jun 23 12:48:41 2025 -0600

    chore: Improve reporting of fallback reasons for CollectLimit (#1694)
---
 docs/source/user-guide/iceberg.md                  |  2 +-
 .../org/apache/comet/rules/CometExecRule.scala     | 46 +++++++++++++++-------
 .../org/apache/comet/CometExpressionSuite.scala    | 32 +++++++++++++++-
 3 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md
index 23293d6c4..e9f7f2eb5 100644
--- a/docs/source/user-guide/iceberg.md
+++ b/docs/source/user-guide/iceberg.md
@@ -115,7 +115,7 @@ This should produce the following output:
 scala> spark.sql(s"SELECT * from t1").show()
 25/04/28 07:29:37 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
 25/04/28 07:29:37 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
-CollectLimit [COMET: CollectLimit is not supported]
+CollectLimit
 +- Project [COMET: toprettystring is not supported]
    +- CometScanWrapper
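The doc change above reflects the new behavior: with this commit, the CollectLimit node in the example plan no longer carries a blanket "[COMET: CollectLimit is not supported]" tag. As a rough sketch of how that output is produced, assuming a Spark shell with the Comet extensions loaded and the t1 table from the Iceberg guide:

    // Sketch only: assumes a SparkSession with the Comet extensions installed
    // and the `t1` table created earlier in the Iceberg guide.
    // The config key is taken verbatim from the WARN message above; setting it
    // to false disables the per-plan fallback warnings shown there.
    spark.conf.set("spark.comet.explainFallback.enabled", "true")
    spark.sql("SELECT * FROM t1").show()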
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 54e6e6364..2383dd844 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -19,14 +19,16 @@
 package org.apache.comet.rules
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCoalesceExec, CometCollectLimitExec, CometExec, CometExpandExec, CometFilterExec, CometGlobalLimitExec, CometHashAggregateExec, CometHashJoinExec, CometLocalLimitExec, CometNativeExec, CometNativeScanExec, CometPlan, CometProjectExec, CometScanExec, CometScanWrapper, CometSinkPlaceHolder, CometSortExec, CometSortMergeJoinExec, CometSparkToColumnarExec, CometTakeOrderedAndProjectExec, CometUnio [...]
+import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
-import org.apache.spark.sql.execution.{CoalesceExec, CollectLimitExec, ExpandExec, FilterExec, GlobalLimitExec, LocalLimitExec, ProjectExec, SortExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
@@ -36,7 +38,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
 import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
+import org.apache.comet.CometSparkSessionExtensions._
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -201,18 +203,34 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
             op,
             CometGlobalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))
 
-      case op: CollectLimitExec
-          if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)
-            && isCometShuffleEnabled(conf)
-            && op.offset == 0 =>
-        QueryPlanSerde
-          .operator2Proto(op)
-          .map { nativeOp =>
-            val cometOp =
-              CometCollectLimitExec(op, op.limit, op.offset, op.child)
-            CometSinkPlaceHolder(nativeOp, op, cometOp)
+      case op: CollectLimitExec =>
+        val fallbackReasons = new ListBuffer[String]()
+        if (!CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)) {
+          fallbackReasons += s"${CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.key} is false"
+        }
+        if (!isCometShuffleEnabled(conf)) {
+          fallbackReasons += "Comet shuffle is not enabled"
+        }
+        if (op.offset != 0) {
+          fallbackReasons += "CollectLimit with non-zero offset is not supported"
+        }
+        if (fallbackReasons.nonEmpty) {
+          withInfos(op, fallbackReasons.toSet)
+        } else {
+          if (!isCometNative(op.child)) {
+            // no need to report a fallback reason if the child is not native
+            op
+          } else {
+            QueryPlanSerde
+              .operator2Proto(op)
+              .map { nativeOp =>
+                val cometOp =
+                  CometCollectLimitExec(op, op.limit, op.offset, op.child)
+                CometSinkPlaceHolder(nativeOp, op, cometOp)
+              }
+              .getOrElse(op)
           }
-          .getOrElse(op)
+        }
 
       case op: ExpandExec =>
         newPlanWithProto(
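Read outside the diff, the new control flow is: collect every failed precondition, tag the operator with the full set of reasons when any check fails, and only attempt the native conversion when all checks pass and the child is native. A self-contained sketch of that shape (the Plan type and parameters here are illustrative stand-ins, not Comet's API; the real rule uses SparkPlan, CometConf, isCometShuffleEnabled, withInfos, and QueryPlanSerde.operator2Proto):

    import scala.collection.mutable.ListBuffer

    // Sketch of the rule's new shape. `Plan` stands in for SparkPlan, and
    // `infos` stands in for the tags withInfos attaches so that
    // ExtendedExplainInfo can report them in the explain output.
    case class Plan(
        name: String,
        offset: Int,
        childIsNative: Boolean,
        infos: Set[String] = Set.empty)

    def transformCollectLimit(
        op: Plan,
        collectLimitEnabled: Boolean,
        shuffleEnabled: Boolean): Plan = {
      // Record every failed precondition instead of silently falling
      // through a guarded pattern match, as the old code did.
      val fallbackReasons = new ListBuffer[String]()
      if (!collectLimitEnabled) {
        fallbackReasons += "spark.comet.exec.collectLimit.enabled is false"
      }
      if (!shuffleEnabled) {
        fallbackReasons += "Comet shuffle is not enabled"
      }
      if (op.offset != 0) {
        fallbackReasons += "CollectLimit with non-zero offset is not supported"
      }
      if (fallbackReasons.nonEmpty) {
        op.copy(infos = fallbackReasons.toSet) // surfaced by fallback logging
      } else if (!op.childIsNative) {
        op // eligible, but the child stayed on the Spark path: nothing to report
      } else {
        op.copy(name = "CometCollectLimit") // stand-in for the native conversion
      }
    }

For instance, transformCollectLimit(Plan("CollectLimit", offset = 3, childIsNative = true), collectLimitEnabled = true, shuffleEnabled = false) yields both of the reasons asserted by the new LIMIT 10 OFFSET 3 test case below.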
"CollectLimit with non-zero offset is not supported"))) + .foreach(test => { + val qry = test._1 + val expected = test._2 + val df = sql(qry) + df.collect() // force an execution + checkSparkAnswerAndCompareExplainPlan(df, expected) + }) + } + } + } + + test("explain: CollectLimit disabled") { + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.key -> "false", + EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") { + val table = "test" + withTable(table) { + sql(s"create table $table(c0 int, c1 int , c2 float) using parquet") + sql(s"insert into $table values(0, 1, 100.000001)") + Seq( + ( + s"select * from $table LIMIT 10", + Set("spark.comet.exec.collectLimit.enabled is false"))) .foreach(test => { val qry = test._1 val expected = test._2 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org