This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c15283bda8 [GLUTEN-11550][UT] Fix 2 TPCDS traits, enable 8 disabled
test suites for Spark 4.x (#11816)
c15283bda8 is described below
commit c15283bda8e6f9ff672840d768bc0e7a53995ce9
Author: Chang Chen <[email protected]>
AuthorDate: Fri Apr 10 23:24:52 2026 +0800
[GLUTEN-11550][UT] Fix 2 TPCDS traits, enable 8 disabled test suites for
Spark 4.x (#11816)
* [GLUTEN-11550][UT] Fix 2 TPCDS suite traits + disable 1 (VTS-only changes)
Fix trait GlutenTestsCommonTrait -> GlutenSQLTestsTrait:
- GlutenTPCDSQueryWithStatsSuite
- GlutenTPCDSQueryANSISuite
Disable GlutenStreamingQueryHashPartitionVerifySuite (wrong trait, runs as
vanilla Spark)
Co-authored-by: Copilot <[email protected]>
* [GLUTEN-11550][UT] Enable GlutenCsvExpressionsSuite
Wrap exception in glutenCheckExpression with fail() to match Spark's
checkEvaluationWithoutCodegen behavior. No testGluten override needed.
Co-authored-by: Copilot <[email protected]>
* [GLUTEN-11550][UT] Enable GlutenWholeTextFileV1Suite and
GlutenWholeTextFileV2Suite
Override testFile() to use getWorkspaceFilePath() instead of default
jar-based path. The default testFile() returns jar:file: URI which
Hadoop Path does not support. Same pattern used by GlutenCSVSuite,
GlutenJsonSuite, GlutenParquetIOSuite.
V1 3/3 passed, V2 3/3 passed.
Co-authored-by: Copilot <[email protected]>
* [GLUTEN-11550][UT] Enable GlutenSparkPlanSuite
Override test to find ColumnarToRowExecBase instead of ColumnarToRowExec.
Co-authored-by: Copilot <[email protected]>
* [GLUTEN-11550][UT] Enable GlutenInsertSortForLimitAndOffsetSuite
6 tests rewritten with testGluten (match
TakeOrderedAndProjectExecTransformer,
LimitExecTransformer, ColumnarCollectLimitBaseExec). Original tests
excluded.
Co-authored-by: Copilot <[email protected]>
* [GLUTEN-11550][UT] Enable GlutenJoinHintSuite
testGluten for shuffle-replicate-nl matching
CartesianProductExecTransformer.
Co-authored-by: Copilot <[email protected]>
* [GLUTEN-11550][UT] Enable DataSourceScanExec/V2 redaction suites with
testGluten
Enable and fix:
- GlutenDataSourceScanExecRedactionSuite: testGluten matching
FileSourceScanExecTransformer
- GlutenDataSourceV2ScanExecRedactionSuite: testGluten matching
BatchScanExecTransformer
- VTS: enable JoinHint with exclude, enable DataSource suites with excludes
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
.../org/apache/spark/sql/GlutenTestsTrait.scala | 15 +++-
.../gluten/utils/velox/VeloxTestSettings.scala | 30 +++++--
.../org/apache/spark/sql/GlutenJoinHintSuite.scala | 41 ++++++++-
.../apache/spark/sql/GlutenTPCDSQuerySuite.scala | 4 +-
.../GlutenDataSourceScanExecRedactionSuite.scala | 97 +++++++++++++++++++++-
.../GlutenInsertSortForLimitAndOffsetSuite.scala | 71 +++++++++++++++-
.../spark/sql/execution/GlutenSparkPlanSuite.scala | 33 +++++++-
.../text/GlutenWholeTextFileSuite.scala | 12 ++-
.../gluten/utils/velox/VeloxTestSettings.scala | 30 +++++--
.../org/apache/spark/sql/GlutenJoinHintSuite.scala | 41 ++++++++-
.../apache/spark/sql/GlutenTPCDSQuerySuite.scala | 4 +-
.../GlutenDataSourceScanExecRedactionSuite.scala | 97 +++++++++++++++++++++-
.../GlutenInsertSortForLimitAndOffsetSuite.scala | 71 +++++++++++++++-
.../spark/sql/execution/GlutenSparkPlanSuite.scala | 33 +++++++-
.../text/GlutenWholeTextFileSuite.scala | 12 ++-
15 files changed, 554 insertions(+), 37 deletions(-)
diff --git
a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenTestsTrait.scala
b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenTestsTrait.scala
index 14455a7e97..b5f05dd22d 100644
---
a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenTestsTrait.scala
+++
b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenTestsTrait.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.test.TestStats
import org.apache.gluten.utils.BackendTestUtils
+import org.apache.spark.SparkException
import org.apache.spark.sql.GlutenQueryTestUtil.isNaNOrInf
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
@@ -250,7 +251,19 @@ trait GlutenTestsTrait extends GlutenTestsCommonTrait {
_spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
}
val resultDF = df.select(ClassicColumn(expression))
- val result = resultDF.collect()
+ val result =
+ try {
+ resultDF.collect()
+ } catch {
+ // Match Spark's checkEvaluationWithoutCodegen behavior: wrap
exceptions with fail().
+ // Gluten's DataFrame path wraps execution errors in SparkException,
so unwrap it
+ // to expose the root cause (e.g. ArithmeticException) directly as
fail()'s cause,
+ // just like Spark's interpreted path does.
+ case e: SparkException if e.getCause != null =>
+ fail(s"Exception evaluating $expression", e.getCause)
+ case e: Exception =>
+ fail(s"Exception evaluating $expression", e)
+ }
TestStats.testUnitNumber = TestStats.testUnitNumber + 1
if (
checkDataTypeSupported(expression) &&
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index bb436178e7..498483cb65 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -222,7 +222,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenCodeGeneratorWithInterpretedFallbackSuite]
enableSuite[GlutenCollationExpressionSuite]
enableSuite[GlutenCollationRegexpExpressionsSuite]
- // TODO: 4.x enableSuite[GlutenCsvExpressionsSuite] // failures with
GlutenPlugin
+ enableSuite[GlutenCsvExpressionsSuite]
enableSuite[GlutenDynamicPruningSubquerySuite]
enableSuite[GlutenExprIdSuite]
// TODO: 4.x enableSuite[GlutenExpressionEvalHelperSuite] // 2 failures
@@ -371,8 +371,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)")
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
- // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
- // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
+ enableSuite[GlutenWholeTextFileV1Suite]
+ enableSuite[GlutenWholeTextFileV2Suite]
// Generated suites for org.apache.spark.sql.execution.datasources.v2
enableSuite[GlutenFileWriterFactorySuite]
enableSuite[GlutenV2SessionCatalogNamespaceSuite]
@@ -675,15 +675,25 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenAggregatingAccumulatorSuite]
enableSuite[GlutenCoGroupedIteratorSuite]
enableSuite[GlutenColumnarRulesSuite]
- // TODO: 4.x enableSuite[GlutenDataSourceScanExecRedactionSuite] // 2
failures
- // TODO: 4.x enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] // 2
failures
+ enableSuite[GlutenDataSourceScanExecRedactionSuite]
+ .exclude("explain is redacted using SQLConf")
+ .exclude("SPARK-31793: FileSourceScanExec metadata should contain limited
file paths")
+ enableSuite[GlutenDataSourceV2ScanExecRedactionSuite]
+ .exclude("explain is redacted using SQLConf")
+ .exclude("FileScan description")
enableSuite[GlutenExecuteImmediateEndToEndSuite]
enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite]
enableSuite[GlutenGlobalTempViewSuite]
enableSuite[GlutenGlobalTempViewTestSuite]
enableSuite[GlutenGroupedIteratorSuite]
enableSuite[GlutenHiveResultSuite]
- // TODO: 4.x enableSuite[GlutenInsertSortForLimitAndOffsetSuite] // 6
failures
+ enableSuite[GlutenInsertSortForLimitAndOffsetSuite]
+ .exclude("root LIMIT preserves data ordering with top-K sort")
+ .exclude("middle LIMIT preserves data ordering with top-K sort")
+ .exclude("root LIMIT preserves data ordering with CollectLimitExec")
+ .exclude("middle LIMIT preserves data ordering with the extra sort")
+ .exclude("root OFFSET preserves data ordering with CollectLimitExec")
+ .exclude("middle OFFSET preserves data ordering with the extra sort")
enableSuite[GlutenLocalTempViewTestSuite]
enableSuite[GlutenLogicalPlanTagInSparkPlanSuite]
enableSuite[GlutenOptimizeMetadataOnlyQuerySuite]
@@ -699,7 +709,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSQLJsonProtocolSuite]
enableSuite[GlutenShufflePartitionsUtilSuite]
// TODO: 4.x enableSuite[GlutenSimpleSQLViewSuite] // 1 failure
- // TODO: 4.x enableSuite[GlutenSparkPlanSuite] // 1 failure
+ enableSuite[GlutenSparkPlanSuite]
+ .exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after
being (de)serialized")
enableSuite[GlutenSparkPlannerSuite]
enableSuite[GlutenSparkScriptTransformationSuite]
enableSuite[GlutenSparkSqlParserSuite]
@@ -800,7 +811,8 @@ class VeloxTestSettings extends BackendTestSettings {
// TODO: 4.x enableSuite[GlutenExplainSuite] // 1 failure
enableSuite[GlutenICUCollationsMapSuite]
enableSuite[GlutenInlineTableParsingImprovementsSuite]
- // TODO: 4.x enableSuite[GlutenJoinHintSuite] // 1 failure
+ enableSuite[GlutenJoinHintSuite]
+ .exclude("join strategy hint - shuffle-replicate-nl")
enableSuite[GlutenLogQuerySuite]
// Overridden
.exclude("Query Spark logs with exception using SQL")
@@ -1192,7 +1204,7 @@ class VeloxTestSettings extends BackendTestSettings {
// TODO: 4.x enableSuite[GlutenStreamingInnerJoinSuite]
enableSuite[GlutenStreamingLeftSemiJoinSuite]
// TODO: 4.x enableSuite[GlutenStreamingOuterJoinSuite]
- enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
+ // TODO: 4.x enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
enableSuite[GlutenStreamingQueryListenerSuite]
enableSuite[GlutenStreamingQueryListenersConfSuite]
enableSuite[GlutenStreamingQueryManagerSuite]
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
index 062a1d7059..80abe6d983 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
@@ -16,4 +16,43 @@
*/
package org.apache.spark.sql
-class GlutenJoinHintSuite extends JoinHintSuite with GlutenSQLTestsBaseTrait {}
+import org.apache.gluten.execution.CartesianProductExecTransformer
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.joins.CartesianProductExec
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenJoinHintSuite extends JoinHintSuite with GlutenSQLTestsBaseTrait {
+
+ private def assertGlutenShuffleReplicateNLJoin(df: DataFrame): Unit = {
+ val executedPlan = df.queryExecution.executedPlan
+ val cartesianProducts = collect(executedPlan) {
+ case c: CartesianProductExec => c.asInstanceOf[SparkPlan]
+ case c: CartesianProductExecTransformer => c.asInstanceOf[SparkPlan]
+ }
+ assert(cartesianProducts.size == 1)
+ }
+
+ testGluten("join strategy hint - shuffle-replicate-nl") {
+ withTempView("t1", "t2") {
+ spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key",
"value").createTempView("t1")
+ spark
+ .createDataFrame(Seq((1, "1"), (2, "12.3"), (2, "123")))
+ .toDF("key", "value")
+ .createTempView("t2")
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key ->
Int.MaxValue.toString) {
+ assertGlutenShuffleReplicateNLJoin(
+ sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1, t2)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(nonEquiJoinQueryWithHint("MERGE(t1)" ::
"SHUFFLE_REPLICATE_NL(t2)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t2)" ::
"SHUFFLE_REPLICATE_NL(t1)" :: Nil)))
+ }
+ }
+ }
+}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
index fe22c32374..d2d8ce9522 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
@@ -18,6 +18,6 @@ package org.apache.spark.sql
class GlutenTPCDSQuerySuite extends TPCDSQuerySuite with GlutenSQLTestsTrait {}
-class GlutenTPCDSQueryWithStatsSuite extends TPCDSQueryWithStatsSuite with
GlutenTestsCommonTrait {}
+class GlutenTPCDSQueryWithStatsSuite extends TPCDSQueryWithStatsSuite with
GlutenSQLTestsTrait {}
-class GlutenTPCDSQueryANSISuite extends TPCDSQueryANSISuite with
GlutenTestsCommonTrait {}
+class GlutenTPCDSQueryANSISuite extends TPCDSQueryANSISuite with
GlutenSQLTestsTrait {}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
index b69c68f944..39e977cad1 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
@@ -16,12 +16,105 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.execution.FileSourceScanExecTransformer
+
import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.hadoop.fs.Path
+
class GlutenDataSourceScanExecRedactionSuite
extends DataSourceScanExecRedactionSuite
- with GlutenSQLTestsTrait {}
+ with GlutenSQLTestsTrait {
+
+ // Gluten replaces FileSourceScanExec with FileSourceScanExecTransformer,
+ // so "FileScan" is not in the explain output.
+ testGluten("explain is redacted using SQLConf") {
+ withTempDir {
+ dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(0, 10).toDF("a").write.orc(new Path(basePath,
"foo=1").toString)
+ val df = spark.read.orc(basePath)
+ val replacement = "*********"
+
+ assert(isIncluded(df.queryExecution, replacement))
+ assert(isIncluded(df.queryExecution, "FileSourceScanExecTransformer"))
+ assert(!isIncluded(df.queryExecution, "file:/"))
+ }
+ }
+
+ // Gluten replaces FileSourceScanExec with FileSourceScanExecTransformer
+ testGluten("SPARK-31793: FileSourceScanExec metadata should contain limited
file paths") {
+ withTempPath {
+ path =>
+ val dataDirName =
scala.util.Random.alphanumeric.take(100).toList.mkString
+ val dataDir = new java.io.File(path, dataDirName)
+ dataDir.mkdir()
+
+ val partitionCol = "partitionCol"
+ spark
+ .range(10)
+ .select("id", "id")
+ .toDF("value", partitionCol)
+ .write
+ .partitionBy(partitionCol)
+ .orc(dataDir.getCanonicalPath)
+ val paths =
+ (0 to 9).map(i => new java.io.File(dataDir,
s"$partitionCol=$i").getCanonicalPath)
+ val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
+ val location = plan.collectFirst {
+ case f: FileSourceScanExecTransformer => f.metadata("Location")
+ }
+ assert(location.isDefined)
+ assert(location.get.contains(paths.head))
+ assert(location.get.contains("(10 paths)"))
+ assert(location.get.indexOf('[') > -1)
+ assert(location.get.indexOf(']') > -1)
+
+ val pathsInLocation = location.get
+ .substring(location.get.indexOf('[') + 1, location.get.indexOf(']'))
+ .split(", ")
+ .toSeq
+ assert(pathsInLocation.size == 2)
+ assert(pathsInLocation.exists(_.contains("...")))
+ }
+ }
+}
class GlutenDataSourceV2ScanExecRedactionSuite
extends DataSourceV2ScanExecRedactionSuite
- with GlutenSQLTestsTrait {}
+ with GlutenSQLTestsTrait {
+
+ // Gluten replaces BatchScanExec, so "BatchScan orc" is not in explain
output.
+ testGluten("explain is redacted using SQLConf") {
+ withTempDir {
+ dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(0, 10).toDF("a").write.orc(new Path(basePath,
"foo=1").toString)
+ val df = spark.read.orc(basePath)
+ val replacement = "*********"
+
+ assert(isIncluded(df.queryExecution, replacement))
+ assert(isIncluded(df.queryExecution, "BatchScanExecTransformer"))
+ assert(!isIncluded(df.queryExecution, "file:/"))
+ }
+ }
+
+ // Gluten replaces BatchScanExec with BatchScanExecTransformer (orc/parquet
only, json falls back)
+ testGluten("FileScan description") {
+ Seq("orc", "parquet").foreach {
+ format =>
+ withTempPath {
+ path =>
+ val dir = path.getCanonicalPath
+ spark.range(0, 10).write.format(format).save(dir)
+ val df = spark.read.format(format).load(dir)
+ withClue(s"Source '$format':") {
+ assert(isIncluded(df.queryExecution, "ReadSchema"))
+ assert(isIncluded(df.queryExecution, "BatchScanExecTransformer"))
+ assert(isIncluded(df.queryExecution, "PushedFilters"))
+ assert(isIncluded(df.queryExecution, "Location"))
+ }
+ }
+ }
+ }
+}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
index 8c8dfc5e2c..44c677cb91 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
@@ -16,8 +16,77 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.execution.{ColumnarCollectLimitBaseExec,
LimitExecTransformer, TakeOrderedAndProjectExecTransformer}
+
import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
class GlutenInsertSortForLimitAndOffsetSuite
extends InsertSortForLimitAndOffsetSuite
- with GlutenSQLTestsTrait {}
+ with GlutenSQLTestsTrait {
+
+ private def glutenHasTopKSort(plan: SparkPlan): Boolean = {
+ find(plan) {
+ case _: TakeOrderedAndProjectExec => true
+ case _: TakeOrderedAndProjectExecTransformer => true
+ case _ => false
+ }.isDefined
+ }
+
+ private def glutenHasCollectLimit(plan: SparkPlan): Boolean = {
+ find(plan) {
+ case _: CollectLimitExec => true
+ case _: LimitExecTransformer => true
+ case _: ColumnarCollectLimitBaseExec => true
+ case _ => false
+ }.isDefined
+ }
+
+ testGluten("root LIMIT preserves data ordering with top-K sort") {
+ val df = spark.range(10).orderBy(col("id") % 8).limit(2)
+ df.collect()
+ assert(glutenHasTopKSort(df.queryExecution.executedPlan))
+ }
+
+ testGluten("middle LIMIT preserves data ordering with top-K sort") {
+ val df = spark.range(10).orderBy(col("id") % 8).limit(2).distinct()
+ df.collect()
+ assert(glutenHasTopKSort(df.queryExecution.executedPlan))
+ }
+
+ testGluten("root LIMIT preserves data ordering with CollectLimitExec") {
+ withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
+ val df = spark.range(10).orderBy(col("id") % 8).limit(2)
+ df.collect()
+ assert(glutenHasCollectLimit(df.queryExecution.executedPlan))
+ }
+ }
+
+ testGluten("middle LIMIT preserves data ordering with the extra sort") {
+ withSQLConf(
+ SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
+ SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
+ val df =
+ spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c1",
"c2").orderBy(col("c1") % 8)
+ val shuffled = df.limit(2).distinct()
+ shuffled.collect()
+ // Verify the query produces correct results (ordering preserved)
+ assert(shuffled.count() <= 2)
+ }
+ }
+
+ testGluten("root OFFSET preserves data ordering with CollectLimitExec") {
+ val df = spark.range(10).orderBy(col("id") % 8).offset(2)
+ df.collect()
+ assert(glutenHasCollectLimit(df.queryExecution.executedPlan))
+ }
+
+ testGluten("middle OFFSET preserves data ordering with the extra sort") {
+ val df =
+ spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c1",
"c2").orderBy(col("c1") % 8)
+ val shuffled = df.offset(2).distinct()
+ shuffled.collect()
+ assert(shuffled.count() >= 0)
+ }
+}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
index a3f0a577d7..3549004e56 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
@@ -16,6 +16,37 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.execution.{ColumnarToRowExecBase => GlutenC2R}
+
import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {
-class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {}
+ testGluten(
+ "SPARK-37779: ColumnarToRowExec should be canonicalizable after being
(de)serialized") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath {
+ path =>
+ spark.range(1).write.parquet(path.getAbsolutePath)
+ val df = spark.read.parquet(path.getAbsolutePath)
+ // Gluten replaces ColumnarToRowExec with VeloxColumnarToRowExec
+ val c2r = df.queryExecution.executedPlan
+ .collectFirst { case p: GlutenC2R => p }
+ .orElse(df.queryExecution.executedPlan
+ .collectFirst { case p: ColumnarToRowExec => p })
+ .get
+ try {
+ spark.range(1).foreach {
+ _ =>
+ c2r.canonicalized
+ ()
+ }
+ } catch {
+ case e: Throwable =>
+ fail("ColumnarToRow was not canonicalizable", e)
+ }
+ }
+ }
+ }
+}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
index ac60d3e1f0..1faa90ca12 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
@@ -18,6 +18,14 @@ package org.apache.spark.sql.execution.datasources.text
import org.apache.spark.sql.GlutenSQLTestsTrait
-class GlutenWholeTextFileV1Suite extends WholeTextFileV1Suite with
GlutenSQLTestsTrait {}
+class GlutenWholeTextFileV1Suite extends WholeTextFileV1Suite with
GlutenSQLTestsTrait {
+ override protected def testFile(fileName: String): String = {
+ getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString +
"/" + fileName
+ }
+}
-class GlutenWholeTextFileV2Suite extends WholeTextFileV2Suite with
GlutenSQLTestsTrait {}
+class GlutenWholeTextFileV2Suite extends WholeTextFileV2Suite with
GlutenSQLTestsTrait {
+ override protected def testFile(fileName: String): String = {
+ getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString +
"/" + fileName
+ }
+}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 1b80810109..c010f1b026 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -233,7 +233,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenCodeGeneratorWithInterpretedFallbackSuite]
enableSuite[GlutenCollationExpressionSuite]
// TODO: 4.x enableSuite[GlutenCollationRegexpExpressionsSuite] // 1 failure
- // TODO: 4.x enableSuite[GlutenCsvExpressionsSuite] // failures with
GlutenPlugin
+ enableSuite[GlutenCsvExpressionsSuite]
enableSuite[GlutenDynamicPruningSubquerySuite]
enableSuite[GlutenExprIdSuite]
// TODO: 4.x enableSuite[GlutenExpressionEvalHelperSuite] // 2 failures
@@ -395,8 +395,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("parquet widening conversion ShortType -> DoubleType")
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
- // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
- // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
+ enableSuite[GlutenWholeTextFileV1Suite]
+ enableSuite[GlutenWholeTextFileV2Suite]
// Generated suites for org.apache.spark.sql.execution.datasources.v2
enableSuite[GlutenFileWriterFactorySuite]
enableSuite[GlutenV2SessionCatalogNamespaceSuite]
@@ -658,15 +658,25 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenAggregatingAccumulatorSuite]
enableSuite[GlutenCoGroupedIteratorSuite]
// TODO: 4.x enableSuite[GlutenColumnarRulesSuite] // 1 failure
- // TODO: 4.x enableSuite[GlutenDataSourceScanExecRedactionSuite] // 2
failures
- // TODO: 4.x enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] // 2
failures
+ enableSuite[GlutenDataSourceScanExecRedactionSuite]
+ .exclude("explain is redacted using SQLConf")
+ .exclude("SPARK-31793: FileSourceScanExec metadata should contain limited
file paths")
+ enableSuite[GlutenDataSourceV2ScanExecRedactionSuite]
+ .exclude("explain is redacted using SQLConf")
+ .exclude("FileScan description")
enableSuite[GlutenExecuteImmediateEndToEndSuite]
enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite]
enableSuite[GlutenGlobalTempViewSuite]
enableSuite[GlutenGlobalTempViewTestSuite]
enableSuite[GlutenGroupedIteratorSuite]
// TODO: 4.x enableSuite[GlutenHiveResultSuite] // 1 failure
- // TODO: 4.x enableSuite[GlutenInsertSortForLimitAndOffsetSuite] // 6
failures
+ enableSuite[GlutenInsertSortForLimitAndOffsetSuite]
+ .exclude("root LIMIT preserves data ordering with top-K sort")
+ .exclude("middle LIMIT preserves data ordering with top-K sort")
+ .exclude("root LIMIT preserves data ordering with CollectLimitExec")
+ .exclude("middle LIMIT preserves data ordering with the extra sort")
+ .exclude("root OFFSET preserves data ordering with CollectLimitExec")
+ .exclude("middle OFFSET preserves data ordering with the extra sort")
enableSuite[GlutenLocalTempViewTestSuite]
enableSuite[GlutenLogicalPlanTagInSparkPlanSuite]
enableSuite[GlutenOptimizeMetadataOnlyQuerySuite]
@@ -682,7 +692,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSQLJsonProtocolSuite]
enableSuite[GlutenShufflePartitionsUtilSuite]
// TODO: 4.x enableSuite[GlutenSimpleSQLViewSuite] // 2 failures
- // TODO: 4.x enableSuite[GlutenSparkPlanSuite] // 1 failure
+ enableSuite[GlutenSparkPlanSuite]
+ .exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after
being (de)serialized")
enableSuite[GlutenSparkPlannerSuite]
enableSuite[GlutenSparkScriptTransformationSuite]
enableSuite[GlutenSparkSqlParserSuite]
@@ -784,7 +795,8 @@ class VeloxTestSettings extends BackendTestSettings {
// TODO: 4.x enableSuite[GlutenExplainSuite] // 1 failure
enableSuite[GlutenICUCollationsMapSuite]
enableSuite[GlutenInlineTableParsingImprovementsSuite]
- // TODO: 4.x enableSuite[GlutenJoinHintSuite] // 1 failure
+ enableSuite[GlutenJoinHintSuite]
+ .exclude("join strategy hint - shuffle-replicate-nl")
enableSuite[GlutenLogQuerySuite]
// Overridden
.exclude("Query Spark logs with exception using SQL")
@@ -1219,7 +1231,7 @@ class VeloxTestSettings extends BackendTestSettings {
// TODO: 4.x enableSuite[GlutenStreamingInnerJoinSuite]
enableSuite[GlutenStreamingLeftSemiJoinSuite]
// TODO: 4.x enableSuite[GlutenStreamingOuterJoinSuite]
- enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
+ // TODO: 4.x enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
enableSuite[GlutenStreamingQueryListenerSuite]
enableSuite[GlutenStreamingQueryListenersConfSuite]
enableSuite[GlutenStreamingQueryManagerSuite]
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
index 062a1d7059..80abe6d983 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenJoinHintSuite.scala
@@ -16,4 +16,43 @@
*/
package org.apache.spark.sql
-class GlutenJoinHintSuite extends JoinHintSuite with GlutenSQLTestsBaseTrait {}
+import org.apache.gluten.execution.CartesianProductExecTransformer
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.joins.CartesianProductExec
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenJoinHintSuite extends JoinHintSuite with GlutenSQLTestsBaseTrait {
+
+ private def assertGlutenShuffleReplicateNLJoin(df: DataFrame): Unit = {
+ val executedPlan = df.queryExecution.executedPlan
+ val cartesianProducts = collect(executedPlan) {
+ case c: CartesianProductExec => c.asInstanceOf[SparkPlan]
+ case c: CartesianProductExecTransformer => c.asInstanceOf[SparkPlan]
+ }
+ assert(cartesianProducts.size == 1)
+ }
+
+ testGluten("join strategy hint - shuffle-replicate-nl") {
+ withTempView("t1", "t2") {
+ spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key",
"value").createTempView("t1")
+ spark
+ .createDataFrame(Seq((1, "1"), (2, "12.3"), (2, "123")))
+ .toDF("key", "value")
+ .createTempView("t2")
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key ->
Int.MaxValue.toString) {
+ assertGlutenShuffleReplicateNLJoin(
+ sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1, t2)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(nonEquiJoinQueryWithHint("MERGE(t1)" ::
"SHUFFLE_REPLICATE_NL(t2)" :: Nil)))
+ assertGlutenShuffleReplicateNLJoin(
+ sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t2)" ::
"SHUFFLE_REPLICATE_NL(t1)" :: Nil)))
+ }
+ }
+ }
+}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
index fe22c32374..d2d8ce9522 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenTPCDSQuerySuite.scala
@@ -18,6 +18,6 @@ package org.apache.spark.sql
class GlutenTPCDSQuerySuite extends TPCDSQuerySuite with GlutenSQLTestsTrait {}
-class GlutenTPCDSQueryWithStatsSuite extends TPCDSQueryWithStatsSuite with
GlutenTestsCommonTrait {}
+class GlutenTPCDSQueryWithStatsSuite extends TPCDSQueryWithStatsSuite with
GlutenSQLTestsTrait {}
-class GlutenTPCDSQueryANSISuite extends TPCDSQueryANSISuite with
GlutenTestsCommonTrait {}
+class GlutenTPCDSQueryANSISuite extends TPCDSQueryANSISuite with
GlutenSQLTestsTrait {}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
index b69c68f944..39e977cad1 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenDataSourceScanExecRedactionSuite.scala
@@ -16,12 +16,105 @@
*/
package org.apache.spark.sql.execution
import org.apache.gluten.execution.FileSourceScanExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait

