This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 8f8a0d92 build: bump spark version to 3.4.3 (#292)
8f8a0d92 is described below
commit 8f8a0d927daf146015bee3306342e1844e1b4758
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue May 28 23:17:57 2024 -0700
build: bump spark version to 3.4.3 (#292)
* build: bump spark version to 3.4.3
* add 3.4.4 diff
* fix 3.4.3.diff
---------
Co-authored-by: Huaxin Gao <[email protected]>
---
.github/actions/setup-spark-builder/action.yaml | 4 +-
.github/workflows/spark_sql_test.yml | 2 +-
dev/diffs/3.4.3.diff | 2564 +++++++++++++++++++++++
pom.xml | 2 +-
4 files changed, 2568 insertions(+), 4 deletions(-)
diff --git a/.github/actions/setup-spark-builder/action.yaml
b/.github/actions/setup-spark-builder/action.yaml
index 9293dee9..10cdbff5 100644
--- a/.github/actions/setup-spark-builder/action.yaml
+++ b/.github/actions/setup-spark-builder/action.yaml
@@ -23,9 +23,9 @@ inputs:
required: true
default: '3.4'
spark-version:
- description: 'The Apache Spark version (e.g., 3.4.2) to build'
+ description: 'The Apache Spark version (e.g., 3.4.3) to build'
required: true
- default: '3.4.2'
+ default: '3.4.3'
comet-version:
description: 'The Comet version to use for Spark'
required: true
diff --git a/.github/workflows/spark_sql_test.yml
b/.github/workflows/spark_sql_test.yml
index 94958b46..997136de 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -45,7 +45,7 @@ jobs:
matrix:
os: [ubuntu-latest]
java-version: [11]
- spark-version: [{short: '3.4', full: '3.4.2'}]
+ spark-version: [{short: '3.4', full: '3.4.3'}]
module:
- {name: "catalyst", args1: "catalyst/test", args2: ""}
- {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l
org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
new file mode 100644
index 00000000..ae46e916
--- /dev/null
+++ b/dev/diffs/3.4.3.diff
@@ -0,0 +1,2564 @@
+diff --git a/pom.xml b/pom.xml
+index d3544881af1..47382e29b5a 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -148,6 +148,8 @@
+ <chill.version>0.10.0</chill.version>
+ <ivy.version>2.5.1</ivy.version>
+ <oro.version>2.0.8</oro.version>
++ <spark.version.short>3.4</spark.version.short>
++ <comet.version>0.1.0-SNAPSHOT</comet.version>
+ <!--
+ If you changes codahale.metrics.version, you also need to change
+ the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2784,6 +2786,25 @@
+ <artifactId>arpack</artifactId>
+ <version>${netlib.ludovic.dev.version}</version>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.comet</groupId>
++
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ <version>${comet.version}</version>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-sql_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-core_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index b386d135da1..854aec17c2d 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -77,6 +77,10 @@
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.comet</groupId>
++
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ </dependency>
+
+ <!--
+ This spark-tags test-dep is needed even though it isn't used in this
module, otherwise testing-cmds that exclude
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+index c595b50950b..6b60213e775 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+@@ -102,7 +102,7 @@ class SparkSession private(
+ sc: SparkContext,
+ initialSessionOptions: java.util.HashMap[String, String]) = {
+ this(sc, None, None,
+- SparkSession.applyExtensions(
++ SparkSession.applyExtensions(sc,
+
sc.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+ new SparkSessionExtensions), initialSessionOptions.asScala.toMap)
+ }
+@@ -1028,7 +1028,7 @@ object SparkSession extends Logging {
+ }
+
+ loadExtensions(extensions)
+- applyExtensions(
++ applyExtensions(sparkContext,
+
sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+ extensions)
+
+@@ -1282,14 +1282,24 @@ object SparkSession extends Logging {
+ }
+ }
+
++ private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++ if (sparkContext.getConf.getBoolean("spark.comet.enabled", false)) {
++ Seq("org.apache.comet.CometSparkSessionExtensions")
++ } else {
++ Seq.empty
++ }
++ }
++
+ /**
+ * Initialize extensions for given extension classnames. The classes will
be applied to the
+ * extensions passed into this function.
+ */
+ private def applyExtensions(
++ sparkContext: SparkContext,
+ extensionConfClassNames: Seq[String],
+ extensions: SparkSessionExtensions): SparkSessionExtensions = {
+- extensionConfClassNames.foreach { extensionConfClassName =>
++ val extensionClassNames = extensionConfClassNames ++
loadCometExtension(sparkContext)
++ extensionClassNames.foreach { extensionConfClassName =>
+ try {
+ val extensionConfClass = Utils.classForName(extensionConfClassName)
+ val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index db587dd9868..aac7295a53d 100644
+---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.annotation.DeveloperApi
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+ // dump the file scan metadata (e.g file path) to event log
+ val metadata = plan match {
+ case fileScan: FileSourceScanExec => fileScan.metadata
++ case cometScan: CometScanExec => cometScan.metadata
+ case _ => Map[String, String]()
+ }
+ new SparkPlanInfo(
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 698ca009b4f..57d774a3617 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ -- Test tables
+ CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+index 1152d77da0c..f77493f690b 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+@@ -7,6 +7,9 @@
+
+ -- avoid bit-exact output here because operations may not be bit-exact.
+ -- SET extra_float_digits = 0;
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..44cd244d3b0 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -5,6 +5,9 @@
+ -- AGGREGATES [Part 3]
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..2b73732c33f 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- INT8
+ -- Test int8 64-bit integers.
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..423d3b3d76d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- SELECT_HAVING
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+index cf40e944c09..bdd5be4f462 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants
+ import org.apache.spark.sql.execution.{ColumnarToRowExec,
ExecSubqueryExpression, RDDScanExec, SparkPlan}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.columnar._
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -516,7 +516,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
+ */
+ private def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
+ assert(
+- collect(df.queryExecution.executedPlan) { case e: ShuffleExchangeExec
=> e }.size == expected)
++ collect(df.queryExecution.executedPlan) {
++ case _: ShuffleExchangeLike => 1 }.size == expected)
+ }
+
+ test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+index 1cc09c3d7fc..f031fa45c33 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+@@ -27,7 +27,7 @@ import org.apache.spark.SparkException
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -755,7 +755,7 @@ class DataFrameAggregateSuite extends QueryTest
+ assert(objHashAggPlans.nonEmpty)
+
+ val exchangePlans = collect(aggPlan) {
+- case shuffle: ShuffleExchangeExec => shuffle
++ case shuffle: ShuffleExchangeLike => shuffle
+ }
+ assert(exchangePlans.length == 1)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index 56e9520fdab..917932336df 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+
+ withTempDatabase { dbName =>
+ withTable(table1Name, table2Name) {
+- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++ withSQLConf(
++ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++ "spark.comet.enabled" -> "false") {
+ spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+ spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+index a9f69ab28a1..4056f14c893 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+@@ -43,7 +43,7 @@ import org.apache.spark.sql.connector.FakeV2Provider
+ import org.apache.spark.sql.execution.{FilterExec, LogicalRDD,
QueryExecution, SortExec, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.expressions.{Aggregator, Window}
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -1981,7 +1981,7 @@ class DataFrameSuite extends QueryTest
+ fail("Should not have back to back Aggregates")
+ }
+ atFirstAgg = true
+- case e: ShuffleExchangeExec => atFirstAgg = false
++ case e: ShuffleExchangeLike => atFirstAgg = false
+ case _ =>
+ }
+ }
+@@ -2305,7 +2305,7 @@ class DataFrameSuite extends QueryTest
+ checkAnswer(join, df)
+ assert(
+ collect(join.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => true }.size === 1)
++ case _: ShuffleExchangeLike => true }.size === 1)
+ assert(
+ collect(join.queryExecution.executedPlan) { case e:
ReusedExchangeExec => true }.size === 1)
+ val broadcasted = broadcast(join)
+@@ -2313,7 +2313,7 @@ class DataFrameSuite extends QueryTest
+ checkAnswer(join2, df)
+ assert(
+ collect(join2.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => true }.size == 1)
++ case _: ShuffleExchangeLike => true }.size == 1)
+ assert(
+ collect(join2.queryExecution.executedPlan) {
+ case e: BroadcastExchangeExec => true }.size === 1)
+@@ -2876,7 +2876,7 @@ class DataFrameSuite extends QueryTest
+
+ // Assert that no extra shuffle introduced by cogroup.
+ val exchanges = collect(df3.queryExecution.executedPlan) {
+- case h: ShuffleExchangeExec => h
++ case h: ShuffleExchangeLike => h
+ }
+ assert(exchanges.size == 2)
+ }
+@@ -3325,7 +3325,8 @@ class DataFrameSuite extends QueryTest
+ assert(df2.isLocal)
+ }
+
+- test("SPARK-35886: PromotePrecision should be subexpr replaced") {
++ test("SPARK-35886: PromotePrecision should be subexpr replaced",
++ IgnoreComet("TODO: fix Comet for this test")) {
+ withTable("tbl") {
+ sql(
+ """
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+index daef11ae4d6..9f3cc9181f2 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{LeftAnti,
LeftSemi}
+ import org.apache.spark.sql.catalyst.util.sideBySide
+ import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.streaming.MemoryStream
+ import org.apache.spark.sql.expressions.UserDefinedFunction
+ import org.apache.spark.sql.functions._
+@@ -2254,7 +2254,7 @@ class DatasetSuite extends QueryTest
+
+ // Assert that no extra shuffle introduced by cogroup.
+ val exchanges = collect(df3.queryExecution.executedPlan) {
+- case h: ShuffleExchangeExec => h
++ case h: ShuffleExchangeLike => h
+ }
+ assert(exchanges.size == 2)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index f33432ddb6f..060f874ea72 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog,
InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
+ case s: BatchScanExec => s.runtimeFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case _ => Nil
+ }
+ }
+@@ -1187,7 +1191,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("Make sure dynamic pruning works on uncorrelated queries") {
++ test("Make sure dynamic pruning works on uncorrelated queries",
++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ val df = sql(
+ """
+@@ -1238,7 +1243,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("Plan broadcast pruning only when the broadcast can be reused") {
++ test("Plan broadcast pruning only when the broadcast can be reused",
++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
+ Given("dynamic pruning filter on the build side")
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ val df = sql(
+@@ -1485,7 +1491,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("SPARK-38148: Do not add dynamic partition pruning if there exists
static partition " +
+- "pruning") {
++ "pruning", IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet:
#242")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+ Seq(
+ "f.store_id = 1" -> false,
+@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ case s: BatchScanExec =>
+ // we use f1 col for v2 tables due to schema pruning
+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
++ case s: CometScanExec =>
++ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
+ case _ => false
+ }
+ assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index a6b295578d6..91acca4306f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -463,7 +463,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("Explain formatted output for scan operator for datasource V2") {
++ test("Explain formatted output for scan operator for datasource V2",
++ IgnoreComet("Comet explain output is different")) {
+ withTempDir { dir =>
+ Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+ val basePath = dir.getCanonicalPath + "/" + fmt
+@@ -541,7 +542,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+-class ExplainSuiteAE extends ExplainSuiteHelper with
EnableAdaptiveExecutionSuite {
++// Ignored when Comet is enabled. Comet changes expected query plans.
++class ExplainSuiteAE extends ExplainSuiteHelper with
EnableAdaptiveExecutionSuite
++ with IgnoreCometSuite {
+ import testImplicits._
+
+ test("SPARK-35884: Explain Formatted") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+index 2796b1cf154..be7078b38f4 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT,
NullData, NullUDT}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
GreaterThan, Literal}
+ import
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
positiveInt}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec,
CometSortMergeJoinExec}
+ import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.FilePartition
+@@ -815,6 +816,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ assert(bJoinExec.isEmpty)
+ val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
+ case smJoin: SortMergeJoinExec => smJoin
++ case smJoin: CometSortMergeJoinExec => smJoin
+ }
+ assert(smJoinExec.nonEmpty)
+ }
+@@ -875,6 +877,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _,
_, _, _), _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.nonEmpty)
+@@ -916,6 +919,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _,
_, _, _), _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.isEmpty)
+@@ -1100,6 +1104,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ val filters = df.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f.dataFilters
+ case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
++ case b: CometScanExec => b.dataFilters
++ case b: CometBatchScanExec =>
b.scan.asInstanceOf[FileScan].dataFilters
+ }.flatten
+ assert(filters.contains(GreaterThan(scan.logicalPlan.output.head,
Literal(5L))))
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
+new file mode 100644
+index 00000000000..4b31bea33de
+--- /dev/null
++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
+@@ -0,0 +1,42 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql
++
++import org.scalactic.source.Position
++import org.scalatest.Tag
++
++import org.apache.spark.sql.test.SQLTestUtils
++
++/**
++ * Tests with this tag will be ignored when Comet is enabled (e.g., via
`ENABLE_COMET`).
++ */
++case class IgnoreComet(reason: String) extends Tag("DisableComet")
++
++/**
++ * Helper trait that disables Comet for all tests regardless of default
config values.
++ */
++trait IgnoreCometSuite extends SQLTestUtils {
++ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
++ (implicit pos: Position): Unit = {
++ if (isCometEnabled) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
++ }
++}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+index 1792b4c32eb..1616e6f39bd 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft,
BuildRight, BuildSide
+ import org.apache.spark.sql.catalyst.plans.PlanTest
+ import org.apache.spark.sql.catalyst.plans.logical._
+ import org.apache.spark.sql.catalyst.rules.RuleExecutor
++import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortMergeJoinExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.joins._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -362,6 +363,7 @@ class JoinHintSuite extends PlanTest with
SharedSparkSession with AdaptiveSparkP
+ val executedPlan = df.queryExecution.executedPlan
+ val shuffleHashJoins = collect(executedPlan) {
+ case s: ShuffledHashJoinExec => s
++ case c: CometHashJoinExec =>
c.originalPlan.asInstanceOf[ShuffledHashJoinExec]
+ }
+ assert(shuffleHashJoins.size == 1)
+ assert(shuffleHashJoins.head.buildSide == buildSide)
+@@ -371,6 +373,7 @@ class JoinHintSuite extends PlanTest with
SharedSparkSession with AdaptiveSparkP
+ val executedPlan = df.queryExecution.executedPlan
+ val shuffleMergeJoins = collect(executedPlan) {
+ case s: SortMergeJoinExec => s
++ case c: CometSortMergeJoinExec => c
+ }
+ assert(shuffleMergeJoins.size == 1)
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+index 7f062bfb899..400e939468f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow,
SortOrder}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec,
ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec,
ShuffleExchangeLike}
+@@ -740,7 +741,8 @@ class JoinSuite extends QueryTest with SharedSparkSession
with AdaptiveSparkPlan
+ }
+ }
+
+- test("test SortMergeJoin (with spill)") {
++ test("test SortMergeJoin (with spill)",
++ IgnoreComet("TODO: Comet SMJ doesn't support spill yet")) {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {
+@@ -1115,9 +1117,11 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
+ .groupBy($"k1").count()
+ .queryExecution.executedPlan
+- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size ===
1)
++ assert(collect(plan) {
++ case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size
=== 1)
+ // No extra shuffle before aggregate
+- assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
++ assert(collect(plan) {
++ case _: ShuffleExchangeLike => true }.size === 2)
+ })
+ }
+
+@@ -1134,10 +1138,11 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+ .queryExecution
+ .executedPlan
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size
=== 2)
+ assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size ===
1)
+ // No extra sort before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 3)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true
}.size === 3)
+ })
+
+ // Test shuffled hash join
+@@ -1147,10 +1152,13 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+ .queryExecution
+ .executedPlan
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size ===
1)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size
=== 2)
++ assert(collect(plan) {
++ case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size
=== 1)
+ // No extra sort before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 3)
++ assert(collect(plan) {
++ case _: SortExec | _: CometSortExec => true }.size === 3)
+ })
+ }
+
+@@ -1241,12 +1249,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ inputDFs.foreach { case (df1, df2, joinExprs) =>
+ val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full")
+ assert(collect(smjDF.queryExecution.executedPlan) {
+- case _: SortMergeJoinExec => true }.size === 1)
++ case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size
=== 1)
+ val smjResult = smjDF.collect()
+
+ val shjDF = df1.join(df2.hint("SHUFFLE_HASH"), joinExprs, "full")
+ assert(collect(shjDF.queryExecution.executedPlan) {
+- case _: ShuffledHashJoinExec => true }.size === 1)
++ case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size
=== 1)
+ // Same result between shuffled hash join and sort merge join
+ checkAnswer(shjDF, smjResult)
+ }
+@@ -1341,7 +1349,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ val plan = sql(getAggQuery(selectExpr,
joinType)).queryExecution.executedPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
+ // Have shuffle before aggregation
+- assert(collect(plan) { case _: ShuffleExchangeExec => true }.size
=== 1)
++ assert(collect(plan) {
++ case _: ShuffleExchangeLike => true }.size === 1)
+ }
+
+ def getJoinQuery(selectExpr: String, joinType: String): String = {
+@@ -1370,9 +1379,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ }
+ val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size ===
3)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
++ }.size === 3)
+ // No extra sort on left side before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 5)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true
}.size === 5)
+ }
+
+ // Test output ordering is not preserved
+@@ -1381,9 +1393,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
+ val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size ===
3)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
++ }.size === 3)
+ // Have sort on left side before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 6)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true
}.size === 6)
+ }
+
+ // Test singe partition
+@@ -1393,7 +1408,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
+ |""".stripMargin)
+ val plan = fullJoinDF.queryExecution.executedPlan
+- assert(collect(plan) { case _: ShuffleExchangeExec => true}.size == 1)
++ assert(collect(plan) {
++ case _: ShuffleExchangeLike => true}.size == 1)
+ checkAnswer(fullJoinDF, Row(100))
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+index b5b34922694..a72403780c4 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+@@ -69,7 +69,7 @@ import org.apache.spark.tags.ExtendedSQLTest
+ * }}}
+ */
+ // scalastyle:on line.size.limit
+-trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {
++trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with
IgnoreCometSuite {
+
+ protected val baseResourcePath = {
+ // use the same way as `SQLQueryTestSuite` to get the resource path
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+index 525d97e4998..8a3e7457618 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+@@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
+ checkAnswer(sql("select -0.001"), Row(BigDecimal("-0.001")))
+ }
+
+- test("external sorting updates peak execution memory") {
++ test("external sorting updates peak execution memory",
++ IgnoreComet("TODO: native CometSort does not update peak execution
memory")) {
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external
sort") {
+ sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+index 75eabcb96f2..36e3318ad7e 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+@@ -21,10 +21,11 @@ import scala.collection.mutable.ArrayBuffer
+
+ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join,
LogicalPlan, Project, Sort, Union}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.FileScanRDD
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -1543,6 +1544,12 @@ class SubquerySuite extends QueryTest
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
++ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++ fs.inputRDDs().forall(
++ _.asInstanceOf[FileScanRDD].filePartitions.forall(
++ _.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case _ => false
+ })
+ }
+@@ -2108,7 +2115,7 @@ class SubquerySuite extends QueryTest
+
+ df.collect()
+ val exchanges = collect(df.queryExecution.executedPlan) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ assert(exchanges.size === 1)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+index 02990a7a40d..bddf5e1ccc2 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+@@ -24,6 +24,7 @@ import test.org.apache.spark.sql.connector._
+
+ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.InternalRow
++import org.apache.spark.sql.comet.CometSortExec
+ import org.apache.spark.sql.connector.catalog.{PartitionInternalRow,
SupportsRead, Table, TableCapability, TableProvider}
+ import org.apache.spark.sql.connector.catalog.TableCapability._
+ import org.apache.spark.sql.connector.expressions.{Expression,
FieldReference, Literal, NamedReference, NullOrdering, SortDirection,
SortOrder, Transform}
+@@ -33,7 +34,7 @@ import
org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning,
+ import org.apache.spark.sql.execution.SortExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2Relation, DataSourceV2ScanRelation}
+-import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{Exchange,
ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+@@ -268,13 +269,13 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
+ val groupByColJ = df.groupBy($"j").agg(sum($"i"))
+ checkAnswer(groupByColJ, Seq(Row(2, 8), Row(4, 2), Row(6, 5)))
+ assert(collectFirst(groupByColJ.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined)
+
+ val groupByIPlusJ = df.groupBy($"i" + $"j").agg(count("*"))
+ checkAnswer(groupByIPlusJ, Seq(Row(5, 2), Row(6, 2), Row(8, 1),
Row(9, 1)))
+ assert(collectFirst(groupByIPlusJ.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined)
+ }
+ }
+@@ -334,10 +335,11 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
+
+ val (shuffleExpected, sortExpected) = groupByExpects
+ assert(collectFirst(groupBy.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined === shuffleExpected)
+ assert(collectFirst(groupBy.queryExecution.executedPlan) {
+ case e: SortExec => e
++ case c: CometSortExec => c
+ }.isDefined === sortExpected)
+ }
+
+@@ -352,10 +354,11 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
+
+ val (shuffleExpected, sortExpected) = windowFuncExpects
+
assert(collectFirst(windowPartByColIOrderByColJ.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined === shuffleExpected)
+
assert(collectFirst(windowPartByColIOrderByColJ.queryExecution.executedPlan) {
+ case e: SortExec => e
++ case c: CometSortExec => c
+ }.isDefined === sortExpected)
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+index cfc8b2cc845..c6fcfd7bd08 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.{AnalysisException, QueryTest}
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
+ import org.apache.spark.sql.connector.read.ScanBuilder
+ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
+ val df = spark.read.format(format).load(path.getCanonicalPath)
+ checkAnswer(df, inputData.toDF())
+ assert(
+-
df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
++ df.queryExecution.executedPlan.exists {
++ case _: FileSourceScanExec | _: CometScanExec => true
++ case _ => false
++ }
++ )
+ }
+ } finally {
+ spark.listenerManager.unregister(listener)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+index cf76f6ca32c..f454128af06 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+@@ -22,6 +22,7 @@ import org.apache.spark.sql.{DataFrame, Row}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{Literal,
TransformExpression}
+ import org.apache.spark.sql.catalyst.plans.physical
++import org.apache.spark.sql.comet.CometSortMergeJoinExec
+ import org.apache.spark.sql.connector.catalog.Identifier
+ import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog
+ import org.apache.spark.sql.connector.catalog.functions._
+@@ -31,7 +32,7 @@ import
org.apache.spark.sql.connector.expressions.Expressions._
+ import org.apache.spark.sql.execution.SparkPlan
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.SQLConf._
+@@ -279,13 +280,14 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
+ Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
+ }
+
+- private def collectShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
++ private def collectShuffles(plan: SparkPlan): Seq[ShuffleExchangeLike] = {
+ // here we skip collecting shuffle operators that are not associated with
SMJ
+ collect(plan) {
+ case s: SortMergeJoinExec => s
++ case c: CometSortMergeJoinExec => c.originalPlan
+ }.flatMap(smj =>
+ collect(smj) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ })
+ }
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+index c0ec8a58bd5..4e8bc6ed3c5 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.FsPermission
+ import org.mockito.Mockito.{mock, spy, when}
+
+ import org.apache.spark._
+-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset,
QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset,
IgnoreComet, QueryTest, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.util.BadRecordException
+ import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry,
JDBCOptions}
+@@ -248,7 +248,8 @@ class QueryExecutionErrorsSuite
+ }
+
+ test("INCONSISTENT_BEHAVIOR_CROSS_VERSION: " +
+- "compatibility with Spark 2.4/3.2 in reading/writing dates") {
++ "compatibility with Spark 2.4/3.2 in reading/writing dates",
++ IgnoreComet("Comet doesn't completely support datetime rebase mode yet"))
{
+
+ // Fail to read ancient datetime values.
+ withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_READ.key ->
EXCEPTION.toString) {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+index 418ca3430bb..eb8267192f8 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+@@ -23,7 +23,7 @@ import scala.util.Random
+ import org.apache.hadoop.fs.Path
+
+ import org.apache.spark.SparkConf
+-import org.apache.spark.sql.{DataFrame, QueryTest}
++import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+ import org.apache.spark.sql.internal.SQLConf
+@@ -195,7 +195,7 @@ class DataSourceV2ScanExecRedactionSuite extends
DataSourceScanRedactionTest {
+ }
+ }
+
+- test("FileScan description") {
++ test("FileScan description", IgnoreComet("Comet doesn't use BatchScan")) {
+ Seq("json", "orc", "parquet").foreach { format =>
+ withTempPath { path =>
+ val dir = path.getCanonicalPath
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+index 743ec41dbe7..9f30d6c8e04 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+@@ -53,6 +53,10 @@ class LogicalPlanTagInSparkPlanSuite extends
TPCDSQuerySuite with DisableAdaptiv
+ case ColumnarToRowExec(i: InputAdapter) => isScanPlanTree(i.child)
+ case p: ProjectExec => isScanPlanTree(p.child)
+ case f: FilterExec => isScanPlanTree(f.child)
++ // Comet produces scan plan tree like:
++ // ColumnarToRow
++ // +- ReusedExchange
++ case _: ReusedExchangeExec => false
+ case _: LeafExecNode => true
+ case _ => false
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+index 4b3d3a4b805..56e1e0e6f16 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.rdd.RDD
+-import org.apache.spark.sql.{execution, DataFrame, Row}
++import org.apache.spark.sql.{execution, DataFrame, IgnoreCometSuite, Row}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.plans._
+@@ -35,7 +35,9 @@ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types._
+
+-class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
++// Ignore this suite when Comet is enabled. This suite tests the Spark
planner and Comet planner
++// comes out with too many difference. Simply ignoring this suite for now.
++class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper
with IgnoreCometSuite {
+ import testImplicits._
+
+ setupTestData()
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+index 9e9d717db3b..91a4f9a38d5 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.comet.CometProjectExec
+ import org.apache.spark.sql.connector.SimpleWritableDataSource
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+ private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
+ withClue(df.queryExecution) {
+ val plan = df.queryExecution.executedPlan
+- val actual = collectWithSubqueries(plan) { case p: ProjectExec => p
}.size
++ val actual = collectWithSubqueries(plan) {
++ case p: ProjectExec => p
++ case p: CometProjectExec => p
++ }.size
+ assert(actual == expected)
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
+index 30ce940b032..0d3f6c6c934 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.sql.{DataFrame, QueryTest}
+ import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning,
UnknownPartitioning}
++import org.apache.spark.sql.comet.CometSortExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.execution.joins.ShuffledJoin
+ import org.apache.spark.sql.internal.SQLConf
+@@ -33,7 +34,7 @@ abstract class RemoveRedundantSortsSuiteBase
+
+ private def checkNumSorts(df: DataFrame, count: Int): Unit = {
+ val plan = df.queryExecution.executedPlan
+- assert(collectWithSubqueries(plan) { case s: SortExec => s }.length ==
count)
++ assert(collectWithSubqueries(plan) { case _: SortExec | _: CometSortExec
=> 1 }.length == count)
+ }
+
+ private def checkSorts(query: String, enabledCount: Int, disabledCount:
Int): Unit = {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReplaceHashWithSortAggSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReplaceHashWithSortAggSuite.scala
+index 47679ed7865..9ffbaecb98e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReplaceHashWithSortAggSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReplaceHashWithSortAggSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.sql.{DataFrame, QueryTest}
++import org.apache.spark.sql.comet.CometHashAggregateExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -31,7 +32,7 @@ abstract class ReplaceHashWithSortAggSuiteBase
+ private def checkNumAggs(df: DataFrame, hashAggCount: Int, sortAggCount:
Int): Unit = {
+ val plan = df.queryExecution.executedPlan
+ assert(collectWithSubqueries(plan) {
+- case s @ (_: HashAggregateExec | _: ObjectHashAggregateExec) => s
++ case s @ (_: HashAggregateExec | _: ObjectHashAggregateExec | _:
CometHashAggregateExec ) => s
+ }.length == hashAggCount)
+ assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+index ac710c32296..e163c1a6a76 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
++import org.apache.spark.sql.comet.CometSortMergeJoinExec
+ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
+ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+@@ -224,6 +225,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ assert(twoJoinsDF.queryExecution.executedPlan.collect {
+ case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
+ case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
+ }.size === 2)
+ checkAnswer(twoJoinsDF,
+ Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null),
Row(4, 4, null),
+@@ -258,6 +260,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
+ assert(twoJoinsDF.queryExecution.executedPlan.collect {
+ case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
++ case _: CometSortMergeJoinExec => true
+ }.size === 2)
+ checkAnswer(twoJoinsDF,
+ Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4,
null, 4), Row(5, null, 5),
+@@ -280,8 +283,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2",
"left_semi")
+ .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
+ assert(twoJoinsDF.queryExecution.executedPlan.collect {
+- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
+- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
++ case _: SortMergeJoinExec => true
+ }.size === 2)
+ checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
+ }
+@@ -302,8 +304,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
+ .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
+ assert(twoJoinsDF.queryExecution.executedPlan.collect {
+- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
+- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
++ case _: SortMergeJoinExec => true
+ }.size === 2)
+ checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
+ }
+@@ -436,7 +437,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ val plan = df.queryExecution.executedPlan
+ assert(plan.exists(p =>
+ p.isInstanceOf[WholeStageCodegenExec] &&
+- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
++ p.asInstanceOf[WholeStageCodegenExec].collect {
++ case _: SortExec => true
++ }.nonEmpty))
+ assert(df.collect() === Array(Row(1), Row(2), Row(3)))
+ }
+
+@@ -616,7 +619,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ .write.mode(SaveMode.Overwrite).parquet(path)
+
+ withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
+- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
++ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
++ // Disable Comet native execution because this checks wholestage
codegen.
++ "spark.comet.exec.enabled" -> "false") {
+ val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as
newC$i")
+ val df = spark.read.parquet(path).selectExpr(projection: _*)
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+index 593bd7bb4ba..7ad55e3ab20 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+@@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._
+
+ import org.apache.spark.SparkException
+ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobStart}
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
++import org.apache.spark.sql.{Dataset, IgnoreComet, QueryTest, Row,
SparkSession, Strategy}
+ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
++import org.apache.spark.sql.comet._
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec,
PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec,
ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+ import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
+ import org.apache.spark.sql.execution.command.DataWritingCommandExec
+@@ -104,6 +106,7 @@ class AdaptiveQueryExecSuite
+ private def findTopLevelBroadcastHashJoin(plan: SparkPlan):
Seq[BroadcastHashJoinExec] = {
+ collect(plan) {
+ case j: BroadcastHashJoinExec => j
++ case j: CometBroadcastHashJoinExec =>
j.originalPlan.asInstanceOf[BroadcastHashJoinExec]
+ }
+ }
+
+@@ -116,30 +119,38 @@ class AdaptiveQueryExecSuite
+ private def findTopLevelSortMergeJoin(plan: SparkPlan):
Seq[SortMergeJoinExec] = {
+ collect(plan) {
+ case j: SortMergeJoinExec => j
++ case j: CometSortMergeJoinExec =>
++ assert(j.originalPlan.isInstanceOf[SortMergeJoinExec])
++ j.originalPlan.asInstanceOf[SortMergeJoinExec]
+ }
+ }
+
+ private def findTopLevelShuffledHashJoin(plan: SparkPlan):
Seq[ShuffledHashJoinExec] = {
+ collect(plan) {
+ case j: ShuffledHashJoinExec => j
++ case j: CometHashJoinExec =>
j.originalPlan.asInstanceOf[ShuffledHashJoinExec]
+ }
+ }
+
+ private def findTopLevelBaseJoin(plan: SparkPlan): Seq[BaseJoinExec] = {
+ collect(plan) {
+ case j: BaseJoinExec => j
++ case c: CometHashJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec]
++ case c: CometSortMergeJoinExec =>
c.originalPlan.asInstanceOf[BaseJoinExec]
+ }
+ }
+
+ private def findTopLevelSort(plan: SparkPlan): Seq[SortExec] = {
+ collect(plan) {
+ case s: SortExec => s
++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec]
+ }
+ }
+
+ private def findTopLevelAggregate(plan: SparkPlan): Seq[BaseAggregateExec]
= {
+ collect(plan) {
+ case agg: BaseAggregateExec => agg
++ case agg: CometHashAggregateExec =>
agg.originalPlan.asInstanceOf[BaseAggregateExec]
+ }
+ }
+
+@@ -176,6 +187,7 @@ class AdaptiveQueryExecSuite
+ val parts = rdd.partitions
+ assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
+ }
++
+ assert(numShuffles === (numLocalReads.length +
numShufflesWithoutLocalRead))
+ }
+
+@@ -184,7 +196,7 @@ class AdaptiveQueryExecSuite
+ val plan = df.queryExecution.executedPlan
+ assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+ val shuffle =
plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ assert(shuffle.size == 1)
+ assert(shuffle(0).outputPartitioning.numPartitions == numPartition)
+@@ -200,7 +212,8 @@ class AdaptiveQueryExecSuite
+ assert(smj.size == 1)
+ val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+ assert(bhj.size == 1)
+- checkNumLocalShuffleReads(adaptivePlan)
++ // Comet shuffle changes shuffle metrics
++ // checkNumLocalShuffleReads(adaptivePlan)
+ }
+ }
+
+@@ -227,7 +240,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
++ test("Reuse the parallelism of coalesced shuffle in local shuffle read",
++ IgnoreComet("Comet shuffle changes shuffle partition size")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
+@@ -259,7 +273,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("Reuse the default parallelism in local shuffle read") {
++ test("Reuse the default parallelism in local shuffle read",
++ IgnoreComet("Comet shuffle changes shuffle partition size")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
+@@ -273,7 +288,8 @@ class AdaptiveQueryExecSuite
+ val localReads = collect(adaptivePlan) {
+ case read: AQEShuffleReadExec if read.isLocalRead => read
+ }
+- assert(localReads.length == 2)
++ // Comet shuffle changes shuffle metrics
++ assert(localReads.length == 1)
+ val localShuffleRDD0 =
localReads(0).execute().asInstanceOf[ShuffledRowRDD]
+ val localShuffleRDD1 =
localReads(1).execute().asInstanceOf[ShuffledRowRDD]
+ // the final parallelism is math.max(1, numReduces / numMappers):
math.max(1, 5/2) = 2
+@@ -322,7 +338,7 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("Scalar subquery") {
++ test("Scalar subquery", IgnoreComet("Comet shuffle changes shuffle
metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+@@ -337,7 +353,7 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("Scalar subquery in later stages") {
++ test("Scalar subquery in later stages", IgnoreComet("Comet shuffle changes
shuffle metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+@@ -353,7 +369,7 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("multiple joins") {
++ test("multiple joins", IgnoreComet("Comet shuffle changes shuffle
metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+@@ -398,7 +414,7 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("multiple joins with aggregate") {
++ test("multiple joins with aggregate", IgnoreComet("Comet shuffle changes
shuffle metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+@@ -443,7 +459,7 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("multiple joins with aggregate 2") {
++ test("multiple joins with aggregate 2", IgnoreComet("Comet shuffle changes
shuffle metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
+@@ -508,7 +524,7 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("Exchange reuse with subqueries") {
++ test("Exchange reuse with subqueries", IgnoreComet("Comet shuffle changes
shuffle metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+@@ -539,7 +555,9 @@ class AdaptiveQueryExecSuite
+ assert(smj.size == 1)
+ val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+ assert(bhj.size == 1)
+- checkNumLocalShuffleReads(adaptivePlan)
++ // Comet shuffle changes shuffle metrics,
++ // so we can't check the number of local shuffle reads.
++ // checkNumLocalShuffleReads(adaptivePlan)
+ // Even with local shuffle read, the query stage reuse can also work.
+ val ex = findReusedExchange(adaptivePlan)
+ assert(ex.nonEmpty)
+@@ -560,7 +578,9 @@ class AdaptiveQueryExecSuite
+ assert(smj.size == 1)
+ val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+ assert(bhj.size == 1)
+- checkNumLocalShuffleReads(adaptivePlan)
++ // Comet shuffle changes shuffle metrics,
++ // so we can't check the number of local shuffle reads.
++ // checkNumLocalShuffleReads(adaptivePlan)
+ // Even with local shuffle read, the query stage reuse can also work.
+ val ex = findReusedExchange(adaptivePlan)
+ assert(ex.isEmpty)
+@@ -569,7 +589,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("Broadcast exchange reuse across subqueries") {
++ test("Broadcast exchange reuse across subqueries",
++ IgnoreComet("Comet shuffle changes shuffle metrics")) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000",
+@@ -664,7 +685,8 @@ class AdaptiveQueryExecSuite
+ val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+ assert(bhj.size == 1)
+ // There is still a SMJ, and its two shuffles can't apply local read.
+- checkNumLocalShuffleReads(adaptivePlan, 2)
++ // Comet shuffle changes shuffle metrics
++ // checkNumLocalShuffleReads(adaptivePlan, 2)
+ }
+ }
+
+@@ -786,7 +808,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("SPARK-29544: adaptive skew join with different join types") {
++ test("SPARK-29544: adaptive skew join with different join types",
++ IgnoreComet("Comet shuffle has different partition metrics")) {
+ Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+ def getJoinNode(plan: SparkPlan): Seq[ShuffledJoin] = if (joinHint ==
"SHUFFLE_MERGE") {
+ findTopLevelSortMergeJoin(plan)
+@@ -1004,7 +1027,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("metrics of the shuffle read") {
++ test("metrics of the shuffle read",
++ IgnoreComet("Comet shuffle changes the metrics")) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+ val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT key FROM testData GROUP BY key")
+@@ -1599,7 +1623,7 @@ class AdaptiveQueryExecSuite
+ val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
+ assert(collect(adaptivePlan) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }.length == 1)
+ }
+ }
+@@ -1679,7 +1703,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("SPARK-33551: Do not use AQE shuffle read for repartition") {
++ test("SPARK-33551: Do not use AQE shuffle read for repartition",
++ IgnoreComet("Comet shuffle changes partition size")) {
+ def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
+ find(plan) {
+ case s: ShuffleExchangeLike =>
+@@ -1864,6 +1889,9 @@ class AdaptiveQueryExecSuite
+ def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin):
Unit = {
+ assert(collect(ds.queryExecution.executedPlan) {
+ case s: ShuffleExchangeExec if s.shuffleOrigin == origin &&
s.numPartitions == 2 => s
++ case c: CometShuffleExchangeExec
++ if c.originalPlan.shuffleOrigin == origin &&
++ c.originalPlan.numPartitions == 2 => c
+ }.size == 1)
+ ds.collect()
+ val plan = ds.queryExecution.executedPlan
+@@ -1872,6 +1900,9 @@ class AdaptiveQueryExecSuite
+ }.isEmpty)
+ assert(collect(plan) {
+ case s: ShuffleExchangeExec if s.shuffleOrigin == origin &&
s.numPartitions == 2 => s
++ case c: CometShuffleExchangeExec
++ if c.originalPlan.shuffleOrigin == origin &&
++ c.originalPlan.numPartitions == 2 => c
+ }.size == 1)
+ checkAnswer(ds, testData)
+ }
+@@ -2028,7 +2059,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("SPARK-35264: Support AQE side shuffled hash join formula") {
++ test("SPARK-35264: Support AQE side shuffled hash join formula",
++ IgnoreComet("Comet shuffle changes the partition size")) {
+ withTempView("t1", "t2") {
+ def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = {
+ Seq("100", "100000").foreach { size =>
+@@ -2114,7 +2146,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("SPARK-35725: Support optimize skewed partitions in
RebalancePartitions") {
++ test("SPARK-35725: Support optimize skewed partitions in
RebalancePartitions",
++ IgnoreComet("Comet shuffle changes shuffle metrics")) {
+ withTempView("v") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+@@ -2213,7 +2246,7 @@ class AdaptiveQueryExecSuite
+ runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM
skewData1 " +
+ s"JOIN skewData2 ON key1 = key2 GROUP BY key1")
+ val shuffles1 = collect(adaptive1) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ assert(shuffles1.size == 3)
+ // shuffles1.head is the top-level shuffle under the Aggregate
operator
+@@ -2226,7 +2259,7 @@ class AdaptiveQueryExecSuite
+ runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM
skewData1 " +
+ s"JOIN skewData2 ON key1 = key2")
+ val shuffles2 = collect(adaptive2) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ if (hasRequiredDistribution) {
+ assert(shuffles2.size == 3)
+@@ -2260,7 +2293,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("SPARK-35794: Allow custom plugin for cost evaluator") {
++ test("SPARK-35794: Allow custom plugin for cost evaluator",
++ IgnoreComet("Comet shuffle changes shuffle metrics")) {
+ CostEvaluator.instantiate(
+ classOf[SimpleShuffleSortCostEvaluator].getCanonicalName,
spark.sparkContext.getConf)
+ intercept[IllegalArgumentException] {
+@@ -2404,6 +2438,7 @@ class AdaptiveQueryExecSuite
+ val (_, adaptive) = runAdaptiveAndVerifyResult(query)
+ assert(adaptive.collect {
+ case sort: SortExec => sort
++ case sort: CometSortExec => sort
+ }.size == 1)
+ val read = collect(adaptive) {
+ case read: AQEShuffleReadExec => read
+@@ -2421,7 +2456,8 @@ class AdaptiveQueryExecSuite
+ }
+ }
+
+- test("SPARK-37357: Add small partition factor for rebalance partitions") {
++ test("SPARK-37357: Add small partition factor for rebalance partitions",
++ IgnoreComet("Comet shuffle changes shuffle metrics")) {
+ withTempView("v") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key
-> "true",
+@@ -2533,7 +2569,7 @@ class AdaptiveQueryExecSuite
+ runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN
skewData2 ON key1 = key2 " +
+ "JOIN skewData3 ON value2 = value3")
+ val shuffles1 = collect(adaptive1) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ assert(shuffles1.size == 4)
+ val smj1 = findTopLevelSortMergeJoin(adaptive1)
+@@ -2544,7 +2580,7 @@ class AdaptiveQueryExecSuite
+ runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN
skewData2 ON key1 = key2 " +
+ "JOIN skewData3 ON value1 = value3")
+ val shuffles2 = collect(adaptive2) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ assert(shuffles2.size == 4)
+ val smj2 = findTopLevelSortMergeJoin(adaptive2)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+index bd9c79e5b96..ab7584e768e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.SchemaPruningTest
+ import org.apache.spark.sql.catalyst.expressions.Concat
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+ import org.apache.spark.sql.catalyst.plans.logical.Expand
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.functions._
+@@ -867,6 +868,7 @@ abstract class SchemaPruningSuite
+ val fileSourceScanSchemata =
+ collect(df.queryExecution.executedPlan) {
+ case scan: FileSourceScanExec => scan.requiredSchema
++ case scan: CometScanExec => scan.requiredSchema
+ }
+ assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+ s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+index ce43edb79c1..c414b19eda7 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+@@ -17,7 +17,7 @@
+
+ package org.apache.spark.sql.execution.datasources
+
+-import org.apache.spark.sql.{QueryTest, Row}
++import org.apache.spark.sql.{IgnoreComet, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.expressions.{Ascending,
AttributeReference, NullsFirst, SortOrder}
+ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
+ import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+@@ -305,7 +305,8 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
+ }
+ }
+
+- test("v1 write with AQE changing SMJ to BHJ") {
++ test("v1 write with AQE changing SMJ to BHJ",
++ IgnoreComet("TODO: Comet SMJ to BHJ by AQE")) {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+index 1d2e467c94c..3ea82cd1a3f 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem,
GlobFilter, Path}
+ import org.mockito.Mockito.{mock, when}
+
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.{DataFrame, IgnoreCometSuite, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.encoders.RowEncoder
+ import org.apache.spark.sql.execution.datasources.PartitionedFile
+ import org.apache.spark.sql.functions.col
+@@ -38,7 +38,9 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types._
+ import org.apache.spark.util.Utils
+
+-class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
++// For some reason this suite is flaky w/ or w/o Comet when running in Github
workflow.
++// Since it isn't related to Comet, we disable it for now.
++class BinaryFileFormatSuite extends QueryTest with SharedSparkSession with
IgnoreCometSuite {
+ import BinaryFileFormat._
+
+ private var testDir: String = _
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+index 07e2849ce6f..3e73645b638 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+
+ import org.apache.spark.TestUtils
+ import org.apache.spark.memory.MemoryMode
+-import org.apache.spark.sql.Row
++import org.apache.spark.sql.{IgnoreComet, Row}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -201,7 +201,8 @@ class ParquetEncodingSuite extends
ParquetCompatibilityTest with SharedSparkSess
+ }
+ }
+
+- test("parquet v2 pages - rle encoding for boolean value columns") {
++ test("parquet v2 pages - rle encoding for boolean value columns",
++ IgnoreComet("Comet doesn't support RLE encoding yet")) {
+ val extraOptions = Map[String, String](
+ ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
+ )
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+index 104b4e416cd..034f56b6230 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ // When a filter is pushed to Parquet, Parquet can apply it to
every row.
+ // So, we can check the number of rows returned from the Parquet
+ // to make sure our filter pushdown work.
+- assert(stripSparkFilter(df).count == 1)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level
filtering but relies
++ // on Spark to apply the data filters after columnar batches are
returned
++ if (!isCometEnabled) {
++ assert(stripSparkFilter(df).count == 1)
++ }
+ }
+ }
+ }
+@@ -1581,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ // than the total length but should not be a single record.
+ // Note that, if record level filtering is enabled, it should be a
single record.
+ // If no filter is pushed down to Parquet, it should be the total
length of data.
+- assert(actual > 1 && actual < data.length)
++ // Only enable Comet test iff it's scan only, since with native
execution
++ // `stripSparkFilter` can't remove the native filter
++ if (!isCometEnabled || isCometScanOnly) {
++ assert(actual > 1 && actual < data.length)
++ }
+ }
+ }
+ }
+@@ -1608,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ // than the total length but should not be a single record.
+ // Note that, if record level filtering is enabled, it should be a
single record.
+ // If no filter is pushed down to Parquet, it should be the total
length of data.
+- assert(actual > 1 && actual < data.length)
++ // Only enable Comet test iff it's scan only, since with native
execution
++ // `stripSparkFilter` can't remove the native filter
++ if (!isCometEnabled || isCometScanOnly) {
++ assert(actual > 1 && actual < data.length)
++ }
+ }
+ }
+ }
+@@ -1744,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
++ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
++ IgnoreComet("IN predicate is not yet supported in Comet, see issue
#36")) {
+ val schema = StructType(Seq(
+ StructField("a", IntegerType, nullable = false)
+ ))
+@@ -1985,7 +1998,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("Support Parquet column index") {
++ test("Support Parquet column index",
++ IgnoreComet("Comet doesn't support Parquet column index yet")) {
+ // block 1:
+ // null count min
max
+ // page-0 0 0
99
+@@ -2277,7 +2291,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+ assert(pushedParquetFilters.exists(_.getClass === filterClass),
+ s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
+
+- checker(stripSparkFilter(query), expected)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level
filtering but relies
++ // on Spark to apply the data filters after columnar batches are
returned
++ if (!isCometEnabled) {
++ checker(stripSparkFilter(query), expected)
++ }
+ } else {
+ assert(selectedFilters.isEmpty, "There is filter pushed down")
+ }
+@@ -2337,7 +2355,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+ assert(pushedParquetFilters.exists(_.getClass === filterClass),
+ s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
+
+- checker(stripSparkFilter(query), expected)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level
filtering but relies
++ // on Spark to apply the data filters after columnar batches are
returned
++ if (!isCometEnabled) {
++ checker(stripSparkFilter(query), expected)
++ }
+
+ case _ =>
+ throw new AnalysisException("Can not match ParquetTable in the
query.")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+index 8670d95c65e..b624c3811dd 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") {
++ test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings",
++ IgnoreComet("Comet doesn't support DELTA encoding yet")) {
+ withAllParquetReaders {
+ checkAnswer(
+ // "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+index 29cb224c878..7a57d29d9a8 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+@@ -1047,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ checkAnswer(readParquet(schema, path), df)
+ }
+
+- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++ "spark.comet.enabled" -> "false") {
+ val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+ checkAnswer(readParquet(schema1, path), df)
+ val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+@@ -1069,7 +1070,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c,
CAST('1.2' AS BINARY) d")
+ df.write.parquet(path.toString)
+
+- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++ "spark.comet.enabled" -> "false") {
+ checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+ checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+ checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT
123456.0"))
+@@ -1128,7 +1130,7 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ .where(s"a < ${Long.MaxValue}")
+ .collect()
+ }
+-
assert(exception.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
++ assert(exception.getMessage.contains("Column: [a], Expected: bigint,
Found: INT32"))
+ }
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+index 240bb4e6dcb..8287ffa03ca 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+@@ -21,7 +21,7 @@ import java.nio.file.{Files, Paths, StandardCopyOption}
+ import java.sql.{Date, Timestamp}
+
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException,
SparkUpgradeException}
+-import org.apache.spark.sql.{QueryTest, Row,
SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY,
SPARK_TIMEZONE_METADATA_KEY}
++import org.apache.spark.sql.{IgnoreCometSuite, QueryTest, Row,
SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY,
SPARK_TIMEZONE_METADATA_KEY}
+ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy,
ParquetOutputTimestampType}
+@@ -30,9 +30,11 @@ import
org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96,
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.tags.SlowSQLTest
+
++// Comet is disabled for this suite because it doesn't support datetime
rebase mode
+ abstract class ParquetRebaseDatetimeSuite
+ extends QueryTest
+ with ParquetTest
++ with IgnoreCometSuite
+ with SharedSparkSession {
+
+ import testImplicits._
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+index 351c6d698fc..36492fe936d 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader,
ParquetOutputFormat}
+ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+ import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -230,6 +231,12 @@ class ParquetRowIndexSuite extends QueryTest with
SharedSparkSession {
+ case f: FileSourceScanExec =>
+ numPartitions += f.inputRDD.partitions.length
+ numOutputRows += f.metrics("numOutputRows").value
++ case b: CometScanExec =>
++ numPartitions += b.inputRDD.partitions.length
++ numOutputRows += b.metrics("numOutputRows").value
++ case b: CometBatchScanExec =>
++ numPartitions += b.inputRDD.partitions.length
++ numOutputRows += b.metrics("numOutputRows").value
+ case _ =>
+ }
+ assert(numPartitions > 0)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+index 5c0b7def039..151184bc98c 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.SchemaPruningSuite
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -56,6 +57,7 @@ class ParquetV2SchemaPruningSuite extends
ParquetSchemaPruningSuite {
+ val fileSourceScanSchemata =
+ collect(df.queryExecution.executedPlan) {
+ case scan: BatchScanExec =>
scan.scan.asInstanceOf[ParquetScan].readDataSchema
++ case scan: CometBatchScanExec =>
scan.scan.asInstanceOf[ParquetScan].readDataSchema
+ }
+ assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+ s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+index bf5c51b89bb..ca22370ca3b 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+@@ -27,6 +27,7 @@ import
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+ import org.apache.parquet.schema.Type._
+
+ import org.apache.spark.SparkException
++import org.apache.spark.sql.IgnoreComet
+ import org.apache.spark.sql.catalyst.ScalaReflection
+ import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+ import org.apache.spark.sql.functions.desc
+@@ -1016,7 +1017,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ e
+ }
+
+- test("schema mismatch failure error message for parquet reader") {
++ test("schema mismatch failure error message for parquet reader",
++ IgnoreComet("Comet doesn't work with vectorizedReaderEnabled = false"))
{
+ withTempPath { dir =>
+ val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = false)
+ val expectedMessage = "Encountered error while reading file"
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+index 3a0bd35cb70..b28f06a757f 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug
+ import java.io.ByteArrayOutputStream
+
+ import org.apache.spark.rdd.RDD
++import org.apache.spark.sql.IgnoreComet
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.Attribute
+ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+@@ -124,7 +125,8 @@ class DebuggingSuite extends DebuggingSuiteBase with
DisableAdaptiveExecutionSui
+ | id LongType: {}""".stripMargin))
+ }
+
+- test("SPARK-28537: DebugExec cannot debug columnar related queries") {
++ test("SPARK-28537: DebugExec cannot debug columnar related queries",
++ IgnoreComet("Comet does not use FileScan")) {
+ withTempPath { workDir =>
+ val workDirPath = workDir.getAbsolutePath
+ val input = spark.range(5).toDF("id")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+index 26e61c6b58d..cb09d7e116a 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+@@ -45,8 +45,10 @@ import org.apache.spark.sql.util.QueryExecutionListener
+ import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
+
+ // Disable AQE because metric info is different with AQE on/off
++// This test suite runs tests against the metrics of physical operators.
++// Disabling it for Comet because the metrics are different with Comet
enabled.
+ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
+- with DisableAdaptiveExecutionSuite {
++ with DisableAdaptiveExecutionSuite with IgnoreCometSuite {
+ import testImplicits._
+
+ /**
+@@ -737,7 +739,8 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
+ }
+ }
+
+- test("SPARK-26327: FileSourceScanExec metrics") {
++ test("SPARK-26327: FileSourceScanExec metrics",
++ IgnoreComet("Spark uses row-based Parquet reader while Comet is
vectorized")) {
+ withTable("testDataForScan") {
+ spark.range(10).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").saveAsTable("testDataForScan")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+index 0ab8691801d..d9125f658ad 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution.python
+
+ import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython,
BatchEvalPython, Limit, LocalLimit}
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+- assert(scanNodes.head.dataFilters.length == 2)
+-
assert(scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
++ val dataFilters = scanNodes.head match {
++ case scan: FileSourceScanExec => scan.dataFilters
++ case scan: CometScanExec => scan.dataFilters
++ }
++ assert(dataFilters.length == 2)
++ assert(dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExec => scan
++ case scan: CometBatchScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExec => scan
++ case scan: CometBatchScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+index d083cac48ff..3c11bcde807 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+@@ -37,8 +37,10 @@ import org.apache.spark.sql.streaming.{StreamingQuery,
StreamingQueryException,
+ import org.apache.spark.sql.streaming.util.StreamManualClock
+ import org.apache.spark.util.Utils
+
++// For some reason this suite is flaky w/ or w/o Comet when running in Github
workflow.
++// Since it isn't related to Comet, we disable it for now.
+ class AsyncProgressTrackingMicroBatchExecutionSuite
+- extends StreamTest with BeforeAndAfter with Matchers {
++ extends StreamTest with BeforeAndAfter with Matchers with IgnoreCometSuite {
+
+ import testImplicits._
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+index 266bb343526..a426d8396be 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+@@ -24,10 +24,11 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.expressions
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+-import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec,
SparkPlan}
++import org.apache.spark.sql.comet._
++import org.apache.spark.sql.execution.{ColumnarToRowExec, FileSourceScanExec,
SortExec, SparkPlan}
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.BucketingUtils
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec,
ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ }
+ }
+
+- private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+- val fileScan = collect(plan) { case f: FileSourceScanExec => f }
++ private def getFileScan(plan: SparkPlan): SparkPlan = {
++ val fileScan = collect(plan) {
++ case f: FileSourceScanExec => f
++ case f: CometScanExec => f
++ }
+ assert(fileScan.nonEmpty, plan)
+ fileScan.head
+ }
+
++ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan)
match {
++ case fs: FileSourceScanExec => fs.bucketedScan
++ case bs: CometScanExec => bs.bucketedScan
++ }
++
+ // To verify if the bucket pruning works, this function checks two
conditions:
+ // 1) Check if the pruned buckets (before filtering) are empty.
+ // 2) Verify the final result is the same as the expected one
+@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ val planWithoutBucketedScan =
bucketedDataFrame.filter(filterCondition)
+ .queryExecution.executedPlan
+ val fileScan = getFileScan(planWithoutBucketedScan)
+- assert(!fileScan.bucketedScan, s"except no bucketed scan but
found\n$fileScan")
++ val bucketedScan = getBucketScan(planWithoutBucketedScan)
++ assert(!bucketedScan, s"except no bucketed scan but
found\n$fileScan")
+
+ val bucketColumnType =
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
+ val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
+@@ -451,28 +461,44 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ val joinOperator = if
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
+ val executedPlan =
+
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+- assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+- executedPlan.asInstanceOf[SortMergeJoinExec]
++ executedPlan match {
++ case s: SortMergeJoinExec => s
++ case b: CometSortMergeJoinExec =>
++ b.originalPlan match {
++ case s: SortMergeJoinExec => s
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
+ } else {
+ val executedPlan = joined.queryExecution.executedPlan
+- assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+- executedPlan.asInstanceOf[SortMergeJoinExec]
++ executedPlan match {
++ case s: SortMergeJoinExec => s
++ case ColumnarToRowExec(child) =>
++ child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++ case s: SortMergeJoinExec => s
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
+ }
+
+ // check existence of shuffle
+ assert(
+- joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) ==
shuffleLeft,
++ joinOperator.left.exists(op =>
op.isInstanceOf[ShuffleExchangeLike]) == shuffleLeft,
+ s"expected shuffle in plan to be $shuffleLeft but
found\n${joinOperator.left}")
+ assert(
+- joinOperator.right.exists(_.isInstanceOf[ShuffleExchangeExec]) ==
shuffleRight,
++ joinOperator.right.exists(op =>
op.isInstanceOf[ShuffleExchangeLike]) == shuffleRight,
+ s"expected shuffle in plan to be $shuffleRight but
found\n${joinOperator.right}")
+
+ // check existence of sort
+ assert(
+- joinOperator.left.exists(_.isInstanceOf[SortExec]) == sortLeft,
++ joinOperator.left.exists(op => op.isInstanceOf[SortExec] ||
op.isInstanceOf[CometExec] &&
++ op.asInstanceOf[CometExec].originalPlan.isInstanceOf[SortExec])
== sortLeft,
+ s"expected sort in the left child to be $sortLeft but
found\n${joinOperator.left}")
+ assert(
+- joinOperator.right.exists(_.isInstanceOf[SortExec]) == sortRight,
++ joinOperator.right.exists(op => op.isInstanceOf[SortExec] ||
op.isInstanceOf[CometExec] &&
++ op.asInstanceOf[CometExec].originalPlan.isInstanceOf[SortExec])
== sortRight,
+ s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
+
+ // check the output partitioning
+@@ -835,11 +861,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
+
+ val scanDF = spark.table("bucketed_table").select("j")
+- assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
++ assert(!getBucketScan(scanDF.queryExecution.executedPlan))
+ checkAnswer(scanDF, df1.select("j"))
+
+ val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
+- assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
++ assert(!getBucketScan(aggDF.queryExecution.executedPlan))
+ checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
+ }
+ }
+@@ -1026,15 +1052,23 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+ expectedNumShuffles: Int,
+ expectedCoalescedNumBuckets: Option[Int]): Unit = {
+ val plan = sql(query).queryExecution.executedPlan
+- val shuffles = plan.collect { case s: ShuffleExchangeExec => s }
++ val shuffles = plan.collect {
++ case s: ShuffleExchangeLike => s
++ }
+ assert(shuffles.length == expectedNumShuffles)
+
+ val scans = plan.collect {
+ case f: FileSourceScanExec if
f.optionalNumCoalescedBuckets.isDefined => f
++ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined
=> b
+ }
+ if (expectedCoalescedNumBuckets.isDefined) {
+ assert(scans.length == 1)
+- assert(scans.head.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
++ scans.head match {
++ case f: FileSourceScanExec =>
++ assert(f.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
++ case b: CometScanExec =>
++ assert(b.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
++ }
+ } else {
+ assert(scans.isEmpty)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+index b5f6d2f9f68..277784a92af 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
+ import java.io.File
+
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.AnalysisException
++import org.apache.spark.sql.{AnalysisException, IgnoreCometSuite}
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType}
+ import org.apache.spark.sql.catalyst.parser.ParseException
+@@ -28,7 +28,10 @@ import
org.apache.spark.sql.internal.SQLConf.BUCKETING_MAX_BUCKETS
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.util.Utils
+
+-class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession
{
++// For some reason this suite is flaky w/ or w/o Comet when running in Github
workflow.
++// Since it isn't related to Comet, we disable it for now.
++class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession
++ with IgnoreCometSuite {
+ import testImplicits._
+
+ protected override lazy val sql = spark.sql _
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+index 1f55742cd67..42377f7cf26 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
+ import org.apache.spark.sql.QueryTest
+ import org.apache.spark.sql.catalyst.expressions.AttributeReference
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+
+ def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int):
Unit = {
+ val plan = sql(query).queryExecution.executedPlan
+- val bucketedScan = collect(plan) { case s: FileSourceScanExec if
s.bucketedScan => s }
++ val bucketedScan = collect(plan) {
++ case s: FileSourceScanExec if s.bucketedScan => s
++ case s: CometScanExec if s.bucketedScan => s
++ }
+ assert(bucketedScan.length == expectedNumBucketedScan)
+ }
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+index 75f440caefc..36b1146bc3a 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+@@ -34,6 +34,7 @@ import org.apache.spark.paths.SparkPath
+ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+ import org.apache.spark.sql.{AnalysisException, DataFrame}
+ import org.apache.spark.sql.catalyst.util.stringToFile
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.DataSourceScanExec
+ import org.apache.spark.sql.execution.datasources._
+ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2Relation, FileScan, FileTable}
+@@ -748,6 +749,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+ val fileScan = df.queryExecution.executedPlan.collect {
+ case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+ batch.scan.asInstanceOf[FileScan]
++ case batch: CometBatchScanExec if batch.scan.isInstanceOf[FileScan] =>
++ batch.scan.asInstanceOf[FileScan]
+ }.headOption.getOrElse {
+ fail(s"No FileScan in query\n${df.queryExecution}")
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
+index b597a244710..b2e8be41065 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateDistributionSuite.scala
+@@ -21,6 +21,7 @@ import java.io.File
+
+ import org.apache.commons.io.FileUtils
+
++import org.apache.spark.sql.IgnoreComet
+ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
+ import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec,
MemoryStream}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -91,7 +92,7 @@ class FlatMapGroupsWithStateDistributionSuite extends
StreamTest
+ }
+
+ test("SPARK-38204: flatMapGroupsWithState should require
StatefulOpClusteredDistribution " +
+- "from children - without initial state") {
++ "from children - without initial state", IgnoreComet("TODO: fix Comet for
this test")) {
+ // function will return -1 on timeout and returns count of the state
otherwise
+ val stateFunc =
+ (key: (String, String), values: Iterator[(String, String, Long)],
+@@ -243,7 +244,8 @@ class FlatMapGroupsWithStateDistributionSuite extends
StreamTest
+ }
+
+ test("SPARK-38204: flatMapGroupsWithState should require
ClusteredDistribution " +
+- "from children if the query starts from checkpoint in 3.2.x - without
initial state") {
++ "from children if the query starts from checkpoint in 3.2.x - without
initial state",
++ IgnoreComet("TODO: fix Comet for this test")) {
+ // function will return -1 on timeout and returns count of the state
otherwise
+ val stateFunc =
+ (key: (String, String), values: Iterator[(String, String, Long)],
+@@ -335,7 +337,8 @@ class FlatMapGroupsWithStateDistributionSuite extends
StreamTest
+ }
+
+ test("SPARK-38204: flatMapGroupsWithState should require
ClusteredDistribution " +
+- "from children if the query starts from checkpoint in prior to 3.2") {
++ "from children if the query starts from checkpoint in prior to 3.2",
++ IgnoreComet("TODO: fix Comet for this test")) {
+ // function will return -1 on timeout and returns count of the state
otherwise
+ val stateFunc =
+ (key: (String, String), values: Iterator[(String, String, Long)],
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+index 6aa7d0945c7..38523536154 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+@@ -25,7 +25,7 @@ import org.scalatest.exceptions.TestFailedException
+
+ import org.apache.spark.SparkException
+ import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
+-import org.apache.spark.sql.{DataFrame, Encoder}
++import org.apache.spark.sql.{DataFrame, Encoder, IgnoreCometSuite}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeProjection, UnsafeRow}
+ import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
+@@ -46,8 +46,9 @@ case class RunningCount(count: Long)
+
+ case class Result(key: Long, count: Int)
+
++// TODO: fix Comet to enable this suite
+ @SlowSQLTest
+-class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
++class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with
IgnoreCometSuite {
+
+ import testImplicits._
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
+index 2a2a83d35e1..e3b7b290b3e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.streaming
+
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{AnalysisException, Dataset,
KeyValueGroupedDataset}
++import org.apache.spark.sql.{AnalysisException, Dataset, IgnoreComet,
KeyValueGroupedDataset}
+ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
+ import org.apache.spark.sql.execution.streaming.MemoryStream
+ import
org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper
+@@ -253,7 +253,8 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends
StateStoreMetricsTest
+ assert(e.message.contains(expectedError))
+ }
+
+- test("flatMapGroupsWithState - initial state - initial state has
flatMapGroupsWithState") {
++ test("flatMapGroupsWithState - initial state - initial state has
flatMapGroupsWithState",
++ IgnoreComet("TODO: fix Comet for this test")) {
+ val initialStateDS = Seq(("keyInStateAndData", new
RunningCount(1))).toDS()
+ val initialState: KeyValueGroupedDataset[String, RunningCount] =
+ initialStateDS.groupByKey(_._1).mapValues(_._2)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+index ef5b8a769fe..84fe1bfabc9 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+@@ -37,6 +37,7 @@ import org.apache.spark.sql._
+ import org.apache.spark.sql.catalyst.plans.logical.{Range,
RepartitionByExpression}
+ import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes,
StreamingRelationV2}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
++import org.apache.spark.sql.comet.CometLocalLimitExec
+ import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
+ import org.apache.spark.sql.execution.command.ExplainCommand
+ import org.apache.spark.sql.execution.streaming._
+@@ -1103,11 +1104,12 @@ class StreamSuite extends StreamTest {
+ val localLimits = execPlan.collect {
+ case l: LocalLimitExec => l
+ case l: StreamingLocalLimitExec => l
++ case l: CometLocalLimitExec => l
+ }
+
+ require(
+ localLimits.size == 1,
+- s"Cant verify local limit optimization with this plan:\n$execPlan")
++ s"Cant verify local limit optimization ${localLimits.size} with this
plan:\n$execPlan")
+
+ if (expectStreamingLimit) {
+ assert(
+@@ -1115,7 +1117,8 @@ class StreamSuite extends StreamTest {
+ s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
+ } else {
+ assert(
+- localLimits.head.isInstanceOf[LocalLimitExec],
++ localLimits.head.isInstanceOf[LocalLimitExec] ||
++ localLimits.head.isInstanceOf[CometLocalLimitExec],
+ s"Local limit was not LocalLimitExec:\n$execPlan")
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationDistributionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationDistributionSuite.scala
+index b4c4ec7acbf..20579284856 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationDistributionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationDistributionSuite.scala
+@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils
+ import org.scalatest.Assertions
+
+ import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
++import org.apache.spark.sql.comet.CometHashAggregateExec
+ import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
+ import org.apache.spark.sql.execution.streaming.{MemoryStream,
StateStoreRestoreExec, StateStoreSaveExec}
+ import org.apache.spark.sql.functions.count
+@@ -67,6 +68,7 @@ class StreamingAggregationDistributionSuite extends
StreamTest
+ // verify aggregations in between, except partial aggregation
+ val allAggregateExecs = query.lastExecution.executedPlan.collect {
+ case a: BaseAggregateExec => a
++ case c: CometHashAggregateExec => c.originalPlan
+ }
+
+ val aggregateExecsWithoutPartialAgg = allAggregateExecs.filter {
+@@ -201,6 +203,7 @@ class StreamingAggregationDistributionSuite extends
StreamTest
+ // verify aggregations in between, except partial aggregation
+ val allAggregateExecs = executedPlan.collect {
+ case a: BaseAggregateExec => a
++ case c: CometHashAggregateExec => c.originalPlan
+ }
+
+ val aggregateExecsWithoutPartialAgg = allAggregateExecs.filter {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+index 4d92e270539..33f1c2eb75e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+@@ -31,7 +31,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.execution.streaming.{MemoryStream,
StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec,
StreamingSymmetricHashJoinHelper}
+ import
org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider,
StateStore, StateStoreProviderId}
+ import org.apache.spark.sql.functions._
+@@ -619,14 +619,28 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite
{
+
+ val numPartitions =
spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
+
+- assert(query.lastExecution.executedPlan.collect {
+- case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
+- ShuffleExchangeExec(opA: HashPartitioning, _, _),
+- ShuffleExchangeExec(opB: HashPartitioning, _, _))
+- if partitionExpressionsColumns(opA.expressions) === Seq("a",
"b")
+- && partitionExpressionsColumns(opB.expressions) === Seq("a",
"b")
+- && opA.numPartitions == numPartitions && opB.numPartitions ==
numPartitions => j
+- }.size == 1)
++ val join = query.lastExecution.executedPlan.collect {
++ case j: StreamingSymmetricHashJoinExec => j
++ }.head
++ val opA = join.left.collect {
++ case s: ShuffleExchangeLike
++ if s.outputPartitioning.isInstanceOf[HashPartitioning] &&
++ partitionExpressionsColumns(
++ s.outputPartitioning
++ .asInstanceOf[HashPartitioning].expressions) === Seq("a",
"b") =>
++ s.outputPartitioning
++ .asInstanceOf[HashPartitioning]
++ }.head
++ val opB = join.right.collect {
++ case s: ShuffleExchangeLike
++ if s.outputPartitioning.isInstanceOf[HashPartitioning] &&
++ partitionExpressionsColumns(
++ s.outputPartitioning
++ .asInstanceOf[HashPartitioning].expressions) === Seq("a",
"b") =>
++ s.outputPartitioning
++ .asInstanceOf[HashPartitioning]
++ }.head
++ assert(opA.numPartitions == numPartitions && opB.numPartitions ==
numPartitions)
+ })
+ }
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+index abe606ad9c1..2d930b64cca 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+@@ -22,7 +22,7 @@ import java.util
+
+ import org.scalatest.BeforeAndAfter
+
+-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType}
+@@ -327,7 +327,8 @@ class DataStreamTableAPISuite extends StreamTest with
BeforeAndAfter {
+ }
+ }
+
+- test("explain with table on DSv1 data source") {
++ test("explain with table on DSv1 data source",
++ IgnoreComet("Comet explain output is different")) {
+ val tblSourceName = "tbl_src"
+ val tblTargetName = "tbl_target"
+ val tblSourceQualified = s"default.$tblSourceName"
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+index dd55fcfe42c..293e9dc2986 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
+ import org.apache.spark.sql.catalyst.plans.PlanTestBase
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+ import org.apache.spark.sql.catalyst.util._
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.FilterExec
+ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
+ import org.apache.spark.sql.execution.datasources.DataSourceUtils
+@@ -126,7 +127,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
+ }
+ }
+ } else {
+- super.test(testName, testTags: _*)(testFun)
++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++ ignore(testName + " (disabled when Comet is on)", testTags:
_*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
+ }
+ }
+
+@@ -242,6 +247,32 @@ private[sql] trait SQLTestUtilsBase
+ protected override def _sqlContext: SQLContext = self.spark.sqlContext
+ }
+
++ /**
++ * Whether Comet extension is enabled
++ */
++ protected def isCometEnabled: Boolean = {
++ val v = System.getenv("ENABLE_COMET")
++ v != null && v.toBoolean
++ }
++
++ /**
++ * Whether to enable ansi mode This is only effective when
++ * [[isCometEnabled]] returns true.
++ */
++ protected def enableCometAnsiMode: Boolean = {
++ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
++ v != null && v.toBoolean
++ }
++
++ /**
++ * Whether Spark should only apply Comet scan optimization. This is only
effective when
++ * [[isCometEnabled]] returns true.
++ */
++ protected def isCometScanOnly: Boolean = {
++ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
++ v != null && v.toBoolean
++ }
++
+ protected override def withSQLConf(pairs: (String, String)*)(f: => Unit):
Unit = {
+ SparkSession.setActiveSession(spark)
+ super.withSQLConf(pairs: _*)(f)
+@@ -434,6 +465,8 @@ private[sql] trait SQLTestUtilsBase
+ val schema = df.schema
+ val withoutFilters = df.queryExecution.executedPlan.transform {
+ case FilterExec(_, child) => child
++ case CometFilterExec(_, _, _, child, _) => child
++ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _),
_) => child
+ }
+
+ spark.internalCreateDataFrame(withoutFilters.execute(), schema)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+index ed2e309fa07..e071fc44960 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+@@ -74,6 +74,28 @@ trait SharedSparkSessionBase
+ // this rule may potentially block testing of other optimization rules
such as
+ // ConstantPropagation etc.
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
++ // Enable Comet if `ENABLE_COMET` environment variable is set
++ if (isCometEnabled) {
++ conf
++ .set("spark.sql.extensions",
"org.apache.comet.CometSparkSessionExtensions")
++ .set("spark.comet.enabled", "true")
++
++ if (!isCometScanOnly) {
++ conf
++ .set("spark.comet.exec.enabled", "true")
++ .set("spark.comet.exec.all.enabled", "true")
++ .set("spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .set("spark.comet.exec.shuffle.enabled", "true")
++ .set("spark.comet.memoryOverhead", "10g")
++ }
++
++ if (enableCometAnsiMode) {
++ conf
++ .set("spark.sql.ansi.enabled", "true")
++ .set("spark.comet.ansi.enabled", "true")
++ }
++ }
+ conf.set(
+ StaticSQLConf.WAREHOUSE_PATH,
+ conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" +
getClass.getCanonicalName)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
+index 1510e8957f9..7618419d8ff 100644
+---
a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
+@@ -43,7 +43,7 @@ class SqlResourceWithActualMetricsSuite
+ import testImplicits._
+
+ // Exclude nodes which may not have the metrics
+- val excludedNodes = List("WholeStageCodegen", "Project",
"SerializeFromObject")
++ val excludedNodes = List("WholeStageCodegen", "Project",
"SerializeFromObject", "RowToColumnar")
+
+ implicit val formats = new DefaultFormats {
+ override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+index 52abd248f3a..7a199931a08 100644
+---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
++++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
+
+ import org.apache.spark.sql._
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution._
+ import
org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite,
EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.hive.execution.HiveTableScanExec
+@@ -35,6 +36,9 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
+ case s: FileSourceScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case h: HiveTableScanExec => h.partitionPruningPred.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+index 1966e1e64fd..cde97a0aafe 100644
+---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
++++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+@@ -656,7 +656,8 @@ abstract class AggregationQuerySuite extends QueryTest
with SQLTestUtils with Te
+ Row(3, 4, 4, 3, null) :: Nil)
+ }
+
+- test("single distinct multiple columns set") {
++ test("single distinct multiple columns set",
++ IgnoreComet("TODO: fix Comet for this test")) {
+ checkAnswer(
+ spark.sql(
+ """
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+index 07361cfdce9..25b0dc3ef7e 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+@@ -55,25 +55,53 @@ object TestHive
+ new SparkContext(
+ System.getProperty("spark.sql.test.master", "local[1]"),
+ "TestSQLContext",
+- new SparkConf()
+- .set("spark.sql.test", "")
+- .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+- .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
+- .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
+- "org.apache.spark.sql.hive.execution.PairSerDe")
+- .set(WAREHOUSE_PATH.key,
TestHiveContext.makeWarehouseDir().toURI.getPath)
+- // SPARK-8910
+- .set(UI_ENABLED, false)
+- .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
+- // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
+- // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
+-
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
+- // Disable ConvertToLocalRelation for better test coverage. Test
cases built on
+- // LocalRelation will exercise the optimization rules better by
disabling it as
+- // this rule may potentially block testing of other optimization
rules such as
+- // ConstantPropagation etc.
+- .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)))
++ {
++ val conf = new SparkConf()
++ .set("spark.sql.test", "")
++ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
++ .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
++ .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
++ "org.apache.spark.sql.hive.execution.PairSerDe")
++ .set(WAREHOUSE_PATH.key,
TestHiveContext.makeWarehouseDir().toURI.getPath)
++ // SPARK-8910
++ .set(UI_ENABLED, false)
++ .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
++ // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
++ // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
++
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
++ // Disable ConvertToLocalRelation for better test coverage. Test
cases built on
++ // LocalRelation will exercise the optimization rules better by
disabling it as
++ // this rule may potentially block testing of other optimization
rules such as
++ // ConstantPropagation etc.
++ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
++
++ val v = System.getenv("ENABLE_COMET")
++ if (v != null && v.toBoolean) {
++ conf
++ .set("spark.sql.extensions",
"org.apache.spark.CometSparkSessionExtensions")
++ .set("spark.comet.enabled", "true")
++
++ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
++ if (v == null || !v.toBoolean) {
++ conf
++ .set("spark.comet.exec.enabled", "true")
++ .set("spark.comet.exec.all.enabled", "true")
++ .set("spark.shuffle.manager",
++
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .set("spark.comet.exec.shuffle.enabled", "true")
++ }
++
++ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
++ if (a != null && a.toBoolean) {
++ conf
++ .set("spark.sql.ansi.enabled", "true")
++ .set("spark.comet.ansi.enabled", "true")
++ }
++ }
+
++ conf
++ }
++ ))
+
+ case class TestHiveVersion(hiveClient: HiveClient)
+ extends TestHiveContext(TestHive.sparkContext, hiveClient)
diff --git a/pom.xml b/pom.xml
index 57b4206c..0ec83498 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,7 @@ under the License.
<scala.plugin.version>4.7.2</scala.plugin.version>
<scalatest.version>3.2.9</scalatest.version>
<scalatest-maven-plugin.version>2.0.2</scalatest-maven-plugin.version>
- <spark.version>3.4.2</spark.version>
+ <spark.version>3.4.3</spark.version>
<spark.version.short>3.4</spark.version.short>
<spark.maven.scope>provided</spark.maven.scope>
<protobuf.version>3.19.6</protobuf.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]