This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bedce4  build: Run Spark SQL tests for 3.4 (#166)
6bedce4 is described below

commit 6bedce410f5a5b961eb2cc1dbc8840c17bdab4bb
Author: Chao Sun <[email protected]>
AuthorDate: Tue Mar 12 10:28:58 2024 -0700

    build: Run Spark SQL tests for 3.4 (#166)
---
 .github/actions/setup-spark-builder/action.yaml |   64 ++
 .github/workflows/spark_sql_test.yml            |   79 ++
 dev/diffs/3.4.2.diff                            | 1306 +++++++++++++++++++++++
 pom.xml                                         |    1 +
 4 files changed, 1450 insertions(+)

diff --git a/.github/actions/setup-spark-builder/action.yaml b/.github/actions/setup-spark-builder/action.yaml
new file mode 100644
index 0000000..9293dee
--- /dev/null
+++ b/.github/actions/setup-spark-builder/action.yaml
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Setup Spark Builder
+description: 'Setup Apache Spark to run SQL tests'
+inputs:
+  spark-short-version:
+    description: 'The Apache Spark short version (e.g., 3.4) to build'
+    required: true
+    default: '3.4'
+  spark-version:
+    description: 'The Apache Spark version (e.g., 3.4.2) to build'
+    required: true
+    default: '3.4.2'
+  comet-version:
+    description: 'The Comet version to use for Spark'
+    required: true
+    default: '0.1.0-SNAPSHOT'
+runs:
+  using: "composite"
+  steps:
+    - name: Clone Spark repo
+      uses: actions/checkout@v4
+      with:
+        repository: apache/spark
+        path: apache-spark
+        ref: v${{inputs.spark-version}}
+        fetch-depth: 1
+
+    - name: Setup Spark for Comet
+      shell: bash
+      run: |
+        cd apache-spark
+        git apply ../dev/diffs/${{inputs.spark-version}}.diff
+        ../mvnw -nsu -q versions:set-property -Dproperty=comet.version  -DnewVersion=${{inputs.comet-version}} -DgenerateBackupPoms=false
+
+    - name: Cache Maven dependencies
+      uses: actions/cache@v4
+      with:
+        path: |
+          ~/.m2/repository
+          /root/.m2/repository
+        key: ${{ runner.os }}-spark-sql-${{ hashFiles('spark/**/pom.xml', 'common/**/pom.xml') }}
+        restore-keys: |
+          ${{ runner.os }}-spark-sql-
+
+    - name: Build Comet
+      shell: bash
+      run: |
+        PROFILES="-Pspark-${{inputs.spark-short-version}}" make release
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
new file mode 100644
index 0000000..5c460b7
--- /dev/null
+++ b/.github/workflows/spark_sql_test.yml
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Spark SQL Tests
+
+concurrency:
+  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
+  cancel-in-progress: true
+
+on:
+  push:
+    paths-ignore:
+      - "doc/**"
+      - "**.md"
+  pull_request:
+    paths-ignore:
+      - "doc/**"
+      - "**.md"
+  # manual trigger
+  # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
+  workflow_dispatch:
+
+env:
+  RUST_VERSION: nightly
+
+jobs:
+  spark-sql-catalyst:
+    strategy:
+      matrix:
+        os: [ubuntu-latest]
+        java-version: [11]
+        spark-version: [{short: '3.4', full: '3.4.2'}]
+        module:
+          - {name: "catalyst", args1: "catalyst/test", args2: ""}
+          - {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
+          - {name: "sql/core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"}
+          - {name: "sql/core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"}
+          - {name: "sql/hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
+          - {name: "sql/hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
+          - {name: "sql/hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
+      fail-fast: false
+    name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.java-version }}
+    runs-on: ${{ matrix.os }}
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java-version }}
+      - name: Setup Spark
+        uses: ./.github/actions/setup-spark-builder
+        with:
+          spark-version: ${{ matrix.spark-version.full }}
+          spark-short-version: ${{ matrix.spark-version.short }}
+          comet-version: '0.1.0-SNAPSHOT' # TODO: get this from pom.xml
+      - name: Run Spark tests
+        run: |
+          cd apache-spark
+          ENABLE_COMET=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+        env:
+          LC_ALL: "C.UTF-8"
+
diff --git a/dev/diffs/3.4.2.diff b/dev/diffs/3.4.2.diff
new file mode 100644
index 0000000..b571cd2
--- /dev/null
+++ b/dev/diffs/3.4.2.diff
@@ -0,0 +1,1306 @@
+diff --git a/pom.xml b/pom.xml
+index fab98342498..f2156d790d1 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -148,6 +148,8 @@
+     <chill.version>0.10.0</chill.version>
+     <ivy.version>2.5.1</ivy.version>
+     <oro.version>2.0.8</oro.version>
++    <spark.version.short>3.4</spark.version.short>
++    <comet.version>0.1.0-SNAPSHOT</comet.version>
+     <!--
+     If you changes codahale.metrics.version, you also need to change
+     the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2766,6 +2768,25 @@
+         <artifactId>arpack</artifactId>
+         <version>${netlib.ludovic.dev.version}</version>
+       </dependency>
++      <dependency>
++        <groupId>org.apache.comet</groupId>
++        <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++        <version>${comet.version}</version>
++        <exclusions>
++          <exclusion>
++            <groupId>org.apache.spark</groupId>
++            <artifactId>spark-sql_${scala.binary.version}</artifactId>
++          </exclusion>
++          <exclusion>
++            <groupId>org.apache.spark</groupId>
++            <artifactId>spark-core_${scala.binary.version}</artifactId>
++          </exclusion>
++          <exclusion>
++            <groupId>org.apache.spark</groupId>
++            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++          </exclusion>
++        </exclusions>
++      </dependency>
+     </dependencies>
+   </dependencyManagement>
+ 
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index 5b6cc8cb7af..5ce708adc38 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -77,6 +77,10 @@
+       <groupId>org.apache.spark</groupId>
+       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+     </dependency>
++    <dependency>
++      <groupId>org.apache.comet</groupId>
++      <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++    </dependency>
+ 
+     <!--
+       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+index c595b50950b..483508dc076 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+@@ -26,6 +26,8 @@ import scala.collection.JavaConverters._
+ import scala.reflect.runtime.universe.TypeTag
+ import scala.util.control.NonFatal
+ 
++import org.apache.comet.CometConf
++
+ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+ import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
+ import org.apache.spark.api.java.JavaRDD
+@@ -102,7 +104,7 @@ class SparkSession private(
+       sc: SparkContext,
+       initialSessionOptions: java.util.HashMap[String, String]) = {
+     this(sc, None, None,
+-      SparkSession.applyExtensions(
++      SparkSession.applyExtensions(sc,
+         
sc.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+         new SparkSessionExtensions), initialSessionOptions.asScala.toMap)
+   }
+@@ -1028,7 +1030,7 @@ object SparkSession extends Logging {
+         }
+ 
+         loadExtensions(extensions)
+-        applyExtensions(
++        applyExtensions(sparkContext,
+           
sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+           extensions)
+ 
+@@ -1282,14 +1284,24 @@ object SparkSession extends Logging {
+     }
+   }
+ 
++  private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++    if (sparkContext.getConf.getBoolean(CometConf.COMET_ENABLED.key, false)) {
++      Seq("org.apache.comet.CometSparkSessionExtensions")
++    } else {
++      Seq.empty
++    }
++  }
++
+   /**
+    * Initialize extensions for given extension classnames. The classes will be applied to the
+    * extensions passed into this function.
+    */
+   private def applyExtensions(
++      sparkContext: SparkContext,
+       extensionConfClassNames: Seq[String],
+       extensions: SparkSessionExtensions): SparkSessionExtensions = {
+-    extensionConfClassNames.foreach { extensionConfClassName =>
++    val extensionClassNames = extensionConfClassNames ++ loadCometExtension(sparkContext)
++    extensionClassNames.foreach { extensionConfClassName =>
+       try {
+         val extensionConfClass = Utils.classForName(extensionConfClassName)
+         val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index db587dd9868..aac7295a53d 100644
+--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.annotation.DeveloperApi
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+     // dump the file scan metadata (e.g file path) to event log
+     val metadata = plan match {
+       case fileScan: FileSourceScanExec => fileScan.metadata
++      case cometScan: CometScanExec => cometScan.metadata
+       case _ => Map[String, String]()
+     }
+     new SparkPlanInfo(
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+ 
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+ 
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 698ca009b4f..57d774a3617 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+ 
+ -- Test tables
+ CREATE table  explain_temp1 (key int, val int) USING PARQUET;
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+index 1152d77da0c..f77493f690b 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+@@ -7,6 +7,9 @@
+ 
+ -- avoid bit-exact output here because operations may not be bit-exact.
+ -- SET extra_float_digits = 0;
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ 
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..44cd244d3b0 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -5,6 +5,9 @@
+ -- AGGREGATES [Part 3]
+ -- 
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+ 
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1 
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..2b73732c33f 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- INT8
+ -- Test int8 64-bit integers.
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..423d3b3d76d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- SELECT_HAVING
+ -- 
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index 56e9520fdab..917932336df 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+ 
+     withTempDatabase { dbName =>
+       withTable(table1Name, table2Name) {
+-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++        withSQLConf(
++            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++            "spark.comet.enabled" -> "false") {
+           spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+           spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+ 
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
+new file mode 100644
+index 00000000000..07687f6685a
+--- /dev/null
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
+@@ -0,0 +1,39 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *    http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql
++
++import org.scalactic.source.Position
++import org.scalatest.Tag
++
++import org.apache.spark.sql.test.SQLTestUtils
++
++case class DisableComet(reason: String) extends Tag("DisableComet")
++
++/**
++ * Helper trait that disables Comet for all tests regardless of default config values.
++ */
++trait DisableCometSuite extends SQLTestUtils {
++  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
++    (implicit pos: Position): Unit = {
++    if (isCometEnabled) {
++      ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++    } else {
++      super.test(testName, testTags: _*)(testFun)
++    }
++  }
++}
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index f33432ddb6f..fe9f74ff8f1 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, 
InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
+       case s: BatchScanExec => s.runtimeFilters.collect {
+         case d: DynamicPruningExpression => d.child
+       }
++      case s: CometScanExec => s.partitionFilters.collect {
++        case d: DynamicPruningExpression => d.child
++      }
+       case _ => Nil
+     }
+   }
+@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+               case s: BatchScanExec =>
+                 // we use f1 col for v2 tables due to schema pruning
+                 s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
++              case s: CometScanExec =>
++                s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
+               case _ => false
+             }
+           assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index a6b295578d6..d5e25564bb9 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -463,7 +463,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+     }
+   }
+ 
+-  test("Explain formatted output for scan operator for datasource V2") {
++  test("Explain formatted output for scan operator for datasource V2",
++      DisableComet("Comet explain output is different")) {
+     withTempDir { dir =>
+       Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+         val basePath = dir.getCanonicalPath + "/" + fmt
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+index 2796b1cf154..94591f83c84 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, 
NullData, NullUDT}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GreaterThan, Literal}
+ import 
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
 positiveInt}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.FilePartition
+@@ -875,6 +876,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ 
+           val fileScan = df.queryExecution.executedPlan collectFirst {
+             case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++            case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _) => f
+           }
+           assert(fileScan.nonEmpty)
+           assert(fileScan.get.partitionFilters.nonEmpty)
+@@ -916,6 +918,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ 
+           val fileScan = df.queryExecution.executedPlan collectFirst {
+             case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++            case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _) => f
+           }
+           assert(fileScan.nonEmpty)
+           assert(fileScan.get.partitionFilters.isEmpty)
+@@ -1100,6 +1103,8 @@ class FileBasedDataSourceSuite extends QueryTest
+           val filters = df.queryExecution.executedPlan.collect {
+             case f: FileSourceScanLike => f.dataFilters
+             case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
++            case b: CometScanExec => b.dataFilters
++            case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
+           }.flatten
+           assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, 
Literal(5L))))
+         }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+index 5125708be32..e274a497996 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, 
SortOrder}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, 
ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, 
ShuffleExchangeLike}
+@@ -1371,7 +1372,7 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+           assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
+           assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 
3)
+           // No extra sort on left side before last sort merge join
+-          assert(collect(plan) { case _: SortExec => true }.size === 5)
++          assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 5)
+       }
+ 
+       // Test output ordering is not preserved
+@@ -1382,7 +1383,7 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+           assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
+           assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 
3)
+           // Have sort on left side before last sort merge join
+-          assert(collect(plan) { case _: SortExec => true }.size === 6)
++          assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 6)
+       }
+ 
+       // Test singe partition
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+index b5b34922694..5fa734d30e1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+@@ -69,7 +69,7 @@ import org.apache.spark.tags.ExtendedSQLTest
+  * }}}
+  */
+ // scalastyle:on line.size.limit
+-trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {
++trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with DisableCometSuite {
+ 
+   protected val baseResourcePath = {
+     // use the same way as `SQLQueryTestSuite` to get the resource path
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+index 3cfda19134a..afcfba37c6f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+ 
+ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, 
LogicalPlan, Project, Sort, Union}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.FileScanRDD
+@@ -1543,6 +1544,12 @@ class SubquerySuite extends QueryTest
+             fs.inputRDDs().forall(
+               _.asInstanceOf[FileScanRDD].filePartitions.forall(
+                 _.files.forall(_.urlEncodedPath.contains("p=0"))))
++        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
++        fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++          partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++            fs.inputRDDs().forall(
++              _.asInstanceOf[FileScanRDD].filePartitions.forall(
++                _.files.forall(_.urlEncodedPath.contains("p=0"))))
+         case _ => false
+       })
+     }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+index cfc8b2cc845..c6fcfd7bd08 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.{AnalysisException, QueryTest}
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
Table, TableCapability}
+ import org.apache.spark.sql.connector.read.ScanBuilder
+ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest 
with SharedSparkSession {
+             val df = spark.read.format(format).load(path.getCanonicalPath)
+             checkAnswer(df, inputData.toDF())
+             assert(
+-              df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
++              df.queryExecution.executedPlan.exists {
++                case _: FileSourceScanExec | _: CometScanExec => true
++                case _ => false
++              }
++            )
+           }
+         } finally {
+           spark.listenerManager.unregister(listener)
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+index c0ec8a58bd5..5f880751e21 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.FsPermission
+ import org.mockito.Mockito.{mock, spy, when}
+ 
+ import org.apache.spark._
+-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, DisableComet, QueryTest, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.util.BadRecordException
+ import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, 
JDBCOptions}
+@@ -248,7 +248,8 @@ class QueryExecutionErrorsSuite
+   }
+ 
+   test("INCONSISTENT_BEHAVIOR_CROSS_VERSION: " +
+-    "compatibility with Spark 2.4/3.2 in reading/writing dates") {
++    "compatibility with Spark 2.4/3.2 in reading/writing dates",
++    DisableComet("Comet doesn't completely support datetime rebase mode 
yet")) {
+ 
+     // Fail to read ancient datetime values.
+     withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> 
EXCEPTION.toString) {
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+index 418ca3430bb..d5fc207601c 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+@@ -23,7 +23,7 @@ import scala.util.Random
+ import org.apache.hadoop.fs.Path
+ 
+ import org.apache.spark.SparkConf
+-import org.apache.spark.sql.{DataFrame, QueryTest}
++import org.apache.spark.sql.{DataFrame, DisableComet, QueryTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+ import org.apache.spark.sql.internal.SQLConf
+@@ -195,7 +195,7 @@ class DataSourceV2ScanExecRedactionSuite extends 
DataSourceScanRedactionTest {
+     }
+   }
+ 
+-  test("FileScan description") {
++  test("FileScan description", DisableComet("Comet doesn't use BatchScan")) {
+     Seq("json", "orc", "parquet").foreach { format =>
+       withTempPath { path =>
+         val dir = path.getCanonicalPath
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+index 9e9d717db3b..91a4f9a38d5 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.comet.CometProjectExec
+ import org.apache.spark.sql.connector.SimpleWritableDataSource
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+   private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
+     withClue(df.queryExecution) {
+       val plan = df.queryExecution.executedPlan
+-      val actual = collectWithSubqueries(plan) { case p: ProjectExec => p 
}.size
++      val actual = collectWithSubqueries(plan) {
++        case p: ProjectExec => p
++        case p: CometProjectExec => p
++      }.size
+       assert(actual == expected)
+     }
+   }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+index ac710c32296..37746bd470d 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+@@ -616,7 +616,9 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
+           .write.mode(SaveMode.Overwrite).parquet(path)
+ 
+         withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
+-            SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
++            SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
++            // Disable Comet native execution because this checks wholestage 
codegen.
++            "spark.comet.exec.enabled" -> "false") {
+           val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as 
newC$i")
+           val df = spark.read.parquet(path).selectExpr(projection: _*)
+ 
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+index bd9c79e5b96..ab7584e768e 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.SchemaPruningTest
+ import org.apache.spark.sql.catalyst.expressions.Concat
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+ import org.apache.spark.sql.catalyst.plans.logical.Expand
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.functions._
+@@ -867,6 +868,7 @@ abstract class SchemaPruningSuite
+     val fileSourceScanSchemata =
+       collect(df.queryExecution.executedPlan) {
+         case scan: FileSourceScanExec => scan.requiredSchema
++        case scan: CometScanExec => scan.requiredSchema
+       }
+     assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+       s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+index 1d2e467c94c..77a119505b9 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, 
GlobFilter, Path}
+ import org.mockito.Mockito.{mock, when}
+ 
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.{DataFrame, DisableCometSuite, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.encoders.RowEncoder
+ import org.apache.spark.sql.execution.datasources.PartitionedFile
+ import org.apache.spark.sql.functions.col
+@@ -38,7 +38,9 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types._
+ import org.apache.spark.util.Utils
+ 
+-class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
++// This suite is flaky with or without Comet when run in a GitHub workflow.
++// Since it isn't related to Comet, we disable it for now.
++class BinaryFileFormatSuite extends QueryTest with SharedSparkSession with 
DisableCometSuite {
+   import BinaryFileFormat._
+ 
+   private var testDir: String = _
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+index 07e2849ce6f..264fb61db16 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+ 
+ import org.apache.spark.TestUtils
+ import org.apache.spark.memory.MemoryMode
+-import org.apache.spark.sql.Row
++import org.apache.spark.sql.{DisableComet, Row}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -201,7 +201,8 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSparkSess
+     }
+   }
+ 
+-  test("parquet v2 pages - rle encoding for boolean value columns") {
++  test("parquet v2 pages - rle encoding for boolean value columns",
++      DisableComet("Comet doesn't support RLE encoding yet")) {
+     val extraOptions = Map[String, String](
+       ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
+     )
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+index 9adcb43c838..84c4db4a727 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+@@ -1025,7 +1025,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+           // When a filter is pushed to Parquet, Parquet can apply it to 
every row.
+           // So, we can check the number of rows returned from the Parquet
+           // to make sure our filter pushdown work.
+-          assert(stripSparkFilter(df).count == 1)
++          // Similar to Spark's vectorized reader, Comet doesn't do row-level 
filtering but relies
++          // on Spark to apply the data filters after columnar batches are 
returned
++          if (!isCometEnabled) {
++            assert(stripSparkFilter(df).count == 1)
++          }
+         }
+       }
+     }
+@@ -1510,7 +1514,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+           // than the total length but should not be a single record.
+           // Note that, if record level filtering is enabled, it should be a 
single record.
+           // If no filter is pushed down to Parquet, it should be the total 
length of data.
+-          assert(actual > 1 && actual < data.length)
++          // With Comet, only check this in scan-only mode: with native
++          // execution `stripSparkFilter` can't remove the native filter
++          if (!isCometEnabled || isCometScanOnly) {
++            assert(actual > 1 && actual < data.length)
++          }
+         }
+       }
+     }
+@@ -1537,7 +1545,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+         // than the total length but should not be a single record.
+         // Note that, if record level filtering is enabled, it should be a 
single record.
+         // If no filter is pushed down to Parquet, it should be the total 
length of data.
+-        assert(actual > 1 && actual < data.length)
++        // With Comet, only check this in scan-only mode: with native
++        // execution `stripSparkFilter` can't remove the native filter
++        if (!isCometEnabled || isCometScanOnly) {
++          assert(actual > 1 && actual < data.length)
++        }
+       }
+     }
+   }
+@@ -1673,7 +1685,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+     }
+   }
+ 
+-  test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
++  test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
++      DisableComet("IN predicate is not yet supported in Comet, see issue 
#36")) {
+     val schema = StructType(Seq(
+       StructField("a", IntegerType, nullable = false)
+     ))
+@@ -1914,7 +1927,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+     }
+   }
+ 
+-  test("Support Parquet column index") {
++  test("Support Parquet column index",
++      DisableComet("Comet doesn't support Parquet column index yet")) {
+     // block 1:
+     //                      null count  min                                   
    max
+     // page-0                         0  0                                    
     99
+@@ -2206,7 +2220,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+           assert(pushedParquetFilters.exists(_.getClass === filterClass),
+             s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
+ 
+-          checker(stripSparkFilter(query), expected)
++          // Similar to Spark's vectorized reader, Comet doesn't do row-level 
filtering but relies
++          // on Spark to apply the data filters after columnar batches are 
returned
++          if (!isCometEnabled) {
++            checker(stripSparkFilter(query), expected)
++          }
+         } else {
+           assert(selectedFilters.isEmpty, "There is filter pushed down")
+         }
+@@ -2266,7 +2284,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+           assert(pushedParquetFilters.exists(_.getClass === filterClass),
+             s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
+ 
+-          checker(stripSparkFilter(query), expected)
++          // Similar to Spark's vectorized reader, Comet doesn't do row-level 
filtering but relies
++          // on Spark to apply the data filters after columnar batches are 
returned
++          if (!isCometEnabled) {
++            checker(stripSparkFilter(query), expected)
++          }
+ 
+         case _ =>
+           throw new AnalysisException("Can not match ParquetTable in the 
query.")
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+index 8670d95c65e..4a16d9f6ff4 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+     }
+   }
+ 
+-  test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") {
++  test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings",
++      DisableComet("Comet doesn't support DELTA encoding yet")) {
+     withAllParquetReaders {
+       checkAnswer(
+         // "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+index 2e7b26126d2..f7368eb026e 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+@@ -1029,7 +1029,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+         checkAnswer(readParquet(schema, path), df)
+       }
+ 
+-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++          "spark.comet.enabled" -> "false") {
+         val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+         checkAnswer(readParquet(schema1, path), df)
+         val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+@@ -1051,7 +1052,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+       val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
+       df.write.parquet(path.toString)
+ 
+-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++          "spark.comet.enabled" -> "false") {
+         checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+         checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+         checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 
123456.0"))
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+index 240bb4e6dcb..c37b92d3691 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+@@ -21,7 +21,7 @@ import java.nio.file.{Files, Paths, StandardCopyOption}
+ import java.sql.{Date, Timestamp}
+ 
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, 
SparkUpgradeException}
+-import org.apache.spark.sql.{QueryTest, Row, 
SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, 
SPARK_TIMEZONE_METADATA_KEY}
++import org.apache.spark.sql.{DisableCometSuite, QueryTest, Row, 
SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, 
SPARK_TIMEZONE_METADATA_KEY}
+ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, 
ParquetOutputTimestampType}
+@@ -30,9 +30,11 @@ import 
org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96,
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.tags.SlowSQLTest
+ 
++// Comet is disabled for this suite because it doesn't support datetime 
rebase mode
+ abstract class ParquetRebaseDatetimeSuite
+   extends QueryTest
+   with ParquetTest
++  with DisableCometSuite
+   with SharedSparkSession {
+ 
+   import testImplicits._
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+index 351c6d698fc..36492fe936d 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, 
ParquetOutputFormat}
+ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+ 
+ import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -230,6 +231,12 @@ class ParquetRowIndexSuite extends QueryTest with 
SharedSparkSession {
+             case f: FileSourceScanExec =>
+               numPartitions += f.inputRDD.partitions.length
+               numOutputRows += f.metrics("numOutputRows").value
++            case b: CometScanExec =>
++              numPartitions += b.inputRDD.partitions.length
++              numOutputRows += b.metrics("numOutputRows").value
++            case b: CometBatchScanExec =>
++              numPartitions += b.inputRDD.partitions.length
++              numOutputRows += b.metrics("numOutputRows").value
+             case _ =>
+           }
+           assert(numPartitions > 0)
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+index 5c0b7def039..151184bc98c 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.SchemaPruningSuite
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -56,6 +57,7 @@ class ParquetV2SchemaPruningSuite extends 
ParquetSchemaPruningSuite {
+     val fileSourceScanSchemata =
+       collect(df.queryExecution.executedPlan) {
+         case scan: BatchScanExec => 
scan.scan.asInstanceOf[ParquetScan].readDataSchema
++        case scan: CometBatchScanExec => 
scan.scan.asInstanceOf[ParquetScan].readDataSchema
+       }
+     assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+       s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+index bf5c51b89bb..2c7f9701eeb 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+@@ -27,6 +27,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+ import org.apache.parquet.schema.Type._
+ 
+ import org.apache.spark.SparkException
++import org.apache.spark.sql.DisableComet
+ import org.apache.spark.sql.catalyst.ScalaReflection
+ import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+ import org.apache.spark.sql.functions.desc
+@@ -1016,7 +1017,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     e
+   }
+ 
+-  test("schema mismatch failure error message for parquet reader") {
++  test("schema mismatch failure error message for parquet reader",
++      DisableComet("Comet doesn't work with vectorizedReaderEnabled = 
false")) {
+     withTempPath { dir =>
+       val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = false)
+       val expectedMessage = "Encountered error while reading file"
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+index 3a0bd35cb70..ef351a56ec8 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug
+ import java.io.ByteArrayOutputStream
+ 
+ import org.apache.spark.rdd.RDD
++import org.apache.spark.sql.DisableComet
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.Attribute
+ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+@@ -124,7 +125,8 @@ class DebuggingSuite extends DebuggingSuiteBase with 
DisableAdaptiveExecutionSui
+          | id LongType: {}""".stripMargin))
+   }
+ 
+-  test("SPARK-28537: DebugExec cannot debug columnar related queries") {
++  test("SPARK-28537: DebugExec cannot debug columnar related queries",
++      DisableComet("Comet does not use FileScan")) {
+     withTempPath { workDir =>
+       val workDirPath = workDir.getAbsolutePath
+       val input = spark.range(5).toDF("id")
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+index 26e61c6b58d..2a7c96d164a 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+@@ -737,7 +737,8 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
+     }
+   }
+ 
+-  test("SPARK-26327: FileSourceScanExec metrics") {
++  test("SPARK-26327: FileSourceScanExec metrics",
++      DisableComet("Spark uses row-based Parquet reader while Comet is 
vectorized")) {
+     withTable("testDataForScan") {
+       spark.range(10).selectExpr("id", "id % 3 as p")
+         .write.partitionBy("p").saveAsTable("testDataForScan")
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+index 0ab8691801d..d9125f658ad 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution.python
+ 
+ import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, 
BatchEvalPython, Limit, LocalLimit}
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, 
SparkPlanTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+ 
+           val scanNodes = query.queryExecution.executedPlan.collect {
+             case scan: FileSourceScanExec => scan
++            case scan: CometScanExec => scan
+           }
+           assert(scanNodes.length == 1)
+           assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+ 
+           val scanNodes = query.queryExecution.executedPlan.collect {
+             case scan: FileSourceScanExec => scan
++            case scan: CometScanExec => scan
+           }
+           assert(scanNodes.length == 1)
+           // $"a" is not null and $"a" > 1
+-          assert(scanNodes.head.dataFilters.length == 2)
+-          
assert(scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == 
Seq("a"))
++          val dataFilters = scanNodes.head match {
++            case scan: FileSourceScanExec => scan.dataFilters
++            case scan: CometScanExec => scan.dataFilters
++          }
++          assert(dataFilters.length == 2)
++          assert(dataFilters.flatMap(_.references.map(_.name)).distinct == 
Seq("a"))
+         }
+       }
+     }
+@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+ 
+           val scanNodes = query.queryExecution.executedPlan.collect {
+             case scan: BatchScanExec => scan
++            case scan: CometBatchScanExec => scan
+           }
+           assert(scanNodes.length == 1)
+           assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+ 
+           val scanNodes = query.queryExecution.executedPlan.collect {
+             case scan: BatchScanExec => scan
++            case scan: CometBatchScanExec => scan
+           }
+           assert(scanNodes.length == 1)
+           // $"a" is not null and $"a" > 1
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+index d083cac48ff..43057eb251b 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+@@ -37,8 +37,10 @@ import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException,
+ import org.apache.spark.sql.streaming.util.StreamManualClock
+ import org.apache.spark.util.Utils
+ 
++// This suite is flaky with or without Comet when run in a GitHub workflow.
++// Since it isn't related to Comet, we disable it for now.
+ class AsyncProgressTrackingMicroBatchExecutionSuite
+-  extends StreamTest with BeforeAndAfter with Matchers {
++  extends StreamTest with BeforeAndAfter with Matchers with DisableCometSuite 
{
+ 
+   import testImplicits._
+ 
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+index 266bb343526..85ec36db996 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.expressions
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, 
SparkPlan}
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.BucketingUtils
+@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+     }
+   }
+ 
+-  private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+-    val fileScan = collect(plan) { case f: FileSourceScanExec => f }
++  private def getFileScan(plan: SparkPlan): SparkPlan = {
++    val fileScan = collect(plan) {
++      case f: FileSourceScanExec => f
++      case f: CometScanExec => f
++    }
+     assert(fileScan.nonEmpty, plan)
+     fileScan.head
+   }
+ 
++  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) 
match {
++    case fs: FileSourceScanExec => fs.bucketedScan
++    case bs: CometScanExec => bs.bucketedScan
++  }
++
+   // To verify if the bucket pruning works, this function checks two 
conditions:
+   //   1) Check if the pruned buckets (before filtering) are empty.
+   //   2) Verify the final result is the same as the expected one
+@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+           val planWithoutBucketedScan = 
bucketedDataFrame.filter(filterCondition)
+             .queryExecution.executedPlan
+           val fileScan = getFileScan(planWithoutBucketedScan)
+-          assert(!fileScan.bucketedScan, s"except no bucketed scan but 
found\n$fileScan")
++          val bucketedScan = getBucketScan(planWithoutBucketedScan)
++          assert(!bucketedScan, s"except no bucketed scan but 
found\n$fileScan")
+ 
+           val bucketColumnType = 
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
+           val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
+@@ -461,18 +471,29 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+ 
+         // check existence of shuffle
+         assert(
+-          joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) == 
shuffleLeft,
++          // Also look inside Comet operators' originalPlan for a shuffle.
++          joinOperator.left.exists { op =>
++            op.isInstanceOf[ShuffleExchangeExec] ||
++              (op.isInstanceOf[CometExec] &&
++                
op.asInstanceOf[CometExec].originalPlan.exists(_.isInstanceOf[ShuffleExchangeExec]))
++          } == shuffleLeft,
+           s"expected shuffle in plan to be $shuffleLeft but 
found\n${joinOperator.left}")
+         assert(
+-          joinOperator.right.exists(_.isInstanceOf[ShuffleExchangeExec]) == 
shuffleRight,
++          joinOperator.right.exists { op =>
++            op.isInstanceOf[ShuffleExchangeExec] ||
++              (op.isInstanceOf[CometExec] &&
++                
op.asInstanceOf[CometExec].originalPlan.exists(_.isInstanceOf[ShuffleExchangeExec]))
++          } == shuffleRight,
+           s"expected shuffle in plan to be $shuffleRight but 
found\n${joinOperator.right}")
+ 
+         // check existence of sort
+         assert(
+-          joinOperator.left.exists(_.isInstanceOf[SortExec]) == sortLeft,
++          joinOperator.left.exists(op => op.isInstanceOf[SortExec] || 
op.isInstanceOf[CometExec] &&
++            op.asInstanceOf[CometExec].originalPlan.isInstanceOf[SortExec]) 
== sortLeft,
+           s"expected sort in the left child to be $sortLeft but 
found\n${joinOperator.left}")
+         assert(
+-          joinOperator.right.exists(_.isInstanceOf[SortExec]) == sortRight,
++          joinOperator.right.exists(op => op.isInstanceOf[SortExec] || 
op.isInstanceOf[CometExec] &&
++            op.asInstanceOf[CometExec].originalPlan.isInstanceOf[SortExec]) 
== sortRight,
+           s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
+ 
+         // check the output partitioning
+@@ -835,11 +856,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+       df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
+ 
+       val scanDF = spark.table("bucketed_table").select("j")
+-      assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
++      assert(!getBucketScan(scanDF.queryExecution.executedPlan))
+       checkAnswer(scanDF, df1.select("j"))
+ 
+       val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
+-      assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
++      assert(!getBucketScan(aggDF.queryExecution.executedPlan))
+       checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
+     }
+   }
+@@ -1031,10 +1052,16 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+ 
+           val scans = plan.collect {
+             case f: FileSourceScanExec if 
f.optionalNumCoalescedBuckets.isDefined => f
++            case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined 
=> b
+           }
+           if (expectedCoalescedNumBuckets.isDefined) {
+             assert(scans.length == 1)
+-            assert(scans.head.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
++            scans.head match {
++              case f: FileSourceScanExec =>
++                assert(f.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
++              case b: CometScanExec =>
++                assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
++            }
+           } else {
+             assert(scans.isEmpty)
+           }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+index b5f6d2f9f68..8e84ec3f070 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
+ import java.io.File
+ 
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.AnalysisException
++import org.apache.spark.sql.{AnalysisException, DisableCometSuite}
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType}
+ import org.apache.spark.sql.catalyst.parser.ParseException
+@@ -28,7 +28,10 @@ import 
org.apache.spark.sql.internal.SQLConf.BUCKETING_MAX_BUCKETS
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.util.Utils
+ 
+-class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession 
{
++// For unknown reasons this suite is flaky in GitHub workflows, with or 
without Comet.
++// Since it isn't related to Comet, we disable it for now.
++class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession
++    with DisableCometSuite {
+   import testImplicits._
+ 
+   protected override lazy val sql = spark.sql _
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+index 1f55742cd67..42377f7cf26 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
+ import org.apache.spark.sql.QueryTest
+ import org.apache.spark.sql.catalyst.expressions.AttributeReference
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ 
+     def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): 
Unit = {
+       val plan = sql(query).queryExecution.executedPlan
+-      val bucketedScan = collect(plan) { case s: FileSourceScanExec if 
s.bucketedScan => s }
++      val bucketedScan = collect(plan) {
++        case s: FileSourceScanExec if s.bucketedScan => s
++        case s: CometScanExec if s.bucketedScan => s
++      }
+       assert(bucketedScan.length == expectedNumBucketedScan)
+     }
+ 
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+index 75f440caefc..36b1146bc3a 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+@@ -34,6 +34,7 @@ import org.apache.spark.paths.SparkPath
+ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+ import org.apache.spark.sql.{AnalysisException, DataFrame}
+ import org.apache.spark.sql.catalyst.util.stringToFile
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.DataSourceScanExec
+ import org.apache.spark.sql.execution.datasources._
+ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
DataSourceV2Relation, FileScan, FileTable}
+@@ -748,6 +749,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+       val fileScan = df.queryExecution.executedPlan.collect {
+         case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+           batch.scan.asInstanceOf[FileScan]
++        case batch: CometBatchScanExec if batch.scan.isInstanceOf[FileScan] =>
++          batch.scan.asInstanceOf[FileScan]
+       }.headOption.getOrElse {
+         fail(s"No FileScan in query\n${df.queryExecution}")
+       }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+index abe606ad9c1..438e7494473 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+@@ -22,7 +22,7 @@ import java.util
+ 
+ import org.scalatest.BeforeAndAfter
+ 
+-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, DisableComet, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
+@@ -327,7 +327,8 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
+     }
+   }
+ 
+-  test("explain with table on DSv1 data source") {
++  test("explain with table on DSv1 data source",
++      DisableComet("Comet explain output is different")) {
+     val tblSourceName = "tbl_src"
+     val tblTargetName = "tbl_target"
+     val tblSourceQualified = s"default.$tblSourceName"
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+index dd55fcfe42c..e83348242d3 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
+ import org.apache.spark.sql.catalyst.plans.PlanTestBase
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+ import org.apache.spark.sql.catalyst.util._
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.FilterExec
+ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
+ import org.apache.spark.sql.execution.datasources.DataSourceUtils
+@@ -126,7 +127,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
+         }
+       }
+     } else {
+-      super.test(testName, testTags: _*)(testFun)
++      if (isCometEnabled && testTags.exists(_.isInstanceOf[DisableComet])) {
++        ignore(testName + " (disabled when Comet is on)", testTags: 
_*)(testFun)
++      } else {
++        super.test(testName, testTags: _*)(testFun)
++      }
+     }
+   }
+ 
+@@ -242,6 +247,23 @@ private[sql] trait SQLTestUtilsBase
+     protected override def _sqlContext: SQLContext = self.spark.sqlContext
+   }
+ 
++  /**
++   * Whether Comet extension is enabled
++   */
++  protected def isCometEnabled: Boolean = {
++    val v = System.getenv("ENABLE_COMET")
++    v != null && v.toBoolean
++  }
++
++  /**
++   * Whether Spark should only apply Comet scan optimization. This is only 
effective when
++   * [[isCometEnabled]] returns true.
++   */
++  protected def isCometScanOnly: Boolean = {
++    val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
++    v != null && v.toBoolean
++  }
++
+   protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): 
Unit = {
+     SparkSession.setActiveSession(spark)
+     super.withSQLConf(pairs: _*)(f)
+@@ -434,6 +456,8 @@ private[sql] trait SQLTestUtilsBase
+     val schema = df.schema
+     val withoutFilters = df.queryExecution.executedPlan.transform {
+       case FilterExec(_, child) => child
++      case CometFilterExec(_, _, _, child, _) => child
++      case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), 
_) => child
+     }
+ 
+     spark.internalCreateDataFrame(withoutFilters.execute(), schema)
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+index ed2e309fa07..3767d4e7ca4 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+@@ -74,6 +74,18 @@ trait SharedSparkSessionBase
+       // this rule may potentially block testing of other optimization rules 
such as
+       // ConstantPropagation etc.
+       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName)
++    // Enable Comet if the `ENABLE_COMET` environment variable is set to true
++    if (isCometEnabled) {
++      conf
++        .set("spark.sql.extensions", 
"org.apache.comet.CometSparkSessionExtensions")
++        .set("spark.comet.enabled", "true")
++
++      if (!isCometScanOnly) {
++        conf
++          .set("spark.comet.exec.enabled", "true")
++          .set("spark.comet.exec.all.enabled", "true")
++      }
++    }
+     conf.set(
+       StaticSQLConf.WAREHOUSE_PATH,
+       conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + 
getClass.getCanonicalName)
+diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+index 52abd248f3a..7a199931a08 100644
+--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
++++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
+ 
+ import org.apache.spark.sql._
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Expression}
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution._
+ import 
org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, 
EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.hive.execution.HiveTableScanExec
+@@ -35,6 +36,9 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
+       case s: FileSourceScanExec => s.partitionFilters.collect {
+         case d: DynamicPruningExpression => d.child
+       }
++      case s: CometScanExec => s.partitionFilters.collect {
++        case d: DynamicPruningExpression => d.child
++      }
+       case h: HiveTableScanExec => h.partitionPruningPred.collect {
+         case d: DynamicPruningExpression => d.child
+       }
+diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+index 07361cfdce9..545b3184c23 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+@@ -55,25 +55,43 @@ object TestHive
+     new SparkContext(
+       System.getProperty("spark.sql.test.master", "local[1]"),
+       "TestSQLContext",
+-      new SparkConf()
+-        .set("spark.sql.test", "")
+-        .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+-        .set(SQLConf.CODEGEN_FACTORY_MODE.key, 
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
+-        .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
+-          "org.apache.spark.sql.hive.execution.PairSerDe")
+-        .set(WAREHOUSE_PATH.key, 
TestHiveContext.makeWarehouseDir().toURI.getPath)
+-        // SPARK-8910
+-        .set(UI_ENABLED, false)
+-        .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
+-        // Hive changed the default of 
hive.metastore.disallow.incompatible.col.type.changes
+-        // from false to true. For details, see the JIRA HIVE-12320 and 
HIVE-17764.
+-        
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", 
"false")
+-        // Disable ConvertToLocalRelation for better test coverage. Test 
cases built on
+-        // LocalRelation will exercise the optimization rules better by 
disabling it as
+-        // this rule may potentially block testing of other optimization 
rules such as
+-        // ConstantPropagation etc.
+-        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName)))
++      {
++        val conf = new SparkConf()
++          .set("spark.sql.test", "")
++          .set(SQLConf.CODEGEN_FALLBACK.key, "false")
++          .set(SQLConf.CODEGEN_FACTORY_MODE.key, 
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
++          .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
++            "org.apache.spark.sql.hive.execution.PairSerDe")
++          .set(WAREHOUSE_PATH.key, 
TestHiveContext.makeWarehouseDir().toURI.getPath)
++          // SPARK-8910
++          .set(UI_ENABLED, false)
++          .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
++          // Hive changed the default of 
hive.metastore.disallow.incompatible.col.type.changes
++          // from false to true. For details, see the JIRA HIVE-12320 and 
HIVE-17764.
++          
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", 
"false")
++          // Disable ConvertToLocalRelation for better test coverage. Test 
cases built on
++          // LocalRelation will exercise the optimization rules better by 
disabling it as
++          // this rule may potentially block testing of other optimization 
rules such as
++          // ConstantPropagation etc.
++          .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName)
++
++        val v = System.getenv("ENABLE_COMET")
++        if (v != null && v.toBoolean) {
++          conf
++            .set("spark.sql.extensions", 
"org.apache.comet.CometSparkSessionExtensions")
++            .set("spark.comet.enabled", "true")
++          // Enable native execution unless scan-only mode is requested.
++          val scanOnly = System.getenv("ENABLE_COMET_SCAN_ONLY")
++          if (scanOnly == null || !scanOnly.toBoolean) {
++            conf
++              .set("spark.comet.exec.enabled", "true")
++              .set("spark.comet.exec.all.enabled", "true")
++          }
++        }
+ 
++        conf
++      }
++    ))
+ 
+ case class TestHiveVersion(hiveClient: HiveClient)
+   extends TestHiveContext(TestHive.sparkContext, hiveClient)
diff --git a/pom.xml b/pom.xml
index f19ce70..7077aa3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -859,6 +859,7 @@ under the License.
             <exclude>**/.settings/**</exclude>
             <exclude>**/build/**</exclude>
             <exclude>**/target/**</exclude>
+            <exclude>**/apache-spark/**</exclude>
             <exclude>.git/**</exclude>
             <exclude>.github/**</exclude>
             <exclude>.gitignore</exclude>

Reply via email to