This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1e525c1 [SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order
1e525c1 is described below
commit 1e525c1156ee5558bda3937751588a50bc132c38
Author: allisonwang-db <[email protected]>
AuthorDate: Fri Nov 20 09:47:54 2020 -0800
[SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order
Backport #30373 for branch-3.0.
### What changes were proposed in this pull request?
This PR swaps the order of the rules `RemoveRedundantSorts` and
`EnsureRequirements` so that `EnsureRequirements` is invoked before
`RemoveRedundantSorts`, avoiding an `IllegalArgumentException` when
instantiating `PartitioningCollection`.
### Why are the changes needed?
The `RemoveRedundantSorts` rule uses a SparkPlan's `outputPartitioning` to check
whether a sort node is redundant. Currently, it is applied before
`EnsureRequirements`. Since `PartitioningCollection` requires its left and right
partitionings to have the same number of partitions, which is not necessarily
true before `EnsureRequirements` is applied, the rule can fail with the
following exception:
```
IllegalArgumentException: requirement failed: PartitioningCollection
requires all of its partitionings have the same numPartitions.
```
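For illustration, a minimal sketch of the check that fires; the partition counts
here are hypothetical, while `PartitioningCollection`, `HashPartitioning`, and
`AttributeReference` are the catalyst classes actually involved:
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection}
import org.apache.spark.sql.types.LongType

// Before `EnsureRequirements` runs, the two sides of a join may still report
// different numbers of partitions; combining their output partitionings into a
// `PartitioningCollection` then trips its `require` check.
val key = AttributeReference("key", LongType)()
val left = HashPartitioning(Seq(key), 2)    // e.g. a 2-partition input (hypothetical)
val right = HashPartitioning(Seq(key), 200) // e.g. a default-shuffled side (hypothetical)
PartitioningCollection(Seq(left, right))
// => IllegalArgumentException: requirement failed: PartitioningCollection
//    requires all of its partitionings have the same numPartitions.
```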
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
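For reference, a spark-shell sketch that mirrors the new unit test below (a
MERGE-hinted sort-merge join whose left side keeps its 2 input partitions while
the right side is shuffled); before this fix, planning such a query on
branch-3.0 could hit the exception above:
```
// Mirrors the added test; assumes a spark-shell session (implicits and
// `spark.sql` in scope).
spark.range(0, 100, 1, 2).selectExpr("id AS key").createOrReplaceTempView("t1")
(0 to 100).toDF("key").createOrReplaceTempView("t2")
sql("""
  |SELECT /*+ MERGE(t1) */ t1.key
  |FROM t1 JOIN t2 ON t1.key = t2.key
  |WHERE t1.key > 10 AND t2.key < 50
  |ORDER BY t1.key ASC
""".stripMargin).collect()
```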
Closes #30438 from allisonwang-db/spark-33472-3.0.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/QueryExecution.scala | 4 +++-
.../org/apache/spark/sql/execution/SparkPlan.scala | 7 +++++-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 4 ++--
.../sql/execution/RemoveRedundantSortsSuite.scala | 25 ++++++++++++++++++++++
4 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 574a67f..7f5a5e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -297,8 +297,10 @@ object QueryExecution {
     Seq(
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
-      RemoveRedundantSorts(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
+      // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
+      // number of partitions when instantiating PartitioningCollection.
+      RemoveRedundantSorts(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
       CollapseCodegenStages(sparkSession.sessionState.conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ead8c00..062aa69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -135,7 +135,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def longMetric(name: String): SQLMetric = metrics(name)

   // TODO: Move to `DistributedPlan`
-  /** Specifies how data is partitioned across different nodes in the cluster. */
+  /**
+   * Specifies how data is partitioned across different nodes in the cluster.
+   * Note this method may fail if it is invoked before `EnsureRequirements` is applied
+   * since `PartitioningCollection` requires all its partitionings to have
+   * the same number of partitions.
+   */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4e73f06..187827c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -90,8 +90,8 @@ case class AdaptiveSparkPlanExec(
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    removeRedundantSorts,
-    ensureRequirements
+    ensureRequirements,
+    removeRedundantSorts
   ) ++ context.session.sessionState.queryStagePrepRules

   // A list of physical optimizer rules to be applied to a new stage before its execution. These
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 1978d22..9cea812 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -99,6 +101,29 @@ abstract class RemoveRedundantSortsSuiteBase
       }
     }
   }
+
+  test("SPARK-33472: shuffled join with different left and right side partition numbers") {
+    withTempView("t1", "t2") {
+      spark.range(0, 100, 1, 2).select('id as "key").createOrReplaceTempView("t1")
+      (0 to 100).toDF("key").createOrReplaceTempView("t2")
+
+      val query = """
+        |SELECT /*+ MERGE(t1) */ t1.key
+        |FROM t1 JOIN t2 ON t1.key = t2.key
+        |WHERE t1.key > 10 AND t2.key < 50
+        |ORDER BY t1.key ASC
+      """.stripMargin
+
+      val df = sql(query)
+      val sparkPlan = df.queryExecution.sparkPlan
+      val join = sparkPlan.collect { case j: SortMergeJoinExec => j }.head
+      val leftPartitioning = join.left.outputPartitioning
+      assert(leftPartitioning.isInstanceOf[RangePartitioning])
+      assert(leftPartitioning.numPartitions == 2)
+      assert(join.right.outputPartitioning == UnknownPartitioning(0))
+      checkSorts(query, 3, 3)
+    }
+  }
 }

 class RemoveRedundantSortsSuite extends RemoveRedundantSortsSuiteBase
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]