This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ede1d1e  [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and group by col name
ede1d1e is described below

commit ede1d1e9a7a6498f09b3d14704432b2603a2951f
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Fri Aug 13 22:31:21 2021 -0700

    [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and group by col name
    
    ### What changes were proposed in this pull request?
    Normalize pushed down aggregate col names and group by col names ...
    
    ### Why are the changes needed?
    To handle case-sensitive column names.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Modified an existing test.
    
    Closes #33739 from huaxingao/normalize.
    
    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 3f8ec0dae4ddfd7ee55370dad5d44d03a9f10387)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
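
A minimal, self-contained sketch of the idea behind the change (plain Scala, no Spark dependencies; the `Attr` type and `normalize` helper below are invented for illustration, whereas Spark's actual implementation is `DataSourceStrategy.normalizeExprs` over Catalyst expressions): each column name referenced in the query is rewritten to the exact-cased name declared by the relation's output before being handed to the data source.

```scala
// Hypothetical model of name normalization; not Spark's code, just the idea.
object NormalizeSketch {
  final case class Attr(name: String)

  // Rewrite each referenced attribute to the matching relation attribute,
  // comparing names case-insensitively (Spark's default resolution mode).
  def normalize(refs: Seq[Attr], relationOutput: Seq[Attr]): Seq[Attr] =
    refs.map { ref =>
      relationOutput.find(_.name.equalsIgnoreCase(ref.name)).getOrElse(ref)
    }

  def main(args: Array[String]): Unit = {
    val relationOutput = Seq(Attr("DEPT"), Attr("SALARY"), Attr("BONUS"))
    val referenced = Seq(Attr("SaLaRY"), Attr("DePt"))
    // Prints: List(Attr(SALARY), Attr(DEPT)); the schema-cased names win.
    println(normalize(referenced, relationOutput))
  }
}
```

Matching case-insensitively mirrors Spark's default resolution (spark.sql.caseSensitive=false); with case-sensitive resolution enabled, the referenced names would have to match the schema exactly.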
---
 .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala  | 10 +++++++---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala |  4 ++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index ab5a0fe..8b253da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -93,8 +93,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
                     agg
                 }
               }
-              val pushedAggregates = PushDownUtils
-                .pushAggregates(sHolder.builder, aggregates, groupingExpressions)
+              val normalizedAggregates = DataSourceStrategy.normalizeExprs(
+                aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
+              val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
+                groupingExpressions, sHolder.relation.output)
+              val pushedAggregates = PushDownUtils.pushAggregates(
+                sHolder.builder, normalizedAggregates, normalizedGroupingExpressions)
               if (pushedAggregates.isEmpty) {
                 aggNode // return original plan node
               } else {
@@ -115,7 +119,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
                 // scalastyle:on
                 val newOutput = scan.readSchema().toAttributes
                 assert(newOutput.length == groupingExpressions.length + aggregates.length)
-                val groupAttrs = groupingExpressions.zip(newOutput).map {
+                val groupAttrs = normalizedGroupingExpressions.zip(newOutput).map {
                   case (a: Attribute, b: Attribute) => b.withExprId(a.exprId)
                   case (_, b) => b
                 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 37bc352..526dad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -239,8 +239,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   }
 
   test("scan with aggregate push-down: MAX MIN with filter and group by") {
-    val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
-      " group by DEPT")
+    val df = sql("select MAX(SaLaRY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
+      " group by DePt")
     val filters = df.queryExecution.optimizedPlan.collect {
       case f: Filter => f
     }
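
A hedged usage sketch of what the modified test exercises (the catalog, table, and column names are taken from the test above; the exact explain output format is not guaranteed):

```scala
// With the H2 test catalog registered as in JDBCV2Suite, mixed-case
// identifiers should no longer block aggregate push-down after this fix.
val df = spark.sql(
  """SELECT MAX(SaLaRY), MIN(BONUS)
    |FROM h2.test.employee
    |WHERE dept > 0
    |GROUP BY DePt""".stripMargin)
// The scan node in the physical plan is expected to report pushed-down
// aggregates with schema-cased names, e.g. MAX(SALARY), MIN(BONUS).
df.explain()
```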

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