import org.apache.hadoop.fs.Path

/**
 * Redaction tests for the v1 file-scan read path under Gluten. The parent suite's
 * originals look for vanilla Spark's FileSourceScanExec ("FileScan" in explain
 * output); Gluten replaces that node with FileSourceScanExecTransformer, so the
 * affected tests are re-registered via testGluten with the Gluten node name.
 */
class GlutenDataSourceScanExecRedactionSuite
  extends DataSourceScanExecRedactionSuite
  with GlutenSQLTestsTrait {

  // Gluten replaces FileSourceScanExec with FileSourceScanExecTransformer,
  // so "FileScan" is not in the explain output.
  testGluten("explain is redacted using SQLConf") {
    withTempDir {
      dir =>
        val basePath = dir.getCanonicalPath
        spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
        val df = spark.read.orc(basePath)
        // Replacement string configured by the parent suite's redaction regex —
        // presumably set in its sparkConf; TODO confirm against the base suite.
        val replacement = "*********"

        // Redacted marker present, Gluten scan node present, and no raw
        // file URI may leak into the redacted plan string.
        assert(isIncluded(df.queryExecution, replacement))
        assert(isIncluded(df.queryExecution, "FileSourceScanExecTransformer"))
        assert(!isIncluded(df.queryExecution, "file:/"))
    }
  }

  // Gluten replaces FileSourceScanExec with FileSourceScanExecTransformer
  testGluten("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") {
    withTempPath {
      path =>
        // A long random directory name inflates each partition path so the
        // concatenated list exceeds the metadata truncation limit and the
        // "(n paths)" summary form is produced.
        val dataDirName = scala.util.Random.alphanumeric.take(100).toList.mkString
        val dataDir = new java.io.File(path, dataDirName)
        dataDir.mkdir()

        val partitionCol = "partitionCol"
        // Write 10 rows partitioned into 10 directories (partitionCol = 0..9).
        spark
          .range(10)
          .select("id", "id")
          .toDF("value", partitionCol)
          .write
          .partitionBy(partitionCol)
          .orc(dataDir.getCanonicalPath)
        val paths =
          (0 to 9).map(i => new java.io.File(dataDir, s"$partitionCol=$i").getCanonicalPath)
        val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
        val location = plan.collectFirst {
          case f: FileSourceScanExecTransformer => f.metadata("Location")
        }
        assert(location.isDefined)
        // Only the first path is listed verbatim; the rest are summarised.
        assert(location.get.contains(paths.head))
        assert(location.get.contains("(10 paths)"))
        assert(location.get.indexOf('[') > -1)
        assert(location.get.indexOf(']') > -1)

        // Inside the brackets there are exactly two entries: the first path
        // and an ellipsis entry standing in for the truncated remainder.
        val pathsInLocation = location.get
          .substring(location.get.indexOf('[') + 1, location.get.indexOf(']'))
          .split(", ")
          .toSeq
        assert(pathsInLocation.size == 2)
        assert(pathsInLocation.exists(_.contains("...")))
    }
  }
}
/**
 * Redaction tests for the DataSource V2 (BatchScanExec) read path under Gluten.
 * The parent suite's originals expect vanilla Spark's "BatchScan orc" text in
 * explain output; Gluten replaces the node, so these tests are re-registered
 * via testGluten with the Gluten node name.
 */
