This is an automated email from the ASF dual-hosted git repository.

exmy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a71c146f0 [GLUTEN-9581][CH] Fix: in subquery expression cannot exist in project (#9583)
9a71c146f0 is described below

commit 9a71c146f097a9a65d33d49aee1796b3571e7d83
Author: lgbo <[email protected]>
AuthorDate: Mon May 12 11:22:35 2025 +0800

    [GLUTEN-9581][CH] Fix: in subquery expression cannot exist in project (#9583)
---
 .../extension/CoalesceAggregationUnion.scala       | 19 +++++++++++++--
 .../GlutenCoalesceAggregationUnionSuite.scala      | 28 ++++++++++++++++++----
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
index 05e522ae82..331dfdc2db 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
@@ -312,6 +312,13 @@ object CoalesceUnionUtil extends Logging {
       Project(reprojectOutputs, coalescePlan)
     }
   }
+
+  def hasListQueryInside(expression: Expression): Boolean = {
+    expression match {
+      case list: ListQuery => true
+      case _ => expression.children.exists(hasListQueryInside)
+    }
+  }
 }
 
 /*
@@ -341,7 +348,7 @@ case class CoalesceAggregationUnion(spark: SparkSession) extends Rule[LogicalPla
   case class PlanAnalyzer(originalAggregate: Aggregate) extends AbstractPlanAnalyzer {
 
     protected def extractFilter(): Option[Filter] = {
-      originalAggregate.child match {
+      val filter = originalAggregate.child match {
         case filter: Filter => Some(filter)
         case project @ Project(_, filter: Filter) => Some(filter)
         case subquery: SubqueryAlias =>
@@ -357,6 +364,10 @@ case class CoalesceAggregationUnion(spark: SparkSession) extends Rule[LogicalPla
           }
         case _ => None
       }
+      filter match {
+        case Some(f) if CoalesceUnionUtil.hasListQueryInside(f.condition) => None
+        case _ => filter
+      }
     }
 
     // Try to make the plan simple, contain only three steps, source, filter, aggregate.
@@ -829,10 +840,14 @@ case class CoalesceProjectionUnion(spark: SparkSession) extends Rule[LogicalPlan
 
   case class PlanAnalyzer(originalPlan: LogicalPlan) extends AbstractPlanAnalyzer {
     def extractFilter(): Option[Filter] = {
-      originalPlan match {
+      val filter = originalPlan match {
         case project @ Project(_, filter: Filter) => Some(filter)
         case _ => None
       }
+      filter match {
+        case Some(f) if CoalesceUnionUtil.hasListQueryInside(f.condition) => None
+        case _ => filter
+      }
     }
 
     lazy val extractedSourcePlan = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
index 8479d9f41e..e5374bebb4 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenCoalesceAggregationUnionSuite.scala
@@ -69,7 +69,7 @@ class GlutenCoalesceAggregationUnionSuite extends GlutenClickHouseWholeStageTran
         StructField("x", StringType, nullable = true),
         StructField("y", IntegerType, nullable = true)
       ))
-    val data = sparkContext.parallelize(
+    val data1 = sparkContext.parallelize(
       Seq(
         Row("a", 1, null, 1),
         Row("a", 2, "a", 2),
@@ -81,9 +81,17 @@ class GlutenCoalesceAggregationUnionSuite extends GlutenClickHouseWholeStageTran
         Row("b", 4, "g", null)
       ))
 
-    val dataFrame = spark.createDataFrame(data, schema)
-    createTestTable("coalesce_union_t1", dataFrame)
-    createTestTable("coalesce_union_t2", dataFrame)
+    val dataFrame1 = spark.createDataFrame(data1, schema)
+    createTestTable("coalesce_union_t1", dataFrame1)
+    createTestTable("coalesce_union_t2", dataFrame1)
+
+    val data2 = sparkContext.parallelize(
+      Seq(
+        Row("a", 1, null, 1),
+        Row("a", 2, "a", 2)
+      ))
+    val dataFrame2 = spark.createDataFrame(data2, schema)
+    createTestTable("coalesce_union_t3", dataFrame2)
   }
 
   def checkNoUnion(df: DataFrame): Unit = {
@@ -495,4 +503,16 @@ class GlutenCoalesceAggregationUnionSuite extends GlutenClickHouseWholeStageTran
         |""".stripMargin
     compareResultsAgainstVanillaSpark(sql, true, checkHasUnion, true)
   }
+
+  test("no coalesce project union. case 3") {
+    val sql =
+      """
+        |select a from (
+        |   select a from coalesce_union_t1 where b % 2 = 0
+        |   union all
+        |   select a from coalesce_union_t1 where a in (select a from coalesce_union_t3)
+        |) order by a
+        |""".stripMargin
+    compareResultsAgainstVanillaSpark(sql, true, checkHasUnion, true)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to