This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0246a7cbac10 [SPARK-51732][SQL] Apply `rpad` on attributes with same 
`ExprId` if they need to be deduplicated
0246a7cbac10 is described below

commit 0246a7cbac10718c34f42c91f9628cf8ce6d0964
Author: Mihailo Timotic <[email protected]>
AuthorDate: Wed Apr 9 13:32:47 2025 +0800

    [SPARK-51732][SQL] Apply `rpad` on attributes with same `ExprId` if they 
need to be deduplicated
    
    ### What changes were proposed in this pull request?
    This PR fixes a case where `rpad` is not applied on attributes that have 
the same `ExprId` even though those attributes should be deduplicated.
    
    ### Why are the changes needed?
    For example, consider the following query:
    
    ```
    CREATE OR REPLACE TABLE t(a CHAR(50));
    SELECT t1.aFROM t t1
    WHERE (SELECT count(*) AS item_cnt FROM t t2 WHERE (t1.a = t2.a)) > 0
    ```
    In the above case, `ApplyCharTypePadding` will run for subquery where 
`t1.a` and `t2.a` will reference the same `ExprId`, therefore we won't apply 
`rpad`. However, after `DeduplicateRelations` runs for outer query, `t1.a` and 
`t2.a` will get different `ExprIds` and would therefore need `rpad`. However, 
this doesn't happen because `ApplyCharTypePadding` for outer query does not 
recurse into the subquery.
    
    On the other hand, for a query:
    
    ```
    SELECT t1.a
    FROM t t1, t t2
    WHERE t1.a = t2.a
    ```
    `ApplyCharTypePadding` will correctly add `rpad` to both `t1.a` and `t2.a` 
because attributes will first be deduplicated.
    
    In particular, this fixes a code-path when `readSideCharPadding` is off and 
`LEGACY_NO_CHAR_PADDING_IN_PREDICATE` is also false
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50527 from 
mihailotim-db/mihailotim-db/apply_char_type_padding_subqueries.
    
    Authored-by: Mihailo Timotic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../analysis/ApplyCharTypePaddingHelper.scala      |  2 +-
 .../catalyst/plans/logical/AnalysisHelper.scala    | 32 ++++++++++++++++++-
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 36 ++++++++++++++++++++--
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyCharTypePaddingHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyCharTypePaddingHelper.scala
index 8df977c80921..cf7ea21ee6f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyCharTypePaddingHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyCharTypePaddingHelper.scala
@@ -68,7 +68,7 @@ object ApplyCharTypePaddingHelper {
   private[sql] def paddingForStringComparison(
       plan: LogicalPlan,
       padCharCol: Boolean): LogicalPlan = {
-    plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(BINARY_COMPARISON, 
IN)) {
+    
plan.resolveOperatorsUpWithSubqueriesAndPruning(_.containsAnyPattern(BINARY_COMPARISON,
 IN)) {
       case operator =>
         
operator.transformExpressionsUpWithPruning(_.containsAnyPattern(BINARY_COMPARISON,
 IN)) {
           case e if !e.childrenResolved => e
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index fd987c47f106..b8186fa07858 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.rules.RuleId
 import org.apache.spark.sql.catalyst.rules.UnknownRuleId
 import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, CurrentOrigin, 
TreePatternBits}
+import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.Utils
 
@@ -155,6 +156,35 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { 
self: LogicalPlan =>
     }
   }
 
+  /**
+   * Similar to [[resolveOperatorsUpWithPruning]], but also applies the given 
partial function to
+   * all the plans in the subqueries of all nodes. This method is useful when 
we want to rewrite the
+   * whole plan, including its subqueries, in one go.
+   */
+  def resolveOperatorsUpWithSubqueriesAndPruning(
+      cond: TreePatternBits => Boolean,
+      ruleId: RuleId = UnknownRuleId)(
+      rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
+    val visit: PartialFunction[LogicalPlan, LogicalPlan] =
+      new PartialFunction[LogicalPlan, LogicalPlan] {
+        override def isDefinedAt(x: LogicalPlan): Boolean = true
+
+        override def apply(plan: LogicalPlan): LogicalPlan = {
+          val transformed = plan.transformExpressionsUpWithPruning(
+            t => t.containsPattern(PLAN_EXPRESSION) && cond(t)
+          ) {
+            case subquery: SubqueryExpression =>
+              val newPlan =
+                subquery.plan.resolveOperatorsUpWithSubqueriesAndPruning(cond, 
ruleId)(rule)
+              subquery.withNewPlan(newPlan)
+          }
+          rule.applyOrElse[LogicalPlan, LogicalPlan](transformed, identity)
+        }
+      }
+
+    resolveOperatorsUpWithPruning(cond, ruleId)(visit)
+  }
+
   /** Similar to [[resolveOperatorsUp]], but does it top-down. */
   def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): 
LogicalPlan = {
     resolveOperatorsDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 6a96431d5708..eceadc338bfc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql
 
 import org.apache.spark.{SparkConf, SparkRuntimeException}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, 
GreaterThan, ScalarSubquery, StringRPad}
 import org.apache.spark.sql.catalyst.expressions.Cast.toSQLId
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Project}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.SchemaRequiredDataSource
 import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
@@ -798,6 +798,38 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
       )
     }
   }
+
+  test(
+    "SPARK-51732: rpad should be applied on attributes with same ExprId if 
those attributes " +
+      "should be deduplicated 2"
+  ) {
+    withSQLConf(
+      SQLConf.READ_SIDE_CHAR_PADDING.key -> "false",
+      SQLConf.LEGACY_NO_CHAR_PADDING_IN_PREDICATE.key -> "false"
+    ) {
+      withTable("mytable") {
+        sql(s"CREATE TABLE mytable(col CHAR(10))")
+        val plan = sql(
+          """
+            |SELECT t1.col
+            |FROM mytable t1
+            |WHERE (SELECT count(*) AS cnt FROM mytable t2 WHERE (t1.col = 
t2.col)) > 0
+          """.stripMargin).queryExecution.analyzed
+        val subquery = plan.asInstanceOf[Project]
+          .child.asInstanceOf[Filter]
+          .condition.asInstanceOf[GreaterThan]
+          .left.asInstanceOf[ScalarSubquery]
+        val subqueryFilterCondition = subquery.plan.asInstanceOf[Aggregate]
+          .child.asInstanceOf[Filter]
+          .condition.asInstanceOf[EqualTo]
+
+        // rpad should  be applied to both left and right hand side of t1.col 
= t2.col because the
+        // attributes are deduplicated.
+        assert(subqueryFilterCondition.left.isInstanceOf[StringRPad])
+        assert(subqueryFilterCondition.right.isInstanceOf[StringRPad])
+      }
+    }
+  }
 }
 
 // Some basic char/varchar tests which doesn't rely on table implementation.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to