This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c3f0c228f [SPARK-39164][SQL] Wrap asserts/illegal state exceptions by the INTERNAL_ERROR exception in actions
f5c3f0c228f is described below

commit f5c3f0c228fef7808d1f927e134595ddd4d31723
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Thu May 12 23:54:19 2022 +0300

    [SPARK-39164][SQL] Wrap asserts/illegal state exceptions by the INTERNAL_ERROR exception in actions
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to catch `java.lang.IllegalStateException` and `java.lang.AssertionError` (raised by asserts), and wrap them in Spark's exception w/ the `INTERNAL_ERROR` error class. The modification affects only actions so far.
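    
    In isolation, the wrapping boils down to the following sketch. It is a simplified, self-contained illustration of the pattern, not the actual change (that lives in `Dataset.withAction`, see the diff below), and the helper name `wrapInternalError` is made up here:
    ```scala
    import org.apache.spark.{SparkException, SparkThrowable}
    
    // Sketch of the catch-and-wrap pattern. `name` identifies the failed
    // action (e.g. "collect") and only feeds the error message.
    def wrapInternalError[U](name: String)(body: => U): U = {
      try {
        body
      } catch {
        // Errors already classified by Spark pass through unchanged.
        case e: SparkThrowable => throw e
        // Asserts and illegal states become Spark's INTERNAL_ERROR,
        // keeping the original exception as the cause.
        case e @ (_: IllegalStateException | _: AssertionError) =>
          throw new SparkException(
            errorClass = "INTERNAL_ERROR",
            messageParameters = Array(s"""The "$name" action failed."""),
            cause = e)
      }
    }
    ```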
    
    This PR affects the case of a missing bucket file: after the changes, Spark throws `SparkException` w/ the `INTERNAL_ERROR` error class instead of `IllegalStateException`. Since a missing bucket file is not an illegal internal state of Spark, the exception should be replaced by another runtime exception; the ticket SPARK-39163 was created to fix this.
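    
    From the user's point of view, the change is observable roughly like this (a hypothetical snippet: `df` stands for any Dataset whose action hits such an internal failure, e.g. scanning a table w/ a missing bucket file):
    ```scala
    import org.apache.spark.SparkException
    
    try {
      df.collect()
    } catch {
      case e: SparkException =>
        // Before: an IllegalStateException surfaced directly. Now: the
        // error class marks the failure as internal, and the original
        // exception is preserved as the cause.
        assert(e.getErrorClass == "INTERNAL_ERROR")
        println(e.getCause) // e.g. java.lang.IllegalStateException: Invalid bucket file ...
    }
    ```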
    
    ### Why are the changes needed?
    To improve the user experience with Spark SQL, and to unify the representation of internal errors via error classes, as is already done for other errors. Usually users shouldn't observe asserts and illegal states, but even if such a situation happens, they should see the failure reported in the same way as other errors, w/ the error class `INTERNAL_ERROR`.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. At least in one particular case; see the modified test suites and SPARK-39163.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *.BucketedReadWithoutHiveSupportSuite"
    $ build/sbt "test:testOnly *.AdaptiveQueryExecSuite"
    $ build/sbt "test:testOnly *.WholeStageCodegenSuite"
    ```
    
    Closes #36500 from MaxGekk/class-internal-error.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala   | 21 ++++++++++++++++-----
 .../spark/sql/execution/DataSourceScanExec.scala    |  1 +
 .../org/apache/spark/sql/execution/subquery.scala   |  1 +
 .../scala/org/apache/spark/sql/SubquerySuite.scala  | 10 ++++++----
 .../sql/execution/WholeStageCodegenSuite.scala      | 14 ++++++++------
 .../execution/adaptive/AdaptiveQueryExecSuite.scala |  9 ++++++---
 .../spark/sql/sources/BucketedReadSuite.scala       |  8 +++++---
 7 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 36b6d6b470d..8c89ec795de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, SparkThrowable, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -3906,12 +3906,23 @@ class Dataset[T] private[sql](
 
   /**
   * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
-   * user-registered callback functions.
+   * user-registered callback functions, and also to convert asserts/illegal states to
+   * the internal error exception.
    */
   private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
-    SQLExecution.withNewExecutionId(qe, Some(name)) {
-      qe.executedPlan.resetMetrics()
-      action(qe.executedPlan)
+    try {
+      SQLExecution.withNewExecutionId(qe, Some(name)) {
+        qe.executedPlan.resetMetrics()
+        action(qe.executedPlan)
+      }
+    } catch {
+      case e: SparkThrowable => throw e
+      case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) =>
+        throw new SparkException(
+          errorClass = "INTERNAL_ERROR",
+          messageParameters = Array(s"""The "$name" action failed."""),
+          cause = e)
+      case e: Throwable => throw e
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9141a3f742e..f7b627cef08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -618,6 +618,7 @@ case class FileSourceScanExec(
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(new Path(f.filePath).getName)
+          // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
           .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}"))
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 4bbfc3467d4..209b0f79243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -79,6 +79,7 @@ case class ScalarSubquery(
   def updateResult(): Unit = {
     val rows = plan.executeCollect()
     if (rows.length > 1) {
+      // TODO(SPARK-39167): Throw an exception w/ an error class for multiple rows from a subquery
       throw new IllegalStateException(
         s"more than one row returned by a subquery used as an 
expression:\n$plan")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 221663c61e1..396fca47634 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
@@ -146,12 +147,13 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
   }
 
   test("runtime error when the number of rows is greater than 1") {
-    val error2 = intercept[RuntimeException] {
+    val e = intercept[SparkException] {
       sql("select (select a from (select 1 as a union all select 2 as a) t) as 
b").collect()
     }
-    assert(error2.getMessage.contains(
-      "more than one row returned by a subquery used as an expression")
-    )
+    // TODO(SPARK-39167): Throw an exception w/ an error class for multiple rows from a subquery
+    assert(e.getErrorClass === "INTERNAL_ERROR")
+    assert(e.getCause.getMessage.contains(
+      "more than one row returned by a subquery used as an expression"))
   }
 
   test("uncorrelated scalar subquery on a DataFrame generated query") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 622cce7e8b3..7da55674c92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
@@ -762,10 +763,11 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
           "SELECT AVG(v) FROM VALUES(1) t(v)",
           // Test case with keys
           "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { 
query =>
-          val errMsg = intercept[IllegalStateException] {
+          val e = intercept[SparkException] {
             sql(query).collect
-          }.getMessage
-          assert(errMsg.contains(expectedErrMsg))
+          }
+          assert(e.getErrorClass === "INTERNAL_ERROR")
+          assert(e.getCause.getMessage.contains(expectedErrMsg))
         }
       }
     }
@@ -784,11 +786,11 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
           // Test case with keys
           "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, 
a, b, c) " +
             "GROUP BY k").foreach { query =>
-          val e = intercept[Exception] {
+          val e = intercept[SparkException] {
             sql(query).collect
           }
-          assert(e.isInstanceOf[IllegalStateException])
-          assert(e.getMessage.contains(expectedErrMsg))
+          assert(e.getErrorClass === "INTERNAL_ERROR")
+          assert(e.getCause.getMessage.contains(expectedErrMsg))
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index eff19302c5b..0f71c028962 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,6 +23,7 @@ import java.net.URI
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester
 
+import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
@@ -856,11 +857,13 @@ class AdaptiveQueryExecSuite
         df1.write.parquet(tableDir.getAbsolutePath)
 
         val aggregated = spark.table("bucketed_table").groupBy("i").count()
-        val error = intercept[Exception] {
+        val error = intercept[SparkException] {
           aggregated.count()
         }
-        assert(error.toString contains "Invalid bucket file")
-        assert(error.getSuppressed.size === 0)
+        // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
+        assert(error.getErrorClass === "INTERNAL_ERROR")
+        assert(error.getCause.toString contains "Invalid bucket file")
+        assert(error.getCause.getSuppressed.size === 0)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 921cff2adca..c39edbc5860 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -22,6 +22,7 @@ import java.net.URI
 
 import scala.util.Random
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -841,11 +842,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       df1.write.parquet(tableDir.getAbsolutePath)
 
       val aggregated = spark.table("bucketed_table").groupBy("i").count()
-      val error = intercept[Exception] {
+      val e = intercept[SparkException] {
         aggregated.count()
       }
-
-      assert(error.toString contains "Invalid bucket file")
+      // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
+      assert(e.getErrorClass === "INTERNAL_ERROR")
+      assert(e.getCause.toString contains "Invalid bucket file")
     }
   }
 

