This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 19afe1341d2 [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
19afe1341d2 is described below

commit 19afe1341d277bc2d7dd47175d142a8c71141138
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Jun 8 21:20:55 2022 +0300

    [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to exclude `IllegalStateException` from the list of exceptions that are wrapped by `SparkException` with the `INTERNAL_ERROR` error class.
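    
    For illustration, a simplified sketch of the resulting behavior (not the verbatim patch; see the `QueryExecution.scala` hunk below). Only asserts and NPEs are still wrapped into an `INTERNAL_ERROR` `SparkException`; an `IllegalStateException` now propagates unchanged. The wrapper object and the shortened message parameter here are hypothetical:
    ```
    import org.apache.spark.SparkException

    // Hypothetical wrapper object, for illustration only.
    object InternalErrorSketch {
      // After this change the match no longer lists java.lang.IllegalStateException,
      // so such exceptions are returned as-is instead of being wrapped.
      def toInternalError(msg: String, e: Throwable): Throwable = e match {
        case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>
          new SparkException(
            errorClass = "INTERNAL_ERROR",
            messageParameters = Array(msg),  // simplified message parameter
            cause = e)
        case e: Throwable => e  // IllegalStateException falls through here now
      }
    }
    ```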
    
    ### Why are the changes needed?
    See explanation in SPARK-39412.
    
    ### Does this PR introduce _any_ user-facing change?
    No, the reverted changes haven't been released yet.
    
    ### How was this patch tested?
    By running the modified test suites:
    ```
    $ build/sbt "test:testOnly *ContinuousSuite"
    $ build/sbt "test:testOnly *MicroBatchExecutionSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
    $ build/sbt "test:testOnly *.WholeStageCodegenSuite"
    ```
    
    Closes #36804 from MaxGekk/exclude-IllegalStateException.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala       | 11 ++++-------
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala |  7 +++----
 .../apache/spark/sql/execution/WholeStageCodegenSuite.scala   | 11 ++++-------
 .../sql/execution/streaming/MicroBatchExecutionSuite.scala    |  6 ++----
 .../spark/sql/streaming/continuous/ContinuousSuite.scala      |  7 +++----
 6 files changed, 17 insertions(+), 27 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 0a32b1b54d0..2396f31b954 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -667,10 +666,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[SparkException](e => {
-        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+      ExpectFailure[IllegalStateException](e => {
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getMessage.contains("was changed from 2 to 1"))
       })
     )
   }
@@ -766,13 +764,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
       testStream(df)(
         StartStream(checkpointLocation = metadataPath.getAbsolutePath),
-        ExpectFailure[SparkException](e => {
-          assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+        ExpectFailure[IllegalStateException](e => {
           Seq(
             s"maximum supported log version is v1, but encountered v99999",
             "produced by a newer version of Spark and cannot be read by this 
version"
           ).foreach { message =>
-            assert(e.getCause.toString.contains(message))
+            assert(e.toString.contains(message))
           }
         }))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0a45cf92c6e..97a5318b3ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3916,7 +3916,7 @@ class Dataset[T] private[sql](
 
   /**
   * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
-   * user-registered callback functions, and also to convert asserts/illegal states to
+   * user-registered callback functions, and also to convert asserts/NPE to
    * the internal error exception.
    */
   private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 206f2a24e0e..d9fc877c321 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -491,11 +491,10 @@ object QueryExecution {
   }
 
   /**
-   * Converts asserts, null pointer, illegal state exceptions to internal errors.
+   * Converts asserts, null pointer exceptions to internal errors.
    */
   private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
-    case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
-              _: java.lang.AssertionError) =>
+    case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>
       new SparkException(
         errorClass = "INTERNAL_ERROR",
         messageParameters = Array(msg +
@@ -506,7 +505,7 @@ object QueryExecution {
   }
 
   /**
-   * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+   * Catches asserts, null pointer exceptions, and converts them to internal errors.
    */
   private[sql] def withInternalError[T](msg: String)(block: => T): T = {
     try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 7da55674c92..eca22b14763 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
@@ -763,11 +762,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
           "SELECT AVG(v) FROM VALUES(1) t(v)",
           // Test case with keys
           "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query =>
-          val e = intercept[SparkException] {
+          val e = intercept[IllegalStateException] {
             sql(query).collect
           }
-          assert(e.getErrorClass === "INTERNAL_ERROR")
-          assert(e.getCause.getMessage.contains(expectedErrMsg))
+          assert(e.getMessage.contains(expectedErrMsg))
         }
       }
     }
@@ -786,11 +784,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
           // Test case with keys
           "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " +
             "GROUP BY k").foreach { query =>
-          val e = intercept[SparkException] {
+          val e = intercept[IllegalStateException] {
             sql(query).collect
           }
-          assert(e.getErrorClass === "INTERNAL_ERROR")
-          assert(e.getCause.getMessage.contains(expectedErrMsg))
+          assert(e.getMessage.contains(expectedErrMsg))
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index 9d731248ad4..f06e62b33b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.connector.read.streaming
@@ -94,9 +93,8 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     testStream(streamEvent) (
       AddData(inputData, 1, 2, 3, 4, 5, 6),
       StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
-      ExpectFailure[SparkException] { e =>
-        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
-        assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
+      ExpectFailure[IllegalStateException] { e =>
+        assert(e.getMessage.contains("batch 3 doesn't exist"))
       }
     )
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index cd1c865f5aa..26c201d5921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.sql.Timestamp
 
-import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
@@ -440,9 +440,8 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
 
       testStream(df)(
         StartStream(Trigger.Continuous(1)),
-        ExpectFailure[SparkException] { e =>
-          assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
-          e.getCause.getMessage.contains("queue has exceeded its maximum")
+        ExpectFailure[IllegalStateException] { e =>
+          e.getMessage.contains("queue has exceeded its maximum")
         }
       )
     }

