This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8894e785eda [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
8894e785eda is described below

commit 8894e785edae42a642351ad91e539324c39da8e4
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Jun 1 20:16:17 2022 +0300

    [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

    ### What changes were proposed in this pull request?
    In this PR, I propose to catch asserts/illegal state exceptions at each phase of query execution (ANALYSIS, OPTIMIZATION, PLANNING) and convert them to a `SparkException` with the `INTERNAL_ERROR` error class.

    ### Why are the changes needed?
    To improve the user experience with Spark SQL and to unify the representation of user-facing errors.

    ### Does this PR introduce _any_ user-facing change?
    No. The changes might affect users in corner cases only.

    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
    ```

    Closes #36704 from MaxGekk/wrapby-INTERNAL_ERROR-every-phase.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 11 +++++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 14 +++-------
 .../spark/sql/execution/QueryExecution.scala       | 31 +++++++++++++++++++++-
 .../sql/execution/streaming/StreamExecution.scala  |  4 ++-
 .../streaming/MicroBatchExecutionSuite.scala       |  6 +++--
 .../sql/streaming/continuous/ContinuousSuite.scala |  7 ++---
 6 files changed, 51 insertions(+), 22 deletions(-)
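Before the diff, the error contract in a nutshell: after this change, internal faults surface as a `SparkException` carrying the `INTERNAL_ERROR` error class, with the original assert/illegal-state failure preserved as the cause. A minimal sketch of how a caller can detect and unwrap such failures, assuming the Spark 3.3-era `SparkThrowable.getErrorClass` API shown in the tests below; the helper name is hypothetical, not part of this patch:

```scala
import org.apache.spark.SparkThrowable

// Hypothetical helper (not in this patch): if `e` is one of the new
// INTERNAL_ERROR wrappers, return the original failure it carries;
// otherwise return `e` unchanged.
def unwrapInternalError(e: Throwable): Throwable = e match {
  case st: SparkThrowable if st.getErrorClass == "INTERNAL_ERROR" && e.getCause != null =>
    e.getCause
  case _ => e
}
```

This mirrors what the updated test suites below do by hand via `e.asInstanceOf[SparkThrowable].getErrorClass` and `e.getCause`.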
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 2396f31b954..0a32b1b54d0 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
       })
     )
   }
@@ -764,12 +766,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     testStream(df)(
       StartStream(checkpointLocation = metadataPath.getAbsolutePath),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         Seq(
           s"maximum supported log version is v1, but encountered v99999",
           "produced by a newer version of Spark and cannot be read by this version"
         ).foreach { message =>
-          assert(e.toString.contains(message))
+          assert(e.getCause.toString.contains(message))
         }
       }))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f00ebf51d6d..0a45cf92c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.{SparkException, SparkThrowable, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -3920,19 +3920,11 @@ class Dataset[T] private[sql](
    * the internal error exception.
    */
   private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
-    try {
-      SQLExecution.withNewExecutionId(qe, Some(name)) {
+    SQLExecution.withNewExecutionId(qe, Some(name)) {
+      QueryExecution.withInternalError(s"""The "$name" action failed.""") {
         qe.executedPlan.resetMetrics()
         action(qe.executedPlan)
       }
-    } catch {
-      case e: SparkThrowable => throw e
-      case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) =>
-        throw new SparkException(
-          errorClass = "INTERNAL_ERROR",
-          messageParameters = Array(s"""The "$name" action failed."""),
-          cause = e)
-      case e: Throwable => throw e
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9ea769b4cf1..206f2a24e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -180,7 +181,9 @@ class QueryExecution(
   }
 
   protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
-    tracker.measurePhase(phase)(block)
+    QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {
+      tracker.measurePhase(phase)(block)
+    }
   }
 
   def simpleString: String = {
@@ -486,4 +489,30 @@ object QueryExecution {
     val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
     prepareForExecution(preparationRules, sparkPlan.clone())
   }
+
+  /**
+   * Converts asserts, null pointer, illegal state exceptions to internal errors.
+   */
+  private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
+    case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
+        _: java.lang.AssertionError) =>
+      new SparkException(
+        errorClass = "INTERNAL_ERROR",
+        messageParameters = Array(msg +
+          " Please, fill a bug report in, and provide the full stack trace."),
+        cause = e)
+    case e: Throwable =>
+      e
+  }
+
+  /**
+   * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+   */
+  private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: Throwable => throw toInternalError(msg, e)
+    }
+  }
 }
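To make the semantics of `toInternalError`/`withInternalError` above concrete, here is a standalone sketch of the same wrap-and-rethrow pattern. It is a simplification: a plain `RuntimeException` stands in for `SparkException` and its `INTERNAL_ERROR` error class, so it runs without Spark on the classpath.

```scala
object InternalErrorSketch extends App {
  // Wrap only the "impossible" failure types; anything else passes
  // through untouched as a regular user-facing error.
  def toInternalError(msg: String, e: Throwable): Throwable = e match {
    case _: IllegalStateException | _: NullPointerException | _: AssertionError =>
      new RuntimeException(s"[INTERNAL_ERROR] $msg", e)
    case _ => e
  }

  def withInternalError[T](msg: String)(block: => T): T =
    try block catch { case e: Throwable => throw toInternalError(msg, e) }

  // An illegal-state failure raised inside the block surfaces as an
  // internal error that keeps the original exception as its cause.
  try {
    withInternalError("The Spark SQL phase OPTIMIZATION failed with an internal error.") {
      throw new IllegalStateException("boom")
    }
  } catch {
    case e: RuntimeException =>
      println(e.getMessage) // [INTERNAL_ERROR] The Spark SQL phase OPTIMIZATION failed ...
      println(e.getCause)   // java.lang.IllegalStateException: boom
  }
}
```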
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 88896c55455..eeaa37aa7ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
@@ -319,7 +320,8 @@ abstract class StreamExecution(
         // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
         // to `new IOException(ie.toString())` before Hadoop 2.8.
         updateStatusMessage("Stopped")
-      case e: Throwable =>
+      case t: Throwable =>
+        val e = QueryExecution.toInternalError(msg = s"Execution of the stream $name failed.", t)
         streamDeathCause = new StreamingQueryException(
           toDebugString(includeLogicalPlan = isInitialized),
           s"Query $prettyIdString terminated with exception: ${e.getMessage}",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index f06e62b33b1..9d731248ad4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.connector.read.streaming
@@ -93,8 +94,9 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     testStream(streamEvent) (
       AddData(inputData, 1, 2, 3, 4, 5, 6),
       StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
-      ExpectFailure[IllegalStateException] { e =>
-        assert(e.getMessage.contains("batch 3 doesn't exist"))
+      ExpectFailure[SparkException] { e =>
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+        assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
       }
     )
   }
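A practical consequence of the `StreamExecution` change above: a failed streaming query now reports a nested chain, `StreamingQueryException` -> `SparkException` (`INTERNAL_ERROR`) -> the original assert or illegal-state failure. A tiny, Spark-free sketch for digging out the deepest cause (the helper name is ours, not part of the patch):

```scala
// Follow getCause links to the deepest failure, guarding against
// self-referential cause chains.
@annotation.tailrec
def rootCause(t: Throwable): Throwable =
  if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)
```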
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 26c201d5921..cd1c865f5aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.sql.Timestamp
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
@@ -440,8 +440,9 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
 
     testStream(df)(
       StartStream(Trigger.Continuous(1)),
-      ExpectFailure[IllegalStateException] { e =>
-        e.getMessage.contains("queue has exceeded its maximum")
+      ExpectFailure[SparkException] { e =>
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+        e.getCause.getMessage.contains("queue has exceeded its maximum")
       }
     )
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org