This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c009180a75 [SPARK-37945][SQL][CORE] Use error classes in the 
execution errors of arithmetic ops
6c009180a75 is described below

commit 6c009180a75ae8e548ef4395211b13ee25ab60a9
Author: Khalid Mammadov <[email protected]>
AuthorDate: Sun Oct 23 11:44:49 2022 +0500

    [SPARK-37945][SQL][CORE] Use error classes in the execution errors of 
arithmetic ops
    
    ### What changes were proposed in this pull request?
    Migrate the following errors in QueryExecutionErrors to use error classes:
    
    unscaledValueTooLargeForPrecisionError -> 
UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION
    decimalPrecisionExceedsMaxPrecisionError -> 
DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION
    integerOverflowError -> INCORRECT_RUMP_UP_RATE, INCORRECT_END_OFFSET
    outOfDecimalTypeRangeError -> OUT_OF_DECIMAL_TYPE_RANGE
    
    ### Why are the changes needed?
    Porting ArithmeticExceptions to the new error framework
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the errors will now indicate that they are controlled Spark exceptions.
    
    ### How was this patch tested?
    ./build/sbt "catalyst/testOnly org.apache.spark.sql.types.DecimalSuite"
    ./build/sbt "sql/testOnly 
org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite"
    ./build/sbt "core/testOnly org.apache.spark.SparkThrowableSuite"
    
    Closes #38273 from khalidmammadov/error_class2.
    
    Lead-authored-by: Khalid Mammadov <[email protected]>
    Co-authored-by: khalidmammadov <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 core/src/main/resources/error/error-classes.json   | 40 ++++++++++------
 .../spark/sql/errors/QueryExecutionErrors.scala    | 41 +++++++++++++----
 .../catalyst/expressions/CastWithAnsiOnSuite.scala |  4 +-
 .../org/apache/spark/sql/types/DecimalSuite.scala  | 53 +++++++++++++++++-----
 .../sources/RateStreamMicroBatchStream.scala       |  8 ++--
 .../sources/RateStreamProviderSuite.scala          | 44 +++++++++++++++++-
 6 files changed, 145 insertions(+), 45 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 240cf5f4eea..5f4db145479 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -276,6 +276,11 @@
     ],
     "sqlState" : "22008"
   },
+  "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION" : {
+    "message" : [
+      "Decimal precision <precision> exceeds max precision <maxPrecision>."
+    ]
+  },
   "DEFAULT_DATABASE_NOT_EXISTS" : {
     "message" : [
       "Default database <defaultDatabase> does not exist, please create it 
first or change default database to 'default'."
@@ -416,6 +421,16 @@
       }
     }
   },
+  "INCORRECT_END_OFFSET" : {
+    "message" : [
+      "Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but it's 
<endSeconds> now."
+    ]
+  },
+  "INCORRECT_RUMP_UP_RATE" : {
+    "message" : [
+      "Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but 
'rampUpTimeSeconds' is <rampUpTimeSeconds>."
+    ]
+  },
   "INDEX_ALREADY_EXISTS" : {
     "message" : [
       "Cannot create the index because it already exists. <message>."
@@ -605,6 +620,11 @@
     ],
     "sqlState" : "22005"
   },
+  "OUT_OF_DECIMAL_TYPE_RANGE" : {
+    "message" : [
+      "Out of decimal type range: <value>."
+    ]
+  },
   "PARSE_CHAR_MISSING_LENGTH" : {
     "message" : [
       "DataType <type> requires a length parameter, for example <type>(10). 
Please specify the length."
@@ -814,6 +834,11 @@
     },
     "sqlState" : "42000"
   },
