This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ca6f11322 chore: Improve reporting of fallback reasons for CollectLimit (#1694)
ca6f11322 is described below

commit ca6f11322baf9dfb9a6a7ebf26e719ebf3e5252e
Author: Andy Grove <agr...@apache.org>
AuthorDate: Mon Jun 23 12:48:41 2025 -0600

    chore: Improve reporting of fallback reasons for CollectLimit (#1694)
---
 docs/source/user-guide/iceberg.md                  |  2 +-
 .../org/apache/comet/rules/CometExecRule.scala     | 46 +++++++++++++++-------
 .../org/apache/comet/CometExpressionSuite.scala    | 32 ++++++++++++++-
 3 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/docs/source/user-guide/iceberg.md b/docs/source/user-guide/iceberg.md
index 23293d6c4..e9f7f2eb5 100644
--- a/docs/source/user-guide/iceberg.md
+++ b/docs/source/user-guide/iceberg.md
@@ -115,7 +115,7 @@ This should produce the following output:
 scala> spark.sql(s"SELECT * from t1").show()
 25/04/28 07:29:37 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
 25/04/28 07:29:37 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
- CollectLimit [COMET: CollectLimit is not supported]
+ CollectLimit
 +-  Project [COMET: toprettystring is not supported]
    +- CometScanWrapper
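
For context, the log lines above come from Comet's fallback explain logging,
which is controlled by the spark.comet.explainFallback.enabled flag mentioned
in the WARN message. A minimal spark-shell sketch (assuming a session with the
Comet extensions loaded and the table t1 from the guide above):

    spark.conf.set("spark.comet.explainFallback.enabled", "true")
    // Queries that Comet cannot fully execute natively will now log a
    // per-operator fallback reason, e.g. "toprettystring is not supported".
    spark.sql("SELECT * from t1").show()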
 
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 54e6e6364..2383dd844 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -19,14 +19,16 @@
 
 package org.apache.comet.rules
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCoalesceExec, CometCollectLimitExec, CometExec, CometExpandExec, CometFilterExec, CometGlobalLimitExec, CometHashAggregateExec, CometHashJoinExec, CometLocalLimitExec, CometNativeExec, CometNativeScanExec, CometPlan, CometProjectExec, CometScanExec, CometScanWrapper, CometSinkPlaceHolder, CometSortExec, CometSortMergeJoinExec, CometSparkToColumnarExec, CometTakeOrderedAndProjectExec, CometUnio [...]
+import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
-import org.apache.spark.sql.execution.{CoalesceExec, CollectLimitExec, ExpandExec, FilterExec, GlobalLimitExec, LocalLimitExec, ProjectExec, SortExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
@@ -36,7 +38,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
 import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
+import org.apache.comet.CometSparkSessionExtensions._
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
 
@@ -201,18 +203,34 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
           op,
           CometGlobalLimitExec(_, op, op.limit, op.child, SerializedPlan(None)))
 
-      case op: CollectLimitExec
-          if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)
-            && isCometShuffleEnabled(conf)
-            && op.offset == 0 =>
-        QueryPlanSerde
-          .operator2Proto(op)
-          .map { nativeOp =>
-            val cometOp =
-              CometCollectLimitExec(op, op.limit, op.offset, op.child)
-            CometSinkPlaceHolder(nativeOp, op, cometOp)
+      case op: CollectLimitExec =>
+        val fallbackReasons = new ListBuffer[String]()
+        if (!CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)) {
+          fallbackReasons += s"${CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.key} is false"
+        }
+        if (!isCometShuffleEnabled(conf)) {
+          fallbackReasons += "Comet shuffle is not enabled"
+        }
+        if (op.offset != 0) {
+          fallbackReasons += "CollectLimit with non-zero offset is not supported"
+        }
+        if (fallbackReasons.nonEmpty) {
+          withInfos(op, fallbackReasons.toSet)
+        } else {
+          if (!isCometNative(op.child)) {
+            // no need to report a fallback reason if the child is not native
+            op
+          } else {
+            QueryPlanSerde
+              .operator2Proto(op)
+              .map { nativeOp =>
+                val cometOp =
+                  CometCollectLimitExec(op, op.limit, op.offset, op.child)
+                CometSinkPlaceHolder(nativeOp, op, cometOp)
+              }
+              .getOrElse(op)
           }
-          .getOrElse(op)
+        }
 
       case op: ExpandExec =>
         newPlanWithProto(
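
The hunk above replaces a single pattern guard with explicit checks so that
every unmet precondition is reported, rather than the plan silently falling
back with a generic message. A self-contained sketch of the same
accumulate-then-decide pattern (Plan, JvmPlan, NativePlan, and tryConvert are
hypothetical stand-ins for SparkPlan, the Comet operators, and
QueryPlanSerde.operator2Proto; withInfos in the real code attaches the reasons
to the Spark plan for the extended explain output):

    import scala.collection.mutable.ListBuffer

    object FallbackSketch {
      sealed trait Plan
      // Stand-in for an operator left on the JVM, annotated with the
      // reasons it could not run natively.
      case class JvmPlan(reasons: Set[String] = Set.empty) extends Plan
      case object NativePlan extends Plan

      // Hypothetical conversion that may fail, like operator2Proto
      // returning an Option.
      def tryConvert(op: JvmPlan): Option[Plan] = Some(NativePlan)

      def transform(
          op: JvmPlan,
          execEnabled: Boolean,
          shuffleEnabled: Boolean,
          offset: Long): Plan = {
        val fallbackReasons = new ListBuffer[String]()
        if (!execEnabled) fallbackReasons += "collectLimit exec is disabled"
        if (!shuffleEnabled) fallbackReasons += "shuffle is not enabled"
        if (offset != 0) fallbackReasons += "non-zero offset is not supported"
        if (fallbackReasons.nonEmpty) {
          // Report every unmet precondition at once instead of one
          // generic "not supported" message.
          JvmPlan(fallbackReasons.toSet)
        } else {
          tryConvert(op).getOrElse(op)
        }
      }

      def main(args: Array[String]): Unit = {
        // All three checks fail, so all three reasons are attached.
        println(transform(JvmPlan(), execEnabled = false, shuffleEnabled = false, offset = 3))
      }
    }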
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 19ed0b463..8fe04009c 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1875,7 +1875,37 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
               + "where A.c1 = B.c1 ",
             Set(
               "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled 
is not enabled",
-              "make_interval is not supported")))
+              "make_interval is not supported")),
+          (
+            s"select * from $table LIMIT 10 OFFSET 3",
+            Set(
+              "Comet shuffle is not enabled",
+              "CollectLimit with non-zero offset is not supported")))
+          .foreach(test => {
+            val qry = test._1
+            val expected = test._2
+            val df = sql(qry)
+            df.collect() // force an execution
+            checkSparkAnswerAndCompareExplainPlan(df, expected)
+          })
+      }
+    }
+  }
+
+  test("explain: CollectLimit disabled") {
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.key -> "false",
+      EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") {
+      val table = "test"
+      withTable(table) {
+        sql(s"create table $table(c0 int, c1 int , c2 float) using parquet")
+        sql(s"insert into $table values(0, 1, 100.000001)")
+        Seq(
+          (
+            s"select * from $table LIMIT 10",
+            Set("spark.comet.exec.collectLimit.enabled is false")))
           .foreach(test => {
             val qry = test._1
             val expected = test._2


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
