advancedxy commented on code in PR #166:
URL: https://github.com/apache/arrow-datafusion-comet/pull/166#discussion_r1521021759
##########
dev/diffs/3.4.2.diff:
##########
@@ -0,0 +1,1306 @@
+diff --git a/pom.xml b/pom.xml
+index fab98342498..f2156d790d1 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -148,6 +148,8 @@
+ <chill.version>0.10.0</chill.version>
+ <ivy.version>2.5.1</ivy.version>
+ <oro.version>2.0.8</oro.version>
++ <spark.version.short>3.4</spark.version.short>
++ <comet.version>0.1.0-SNAPSHOT</comet.version>
+ <!--
+ If you changes codahale.metrics.version, you also need to change
+ the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2766,6 +2768,25 @@
+ <artifactId>arpack</artifactId>
+ <version>${netlib.ludovic.dev.version}</version>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.comet</groupId>
++ <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ <version>${comet.version}</version>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-sql_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-core_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index 5b6cc8cb7af..5ce708adc38 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -77,6 +77,10 @@
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.comet</groupId>
++ <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ </dependency>
+
+ <!--
+ This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+index c595b50950b..483508dc076 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+@@ -26,6 +26,8 @@ import scala.collection.JavaConverters._
+ import scala.reflect.runtime.universe.TypeTag
+ import scala.util.control.NonFatal
+
++import org.apache.comet.CometConf
++
+ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+ import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
+ import org.apache.spark.api.java.JavaRDD
+@@ -102,7 +104,7 @@ class SparkSession private(
+ sc: SparkContext,
+ initialSessionOptions: java.util.HashMap[String, String]) = {
+ this(sc, None, None,
+- SparkSession.applyExtensions(
++ SparkSession.applyExtensions(sc,
+ sc.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+ new SparkSessionExtensions), initialSessionOptions.asScala.toMap)
+ }
+@@ -1028,7 +1030,7 @@ object SparkSession extends Logging {
+ }
+
+ loadExtensions(extensions)
+- applyExtensions(
++ applyExtensions(sparkContext,
+ sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+ extensions)
+
+@@ -1282,14 +1284,24 @@ object SparkSession extends Logging {
+ }
+ }
+
++ private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++ if (sparkContext.getConf.getBoolean(CometConf.COMET_ENABLED.key, false)) {
++ Seq("org.apache.comet.CometSparkSessionExtensions")
++ } else {
++ Seq.empty
++ }
++ }
++
+ /**
+ * Initialize extensions for given extension classnames. The classes will be applied to the
+ * extensions passed into this function.
+ */
+ private def applyExtensions(
++ sparkContext: SparkContext,
+ extensionConfClassNames: Seq[String],
+ extensions: SparkSessionExtensions): SparkSessionExtensions = {
+- extensionConfClassNames.foreach { extensionConfClassName =>
++ val extensionClassNames = extensionConfClassNames ++ loadCometExtension(sparkContext)
++ extensionClassNames.foreach { extensionConfClassName =>
+ try {
+ val extensionConfClass = Utils.classForName(extensionConfClassName)
+ val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index db587dd9868..aac7295a53d 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.annotation.DeveloperApi
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+ // dump the file scan metadata (e.g file path) to event log
+ val metadata = plan match {
+ case fileScan: FileSourceScanExec => fileScan.metadata
++ case cometScan: CometScanExec => cometScan.metadata
+ case _ => Map[String, String]()
+ }
+ new SparkPlanInfo(
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 698ca009b4f..57d774a3617 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ -- Test tables
+ CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+index 1152d77da0c..f77493f690b 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+@@ -7,6 +7,9 @@
+
+ -- avoid bit-exact output here because operations may not be bit-exact.
+ -- SET extra_float_digits = 0;
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..44cd244d3b0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -5,6 +5,9 @@
+ -- AGGREGATES [Part 3]
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..2b73732c33f 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- INT8
+ -- Test int8 64-bit integers.
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..423d3b3d76d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- SELECT_HAVING
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index 56e9520fdab..917932336df 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+
+ withTempDatabase { dbName =>
+ withTable(table1Name, table2Name) {
+- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++ withSQLConf(
++ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++ "spark.comet.enabled" -> "false") {
+ spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+ spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
+new file mode 100644
+index 00000000000..07687f6685a
+--- /dev/null
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
+@@ -0,0 +1,39 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql
++
++import org.scalactic.source.Position
++import org.scalatest.Tag
++
++import org.apache.spark.sql.test.SQLTestUtils
++
++case class DisableComet(reason: String) extends Tag("DisableComet")
++
++/**
++ * Helper trait that disables Comet for all tests regardless of default config values.
++ */
++trait DisableCometSuite extends SQLTestUtils {
++ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
++ (implicit pos: Position): Unit = {
++ if (isCometEnabled) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
++ }
++}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index f33432ddb6f..fe9f74ff8f1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
+ case s: BatchScanExec => s.runtimeFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case _ => Nil
+ }
+ }
+@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+ case s: BatchScanExec =>
+ // we use f1 col for v2 tables due to schema pruning
+ s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
++ case s: CometScanExec =>
++ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
+ case _ => false
+ }
+ assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index a6b295578d6..d5e25564bb9 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -463,7 +463,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("Explain formatted output for scan operator for datasource V2") {
++ test("Explain formatted output for scan operator for datasource V2",
++ DisableComet("Comet explain output is different")) {
+ withTempDir { dir =>
+ Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+ val basePath = dir.getCanonicalPath + "/" + fmt
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+index 2796b1cf154..94591f83c84 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
+ import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.FilePartition
+@@ -875,6 +876,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.nonEmpty)
+@@ -916,6 +918,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.isEmpty)
+@@ -1100,6 +1103,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ val filters = df.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f.dataFilters
+ case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
++ case b: CometScanExec => b.dataFilters
++ case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
+ }.flatten
+ assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+index 5125708be32..e274a497996 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
+@@ -1371,7 +1372,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
+ assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ // No extra sort on left side before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 5)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 5)
+ }
+
+ // Test output ordering is not preserved
+@@ -1382,7 +1383,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
+ assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ // Have sort on left side before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 6)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 6)
+ }
+
+ // Test singe partition
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+index b5b34922694..5fa734d30e1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+@@ -69,7 +69,7 @@ import org.apache.spark.tags.ExtendedSQLTest
+ * }}}
+ */
+ // scalastyle:on line.size.limit
+-trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {
++trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with DisableCometSuite {
+
+ protected val baseResourcePath = {
+ // use the same way as `SQLQueryTestSuite` to get the resource path
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+index 3cfda19134a..afcfba37c6f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+
+ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project, Sort, Union}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.FileScanRDD
+@@ -1543,6 +1544,12 @@ class SubquerySuite extends QueryTest
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
++ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++ fs.inputRDDs().forall(
++ _.asInstanceOf[FileScanRDD].filePartitions.forall(
++ _.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case _ => false
+ })
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+index cfc8b2cc845..c6fcfd7bd08 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.{AnalysisException, QueryTest}
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+ import org.apache.spark.sql.connector.read.ScanBuilder
+ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
+ val df = spark.read.format(format).load(path.getCanonicalPath)
+ checkAnswer(df, inputData.toDF())
+ assert(
+- df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
++ df.queryExecution.executedPlan.exists {
++ case _: FileSourceScanExec | _: CometScanExec => true
++ case _ => false
++ }
++ )
+ }
+ } finally {
+ spark.listenerManager.unregister(listener)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+index c0ec8a58bd5..5f880751e21 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.FsPermission
+ import org.mockito.Mockito.{mock, spy, when}
+
+ import org.apache.spark._
+-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, DisableComet, QueryTest, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.util.BadRecordException
+ import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions}
+@@ -248,7 +248,8 @@ class QueryExecutionErrorsSuite
+ }
+
+ test("INCONSISTENT_BEHAVIOR_CROSS_VERSION: " +
+- "compatibility with Spark 2.4/3.2 in reading/writing dates") {
++ "compatibility with Spark 2.4/3.2 in reading/writing dates",
++ DisableComet("Comet doesn't completely support datetime rebase mode
yet")) {
+
+ // Fail to read ancient datetime values.
+ withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+index 418ca3430bb..d5fc207601c 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+@@ -23,7 +23,7 @@ import scala.util.Random
+ import org.apache.hadoop.fs.Path
+
+ import org.apache.spark.SparkConf
+-import org.apache.spark.sql.{DataFrame, QueryTest}
++import org.apache.spark.sql.{DataFrame, DisableComet, QueryTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+ import org.apache.spark.sql.internal.SQLConf
+@@ -195,7 +195,7 @@ class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
+ }
+ }
+
+- test("FileScan description") {
++ test("FileScan description", DisableComet("Comet doesn't use BatchScan")) {
+ Seq("json", "orc", "parquet").foreach { format =>
+ withTempPath { path =>
+ val dir = path.getCanonicalPath
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+index 9e9d717db3b..91a4f9a38d5 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.comet.CometProjectExec
+ import org.apache.spark.sql.connector.SimpleWritableDataSource
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+ private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
+ withClue(df.queryExecution) {
+ val plan = df.queryExecution.executedPlan
+- val actual = collectWithSubqueries(plan) { case p: ProjectExec => p }.size
++ val actual = collectWithSubqueries(plan) {
++ case p: ProjectExec => p
++ case p: CometProjectExec => p
++ }.size
+ assert(actual == expected)
+ }
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+index ac710c32296..37746bd470d 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+@@ -616,7 +616,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
+ .write.mode(SaveMode.Overwrite).parquet(path)
+
+ withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
+- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
++ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
++ // Disable Comet native execution because this checks wholestage codegen.
++ "spark.comet.exec.enabled" -> "false") {
+ val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i")
+ val df = spark.read.parquet(path).selectExpr(projection: _*)
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+index bd9c79e5b96..ab7584e768e 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.SchemaPruningTest
+ import org.apache.spark.sql.catalyst.expressions.Concat
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+ import org.apache.spark.sql.catalyst.plans.logical.Expand
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.functions._
+@@ -867,6 +868,7 @@ abstract class SchemaPruningSuite
+ val fileSourceScanSchemata =
+ collect(df.queryExecution.executedPlan) {
+ case scan: FileSourceScanExec => scan.requiredSchema
++ case scan: CometScanExec => scan.requiredSchema
+ }
+ assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+ s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+index 1d2e467c94c..77a119505b9 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
+ import org.mockito.Mockito.{mock, when}
+
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.{DataFrame, DisableCometSuite, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.encoders.RowEncoder
+ import org.apache.spark.sql.execution.datasources.PartitionedFile
+ import org.apache.spark.sql.functions.col
+@@ -38,7 +38,9 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types._
+ import org.apache.spark.util.Utils
+
+-class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
++// For some reason this suite is flaky w/ or w/o Comet when running in Github workflow.
++// Since it isn't related to Comet, we disable it for now.
++class BinaryFileFormatSuite extends QueryTest with SharedSparkSession with DisableCometSuite {
+ import BinaryFileFormat._
+
+ private var testDir: String = _
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+index 07e2849ce6f..264fb61db16 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+
+ import org.apache.spark.TestUtils
+ import org.apache.spark.memory.MemoryMode
+-import org.apache.spark.sql.Row
++import org.apache.spark.sql.{DisableComet, Row}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -201,7 +201,8 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
+ }
+ }
+
+- test("parquet v2 pages - rle encoding for boolean value columns") {
++ test("parquet v2 pages - rle encoding for boolean value columns",
++ DisableComet("Comet doesn't support RLE encoding yet")) {
+ val extraOptions = Map[String, String](
+ ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
+ )
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+index 9adcb43c838..84c4db4a727 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+@@ -1025,7 +1025,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+ // When a filter is pushed to Parquet, Parquet can apply it to every row.
+ // So, we can check the number of rows returned from the Parquet
+ // to make sure our filter pushdown work.
+- assert(stripSparkFilter(df).count == 1)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies
++ // on Spark to apply the data filters after columnar batches are returned
++ if (!isCometEnabled) {
++ assert(stripSparkFilter(df).count == 1)
++ }
+ }
+ }
+ }
+@@ -1510,7 +1514,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+ // than the total length but should not be a single record.
+ // Note that, if record level filtering is enabled, it should be a single record.
+ // If no filter is pushed down to Parquet, it should be the total length of data.
+- assert(actual > 1 && actual < data.length)
++ // Only enable Comet test iff it's scan only, since with native execution
++ // `stripSparkFilter` can't remove the native filter
++ if (!isCometEnabled || isCometScanOnly) {
++ assert(actual > 1 && actual < data.length)
++ }
+ }
+ }
+ }
+@@ -1537,7 +1545,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+ // than the total length but should not be a single record.
+ // Note that, if record level filtering is enabled, it should be a single record.
+ // If no filter is pushed down to Parquet, it should be the total length of data.
+- assert(actual > 1 && actual < data.length)
++ // Only enable Comet test iff it's scan only, since with native execution
++ // `stripSparkFilter` can't remove the native filter
++ if (!isCometEnabled || isCometScanOnly) {
++ assert(actual > 1 && actual < data.length)
++ }
+ }
+ }
+ }
+@@ -1673,7 +1685,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+ }
+ }
+
+- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
++ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
++ DisableComet("IN predicate is not yet supported in Comet, see issue
#36")) {
+ val schema = StructType(Seq(
+ StructField("a", IntegerType, nullable = false)
+ ))
+@@ -1914,7 +1927,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+ }
+ }
+
+- test("Support Parquet column index") {
++ test("Support Parquet column index",
++ DisableComet("Comet doesn't support Parquet column index yet")) {
+ // block 1:
+ // null count min max
+ // page-0 0 0 99
+@@ -2206,7 +2220,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+ assert(pushedParquetFilters.exists(_.getClass === filterClass),
+ s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
+
+- checker(stripSparkFilter(query), expected)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies
++ // on Spark to apply the data filters after columnar batches are returned
++ if (!isCometEnabled) {
++ checker(stripSparkFilter(query), expected)
++ }
+ } else {
+ assert(selectedFilters.isEmpty, "There is filter pushed down")
+ }
+@@ -2266,7 +2284,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+ assert(pushedParquetFilters.exists(_.getClass === filterClass),
+ s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
+
+- checker(stripSparkFilter(query), expected)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level filtering but relies
++ // on Spark to apply the data filters after columnar batches are returned
++ if (!isCometEnabled) {
++ checker(stripSparkFilter(query), expected)
++ }
+
+ case _ =>
+ throw new AnalysisException("Can not match ParquetTable in the query.")
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+index 8670d95c65e..4a16d9f6ff4 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+ }
+ }
+
+- test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") {
++ test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings",
++ DisableComet("Comet doesn't support DELTA encoding yet")) {
+ withAllParquetReaders {
+ checkAnswer(
+ // "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+index 2e7b26126d2..f7368eb026e 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+@@ -1029,7 +1029,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+ checkAnswer(readParquet(schema, path), df)
+ }
+
+- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++ "spark.comet.enabled" -> "false") {
+ val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+ checkAnswer(readParquet(schema1, path), df)
+ val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+@@ -1051,7 +1052,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+ val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
+ df.write.parquet(path.toString)
+
+- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++ "spark.comet.enabled" -> "false") {
+ checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+ checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+ checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT
123456.0"))
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+index 240bb4e6dcb..c37b92d3691 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+@@ -21,7 +21,7 @@ import java.nio.file.{Files, Paths, StandardCopyOption}
+ import java.sql.{Date, Timestamp}
+
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
+-import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY}
++import org.apache.spark.sql.{DisableCometSuite, QueryTest, Row, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY}
+ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType}
+@@ -30,9 +30,11 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96,
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.tags.SlowSQLTest
+
++// Comet is disabled for this suite because it doesn't support datetime rebase mode
+ abstract class ParquetRebaseDatetimeSuite
+ extends QueryTest
+ with ParquetTest
++ with DisableCometSuite
+ with SharedSparkSession {
+
+ import testImplicits._
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+index 351c6d698fc..36492fe936d 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
+ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+ import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -230,6 +231,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
+ case f: FileSourceScanExec =>
+ numPartitions += f.inputRDD.partitions.length
+ numOutputRows += f.metrics("numOutputRows").value
++ case b: CometScanExec =>
++ numPartitions += b.inputRDD.partitions.length
++ numOutputRows += b.metrics("numOutputRows").value
++ case b: CometBatchScanExec =>
++ numPartitions += b.inputRDD.partitions.length
++ numOutputRows += b.metrics("numOutputRows").value
+ case _ =>
+ }
+ assert(numPartitions > 0)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+index 5c0b7def039..151184bc98c 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.SchemaPruningSuite
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -56,6 +57,7 @@ class ParquetV2SchemaPruningSuite extends ParquetSchemaPruningSuite {
+ val fileSourceScanSchemata =
+ collect(df.queryExecution.executedPlan) {
+ case scan: BatchScanExec => scan.scan.asInstanceOf[ParquetScan].readDataSchema
++ case scan: CometBatchScanExec => scan.scan.asInstanceOf[ParquetScan].readDataSchema
+ }
+ assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+ s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+index bf5c51b89bb..2c7f9701eeb 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+ import org.apache.parquet.schema.Type._
+
+ import org.apache.spark.SparkException
++import org.apache.spark.sql.DisableComet
+ import org.apache.spark.sql.catalyst.ScalaReflection
+ import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+ import org.apache.spark.sql.functions.desc
+@@ -1016,7 +1017,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ e
+ }
+
+- test("schema mismatch failure error message for parquet reader") {
++ test("schema mismatch failure error message for parquet reader",
++ DisableComet("Comet doesn't work with vectorizedReaderEnabled =
false")) {
+ withTempPath { dir =>
+ val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
+ val expectedMessage = "Encountered error while reading file"
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+index 3a0bd35cb70..ef351a56ec8 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug
+ import java.io.ByteArrayOutputStream
+
+ import org.apache.spark.rdd.RDD
++import org.apache.spark.sql.DisableComet
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.Attribute
+ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+@@ -124,7 +125,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui
+ | id LongType: {}""".stripMargin))
+ }
+
+- test("SPARK-28537: DebugExec cannot debug columnar related queries") {
++ test("SPARK-28537: DebugExec cannot debug columnar related queries",
++ DisableComet("Comet does not use FileScan")) {
+ withTempPath { workDir =>
+ val workDirPath = workDir.getAbsolutePath
+ val input = spark.range(5).toDF("id")
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+index 26e61c6b58d..2a7c96d164a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+@@ -737,7 +737,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
+ }
+ }
+
+- test("SPARK-26327: FileSourceScanExec metrics") {
++ test("SPARK-26327: FileSourceScanExec metrics",
++ DisableComet("Spark uses row-based Parquet reader while Comet is
vectorized")) {
+ withTable("testDataForScan") {
+ spark.range(10).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").saveAsTable("testDataForScan")
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+index 0ab8691801d..d9125f658ad 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution.python
+
+ import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit}
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+- assert(scanNodes.head.dataFilters.length == 2)
+- assert(scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
++ val dataFilters = scanNodes.head match {
++ case scan: FileSourceScanExec => scan.dataFilters
++ case scan: CometScanExec => scan.dataFilters
++ }
++ assert(dataFilters.length == 2)
++ assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
+ }
+ }
+ }
+@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExec => scan
++ case scan: CometBatchScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExec => scan
++ case scan: CometBatchScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+index d083cac48ff..43057eb251b 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+@@ -37,8 +37,10 @@ import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException,
+ import org.apache.spark.sql.streaming.util.StreamManualClock
+ import org.apache.spark.util.Utils
+
++// For some reason this suite is flaky w/ or w/o Comet when running in Github workflow.
++// Since it isn't related to Comet, we disable it for now.
+ class AsyncProgressTrackingMicroBatchExecutionSuite
+- extends StreamTest with BeforeAndAfter with Matchers {
++ extends StreamTest with BeforeAndAfter with Matchers with DisableCometSuite {
+
+ import testImplicits._
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+index 266bb343526..85ec36db996 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.expressions
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan}
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.BucketingUtils
+@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+ }
+ }
+
+- private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+- val fileScan = collect(plan) { case f: FileSourceScanExec => f }
++ private def getFileScan(plan: SparkPlan): SparkPlan = {
++ val fileScan = collect(plan) {
++ case f: FileSourceScanExec => f
++ case f: CometScanExec => f
++ }
+ assert(fileScan.nonEmpty, plan)
+ fileScan.head
+ }
+
++ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
++ case fs: FileSourceScanExec => fs.bucketedScan
++ case bs: CometScanExec => bs.bucketedScan
++ }
++
+ // To verify if the bucket pruning works, this function checks two conditions:
+ // 1) Check if the pruned buckets (before filtering) are empty.
+ // 2) Verify the final result is the same as the expected one
+@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+ val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
+ .queryExecution.executedPlan
+ val fileScan = getFileScan(planWithoutBucketedScan)
+- assert(!fileScan.bucketedScan, s"except no bucketed scan but found\n$fileScan")
++ val bucketedScan = getBucketScan(planWithoutBucketedScan)
++ assert(!bucketedScan, s"except no bucketed scan but found\n$fileScan")
+
+ val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
+ val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
+@@ -461,18 +471,29 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+
+ // check existence of shuffle
+ assert(
+- joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) == shuffleLeft,
++ joinOperator.left.find { op =>
++ op.isInstanceOf[SortExec] ||
++ (op.isInstanceOf[CometExec] &&
++ op.asInstanceOf[CometExec].originalPlan.find(_.isInstanceOf[SortExec]).isDefined)
++ }.isDefined == sortLeft,
Review Comment:
Hmm, I don't think you fixed this part: the removed line (left side of the diff) checks `ShuffleExchangeExec`, but the added code (right side) now checks `SortExec`.
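
To make the intent concrete, here is a minimal sketch of the split this comment seems to be asking for, assuming the `joinOperator`, `shuffleLeft` and `sortLeft` names from Spark's `BucketedReadSuite` and the Comet operators already used elsewhere in this diff (`CometExec.originalPlan`); the `hasShuffle`/`hasSort` helpers are made up for illustration only and are not part of the actual patch:

```scala
import org.apache.spark.sql.comet.CometExec
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// The shuffle assertion keeps matching ShuffleExchangeExec, as in upstream Spark.
def hasShuffle(plan: SparkPlan): Boolean =
  plan.exists(_.isInstanceOf[ShuffleExchangeExec])

// Only the separate sort assertion becomes Comet-aware: a SortExec may be
// wrapped by a Comet operator, so also look inside its originalPlan.
def hasSort(plan: SparkPlan): Boolean =
  plan.exists {
    case _: SortExec => true
    case c: CometExec => c.originalPlan.find(_.isInstanceOf[SortExec]).isDefined
    case _ => false
  }

// check existence of shuffle (unchanged):
//   assert(hasShuffle(joinOperator.left) == shuffleLeft, ...)
// check existence of sort (Comet-aware):
//   assert(hasSort(joinOperator.left) == sortLeft, ...)
```

That way the shuffleLeft/shuffleRight checks stay on `ShuffleExchangeExec` and only the sortLeft/sortRight checks need to know about Comet.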