This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 3808306b feat: Add logging to explain reasons for Comet not being able 
to run a query stage natively (#397)
3808306b is described below

commit 3808306b19f44253c087bd92742d93992ee6522d
Author: Andy Grove <[email protected]>
AuthorDate: Mon May 13 20:04:16 2024 -0600

    feat: Add logging to explain reasons for Comet not being able to run a 
query stage natively (#397)
---
 common/src/main/scala/org/apache/comet/CometConf.scala  |  8 ++++++++
 docs/source/user-guide/configs.md                       |  1 +
 docs/source/user-guide/installation.md                  | 14 +++++++++++++-
 .../org/apache/comet/CometSparkSessionExtensions.scala  | 17 +++++++++++++++++
 .../scala/org/apache/comet/ExtendedExplainInfo.scala    |  2 +-
 5 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index e3584300..0ef2982e 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -277,6 +277,14 @@ object CometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.explainFallback.enabled")
+      .doc(
+        "When this setting is enabled, Comet will provide logging explaining 
the reason(s) " +
+          "why a query stage cannot be executed natively.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
     .doc("The columnar batch size, i.e., the maximum number of rows that a 
batch can contain.")
     .intConf
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index d75059a9..24f408a0 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -40,6 +40,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory 
overhead that the native memory manager can use for execution. The purpose of 
this config is to set aside memory for untracked data structures, as well as 
imprecise size estimation during memory acquisition. Default value is 0.7. | 
0.7 |
 | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to 
compress shuffle data. Only zstd is supported. | zstd |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. 
By default, this config is false. Note that this requires setting 
'spark.shuffle.manager' to 
'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 
'spark.shuffle.manager' must be set before starting the Spark application and 
cannot be changed during the application. | false |
+| spark.comet.explainFallback.enabled | When this setting is enabled, Comet 
will provide logging explaining the reason(s) why a query stage cannot be 
executed natively. | false |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be 
allocated as additional non-heap memory per executor process for Comet. Default 
value is 0.2. | 0.2 |
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be 
allocated per executor process for Comet, in MiB. | 402653184b |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to 
load successfully when Comet is enabled. If not, Comet will silently fallback 
to Spark when it fails to load the native lib. Otherwise, an error will be 
thrown and the Spark job will be aborted. | false |
diff --git a/docs/source/user-guide/installation.md 
b/docs/source/user-guide/installation.md
index b948d50c..e9149019 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -67,7 +67,8 @@ $SPARK_HOME/bin/spark-shell \
     --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
     --conf spark.comet.enabled=true \
     --conf spark.comet.exec.enabled=true \
-    --conf spark.comet.exec.all.enabled=true
+    --conf spark.comet.exec.all.enabled=true \
+    --conf spark.comet.explainFallback.enabled=true
 ```
 
 ### Verify Comet enabled for Spark SQL query
@@ -95,6 +96,17 @@ INFO src/lib.rs: Comet native library initialized
              PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: 
struct<a:int>
 ```
 
+With the configuration `spark.comet.explainFallback.enabled=true`, Comet will 
log any reasons that prevent a plan from
+being executed natively.
+
+```scala
+scala> Seq(1,2,3,4).toDF("a").write.parquet("/tmp/test.parquet")
+WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some 
parts of this plan natively because:
+  - LocalTableScan is not supported
+  - WriteFiles is not supported
+  - Execute InsertIntoHadoopFsRelationCommand is not supported
+```
+
 ### Enable Comet shuffle
 
 Comet shuffle feature is disabled by default. To enable it, please add related 
configs:
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 73592d78..9cb9930b 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -734,6 +734,23 @@ class CometSparkSessionExtensions
       } else {
         var newPlan = transform(plan)
 
+        // if the plan cannot be run fully natively then explain why (when 
appropriate
+        // config is enabled)
+        if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
+          new ExtendedExplainInfo().extensionInfo(newPlan) match {
+            case reasons if reasons.size == 1 =>
+              logWarning(
+                "Comet cannot execute some parts of this plan natively " +
+                  s"because ${reasons.head}")
+            case reasons if reasons.size > 1 =>
+              logWarning(
+                "Comet cannot execute some parts of this plan natively" +
+                  s" because:\n\t- ${reasons.mkString("\n\t- ")}")
+            case _ =>
+            // no reasons recorded
+          }
+        }
+
         // Remove placeholders
         newPlan = newPlan.transform {
           case CometSinkPlaceHolder(_, _, s) => s
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala 
b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 8e5aee8b..d7ef4e9f 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -45,7 +45,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     }
   }
 
-  private def extensionInfo(node: TreeNode[_]): Set[String] = {
+  private[comet] def extensionInfo(node: TreeNode[_]): Set[String] = {
     var info = mutable.Seq[String]()
     val sorted = sortup(node)
     sorted.foreach { p =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to