This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 06abd06  [SPARK-27252][SQL] Make current_date() independent from time 
zones
06abd06 is described below

commit 06abd06112965cd73417ccceacdbd94b6b3d2793
Author: Maxim Gekk <[email protected]>
AuthorDate: Thu Mar 28 18:44:08 2019 -0700

    [SPARK-27252][SQL] Make current_date() independent from time zones
    
    ## What changes were proposed in this pull request?
    
    This makes the `CurrentDate` expression and `current_date` function 
independent from time zone settings. New result is number of days since epoch 
in `UTC` time zone. Previously, Spark shifted the current date (in `UTC` time 
zone) according the session time zone which violets definition of `DateType` - 
number of days since epoch (which is an absolute point in time, midnight of Jan 
1 1970 in UTC time).
    
    The changes makes `CurrentDate` consistent to `CurrentTimestamp` which is 
independent from time zone too.
    
    ## How was this patch tested?
    
    The changes were tested by existing test suites like `DateExpressionsSuite`.
    
    Closes #24185 from MaxGekk/current-date.
    
    Lead-authored-by: Maxim Gekk <[email protected]>
    Co-authored-by: Maxim Gekk <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 docs/sql-migration-guide-upgrade.md                |  4 +++-
 .../catalyst/expressions/datetimeExpressions.scala | 18 ++++++----------
 .../sql/catalyst/optimizer/finishAnalysis.scala    | 25 +++++++++++-----------
 .../expressions/DateExpressionsSuite.scala         |  6 +++---
 .../execution/streaming/MicroBatchExecution.scala  |  2 +-
 .../scala/org/apache/spark/sql/functions.scala     |  2 +-
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  2 +-
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 19 +++++++++-------
 8 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index b8597f0..3ba89c0 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -103,7 +103,9 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, the `current_timestamp` function returns 
a timestamp with millisecond resolution only. Since Spark 3.0, the function can 
return the result with microsecond resolution if the underlying clock available 
on the system offers such resolution.
 
-  - In Spark version 2.4 abd earlier, when reading a Hive Serde table with 
Spark native data sources(parquet/orc), Spark will infer the actual file schema 
and update the table schema in metastore. Since Spark 3.0, Spark doesn't infer 
the schema anymore. This should not cause any problems to end users, but if it 
does, please set `spark.sql.hive.caseSensitiveInferenceMode` to 
`INFER_AND_SAVE`.
+  - In Spark version 2.4 and earlier, when reading a Hive Serde table with 
Spark native data sources(parquet/orc), Spark will infer the actual file schema 
and update the table schema in metastore. Since Spark 3.0, Spark doesn't infer 
the schema anymore. This should not cause any problems to end users, but if it 
does, please set `spark.sql.hive.caseSensitiveInferenceMode` to 
`INFER_AND_SAVE`.
+
+  - In Spark version 2.4 and earlier, the `current_date` function returns the 
current date shifted according to the SQL config `spark.sql.session.timeZone`. 
Since Spark 3.0, the function always returns the current date in the `UTC` time 
zone.
 
   - Since Spark 3.0, `TIMESTAMP` literals are converted to strings using the 
SQL config `spark.sql.session.timeZone`, and `DATE` literals are formatted 
using the UTC time zone. In Spark version 2.4 and earlier, both conversions use 
the default time zone of the Java virtual machine.
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 7878a87..3cda989 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
-import java.time.{Instant, LocalDate, ZoneId}
+import java.time.{Instant, LocalDate, ZoneId, ZoneOffset}
 import java.time.temporal.IsoFields
 import java.util.{Locale, TimeZone}
 
@@ -52,30 +52,26 @@ trait TimeZoneAwareExpression extends Expression {
   @transient lazy val zoneId: ZoneId = DateTimeUtils.getZoneId(timeZoneId.get)
 }
 
+// scalastyle:off line.size.limit
 /**
- * Returns the current date at the start of query evaluation.
+ * Returns the current date in the UTC time zone at the start of query 
evaluation.
  * All calls of current_date within the same query return the same value.
  *
  * There is no code generation since this expression should get constant 
folded by the optimizer.
  */
 @ExpressionDescription(
-  usage = "_FUNC_() - Returns the current date at the start of query 
evaluation.",
+  usage = "_FUNC_() - Returns the current date in the UTC time zone at the 
start of query evaluation.",
   since = "1.5.0")
