This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 3808306b feat: Add logging to explain reasons for Comet not being able to run a query stage natively (#397)
3808306b is described below
commit 3808306b19f44253c087bd92742d93992ee6522d
Author: Andy Grove <[email protected]>
AuthorDate: Mon May 13 20:04:16 2024 -0600
feat: Add logging to explain reasons for Comet not being able to run a query stage natively (#397)
---
common/src/main/scala/org/apache/comet/CometConf.scala | 8 ++++++++
docs/source/user-guide/configs.md | 1 +
docs/source/user-guide/installation.md | 14 +++++++++++++-
.../org/apache/comet/CometSparkSessionExtensions.scala | 17 +++++++++++++++++
.../scala/org/apache/comet/ExtendedExplainInfo.scala | 2 +-
5 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index e3584300..0ef2982e 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -277,6 +277,14 @@ object CometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.explainFallback.enabled")
+ .doc(
+ "When this setting is enabled, Comet will provide logging explaining
the reason(s) " +
+ "why a query stage cannot be executed natively.")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.doc("The columnar batch size, i.e., the maximum number of rows that a
batch can contain.")
.intConf
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index d75059a9..24f408a0 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -40,6 +40,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
+| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
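Besides the `--conf` flag shown in the installation docs below, the new setting can be supplied when building the session; a minimal sketch, assuming the flag is read from the session's SQL conf like the other spark.comet.* entries (the app name is illustrative only):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("comet-explain-fallback-demo") // hypothetical app name
  // Comet must be on the classpath and its extensions registered for the flag to have any effect
  .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
  .config("spark.comet.enabled", "true")
  .getOrCreate()

// Turn on fallback logging for subsequent queries in this session
spark.conf.set("spark.comet.explainFallback.enabled", "true")
```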
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index b948d50c..e9149019 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -67,7 +67,8 @@ $SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
- --conf spark.comet.exec.all.enabled=true
+ --conf spark.comet.exec.all.enabled=true \
+ --conf spark.comet.explainFallback.enabled=true
```
### Verify Comet enabled for Spark SQL query
@@ -95,6 +96,17 @@ INFO src/lib.rs: Comet native library initialized
PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
```
+With the configuration `spark.comet.explainFallback.enabled=true`, Comet will log any reasons that prevent a plan from
+being executed natively.
+
+```scala
+scala> Seq(1,2,3,4).toDF("a").write.parquet("/tmp/test.parquet")
+WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because:
+ - LocalTableScan is not supported
+ - WriteFiles is not supported
+ - Execute InsertIntoHadoopFsRelationCommand is not supported
+```
+
### Enable Comet shuffle
Comet shuffle feature is disabled by default. To enable it, please add related configs:
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 73592d78..9cb9930b 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -734,6 +734,23 @@ class CometSparkSessionExtensions
} else {
var newPlan = transform(plan)
+ // if the plan cannot be run fully natively then explain why (when appropriate
+ // config is enabled)
+ if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
+ new ExtendedExplainInfo().extensionInfo(newPlan) match {
+ case reasons if reasons.size == 1 =>
+ logWarning(
+ "Comet cannot execute some parts of this plan natively " +
+ s"because ${reasons.head}")
+ case reasons if reasons.size > 1 =>
+ logWarning(
+ "Comet cannot execute some parts of this plan natively" +
+ s" because:\n\t- ${reasons.mkString("\n\t- ")}")
+ case _ =>
+ // no reasons recorded
+ }
+ }
+
// Remove placeholders
newPlan = newPlan.transform {
case CometSinkPlaceHolder(_, _, s) => s
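The warning emitted above uses two shapes: a single reason is inlined into the sentence, and multiple reasons are rendered as a bulleted list. A standalone sketch of that formatting, in plain Scala and independent of the Comet classes, just to illustrate the two message shapes (the object and method names are illustrative only):

```scala
object FallbackMessageSketch {
  // Mirrors the match in CometExecRule above: one reason is inlined,
  // several reasons become a "\n\t- " bulleted list.
  def fallbackMessage(reasons: Set[String]): Option[String] = reasons.size match {
    case 0 => None // no reasons recorded
    case 1 =>
      Some(s"Comet cannot execute some parts of this plan natively because ${reasons.head}")
    case _ =>
      Some(
        "Comet cannot execute some parts of this plan natively because:\n\t- " +
          reasons.mkString("\n\t- "))
  }

  def main(args: Array[String]): Unit =
    fallbackMessage(Set("LocalTableScan is not supported", "WriteFiles is not supported"))
      .foreach(println)
}
```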
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 8e5aee8b..d7ef4e9f 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -45,7 +45,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
}
}
- private def extensionInfo(node: TreeNode[_]): Set[String] = {
+ private[comet] def extensionInfo(node: TreeNode[_]): Set[String] = {
var info = mutable.Seq[String]()
val sorted = sortup(node)
sorted.foreach { p =>
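The last change widens `extensionInfo` from `private` to `private[comet]`, which is what lets the new `CometExecRule` code above reuse it. A minimal, hypothetical sketch of another same-package caller (the `FallbackReasons` object below is illustrative only and not part of this commit):

```scala
// Hypothetical helper showing what the widened visibility permits: any class
// under org.apache.comet can now ask ExtendedExplainInfo for the fallback
// reasons recorded on a transformed plan.
package org.apache.comet

import org.apache.spark.sql.execution.SparkPlan

object FallbackReasons {
  def forPlan(plan: SparkPlan): Set[String] =
    new ExtendedExplainInfo().extensionInfo(plan)
}
```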