This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 4731c85c5569 [SPARK-51119][SQL][FOLLOW-UP] Fix missing fallback case for parsing corrupt exists_default value
4731c85c5569 is described below
commit 4731c85c556934f3ab56c49fd5b9ccbc939dc053
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Apr 24 09:43:38 2025 +0800
[SPARK-51119][SQL][FOLLOW-UP] Fix missing fallback case for parsing corrupt exists_default value
### What changes were proposed in this pull request?
Add another fallback for broken (i.e., non-resolved) exists_default values,
extending the original SPARK-51119 fix.
### Why are the changes needed?
https://github.com/apache/spark/pull/49962 added a fallback for catalogs that
already contain broken (i.e., non-resolved) persisted default values. A broken
value is an expression such as `current_database`, `current_user`, or
`current_timestamp`: these are non-deterministic and yield wrong results when
read back from EXISTS_DEFAULT, where the user expects the value that was
resolved when they set the default.
However, that fallback missed the case where the current_xxx expression is
wrapped in a cast. This PR fixes that.
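For illustration, the failure mode can be sketched with simplified stand-ins
for catalyst's Expression and Cast (the types and the resolveExistsDefault
helper below are hypothetical, not Spark's real API): Cast.needsTimeZone
inspects the child's dataType, which throws for an unresolved child, so the
old guard itself failed before the fallback could run; checking child.resolved
first short-circuits that.

```scala
// Hypothetical, simplified stand-ins for catalyst classes (illustration only).
object ExistsDefaultGuardSketch {
  sealed trait Expression {
    def resolved: Boolean
    def dataType: String // like catalyst, throws if the expression is unresolved
  }

  // Stands in for an unresolved call such as CURRENT_TIMESTAMP.
  case class UnresolvedFunction(name: String) extends Expression {
    val resolved = false
    def dataType: String =
      throw new UnsupportedOperationException(s"$name is not resolved")
  }

  case class Cast(child: Expression, tpe: String) extends Expression {
    def resolved: Boolean = child.resolved
    def dataType: String = tpe
    // Like Cast.needsTimeZone, this inspects child.dataType and therefore
    // throws when the child is unresolved.
    def needsTimeZone: Boolean = child.dataType != tpe
  }

  // Mirrors the guard ordering of the fix: && short-circuits, so needsTimeZone
  // is never evaluated on an unresolved child and the match falls through to
  // the fallback case instead of throwing.
  def resolveExistsDefault(expr: Expression): Option[Expression] = expr match {
    case c: Cast if c.child.resolved && c.needsTimeZone => Some(c) // set timezone here
    case e if !e.resolved => None // fallback: re-analyze the original SQL text
    case e => Some(e)
  }

  def main(args: Array[String]): Unit = {
    // A corrupt persisted EXISTS_DEFAULT such as "CAST(CURRENT_TIMESTAMP AS BIGINT)"
    // now reaches the fallback instead of throwing inside the guard.
    val corrupt = Cast(UnresolvedFunction("CURRENT_TIMESTAMP"), "BIGINT")
    assert(resolveExistsDefault(corrupt).isEmpty)
  }
}
```

The actual change in ResolveDefaultColumnsUtil.scala (diff below) is exactly
this guard reordering: `c.child.resolved && c.needsTimeZone` instead of
`c.needsTimeZone` alone.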
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a case to the existing SPARK-51119 unit test in StructTypeSuite.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50685 from szehon-ho/SPARK-51119-follow-3.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 516859f273b72e0db46f86cd40226f5f1075417b)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 2 +-
.../apache/spark/sql/types/StructTypeSuite.scala | 36 ++++++++++++++++------
2 files changed, 27 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index ebd0ca9da1df..b24ad30e0719 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -371,7 +371,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
     val expr = Literal.fromSQL(defaultSQL) match {
       // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
       // hence we need to add timezone to the cast if necessary
-      case c: Cast if c.needsTimeZone =>
+      case c: Cast if c.child.resolved && c.needsTimeZone =>
         c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
       case e: Expression => e
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index ad0adf13643a..5dd45d3d4496 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.StructType.fromDDL
 import org.apache.spark.sql.types.YearMonthIntervalType._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.UTF8String.LongWrapper

 class StructTypeSuite extends SparkFunSuite with SQLHelper {
@@ -835,18 +836,33 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
   test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
     val source = StructType(
       Array(
-        StructField("c1", VariantType, true,
-          new MetadataBuilder()
-            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
-            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
-            .build()),
-        StructField("c0", StringType, true,
-          new MetadataBuilder()
-            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
-            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
-            .build())))
+        StructField("c0", VariantType, true,
+          new MetadataBuilder()
+            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+              "parse_json(null)")
+            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+              "parse_json(null)")
+            .build()),
+        StructField("c1", StringType, true,
+          new MetadataBuilder()
+            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+              "current_catalog()")
+            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+              "current_catalog()")
+            .build()),
+        StructField("c2", StringType, true,
+          new MetadataBuilder()
+            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+              "CAST(CURRENT_TIMESTAMP AS BIGINT)")
+            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+              "CAST(CURRENT_TIMESTAMP AS BIGINT)")
+            .build())))
     val res = ResolveDefaultColumns.existenceDefaultValues(source)
     assert(res(0) == null)
     assert(res(1) == UTF8String.fromString("spark_catalog"))
+
+    val res2Wrapper = new LongWrapper
+    assert(res(2).asInstanceOf[UTF8String].toLong(res2Wrapper))
+    assert(res2Wrapper.value > 0)
   }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]