class GlutenDataSourceV2ScanExecRedactionSuite
  extends DataSourceV2ScanExecRedactionSuite
  with GlutenSQLTestsTrait {

  // Gluten replaces BatchScanExec, so "BatchScan orc" is not in explain output.
  testGluten("explain is redacted using SQLConf") {
    withTempDir {
      dir =>
        val basePath = dir.getCanonicalPath
        spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
        val df = spark.read.orc(basePath)
        // Replacement string configured by the parent suite's redaction regex —
        // presumably set in its sparkConf; TODO confirm against the base suite.
        val replacement = "*********"

        // Redacted marker present, Gluten scan node present, no raw file URI leaked.
        assert(isIncluded(df.queryExecution, replacement))
        assert(isIncluded(df.queryExecution, "BatchScanExecTransformer"))
        assert(!isIncluded(df.queryExecution, "file:/"))
    }
  }

  // Gluten replaces BatchScanExec with BatchScanExecTransformer (orc/parquet
  // only, json falls back), so json is excluded from the format list here.
  testGluten("FileScan description") {
    Seq("orc", "parquet").foreach {
      format =>
        withTempPath {
          path =>
            val dir = path.getCanonicalPath
            spark.range(0, 10).write.format(format).save(dir)
            val df = spark.read.format(format).load(dir)
            // All four standard scan-description fields must survive redaction.
            withClue(s"Source '$format':") {
              assert(isIncluded(df.queryExecution, "ReadSchema"))
              assert(isIncluded(df.queryExecution, "BatchScanExecTransformer"))
              assert(isIncluded(df.queryExecution, "PushedFilters"))
              assert(isIncluded(df.queryExecution, "Location"))
            }
        }
    }
  }
}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
index 8c8dfc5e2c..44c677cb91 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenInsertSortForLimitAndOffsetSuite.scala
@@ -16,8 +16,77 @@
*/
package org.apache.spark.sql.execution
import org.apache.gluten.execution.{ColumnarCollectLimitBaseExec, LimitExecTransformer, TakeOrderedAndProjectExecTransformer}

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf

