This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f5c3f0c228f [SPARK-39164][SQL] Wrap asserts/illegal state exceptions by the INTERNAL_ERROR exception in actions f5c3f0c228f is described below commit f5c3f0c228fef7808d1f927e134595ddd4d31723 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Thu May 12 23:54:19 2022 +0300 [SPARK-39164][SQL] Wrap asserts/illegal state exceptions by the INTERNAL_ERROR exception in actions ### What changes were proposed in this pull request? In this PR, I propose to catch `java.lang.IllegalStateException` and `java.lang.AssertionError` (raised by asserts) and wrap them in a Spark exception with the `INTERNAL_ERROR` error class. So far, the modification affects only actions. This PR affects the case of an invalid bucket file: after the changes, Spark throws `SparkException` with `INTERNAL_ERROR` instead of `IllegalStateException`. Since this is not an illegal state of Spark itself, the exception should be replaced by another runtime exception; I created ticket SPARK-39163 to fix this. Similarly, a scalar subquery that returns more than one row now surfaces as `INTERNAL_ERROR` (tracked by SPARK-39167). ### Why are the changes needed? To improve the user experience with Spark SQL and to unify the representation of internal errors by using error classes, as is done for other errors. Usually, users shouldn't observe asserts and illegal states, but even if such a situation happens, they should see these errors in the same way as other errors (with the error class `INTERNAL_ERROR`). ### Does this PR introduce _any_ user-facing change? Yes, at least in one particular case; see the modified test suites and SPARK-39163. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *.BucketedReadWithoutHiveSupportSuite" $ build/sbt "test:testOnly *.AdaptiveQueryExecSuite" $ build/sbt "test:testOnly *.WholeStageCodegenSuite" ``` Closes #36500 from MaxGekk/class-internal-error. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 21 ++++++++++++++++----- .../spark/sql/execution/DataSourceScanExec.scala | 1 + .../org/apache/spark/sql/execution/subquery.scala | 1 + .../scala/org/apache/spark/sql/SubquerySuite.scala | 10 ++++++---- .../sql/execution/WholeStageCodegenSuite.scala | 14 ++++++++------ .../execution/adaptive/AdaptiveQueryExecSuite.scala | 9 ++++++--- .../spark/sql/sources/BucketedReadSuite.scala | 8 +++++--- 7 files changed, 43 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 36b6d6b470d..8c89ec795de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -27,7 +27,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, SparkThrowable, TaskContext} import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable} import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ @@ -3906,12 +3906,23 @@ class Dataset[T] private[sql]( /** * Wrap a Dataset action to track the QueryExecution and time cost, then report to the - * user-registered callback functions. + * user-registered callback functions, and also to convert asserts/illegal states to + * the internal error exception. 
*/ private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { - SQLExecution.withNewExecutionId(qe, Some(name)) { - qe.executedPlan.resetMetrics() - action(qe.executedPlan) + try { + SQLExecution.withNewExecutionId(qe, Some(name)) { + qe.executedPlan.resetMetrics() + action(qe.executedPlan) + } + } catch { + case e: SparkThrowable => throw e + case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) => + throw new SparkException( + errorClass = "INTERNAL_ERROR", + messageParameters = Array(s"""The "$name" action failed."""), + cause = e) + case e: Throwable => throw e } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 9141a3f742e..f7b627cef08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -618,6 +618,7 @@ case class FileSourceScanExec( }.groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath).getName) + // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 4bbfc3467d4..209b0f79243 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -79,6 +79,7 @@ case class ScalarSubquery( def updateResult(): Unit = { val rows = plan.executeCollect() if (rows.length > 1) { + // TODO(SPARK-39167): Throw an exception w/ an error class for multiple rows from a subquery throw new IllegalStateException( s"more than one row returned by a subquery used as an expression:\n$plan") } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 221663c61e1..396fca47634 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort} import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec} @@ -146,12 +147,13 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("runtime error when the number of rows is greater than 1") { - val error2 = intercept[RuntimeException] { + val e = intercept[SparkException] { sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect() } - assert(error2.getMessage.contains( - "more than one row returned by a subquery used as an expression") - ) + // TODO(SPARK-39167): Throw an exception w/ an error class for multiple rows from a subquery + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.getMessage.contains( + "more than one row returned by a subquery used as an expression")) } test("uncorrelated scalar subquery on a DataFrame generated query") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 622cce7e8b3..7da55674c92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,6 +17,7 @@ package 
org.apache.spark.sql.execution +import org.apache.spark.SparkException import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite @@ -762,10 +763,11 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession "SELECT AVG(v) FROM VALUES(1) t(v)", // Tet case with keys "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query => - val errMsg = intercept[IllegalStateException] { + val e = intercept[SparkException] { sql(query).collect - }.getMessage - assert(errMsg.contains(expectedErrMsg)) + } + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.getMessage.contains(expectedErrMsg)) } } } @@ -784,11 +786,11 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession // Tet case with keys "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " + "GROUP BY k").foreach { query => - val e = intercept[Exception] { + val e = intercept[SparkException] { sql(query).collect } - assert(e.isInstanceOf[IllegalStateException]) - assert(e.getMessage.contains(expectedErrMsg)) + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.getMessage.contains(expectedErrMsg)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index eff19302c5b..0f71c028962 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,6 +23,7 @@ import java.net.URI import org.apache.logging.log4j.Level import org.scalatest.PrivateMethodTester +import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, 
SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} @@ -856,11 +857,13 @@ class AdaptiveQueryExecSuite df1.write.parquet(tableDir.getAbsolutePath) val aggregated = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[Exception] { + val error = intercept[SparkException] { aggregated.count() } - assert(error.toString contains "Invalid bucket file") - assert(error.getSuppressed.size === 0) + // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file + assert(error.getErrorClass === "INTERNAL_ERROR") + assert(error.getCause.toString contains "Invalid bucket file") + assert(error.getCause.getSuppressed.size === 0) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 921cff2adca..c39edbc5860 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -22,6 +22,7 @@ import java.net.URI import scala.util.Random +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions @@ -841,11 +842,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti df1.write.parquet(tableDir.getAbsolutePath) val aggregated = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[Exception] { + val e = intercept[SparkException] { aggregated.count() } - - assert(error.toString contains "Invalid bucket file") + // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.toString contains "Invalid bucket file") } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org