This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 22fdec977 feat: Enable native columnar-to-row by default (#3299)
22fdec977 is described below

commit 22fdec97760c8c54203b5f9798616de03ae8921e
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 27 20:22:21 2026 -0700

    feat: Enable native columnar-to-row by default (#3299)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  4 +--
 dev/diffs/3.4.3.diff                               | 28 +++++++++------
 dev/diffs/3.5.7.diff                               | 28 +++++++++------
 dev/diffs/4.0.1.diff                               | 41 ++++++++++++++++------
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |  1 +
 5 files changed, 67 insertions(+), 35 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 6504c0294..fbd3e0156 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -304,9 +304,9 @@ object CometConf extends ShimCometConf {
         "Whether to enable native columnar to row conversion. When enabled, Comet will use " +
           "native Rust code to convert Arrow columnar data to Spark UnsafeRow format instead " +
           "of the JVM implementation. This can improve performance for queries that need to " +
-          "convert between columnar and row formats. This is an experimental feature.")
+          "convert between columnar and row formats.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
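
Note: with this change, native columnar-to-row conversion is on by default. Users who want the previous behaviour can still turn it off through the same config entry. A minimal sketch (the SparkSession setup below is illustrative; only the CometConf entry itself comes from this commit):

    import org.apache.spark.sql.SparkSession
    import org.apache.comet.CometConf

    // Fall back to the JVM columnar-to-row implementation for this session.
    val spark = SparkSession
      .builder()
      .appName("comet-columnar-to-row-example")
      .config(CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key, "false")
      .getOrCreate()
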
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 4cfa4f7f3..c023f4149 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index d3544881af1..07d1ed97925 100644
+index d3544881af1..9c16099090c 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -1434,18 +1434,18 @@ index eec396b2e39..bf3f1c769d6 100644
      nums.createOrReplaceTempView("nums")
  
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
-index b14f4a405f6..ab7baf434a5 100644
+index b14f4a405f6..90bed10eca9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 @@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
  import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
-+import org.apache.spark.sql.comet.CometColumnarToRowExec
++import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometNativeColumnarToRowExec}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.test.SharedSparkSession
-@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
+@@ -131,7 +132,11 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
          spark.range(1).write.parquet(path.getAbsolutePath)
          val df = spark.read.parquet(path.getAbsolutePath)
          val columnarToRowExec =
@@ -1453,6 +1453,7 @@ index b14f4a405f6..ab7baf434a5 100644
 +          df.queryExecution.executedPlan.collectFirst {
 +            case p: ColumnarToRowExec => p
 +            case p: CometColumnarToRowExec => p
++            case p: CometNativeColumnarToRowExec => p
 +          }.get
          try {
            spark.range(1).foreach { _ =>
@@ -2384,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
    import testImplicits._
  
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..6675cf7b636 100644
+index 266bb343526..e58a2f49eb9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2441,7 +2442,7 @@ index 266bb343526..6675cf7b636 100644
  
          val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -451,28 +463,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
         val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
           val executedPlan =
             joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2472,6 +2473,11 @@ index 266bb343526..6675cf7b636 100644
 +                case s: SortMergeJoinExec => s
 +                case o => fail(s"expected SortMergeJoinExec, but found\n$o")
 +              }
++            case CometNativeColumnarToRowExec(child) =>
++              child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++                case s: SortMergeJoinExec => s
++                case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++              }
 +            case o => fail(s"expected SortMergeJoinExec, but found\n$o")
 +          }
          }
@@ -2499,7 +2505,7 @@ index 266bb343526..6675cf7b636 100644
           s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -835,11 +868,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -2513,7 +2519,7 @@ index 266bb343526..6675cf7b636 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -894,7 +927,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  
   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2524,7 +2530,7 @@ index 266bb343526..6675cf7b636 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
        val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -913,7 +949,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  
   test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2535,7 +2541,7 @@ index 266bb343526..6675cf7b636 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "9",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
  
-@@ -943,7 +982,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  
    test("bucket coalescing eliminates shuffle") {
@@ -2546,7 +2552,7 @@ index 266bb343526..6675cf7b636 100644
        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
-@@ -1026,15 +1068,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
              expectedNumShuffles: Int,
              expectedCoalescedNumBuckets: Option[Int]): Unit = {
            val plan = sql(query).queryExecution.executedPlan
diff --git a/dev/diffs/3.5.7.diff b/dev/diffs/3.5.7.diff
index 162ff709c..a8b9b5050 100644
--- a/dev/diffs/3.5.7.diff
+++ b/dev/diffs/3.5.7.diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index a0e25ce4d8d..29d3b93f994 100644
+index a0e25ce4d8d..b95fba458f2 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -152,6 +152,8 @@
@@ -1402,18 +1402,18 @@ index eec396b2e39..bf3f1c769d6 100644
      nums.createOrReplaceTempView("nums")
  
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
-index b14f4a405f6..ab7baf434a5 100644
+index b14f4a405f6..90bed10eca9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 @@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
  import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
-+import org.apache.spark.sql.comet.CometColumnarToRowExec
++import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometNativeColumnarToRowExec}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.test.SharedSparkSession
-@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
+@@ -131,7 +132,11 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
          spark.range(1).write.parquet(path.getAbsolutePath)
          val df = spark.read.parquet(path.getAbsolutePath)
          val columnarToRowExec =
@@ -1421,6 +1421,7 @@ index b14f4a405f6..ab7baf434a5 100644
 +          df.queryExecution.executedPlan.collectFirst {
 +            case p: ColumnarToRowExec => p
 +            case p: CometColumnarToRowExec => p
++            case p: CometNativeColumnarToRowExec => p
 +          }.get
          try {
            spark.range(1).foreach { _ =>
@@ -2383,7 +2384,7 @@ index d083cac48ff..3c11bcde807 100644
    import testImplicits._
  
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..a90106a1463 100644
+index 746f289c393..a773971d3c1 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2441,7 +2442,7 @@ index 746f289c393..a90106a1463 100644
  
          val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
         val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
           val executedPlan =
             joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2472,6 +2473,11 @@ index 746f289c393..a90106a1463 100644
 +                case s: SortMergeJoinExec => s
 +                case o => fail(s"expected SortMergeJoinExec, but found\n$o")
 +              }
++            case CometNativeColumnarToRowExec(child) =>
++              child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++                case s: SortMergeJoinExec => s
++                case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++              }
 +            case o => fail(s"expected SortMergeJoinExec, but found\n$o")
 +          }
          }
@@ -2499,7 +2505,7 @@ index 746f289c393..a90106a1463 100644
           s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -836,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -836,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -2513,7 +2519,7 @@ index 746f289c393..a90106a1463 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -895,7 +928,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -895,7 +933,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  
   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2524,7 +2530,7 @@ index 746f289c393..a90106a1463 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
        val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +950,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -914,7 +955,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  
   test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2535,7 +2541,7 @@ index 746f289c393..a90106a1463 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "9",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
  
-@@ -944,7 +983,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -944,7 +988,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
    }
  
    test("bucket coalescing eliminates shuffle") {
@@ -2546,7 +2552,7 @@ index 746f289c393..a90106a1463 100644
        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
-@@ -1029,15 +1071,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1029,15 +1076,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
            Seq(true, false).foreach { aqeEnabled =>
             withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
                val plan = sql(query).queryExecution.executedPlan
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index d86a1287b..8c6c0dd52 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index 22922143fc3..477d4ec4194 100644
+index 22922143fc3..7c56e5e8641 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -1817,18 +1817,18 @@ index 47679ed7865..9ffbaecb98e 100644
     assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
-index aed11badb71..ab7e9456e26 100644
+index aed11badb71..1a365b5aacf 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 @@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
  import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
-+import org.apache.spark.sql.comet.CometColumnarToRowExec
++import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometNativeColumnarToRowExec}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.test.SharedSparkSession
-@@ -134,7 +135,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
+@@ -134,7 +135,11 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
          spark.range(1).write.parquet(path.getAbsolutePath)
          val df = spark.read.parquet(path.getAbsolutePath)
          val columnarToRowExec =
@@ -1836,22 +1836,36 @@ index aed11badb71..ab7e9456e26 100644
 +          df.queryExecution.executedPlan.collectFirst {
 +            case p: ColumnarToRowExec => p
 +            case p: CometColumnarToRowExec => p
++            case p: CometNativeColumnarToRowExec => p
 +          }.get
          try {
            spark.range(1).foreach { _ =>
              columnarToRowExec.canonicalized
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index a3cfdc5a240..1b08a1f42ee 100644
+index a3cfdc5a240..3793b6191bf 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-@@ -22,6 +22,7 @@ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
- import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+@@ -19,9 +19,10 @@ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.SparkException
+ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row, SaveMode}
  import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
  import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 +import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometHashJoinExec, CometSortExec, CometSortMergeJoinExec}
  import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
  import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
  import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+@@ -33,7 +34,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+ 
+ // Disable AQE because the WholeStageCodegenExec is added when running QueryStageExec
+ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
+-  with DisableAdaptiveExecutionSuite {
++  with DisableAdaptiveExecutionSuite with IgnoreCometSuite {
+ 
+   import testImplicits._
+ 
 @@ -172,6 +173,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
      val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
      assert(oneJoinDF.queryExecution.executedPlan.collect {
@@ -3066,7 +3080,7 @@ index 7838e62013d..8fa09652921 100644
    import testImplicits._
  
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index c4b09c4b289..a2f8ca47ffb 100644
+index c4b09c4b289..dd5763e8405 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -26,10 +26,11 @@ import org.apache.spark.sql.catalyst.expressions
@@ -3116,7 +3130,7 @@ index c4b09c4b289..a2f8ca47ffb 100644
  
          val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -454,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -454,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
         val joinOperator = if (joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
           val executedPlan =
             joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -3147,6 +3161,11 @@ index c4b09c4b289..a2f8ca47ffb 100644
 +                case s: SortMergeJoinExec => s
 +                case o => fail(s"expected SortMergeJoinExec, but found\n$o")
 +              }
++            case CometNativeColumnarToRowExec(child) =>
++              child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++                case s: SortMergeJoinExec => s
++                case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++              }
 +            case o => fail(s"expected SortMergeJoinExec, but found\n$o")
 +          }
          }
@@ -3174,7 +3193,7 @@ index c4b09c4b289..a2f8ca47ffb 100644
           s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -838,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -838,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -3188,7 +3207,7 @@ index c4b09c4b289..a2f8ca47ffb 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -1031,15 +1062,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1031,15 +1067,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
            Seq(true, false).foreach { aqeEnabled =>
             withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
                val plan = sql(query).queryExecution.executedPlan
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index adf74ba54..baf109483 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -223,6 +223,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
       CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ENABLED.key -> "true",
       CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
+      CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key -> "false",
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
       CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
       // as well as for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
