This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d1479ec feat: Improve native explain (#795)
1d1479ec is described below

commit 1d1479ece50957060684d663d3d79c2317f916be
Author: Andy Grove <[email protected]>
AuthorDate: Fri Aug 9 02:42:23 2024 -0600

    feat: Improve native explain (#795)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  9 +++++++
 docs/source/user-guide/configs.md                  |  1 +
 native/core/src/execution/datafusion/planner.rs    |  5 +++-
 native/core/src/execution/jni_api.rs               | 29 +++++++++++++---------
 native/core/src/execution/operators/scan.rs        |  8 ++++--
 native/proto/src/proto/operator.proto              |  4 +++
 .../scala/org/apache/comet/CometExecIterator.scala |  3 ++-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  1 +
 .../org/apache/comet/exec/CometExecSuite.scala     | 14 +++++++++++
 9 files changed, 58 insertions(+), 16 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index ffef381d..8e555580 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -282,6 +282,15 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_EXPLAIN_NATIVE_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.explain.native.enabled")
+      .doc(
+        "When this setting is enabled, Comet will provide a tree 
representation of " +
+          "the native query plan before execution and again after execution, 
with " +
+          "metrics.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explainFallback.enabled")
       .doc(
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 74a2104b..f95c29ae 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -39,6 +39,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to 
compress shuffle data. Only zstd is supported. | zstd |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. 
By default, this config is false. Note that this requires setting 
'spark.shuffle.manager' to 
'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 
'spark.shuffle.manager' must be set before starting the Spark application and 
cannot be changed during the application. | false |
 | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is 
only effective if Comet shuffle is enabled. Available modes are 'native', 
'jvm', and 'auto'. 'native' is for native shuffle which has best performance in 
general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than 
native shuffle. 'auto' is for Comet to choose the best shuffle mode based on 
the query plan. By default, this config is 'jvm'. | jvm |
+| spark.comet.explain.native.enabled | When this setting is enabled, Comet 
will provide a tree representation of the native query plan before execution 
and again after execution, with metrics. | false |
 | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet 
will provide a verbose tree representation of the extended information. | false 
|
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet 
will provide logging explaining the reason(s) why a query stage cannot be 
executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be 
allocated as additional non-heap memory per executor process for Comet. Default 
value is 0.2. | 0.2 |
diff --git a/native/core/src/execution/datafusion/planner.rs 
b/native/core/src/execution/datafusion/planner.rs
index b604e98b..e776b416 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -885,7 +885,7 @@ impl PhysicalPlanner {
                     };
 
                 // The `ScanExec` operator will take actual arrays from Spark 
during execution
-                let scan = ScanExec::new(self.exec_context_id, input_source, 
fields)?;
+                let scan = ScanExec::new(self.exec_context_id, input_source, 
&scan.source, fields)?;
                 Ok((vec![scan.clone()], Arc::new(scan)))
             }
             OpStruct::ShuffleWriter(writer) => {
@@ -1944,6 +1944,7 @@ mod tests {
                     type_id: 3, // Int32
                     type_info: None,
                 }],
+                source: "".to_string(),
             })),
         };
 
@@ -2015,6 +2016,7 @@ mod tests {
                     type_id: STRING_TYPE_ID, // String
                     type_info: None,
                 }],
+                source: "".to_string(),
             })),
         };
 
@@ -2101,6 +2103,7 @@ mod tests {
                         type_id: 3,
                         type_info: None,
                     }],
+                    source: "".to_string(),
                 },
             )),
         };
diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index eb5f698b..a29b8380 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -90,6 +90,8 @@ struct ExecutionContext {
     pub session_ctx: Arc<SessionContext>,
     /// Whether to enable additional debugging checks & messages
     pub debug_native: bool,
+    /// Whether to write native plans with metrics to stdout
+    pub explain_native: bool,
 }
 
 /// Accept serialized query plan and return the address of the native query 
plan.
@@ -128,10 +130,8 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
         }
 
         // Whether we've enabled additional debugging on the native side
-        let debug_native = configs
-            .get("debug_native")
-            .and_then(|x| x.parse::<bool>().ok())
-            .unwrap_or(false);
+        let debug_native = parse_bool(&configs, "debug_native")?;
+        let explain_native = parse_bool(&configs, "explain_native")?;
 
         // Use multi-threaded tokio runtime to prevent blocking spawned tasks 
if any
         let runtime = tokio::runtime::Builder::new_multi_thread()
@@ -170,6 +170,7 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_createPlan(
             metrics,
             session_ctx: Arc::new(session),
             debug_native,
+            explain_native,
         });
 
         Ok(Box::into_raw(exec_context) as i64)
@@ -193,11 +194,7 @@ fn prepare_datafusion_session_context(
 
     // Check if we are using unified memory manager integrated with Spark. 
Default to false if not
     // set.
-    let use_unified_memory_manager = conf
-        .get("use_unified_memory_manager")
-        .map(String::as_str)
-        .unwrap_or("false")
-        .parse::<bool>()?;
+    let use_unified_memory_manager = parse_bool(conf, 
"use_unified_memory_manager")?;
 
     if use_unified_memory_manager {
         // Set Comet memory pool for native
@@ -245,6 +242,14 @@ fn prepare_datafusion_session_context(
     ))
 }
 
+fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> 
{
+    conf.get(name)
+        .map(String::as_str)
+        .unwrap_or("false")
+        .parse::<bool>()
+        .map_err(|e| CometError::Config(format!("Failed to parse boolean 
config {name}: {e}")))
+}
+
 /// Prepares arrow arrays for output.
 fn prepare_output(
     env: &mut JNIEnv,
@@ -339,7 +344,7 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_executePlan(
             exec_context.root_op = Some(root_op.clone());
             exec_context.scans = scans;
 
-            if exec_context.debug_native {
+            if exec_context.explain_native {
                 let formatted_plan_str =
                     
DisplayableExecutionPlan::new(root_op.as_ref()).indent(true);
                 info!("Comet native query plan:\n {formatted_plan_str:}");
@@ -372,11 +377,11 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_Native_executePlan(
                     // Update metrics
                     update_metrics(&mut env, exec_context)?;
 
-                    if exec_context.debug_native {
+                    if exec_context.explain_native {
                         if let Some(plan) = &exec_context.root_op {
                             let formatted_plan_str =
                                 
DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true);
-                            info!("Comet native query plan with metrics:\n 
{formatted_plan_str:}");
+                            info!("Comet native query plan with 
metrics:\n{formatted_plan_str:}");
                         }
                     }
 
diff --git a/native/core/src/execution/operators/scan.rs 
b/native/core/src/execution/operators/scan.rs
index d9212cf5..ae19b368 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -57,6 +57,8 @@ pub struct ScanExec {
     pub exec_context_id: i64,
     /// The input source of scan node. It is a global reference of JVM 
`CometBatchIterator` object.
     pub input_source: Option<Arc<GlobalRef>>,
+    /// A description of the input source for informational purposes
+    pub input_source_description: String,
     /// The data types of columns of the input batch. Converted from Spark 
schema.
     pub data_types: Vec<DataType>,
     /// The input batch of input data. Used to determine the schema of the 
input data.
@@ -70,6 +72,7 @@ impl ScanExec {
     pub fn new(
         exec_context_id: i64,
         input_source: Option<Arc<GlobalRef>>,
+        input_source_description: &str,
         data_types: Vec<DataType>,
     ) -> Result<Self, CometError> {
         // Scan's schema is determined by the input batch, so we need to set 
it before execution.
@@ -90,6 +93,7 @@ impl ScanExec {
         Ok(Self {
             exec_context_id,
             input_source,
+            input_source_description: input_source_description.to_string(),
             data_types,
             batch: Arc::new(Mutex::new(Some(first_batch))),
             cache,
@@ -291,14 +295,14 @@ impl DisplayAs for ScanExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> 
std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "ScanExec")?;
+                write!(f, "ScanExec: source=[{}], ", 
self.input_source_description)?;
                 let fields: Vec<String> = self
                     .data_types
                     .iter()
                     .enumerate()
                     .map(|(idx, dt)| format!("col_{idx:}: {dt:}"))
                     .collect();
-                write!(f, ": schema=[{}]", fields.join(", "))?;
+                write!(f, "schema=[{}]", fields.join(", "))?;
             }
         }
         Ok(())
diff --git a/native/proto/src/proto/operator.proto 
b/native/proto/src/proto/operator.proto
index 335d4259..f20f48d8 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -48,6 +48,10 @@ message Operator {
 
 message Scan {
   repeated spark.spark_expression.DataType fields = 1;
+  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, 
etc). This
+  // is purely for informational purposes when viewing native query plans in
+  // debug mode.
+  string source = 2;
 }
 
 message Projection {
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala 
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 89225c0d..ef18850b 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, 
COMET_EXEC_MEMORY_FRACTION}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, 
COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -85,6 +85,7 @@ class CometExecIterator(
     result.put("memory_fraction", 
String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
+    result.put("explain_native", 
String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
 
     // Strip mandatory prefix spark. which is not required for DataFusion 
session params
     conf.getAll.foreach {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 135ed15b..c5a51b97 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2846,6 +2846,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
       case op if isCometSink(op) && op.output.forall(a => 
supportedDataType(a.dataType, true)) =>
         // These operators are source of Comet native execution chain
         val scanBuilder = OperatorOuterClass.Scan.newBuilder()
+        scanBuilder.setSource(op.simpleStringWithNodeId())
 
         val scanTypes = op.output.flatten { attr =>
           serializeDataType(attr.dataType)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 56da81cb..67d65566 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -685,6 +685,20 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("explain native plan") {
+    // there are no assertions in this test to prove that the explain feature
+    // wrote the expected output to stdout, but we at least test that enabling
+    // the config does not cause any exceptions.
+    withSQLConf(
+      CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
+        val df = sql("select * FROM tbl a join tbl b on a._1 = 
b._2").select("a._1")
+        checkSparkAnswerAndOperator(df)
+      }
+    }
+  }
+
   test("transformed cometPlan") {
     withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
       val df = sql("select * FROM tbl where _1 >= 2").select("_1")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to