This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3f8ec0d [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate
col name and group by col name
3f8ec0d is described below
commit 3f8ec0dae4ddfd7ee55370dad5d44d03a9f10387
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Aug 13 22:31:21 2021 -0700
[SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and
group by col name
### What changes were proposed in this pull request?
Normalize pushed-down aggregate column names and group-by column names ...
### Why are the changes needed?
To handle case-sensitive column names.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified an existing test.
Closes #33739 from huaxingao/normalize.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 10 +++++++---
.../src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 4 ++--
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index ab5a0fe..8b253da 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -93,8 +93,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with
PredicateHelper {
agg
}
}
- val pushedAggregates = PushDownUtils
- .pushAggregates(sHolder.builder, aggregates,
groupingExpressions)
+ val normalizedAggregates = DataSourceStrategy.normalizeExprs(
+ aggregates,
sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
+ val normalizedGroupingExpressions =
DataSourceStrategy.normalizeExprs(
+ groupingExpressions, sHolder.relation.output)
+ val pushedAggregates = PushDownUtils.pushAggregates(
+ sHolder.builder, normalizedAggregates,
normalizedGroupingExpressions)
if (pushedAggregates.isEmpty) {
aggNode // return original plan node
} else {
@@ -115,7 +119,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
// scalastyle:on
val newOutput = scan.readSchema().toAttributes
assert(newOutput.length == groupingExpressions.length +
aggregates.length)
- val groupAttrs = groupingExpressions.zip(newOutput).map {
+ val groupAttrs =
normalizedGroupingExpressions.zip(newOutput).map {
case (a: Attribute, b: Attribute) => b.withExprId(a.exprId)
case (_, b) => b
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 37bc352..526dad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -239,8 +239,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession
with ExplainSuiteHel
}
test("scan with aggregate push-down: MAX MIN with filter and group by") {
- val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where
dept > 0" +
- " group by DEPT")
+ val df = sql("select MAX(SaLaRY), MIN(BONUS) FROM h2.test.employee where
dept > 0" +
+ " group by DePt")
val filters = df.queryExecution.optimizedPlan.collect {
case f: Filter => f
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]