This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 1372f312052 [SPARK-39164][SQL][3.3] Wrap asserts/illegal state exceptions by the INTERNAL_ERROR exception in actions 1372f312052 is described below commit 1372f312052dd0361e371e2ed63436f3e299c617 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Fri May 13 16:43:53 2022 +0300 [SPARK-39164][SQL][3.3] Wrap asserts/illegal state exceptions by the INTERNAL_ERROR exception in actions ### What changes were proposed in this pull request? In the PR, I propose to catch `java.lang.IllegalStateException` and `java.lang.AssertionError` (raised by asserts), and wrap them in Spark's exception w/ the `INTERNAL_ERROR` error class. The modification affects only actions so far. This PR affects the case of a missing bucket file. After the changes, Spark throws `SparkException` w/ `INTERNAL_ERROR` instead of `IllegalStateException`. Since this is not Spark's illegal state, the exception should be replaced by another runtime exception. Created the ticket SPARK-39163 to fix this. This is a backport of https://github.com/apache/spark/pull/36500. ### Why are the changes needed? To improve user experience with Spark SQL and unify the representation of internal errors by using error classes, as is done for other errors. Usually, users shouldn't observe asserts and illegal states, but even if such a situation happens, they should see errors in the same way as other errors (w/ the error class `INTERNAL_ERROR`). ### Does this PR introduce _any_ user-facing change? Yes. At least, in one particular case; see the modified test suites and SPARK-39163. ### How was this patch tested? 
By running the affected test suites: ``` $ build/sbt "test:testOnly *.BucketedReadWithoutHiveSupportSuite" $ build/sbt "test:testOnly *.AdaptiveQueryExecSuite" $ build/sbt "test:testOnly *.WholeStageCodegenSuite" ``` Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com> (cherry picked from commit f5c3f0c228fef7808d1f927e134595ddd4d31723) Signed-off-by: Max Gekk <max.gekk@gmail.com> Closes #36533 from MaxGekk/class-internal-error-3.3. Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 21 ++++++++++++++++----- .../spark/sql/execution/DataSourceScanExec.scala | 1 + .../org/apache/spark/sql/execution/subquery.scala | 1 + .../scala/org/apache/spark/sql/SubquerySuite.scala | 10 ++++++---- .../sql/execution/WholeStageCodegenSuite.scala | 14 ++++++++------ .../execution/adaptive/AdaptiveQueryExecSuite.scala | 9 ++++++--- .../spark/sql/sources/BucketedReadSuite.scala | 8 +++++--- 7 files changed, 43 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 7d16a2f5eee..56f0e8978ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -27,7 +27,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, SparkThrowable, TaskContext} import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable} import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.function._ @@ -3848,12 +3848,23 @@ class Dataset[T] private[sql]( /** * Wrap a Dataset action to track the QueryExecution and time cost, then report to the - * user-registered callback functions. 
+ * user-registered callback functions, and also to convert asserts/illegal states to + * the internal error exception. */ private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { - SQLExecution.withNewExecutionId(qe, Some(name)) { - qe.executedPlan.resetMetrics() - action(qe.executedPlan) + try { + SQLExecution.withNewExecutionId(qe, Some(name)) { + qe.executedPlan.resetMetrics() + action(qe.executedPlan) + } + } catch { + case e: SparkThrowable => throw e + case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) => + throw new SparkException( + errorClass = "INTERNAL_ERROR", + messageParameters = Array(s"""The "$name" action failed."""), + cause = e) + case e: Throwable => throw e } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index ac0f3af5725..1ec93a614b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -592,6 +592,7 @@ case class FileSourceScanExec( }.groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath).getName) + // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 4bbfc3467d4..209b0f79243 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -79,6 +79,7 @@ case class ScalarSubquery( def updateResult(): Unit = { val rows = plan.executeCollect() if (rows.length > 1) { + // TODO(SPARK-39167): Throw an exception w/ an error class for multiple rows from a subquery 
throw new IllegalStateException( s"more than one row returned by a subquery used as an expression:\n$plan") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 221663c61e1..396fca47634 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort} import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec} @@ -146,12 +147,13 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("runtime error when the number of rows is greater than 1") { - val error2 = intercept[RuntimeException] { + val e = intercept[SparkException] { sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect() } - assert(error2.getMessage.contains( - "more than one row returned by a subquery used as an expression") - ) + // TODO(SPARK-39167): Throw an exception w/ an error class for multiple rows from a subquery + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.getMessage.contains( + "more than one row returned by a subquery used as an expression")) } test("uncorrelated scalar subquery on a DataFrame generated query") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index ba511354f7a..27689bb4d45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.SparkException import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite @@ -762,10 +763,11 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession "SELECT AVG(v) FROM VALUES(1) t(v)", // Tet case with keys "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query => - val errMsg = intercept[IllegalStateException] { + val e = intercept[SparkException] { sql(query).collect - }.getMessage - assert(errMsg.contains(expectedErrMsg)) + } + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.getMessage.contains(expectedErrMsg)) } } } @@ -784,11 +786,11 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession // Tet case with keys "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " + "GROUP BY k").foreach { query => - val e = intercept[Exception] { + val e = intercept[SparkException] { sql(query).collect } - assert(e.isInstanceOf[IllegalStateException]) - assert(e.getMessage.contains(expectedErrMsg)) + assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.getMessage.contains(expectedErrMsg)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 76741dc4d08..90aff26b7fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,6 +23,7 @@ import java.net.URI import org.apache.logging.log4j.Level import 
org.scalatest.PrivateMethodTester +import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} @@ -856,11 +857,13 @@ class AdaptiveQueryExecSuite df1.write.parquet(tableDir.getAbsolutePath) val aggregated = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[Exception] { + val error = intercept[SparkException] { aggregated.count() } - assert(error.toString contains "Invalid bucket file") - assert(error.getSuppressed.size === 0) + // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file + assert(error.getErrorClass === "INTERNAL_ERROR") + assert(error.getCause.toString contains "Invalid bucket file") + assert(error.getCause.getSuppressed.size === 0) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 18039db2ca7..c3250f8d9fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -22,6 +22,7 @@ import java.net.URI import scala.util.Random +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions @@ -841,11 +842,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti df1.write.parquet(tableDir.getAbsolutePath) val aggregated = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[Exception] { + val e = intercept[SparkException] { aggregated.count() } - - assert(error.toString contains "Invalid bucket file") + // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file + 
assert(e.getErrorClass === "INTERNAL_ERROR") + assert(e.getCause.toString contains "Invalid bucket file") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org