This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 52c2988  [SPARK-31862][SQL] Remove exception wrapping in AQE
52c2988 is described below

commit 52c2988a65d6a2d65f45e222423c59c9fb169b87
Author: Maryann Xue <maryann....@gmail.com>
AuthorDate: Fri May 29 04:23:38 2020 +0000

    [SPARK-31862][SQL] Remove exception wrapping in AQE

    ### What changes were proposed in this pull request?

    This PR removes the excessive exception wrapping in AQE so that error
    messages are less verbose and mostly consistent with non-AQE execution.
    Exceptions from stage materialization are now wrapped in a `SparkException`
    only if there are multiple stage failures. Also, stage cancellation errors
    are no longer included in the exception thrown; they are just logged as
    errors.

    ### Why are the changes needed?

    This makes AQE error reporting more readable and easier to debug.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Updated existing tests.

    Closes #28668 from maryannxue/spark-31862.

    Authored-by: Maryann Xue <maryann....@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 45864faaf2c9837a2ca48c456d3c2300736aa1ba)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 38 +++++++++-------------
 .../sql/execution/adaptive/QueryStageExec.scala    |  3 +-
 .../org/apache/spark/sql/MetadataCacheSuite.scala  |  9 +++--
 .../adaptive/AdaptiveQueryExecSuite.scala          |  2 +-
 .../sql/execution/adaptive/AdaptiveTestUtils.scala | 22 -------------
 .../sql/execution/datasources/json/JsonSuite.scala |  3 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  |  9 +++--
 .../sql/execution/joins/BroadcastJoinSuite.scala   |  4 +--
 .../spark/sql/sources/BucketedReadSuite.scala      |  3 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala    |  5 ++-
 10 files changed, 33 insertions(+), 65 deletions(-)
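A minimal standalone sketch of the rethrow policy described above (names mirror
the diff below; the helper itself is illustrative only and not part of the
commit):

```scala
import org.apache.spark.SparkException

// A single stage failure is rethrown as-is, so the message matches non-AQE
// execution; multiple failures are wrapped exactly once, with the remaining
// errors attached as suppressed exceptions so none of them is lost.
def throwStageFailures(errors: Seq[Throwable]): Nothing = {
  require(errors.nonEmpty, "expected at least one stage failure")
  val e = if (errors.size == 1) {
    errors.head
  } else {
    val se = new SparkException("Multiple failures in stage materialization.", errors.head)
    errors.tail.foreach(se.addSuppressed)
    se
  }
  throw e
}
```

Relying on the JVM's built-in `addSuppressed`/`getSuppressed` rather than ad hoc
wrapper messages is what lets the updated tests below assert directly on the
original error text and on `error.getSuppressed.size`.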
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 90d1db9..f6a3333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -156,7 +156,7 @@ case class AdaptiveSparkPlanExec(
     var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
     var result = createQueryStages(currentPhysicalPlan)
     val events = new LinkedBlockingQueue[StageMaterializationEvent]()
-    val errors = new mutable.ArrayBuffer[SparkException]()
+    val errors = new mutable.ArrayBuffer[Throwable]()
     var stagesToReplace = Seq.empty[QueryStageExec]
     while (!result.allChildStagesMaterialized) {
       currentPhysicalPlan = result.newPlan
@@ -176,9 +176,7 @@ case class AdaptiveSparkPlanExec(
           }(AdaptiveSparkPlanExec.executionContext)
         } catch {
           case e: Throwable =>
-            val ex = new SparkException(
-              s"Early failed query stage found: ${stage.treeString}", e)
-            cleanUpAndThrowException(Seq(ex), Some(stage.id))
+            cleanUpAndThrowException(Seq(e), Some(stage.id))
         }
       }
 
@@ -193,8 +191,7 @@ case class AdaptiveSparkPlanExec(
         case StageSuccess(stage, res) =>
           stage.resultOption = Some(res)
         case StageFailure(stage, ex) =>
-          errors.append(
-            new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex))
+          errors.append(ex)
       }
 
       // In case of errors, we cancel all running stages and throw exception.
@@ -536,31 +533,28 @@ case class AdaptiveSparkPlanExec(
    * materialization errors and stage cancellation errors.
    */
   private def cleanUpAndThrowException(
-      errors: Seq[SparkException],
+      errors: Seq[Throwable],
       earlyFailedStage: Option[Int]): Unit = {
-    val runningStages = currentPhysicalPlan.collect {
+    currentPhysicalPlan.foreach {
       // earlyFailedStage is the stage which failed before calling doMaterialize,
       // so we should avoid calling cancel on it to re-trigger the failure again.
-      case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s
-    }
-    val cancelErrors = new mutable.ArrayBuffer[SparkException]()
-    try {
-      runningStages.foreach { s =>
+      case s: QueryStageExec if !earlyFailedStage.contains(s.id) =>
         try {
           s.cancel()
         } catch {
           case NonFatal(t) =>
-            cancelErrors.append(
-              new SparkException(s"Failed to cancel query stage: ${s.treeString}", t))
+            logError(s"Exception in cancelling query stage: ${s.treeString}", t)
         }
-      }
-    } finally {
-      val ex = new SparkException(
-        "Adaptive execution failed due to stage materialization failures.", errors.head)
-      errors.tail.foreach(ex.addSuppressed)
-      cancelErrors.foreach(ex.addSuppressed)
-      throw ex
+      case _ =>
+    }
+    val e = if (errors.size == 1) {
+      errors.head
+    } else {
+      val se = new SparkException("Multiple failures in stage materialization.", errors.head)
+      errors.tail.foreach(se.addSuppressed)
+      se
     }
+    throw e
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index f414f85..9a9a8b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -147,7 +148,7 @@ case class ShuffleQueryStageExec(
     throw new IllegalStateException("wrong plan for shuffle stage:\n " + plan.treeString)
   }
 
-  override def doMaterialize(): Future[Any] = {
+  override def doMaterialize(): Future[Any] = attachTree(this, "execute") {
     shuffle.mapOutputStatisticsFuture
   }
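The `attachTree` helper added above comes from `org.apache.spark.sql.catalyst.errors`:
it evaluates a block and, on non-fatal failure, rethrows with the operator's tree
string attached, so a failing shuffle stage still identifies itself without wrapping
at every call site. A simplified stand-in under that assumption (not catalyst's exact
implementation):

```scala
import scala.util.control.NonFatal

// Simplified stand-in for catalyst's attachTree: run `body` and, if it fails
// non-fatally, rethrow with the plan's tree string in the message so the
// failure still names the stage it came from.
def attachTreeSketch[A](treeString: String, msg: String)(body: => A): A =
  try body catch {
    case NonFatal(e) =>
      throw new RuntimeException(s"$msg failed, tree:\n$treeString", e)
  }
```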
assert(e.getMessage.contains("recreating the Dataset/DataFrame involved")) } } } @@ -85,8 +84,8 @@ class MetadataCacheV1Suite extends MetadataCacheSuite { val e = intercept[SparkException] { sql("select count(*) from view_refresh").first() } - assertExceptionMessage(e, "FileNotFoundException") - assertExceptionMessage(e, "REFRESH") + assert(e.getMessage.contains("FileNotFoundException")) + assert(e.getMessage.contains("REFRESH")) // Refresh and we should be able to read it again. spark.catalog.refreshTable("view_refresh") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 18923b2..37e8e13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -706,7 +706,7 @@ class AdaptiveQueryExecSuite val error = intercept[Exception] { agged.count() } - assert(error.getCause().toString contains "Early failed query stage found") + assert(error.getCause().toString contains "Invalid bucket file") assert(error.getSuppressed.size === 0) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala index ddaeb57..48f85ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala @@ -69,25 +69,3 @@ trait DisableAdaptiveExecutionSuite extends SQLTestUtils { } } } - -object AdaptiveTestUtils { - def assertExceptionMessage(e: Exception, expected: String): Unit = { - val stringWriter = new StringWriter() - e.printStackTrace(new PrintWriter(stringWriter)) - val errorMsg = stringWriter.toString - assert(errorMsg.contains(expected)) - } - - def assertExceptionCause(t: Throwable, causeClass: Class[_]): Unit = { - var c = t.getCause - var foundCause = false - while (c != null && !foundCause) { - if (causeClass.isAssignableFrom(c.getClass)) { - foundCause = true - } else { - c = c.getCause - } - } - assert(foundCause, s"Can not find cause: $causeClass") - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4982991..19ec586 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.{functions => F, _} import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.ExternalRDD -import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -2239,7 +2238,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson .count() } - assertExceptionMessage(exception, "Malformed records are detected in record parsing") + assert(exception.getMessage.contains("Malformed records are detected in record parsing")) } def checkEncoding(expectedEncoding: String, 
      pathToJsonFiles: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 60f278b..9caf0c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -34,7 +34,6 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -599,19 +598,19 @@ abstract class OrcQueryTest extends OrcTest {
       val e1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }
-      assertExceptionMessage(e1, "Malformed ORC file")
+      assert(e1.getMessage.contains("Malformed ORC file"))
       val e2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
       }
-      assertExceptionMessage(e2, "Malformed ORC file")
+      assert(e2.getMessage.contains("Malformed ORC file"))
       val e3 = intercept[SparkException] {
         testAllCorruptFiles()
       }
-      assertExceptionMessage(e3, "Could not read footer for file")
+      assert(e3.getMessage.contains("Could not read footer for file"))
       val e4 = intercept[SparkException] {
         testAllCorruptFilesWithoutSchemaInfer()
       }
-      assertExceptionMessage(e4, "Malformed ORC file")
+      assert(e4.getMessage.contains("Malformed ORC file"))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 1be9308..335ef25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, BitwiseOr, Cast, Literal, ShiftLeft}
 import org.apache.spark.sql.catalyst.plans.logical.BROADCAST
 import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AdaptiveTestUtils, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.functions._
@@ -411,7 +411,7 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
         val e = intercept[Exception] {
           testDf.collect()
         }
-        AdaptiveTestUtils.assertExceptionMessage(e, s"Could not execute broadcast in $timeout secs.")
+        assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs."))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 707d9c2..14ba008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -759,7 +758,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.count()
       }
 
-      assertExceptionMessage(error, "Invalid bucket file")
+      assert(error.getCause().toString contains "Invalid bucket file")
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 743cdbd..db8ebcd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -21,7 +21,6 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -100,7 +99,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
         val e = intercept[SparkException] {
           sql("select * from test").count()
         }
-        assertExceptionMessage(e, "FileNotFoundException")
+        assert(e.getMessage.contains("FileNotFoundException"))
 
         // Test refreshing the cache.
         spark.catalog.refreshTable("test")
@@ -115,7 +114,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
         val e2 = intercept[SparkException] {
           sql("select * from test").count()
         }
-        assertExceptionMessage(e2, "FileNotFoundException")
+        assert(e2.getMessage.contains("FileNotFoundException"))
         spark.catalog.refreshByPath(dir.getAbsolutePath)
         assert(sql("select * from test").count() == 3)
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org