This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 22fdec977 feat: Enable native columnar-to-row by default (#3299)
22fdec977 is described below
commit 22fdec97760c8c54203b5f9798616de03ae8921e
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 27 20:22:21 2026 -0700
feat: Enable native columnar-to-row by default (#3299)
---
.../main/scala/org/apache/comet/CometConf.scala | 4 +--
dev/diffs/3.4.3.diff | 28 +++++++++------
dev/diffs/3.5.7.diff | 28 +++++++++------
dev/diffs/4.0.1.diff | 41 ++++++++++++++++------
.../spark/sql/comet/CometPlanStabilitySuite.scala | 1 +
5 files changed, 67 insertions(+), 35 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 6504c0294..fbd3e0156 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -304,9 +304,9 @@ object CometConf extends ShimCometConf {
"Whether to enable native columnar to row conversion. When enabled,
Comet will use " +
"native Rust code to convert Arrow columnar data to Spark UnsafeRow
format instead " +
"of the JVM implementation. This can improve performance for queries
that need to " +
- "convert between columnar and row formats. This is an experimental
feature.")
+ "convert between columnar and row formats.")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED:
ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 4cfa4f7f3..c023f4149 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index d3544881af1..07d1ed97925 100644
+index d3544881af1..9c16099090c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -1434,18 +1434,18 @@ index eec396b2e39..bf3f1c769d6 100644
nums.createOrReplaceTempView("nums")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
-index b14f4a405f6..ab7baf434a5 100644
+index b14f4a405f6..90bed10eca9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
-+import org.apache.spark.sql.comet.CometColumnarToRowExec
++import org.apache.spark.sql.comet.{CometColumnarToRowExec,
CometNativeColumnarToRowExec}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
-@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
+@@ -131,7 +132,11 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
@@ -1453,6 +1453,7 @@ index b14f4a405f6..ab7baf434a5 100644
+ df.queryExecution.executedPlan.collectFirst {
+ case p: ColumnarToRowExec => p
+ case p: CometColumnarToRowExec => p
++ case p: CometNativeColumnarToRowExec => p
+ }.get
try {
spark.range(1).foreach { _ =>
@@ -2384,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..6675cf7b636 100644
+index 266bb343526..e58a2f49eb9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2441,7 +2442,7 @@ index 266bb343526..6675cf7b636 100644
val bucketColumnType =
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -451,28 +463,49 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
val joinOperator = if
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2472,6 +2473,11 @@ index 266bb343526..6675cf7b636 100644
+ case s: SortMergeJoinExec => s
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
+ }
++ case CometNativeColumnarToRowExec(child) =>
++ child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++ case s: SortMergeJoinExec => s
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
+ }
}
@@ -2499,7 +2505,7 @@ index 266bb343526..6675cf7b636 100644
s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
// check the output partitioning
-@@ -835,11 +868,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
val scanDF = spark.table("bucketed_table").select("j")
@@ -2513,7 +2519,7 @@ index 266bb343526..6675cf7b636 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
-@@ -894,7 +927,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions")
{
@@ -2524,7 +2530,7 @@ index 266bb343526..6675cf7b636 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -913,7 +949,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than
bucket number") {
@@ -2535,7 +2541,7 @@ index 266bb343526..6675cf7b636 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
-@@ -943,7 +982,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("bucket coalescing eliminates shuffle") {
@@ -2546,7 +2552,7 @@ index 266bb343526..6675cf7b636 100644
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4
output partitions.
-@@ -1026,15 +1068,23 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
expectedNumShuffles: Int,
expectedCoalescedNumBuckets: Option[Int]): Unit = {
val plan = sql(query).queryExecution.executedPlan
diff --git a/dev/diffs/3.5.7.diff b/dev/diffs/3.5.7.diff
index 162ff709c..a8b9b5050 100644
--- a/dev/diffs/3.5.7.diff
+++ b/dev/diffs/3.5.7.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index a0e25ce4d8d..29d3b93f994 100644
+index a0e25ce4d8d..b95fba458f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -1402,18 +1402,18 @@ index eec396b2e39..bf3f1c769d6 100644
nums.createOrReplaceTempView("nums")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
-index b14f4a405f6..ab7baf434a5 100644
+index b14f4a405f6..90bed10eca9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
-+import org.apache.spark.sql.comet.CometColumnarToRowExec
++import org.apache.spark.sql.comet.{CometColumnarToRowExec,
CometNativeColumnarToRowExec}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
-@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
+@@ -131,7 +132,11 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
@@ -1421,6 +1421,7 @@ index b14f4a405f6..ab7baf434a5 100644
+ df.queryExecution.executedPlan.collectFirst {
+ case p: ColumnarToRowExec => p
+ case p: CometColumnarToRowExec => p
++ case p: CometNativeColumnarToRowExec => p
+ }.get
try {
spark.range(1).foreach { _ =>
@@ -2383,7 +2384,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..a90106a1463 100644
+index 746f289c393..a773971d3c1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2441,7 +2442,7 @@ index 746f289c393..a90106a1463 100644
val bucketColumnType =
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
val joinOperator = if
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2472,6 +2473,11 @@ index 746f289c393..a90106a1463 100644
+ case s: SortMergeJoinExec => s
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
+ }
++ case CometNativeColumnarToRowExec(child) =>
++ child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++ case s: SortMergeJoinExec => s
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
+ }
}
@@ -2499,7 +2505,7 @@ index 746f289c393..a90106a1463 100644
s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
// check the output partitioning
-@@ -836,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -836,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
val scanDF = spark.table("bucketed_table").select("j")
@@ -2513,7 +2519,7 @@ index 746f289c393..a90106a1463 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
-@@ -895,7 +928,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -895,7 +933,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions")
{
@@ -2524,7 +2530,7 @@ index 746f289c393..a90106a1463 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +950,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -914,7 +955,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than
bucket number") {
@@ -2535,7 +2541,7 @@ index 746f289c393..a90106a1463 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
-@@ -944,7 +983,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -944,7 +988,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("bucket coalescing eliminates shuffle") {
@@ -2546,7 +2552,7 @@ index 746f289c393..a90106a1463 100644
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4
output partitions.
-@@ -1029,15 +1071,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+@@ -1029,15 +1076,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key ->
aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index d86a1287b..8c6c0dd52 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index 22922143fc3..477d4ec4194 100644
+index 22922143fc3..7c56e5e8641 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -1817,18 +1817,18 @@ index 47679ed7865..9ffbaecb98e 100644
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
-index aed11badb71..ab7e9456e26 100644
+index aed11badb71..1a365b5aacf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
-+import org.apache.spark.sql.comet.CometColumnarToRowExec
++import org.apache.spark.sql.comet.{CometColumnarToRowExec,
CometNativeColumnarToRowExec}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
-@@ -134,7 +135,10 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
+@@ -134,7 +135,11 @@ class SparkPlanSuite extends QueryTest with
SharedSparkSession {
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
@@ -1836,22 +1836,36 @@ index aed11badb71..ab7e9456e26 100644
+ df.queryExecution.executedPlan.collectFirst {
+ case p: ColumnarToRowExec => p
+ case p: CometColumnarToRowExec => p
++ case p: CometNativeColumnarToRowExec => p
+ }.get
try {
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index a3cfdc5a240..1b08a1f42ee 100644
+index a3cfdc5a240..3793b6191bf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-@@ -22,6 +22,7 @@ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
- import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+@@ -19,9 +19,10 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.SparkException
+ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row,
SaveMode}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometHashJoinExec,
CometSortExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+@@ -33,7 +34,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType,
StructType}
+
+ // Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
+- with DisableAdaptiveExecutionSuite {
++ with DisableAdaptiveExecutionSuite with IgnoreCometSuite {
+
+ import testImplicits._
+
@@ -172,6 +173,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
assert(oneJoinDF.queryExecution.executedPlan.collect {
@@ -3066,7 +3080,7 @@ index 7838e62013d..8fa09652921 100644
import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index c4b09c4b289..a2f8ca47ffb 100644
+index c4b09c4b289..dd5763e8405 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -26,10 +26,11 @@ import org.apache.spark.sql.catalyst.expressions
@@ -3116,7 +3130,7 @@ index c4b09c4b289..a2f8ca47ffb 100644
val bucketColumnType =
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -454,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -454,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
val joinOperator = if
(joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
val executedPlan =
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -3147,6 +3161,11 @@ index c4b09c4b289..a2f8ca47ffb 100644
+ case s: SortMergeJoinExec => s
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
+ }
++ case CometNativeColumnarToRowExec(child) =>
++ child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
++ case s: SortMergeJoinExec => s
++ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++ }
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
+ }
}
@@ -3174,7 +3193,7 @@ index c4b09c4b289..a2f8ca47ffb 100644
s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
// check the output partitioning
-@@ -838,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -838,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
val scanDF = spark.table("bucketed_table").select("j")
@@ -3188,7 +3207,7 @@ index c4b09c4b289..a2f8ca47ffb 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
-@@ -1031,15 +1062,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+@@ -1031,15 +1067,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key ->
aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
diff --git
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index adf74ba54..baf109483 100644
---
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -223,6 +223,7 @@ trait CometPlanStabilitySuite extends
DisableAdaptiveExecutionSuite with TPCDSBa
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
+ CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key ->
"true",
// as well as for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]