This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 18000fd0e20 [SPARK-39633][SQL] Support timestamp in seconds for
TimeTravel using Dataframe options
18000fd0e20 is described below
commit 18000fd0e20787b44b930296556483f3fb419a8f
Author: Prashant Singh <[email protected]>
AuthorDate: Thu Jun 30 17:16:32 2022 -0700
[SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using
Dataframe options
### What changes were proposed in this pull request?
Support timestamp in seconds for TimeTravel using Dataframe options
### Why are the changes needed?
To have parity in doing TimeTravel via SQL and Dataframe options.
SPARK-SQL supports queries like:
```sql
SELECT * from {table} TIMESTAMP AS OF 1548751078
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new UTs for testing the behaviour.
Closes #37025 from singhpk234/fix/timetravel_df_options.
Authored-by: Prashant Singh <[email protected]>
Signed-off-by: huaxingao <[email protected]>
(cherry picked from commit 44e2657f3d511c25135c95dc3d584c540d227b5b)
Signed-off-by: huaxingao <[email protected]>
---
.../sql/execution/datasources/v2/DataSourceV2Utils.scala | 12 ++++++++++--
.../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 11 +++++++++++
.../spark/sql/connector/SupportsCatalogOptionsSuite.scala | 7 +++++++
3 files changed, 28 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index f69a2a45886..7fd61c44fd1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
SessionConfigSuppo
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
private[sql] object DataSourceV2Utils extends Logging {
@@ -124,7 +124,15 @@ private[sql] object DataSourceV2Utils extends Logging {
val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
val timeTravelVersion = if (version.isPresent) Some(version.get) else
None
- val timeTravelTimestamp = if (timestamp.isPresent)
Some(Literal(timestamp.get)) else None
+ val timeTravelTimestamp = if (timestamp.isPresent) {
+ if (timestamp.get.forall(_.isDigit)) {
+ Some(Literal(timestamp.get.toLong, LongType))
+ } else {
+ Some(Literal(timestamp.get))
+ }
+ } else {
+ None
+ }
val timeTravel = TimeTravelSpec.create(timeTravelTimestamp,
timeTravelVersion, conf)
(CatalogV2Util.loadTable(catalog, ident, timeTravel).get,
Some(catalog), Some(ident))
case _ =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b64ed080d8b..675dd2807ca 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Timestamp
import java.time.{Duration, LocalDate, Period}
import scala.collection.JavaConverters._
+import scala.concurrent.duration.MICROSECONDS
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -2691,6 +2692,8 @@ class DataSourceV2SQLSuite
val ts2 = DateTimeUtils.stringToTimestampAnsi(
UTF8String.fromString("2021-01-29 00:00:00"),
DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+ val ts1InSeconds = MICROSECONDS.toSeconds(ts1).toString
+ val ts2InSeconds = MICROSECONDS.toSeconds(ts2).toString
val t3 = s"testcat.t$ts1"
val t4 = s"testcat.t$ts2"
@@ -2707,6 +2710,14 @@ class DataSourceV2SQLSuite
=== Array(Row(5), Row(6)))
assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29
00:00:00'").collect
=== Array(Row(7), Row(8)))
+ assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect
+ === Array(Row(5), Row(6)))
+ assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect
+ === Array(Row(7), Row(8)))
+ assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF
$ts1InSeconds").collect
+ === Array(Row(5), Row(6)))
+ assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF
$ts2InSeconds").collect
+ === Array(Row(7), Row(8)))
assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1,
29)").collect
=== Array(Row(7), Row(8)))
assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29
00:00:00')").collect
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 473f679b4b9..8d771b07367 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector
import java.util.Optional
+import scala.concurrent.duration.MICROSECONDS
import scala.language.implicitConversions
import scala.util.Try
@@ -322,6 +323,12 @@ class SupportsCatalogOptionsSuite extends QueryTest with
SharedSparkSession with
timestamp = Some("2019-01-29 00:37:58")), df3.toDF())
checkAnswer(load("t", Some(catalogName), version = None,
timestamp = Some("2021-01-29 00:37:58")), df4.toDF())
+
+ // load with timestamp in number format
+ checkAnswer(load("t", Some(catalogName), version = None,
+ timestamp = Some(MICROSECONDS.toSeconds(ts1).toString)), df3.toDF())
+ checkAnswer(load("t", Some(catalogName), version = None,
+ timestamp = Some(MICROSECONDS.toSeconds(ts2).toString)), df4.toDF())
}
val e = intercept[AnalysisException] {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]