This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new edafe266144 [SPARK-41974][SQL] Turn `INCORRECT_END_OFFSET` into
`INTERNAL_ERROR`
edafe266144 is described below
commit edafe266144c5c70852491fef9bb6907a001b286
Author: itholic <[email protected]>
AuthorDate: Wed Jan 18 17:14:16 2023 +0800
[SPARK-41974][SQL] Turn `INCORRECT_END_OFFSET` into `INTERNAL_ERROR`
### What changes were proposed in this pull request?
This PR proposes to turn the `INCORRECT_END_OFFSET` error class into `INTERNAL_ERROR`.
### Why are the changes needed?
We should turn an error class into `INTERNAL_ERROR` when it cannot be triggered
from user space.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
Closes #39496 from itholic/INCORRECT_END_OFFSET.
Authored-by: itholic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/errors/QueryExecutionErrors.scala | 10 +++-------
.../streaming/sources/RateStreamProviderSuite.scala | 17 +++++++++--------
2 files changed, 12 insertions(+), 15 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 9598933d941..a957fffc97c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2469,13 +2469,9 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
def incorrectEndOffset(rowsPerSecond: Long,
maxSeconds: Long,
endSeconds: Long): Throwable = {
- new SparkRuntimeException(
- errorClass = "INCORRECT_END_OFFSET",
- messageParameters = Map(
- "rowsPerSecond" -> rowsPerSecond.toString,
- "maxSeconds" -> maxSeconds.toString,
- "endSeconds" -> endSeconds.toString
- ))
+ SparkException.internalError(
+ s"Max offset with ${rowsPerSecond.toString} rowsPerSecond is
${maxSeconds.toString}, " +
+ s"but it's ${endSeconds.toString} now.")
}
def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize:
Int): Throwable = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index aebeb08775f..730611f8f35 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{SparkRuntimeException}
+import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -218,7 +218,7 @@ class RateStreamProviderSuite extends StreamTest {
withTempDir { temp =>
val maxSeconds = (Long.MaxValue / 100)
val endSeconds = Long.MaxValue
- val e = intercept[SparkRuntimeException](
+ val e = intercept[SparkException](
new RateStreamMicroBatchStream(
rowsPerSecond = 100,
rampUpTimeSeconds = 2,
@@ -228,11 +228,12 @@ class RateStreamProviderSuite extends StreamTest {
checkError(
exception = e,
- errorClass = "INCORRECT_END_OFFSET",
+ errorClass = "INTERNAL_ERROR",
parameters = Map(
- "rowsPerSecond" -> "100",
- "maxSeconds" -> maxSeconds.toString,
- "endSeconds" -> endSeconds.toString))
+ ("message" ->
+ ("Max offset with 100 rowsPerSecond is 92233720368547758, " +
+ "but it's 9223372036854775807 now.")
+ )))
}
}
@@ -310,8 +311,8 @@ class RateStreamProviderSuite extends StreamTest {
.distinct()
testStream(input)(
AdvanceRateManualClock(2),
- ExpectFailure[SparkRuntimeException](t => {
- Seq("INCORRECT_END_OFFSET", "rowsPerSecond").foreach { msg =>
+ ExpectFailure[SparkException](t => {
+ Seq("INTERNAL_ERROR", "rowsPerSecond").foreach { msg =>
assert(t.getMessage.contains(msg))
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]