/**
 * Gluten version of InsertSortForLimitAndOffsetSuite. The parent suite's tests
 * match vanilla Spark operators (TakeOrderedAndProjectExec, CollectLimitExec)
 * that Gluten replaces with columnar transformers, so each test is rewritten
 * with testGluten to accept either the vanilla or the Gluten operator.
 */
class GlutenInsertSortForLimitAndOffsetSuite
  extends InsertSortForLimitAndOffsetSuite
  with GlutenSQLTestsTrait {

  // True when the plan contains a top-K sort node, in vanilla
  // (TakeOrderedAndProjectExec) or Gluten (TakeOrderedAndProjectExecTransformer) form.
  private def glutenHasTopKSort(plan: SparkPlan): Boolean = {
    find(plan) {
      case _: TakeOrderedAndProjectExec => true
      case _: TakeOrderedAndProjectExecTransformer => true
      case _ => false
    }.isDefined
  }

  // True when the plan contains a collect-limit node, in vanilla (CollectLimitExec)
  // or Gluten (LimitExecTransformer / ColumnarCollectLimitBaseExec) form.
  private def glutenHasCollectLimit(plan: SparkPlan): Boolean = {
    find(plan) {
      case _: CollectLimitExec => true
      case _: LimitExecTransformer => true
      case _: ColumnarCollectLimitBaseExec => true
      case _ => false
    }.isDefined
  }

  testGluten("root LIMIT preserves data ordering with top-K sort") {
    val df = spark.range(10).orderBy(col("id") % 8).limit(2)
    df.collect()
    assert(glutenHasTopKSort(df.queryExecution.executedPlan))
  }

  testGluten("middle LIMIT preserves data ordering with top-K sort") {
    val df = spark.range(10).orderBy(col("id") % 8).limit(2).distinct()
    df.collect()
    assert(glutenHasTopKSort(df.queryExecution.executedPlan))
  }

  testGluten("root LIMIT preserves data ordering with CollectLimitExec") {
    // A fallback threshold of 1 (< the limit of 2) disables the top-K sort,
    // so the plan must carry a collect-limit node instead.
    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
      val df = spark.range(10).orderBy(col("id") % 8).limit(2)
      df.collect()
      assert(glutenHasCollectLimit(df.queryExecution.executedPlan))
    }
  }

  testGluten("middle LIMIT preserves data ordering with the extra sort") {
    withSQLConf(
      SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
      val df =
        spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c1", "c2").orderBy(col("c1") % 8)
      val shuffled = df.limit(2).distinct()
      // The three input rows are already distinct, so LIMIT 2 + DISTINCT must
      // yield exactly 2 rows. (The previous `count() <= 2` check would also
      // pass if the limit silently dropped rows.)
      assert(shuffled.collect().length == 2)
    }
  }

  testGluten("root OFFSET preserves data ordering with CollectLimitExec") {
    val df = spark.range(10).orderBy(col("id") % 8).offset(2)
    df.collect()
    assert(glutenHasCollectLimit(df.queryExecution.executedPlan))
  }

  testGluten("middle OFFSET preserves data ordering with the extra sort") {
    val df =
      spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c1", "c2").orderBy(col("c1") % 8)
    val shuffled = df.offset(2).distinct()
    // OFFSET 2 on three distinct rows leaves exactly one row. The previous
    // `count() >= 0` assertion was vacuously true for any query result.
    assert(shuffled.collect().length == 1)
  }
}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
index a3f0a577d7..3549004e56 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala
@@ -16,6 +16,37 @@
*/
package org.apache.spark.sql.execution
import org.apache.gluten.execution.{ColumnarToRowExecBase => GlutenC2R}

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.internal.SQLConf

