This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9277353  [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
9277353 is described below

commit 9277353b23df4b54dfb65e948e1b3d001806929b
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Fri Mar 25 20:00:39 2022 +0800

[SPARK-38644][SQL] DS V2 topN push-down supports project with alias

### What changes were proposed in this pull request?

Currently, Spark DS V2 topN push-down doesn't support a project with aliases. This PR makes topN push-down work when the sort order references an alias.

**Example**: the original plan is shown below:
```
Sort [mySalary#10 ASC NULLS FIRST], true
+- Project [NAME#1, SALARY#2 AS mySalary#10]
   +- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```
Here the `pushedLimit` and `sortOrders` of `JDBCScanBuilder` are empty. If the top N can be pushed down, the plan becomes:
```
Project [NAME#1, SALARY#2 AS mySalary#10]
+- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```
The `pushedLimit` of `JDBCScanBuilder` will then be `1`, and its `sortOrders` will be `SALARY ASC NULLS FIRST`.

### Why are the changes needed?

Projections frequently introduce aliases, so supporting them makes topN push-down apply to many more queries.

### Does this PR introduce _any_ user-facing change?

Yes. DS V2 topN push-down now supports project with alias.

### How was this patch tested?

New tests.
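For reference, the new tests (in the diff below) exercise a query of roughly the following shape. This is a minimal sketch, assuming a `spark` session whose `h2` catalog is the H2 fixture registered by `JDBCV2Suite`; it is not part of the commit itself:

```scala
// Minimal sketch: the Sort refers to the alias "mySalary" introduced by the
// Project, not to the underlying SALARY column, which is exactly the case
// this PR teaches the topN push-down to handle.
import spark.implicits._

val df = spark.read
  .table("h2.test.employee")
  .select($"NAME", $"SALARY".as("mySalary")) // SALARY AS mySalary
  .sort("mySalary")                          // ORDER BY the alias
  .limit(1)                                  // Sort + Limit form the top N

// After this change, the explain output should report
// "PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1".
df.explain()
```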
Closes #35961 from beliefer/SPARK-38644.

Authored-by: Jiaan Geng <belie...@163.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/V2ScanRelationPushDown.scala | 15 ++++++++------
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 24 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index c699e92..eaa30f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, Limit, LocalLimit, LogicalPlan, Project, Sample, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, Count, GeneralAggregateFunc, Sum}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan}
@@ -374,9 +374,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
         sHolder.pushedLimit = Some(limit)
       }
       operation
-    case s @ Sort(order, _, operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder))
-        if filter.isEmpty =>
-      val orders = DataSourceStrategy.translateSortOrders(order)
+    case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
+        if filter.isEmpty && CollapseProject.canCollapseExpressions(
+          order, project, alwaysInline = true) =>
+      val aliasMap = getAliasMap(project)
+      val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+      val orders = DataSourceStrategy.translateSortOrders(newOrder)
       if (orders.length == order.length) {
         val topNPushed = PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
         if (topNPushed) {
@@ -427,7 +430,7 @@ case class ScanBuilderHolder(
     builder: ScanBuilder) extends LeafNode {
   var pushedLimit: Option[Int] = None
 
-  var sortOrders: Seq[SortOrder] = Seq.empty[SortOrder]
+  var sortOrders: Seq[V2SortOrder] = Seq.empty[V2SortOrder]
 
   var pushedSample: Option[TableSampleInfo] = None
 
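To illustrate the substitution the rewritten `Sort` case performs, here is a standalone sketch of the two `AliasHelper` calls it now makes, `getAliasMap` and `replaceAlias`, which rewrite the sort order to reference the expression underneath the alias before it is translated. This sketch is not part of the commit, and the attribute names are invented for the example:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Standalone sketch of the alias substitution in the new Sort case:
// a SortOrder over the alias mySalary is rewritten into a SortOrder over
// the underlying SALARY attribute, which translateSortOrders can handle.
object AliasSubstitutionSketch extends AliasHelper {
  def main(args: Array[String]): Unit = {
    val salary = AttributeReference("SALARY", DecimalType(20, 2))()
    val mySalary = Alias(salary, "mySalary")() // SALARY AS mySalary

    // The Project list and Sort order, as the analyzer would produce them.
    val project: Seq[NamedExpression] = Seq(mySalary)
    val order = Seq(SortOrder(mySalary.toAttribute, Ascending))

    // The same two calls the rule now makes before translating sort orders.
    val aliasMap = getAliasMap(project)
    val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]

    println(newOrder) // now sorts on SALARY#... instead of mySalary#...
  }
}
```

The `CollapseProject.canCollapseExpressions` guard in the rewritten case checks that the aliased expressions can safely be inlined into the sort order before this substitution is applied.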
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index e7e9174..3ab87ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -267,6 +267,30 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
   }
 
+  test("simple scan with top N: order by with alias") {
+    val df1 = spark.read
+      .table("h2.test.employee")
+      .select($"NAME", $"SALARY".as("mySalary"))
+      .sort("mySalary")
+      .limit(1)
+    checkSortRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [], PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1, ")
+    checkAnswer(df1, Seq(Row("cathy", 9000.00)))
+
+    val df2 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"NAME", $"SALARY".as("mySalary"))
+      .filter($"DEPT" > 1)
+      .sort("mySalary")
+      .limit(1)
+    checkSortRemoved(df2)
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+        "PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1, ")
+    checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
+  }
+
   test("scan with filter push-down") {
     val df = spark.table("h2.test.people").filter($"id" > 1)
     checkFiltersRemoved(df)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org