This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new adac454 [SPARK-34682][SQL] Fix regression in canonicalization error
check in CustomShuffleReaderExec
adac454 is described below
commit adac45400de87ceda91429c4ac857ab02b54e19d
Author: Andy Grove <[email protected]>
AuthorDate: Wed Mar 10 20:48:00 2021 +0900
[SPARK-34682][SQL] Fix regression in canonicalization error check in
CustomShuffleReaderExec
### What changes were proposed in this pull request?
There is a regression in 3.1.1 compared to 3.0.2 when checking for a
canonicalized plan when executing CustomShuffleReaderExec.
The regression was caused by the call to `sendDriverMetrics` which happens
before the check and will always fail if the plan is canonicalized.
### Why are the changes needed?
This is a regression in a useful error check.
### Does this PR introduce _any_ user-facing change?
No. This is not an error that a user would typically see, as far as I know.
### How was this patch tested?
I tested this change locally by making a distribution from this PR branch.
Before fixing the regression I saw:
```
java.util.NoSuchElementException: key not found: numPartitions
```
After fixing this regression I saw:
```
java.lang.IllegalStateException: operating on canonicalized plan
```
Closes #31793 from andygrove/SPARK-34682.
Lead-authored-by: Andy Grove <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit fd4843803c4670c656a94c1af652fb4b945bc82c)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../adaptive/CustomShuffleReaderExec.scala | 12 ++++++------
.../execution/adaptive/AdaptiveQueryExecSuite.scala | 21 +++++++++++++++++++++
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 49a4c25..2319c9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -179,12 +179,12 @@ case class CustomShuffleReaderExec private(
}
private lazy val shuffleRDD: RDD[_] = {
- sendDriverMetrics()
-
- shuffleStage.map { stage =>
- stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
- }.getOrElse {
- throw new IllegalStateException("operating on canonicalized plan")
+ shuffleStage match {
+ case Some(stage) =>
+ sendDriverMetrics()
+ stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
+ case _ =>
+ throw new IllegalStateException("operating on canonicalized plan")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 92f7f40..cdd1901 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.adaptive
import java.io.File
+import java.lang.reflect.InvocationTargetException
import java.net.URI
import org.apache.log4j.Level
@@ -869,6 +870,26 @@ class AdaptiveQueryExecSuite
}
}
+ test("SPARK-34682: CustomShuffleReaderExec operating on canonicalized plan") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+ val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT key FROM testData GROUP BY key")
+ val readers = collect(adaptivePlan) {
+ case r: CustomShuffleReaderExec => r
+ }
+ assert(readers.length == 1)
+ val reader = readers.head
+ val c = reader.canonicalized.asInstanceOf[CustomShuffleReaderExec]
+ // we can't just call execute() because that has separate checks for canonicalized plans
+ val doExecute = c.getClass.getMethod("doExecute")
+ doExecute.setAccessible(true)
+ val ex = intercept[InvocationTargetException] {
+ doExecute.invoke(c)
+ }
+ assert(ex.getCause.getMessage === "operating on canonicalized plan")
+ }
+ }
+
test("metrics of the shuffle reader") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]