This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6c009180a75 [SPARK-37945][SQL][CORE] Use error classes in the
execution errors of arithmetic ops
6c009180a75 is described below
commit 6c009180a75ae8e548ef4395211b13ee25ab60a9
Author: Khalid Mammadov <[email protected]>
AuthorDate: Sun Oct 23 11:44:49 2022 +0500
[SPARK-37945][SQL][CORE] Use error classes in the execution errors of
arithmetic ops
### What changes were proposed in this pull request?
Migrate the following errors in QueryExecutionErrors to use error classes:
unscaledValueTooLargeForPrecisionError ->
UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION
decimalPrecisionExceedsMaxPrecisionError ->
DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION
integerOverflowError -> INTEGER_OVERFLOW
outOfDecimalTypeRangeError -> OUT_OF_DECIMAL_TYPE_RANGE
### Why are the changes needed?
Porting ArithmeticExceptions to the new error framework
### Does this PR introduce _any_ user-facing change?
Yes, errors will indicate that it's a controlled Spark exception
### How was this patch tested?
./build/sbt "catalyst/testOnly org.apache.spark.sql.types.DecimalSuite"
./build/sbt "sql/testOnly
org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite"
./build/sbt "core/testOnly org.apache.spark.SparkThrowableSuite"
Closes #38273 from khalidmammadov/error_class2.
Lead-authored-by: Khalid Mammadov <[email protected]>
Co-authored-by: khalidmammadov <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 40 ++++++++++------
.../spark/sql/errors/QueryExecutionErrors.scala | 41 +++++++++++++----
.../catalyst/expressions/CastWithAnsiOnSuite.scala | 4 +-
.../org/apache/spark/sql/types/DecimalSuite.scala | 53 +++++++++++++++++-----
.../sources/RateStreamMicroBatchStream.scala | 8 ++--
.../sources/RateStreamProviderSuite.scala | 44 +++++++++++++++++-
6 files changed, 145 insertions(+), 45 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 240cf5f4eea..5f4db145479 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -276,6 +276,11 @@
],
"sqlState" : "22008"
},
+ "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION" : {
+ "message" : [
+ "Decimal precision <precision> exceeds max precision <maxPrecision>."
+ ]
+ },
"DEFAULT_DATABASE_NOT_EXISTS" : {
"message" : [
"Default database <defaultDatabase> does not exist, please create it
first or change default database to 'default'."
@@ -416,6 +421,16 @@
}
}
},
+ "INCORRECT_END_OFFSET" : {
+ "message" : [
+ "Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but it's
<endSeconds> now."
+ ]
+ },
+ "INCORRECT_RUMP_UP_RATE" : {
+ "message" : [
+ "Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but
'rampUpTimeSeconds' is <rampUpTimeSeconds>."
+ ]
+ },
"INDEX_ALREADY_EXISTS" : {
"message" : [
"Cannot create the index because it already exists. <message>."
@@ -605,6 +620,11 @@
],
"sqlState" : "22005"
},
+ "OUT_OF_DECIMAL_TYPE_RANGE" : {
+ "message" : [
+ "Out of decimal type range: <value>."
+ ]
+ },
"PARSE_CHAR_MISSING_LENGTH" : {
"message" : [
"DataType <type> requires a length parameter, for example <type>(10).
Please specify the length."
@@ -814,6 +834,11 @@
},
"sqlState" : "42000"
},
+ "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION" : {
+ "message" : [
+ "Unscaled value too large for precision. If necessary set <ansiConfig>
to false to bypass this error."
+ ]
+ },
"UNSUPPORTED_DATATYPE" : {
"message" : [
"Unsupported data type <typeName>"
@@ -3707,21 +3732,6 @@
"Unexpected: <o>"
]
},
- "_LEGACY_ERROR_TEMP_2117" : {
- "message" : [
- "Unscaled value too large for precision. If necessary set <ansiConfig>
to false to bypass this error."
- ]
- },
- "_LEGACY_ERROR_TEMP_2118" : {
- "message" : [
- "Decimal precision <precision> exceeds max precision <maxPrecision>"
- ]
- },
- "_LEGACY_ERROR_TEMP_2119" : {
- "message" : [
- "out of decimal type range: <str>"
- ]
- },
"_LEGACY_ERROR_TEMP_2120" : {
"message" : [
"Do not support array of type <clazz>."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 5edffc87b84..4aedfb3b03d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1260,8 +1260,9 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
def unscaledValueTooLargeForPrecisionError(): SparkArithmeticException = {
new SparkArithmeticException(
- errorClass = "_LEGACY_ERROR_TEMP_2117",
- messageParameters = Map("ansiConfig" ->
toSQLConf(SQLConf.ANSI_ENABLED.key)),
+ errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+ messageParameters = Map(
+ "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
context = Array.empty,
summary = "")
}
@@ -1269,18 +1270,20 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
def decimalPrecisionExceedsMaxPrecisionError(
precision: Int, maxPrecision: Int): SparkArithmeticException = {
new SparkArithmeticException(
- errorClass = "_LEGACY_ERROR_TEMP_2118",
+ errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
messageParameters = Map(
- "precision" -> precision.toString(),
- "maxPrecision" -> maxPrecision.toString()),
+ "precision" -> precision.toString,
+ "maxPrecision" -> maxPrecision.toString
+ ),
context = Array.empty,
summary = "")
}
def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException = {
new SparkArithmeticException(
- errorClass = "_LEGACY_ERROR_TEMP_2119",
- messageParameters = Map("str" -> str.toString()),
+ errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
+ messageParameters = Map(
+ "value" -> str.toString),
context = Array.empty,
summary = "")
}
@@ -2384,8 +2387,28 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
new SparkException("Foreach writer has been aborted due to a task failure")
}
- def integerOverflowError(message: String): Throwable = {
- new ArithmeticException(s"Integer overflow. $message")
+ def incorrectRumpUpRate(rowsPerSecond: Long,
+ maxSeconds: Long,
+ rampUpTimeSeconds: Long): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "INCORRECT_RUMP_UP_RATE",
+ messageParameters = Map(
+ "rowsPerSecond" -> rowsPerSecond.toString,
+ "maxSeconds" -> maxSeconds.toString,
+ "rampUpTimeSeconds" -> rampUpTimeSeconds.toString
+ ))
+ }
+
+ def incorrectEndOffset(rowsPerSecond: Long,
+ maxSeconds: Long,
+ endSeconds: Long): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "INCORRECT_END_OFFSET",
+ messageParameters = Map(
+ "rowsPerSecond" -> rowsPerSecond.toString,
+ "maxSeconds" -> maxSeconds.toString,
+ "endSeconds" -> endSeconds.toString
+ ))
}
def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize:
Int): Throwable = {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
index 3de4eb68159..a3c32240ad2 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
@@ -244,7 +244,7 @@ class CastWithAnsiOnSuite extends CastSuiteBase with
QueryErrorsBase {
Decimal("12345678901234567890123456789012345678"))
checkExceptionInExpression[ArithmeticException](
cast("123456789012345678901234567890123456789", DecimalType(38, 0)),
- "out of decimal type range")
+ "Out of decimal type range")
checkExceptionInExpression[ArithmeticException](
cast("12345678901234567890123456789012345678", DecimalType(38, 1)),
"cannot be represented as Decimal(38, 1)")
@@ -262,7 +262,7 @@ class CastWithAnsiOnSuite extends CastSuiteBase with
QueryErrorsBase {
Decimal("60000000000000000000000000000000000000"))
checkExceptionInExpression[ArithmeticException](
cast("6E+38", DecimalType(38, 0)),
- "out of decimal type range")
+ "Out of decimal type range")
checkExceptionInExpression[ArithmeticException](
cast("6E+37", DecimalType(38, 1)),
"cannot be represented as Decimal(38, 1)")
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index b388dc46aa2..6cc11c6a0d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.types
import org.scalatest.PrivateMethodTester
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkArithmeticException, SparkFunSuite,
SparkNumberFormatException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
@@ -60,11 +60,27 @@ class DecimalSuite extends SparkFunSuite with
PrivateMethodTester with SQLHelper
checkDecimal(Decimal(1000000000000000000L, 20, 2), "10000000000000000.00",
20, 2)
checkDecimal(Decimal(Long.MaxValue), Long.MaxValue.toString, 20, 0)
checkDecimal(Decimal(Long.MinValue), Long.MinValue.toString, 20, 0)
- intercept[ArithmeticException](Decimal(170L, 2, 1))
- intercept[ArithmeticException](Decimal(170L, 2, 0))
- intercept[ArithmeticException](Decimal(BigDecimal("10.030"), 2, 1))
- intercept[ArithmeticException](Decimal(BigDecimal("-9.95"), 2, 1))
- intercept[ArithmeticException](Decimal(1e17.toLong, 17, 0))
+
+ checkError(
+ exception = intercept[SparkArithmeticException](Decimal(170L, 2, 1)),
+ errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+ parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
+ checkError(
+ exception = intercept[SparkArithmeticException](Decimal(170L, 2, 0)),
+ errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+ parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
+ checkError(
+ exception =
intercept[SparkArithmeticException](Decimal(BigDecimal("10.030"), 2, 1)),
+ errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
+ parameters = Map("precision" -> "3", "maxPrecision" -> "2"))
+ checkError(
+ exception =
intercept[SparkArithmeticException](Decimal(BigDecimal("-9.95"), 2, 1)),
+ errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
+ parameters = Map("precision" -> "3", "maxPrecision" -> "2"))
+ checkError(
+ exception = intercept[SparkArithmeticException](Decimal(1e17.toLong, 17,
0)),
+ errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+ parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
}
test("creating decimals with negative scale under legacy mode") {
@@ -294,8 +310,11 @@ class DecimalSuite extends SparkFunSuite with
PrivateMethodTester with SQLHelper
def checkOutOfRangeFromString(string: String): Unit = {
assert(Decimal.fromString(UTF8String.fromString(string)) === null)
- val e =
intercept[ArithmeticException](Decimal.fromStringANSI(UTF8String.fromString(string)))
- assert(e.getMessage.contains("out of decimal type range"))
+ checkError(
+ exception = intercept[SparkArithmeticException](
+ Decimal.fromStringANSI(UTF8String.fromString(string))),
+ errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
+ parameters = Map("value" -> string))
}
checkFromString("12345678901234567890123456789012345678")
@@ -311,9 +330,15 @@ class DecimalSuite extends SparkFunSuite with
PrivateMethodTester with SQLHelper
checkOutOfRangeFromString("6.0790316E+25569151")
assert(Decimal.fromString(UTF8String.fromString("str")) === null)
- val e =
intercept[NumberFormatException](Decimal.fromStringANSI(UTF8String.fromString("str")))
- assert(e.getMessage.contains(
- """The value 'str' of the type "STRING" cannot be cast to
"DECIMAL(10,0)""""))
+ checkError(
+ exception = intercept[SparkNumberFormatException](
+ Decimal.fromStringANSI(UTF8String.fromString("str"))),
+ errorClass = "CAST_INVALID_INPUT",
+ parameters = Map(
+ "expression" -> "'str'",
+ "sourceType" -> "\"STRING\"",
+ "targetType" -> "\"DECIMAL(10,0)\"",
+ "ansiConfig" -> "\"spark.sql.ansi.enabled\""))
}
test("SPARK-35841: Casting string to decimal type doesn't work " +
@@ -333,7 +358,11 @@ class DecimalSuite extends SparkFunSuite with
PrivateMethodTester with SQLHelper
val values = Array("7.836725755512218E38")
for (string <- values) {
assert(Decimal.fromString(UTF8String.fromString(string)) === null)
-
intercept[ArithmeticException](Decimal.fromStringANSI(UTF8String.fromString(string)))
+ checkError(
+ exception = intercept[SparkArithmeticException](
+ Decimal.fromStringANSI(UTF8String.fromString(string))),
+ errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
+ parameters = Map("value" -> string))
}
withSQLConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key ->
"true") {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index 36de35b959a..45c37d8ae77 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -52,9 +52,8 @@ class RateStreamMicroBatchStream(
private val maxSeconds = Long.MaxValue / rowsPerSecond
if (rampUpTimeSeconds > maxSeconds) {
- throw QueryExecutionErrors.integerOverflowError(
- s"Max offset with $rowsPerSecond rowsPerSecond" +
- s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
+ throw QueryExecutionErrors.incorrectRumpUpRate(
+ rowsPerSecond, maxSeconds, rampUpTimeSeconds)
}
private[sources] val creationTimeMs = {
@@ -120,8 +119,7 @@ class RateStreamMicroBatchStream(
val endSeconds = end.asInstanceOf[LongOffset].offset
assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) >
endSeconds($endSeconds)")
if (endSeconds > maxSeconds) {
- throw QueryExecutionErrors.integerOverflowError("Max offset with " +
- s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds
now.")
+ throw QueryExecutionErrors.incorrectEndOffset(rowsPerSecond, maxSeconds,
endSeconds)
}
// Fix "lastTimeMs" for recovery
if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index cb3769ef8a9..175f26dc0bb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -196,6 +197,45 @@ class RateStreamProviderSuite extends StreamTest {
}
}
+ testQuietly("microbatch - rump up error") {
+ val e = intercept[SparkRuntimeException](
+ new RateStreamMicroBatchStream(
+ rowsPerSecond = Long.MaxValue,
+ rampUpTimeSeconds = 2,
+ options = CaseInsensitiveStringMap.empty(),
+ checkpointLocation = ""))
+
+ checkError(
+ exception = e,
+ errorClass = "INCORRECT_RUMP_UP_RATE",
+ parameters = Map(
+ "rowsPerSecond" -> Long.MaxValue.toString,
+ "maxSeconds" -> "1",
+ "rampUpTimeSeconds" -> "2"))
+ }
+
+ testQuietly("microbatch - end offset error") {
+ withTempDir { temp =>
+ val maxSeconds = (Long.MaxValue / 100)
+ val endSeconds = Long.MaxValue
+ val e = intercept[SparkRuntimeException](
+ new RateStreamMicroBatchStream(
+ rowsPerSecond = 100,
+ rampUpTimeSeconds = 2,
+ options = CaseInsensitiveStringMap.empty(),
+ checkpointLocation = temp.getCanonicalPath)
+ .planInputPartitions(LongOffset(1), LongOffset(endSeconds)))
+
+ checkError(
+ exception = e,
+ errorClass = "INCORRECT_END_OFFSET",
+ parameters = Map(
+ "rowsPerSecond" -> "100",
+ "maxSeconds" -> maxSeconds.toString,
+ "endSeconds" -> endSeconds.toString))
+ }
+ }
+
test("valueAtSecond") {
import RateStreamProvider._
@@ -270,8 +310,8 @@ class RateStreamProviderSuite extends StreamTest {
.distinct()
testStream(input)(
AdvanceRateManualClock(2),
- ExpectFailure[ArithmeticException](t => {
- Seq("overflow", "rowsPerSecond").foreach { msg =>
+ ExpectFailure[SparkRuntimeException](t => {
+ Seq("INCORRECT_END_OFFSET", "rowsPerSecond").foreach { msg =>
assert(t.getMessage.contains(msg))
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]