-case class CurrentDate(timeZoneId: Option[String] = None)
-  extends LeafExpression with TimeZoneAwareExpression with CodegenFallback {
-
-  def this() = this(None)
+// scalastyle:on line.size.limit
+case class CurrentDate() extends LeafExpression with CodegenFallback {
 
   override def foldable: Boolean = true
   override def nullable: Boolean = false
 
   override def dataType: DataType = DateType
 
-  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
-    copy(timeZoneId = Option(timeZoneId))
-
   override def eval(input: InternalRow): Any = {
-    DateTimeUtils.millisToDays(System.currentTimeMillis(), timeZone)
+    LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
   }
 
   override def prettyName: String = "current_date"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 4094864..d0bf4ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -58,21 +58,20 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
  */
 object ComputeCurrentTime extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val currentDates = mutable.Map.empty[String, Literal]
-    val timeExpr = CurrentTimestamp()
-    val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
-    val currentTime = Literal.create(timestamp, timeExpr.dataType)
+    val currentDate = {
+      val dateExpr = CurrentDate()
+      val date = dateExpr.eval(EmptyRow).asInstanceOf[Int]
+      Literal.create(date, dateExpr.dataType)
+    }
+    val currentTimestamp = {
+      val timeExpr = CurrentTimestamp()
+      val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
+      Literal.create(timestamp, timeExpr.dataType)
+    }
 
     plan transformAllExpressions {
-      case CurrentDate(Some(timeZoneId)) =>
-        currentDates.getOrElseUpdate(timeZoneId, {
-          Literal.create(
-            DateTimeUtils.millisToDays(
-              MICROSECONDS.toMillis(timestamp),
-              DateTimeUtils.getTimeZone(timeZoneId)),
-            DateType)
-        })
-      case CurrentTimestamp() => currentTime
+      case CurrentDate() => currentDate
+      case CurrentTimestamp() => currentTimestamp
     }
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 61ee8f0..8823fe7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -54,12 +54,12 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 
   test("datetime function current_date") {
     val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), 
TimeZoneGMT)
-    val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
+    val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
     val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), 
TimeZoneGMT)
     assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
 
-    val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
-    val cdpst = CurrentDate(pstId).eval(EmptyRow).asInstanceOf[Int]
+    val cdjst = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
+    val cdpst = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
     assert(cdpst <= cd && cd <= cdjst)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fdd80cc..59a4afb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -514,7 +514,7 @@ class MicroBatchExecution(
           ct.dataType, Some("Dummy TimeZoneId"))
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          cd.dataType, cd.timeZoneId)
+          cd.dataType, Some("UTC"))
     }
 
     val triggerLogicalPlan = sink match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index f99186c..d6d1f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2572,7 +2572,7 @@ object functions {
   }
 
   /**
-   * Returns the current date as a date column.
+   * Returns the current date in the UTC time zone as a date column.
    *
    * @group datetime_funcs
    * @since 1.5.0
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 73259a0..d7cd15f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -720,7 +720,7 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
     assert(testData.groupBy(col("key")).toString.contains(
       "[grouping expressions: [key], value: [key: int, value: string], type: 
GroupBy]"))
     assert(testData.groupBy(current_date()).toString.contains(
-      "grouping expressions: [current_date(None)], value: [key: int, value: 
string], " +
+      "grouping expressions: [current_date()], value: [key: int, value: 
string], " +
         "type: GroupBy]"))
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index b06d52d..a9435e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.time.LocalDate
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -31,13 +33,14 @@ class DateFunctionsSuite extends QueryTest with 
SharedSQLContext {
   import testImplicits._
 
   test("function current_date") {
-    val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
-    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
-    val d1 = 
DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
-    val d2 = DateTimeUtils.fromJavaDate(
-      sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
-    val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
-    assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
+      val d0 = System.currentTimeMillis() / MILLIS_PER_DAY
+      val d1 = 
localDateToDays(df1.select(current_date()).collect().head.getAs[LocalDate](0))
+      val d2 = localDateToDays(sql("""SELECT 
CURRENT_DATE()""").collect().head.getAs[LocalDate](0))
+      val d3 = System.currentTimeMillis() / MILLIS_PER_DAY
+      assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
+    }
   }
 
   test("function current_timestamp and now") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to