This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 19991047d5b [SPARK-39865][SQL][3.3] Show proper error messages on the
overflow errors of table insert
19991047d5b is described below
commit 19991047d5b5316412d8b1763807c5945a705bff
Author: Gengliang Wang <[email protected]>
AuthorDate: Thu Jul 28 11:26:34 2022 -0700
[SPARK-39865][SQL][3.3] Show proper error messages on the overflow errors
of table insert
### What changes were proposed in this pull request?
In Spark 3.3, the error message of ANSI CAST is improved. However, the
table insertion is using the same CAST expression:
```
> create table tiny(i tinyint);
> insert into tiny values (1000);
org.apache.spark.SparkArithmeticException[CAST_OVERFLOW]: The value 1000 of
the type "INT" cannot be cast to "TINYINT" due to an overflow. Use `try_cast`
to tolerate overflow and return NULL instead. If necessary set
"spark.sql.ansi.enabled" to "false" to bypass this error.
```
Showing the hint of `If necessary set "spark.sql.ansi.enabled" to "false"
to bypass this error` doesn't help at all. This PR is to fix the error message.
After changes, the error message of this example will become:
```
org.apache.spark.SparkArithmeticException: [CAST_OVERFLOW_IN_TABLE_INSERT]
Fail to insert a value of "INT" type into the "TINYINT" type column `i` due to
an overflow. Use `try_cast` on the input value to tolerate overflow and return
NULL instead.
```
### Why are the changes needed?
Show proper error messages on the overflow errors of table insert. The
current message is super confusing.
### Does this PR introduce _any_ user-facing change?
Yes, after the changes it shows proper error messages on the overflow errors of
table insert.
### How was this patch tested?
Unit test
Closes #37311 from gengliangwang/PR_TOOL_PICK_PR_37283_BRANCH-3.3.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
core/src/main/resources/error/error-classes.json | 4 ++
.../catalyst/analysis/TableOutputResolver.scala | 25 +++++++++++-
.../spark/sql/catalyst/expressions/Cast.scala | 44 ++++++++++++++++++++++
.../spark/sql/errors/QueryExecutionErrors.scala | 13 +++++++
.../org/apache/spark/sql/sources/InsertSuite.scala | 20 +++++-----
5 files changed, 95 insertions(+), 11 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 31ec5aaa05e..89a9d5af587 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -33,6 +33,10 @@
"message" : [ "The value <value> of the type <sourceType> cannot be cast
to <targetType> due to an overflow. Use `try_cast` to tolerate overflow and
return NULL instead. If necessary set <config> to \"false\" to bypass this
error." ],
"sqlState" : "22005"
},
+ "CAST_OVERFLOW_IN_TABLE_INSERT" : {
+ "message" : [ "Fail to insert a value of <sourceType> type into the
<targetType> type column <columnName> due to an overflow. Use `try_cast` on the
input value to tolerate overflow and return NULL instead." ],
+ "sqlState" : "22005"
+ },
"CONCURRENT_QUERY" : {
"message" : [ "Another instance of this query was just started by a
concurrent session." ]
},
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 2cd069e5858..c723a018a6c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -26,7 +26,7 @@ import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType,
IntegralType, MapType, StructType}
object TableOutputResolver {
def resolveOutputColumns(
@@ -220,6 +220,21 @@ object TableOutputResolver {
}
}
+ private def containsIntegralOrDecimalType(dt: DataType): Boolean = dt match {
+ case _: IntegralType | _: DecimalType => true
+ case a: ArrayType => containsIntegralOrDecimalType(a.elementType)
+ case m: MapType =>
+ containsIntegralOrDecimalType(m.keyType) ||
containsIntegralOrDecimalType(m.valueType)
+ case s: StructType =>
+ s.fields.exists(sf => containsIntegralOrDecimalType(sf.dataType))
+ case _ => false
+ }
+
+ private def canCauseCastOverflow(cast: AnsiCast): Boolean = {
+ containsIntegralOrDecimalType(cast.dataType) &&
+ !Cast.canUpCast(cast.child.dataType, cast.dataType)
+ }
+
private def checkField(
tableAttr: Attribute,
queryExpr: NamedExpression,
@@ -235,7 +250,13 @@ object TableOutputResolver {
} else {
val casted = storeAssignmentPolicy match {
case StoreAssignmentPolicy.ANSI =>
- AnsiCast(queryExpr, tableAttr.dataType,
Option(conf.sessionLocalTimeZone))
+ val cast = AnsiCast(queryExpr, tableAttr.dataType,
Option(conf.sessionLocalTimeZone))
+ if (canCauseCastOverflow(cast)) {
+ CheckOverflowInTableInsert(cast, tableAttr.name)
+ } else {
+ cast
+ }
+
case StoreAssignmentPolicy.LEGACY =>
Cast(queryExpr, tableAttr.dataType,
Option(conf.sessionLocalTimeZone),
ansiEnabled = false)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 4b7c7b479d4..014c2be7319 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,6 +21,7 @@ import java.time.{ZoneId, ZoneOffset}
import java.util.Locale
import java.util.concurrent.TimeUnit._
+import org.apache.spark.SparkArithmeticException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.Cast.resolvableNullability
@@ -2351,3 +2352,46 @@ case class UpCast(child: Expression, target:
AbstractDataType, walkedTypePath: S
override protected def withNewChildInternal(newChild: Expression): UpCast =
copy(child = newChild)
}
+
+/**
+ * Casting a numeric value as another numeric type in store assignment. It can
capture the
+ * arithmetic errors and show proper error messages to users.
+ */
+case class CheckOverflowInTableInsert(child: AnsiCast, columnName: String)
extends UnaryExpression {
+ override protected def withNewChildInternal(newChild: Expression):
Expression =
+ copy(child = newChild.asInstanceOf[AnsiCast])
+
+ override def eval(input: InternalRow): Any = try {
+ child.eval(input)
+ } catch {
+ case e: SparkArithmeticException =>
+ QueryExecutionErrors.castingCauseOverflowErrorInTableInsert(
+ child.child.dataType,
+ child.dataType,
+ columnName)
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode = {
+ val childGen = child.genCode(ctx)
+ val exceptionClass = classOf[SparkArithmeticException].getCanonicalName
+ val fromDt =
+ ctx.addReferenceObj("from", child.child.dataType,
child.child.dataType.getClass.getName)
+ val toDt = ctx.addReferenceObj("to", child.dataType,
child.dataType.getClass.getName)
+ val col = ctx.addReferenceObj("colName", columnName, "java.lang.String")
+ // scalastyle:off line.size.limit
+ ev.copy(code = code"""
+ boolean ${ev.isNull} = true;
+ ${CodeGenerator.javaType(dataType)} ${ev.value} =
${CodeGenerator.defaultValue(dataType)};
+ try {
+ ${childGen.code}
+ ${ev.isNull} = ${childGen.isNull};
+ ${ev.value} = ${childGen.value};
+ } catch ($exceptionClass e) {
+ throw
QueryExecutionErrors.castingCauseOverflowErrorInTableInsert($fromDt, $toDt,
$col);
+ }"""
+ )
+ // scalastyle:on line.size.limit
+ }
+
+ override def dataType: DataType = child.dataType
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index b1378275b85..1db38d854a9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -93,6 +93,19 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
toSQLConf(SQLConf.ANSI_ENABLED.key)))
}
+ def castingCauseOverflowErrorInTableInsert(
+ from: DataType,
+ to: DataType,
+ columnName: String): ArithmeticException = {
+ new SparkArithmeticException(
+ errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
+ messageParameters = Array(
+ toSQLType(from),
+ toSQLType(to),
+ toSQLId(columnName))
+ )
+ }
+
def cannotChangeDecimalPrecisionError(
value: Decimal,
decimalPrecision: Int,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index b293307d4ea..679a5eb2661 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -710,18 +710,18 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
withTable("t") {
sql("create table t(b int) using parquet")
val outOfRangeValue1 = (Int.MaxValue + 1L).toString
+ val expectedMsg = "Fail to insert a value of \"BIGINT\" type into the
\"INT\" type column" +
+ " `b` due to an overflow."
var msg = intercept[SparkException] {
sql(s"insert into t values($outOfRangeValue1)")
}.getCause.getMessage
- assert(msg.contains(
- s"""The value ${outOfRangeValue1}L of the type "BIGINT" cannot be
cast to "INT""""))
+ assert(msg.contains(expectedMsg))
val outOfRangeValue2 = (Int.MinValue - 1L).toString
msg = intercept[SparkException] {
sql(s"insert into t values($outOfRangeValue2)")
}.getCause.getMessage
- assert(msg.contains(
- s"""The value ${outOfRangeValue2}L of the type "BIGINT" cannot be
cast to "INT""""))
+ assert(msg.contains(expectedMsg))
}
}
}
@@ -732,18 +732,18 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
withTable("t") {
sql("create table t(b long) using parquet")
val outOfRangeValue1 = Math.nextUp(Long.MaxValue)
+ val expectedMsg = "Fail to insert a value of \"DOUBLE\" type into the
\"BIGINT\" type " +
+ "column `b` due to an overflow."
var msg = intercept[SparkException] {
sql(s"insert into t values(${outOfRangeValue1}D)")
}.getCause.getMessage
- assert(msg.contains(
- s"""The value ${outOfRangeValue1}D of the type "DOUBLE" cannot be
cast to "BIGINT""""))
+ assert(msg.contains(expectedMsg))
val outOfRangeValue2 = Math.nextDown(Long.MinValue)
msg = intercept[SparkException] {
sql(s"insert into t values(${outOfRangeValue2}D)")
}.getCause.getMessage
- assert(msg.contains(
- s"""The value ${outOfRangeValue2}D of the type "DOUBLE" cannot be
cast to "BIGINT""""))
+ assert(msg.contains(expectedMsg))
}
}
}
@@ -754,10 +754,12 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
withTable("t") {
sql("create table t(b decimal(3,2)) using parquet")
val outOfRangeValue = "123.45"
+ val expectedMsg = "Fail to insert a value of \"DECIMAL(5,2)\" type
into the " +
+ "\"DECIMAL(3,2)\" type column `b` due to an overflow."
val msg = intercept[SparkException] {
sql(s"insert into t values(${outOfRangeValue})")
}.getCause.getMessage
- assert(msg.contains("cannot be represented as Decimal(3, 2)"))
+ assert(msg.contains(expectedMsg))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]