kazuyukitanimura commented on code in PR #166:
URL:
https://github.com/apache/arrow-datafusion-comet/pull/166#discussion_r1512479029
##########
dev/diffs/3.4.2.diff:
##########
@@ -0,0 +1,1176 @@
+diff --git a/pom.xml b/pom.xml
+index fab98342498..f2156d790d1 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -148,6 +148,8 @@
+ <chill.version>0.10.0</chill.version>
+ <ivy.version>2.5.1</ivy.version>
+ <oro.version>2.0.8</oro.version>
++ <spark.version.short>3.4</spark.version.short>
++ <comet.version>0.1.0-SNAPSHOT</comet.version>
+ <!--
+ If you changes codahale.metrics.version, you also need to change
+ the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2766,6 +2768,25 @@
+ <artifactId>arpack</artifactId>
+ <version>${netlib.ludovic.dev.version}</version>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.comet</groupId>
++
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ <version>${comet.version}</version>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-sql_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-core_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index 5b6cc8cb7af..5ce708adc38 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -77,6 +77,10 @@
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.comet</groupId>
++
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ </dependency>
+
+ <!--
+ This spark-tags test-dep is needed even though it isn't used in this
module, otherwise testing-cmds that exclude
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+index c595b50950b..483508dc076 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+@@ -26,6 +26,8 @@ import scala.collection.JavaConverters._
+ import scala.reflect.runtime.universe.TypeTag
+ import scala.util.control.NonFatal
+
++import org.apache.comet.CometConf
++
+ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+ import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
+ import org.apache.spark.api.java.JavaRDD
+@@ -102,7 +104,7 @@ class SparkSession private(
+ sc: SparkContext,
+ initialSessionOptions: java.util.HashMap[String, String]) = {
+ this(sc, None, None,
+- SparkSession.applyExtensions(
++ SparkSession.applyExtensions(sc,
+
sc.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+ new SparkSessionExtensions), initialSessionOptions.asScala.toMap)
+ }
+@@ -1028,7 +1030,7 @@ object SparkSession extends Logging {
+ }
+
+ loadExtensions(extensions)
+- applyExtensions(
++ applyExtensions(sparkContext,
+
sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
+ extensions)
+
+@@ -1282,14 +1284,24 @@ object SparkSession extends Logging {
+ }
+ }
+
++ private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++ if (sparkContext.getConf.getBoolean(CometConf.COMET_ENABLED.key, false)) {
++ Seq("org.apache.comet.CometSparkSessionExtensions")
++ } else {
++ Seq.empty
++ }
++ }
++
+ /**
+ * Initialize extensions for given extension classnames. The classes will
be applied to the
+ * extensions passed into this function.
+ */
+ private def applyExtensions(
++ sparkContext: SparkContext,
+ extensionConfClassNames: Seq[String],
+ extensions: SparkSessionExtensions): SparkSessionExtensions = {
+- extensionConfClassNames.foreach { extensionConfClassName =>
++ val extensionClassNames = extensionConfClassNames ++
loadCometExtension(sparkContext)
++ extensionClassNames.foreach { extensionConfClassName =>
+ try {
+ val extensionConfClass = Utils.classForName(extensionConfClassName)
+ val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index db587dd9868..aac7295a53d 100644
+---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.annotation.DeveloperApi
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+ // dump the file scan metadata (e.g file path) to event log
+ val metadata = plan match {
+ case fileScan: FileSourceScanExec => fileScan.metadata
++ case cometScan: CometScanExec => cometScan.metadata
+ case _ => Map[String, String]()
+ }
+ new SparkPlanInfo(
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 698ca009b4f..57d774a3617 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ -- Test tables
+ CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+index 1152d77da0c..f77493f690b 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part1.sql
+@@ -7,6 +7,9 @@
+
+ -- avoid bit-exact output here because operations may not be bit-exact.
+ -- SET extra_float_digits = 0;
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..44cd244d3b0 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -5,6 +5,9 @@
+ -- AGGREGATES [Part 3]
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ -- Test aggregate operator with codegen on and off.
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..2b73732c33f 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- INT8
+ -- Test int8 64-bit integers.
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..423d3b3d76d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -1,6 +1,10 @@
+ --
+ -- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ --
++
++-- Disable Comet exec due to floating point precision difference
++--SET spark.comet.exec.enabled = false
++
+ --
+ -- SELECT_HAVING
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index 56e9520fdab..917932336df 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+
+ withTempDatabase { dbName =>
+ withTable(table1Name, table2Name) {
+- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++ withSQLConf(
++ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++ "spark.comet.enabled" -> "false") {
+ spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+ spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
+new file mode 100644
+index 00000000000..07687f6685a
+--- /dev/null
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala
+@@ -0,0 +1,39 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql
++
++import org.scalactic.source.Position
++import org.scalatest.Tag
++
++import org.apache.spark.sql.test.SQLTestUtils
++
++case class DisableComet(reason: String) extends Tag("DisableComet")
++
++/**
++ * Helper trait that disables Comet for all tests regardless of default
config values.
++ */
++trait DisableCometSuite extends SQLTestUtils {
++ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
++ (implicit pos: Position): Unit = {
++ if (isCometEnabled) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
++ }
++}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index f33432ddb6f..fe9f74ff8f1 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog,
InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
+ case s: BatchScanExec => s.runtimeFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case _ => Nil
+ }
+ }
+@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ case s: BatchScanExec =>
+ // we use f1 col for v2 tables due to schema pruning
+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
++ case s: CometScanExec =>
++ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
+ case _ => false
+ }
+ assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index a6b295578d6..d5e25564bb9 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -463,7 +463,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("Explain formatted output for scan operator for datasource V2") {
++ test("Explain formatted output for scan operator for datasource V2",
++ DisableComet("Comet explain output is different")) {
+ withTempDir { dir =>
+ Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+ val basePath = dir.getCanonicalPath + "/" + fmt
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+index 2796b1cf154..94591f83c84 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT,
NullData, NullUDT}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
GreaterThan, Literal}
+ import
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
positiveInt}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.FilePartition
+@@ -875,6 +876,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _,
_, _, _), _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.nonEmpty)
+@@ -916,6 +918,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _,
_, _, _), _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.isEmpty)
+@@ -1100,6 +1103,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ val filters = df.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f.dataFilters
+ case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
++ case b: CometScanExec => b.dataFilters
++ case b: CometBatchScanExec =>
b.scan.asInstanceOf[FileScan].dataFilters
+ }.flatten
+ assert(filters.contains(GreaterThan(scan.logicalPlan.output.head,
Literal(5L))))
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+index b5b34922694..5fa734d30e1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+@@ -69,7 +69,7 @@ import org.apache.spark.tags.ExtendedSQLTest
+ * }}}
+ */
+ // scalastyle:on line.size.limit
+-trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {
++trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with
DisableCometSuite {
+
+ protected val baseResourcePath = {
+ // use the same way as `SQLQueryTestSuite` to get the resource path
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+index 3cfda19134a..afcfba37c6f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+
+ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join,
LogicalPlan, Project, Sort, Union}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.FileScanRDD
+@@ -1543,6 +1544,12 @@ class SubquerySuite extends QueryTest
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
++ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++ fs.inputRDDs().forall(
++ _.asInstanceOf[FileScanRDD].filePartitions.forall(
++ _.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case _ => false
+ })
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+index cfc8b2cc845..c6fcfd7bd08 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.{AnalysisException, QueryTest}
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
+ import org.apache.spark.sql.connector.read.ScanBuilder
+ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
+ val df = spark.read.format(format).load(path.getCanonicalPath)
+ checkAnswer(df, inputData.toDF())
+ assert(
+-
df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
++ df.queryExecution.executedPlan.exists {
++ case _: FileSourceScanExec | _: CometScanExec => true
++ case _ => false
++ }
++ )
+ }
+ } finally {
+ spark.listenerManager.unregister(listener)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+index 418ca3430bb..d5fc207601c 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+@@ -23,7 +23,7 @@ import scala.util.Random
+ import org.apache.hadoop.fs.Path
+
+ import org.apache.spark.SparkConf
+-import org.apache.spark.sql.{DataFrame, QueryTest}
++import org.apache.spark.sql.{DataFrame, DisableComet, QueryTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+ import org.apache.spark.sql.internal.SQLConf
+@@ -195,7 +195,7 @@ class DataSourceV2ScanExecRedactionSuite extends
DataSourceScanRedactionTest {
+ }
+ }
+
+- test("FileScan description") {
++ test("FileScan description", DisableComet("Comet doesn't use BatchScan")) {
+ Seq("json", "orc", "parquet").foreach { format =>
+ withTempPath { path =>
+ val dir = path.getCanonicalPath
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+index 9e9d717db3b..91a4f9a38d5 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+@@ -18,6 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
++import org.apache.spark.sql.comet.CometProjectExec
+ import org.apache.spark.sql.connector.SimpleWritableDataSource
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase
+ private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = {
+ withClue(df.queryExecution) {
+ val plan = df.queryExecution.executedPlan
+- val actual = collectWithSubqueries(plan) { case p: ProjectExec => p
}.size
++ val actual = collectWithSubqueries(plan) {
++ case p: ProjectExec => p
++ case p: CometProjectExec => p
++ }.size
+ assert(actual == expected)
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+index ac710c32296..37746bd470d 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+@@ -616,7 +616,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
+ .write.mode(SaveMode.Overwrite).parquet(path)
+
+ withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
+- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
++ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
++ // Disable Comet native execution because this checks wholestage
codegen.
++ "spark.comet.exec.enabled" -> "false") {
+ val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as
newC$i")
+ val df = spark.read.parquet(path).selectExpr(projection: _*)
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+index bd9c79e5b96..ab7584e768e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.SchemaPruningTest
+ import org.apache.spark.sql.catalyst.expressions.Concat
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+ import org.apache.spark.sql.catalyst.plans.logical.Expand
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.functions._
+@@ -867,6 +868,7 @@ abstract class SchemaPruningSuite
+ val fileSourceScanSchemata =
+ collect(df.queryExecution.executedPlan) {
+ case scan: FileSourceScanExec => scan.requiredSchema
++ case scan: CometScanExec => scan.requiredSchema
+ }
+ assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+ s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+index 07e2849ce6f..264fb61db16 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
+
+ import org.apache.spark.TestUtils
+ import org.apache.spark.memory.MemoryMode
+-import org.apache.spark.sql.Row
++import org.apache.spark.sql.{DisableComet, Row}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -201,7 +201,8 @@ class ParquetEncodingSuite extends
ParquetCompatibilityTest with SharedSparkSess
+ }
+ }
+
+- test("parquet v2 pages - rle encoding for boolean value columns") {
++ test("parquet v2 pages - rle encoding for boolean value columns",
++ DisableComet("Comet doesn't support RLE encoding yet")) {
+ val extraOptions = Map[String, String](
+ ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
+ )
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+index 9adcb43c838..e04dc5b4246 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+@@ -1025,7 +1025,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ // When a filter is pushed to Parquet, Parquet can apply it to
every row.
+ // So, we can check the number of rows returned from the Parquet
+ // to make sure our filter pushdown work.
+- assert(stripSparkFilter(df).count == 1)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level
filtering but relies
++ // on Spark to apply the data filters after columnar batches are
returned
++ if (!isCometEnabled) {
++ assert(stripSparkFilter(df).count == 1)
++ }
+ }
+ }
+ }
+@@ -1510,7 +1514,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ // than the total length but should not be a single record.
+ // Note that, if record level filtering is enabled, it should be a
single record.
+ // If no filter is pushed down to Parquet, it should be the total
length of data.
+- assert(actual > 1 && actual < data.length)
++ // Only enable Comet test iff it's scan only, since with native
execution
++ // `stripSparkFilter` can't remove the native filter
++ if (!isCometEnabled || isCometScanOnly) {
++ assert(actual > 1 && actual < data.length)
++ }
+ }
+ }
+ }
+@@ -1537,7 +1545,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ // than the total length but should not be a single record.
+ // Note that, if record level filtering is enabled, it should be a
single record.
+ // If no filter is pushed down to Parquet, it should be the total
length of data.
+- assert(actual > 1 && actual < data.length)
++ // Only enable Comet test iff it's scan only, since with native
execution
++ // `stripSparkFilter` can't remove the native filter
++ if (!isCometEnabled || isCometScanOnly) {
++ assert(actual > 1 && actual < data.length)
++ }
+ }
+ }
+ }
+@@ -1914,7 +1926,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("Support Parquet column index") {
++ test("Support Parquet column index",
++ DisableComet("Comet doesn't support Parquet column index yet")) {
+ // block 1:
+ // null count min
max
+ // page-0 0 0
99
+@@ -2206,7 +2219,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+ assert(pushedParquetFilters.exists(_.getClass === filterClass),
+ s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
+
+- checker(stripSparkFilter(query), expected)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level
filtering but relies
++ // on Spark to apply the data filters after columnar batches are
returned
++ if (!isCometEnabled) {
++ checker(stripSparkFilter(query), expected)
++ }
+ } else {
+ assert(selectedFilters.isEmpty, "There is filter pushed down")
+ }
+@@ -2266,7 +2283,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+ assert(pushedParquetFilters.exists(_.getClass === filterClass),
+ s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
+
+- checker(stripSparkFilter(query), expected)
++ // Similar to Spark's vectorized reader, Comet doesn't do row-level
filtering but relies
++ // on Spark to apply the data filters after columnar batches are
returned
++ if (!isCometEnabled) {
++ checker(stripSparkFilter(query), expected)
++ }
+
+ case _ =>
+ throw new AnalysisException("Can not match ParquetTable in the
query.")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+index 8670d95c65e..4a16d9f6ff4 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") {
++ test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings",
++ DisableComet("Comet doesn't support DELTA encoding yet")) {
+ withAllParquetReaders {
+ checkAnswer(
+ // "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+index 2e7b26126d2..f7368eb026e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+@@ -1029,7 +1029,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ checkAnswer(readParquet(schema, path), df)
+ }
+
+- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++ "spark.comet.enabled" -> "false") {
+ val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+ checkAnswer(readParquet(schema1, path), df)
+ val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+@@ -1051,7 +1052,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c,
CAST('1.2' AS BINARY) d")
+ df.write.parquet(path.toString)
+
+- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
++ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
++ "spark.comet.enabled" -> "false") {
+ checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+ checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+ checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT
123456.0"))
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+index 240bb4e6dcb..c37b92d3691 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+@@ -21,7 +21,7 @@ import java.nio.file.{Files, Paths, StandardCopyOption}
+ import java.sql.{Date, Timestamp}
+
+ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException,
SparkUpgradeException}
+-import org.apache.spark.sql.{QueryTest, Row,
SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY,
SPARK_TIMEZONE_METADATA_KEY}
++import org.apache.spark.sql.{DisableCometSuite, QueryTest, Row,
SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY,
SPARK_TIMEZONE_METADATA_KEY}
+ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy,
ParquetOutputTimestampType}
+@@ -30,9 +30,11 @@ import
org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96,
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.tags.SlowSQLTest
+
++// Comet is disabled for this suite because it doesn't support datetime
rebase mode
+ abstract class ParquetRebaseDatetimeSuite
+ extends QueryTest
+ with ParquetTest
++ with DisableCometSuite
+ with SharedSparkSession {
+
+ import testImplicits._
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+index 351c6d698fc..36492fe936d 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader,
ParquetOutputFormat}
+ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+ import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -230,6 +231,12 @@ class ParquetRowIndexSuite extends QueryTest with
SharedSparkSession {
+ case f: FileSourceScanExec =>
+ numPartitions += f.inputRDD.partitions.length
+ numOutputRows += f.metrics("numOutputRows").value
++ case b: CometScanExec =>
++ numPartitions += b.inputRDD.partitions.length
++ numOutputRows += b.metrics("numOutputRows").value
++ case b: CometBatchScanExec =>
++ numPartitions += b.inputRDD.partitions.length
++ numOutputRows += b.metrics("numOutputRows").value
+ case _ =>
+ }
+ assert(numPartitions > 0)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+index 5c0b7def039..151184bc98c 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.SchemaPruningSuite
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+@@ -56,6 +57,7 @@ class ParquetV2SchemaPruningSuite extends
ParquetSchemaPruningSuite {
+ val fileSourceScanSchemata =
+ collect(df.queryExecution.executedPlan) {
+ case scan: BatchScanExec =>
scan.scan.asInstanceOf[ParquetScan].readDataSchema
++ case scan: CometBatchScanExec =>
scan.scan.asInstanceOf[ParquetScan].readDataSchema
+ }
+ assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
+ s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+index bf5c51b89bb..2c7f9701eeb 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+@@ -27,6 +27,7 @@ import
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+ import org.apache.parquet.schema.Type._
+
+ import org.apache.spark.SparkException
++import org.apache.spark.sql.DisableComet
+ import org.apache.spark.sql.catalyst.ScalaReflection
+ import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+ import org.apache.spark.sql.functions.desc
+@@ -1016,7 +1017,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ e
+ }
+
+- test("schema mismatch failure error message for parquet reader") {
++ test("schema mismatch failure error message for parquet reader",
++ DisableComet("Comet doesn't work with vectorizedReaderEnabled =
false")) {
+ withTempPath { dir =>
+ val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = false)
+ val expectedMessage = "Encountered error while reading file"
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+index 3a0bd35cb70..ef351a56ec8 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug
+ import java.io.ByteArrayOutputStream
+
+ import org.apache.spark.rdd.RDD
++import org.apache.spark.sql.DisableComet
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.Attribute
+ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+@@ -124,7 +125,8 @@ class DebuggingSuite extends DebuggingSuiteBase with
DisableAdaptiveExecutionSui
+ | id LongType: {}""".stripMargin))
+ }
+
+- test("SPARK-28537: DebugExec cannot debug columnar related queries") {
++ test("SPARK-28537: DebugExec cannot debug columnar related queries",
++ DisableComet("Comet does not use FileScan")) {
+ withTempPath { workDir =>
+ val workDirPath = workDir.getAbsolutePath
+ val input = spark.range(5).toDF("id")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+index 26e61c6b58d..2a7c96d164a 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+@@ -737,7 +737,8 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
+ }
+ }
+
+- test("SPARK-26327: FileSourceScanExec metrics") {
++ test("SPARK-26327: FileSourceScanExec metrics",
++ DisableComet("Spark uses row-based Parquet reader while Comet is
vectorized")) {
+ withTable("testDataForScan") {
+ spark.range(10).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").saveAsTable("testDataForScan")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+index 0ab8691801d..df9e47fdc7a 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+@@ -18,9 +18,9 @@
+ package org.apache.spark.sql.execution.python
+
+ import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython,
BatchEvalPython, Limit, LocalLimit}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+ import org.apache.spark.sql.functions.col
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -108,6 +108,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -120,11 +121,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+- assert(scanNodes.head.dataFilters.length == 2)
+-
assert(scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
++ val dataFilters = scanNodes.head match {
++ case scan: FileSourceScanExec => scan.dataFilters
++ case scan: CometScanExec => scan.dataFilters
++ }
++ assert(dataFilters.length == 2)
++ assert(dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+@@ -145,6 +151,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+@@ -157,12 +164,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExec => scan
++ case scan: CometScanExec => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+- val filters =
scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+- assert(filters.length == 2)
+- assert(filters.flatMap(_.references).distinct === Array("a"))
++ val dataFilters = scanNodes.head match {
++ case scan: FileSourceScanExec => scan.dataFilters
++ case scan: CometScanExec => scan.dataFilters
++ }
++ assert(dataFilters.length == 2)
++ assert(dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+index 266bb343526..85ec36db996 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.expressions
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec,
SparkPlan}
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.BucketingUtils
+@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ }
+ }
+
+- private def getFileScan(plan: SparkPlan): FileSourceScanExec = {
+- val fileScan = collect(plan) { case f: FileSourceScanExec => f }
++ private def getFileScan(plan: SparkPlan): SparkPlan = {
++ val fileScan = collect(plan) {
++ case f: FileSourceScanExec => f
++ case f: CometScanExec => f
++ }
+ assert(fileScan.nonEmpty, plan)
+ fileScan.head
+ }
+
++ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan)
match {
++ case fs: FileSourceScanExec => fs.bucketedScan
++ case bs: CometScanExec => bs.bucketedScan
++ }
++
+ // To verify if the bucket pruning works, this function checks two
conditions:
+ // 1) Check if the pruned buckets (before filtering) are empty.
+ // 2) Verify the final result is the same as the expected one
+@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ val planWithoutBucketedScan =
bucketedDataFrame.filter(filterCondition)
+ .queryExecution.executedPlan
+ val fileScan = getFileScan(planWithoutBucketedScan)
+- assert(!fileScan.bucketedScan, s"except no bucketed scan but
found\n$fileScan")
++ val bucketedScan = getBucketScan(planWithoutBucketedScan)
++ assert(!bucketedScan, s"except no bucketed scan but
found\n$fileScan")
+
+ val bucketColumnType =
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
+ val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
+@@ -461,18 +471,29 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+
+ // check existence of shuffle
+ assert(
+- joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) ==
shuffleLeft,
++ joinOperator.left.find { op =>
++ op.isInstanceOf[SortExec] ||
++ (op.isInstanceOf[CometExec] &&
++
op.asInstanceOf[CometExec].originalPlan.find(_.isInstanceOf[SortExec]).isDefined)
++ }.isDefined == sortLeft,
++
+ s"expected shuffle in plan to be $shuffleLeft but
found\n${joinOperator.left}")
+ assert(
+- joinOperator.right.exists(_.isInstanceOf[ShuffleExchangeExec]) ==
shuffleRight,
++ joinOperator.right.find { op =>
++ op.isInstanceOf[SortExec] ||
++ (op.isInstanceOf[CometExec] &&
++
op.asInstanceOf[CometExec].originalPlan.find(_.isInstanceOf[SortExec]).isDefined)
++ }.isDefined == sortRight,
+ s"expected shuffle in plan to be $shuffleRight but
found\n${joinOperator.right}")
+
+ // check existence of sort
+ assert(
+- joinOperator.left.exists(_.isInstanceOf[SortExec]) == sortLeft,
++ joinOperator.left.exists(op => op.isInstanceOf[SortExec] ||
op.isInstanceOf[CometExec] &&
++ op.asInstanceOf[CometExec].originalPlan.isInstanceOf[SortExec])
== sortLeft,
+ s"expected sort in the left child to be $sortLeft but
found\n${joinOperator.left}")
+ assert(
+- joinOperator.right.exists(_.isInstanceOf[SortExec]) == sortRight,
++ joinOperator.right.exists(op => op.isInstanceOf[SortExec] ||
op.isInstanceOf[CometExec] &&
++ op.asInstanceOf[CometExec].originalPlan.isInstanceOf[SortExec])
== sortRight,
+ s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
+
+ // check the output partitioning
+@@ -835,11 +856,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
+
+ val scanDF = spark.table("bucketed_table").select("j")
+- assert(!getFileScan(scanDF.queryExecution.executedPlan).bucketedScan)
++ assert(!getBucketScan(scanDF.queryExecution.executedPlan))
+ checkAnswer(scanDF, df1.select("j"))
+
+ val aggDF = spark.table("bucketed_table").groupBy("j").agg(max("k"))
+- assert(!getFileScan(aggDF.queryExecution.executedPlan).bucketedScan)
++ assert(!getBucketScan(aggDF.queryExecution.executedPlan))
+ checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
+ }
+ }
+@@ -1031,10 +1052,16 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+
+ val scans = plan.collect {
+ case f: FileSourceScanExec if
f.optionalNumCoalescedBuckets.isDefined => f
++ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined
=> b
+ }
+ if (expectedCoalescedNumBuckets.isDefined) {
+ assert(scans.length == 1)
+- assert(scans.head.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
++ scans.head match {
++ case f: FileSourceScanExec =>
++ assert(f.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
++ case b: CometScanExec =>
++ assert(b.optionalNumCoalescedBuckets ==
expectedCoalescedNumBuckets)
++ }
+ } else {
+ assert(scans.isEmpty)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+index 1f55742cd67..42377f7cf26 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
+ import org.apache.spark.sql.QueryTest
+ import org.apache.spark.sql.catalyst.expressions.AttributeReference
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.FileSourceScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+
+ def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int):
Unit = {
+ val plan = sql(query).queryExecution.executedPlan
+- val bucketedScan = collect(plan) { case s: FileSourceScanExec if
s.bucketedScan => s }
++ val bucketedScan = collect(plan) {
++ case s: FileSourceScanExec if s.bucketedScan => s
++ case s: CometScanExec if s.bucketedScan => s
++ }
+ assert(bucketedScan.length == expectedNumBucketedScan)
+ }
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+index 75f440caefc..36b1146bc3a 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+@@ -34,6 +34,7 @@ import org.apache.spark.paths.SparkPath
+ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+ import org.apache.spark.sql.{AnalysisException, DataFrame}
+ import org.apache.spark.sql.catalyst.util.stringToFile
++import org.apache.spark.sql.comet.CometBatchScanExec
+ import org.apache.spark.sql.execution.DataSourceScanExec
+ import org.apache.spark.sql.execution.datasources._
+ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2Relation, FileScan, FileTable}
+@@ -748,6 +749,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+ val fileScan = df.queryExecution.executedPlan.collect {
+ case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+ batch.scan.asInstanceOf[FileScan]
++ case batch: CometBatchScanExec if batch.scan.isInstanceOf[FileScan] =>
++ batch.scan.asInstanceOf[FileScan]
+ }.headOption.getOrElse {
+ fail(s"No FileScan in query\n${df.queryExecution}")
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+index abe606ad9c1..438e7494473 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+@@ -22,7 +22,7 @@ import java.util
+
+ import org.scalatest.BeforeAndAfter
+
+-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, DisableComet, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType}
+@@ -327,7 +327,8 @@ class DataStreamTableAPISuite extends StreamTest with
BeforeAndAfter {
+ }
+ }
+
+- test("explain with table on DSv1 data source") {
++ test("explain with table on DSv1 data source",
++ DisableComet("Comet explain output is different")) {
+ val tblSourceName = "tbl_src"
+ val tblTargetName = "tbl_target"
+ val tblSourceQualified = s"default.$tblSourceName"
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+index dd55fcfe42c..b8854809ba6 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+@@ -126,7 +126,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
+ }
+ }
+ } else {
+- super.test(testName, testTags: _*)(testFun)
++ if (isCometEnabled && testTags.exists(_.isInstanceOf[DisableComet])) {
++ ignore(testName + " (disabled when Comet is on)", testTags:
_*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
+ }
+ }
+
+@@ -242,6 +246,23 @@ private[sql] trait SQLTestUtilsBase
+ protected override def _sqlContext: SQLContext = self.spark.sqlContext
+ }
+
++ /**
++ * Whether Comet extension is enabled
++ */
++ protected def isCometEnabled: Boolean = {
++ val v = System.getenv("ENABLE_COMET")
++ v != null && v.toBoolean
++ }
++
++ /**
++ * Whether Spark should only apply Comet scan optimization. This is only
effective when
++ * [[isCometEnabled]] returns true.
++ */
++ protected def isCometScanOnly: Boolean = {
++ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
++ v != null && v.toBoolean
++ }
++
+ protected override def withSQLConf(pairs: (String, String)*)(f: => Unit):
Unit = {
+ SparkSession.setActiveSession(spark)
+ super.withSQLConf(pairs: _*)(f)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+index ed2e309fa07..3767d4e7ca4 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+@@ -74,6 +74,18 @@ trait SharedSparkSessionBase
+ // this rule may potentially block testing of other optimization rules
such as
+ // ConstantPropagation etc.
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
++ // Enable Comet if `ENABLE_COMET` environment variable is set
++ if (isCometEnabled) {
++ conf
++ .set("spark.sql.extensions",
"org.apache.comet.CometSparkSessionExtensions")
++ .set("spark.comet.enabled", "true")
++
++ if (!isCometScanOnly) {
++ conf
++ .set("spark.comet.exec.enabled", "true")
++ .set("spark.comet.exec.all.enabled", "true")
++ }
++ }
+ conf.set(
+ StaticSQLConf.WAREHOUSE_PATH,
+ conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" +
getClass.getCanonicalName)
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+index 52abd248f3a..7a199931a08 100644
+---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
++++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
+
+ import org.apache.spark.sql._
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
++import org.apache.spark.sql.comet._
+ import org.apache.spark.sql.execution._
+ import
org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite,
EnableAdaptiveExecutionSuite}
+ import org.apache.spark.sql.hive.execution.HiveTableScanExec
+@@ -35,6 +36,9 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
+ case s: FileSourceScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case h: HiveTableScanExec => h.partitionPruningPred.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+index 07361cfdce9..545b3184c23 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+@@ -55,25 +55,43 @@ object TestHive
+ new SparkContext(
+ System.getProperty("spark.sql.test.master", "local[1]"),
+ "TestSQLContext",
+- new SparkConf()
+- .set("spark.sql.test", "")
+- .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+- .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
+- .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
+- "org.apache.spark.sql.hive.execution.PairSerDe")
+- .set(WAREHOUSE_PATH.key,
TestHiveContext.makeWarehouseDir().toURI.getPath)
+- // SPARK-8910
+- .set(UI_ENABLED, false)
+- .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
+- // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
+- // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
+-
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
+- // Disable ConvertToLocalRelation for better test coverage. Test
cases built on
+- // LocalRelation will exercise the optimization rules better by
disabling it as
+- // this rule may potentially block testing of other optimization
rules such as
+- // ConstantPropagation etc.
+- .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)))
++ {
++ val conf = new SparkConf()
++ .set("spark.sql.test", "")
++ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
++ .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
++ .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
++ "org.apache.spark.sql.hive.execution.PairSerDe")
++ .set(WAREHOUSE_PATH.key,
TestHiveContext.makeWarehouseDir().toURI.getPath)
++ // SPARK-8910
++ .set(UI_ENABLED, false)
++ .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
++ // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
++ // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
++
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
++ // Disable ConvertToLocalRelation for better test coverage. Test
cases built on
++ // LocalRelation will exercise the optimization rules better by
disabling it as
++ // this rule may potentially block testing of other optimization
rules such as
++ // ConstantPropagation etc.
++ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
++
++ val v = System.getenv("ENABLE_COMET")
Review Comment:
Is it possible to reuse `SQLTestUtils.scala`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]