This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 19afe1341d2 [SPARK-39412][SQL] Exclude IllegalStateException from
Spark's internal errors
19afe1341d2 is described below
commit 19afe1341d277bc2d7dd47175d142a8c71141138
Author: Max Gekk <[email protected]>
AuthorDate: Wed Jun 8 21:20:55 2022 +0300
[SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal
errors
### What changes were proposed in this pull request?
In the PR, I propose to exclude `IllegalStateException` from the list of
exceptions that are wrapped by `SparkException` with the `INTERNAL_ERROR` error
class.
### Why are the changes needed?
See the explanation in SPARK-39412.
### Does this PR introduce _any_ user-facing change?
No, the reverted changes haven't been released yet.
### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *ContinuousSuite"
$ build/sbt "test:testOnly *MicroBatchExecutionSuite"
$ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
$ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
$ build/sbt "test:testOnly *.WholeStageCodegenSuite"
```
Closes #36804 from MaxGekk/exclude-IllegalStateException.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 11 ++++-------
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../scala/org/apache/spark/sql/execution/QueryExecution.scala | 7 +++----
.../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 11 ++++-------
.../sql/execution/streaming/MicroBatchExecutionSuite.scala | 6 ++----
.../spark/sql/streaming/continuous/ContinuousSuite.scala | 7 +++----
6 files changed, 17 insertions(+), 27 deletions(-)
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 0a32b1b54d0..2396f31b954 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -667,10 +666,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
- ExpectFailure[SparkException](e => {
- assert(e.asInstanceOf[SparkThrowable].getErrorClass ===
"INTERNAL_ERROR")
+ ExpectFailure[IllegalStateException](e => {
// The offset of `topic2` should be changed from 2 to 1
- assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
+ assert(e.getMessage.contains("was changed from 2 to 1"))
})
)
}
@@ -766,13 +764,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
testStream(df)(
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
- ExpectFailure[SparkException](e => {
- assert(e.asInstanceOf[SparkThrowable].getErrorClass ===
"INTERNAL_ERROR")
+ ExpectFailure[IllegalStateException](e => {
Seq(
s"maximum supported log version is v1, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this
version"
).foreach { message =>
- assert(e.getCause.toString.contains(message))
+ assert(e.toString.contains(message))
}
}))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0a45cf92c6e..97a5318b3ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3916,7 +3916,7 @@ class Dataset[T] private[sql](
/**
* Wrap a Dataset action to track the QueryExecution and time cost, then
report to the
- * user-registered callback functions, and also to convert asserts/illegal
states to
+ * user-registered callback functions, and also to convert asserts/NPE to
* the internal error exception.
*/
private def withAction[U](name: String, qe: QueryExecution)(action:
SparkPlan => U) = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 206f2a24e0e..d9fc877c321 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -491,11 +491,10 @@ object QueryExecution {
}
/**
- * Converts asserts, null pointer, illegal state exceptions to internal
errors.
+ * Converts asserts, null pointer exceptions to internal errors.
*/
private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e
match {
- case e @ (_: java.lang.IllegalStateException | _:
java.lang.NullPointerException |
- _: java.lang.AssertionError) =>
+ case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError)
=>
new SparkException(
errorClass = "INTERNAL_ERROR",
messageParameters = Array(msg +
@@ -506,7 +505,7 @@ object QueryExecution {
}
/**
- * Catches asserts, null pointer, illegal state exceptions, and converts
them to internal errors.
+ * Catches asserts, null pointer exceptions, and converts them to internal
errors.
*/
private[sql] def withInternalError[T](msg: String)(block: => T): T = {
try {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 7da55674c92..eca22b14763 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution
-import org.apache.spark.SparkException
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
@@ -763,11 +762,10 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
"SELECT AVG(v) FROM VALUES(1) t(v)",
// Tet case with keys
"SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach {
query =>
- val e = intercept[SparkException] {
+ val e = intercept[IllegalStateException] {
sql(query).collect
}
- assert(e.getErrorClass === "INTERNAL_ERROR")
- assert(e.getCause.getMessage.contains(expectedErrMsg))
+ assert(e.getMessage.contains(expectedErrMsg))
}
}
}
@@ -786,11 +784,10 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
// Tet case with keys
"SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k,
a, b, c) " +
"GROUP BY k").foreach { query =>
- val e = intercept[SparkException] {
+ val e = intercept[IllegalStateException] {
sql(query).collect
}
- assert(e.getErrorClass === "INTERNAL_ERROR")
- assert(e.getCause.getMessage.contains(expectedErrMsg))
+ assert(e.getMessage.contains(expectedErrMsg))
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index 9d731248ad4..f06e62b33b1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.connector.read.streaming
@@ -94,9 +93,8 @@ class MicroBatchExecutionSuite extends StreamTest with
BeforeAndAfter {
testStream(streamEvent) (
AddData(inputData, 1, 2, 3, 4, 5, 6),
StartStream(Trigger.Once, checkpointLocation =
checkpointDir.getAbsolutePath),
- ExpectFailure[SparkException] { e =>
- assert(e.asInstanceOf[SparkThrowable].getErrorClass ===
"INTERNAL_ERROR")
- assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
+ ExpectFailure[IllegalStateException] { e =>
+ assert(e.getMessage.contains("batch 3 doesn't exist"))
}
)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index cd1c865f5aa..26c201d5921 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
import java.sql.Timestamp
-import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
+import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
@@ -440,9 +440,8 @@ class ContinuousEpochBacklogSuite extends
ContinuousSuiteBase {
testStream(df)(
StartStream(Trigger.Continuous(1)),
- ExpectFailure[SparkException] { e =>
- assert(e.asInstanceOf[SparkThrowable].getErrorClass ===
"INTERNAL_ERROR")
- e.getCause.getMessage.contains("queue has exceeded its maximum")
+ ExpectFailure[IllegalStateException] { e =>
+ e.getMessage.contains("queue has exceeded its maximum")
}
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]