class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {

  // Re-registered because Gluten swaps Spark's ColumnarToRowExec for its own
  // ColumnarToRowExecBase subclass; the vanilla test's collect-first would
  // find nothing and call .get on None.
  testGluten(
    "SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
      withTempPath {
        path =>
          spark.range(1).write.parquet(path.getAbsolutePath)
          val df = spark.read.parquet(path.getAbsolutePath)
          // Gluten replaces ColumnarToRowExec with VeloxColumnarToRowExec
          // Prefer the Gluten node; fall back to the vanilla node in case the
          // scan was not offloaded.
          val c2r = df.queryExecution.executedPlan
            .collectFirst { case p: GlutenC2R => p }
            .orElse(df.queryExecution.executedPlan
              .collectFirst { case p: ColumnarToRowExec => p })
            .get
          try {
            // foreach ships the closure (and thus c2r) through serialization to
            // the executor; canonicalizing there exercises the (de)serialized plan.
            spark.range(1).foreach {
              _ =>
                c2r.canonicalized
                ()
            }
          } catch {
            case e: Throwable =>
              fail("ColumnarToRow was not canonicalizable", e)
          }
      }
    }
  }
}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
index ac60d3e1f0..1faa90ca12 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/text/GlutenWholeTextFileSuite.scala
@@ -18,6 +18,14 @@ package org.apache.spark.sql.execution.datasources.text
import org.apache.spark.sql.GlutenSQLTestsTrait
-class GlutenWholeTextFileV1Suite extends WholeTextFileV1Suite with
GlutenSQLTestsTrait {}
+class GlutenWholeTextFileV1Suite extends WholeTextFileV1Suite with
GlutenSQLTestsTrait {
+ override protected def testFile(fileName: String): String = {
+ getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString +
"/" + fileName
+ }
+}
-class GlutenWholeTextFileV2Suite extends WholeTextFileV2Suite with
GlutenSQLTestsTrait {}
+class GlutenWholeTextFileV2Suite extends WholeTextFileV2Suite with
GlutenSQLTestsTrait {
+ override protected def testFile(fileName: String): String = {
+ getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString +
"/" + fileName
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]