This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 21717eb  fix: Iceberg scan transition should be in front of other data source v2 (#302)
21717eb is described below

commit 21717eb0ac24bf88a9d8f37d84053a5f503fbadc
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Apr 23 07:47:35 2024 -0700

    fix: Iceberg scan transition should be in front of other data source v2 (#302)
---
 .../apache/comet/CometSparkSessionExtensions.scala | 72 +++++++++++++---------
 1 file changed, 42 insertions(+), 30 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index fc0d133..8ef8cb8 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -52,6 +52,13 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
 import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
+/**
+ * The entry point of Comet extension to Spark. This class is responsible for injecting Comet
+ * rules and extensions into Spark.
+ *
+ * CometScanRule: A rule to transform a Spark scan plan into a Comet scan plan. CometExecRule: A
+ * rule to transform a Spark execution plan into a Comet execution plan.
+ */
 class CometSparkSessionExtensions
     extends (SparkSessionExtensions => Unit)
     with Logging
@@ -98,7 +105,8 @@ class CometSparkSessionExtensions
               scanExec.copy(scan = cometScan),
               runtimeFilters = scanExec.runtimeFilters)
 
-          // unsupported parquet data source V2
+          // If it is a `ParquetScan` but unsupported by Comet, attach the exact
+          // reason to the plan.
           case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[ParquetScan] =>
             val requiredSchema = scanExec.scan.asInstanceOf[ParquetScan].readDataSchema
             val info1 = createMessage(
@@ -107,7 +115,7 @@ class CometSparkSessionExtensions
             val readPartitionSchema = scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema
             val info2 = createMessage(
               !isSchemaSupported(readPartitionSchema),
-              s"Schema $readPartitionSchema is not supported")
+              s"Partition schema $readPartitionSchema is not supported")
             // Comet does not support pushedAggregate
             val info3 = createMessage(
               getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
@@ -115,34 +123,37 @@ class CometSparkSessionExtensions
             withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString("\n"))
             scanExec
 
-          case scanExec: BatchScanExec if !scanExec.scan.isInstanceOf[ParquetScan] =>
-            withInfo(scanExec, "Comet Scan only supports Parquet")
-            scanExec
-
-          // iceberg scan
+          // Other datasource V2 scan
           case scanExec: BatchScanExec =>
-            if (isSchemaSupported(scanExec.scan.readSchema())) {
-              scanExec.scan match {
-                case s: SupportsComet if s.isCometEnabled =>
-                  logInfo(s"Comet extension enabled for ${scanExec.scan.getClass.getSimpleName}")
-                  // When reading from Iceberg, we automatically enable type promotion
-                  SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
-                  CometBatchScanExec(
-                    scanExec.clone().asInstanceOf[BatchScanExec],
-                    runtimeFilters = scanExec.runtimeFilters)
-                case _ =>
-                  val msg = "Comet extension is not enabled for " +
-                    s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
-                  logInfo(msg)
-                  withInfo(scanExec, msg)
-                  scanExec
-              }
-            } else {
-              val msg = "Comet extension is not enabled for " +
-                s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
-              logInfo(msg)
-              withInfo(scanExec, msg)
-              scanExec
+            scanExec.scan match {
+              // Iceberg scan, supported cases
+              case s: SupportsComet
+                  if s.isCometEnabled &&
+                    isSchemaSupported(scanExec.scan.readSchema()) =>
+                logInfo(s"Comet extension enabled for ${scanExec.scan.getClass.getSimpleName}")
+                // When reading from Iceberg, we automatically enable type promotion
+                SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
+                CometBatchScanExec(
+                  scanExec.clone().asInstanceOf[BatchScanExec],
+                  runtimeFilters = scanExec.runtimeFilters)
+
+              // Iceberg scan but disabled or unsupported by Comet
+              case s: SupportsComet =>
+                val info1 = createMessage(
+                  !s.isCometEnabled,
+                  "Comet extension is not enabled for " +
+                    s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side")
+                val info2 = createMessage(
+                  !isSchemaSupported(scanExec.scan.readSchema()),
+                  "Comet extension is not enabled for " +
+                    s"${scanExec.scan.getClass.getSimpleName}: Schema not supported")
+                withInfo(scanExec, Seq(info1, info2).flatten.mkString("\n"))
+
+              // If it is data source V2 other than Parquet or Iceberg,
+              // attach the unsupported reason to the plan.
+              case _ =>
+                withInfo(scanExec, "Comet Scan only supports Parquet")
+                scanExec
             }
 
           // data source V1
@@ -986,7 +997,8 @@ object CometSparkSessionExtensions extends Logging {
 
   /**
    * Attaches explain information to a TreeNode, rolling up the corresponding information tags
-   * from any child nodes
+   * from any child nodes. For now, we are using this to attach the reasons why certain Spark
+   * operators or expressions are disabled.
    *
    * @param node
    *   The node to attach the explain information to. Typically a SparkPlan
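
The substance of the fix is match ordering: previously the case for "BatchScanExec if !scanExec.scan.isInstanceOf[ParquetScan]" was tried before the Iceberg case, so Iceberg scans fell through to "Comet Scan only supports Parquet" and never reached the SupportsComet handling. The new code folds both into a single BatchScanExec case with a nested match on scanExec.scan, keeping the SupportsComet cases ahead of the catch-all. Below is a minimal, self-contained Scala sketch of that ordering; it is not part of the commit, and Scan, IcebergLikeScan, and OtherScan are placeholder types:

    // Placeholder types standing in for Spark's Scan hierarchy (assumptions, not Comet code).
    sealed trait Scan
    trait SupportsComet { def isCometEnabled: Boolean }
    case class IcebergLikeScan(isCometEnabled: Boolean) extends Scan with SupportsComet
    case object OtherScan extends Scan

    object MatchOrderSketch extends App {
      def classify(scan: Scan): String = scan match {
        // Most specific case first: the data source reports Comet support.
        case s: SupportsComet if s.isCometEnabled => "CometBatchScanExec"
        // Still specific: a SupportsComet scan that is disabled (or, in the real rule,
        // has an unsupported schema); this is the reason withInfo attaches to the plan.
        case _: SupportsComet => "fallback: not enabled on data source side"
        // Catch-all for every other data source V2 scan. If this case came first, as the
        // old !isInstanceOf[ParquetScan] branch effectively did, the SupportsComet cases
        // above would never match.
        case _ => "fallback: Comet Scan only supports Parquet"
      }

      println(classify(IcebergLikeScan(isCometEnabled = true)))  // CometBatchScanExec
      println(classify(OtherScan))                               // fallback: Comet Scan only supports Parquet
    }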


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
