This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d3455dfea8a7 [SPARK-48572][SQL] Fix DateSub, DateAdd, WindowTime,
TimeWindow and SessionWindow expressions
d3455dfea8a7 is described below
commit d3455dfea8a705d338949487f61f8c45eb089b3c
Author: Mihailo Milosevic <[email protected]>
AuthorDate: Mon Jun 17 16:41:19 2024 +0800
[SPARK-48572][SQL] Fix DateSub, DateAdd, WindowTime, TimeWindow and
SessionWindow expressions
### What changes were proposed in this pull request?
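Make the expressions listed in the title work with collated strings: match any `StringType` instance (not only the default `StringType` object) in `TimeWindow.parseExpression` and `StringTypeExpression`, use `SQLConf.get.defaultStringType` in `CreateMap` and `NamePlaceholder`, and replace the blanket `BinaryExpression` case in `CollationTypeCasts` with an explicit list of expressions.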
### Why are the changes needed?
These expressions were found to be faulty when used with collated strings.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes expressions that did not work with collated strings.
### How was this patch tested?
Added tests to `CollationSQLExpressionsSuite` that reproduce the errors for DateAdd, DateSub, WindowTime/TimeWindow and SessionWindow.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46943 from mihailom-db/SPARK-48572.
Authored-by: Mihailo Milosevic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/CollationTypeCasts.scala | 8 ++-
.../sql/catalyst/expressions/TimeWindow.scala | 2 +-
.../catalyst/expressions/complexTypeCreator.scala | 4 +-
.../spark/sql/types/DataTypeExpression.scala | 13 +++-
.../spark/sql/CollationSQLExpressionsSuite.scala | 78 ++++++++++++++++++++++
5 files changed, 99 insertions(+), 6 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala
index b131b8b148b6..276062ce211d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala
@@ -85,8 +85,12 @@ object CollationTypeCasts extends TypeCoercionRule {
case otherExpr @ (
_: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _:
Greatest | _: Least |
- _: Coalesce | _: BinaryExpression | _: ConcatWs | _: Mask | _:
StringReplace |
- _: StringTranslate | _: StringTrim | _: StringTrimLeft | _:
StringTrimRight) =>
+ _: Coalesce | _: ArrayContains | _: ArrayExcept | _: ConcatWs | _: Mask
| _: StringReplace |
+ _: StringTranslate | _: StringTrim | _: StringTrimLeft | _:
StringTrimRight |
+ _: ArrayIntersect | _: ArrayPosition | _: ArrayRemove | _: ArrayUnion |
_: ArraysOverlap |
+ _: Contains | _: EndsWith | _: EqualNullSafe | _: EqualTo | _: FindInSet
| _: GreaterThan |
+ _: GreaterThanOrEqual | _: LessThan | _: LessThanOrEqual | _: StartsWith
| _: StringInstr |
+ _: ToNumber | _: TryToNumber) =>
val newChildren = collateToSingleType(otherExpr.children)
otherExpr.withNewChildren(newChildren)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index fd2e302deb99..673f9397bb03 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -212,7 +212,7 @@ object TimeWindow {
* that we can use `window` in SQL.
*/
def parseExpression(expr: Expression): Long = expr match {
- case NonNullLiteral(s, StringType) => getIntervalInMicroSeconds(s.toString)
+ case NonNullLiteral(s, _: StringType) =>
getIntervalInMicroSeconds(s.toString)
case IntegerLiteral(i) => i.toLong
case NonNullLiteral(l, LongType) => l.toString.toLong
case _ => throw
QueryCompilationErrors.invalidLiteralForWindowDurationError()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index 167c02c0bafc..1bfa11d67af6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -196,7 +196,7 @@ case class CreateMap(children: Seq[Expression],
useStringTypeWhenEmpty: Boolean)
private val defaultElementType: DataType = {
if (useStringTypeWhenEmpty) {
- StringType
+ SQLConf.get.defaultStringType
} else {
NullType
}
@@ -354,7 +354,7 @@ case class MapFromArrays(left: Expression, right:
Expression)
case object NamePlaceholder extends LeafExpression with Unevaluable {
override lazy val resolved: Boolean = false
override def nullable: Boolean = false
- override def dataType: DataType = StringType
+ override def dataType: DataType = SQLConf.get.defaultStringType
override def prettyName: String = "NamePlaceholder"
override def toString: String = prettyName
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeExpression.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeExpression.scala
index 026272a0f2d8..fd942ba60de4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeExpression.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeExpression.scala
@@ -30,7 +30,18 @@ private[sql] abstract class DataTypeExpression(val dataType:
DataType) {
}
private[sql] case object BooleanTypeExpression extends
DataTypeExpression(BooleanType)
-private[sql] case object StringTypeExpression extends
DataTypeExpression(StringType)
+private[sql] case object StringTypeExpression {
+ /**
+ * Matches any expression whose data type is a StringType instance (not only the default StringType object), e.g.:
+ * {{{
+ * case Cast(child @ StringTypeExpression(), NumericType) =>
+ * ...
+ * }}}
+ */
+ def unapply(e: Expression): Boolean = {
+ e.dataType.isInstanceOf[StringType]
+ }
+}
private[sql] case object TimestampTypeExpression extends
DataTypeExpression(TimestampType)
private[sql] case object DateTypeExpression extends
DataTypeExpression(DateType)
private[sql] case object ByteTypeExpression extends
DataTypeExpression(ByteType)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
index 5f3d33d8825a..a1c6f5f94317 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
@@ -1905,6 +1905,84 @@ class CollationSQLExpressionsSuite
})
}
+ test("DateAdd expression with collation") {
+ // Run the check once for every collation in testSuppCollations
+ testSuppCollations.foreach(collationName => {
+ val query = s"""select date_add(collate('2016-07-30',
'${collationName}'), 1)"""
+ // date_add on a collated date string must return DateType 2016-07-30 + 1 day
+ val testQuery = sql(query)
+ val dataType = DateType
+ val expectedResult = "2016-07-31"
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ checkAnswer(testQuery, Row(Date.valueOf(expectedResult)))
+ })
+ }
+
+ test("DateSub expression with collation") {
+ // Run the check once for every collation in testSuppCollations
+ testSuppCollations.foreach(collationName => {
+ val query = s"""select date_sub(collate('2016-07-30',
'${collationName}'), 1)"""
+ // date_sub on a collated date string must return DateType 2016-07-30 - 1 day
+ val testQuery = sql(query)
+ val dataType = DateType
+ val expectedResult = "2016-07-29"
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ checkAnswer(testQuery, Row(Date.valueOf(expectedResult)))
+ })
+ }
+
+ test("WindowTime and TimeWindow expressions with collation") {
+ // Run once per supported collation, set as the session default collation
+ testSuppCollations.foreach(collationName => {
+ withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+ val query =
+ s"""SELECT window_time(window)
+ | FROM (SELECT a, window, count(*) as cnt FROM VALUES
+ |('A1', '2021-01-01 00:00:00'),
+ |('A1', '2021-01-01 00:04:30'),
+ |('A1', '2021-01-01 00:06:00'),
+ |('A2', '2021-01-01 00:01:00') AS tab(a, b)
+ |GROUP by a, window(b, '5 minutes') ORDER BY a, window.start);
+ |""".stripMargin
+ // Expect TimestampType and one window_time value per (a, 5-minute window) group
+ val testQuery = sql(query)
+ val dataType = TimestampType
+ val expectedResults =
+ Seq("2021-01-01 00:04:59.999999",
+ "2021-01-01 00:09:59.999999",
+ "2021-01-01 00:04:59.999999")
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ checkAnswer(testQuery, expectedResults.map(ts =>
Row(Timestamp.valueOf(ts))))
+ }
+ })
+ }
+
+ test("SessionWindow expressions with collation") {
+ // Run once per supported collation, set as the session default collation
+ testSuppCollations.foreach(collationName => {
+ withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+ val query =
+ s"""SELECT count(*) as cnt
+ | FROM VALUES
+ |('A1', '2021-01-01 00:00:00'),
+ |('A1', '2021-01-01 00:04:30'),
+ |('A1', '2021-01-01 00:10:00'),
+ |('A2', '2021-01-01 00:01:00'),
+ |('A2', '2021-01-01 00:04:30') AS tab(a, b)
+ |GROUP BY a,
+ |session_window(b, CASE WHEN a = 'A1' THEN '5 minutes' ELSE '1
minutes' END)
+ |ORDER BY a, session_window.start;
+ |""".stripMargin
+ // Expect a LongType count per (a, session window) group, ordered by a and start
+ val testQuery = sql(query)
+ val dataType = LongType
+ val expectedResults = Seq(2, 1, 1, 1)
+ assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+ checkAnswer(testQuery, expectedResults.map(Row(_)))
+ }
+ })
+ }
+
test("ConvertTimezone expression with collation") {
// Supported collations
testSuppCollations.foreach(collationName => {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]