This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 929405ab753 [SPARK-45575][SQL] Support time travel options for df read API
929405ab753 is described below

commit 929405ab753239e55bd46e9cdb31bb9f8c63b86d
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Oct 27 10:15:12 2023 +0800

    [SPARK-45575][SQL] Support time travel options for df read API
    
    ### What changes were proposed in this pull request?
    
    We've added a time travel API in DS v2 and a dedicated SQL syntax for it.
    However, there is no way to do it with the df read API. This PR adds time
    travel options (`timestampAsOf` and `versionAsOf`) to support time travel
    with the df read API.
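    
    For example, a minimal sketch of the intended usage (the table name `tbl`
    is hypothetical; the option names are the defaults of the new
    `spark.sql.timeTravelTimestampKey` and `spark.sql.timeTravelVersionKey`
    confs added in this PR):
    
        // Read a snapshot of the table as of a data source version.
        spark.read.option("versionAsOf", "2").table("tbl")
        // Read a snapshot as of a timestamp; the string is cast to TIMESTAMP
        // using the session local time zone.
        spark.read.option("timestampAsOf", "2023-10-26 00:00:00").table("tbl")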
    
    ### Why are the changes needed?
    
    feature parity
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now people can specify time travel in read options.
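    
    Per the new checks, specifying both options at once (or combining an
    option with the SQL time travel clause) fails analysis. A sketch of the
    failing case (table name `tbl` is hypothetical):
    
        // Throws AnalysisException with error class INVALID_TIME_TRAVEL_SPEC.
        spark.read
          .option("timestampAsOf", "2023-10-26 00:00:00")
          .option("versionAsOf", "2")
          .table("tbl")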
    
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #43403 from cloud-fan/time-option.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../src/main/resources/error/error-classes.json    | 22 +++++--
 ...valid-time-travel-timestamp-expr-error-class.md |  4 ++
 docs/sql-error-conditions.md                       | 12 ++++
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 19 ++++--
 .../sql/catalyst/analysis/TimeTravelSpec.scala     | 40 +++++++++--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 14 ++++
 .../datasources/v2/DataSourceV2Utils.scala         |  3 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 77 +++++++++++++++-------
 9 files changed, 153 insertions(+), 40 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 2ab4af73a8e..5b756ad1077 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2192,6 +2192,12 @@
     ],
     "sqlState" : "42K0F"
   },
+  "INVALID_TIME_TRAVEL_SPEC" : {
+    "message" : [
+      "Cannot specify both version and timestamp when time travelling the 
table."
+    ],
+    "sqlState" : "42K0E"
+  },
   "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR" : {
     "message" : [
       "The time travel timestamp expression <expr> is invalid."
@@ -2207,6 +2213,11 @@
           "Must be deterministic."
         ]
       },
+      "OPTION" : {
+        "message" : [
+          "Timestamp string in the options must be able to cast to TIMESTAMP 
type."
+        ]
+      },
       "UNEVALUABLE" : {
         "message" : [
           "Must be evaluable."
@@ -2386,6 +2397,12 @@
     ],
     "sqlState" : "42803"
   },
+  "MULTIPLE_TIME_TRAVEL_SPEC" : {
+    "message" : [
+      "Cannot specify time travel in both the time travel clause and options."
+    ],
+    "sqlState" : "42K0E"
+  },
   "MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : {
     "message" : [
       "The expression <expr> does not support more than one source."
@@ -5132,11 +5149,6 @@
       "<errorMessage>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_1334" : {
-    "message" : [
-      "Cannot specify both version and timestamp when time travelling the 
table."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1338" : {
     "message" : [
       "Sinks cannot request distribution and ordering in continuous execution 
mode."
diff --git a/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md b/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
index 42513bf989b..80344662d09 100644
--- a/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
+++ b/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
@@ -33,6 +33,10 @@ Cannot be casted to the "TIMESTAMP" type.
 
 Must be deterministic.
 
+## OPTION
+
+Timestamp string in the options must be able to cast to TIMESTAMP type.
+
 ## UNEVALUABLE
 
 Must be evaluable.
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 007c95297f7..7c537f6fe20 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1222,6 +1222,12 @@ For more details see [INVALID_SUBQUERY_EXPRESSION](sql-error-conditions-invalid-
 
 Cannot create the persistent object `<objName>` of the type `<obj>` because it references to the temporary object `<tempObjName>` of the type `<tempObj>`. Please make the temporary object `<tempObjName>` persistent, or make the persistent object `<objName>` temporary.
 
+### INVALID_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both version and timestamp when time travelling the table.
+
 ### [INVALID_TIME_TRAVEL_TIMESTAMP_EXPR](sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.html)
 
 [SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1360,6 +1366,12 @@ For more details see [MISSING_ATTRIBUTES](sql-error-conditions-missing-attribute
 
 The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.
 
+### MULTIPLE_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify time travel in both the time travel clause and options.
+
 ### MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION
 
 [SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 06d949ece26..65338f9917b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1122,7 +1122,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
       case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
           if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
-        resolveRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
+        val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
+        resolveRelation(u, timeTravelSpec).getOrElse(r)
 
       case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
         lookupTableOrView(identifier).map {
@@ -1255,17 +1256,27 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     private def resolveRelation(
         u: UnresolvedRelation,
         timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
-      resolveTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
+      val timeTravelSpecFromOptions = TimeTravelSpec.fromOptions(
+        u.options,
+        conf.getConf(SQLConf.TIME_TRAVEL_TIMESTAMP_KEY),
+        conf.getConf(SQLConf.TIME_TRAVEL_VERSION_KEY),
+        conf.sessionLocalTimeZone
+      )
+      if (timeTravelSpec.nonEmpty && timeTravelSpecFromOptions.nonEmpty) {
+        throw new AnalysisException("MULTIPLE_TIME_TRAVEL_SPEC", Map.empty[String, String])
+      }
+      val finalTimeTravelSpec = timeTravelSpec.orElse(timeTravelSpecFromOptions)
+      resolveTempView(u.multipartIdentifier, u.isStreaming, finalTimeTravelSpec.isDefined).orElse {
         expandIdentifier(u.multipartIdentifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
-            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
+            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, finalTimeTravelSpec)
             AnalysisContext.get.relationCache.get(key).map(_.transform {
               case multi: MultiInstanceRelation =>
                 val newRelation = multi.newInstance()
                 newRelation.copyTagsFrom(multi)
                 newRelation
             }).orElse {
-              val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
+              val table = CatalogV2Util.loadTable(catalog, ident, finalTimeTravelSpec)
               val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
               loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
               loaded
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
index 26856d9a5e0..8bfcd955497 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, RuntimeReplaceable, SubqueryExpression, Unevaluable}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, RuntimeReplaceable, SubqueryExpression, Unevaluable}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 sealed trait TimeTravelSpec
 
@@ -31,7 +32,7 @@ object TimeTravelSpec {
   def create(
       timestamp: Option[Expression],
       version: Option[String],
-      conf: SQLConf) : Option[TimeTravelSpec] = {
+      sessionLocalTimeZone: String) : Option[TimeTravelSpec] = {
     if (timestamp.nonEmpty && version.nonEmpty) {
       throw QueryCompilationErrors.invalidTimeTravelSpecError()
     } else if (timestamp.nonEmpty) {
@@ -50,7 +51,7 @@ object TimeTravelSpec {
           throw QueryCompilationErrors.invalidTimestampExprForTimeTravel(
             "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts)
       }
-      val tz = Some(conf.sessionLocalTimeZone)
+      val tz = Some(sessionLocalTimeZone)
       // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide
       // better error message.
       val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval()
@@ -65,4 +66,35 @@ object TimeTravelSpec {
       None
     }
   }
+
+  def fromOptions(
+      options: CaseInsensitiveStringMap,
+      timestampKey: String,
+      versionKey: String,
+      sessionLocalTimeZone: String): Option[TimeTravelSpec] = {
+    (Option(options.get(timestampKey)), Option(options.get(versionKey))) match {
+      case (Some(_), Some(_)) =>
+        throw QueryCompilationErrors.invalidTimeTravelSpecError()
+
+      case (Some(timestampStr), None) =>
+        val timestampValue = Cast(
+          Literal(timestampStr),
+          TimestampType,
+          Some(sessionLocalTimeZone),
+          ansiEnabled = false
+        ).eval()
+        if (timestampValue == null) {
+          throw new AnalysisException(
+            "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+            Map("expr" -> s"'$timestampStr'")
+          )
+        }
+        Some(AsOfTimestamp(timestampValue.asInstanceOf[Long]))
+
+      case (None, Some(versionStr)) =>
+        Some(AsOfVersion(versionStr))
+
+      case _ => None
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6d1aa076eb0..ae5ebd6a974 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3343,7 +3343,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
 
   def invalidTimeTravelSpecError(): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1334",
+      errorClass = "INVALID_TIME_TRAVEL_SPEC",
       messageParameters = Map.empty)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1e759b6266c..43dc541fbb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4474,6 +4474,20 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val TIME_TRAVEL_TIMESTAMP_KEY =
+    buildConf("spark.sql.timeTravelTimestampKey")
+      .doc("The option name to specify the time travel timestamp when reading 
a table.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("timestampAsOf")
+
+  val TIME_TRAVEL_VERSION_KEY =
+    buildConf("spark.sql.timeTravelVersionKey")
+      .doc("The option name to specify the time travel table version when 
reading a table.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("versionAsOf")
+
   val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation")
     .internal()
     .doc("If true, the old bogus percentile_disc calculation is used. The old 
calculation " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index e4853006157..c4e7bf23cac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -133,7 +133,8 @@ private[sql] object DataSourceV2Utils extends Logging {
         } else {
           None
         }
-        val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
+        val timeTravel = TimeTravelSpec.create(
+          timeTravelTimestamp, timeTravelVersion, conf.sessionLocalTimeZone)
         (CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), Some(ident))
       case _ =>
         // TODO: Non-catalog paths for DSV2 are currently not well defined.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 3cbbc3786fc..c2e759efe40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2902,10 +2902,14 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT INTO $t2 VALUES (3)")
       sql(s"INSERT INTO $t2 VALUES (4)")
 
-      assert(sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
-        === Array(Row(1), Row(2)))
-      assert(sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
-        === Array(Row(3), Row(4)))
+      val res1_sql = sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
+      assert(res1_sql === Array(Row(1), Row(2)))
+      val res1_df = spark.read.option("versionAsOf", "Snapshot123456789").table("t").collect()
+      assert(res1_df === Array(Row(1), Row(2)))
+      val res2_sql = sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
+      assert(res2_sql === Array(Row(3), Row(4)))
+      val res2_df = spark.read.option("versionAsOf", "2345678910").table("t").collect()
+      assert(res2_df === Array(Row(3), Row(4)))
     }
 
     val ts1 = DateTimeUtils.stringToTimestampAnsi(
@@ -2928,29 +2932,35 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT INTO $t4 VALUES (7)")
       sql(s"INSERT INTO $t4 VALUES (8)")
 
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 
00:37:58'").collect()
-        === Array(Row(5), Row(6)))
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 
00:00:00'").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
-        === Array(Row(5), Row(6)))
-      assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF 
$ts1InSeconds").collect()
-        === Array(Row(5), Row(6)))
-      assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF 
$ts2InSeconds").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 
29)").collect()
-        === Array(Row(7), Row(8)))
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 
00:00:00')").collect()
-        === Array(Row(7), Row(8)))
+      val res1_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
+      assert(res1_sql === Array(Row(5), Row(6)))
+      val res1_df = spark.read.option("timestampAsOf", "2019-01-29 00:37:58").table("t").collect()
+      assert(res1_df === Array(Row(5), Row(6)))
+      val res2_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
+      assert(res2_sql === Array(Row(7), Row(8)))
+      val res2_df = spark.read.option("timestampAsOf", "2021-01-29 00:00:00").table("t").collect()
+      assert(res2_df === Array(Row(7), Row(8)))
+
+      val res3 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
+      assert(res3 === Array(Row(5), Row(6)))
+      val res4 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
+      assert(res4 === Array(Row(7), Row(8)))
+      val res5 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
+      assert(res5 === Array(Row(5), Row(6)))
+      val res6 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
+      assert(res6 === Array(Row(7), Row(8)))
+      val res7 = sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
+      assert(res7 === Array(Row(7), Row(8)))
+      val res8 = sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')")
+        .collect()
+      assert(res8 === Array(Row(7), Row(8)))
       // Scalar subquery is also supported.
-      assert(sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 
29))").collect()
-        === Array(Row(7), Row(8)))
+      val res9 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 
1, 29))").collect()
+      assert(res9 === Array(Row(7), Row(8)))
       // Nested subquery also works
-      assert(
-        sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))").collect()
-        === Array(Row(7), Row(8)))
+      val res10 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))")
+        .collect()
+      assert(res10 === Array(Row(7), Row(8)))
 
       checkError(
         exception = intercept[AnalysisException] {
@@ -2967,6 +2977,23 @@ class DataSourceV2SQLSuiteV1Filter
         errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT",
         parameters = Map("expr" -> "\"abc\""))
 
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read.option("timestampAsOf", "abc").table("t").collect()
+        },
+        errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+        parameters = Map("expr" -> "'abc'"))
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.read
+            .option("timestampAsOf", "abc")
+            .option("versionAsOf", "1")
+            .table("t")
+            .collect()
+        },
+        errorClass = "INVALID_TIME_TRAVEL_SPEC")
+
       checkError(
         exception = intercept[AnalysisException] {
           sql("SELECT * FROM t TIMESTAMP AS OF current_user()").collect()

