This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ede1d1e  [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and group by col name
ede1d1e is described below

commit ede1d1e9a7a6498f09b3d14704432b2603a2951f
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Fri Aug 13 22:31:21 2021 -0700

    [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and group by col name

    ### What changes were proposed in this pull request?
    Normalize pushed down aggregate col names and group by col names ...

    ### Why are the changes needed?
    to handle case sensitive col names

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Modify existing test

    Closes #33739 from huaxingao/normalize.

    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 3f8ec0dae4ddfd7ee55370dad5d44d03a9f10387)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala  | 10 +++++++---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala |  4 ++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index ab5a0fe..8b253da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -93,8 +93,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
                     agg
                 }
               }
-              val pushedAggregates = PushDownUtils
-                .pushAggregates(sHolder.builder, aggregates, groupingExpressions)
+              val normalizedAggregates = DataSourceStrategy.normalizeExprs(
+                aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
+              val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
+                groupingExpressions, sHolder.relation.output)
+              val pushedAggregates = PushDownUtils.pushAggregates(
+                sHolder.builder, normalizedAggregates, normalizedGroupingExpressions)
               if (pushedAggregates.isEmpty) {
                 aggNode // return original plan node
               } else {
@@ -115,7 +119,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
                 // scalastyle:on
                 val newOutput = scan.readSchema().toAttributes
                 assert(newOutput.length == groupingExpressions.length + aggregates.length)
-                val groupAttrs = groupingExpressions.zip(newOutput).map {
+                val groupAttrs = normalizedGroupingExpressions.zip(newOutput).map {
                   case (a: Attribute, b: Attribute) => b.withExprId(a.exprId)
                   case (_, b) => b
                 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 37bc352..526dad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -239,8 +239,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   }
 
   test("scan with aggregate push-down: MAX MIN with filter and group by") {
-    val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
-      " group by DEPT")
+    val df = sql("select MAX(SaLaRY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
+      " group by DePt")
     val filters = df.queryExecution.optimizedPlan.collect {
       case f: Filter => f
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org