This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 39455e3d49 [GLUTEN-11088][VL] Fix GlutenStreamingQuerySuite (#11223)
39455e3d49 is described below
commit 39455e3d490473aa6d3da346cea3bda290f3588d
Author: Rong Ma <[email protected]>
AuthorDate: Thu Dec 4 00:46:07 2025 +0800
[GLUTEN-11088][VL] Fix GlutenStreamingQuerySuite (#11223)
---
.../gluten/utils/velox/VeloxTestSettings.scala | 4 +--
.../sql/execution/GlutenStreamingQuerySuite.scala | 29 +++++++++++++++++++++-
2 files changed, 29 insertions(+), 4 deletions(-)
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 18024f0d0e..75c70fbd6f 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -956,10 +956,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("detect escaped path and report the migration guide")
.exclude("ignore the escaped path check when the flag is off")
.excludeByPrefix("SPARK-51187")
- // TODO: fix in Spark-4.0
+ // Rewrite for the query plan check
.excludeByPrefix("SPARK-49905")
- .excludeByPrefix("SPARK-41199")
- .excludeByPrefix("SPARK-41198")
enableSuite[GlutenQueryExecutionSuite]
// Rewritten to set root logger level to INFO so that logs can be parsed
.exclude("Logging plan changes for execution")
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
index d09576908f..bda9c97eb5 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
@@ -17,6 +17,33 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.execution.exchange.REQUIRED_BY_STATEFUL_OPERATOR
+import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming._
-class GlutenStreamingQuerySuite extends StreamingQuerySuite with
GlutenSQLTestsTrait {}
+class GlutenStreamingQuerySuite extends StreamingQuerySuite with
GlutenSQLTestsTrait {
+
+ import testImplicits._
+
+ testGluten("SPARK-49905") {
+ val inputData = MemoryStream[Int]
+
+ // Use the streaming aggregation as an example - all stateful operators
are using the same
+ // distribution, named `StatefulOpClusteredDistribution`.
+ val df = inputData.toDF().groupBy("value").count()
+
+ testStream(df, OutputMode.Update())(
+ AddData(inputData, 1, 2, 3, 1, 2, 3),
+ CheckAnswer((1, 2), (2, 2), (3, 2)),
+ Execute {
+ qe =>
+ val shuffleOpt = qe.lastExecution.executedPlan.collect {
+ case s: ColumnarShuffleExchangeExec => s
+ }
+
+ assert(shuffleOpt.nonEmpty, "No shuffle exchange found in the query
plan")
+ assert(shuffleOpt.head.shuffleOrigin ===
REQUIRED_BY_STATEFUL_OPERATOR)
+ }
+ )
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]