This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 11cce7e7338 [SPARK-40768][SQL] Migrate type check failures of
bloom_filter_agg() onto error classes
11cce7e7338 is described below
commit 11cce7e73380231ec7c94096655e3d98ce7e635d
Author: lvshaokang <[email protected]>
AuthorDate: Fri Oct 21 10:06:44 2022 +0500
[SPARK-40768][SQL] Migrate type check failures of bloom_filter_agg() onto
error classes
### What changes were proposed in this pull request?
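In the PR, I propose to use error classes in the case of type check failures
in the Bloom filter aggregate expression (`bloom_filter_agg()`). The
`BLOOM_FILTER_WRONG_TYPE` error class now takes a single `actual` parameter
(the comma-separated list of actual argument types) instead of `actualLeft`
and `actualRight`, so it can describe both the two-argument `might_contain()`
and the three-argument `bloom_filter_agg()`.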
### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages.
### Does this PR introduce _any_ user-facing change?
Yes. The PR changes user-facing error messages.
### How was this patch tested?
```
build/sbt "sql/testOnly *SQLQueryTestSuite"
build/sbt "test:testOnly org.apache.spark.SparkThrowableSuite"
build/sbt "test:testOnly *BloomFilterAggregateQuerySuite"
```
Closes #38315 from lvshaokang/SPARK-40768.
Authored-by: lvshaokang <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 2 +-
.../expressions/BloomFilterMightContain.scala | 3 +-
.../aggregate/BloomFilterAggregate.scala | 61 +++++++--
.../spark/sql/BloomFilterAggregateQuerySuite.scala | 144 ++++++++++++++++++---
4 files changed, 179 insertions(+), 31 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 0cfb6861c77..1e9519dd89a 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -108,7 +108,7 @@
},
"BLOOM_FILTER_WRONG_TYPE" : {
"message" : [
- "Input to function <functionName> should have been <expectedLeft>
followed by a value with <expectedRight>, but it's [<actualLeft>,
<actualRight>]."
+ "Input to function <functionName> should have been <expectedLeft>
followed by a value with <expectedRight>, but it's [<actual>]."
]
},
"CANNOT_CONVERT_TO_JSON" : {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
index 5cb19d36b80..b2273b6a6d1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
@@ -76,8 +76,7 @@ case class BloomFilterMightContain(
"functionName" -> toSQLId(prettyName),
"expectedLeft" -> toSQLType(BinaryType),
"expectedRight" -> toSQLType(LongType),
- "actualLeft" -> toSQLType(left.dataType),
- "actualRight" -> toSQLType(right.dataType)
+ "actual" -> Seq(left.dataType,
right.dataType).map(toSQLType).mkString(", ") // both actual types in one parameter, e.g. "DECIMAL(2,1)", "BIGINT"
)
)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
index c734bca3ef8..5b78c5b5228 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId,
toSQLType, toSQLValue}
import org.apache.spark.sql.catalyst.trees.TernaryLike
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -63,28 +64,66 @@ case class BloomFilterAggregate(
override def checkInputDataTypes(): TypeCheckResult = {
(first.dataType, second.dataType, third.dataType) match {
case (_, NullType, _) | (_, _, NullType) =>
- TypeCheckResult.TypeCheckFailure("Null typed values cannot be used as
size arguments")
+ DataTypeMismatch(
+ errorSubClass = "UNEXPECTED_NULL",
+ messageParameters = Map(
+ "exprName" -> "estimatedNumItems or numBits"
+ )
+ )
case (LongType, LongType, LongType) =>
if (!estimatedNumItemsExpression.foldable) {
- TypeCheckFailure("The estimated number of items provided must be a
constant literal")
+ DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map(
+ "inputName" -> "estimatedNumItems",
+ "inputType" -> toSQLType(estimatedNumItemsExpression.dataType),
+ "inputExpr" -> toSQLExpr(estimatedNumItemsExpression)
+ )
+ )
} else if (estimatedNumItems <= 0L) {
- TypeCheckFailure("The estimated number of items must be a positive
value " +
- s" (current value = $estimatedNumItems)")
+ DataTypeMismatch(
+ errorSubClass = "VALUE_OUT_OF_RANGE",
+ messageParameters = Map(
+ "exprName" -> "estimatedNumItems",
+ "valueRange" -> "(0, positive]", // 0 itself fails the <= 0L check above
+ "currentValue" -> toSQLValue(estimatedNumItems, LongType)
+ )
+ )
} else if (!numBitsExpression.foldable) {
- TypeCheckFailure("The number of bits provided must be a constant
literal")
+ DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map(
+ "inputName" -> "numBits", // same naming as "estimatedNumItems" and exprName "numBits"
+ "inputType" -> toSQLType(numBitsExpression.dataType),
+ "inputExpr" -> toSQLExpr(numBitsExpression)
+ )
+ )
} else if (numBits <= 0L) {
- TypeCheckFailure("The number of bits must be a positive value " +
- s" (current value = $numBits)")
+ DataTypeMismatch(
+ errorSubClass = "VALUE_OUT_OF_RANGE",
+ messageParameters = Map(
+ "exprName" -> "numBits",
+ "valueRange" -> "(0, positive]", // 0 itself fails the <= 0L check above
+ "currentValue" -> toSQLValue(numBits, LongType)
+ )
+ )
} else {
require(estimatedNumItems <=
SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))
require(numBits <=
SQLConf.get.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))
TypeCheckSuccess
}
- case _ => TypeCheckResult.TypeCheckFailure(s"Input to function
$prettyName should have " +
- s"been a ${LongType.simpleString} value followed with two
${LongType.simpleString} size " +
- s"arguments, but it's [${first.dataType.catalogString}, " +
- s"${second.dataType.catalogString}, ${third.dataType.catalogString}]")
+ case _ =>
+ DataTypeMismatch(
+ errorSubClass = "BLOOM_FILTER_WRONG_TYPE",
+ messageParameters = Map(
+ "functionName" -> toSQLId(prettyName),
+ "expectedLeft" -> toSQLType(LongType), // first argument must be BIGINT, per the (LongType, LongType, LongType) case
+ "expectedRight" -> toSQLType(LongType),
+ "actual" -> Seq(first.dataType, second.dataType, third.dataType)
+ .map(toSQLType).mkString(", ")
+ )
+ )
}
}
override def nullable: Boolean = true
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
index 6a22414db00..cf5d4c8c1e9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.Cast.toSQLValue
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.LongType
/**
* Query tests for the Bloom filter aggregate and filter function.
@@ -62,8 +64,8 @@ class BloomFilterAggregateQuerySuite extends QueryTest with
SharedSparkSession {
val table = "bloom_filter_test"
for (numEstimatedItems <- Seq(Long.MinValue, -10L, 0L, 4096L, 4194304L,
Long.MaxValue,
conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))) {
- for (numBits <- Seq(Long.MinValue, -10L, 0L, 4096L, 4194304L,
Long.MaxValue,
- conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))) {
+ for ((numBits, index) <- Seq(Long.MinValue, -10L, 0L, 4096L, 4194304L,
Long.MaxValue,
+ conf.getConf(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)).zipWithIndex)
{
val sqlString = s"""
|SELECT every(might_contain(
| (SELECT bloom_filter_agg(col,
@@ -87,13 +89,57 @@ class BloomFilterAggregateQuerySuite extends QueryTest with
SharedSparkSession {
val exception = intercept[AnalysisException] {
spark.sql(sqlString)
}
- assert(exception.getMessage.contains(
- "The estimated number of items must be a positive value"))
+ val stop = numEstimatedItems match {
+ case Long.MinValue => Seq(169, 152, 150, 153, 156, 168, 157)
+ case -10L => Seq(152, 135, 133, 136, 139, 151, 140)
+ case 0L => Seq(150, 133, 131, 134, 137, 149, 138)
+ }
+ checkError(
+ exception = exception,
+ errorClass = "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE",
+ parameters = Map(
+ "exprName" -> "estimatedNumItems",
+ "valueRange" -> "(0, positive]",
+ "currentValue" -> toSQLValue(numEstimatedItems, LongType),
+ "sqlExpr" -> (s""""bloom_filter_agg(col,
CAST($numEstimatedItems AS BIGINT), """ +
+ s"""CAST($numBits AS BIGINT))"""")
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(col,\n" +
+ s" cast($numEstimatedItems as long),\n" +
+ s" cast($numBits as long))",
+ start = 49,
+ stop = stop(index)
+ )
+ )
} else if (numBits <= 0) {
val exception = intercept[AnalysisException] {
spark.sql(sqlString)
}
- assert(exception.getMessage.contains("The number of bits must be a
positive value"))
+ val stop = numEstimatedItems match {
+ case 4096L => Seq(153, 136, 134)
+ case 4194304L => Seq(156, 139, 137)
+ case Long.MaxValue => Seq(168, 151, 149)
+ case 4000000 => Seq(156, 139, 137)
+ }
+ checkError(
+ exception = exception,
+ errorClass = "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE",
+ parameters = Map(
+ "exprName" -> "numBits",
+ "valueRange" -> "(0, positive]",
+ "currentValue" -> toSQLValue(numBits, LongType),
+ "sqlExpr" -> (s""""bloom_filter_agg(col,
CAST($numEstimatedItems AS BIGINT), """ +
+ s"""CAST($numBits AS BIGINT))"""")
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(col,\n" +
+ s" cast($numEstimatedItems as long),\n" +
+ s" cast($numBits as long))",
+ start = 49,
+ stop = stop(index)
+ )
+ )
} else {
checkAnswer(spark.sql(sqlString), Row(true, false))
}
@@ -109,8 +155,22 @@ class BloomFilterAggregateQuerySuite extends QueryTest
with SharedSparkSession {
|FROM values (1.2), (2.5) as t(a)"""
.stripMargin)
}
- assert(exception1.getMessage.contains(
- "Input to function bloom_filter_agg should have been a bigint value"))
+ checkError(
+ exception = exception1,
+ errorClass = "DATATYPE_MISMATCH.BLOOM_FILTER_WRONG_TYPE",
+ parameters = Map(
+ "functionName" -> "`bloom_filter_agg`",
+ "sqlExpr" -> "\"bloom_filter_agg(a, 1000000, 8388608)\"",
+ "expectedLeft" -> "\"BIGINT\"",
+ "expectedRight" -> "\"BIGINT\"",
+ "actual" -> "\"DECIMAL(2,1)\", \"BIGINT\", \"BIGINT\""
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(a)",
+ start = 8,
+ stop = 26
+ )
+ )
val exception2 = intercept[AnalysisException] {
spark.sql("""
@@ -118,8 +178,22 @@ class BloomFilterAggregateQuerySuite extends QueryTest
with SharedSparkSession {
|FROM values (cast(1 as long)), (cast(2 as long)) as t(a)"""
.stripMargin)
}
- assert(exception2.getMessage.contains(
- "function bloom_filter_agg should have been a bigint value followed with
two bigint"))
+ checkError(
+ exception = exception2,
+ errorClass = "DATATYPE_MISMATCH.BLOOM_FILTER_WRONG_TYPE",
+ parameters = Map(
+ "functionName" -> "`bloom_filter_agg`",
+ "sqlExpr" -> "\"bloom_filter_agg(a, 2, (2 * 8))\"",
+ "expectedLeft" -> "\"BIGINT\"",
+ "expectedRight" -> "\"BIGINT\"",
+ "actual" -> "\"BIGINT\", \"INT\", \"BIGINT\""
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(a, 2)",
+ start = 8,
+ stop = 29
+ )
+ )
val exception3 = intercept[AnalysisException] {
spark.sql("""
@@ -127,8 +201,22 @@ class BloomFilterAggregateQuerySuite extends QueryTest
with SharedSparkSession {
|FROM values (cast(1 as long)), (cast(2 as long)) as t(a)"""
.stripMargin)
}
- assert(exception3.getMessage.contains(
- "function bloom_filter_agg should have been a bigint value followed with
two bigint"))
+ checkError(
+ exception = exception3,
+ errorClass = "DATATYPE_MISMATCH.BLOOM_FILTER_WRONG_TYPE",
+ parameters = Map(
+ "functionName" -> "`bloom_filter_agg`",
+ "sqlExpr" -> "\"bloom_filter_agg(a, CAST(2 AS BIGINT), 5)\"",
+ "expectedLeft" -> "\"BIGINT\"",
+ "expectedRight" -> "\"BIGINT\"",
+ "actual" -> "\"BIGINT\", \"BIGINT\", \"INT\""
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(a, cast(2 as long), 5)",
+ start = 8,
+ stop = 46
+ )
+ )
val exception4 = intercept[AnalysisException] {
spark.sql("""
@@ -136,7 +224,19 @@ class BloomFilterAggregateQuerySuite extends QueryTest
with SharedSparkSession {
|FROM values (cast(1 as long)), (cast(2 as long)) as t(a)"""
.stripMargin)
}
- assert(exception4.getMessage.contains("Null typed values cannot be used as
size arguments"))
+ checkError(
+ exception = exception4,
+ errorClass = "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+ parameters = Map(
+ "exprName" -> "estimatedNumItems or numBits", // one combined name, whichever size argument is NULL
+ "sqlExpr" -> "\"bloom_filter_agg(a, NULL, 5)\""
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(a, null, 5)",
+ start = 8,
+ stop = 35
+ )
+ )
val exception5 = intercept[AnalysisException] {
spark.sql("""
@@ -144,7 +244,19 @@ class BloomFilterAggregateQuerySuite extends QueryTest
with SharedSparkSession {
|FROM values (cast(1 as long)), (cast(2 as long)) as t(a)"""
.stripMargin)
}
- assert(exception5.getMessage.contains("Null typed values cannot be used as
size arguments"))
+ checkError(
+ exception = exception5,
+ errorClass = "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+ parameters = Map(
+ "exprName" -> "estimatedNumItems or numBits", // same combined name when the third argument is NULL
+ "sqlExpr" -> "\"bloom_filter_agg(a, 5, NULL)\""
+ ),
+ context = ExpectedContext(
+ fragment = "bloom_filter_agg(a, 5, null)",
+ start = 8,
+ stop = 35
+ )
+ )
}
test("Test that might_contain errors out disallowed input value types") {
@@ -160,8 +272,7 @@ class BloomFilterAggregateQuerySuite extends QueryTest with
SharedSparkSession {
"functionName" -> "`might_contain`",
"expectedLeft" -> "\"BINARY\"",
"expectedRight" -> "\"BIGINT\"",
- "actualLeft" -> "\"DECIMAL(2,1)\"",
- "actualRight" -> "\"BIGINT\""
+ "actual" -> "\"DECIMAL(2,1)\", \"BIGINT\"" // both actual types, comma-separated, in one parameter
),
context = ExpectedContext(
fragment = "might_contain(1.0, 1L)",
@@ -182,8 +293,7 @@ class BloomFilterAggregateQuerySuite extends QueryTest with
SharedSparkSession {
"functionName" -> "`might_contain`",
"expectedLeft" -> "\"BINARY\"",
"expectedRight" -> "\"BIGINT\"",
- "actualLeft" -> "\"VOID\"",
- "actualRight" -> "\"DECIMAL(1,1)\""
+ "actual" -> "\"VOID\", \"DECIMAL(1,1)\"" // both actual types, comma-separated, in one parameter
),
context = ExpectedContext(
fragment = "might_contain(NULL, 0.1)",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]