+  "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION" : {
+    "message" : [
+      "Unscaled value too large for precision. If necessary set <ansiConfig> 
to false to bypass this error."
+    ]
+  },
   "UNSUPPORTED_DATATYPE" : {
     "message" : [
       "Unsupported data type <typeName>"
@@ -3707,21 +3732,6 @@
       "Unexpected: <o>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2117" : {
-    "message" : [
-      "Unscaled value too large for precision. If necessary set <ansiConfig> 
to false to bypass this error."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2118" : {
-    "message" : [
-      "Decimal precision <precision> exceeds max precision <maxPrecision>"
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2119" : {
-    "message" : [
-      "out of decimal type range: <str>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2120" : {
     "message" : [
       "Do not support array of type <clazz>."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 5edffc87b84..4aedfb3b03d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1260,8 +1260,9 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
 
   def unscaledValueTooLargeForPrecisionError(): SparkArithmeticException = {
     new SparkArithmeticException(
-      errorClass = "_LEGACY_ERROR_TEMP_2117",
-      messageParameters = Map("ansiConfig" -> 
toSQLConf(SQLConf.ANSI_ENABLED.key)),
+      errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+      messageParameters = Map(
+        "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
       context = Array.empty,
       summary = "")
   }
@@ -1269,18 +1270,20 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
   def decimalPrecisionExceedsMaxPrecisionError(
       precision: Int, maxPrecision: Int): SparkArithmeticException = {
     new SparkArithmeticException(
-      errorClass = "_LEGACY_ERROR_TEMP_2118",
+      errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
       messageParameters = Map(
-        "precision" -> precision.toString(),
-        "maxPrecision" -> maxPrecision.toString()),
+        "precision" -> precision.toString,
+        "maxPrecision" -> maxPrecision.toString
+      ),
       context = Array.empty,
       summary = "")
   }
 
   def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException = {
     new SparkArithmeticException(
-      errorClass = "_LEGACY_ERROR_TEMP_2119",
-      messageParameters = Map("str" -> str.toString()),
+      errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
+      messageParameters = Map(
+        "value" -> str.toString),
       context = Array.empty,
       summary = "")
   }
@@ -2384,8 +2387,28 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
     new SparkException("Foreach writer has been aborted due to a task failure")
   }
 
-  def integerOverflowError(message: String): Throwable = {
-    new ArithmeticException(s"Integer overflow. $message")
+  def incorrectRumpUpRate(rowsPerSecond: Long,
+      maxSeconds: Long,
+      rampUpTimeSeconds: Long): Throwable = {
+    new SparkRuntimeException(
+      errorClass = "INCORRECT_RUMP_UP_RATE",
+      messageParameters = Map(
+        "rowsPerSecond" -> rowsPerSecond.toString,
+        "maxSeconds" -> maxSeconds.toString,
+        "rampUpTimeSeconds" -> rampUpTimeSeconds.toString
+      ))
+  }
+
+  def incorrectEndOffset(rowsPerSecond: Long,
+      maxSeconds: Long,
+      endSeconds: Long): Throwable = {
+    new SparkRuntimeException(
+      errorClass = "INCORRECT_END_OFFSET",
+      messageParameters = Map(
+        "rowsPerSecond" -> rowsPerSecond.toString,
+        "maxSeconds" -> maxSeconds.toString,
+        "endSeconds" -> endSeconds.toString
+      ))
   }
 
   def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: 
Int): Throwable = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
index 3de4eb68159..a3c32240ad2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOnSuite.scala
@@ -244,7 +244,7 @@ class CastWithAnsiOnSuite extends CastSuiteBase with 
QueryErrorsBase {
       Decimal("12345678901234567890123456789012345678"))
     checkExceptionInExpression[ArithmeticException](
       cast("123456789012345678901234567890123456789", DecimalType(38, 0)),
-      "out of decimal type range")
+      "Out of decimal type range")
     checkExceptionInExpression[ArithmeticException](
       cast("12345678901234567890123456789012345678", DecimalType(38, 1)),
       "cannot be represented as Decimal(38, 1)")
@@ -262,7 +262,7 @@ class CastWithAnsiOnSuite extends CastSuiteBase with 
QueryErrorsBase {
       Decimal("60000000000000000000000000000000000000"))
     checkExceptionInExpression[ArithmeticException](
       cast("6E+38", DecimalType(38, 0)),
-      "out of decimal type range")
+      "Out of decimal type range")
     checkExceptionInExpression[ArithmeticException](
       cast("6E+37", DecimalType(38, 1)),
       "cannot be represented as Decimal(38, 1)")
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index b388dc46aa2..6cc11c6a0d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.types
 
 import org.scalatest.PrivateMethodTester
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkArithmeticException, SparkFunSuite, 
SparkNumberFormatException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.internal.SQLConf
@@ -60,11 +60,27 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester with SQLHelper
     checkDecimal(Decimal(1000000000000000000L, 20, 2), "10000000000000000.00", 
20, 2)
     checkDecimal(Decimal(Long.MaxValue), Long.MaxValue.toString, 20, 0)
     checkDecimal(Decimal(Long.MinValue), Long.MinValue.toString, 20, 0)
-    intercept[ArithmeticException](Decimal(170L, 2, 1))
-    intercept[ArithmeticException](Decimal(170L, 2, 0))
-    intercept[ArithmeticException](Decimal(BigDecimal("10.030"), 2, 1))
-    intercept[ArithmeticException](Decimal(BigDecimal("-9.95"), 2, 1))
-    intercept[ArithmeticException](Decimal(1e17.toLong, 17, 0))
+
+    checkError(
+      exception = intercept[SparkArithmeticException](Decimal(170L, 2, 1)),
+      errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+      parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
+    checkError(
+      exception = intercept[SparkArithmeticException](Decimal(170L, 2, 0)),
+      errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+      parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
+    checkError(
+      exception = 
intercept[SparkArithmeticException](Decimal(BigDecimal("10.030"), 2, 1)),
+      errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
+      parameters = Map("precision" -> "3", "maxPrecision" -> "2"))
+    checkError(
+      exception = 
intercept[SparkArithmeticException](Decimal(BigDecimal("-9.95"), 2, 1)),
+      errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
+      parameters = Map("precision" -> "3", "maxPrecision" -> "2"))
+    checkError(
+      exception = intercept[SparkArithmeticException](Decimal(1e17.toLong, 17, 
0)),
+      errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
+      parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
   }
 
   test("creating decimals with negative scale under legacy mode") {
@@ -294,8 +310,11 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester with SQLHelper
 
     def checkOutOfRangeFromString(string: String): Unit = {
       assert(Decimal.fromString(UTF8String.fromString(string)) === null)
-      val e = 
intercept[ArithmeticException](Decimal.fromStringANSI(UTF8String.fromString(string)))
-      assert(e.getMessage.contains("out of decimal type range"))
+      checkError(
+        exception = intercept[SparkArithmeticException](
+          Decimal.fromStringANSI(UTF8String.fromString(string))),
+        errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
+        parameters = Map("value" -> string))
     }
 
     checkFromString("12345678901234567890123456789012345678")
@@ -311,9 +330,15 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester with SQLHelper
     checkOutOfRangeFromString("6.0790316E+25569151")
 
     assert(Decimal.fromString(UTF8String.fromString("str")) === null)
-    val e = 
intercept[NumberFormatException](Decimal.fromStringANSI(UTF8String.fromString("str")))
-    assert(e.getMessage.contains(
-      """The value 'str' of the type "STRING" cannot be cast to 
"DECIMAL(10,0)""""))
+    checkError(
+      exception = intercept[SparkNumberFormatException](
+        Decimal.fromStringANSI(UTF8String.fromString("str"))),
+      errorClass = "CAST_INVALID_INPUT",
+      parameters = Map(
+        "expression" -> "'str'",
+        "sourceType" -> "\"STRING\"",
+        "targetType" -> "\"DECIMAL(10,0)\"",
+        "ansiConfig" -> "\"spark.sql.ansi.enabled\""))
   }
 
   test("SPARK-35841: Casting string to decimal type doesn't work " +
@@ -333,7 +358,11 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester with SQLHelper
     val values = Array("7.836725755512218E38")
     for (string <- values) {
       assert(Decimal.fromString(UTF8String.fromString(string)) === null)
-      
intercept[ArithmeticException](Decimal.fromStringANSI(UTF8String.fromString(string)))
+      checkError(
+        exception = intercept[SparkArithmeticException](
+          Decimal.fromStringANSI(UTF8String.fromString(string))),
+        errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
+        parameters = Map("value" -> string))
     }
 
     withSQLConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key -> 
"true") {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
index 36de35b959a..45c37d8ae77 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala
@@ -52,9 +52,8 @@ class RateStreamMicroBatchStream(
   private val maxSeconds = Long.MaxValue / rowsPerSecond
 
   if (rampUpTimeSeconds > maxSeconds) {
-    throw QueryExecutionErrors.integerOverflowError(
-      s"Max offset with $rowsPerSecond rowsPerSecond" +
-        s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
+    throw QueryExecutionErrors.incorrectRumpUpRate(
+      rowsPerSecond, maxSeconds, rampUpTimeSeconds)
   }
 
   private[sources] val creationTimeMs = {
@@ -120,8 +119,7 @@ class RateStreamMicroBatchStream(
     val endSeconds = end.asInstanceOf[LongOffset].offset
     assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > 
endSeconds($endSeconds)")
     if (endSeconds > maxSeconds) {
-      throw QueryExecutionErrors.integerOverflowError("Max offset with " +
-        s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds 
now.")
+      throw QueryExecutionErrors.incorrectEndOffset(rowsPerSecond, maxSeconds, 
endSeconds)
     }
     // Fix "lastTimeMs" for recovery
     if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index cb3769ef8a9..175f26dc0bb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -196,6 +197,45 @@ class RateStreamProviderSuite extends StreamTest {
     }
   }
 
+  testQuietly("microbatch - rump up error") {
+    val e = intercept[SparkRuntimeException](
+      new RateStreamMicroBatchStream(
+        rowsPerSecond = Long.MaxValue,
+        rampUpTimeSeconds = 2,
+        options = CaseInsensitiveStringMap.empty(),
+        checkpointLocation = ""))
+
+    checkError(
+      exception = e,
+      errorClass = "INCORRECT_RUMP_UP_RATE",
+      parameters = Map(
+        "rowsPerSecond" -> Long.MaxValue.toString,
+        "maxSeconds" -> "1",
+        "rampUpTimeSeconds" -> "2"))
+  }
+
+  testQuietly("microbatch - end offset error") {
+    withTempDir { temp =>
+      val maxSeconds = (Long.MaxValue / 100)
+      val endSeconds = Long.MaxValue
+      val e = intercept[SparkRuntimeException](
+        new RateStreamMicroBatchStream(
+          rowsPerSecond = 100,
+          rampUpTimeSeconds = 2,
+          options = CaseInsensitiveStringMap.empty(),
+          checkpointLocation = temp.getCanonicalPath)
+          .planInputPartitions(LongOffset(1), LongOffset(endSeconds)))
+
+      checkError(
+        exception = e,
+        errorClass = "INCORRECT_END_OFFSET",
+        parameters = Map(
+          "rowsPerSecond" -> "100",
+          "maxSeconds" -> maxSeconds.toString,
+          "endSeconds" -> endSeconds.toString))
+    }
+  }
+
   test("valueAtSecond") {
     import RateStreamProvider._
 
@@ -270,8 +310,8 @@ class RateStreamProviderSuite extends StreamTest {
       .distinct()
     testStream(input)(
       AdvanceRateManualClock(2),
-      ExpectFailure[ArithmeticException](t => {
-        Seq("overflow", "rowsPerSecond").foreach { msg =>
+      ExpectFailure[SparkRuntimeException](t => {
+        Seq("INCORRECT_END_OFFSET", "rowsPerSecond").foreach { msg =>
           assert(t.getMessage.contains(msg))
         }
       })


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to