This is an automated email from the ASF dual-hosted git repository.
exmy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9a71c146f0 [GLUTEN-9581][CH] Fix: in subquery expression cannot exist
in project (#9583)
9a71c146f0 is described below
commit 9a71c146f097a9a65d33d49aee1796b3571e7d83
Author: lgbo <[email protected]>
AuthorDate: Mon May 12 11:22:35 2025 +0800
[GLUTEN-9581][CH] Fix: in subquery expression cannot exist in project
(#9583)
---
.../extension/CoalesceAggregationUnion.scala | 19 +++++++++++++--
.../GlutenCoalesceAggregationUnionSuite.scala | 28 ++++++++++++++++++----
2 files changed, 41 insertions(+), 6 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
index 05e522ae82..331dfdc2db 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
@@ -312,6 +312,13 @@ object CoalesceUnionUtil extends Logging {
Project(reprojectOutputs, coalescePlan)
}
}
+
+ def hasListQueryInside(expression: Expression): Boolean = {
+ expression match {
+ case list: ListQuery => true
+ case _ => expression.children.exists(hasListQueryInside)
+ }
+ }
}
/*
@@ -341,7 +348,7 @@ case class CoalesceAggregationUnion(spark: SparkSession)
extends Rule[LogicalPla
case class PlanAnalyzer(originalAggregate: Aggregate) extends
AbstractPlanAnalyzer {
protected def extractFilter(): Option[Filter] = {
- originalAggregate.child match {
+ val filter = originalAggregate.child match {
case filter: Filter => Some(filter)
case project @ Project(_, filter: Filter) => Some(filter)
case subquery: SubqueryAlias =>
@@ -357,6 +364,10 @@ case class CoalesceAggregationUnion(spark: SparkSession)
extends Rule[LogicalPla
}
case _ => None
}
+ filter match {
+ case Some(f) if CoalesceUnionUtil.hasListQueryInside(f.condition) => None
+ case _ => filter
+ }
}
// Try to make the plan simple, contain only three steps, source, filter,
aggregate.
@@ -829,10 +840,14 @@ case class CoalesceProjectionUnion(spark: SparkSession)
extends Rule[LogicalPlan
case class PlanAnalyzer(originalPlan: LogicalPlan) extends
AbstractPlanAnalyzer {
def extractFilter(): Option[Filter] = {
- originalPlan match {
+ val filter = originalPlan match {
case project @ Project(_, filter: Filter) => Some(filter)
case _ => None
}
+ filter match {
+ case Some(f) if CoalesceUnionUtil.hasListQueryInside(f.condition) => None
+ case _ => filter
+ }
}
lazy val extractedSourcePlan = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
index 8479d9f41e..e5374bebb4 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
@@ -69,7 +69,7 @@ class GlutenCoalesceAggregationUnionSuite extends
GlutenClickHouseWholeStageTran
StructField("x", StringType, nullable = true),
StructField("y", IntegerType, nullable = true)
))
- val data = sparkContext.parallelize(
+ val data1 = sparkContext.parallelize(
Seq(
Row("a", 1, null, 1),
Row("a", 2, "a", 2),
@@ -81,9 +81,17 @@ class GlutenCoalesceAggregationUnionSuite extends
GlutenClickHouseWholeStageTran
Row("b", 4, "g", null)
))
- val dataFrame = spark.createDataFrame(data, schema)
- createTestTable("coalesce_union_t1", dataFrame)
- createTestTable("coalesce_union_t2", dataFrame)
+ val dataFrame1 = spark.createDataFrame(data1, schema)
+ createTestTable("coalesce_union_t1", dataFrame1)
+ createTestTable("coalesce_union_t2", dataFrame1)
+
+ val data2 = sparkContext.parallelize(
+ Seq(
+ Row("a", 1, null, 1),
+ Row("a", 2, "a", 2)
+ ))
+ val dataFrame2 = spark.createDataFrame(data2, schema)
+ createTestTable("coalesce_union_t3", dataFrame2)
}
def checkNoUnion(df: DataFrame): Unit = {
@@ -495,4 +503,16 @@ class GlutenCoalesceAggregationUnionSuite extends
GlutenClickHouseWholeStageTran
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, checkHasUnion, true)
}
+
+ test("no coalesce project union. case 3") {
+ val sql =
+ """
+ |select a from (
+ | select a from coalesce_union_t1 where b % 2 = 0
+ | union all
+ | select a from coalesce_union_t1 where a in (select a from coalesce_union_t3)
+ |) order by a
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, checkHasUnion, true)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]