This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 21717eb fix: Iceberg scan transition should be in front of other data source v2 (#302)
21717eb is described below
commit 21717eb0ac24bf88a9d8f37d84053a5f503fbadc
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Apr 23 07:47:35 2024 -0700
    fix: Iceberg scan transition should be in front of other data source v2 (#302)
---
.../apache/comet/CometSparkSessionExtensions.scala | 72 +++++++++++++---------
1 file changed, 42 insertions(+), 30 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index fc0d133..8ef8cb8 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -52,6 +52,13 @@ import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
 import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
+/**
+ * The entry point of Comet extension to Spark. This class is responsible for injecting Comet
+ * rules and extensions into Spark.
+ *
+ * CometScanRule: A rule to transform a Spark scan plan into a Comet scan plan. CometExecRule: A
+ * rule to transform a Spark execution plan into a Comet execution plan.
+ */
 class CometSparkSessionExtensions
     extends (SparkSessionExtensions => Unit)
     with Logging
@@ -98,7 +105,8 @@ class CometSparkSessionExtensions
               scanExec.copy(scan = cometScan),
               runtimeFilters = scanExec.runtimeFilters)
 
-          // unsupported parquet data source V2
+          // If it is a `ParquetScan` but unsupported by Comet, attach the exact
+          // reason to the plan.
           case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[ParquetScan] =>
             val requiredSchema = scanExec.scan.asInstanceOf[ParquetScan].readDataSchema
             val info1 = createMessage(
@@ -107,7 +115,7 @@ class CometSparkSessionExtensions
             val readPartitionSchema = scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema
             val info2 = createMessage(
               !isSchemaSupported(readPartitionSchema),
-              s"Schema $readPartitionSchema is not supported")
+              s"Partition schema $readPartitionSchema is not supported")
             // Comet does not support pushedAggregate
             val info3 = createMessage(
               getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
@@ -115,34 +123,37 @@ class CometSparkSessionExtensions
             withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString("\n"))
             scanExec
 
-          case scanExec: BatchScanExec if !scanExec.scan.isInstanceOf[ParquetScan] =>
-            withInfo(scanExec, "Comet Scan only supports Parquet")
-            scanExec
-
-          // iceberg scan
+          // Other datasource V2 scan
           case scanExec: BatchScanExec =>
-            if (isSchemaSupported(scanExec.scan.readSchema())) {
-              scanExec.scan match {
-                case s: SupportsComet if s.isCometEnabled =>
-                  logInfo(s"Comet extension enabled for ${scanExec.scan.getClass.getSimpleName}")
-                  // When reading from Iceberg, we automatically enable type promotion
-                  SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
-                  CometBatchScanExec(
-                    scanExec.clone().asInstanceOf[BatchScanExec],
-                    runtimeFilters = scanExec.runtimeFilters)
-                case _ =>
-                  val msg = "Comet extension is not enabled for " +
-                    s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
-                  logInfo(msg)
-                  withInfo(scanExec, msg)
-                  scanExec
-              }
-            } else {
-              val msg = "Comet extension is not enabled for " +
-                s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
-              logInfo(msg)
-              withInfo(scanExec, msg)
-              scanExec
+            scanExec.scan match {
+              // Iceberg scan, supported cases
+              case s: SupportsComet
+                  if s.isCometEnabled &&
+                    isSchemaSupported(scanExec.scan.readSchema()) =>
+                logInfo(s"Comet extension enabled for ${scanExec.scan.getClass.getSimpleName}")
+                // When reading from Iceberg, we automatically enable type promotion
+                SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
+                CometBatchScanExec(
+                  scanExec.clone().asInstanceOf[BatchScanExec],
+                  runtimeFilters = scanExec.runtimeFilters)
+
+              // Iceberg scan but disabled or unsupported by Comet
+              case s: SupportsComet =>
+                val info1 = createMessage(
+                  !s.isCometEnabled,
+                  "Comet extension is not enabled for " +
+                    s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side")
+                val info2 = createMessage(
+                  !isSchemaSupported(scanExec.scan.readSchema()),
+                  "Comet extension is not enabled for " +
+                    s"${scanExec.scan.getClass.getSimpleName}: Schema not supported")
+                withInfo(scanExec, Seq(info1, info2).flatten.mkString("\n"))
+
+              // If it is data source V2 other than Parquet or Iceberg,
+              // attach the unsupported reason to the plan.
+              case _ =>
+                withInfo(scanExec, "Comet Scan only supports Parquet")
+                scanExec
             }
 
           // data source V1
@@ -986,7 +997,8 @@ object CometSparkSessionExtensions extends Logging {
 
   /**
    * Attaches explain information to a TreeNode, rolling up the corresponding information tags
-   * from any child nodes
+   * from any child nodes. For now, we are using this to attach the reasons why certain Spark
+   * operators or expressions are disabled.
    *
    * @param node
    *   The node to attach the explain information to. Typically a SparkPlan
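
The heart of this fix is plain pattern-match ordering: before the change, the catch-all
case scanExec: BatchScanExec if !scanExec.scan.isInstanceOf[ParquetScan] sat in front of
the Iceberg branch, so an Iceberg scan (which is not a ParquetScan) was rejected with
"Comet Scan only supports Parquet" before the SupportsComet check could ever run. The
following standalone Scala sketch reproduces the shape of the bug; the types here are
simplified stand-ins, not the real Spark/Comet classes:

    // Minimal sketch (not Comet code) of the match-ordering bug fixed above.
    object MatchOrdering extends App {
      sealed trait Scan
      case class ParquetScan(path: String) extends Scan
      // Stand-in for a scan whose source opts into Comet, like Iceberg's
      // SupportsComet trait in the real code.
      case class IcebergScan(table: String, cometEnabled: Boolean) extends Scan

      // Before the fix: the "not a ParquetScan" catch-all matches IcebergScan
      // first, so the Comet-aware case below it is never reached.
      def transitionBefore(scan: Scan): String = scan match {
        case _: ParquetScan => "CometScanExec"
        case s if !s.isInstanceOf[ParquetScan] => "rejected: Comet Scan only supports Parquet"
        case IcebergScan(_, true) => "CometBatchScanExec" // unreachable
      }

      // After the fix: the Comet-aware cases come first; the catch-all only
      // sees whatever data source V2 scans remain.
      def transitionAfter(scan: Scan): String = scan match {
        case _: ParquetScan => "CometScanExec"
        case IcebergScan(_, true) => "CometBatchScanExec"
        case IcebergScan(_, false) => "rejected: not enabled on data source side"
        case _ => "rejected: Comet Scan only supports Parquet"
      }

      val iceberg = IcebergScan("t1", cometEnabled = true)
      println(transitionBefore(iceberg)) // rejected: Comet Scan only supports Parquet
      println(transitionAfter(iceberg))  // CometBatchScanExec
    }

Moving the SupportsComet cases ahead of the catch-all, which the commit does via the
nested scanExec.scan match, is what lets Iceberg scans reach CometBatchScanExec.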
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]