This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 929405ab753 [SPARK-45575][SQL] Support time travel options for df read API
929405ab753 is described below
commit 929405ab753239e55bd46e9cdb31bb9f8c63b86d
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Oct 27 10:15:12 2023 +0800
[SPARK-45575][SQL] Support time travel options for df read API
### What changes were proposed in this pull request?
We've added a time travel API in DS v2 and a dedicated SQL syntax for it.
However, there is no way to do it with the DataFrame read API. This PR adds
time travel options (`timestampAsOf` and `versionAsOf`) to support time travel
with the DataFrame read API.
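For example, the new options enable reads like the following (a usage sketch;
`tbl` stands for any DS v2 table whose catalog supports time travel):

```scala
// Time travel to an explicit table version.
spark.read.option("versionAsOf", "1").table("tbl")

// Time travel to a timestamp; the option value must be castable to TIMESTAMP
// and is interpreted in the session local time zone.
spark.read.option("timestampAsOf", "2023-10-27 00:00:00").table("tbl")
```

The option names themselves are configurable via the new
`spark.sql.timeTravelTimestampKey` and `spark.sql.timeTravelVersionKey` confs
(see the `SQLConf` change below).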
### Why are the changes needed?
Feature parity.
### Does this PR introduce _any_ user-facing change?
Yes, users can now specify time travel via read options.
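For instance (a sketch mirroring the new tests; `tbl` is a placeholder table
name), specifying both options is rejected at analysis time:

```scala
// Fails analysis with error class INVALID_TIME_TRAVEL_SPEC: version and
// timestamp cannot both be specified for one read.
spark.read
  .option("timestampAsOf", "2023-10-27 00:00:00")
  .option("versionAsOf", "1")
  .table("tbl")
```

Combining the SQL time travel clause with these options raises
MULTIPLE_TIME_TRAVEL_SPEC.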
### How was this patch tested?
New tests.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43403 from cloud-fan/time-option.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../src/main/resources/error/error-classes.json | 22 +++++--
...valid-time-travel-timestamp-expr-error-class.md | 4 ++
docs/sql-error-conditions.md | 12 ++++
.../spark/sql/catalyst/analysis/Analyzer.scala | 19 ++++--
.../sql/catalyst/analysis/TimeTravelSpec.scala | 40 +++++++++--
.../spark/sql/errors/QueryCompilationErrors.scala | 2 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 14 ++++
.../datasources/v2/DataSourceV2Utils.scala | 3 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 77 +++++++++++++++-------
9 files changed, 153 insertions(+), 40 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 2ab4af73a8e..5b756ad1077 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2192,6 +2192,12 @@
     ],
     "sqlState" : "42K0F"
   },
+  "INVALID_TIME_TRAVEL_SPEC" : {
+    "message" : [
+      "Cannot specify both version and timestamp when time travelling the table."
+    ],
+    "sqlState" : "42K0E"
+  },
   "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR" : {
     "message" : [
       "The time travel timestamp expression <expr> is invalid."
@@ -2207,6 +2213,11 @@
           "Must be deterministic."
         ]
       },
+      "OPTION" : {
+        "message" : [
+          "Timestamp string in the options must be able to cast to TIMESTAMP type."
+        ]
+      },
       "UNEVALUABLE" : {
         "message" : [
           "Must be evaluable."
@@ -2386,6 +2397,12 @@
     ],
     "sqlState" : "42803"
   },
+  "MULTIPLE_TIME_TRAVEL_SPEC" : {
+    "message" : [
+      "Cannot specify time travel in both the time travel clause and options."
+    ],
+    "sqlState" : "42K0E"
+  },
   "MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : {
     "message" : [
       "The expression <expr> does not support more than one source."
@@ -5132,11 +5149,6 @@
       "<errorMessage>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_1334" : {
-    "message" : [
-      "Cannot specify both version and timestamp when time travelling the table."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1338" : {
     "message" : [
       "Sinks cannot request distribution and ordering in continuous execution mode."
diff --git a/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md b/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
index 42513bf989b..80344662d09 100644
--- a/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
+++ b/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
@@ -33,6 +33,10 @@ Cannot be casted to the "TIMESTAMP" type.
 
 Must be deterministic.
 
+## OPTION
+
+Timestamp string in the options must be able to cast to TIMESTAMP type.
+
 ## UNEVALUABLE
 
 Must be evaluable.
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 007c95297f7..7c537f6fe20 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1222,6 +1222,12 @@ For more details see [INVALID_SUBQUERY_EXPRESSION](sql-error-conditions-invalid-
 
 Cannot create the persistent object `<objName>` of the type `<obj>` because it references to the temporary object `<tempObjName>` of the type `<tempObj>`. Please make the temporary object `<tempObjName>` persistent, or make the persistent object `<objName>` temporary.
 
+### INVALID_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both version and timestamp when time travelling the table.
+
 ### [INVALID_TIME_TRAVEL_TIMESTAMP_EXPR](sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.html)
 
 [SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1360,6 +1366,12 @@ For more details see [MISSING_ATTRIBUTES](sql-error-conditions-missing-attribute
 
 The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.
 
+### MULTIPLE_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify time travel in both the time travel clause and options.
+
 ### MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION
 
 [SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 06d949ece26..65338f9917b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1122,7 +1122,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
           if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
-        resolveRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
+        val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
+        resolveRelation(u, timeTravelSpec).getOrElse(r)
 
       case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
         lookupTableOrView(identifier).map {
@@ -1255,17 +1256,27 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     private def resolveRelation(
         u: UnresolvedRelation,
         timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
-      resolveTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
+      val timeTravelSpecFromOptions = TimeTravelSpec.fromOptions(
+        u.options,
+        conf.getConf(SQLConf.TIME_TRAVEL_TIMESTAMP_KEY),
+        conf.getConf(SQLConf.TIME_TRAVEL_VERSION_KEY),
+        conf.sessionLocalTimeZone
+      )
+      if (timeTravelSpec.nonEmpty && timeTravelSpecFromOptions.nonEmpty) {
+        throw new AnalysisException("MULTIPLE_TIME_TRAVEL_SPEC", Map.empty[String, String])
+      }
+      val finalTimeTravelSpec = timeTravelSpec.orElse(timeTravelSpecFromOptions)
+      resolveTempView(u.multipartIdentifier, u.isStreaming, finalTimeTravelSpec.isDefined).orElse {
         expandIdentifier(u.multipartIdentifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
-            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
+            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, finalTimeTravelSpec)
             AnalysisContext.get.relationCache.get(key).map(_.transform {
               case multi: MultiInstanceRelation =>
                 val newRelation = multi.newInstance()
                 newRelation.copyTagsFrom(multi)
                 newRelation
             }).orElse {
-              val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
+              val table = CatalogV2Util.loadTable(catalog, ident, finalTimeTravelSpec)
               val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
               loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
               loaded
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
index 26856d9a5e0..8bfcd955497 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, RuntimeReplaceable, SubqueryExpression, Unevaluable}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, RuntimeReplaceable, SubqueryExpression, Unevaluable}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 sealed trait TimeTravelSpec
@@ -31,7 +32,7 @@ object TimeTravelSpec {
   def create(
       timestamp: Option[Expression],
       version: Option[String],
-      conf: SQLConf) : Option[TimeTravelSpec] = {
+      sessionLocalTimeZone: String) : Option[TimeTravelSpec] = {
     if (timestamp.nonEmpty && version.nonEmpty) {
       throw QueryCompilationErrors.invalidTimeTravelSpecError()
     } else if (timestamp.nonEmpty) {
@@ -50,7 +51,7 @@ object TimeTravelSpec {
         throw QueryCompilationErrors.invalidTimestampExprForTimeTravel(
           "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts)
       }
-      val tz = Some(conf.sessionLocalTimeZone)
+      val tz = Some(sessionLocalTimeZone)
       // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide
       // better error message.
       val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval()
@@ -65,4 +66,35 @@ object TimeTravelSpec {
       None
     }
   }
+
+  def fromOptions(
+      options: CaseInsensitiveStringMap,
+      timestampKey: String,
+      versionKey: String,
+      sessionLocalTimeZone: String): Option[TimeTravelSpec] = {
+    (Option(options.get(timestampKey)), Option(options.get(versionKey))) match {
+      case (Some(_), Some(_)) =>
+        throw QueryCompilationErrors.invalidTimeTravelSpecError()
+
+      case (Some(timestampStr), None) =>
+        val timestampValue = Cast(
+          Literal(timestampStr),
+          TimestampType,
+          Some(sessionLocalTimeZone),
+          ansiEnabled = false
+        ).eval()
+        if (timestampValue == null) {
+          throw new AnalysisException(
+            "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+            Map("expr" -> s"'$timestampStr'")
+          )
+        }
+        Some(AsOfTimestamp(timestampValue.asInstanceOf[Long]))
+
+      case (None, Some(versionStr)) =>
+        Some(AsOfVersion(versionStr))
+
+      case _ => None
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6d1aa076eb0..ae5ebd6a974 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3343,7 +3343,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
 
   def invalidTimeTravelSpecError(): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1334",
+      errorClass = "INVALID_TIME_TRAVEL_SPEC",
       messageParameters = Map.empty)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1e759b6266c..43dc541fbb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4474,6 +4474,20 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val TIME_TRAVEL_TIMESTAMP_KEY =
+    buildConf("spark.sql.timeTravelTimestampKey")
+      .doc("The option name to specify the time travel timestamp when reading a table.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("timestampAsOf")
+
+  val TIME_TRAVEL_VERSION_KEY =
+    buildConf("spark.sql.timeTravelVersionKey")
+      .doc("The option name to specify the time travel table version when reading a table.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("versionAsOf")
+
   val LEGACY_PERCENTILE_DISC_CALCULATION =
     buildConf("spark.sql.legacy.percentileDiscCalculation")
       .internal()
       .doc("If true, the old bogus percentile_disc calculation is used. The old calculation " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index e4853006157..c4e7bf23cac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -133,7 +133,8 @@ private[sql] object DataSourceV2Utils extends Logging {
         } else {
           None
         }
-        val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
+        val timeTravel = TimeTravelSpec.create(
+          timeTravelTimestamp, timeTravelVersion, conf.sessionLocalTimeZone)
         (CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), Some(ident))
       case _ =>
         // TODO: Non-catalog paths for DSV2 are currently not well defined.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 3cbbc3786fc..c2e759efe40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2902,10 +2902,14 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT INTO $t2 VALUES (3)")
       sql(s"INSERT INTO $t2 VALUES (4)")
 
-      assert(sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
-        === Array(Row(1), Row(2)))
-      assert(sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
-        === Array(Row(3), Row(4)))
+      val res1_sql = sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
+      assert(res1_sql === Array(Row(1), Row(2)))
+      val res1_df = spark.read.option("versionAsOf", "Snapshot123456789").table("t").collect()
+      assert(res1_df === Array(Row(1), Row(2)))
+      val res2_sql = sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
+      assert(res2_sql === Array(Row(3), Row(4)))
+      val res2_df = spark.read.option("versionAsOf", "2345678910").table("t").collect()
+      assert(res2_df === Array(Row(3), Row(4)))
     }
 
     val ts1 = DateTimeUtils.stringToTimestampAnsi(
@@ -2928,29 +2932,35 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT INTO $t4 VALUES (7)")
       sql(s"INSERT INTO $t4 VALUES (8)")
 
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
-        === Array(Row(5), Row(6)))
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
-        === Array(Row(5), Row(6)))
-      assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
-        === Array(Row(5), Row(6)))
-      assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')").collect()
-        === Array(Row(7), Row(8)))
+      val res1_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
+      assert(res1_sql === Array(Row(5), Row(6)))
+      val res1_df = spark.read.option("timestampAsOf", "2019-01-29 00:37:58").table("t").collect()
+      assert(res1_df === Array(Row(5), Row(6)))
+      val res2_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
+      assert(res2_sql === Array(Row(7), Row(8)))
+      val res2_df = spark.read.option("timestampAsOf", "2021-01-29 00:00:00").table("t").collect()
+      assert(res2_df === Array(Row(7), Row(8)))
+
+      val res3 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
+      assert(res3 === Array(Row(5), Row(6)))
+      val res4 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
+      assert(res4 === Array(Row(7), Row(8)))
+      val res5 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
+      assert(res5 === Array(Row(5), Row(6)))
+      val res6 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
+      assert(res6 === Array(Row(7), Row(8)))
+      val res7 = sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
+      assert(res7 === Array(Row(7), Row(8)))
+      val res8 = sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')")
+        .collect()
+      assert(res8 === Array(Row(7), Row(8)))
 
       // Scalar subquery is also supported.
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 29))").collect()
-        === Array(Row(7), Row(8)))
+      val res9 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 29))").collect()
+      assert(res9 === Array(Row(7), Row(8)))
 
       // Nested subquery also works
-      assert(
-        sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))").collect()
-          === Array(Row(7), Row(8)))
+      val res10 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))")
+        .collect()
+      assert(res10 === Array(Row(7), Row(8)))
 
       checkError(
         exception = intercept[AnalysisException] {
@@ -2967,6 +2977,23 @@ class DataSourceV2SQLSuiteV1Filter
         errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT",
         parameters = Map("expr" -> "\"abc\""))
 
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.option("timestampAsOf", "abc").table("t").collect()
+        },
+        errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+        parameters = Map("expr" -> "'abc'"))
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read
+            .option("timestampAsOf", "abc")
+            .option("versionAsOf", "1")
+            .table("t")
+            .collect()
+        },
+        errorClass = "INVALID_TIME_TRAVEL_SPEC")
+
       checkError(
         exception = intercept[AnalysisException] {
           sql("SELECT * FROM t TIMESTAMP AS OF current_user()").collect()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]