This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9d4a864  [SPARK-31557][SQL] Fix timestamps rebasing in legacy parsers
9d4a864 is described below

commit 9d4a864e12df9a710468abd70e8a69a3508fec0f
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Thu Apr 30 12:45:32 2020 +0000

    [SPARK-31557][SQL] Fix timestamps rebasing in legacy parsers
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to fix two legacy timestamp formatters,
    `LegacySimpleTimestampFormatter` and `LegacyFastTimestampFormatter`, to
    perform micros rebasing when parsing/formatting timestamps from/to strings.
    
    ### Why are the changes needed?
    Legacy timestamp formatters operate on the hybrid calendar (Julian +
    Gregorian), so the input micros should be rebased to have the same
    date-time fields as in the Proleptic Gregorian calendar used by Spark SQL;
    see SPARK-26651.
    
    ### Does this PR introduce any user-facing change?
    Yes
    
    ### How was this patch tested?
    Added tests to `TimestampFormatterSuite`.
    
    Closes #28408 from MaxGekk/fix-rebasing-in-legacy-timestamp-formatter.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit c09cfb9808d0e399b97781aae0da50332ba4b49b)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
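
A minimal sketch, not part of the commit itself, of the rebasing described
above. It uses the same RebaseDateTime helpers the patch imports; the
single-argument variants assume the default JVM time zone:

    import java.time.{LocalDateTime, ZoneOffset}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros
    import org.apache.spark.sql.catalyst.util.RebaseDateTime._

    // Micros of 1000-01-01 01:02:03 UTC in the Proleptic Gregorian calendar.
    val gregorianMicros = instantToMicros(
      LocalDateTime.of(1000, 1, 1, 1, 2, 3).toInstant(ZoneOffset.UTC))
    // The hybrid (Julian) calendar labels a different instant with the same
    // date-time fields before the 1582 cutover, so the raw micros shift.
    val julianMicros = rebaseGregorianToJulianMicros(gregorianMicros)
    // The shift is reversible, which is what parse/format rely on.
    assert(rebaseJulianToGregorianMicros(julianMicros) == gregorianMicros)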
---
 .../sql/catalyst/util/TimestampFormatter.scala     | 15 ++++++----
 .../spark/sql/util/TimestampFormatterSuite.scala   | 33 ++++++++++++++++++++--
 2 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 0c4fa17..47d6b47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.sql.Timestamp
 import java.text.{ParseException, ParsePosition, SimpleDateFormat}
 import java.time._
 import java.time.format.{DateTimeFormatter, DateTimeParseException}
@@ -30,6 +31,7 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.types.Decimal
@@ -154,12 +156,14 @@ class LegacyFastTimestampFormatter(
     }
     val micros = cal.getMicros()
     cal.set(Calendar.MILLISECOND, 0)
-    Math.addExact(fromMillis(cal.getTimeInMillis), micros)
+    val julianMicros = Math.addExact(fromMillis(cal.getTimeInMillis), micros)
+    rebaseJulianToGregorianMicros(julianMicros)
   }
 
   def format(timestamp: SQLTimestamp): String = {
-    cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
-    cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND))
+    val julianMicros = rebaseGregorianToJulianMicros(timestamp)
+    cal.setTimeInMillis(Math.floorDiv(julianMicros, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
+    cal.setMicros(Math.floorMod(julianMicros, MICROS_PER_SECOND))
     fastDateFormat.format(cal)
   }
 }
@@ -177,12 +181,11 @@ class LegacySimpleTimestampFormatter(
   }
 
   override def parse(s: String): Long = {
-    fromMillis(sdf.parse(s).getTime)
+    fromJavaTimestamp(new Timestamp(sdf.parse(s).getTime))
   }
 
   override def format(us: Long): String = {
-    val timestamp = DateTimeUtils.toJavaTimestamp(us)
-    sdf.format(timestamp)
+    sdf.format(toJavaTimestamp(us))
   }
 }
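
The LegacySimpleTimestampFormatter hunk above swaps fromMillis for
fromJavaTimestamp, which rebases the hybrid-calendar result of
SimpleDateFormat.parse to Proleptic Gregorian micros. A standalone sketch of
the difference, assuming the default JVM time zone for the rebase:

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import org.apache.spark.sql.catalyst.util.DateTimeUtils.{fromJavaTimestamp, fromMillis}

    // SimpleDateFormat parses on the hybrid Julian + Gregorian calendar.
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val parsed = sdf.parse("1000-01-01 01:02:03")
    // Old path: hybrid-calendar millis converted to micros with no rebasing.
    val unrebased = fromMillis(parsed.getTime)
    // New path: rebased to Proleptic Gregorian micros, preserving the
    // date-time fields 1000-01-01 01:02:03 as Spark SQL interprets them.
    val rebased = fromJavaTimestamp(new Timestamp(parsed.getTime))
    // For pre-1582 dates, the two values differ by several days of micros.
    assert(unrebased != rebased)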
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
index b2c3924..5d27a6b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
@@ -24,9 +24,11 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, LegacyDateFormats, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{CET, PST, UTC}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.unsafe.types.UTF8String
 
 class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers {
@@ -260,6 +262,33 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
       LocalDateTime.of(-1233, 2, 22, 2, 22, 22).toInstant(ZoneOffset.UTC)))
     assert(formatter2.parse("AD 1234-02-22 02:22:22") === instantToMicros(
       LocalDateTime.of(1234, 2, 22, 2, 22, 22).toInstant(ZoneOffset.UTC)))
+  }
+
+  test("SPARK-31557: rebasing in legacy formatters/parsers") {
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> LegacyBehaviorPolicy.LEGACY.toString) {
+      LegacyDateFormats.values.foreach { legacyFormat =>
+        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+          withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
+            DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+              withClue(s"${zoneId.getId} legacyFormat = $legacyFormat") {
+                val formatter = TimestampFormatter(
+                  TimestampFormatter.defaultPattern,
+                  zoneId,
+                  TimestampFormatter.defaultLocale,
+                  legacyFormat,
+                  needVarLengthSecondFraction = false)
+                assert(microsToInstant(formatter.parse("1000-01-01 01:02:03"))
+                  .atZone(zoneId)
+                  .toLocalDateTime === LocalDateTime.of(1000, 1, 1, 1, 2, 3))
 
+                assert(formatter.format(instantToMicros(
+                  LocalDateTime.of(1000, 1, 1, 1, 2, 3)
+                    .atZone(zoneId).toInstant)) === "1000-01-01 01:02:03")
+              }
+            }
+          }
+        }
+      }
+    }
   }
 }
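
For reference, a condensed version of what the new test asserts, pinned to
one zone and one legacy format. It assumes spark.sql.legacy.timeParserPolicy
is set to LEGACY (as the suite does via withSQLConf), so that the factory
returns a legacy formatter:

    import java.time.ZoneOffset
    import org.apache.spark.sql.catalyst.util.{LegacyDateFormats, TimestampFormatter}

    val formatter = TimestampFormatter(
      TimestampFormatter.defaultPattern,
      ZoneOffset.UTC,
      TimestampFormatter.defaultLocale,
      LegacyDateFormats.SIMPLE_DATE_FORMAT,
      needVarLengthSecondFraction = false)
    // With rebasing in place, parse and format are inverses even for
    // timestamps before the 1582 Gregorian cutover.
    val micros = formatter.parse("1000-01-01 01:02:03")
    assert(formatter.format(micros) == "1000-01-01 01:02:03")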


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
