Repository: spark
Updated Branches:
  refs/heads/master 06a3b6aaf -> f982ca07e


[SPARK-26178][SQL] Use java.time API for parsing timestamps and dates from CSV

## What changes were proposed in this pull request?

In the PR, I propose to use **java.time API** for parsing timestamps and dates 
from CSV content with microseconds precision. The SQL config 
`spark.sql.legacy.timeParser.enabled` allows switching back to the previous 
behaviour of using `java.text.SimpleDateFormat`/`FastDateFormat` for 
parsing/generating timestamps/dates.

## How was this patch tested?

It was tested by `UnivocityParserSuite`, `CsvExpressionsSuite`, 
`CsvFunctionsSuite` and `CsvSuite`.

Closes #23150 from MaxGekk/time-parser.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f982ca07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f982ca07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f982ca07

Branch: refs/heads/master
Commit: f982ca07e80074bdc1e3b742c5e21cf368e4ede2
Parents: 06a3b6a
Author: Maxim Gekk <[email protected]>
Authored: Tue Dec 4 08:36:33 2018 -0600
Committer: Sean Owen <[email protected]>
Committed: Tue Dec 4 08:36:33 2018 -0600

----------------------------------------------------------------------
 docs/sql-migration-guide-upgrade.md             |   2 +
 .../spark/sql/catalyst/csv/CSVInferSchema.scala |  15 +-
 .../spark/sql/catalyst/csv/CSVOptions.scala     |  10 +-
 .../sql/catalyst/csv/UnivocityGenerator.scala   |  14 +-
 .../sql/catalyst/csv/UnivocityParser.scala      |  38 ++--
 .../sql/catalyst/util/DateTimeFormatter.scala   | 179 +++++++++++++++++++
 .../spark/sql/catalyst/util/DateTimeUtils.scala |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 +
 .../sql/catalyst/csv/CSVInferSchemaSuite.scala  |   7 +-
 .../sql/catalyst/csv/UnivocityParserSuite.scala | 113 ++++++------
 .../sql/catalyst/util/DateTimeTestUtils.scala   |   5 +-
 .../spark/sql/util/DateTimeFormatterSuite.scala | 103 +++++++++++
 .../apache/spark/sql/CsvFunctionsSuite.scala    |   2 +-
 .../execution/datasources/csv/CSVSuite.scala    |  66 +++----
 14 files changed, 431 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/docs/sql-migration-guide-upgrade.md
----------------------------------------------------------------------
diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index 787f4bc..fee0e6d 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and 
call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, 
as they are not binary compatible with Spark 3.0.
 
+  - Since Spark 3.0, CSV datasource uses java.time API for parsing and 
generating CSV content. New formatting implementation supports date/timestamp 
patterns conformed to ISO 8601. To switch back to the implementation used in 
Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains 
function is implicitly promoted to the element type of first array type 
parameter. This type promotion can be lossy and may cause `array_contains` 
function to return wrong result. This problem has been addressed in 2.4 by 
employing a safer type promotion mechanism. This can cause some change in 
behavior and are illustrated in the table below.

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 94cb4b1..345dc4d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -22,10 +22,16 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeFormatter
 import org.apache.spark.sql.types._
 
-class CSVInferSchema(options: CSVOptions) extends Serializable {
+class CSVInferSchema(val options: CSVOptions) extends Serializable {
+
+  @transient
+  private lazy val timeParser = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
 
   private val decimalParser = {
     ExprUtils.getDecimalParser(options.locale)
@@ -154,10 +160,7 @@ class CSVInferSchema(options: CSVOptions) extends 
Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
-      TimestampType
-    } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-      // We keep this for backwards compatibility.
+    if ((allCatch opt timeParser.parse(field)).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 94bdb72..90c96d1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
 import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, 
UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -146,13 +145,10 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = 
parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
-  // Uses `FastDateFormat` which can be direct replacement for 
`SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), locale)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
 
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), 
timeZone, locale)
+  val timestampFormat: String =
+    parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 2ab376c..af09cd6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
 import org.apache.spark.sql.types._
 
 class UnivocityGenerator(
@@ -41,14 +41,18 @@ class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
+  private val timeFormatter = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, 
options.timeZone, options.locale)
+
   private def makeConverter(dataType: DataType): ValueConverter = dataType 
match {
     case DateType =>
-      (row: InternalRow, ordinal: Int) =>
-        
options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+      (row: InternalRow, ordinal: Int) => 
dateFormatter.format(row.getInt(ordinal))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) =>
-        
options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+      (row: InternalRow, ordinal: Int) => 
timeFormatter.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 8fff4b0..0f375e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.io.InputStream
 
-import scala.util.Try
 import scala.util.control.NonFatal
 
 import com.univocity.parsers.csv.CsvParser
@@ -27,7 +26,7 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, 
GenericInternalRow}
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, 
FailureSafeParser}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -75,6 +74,12 @@ class UnivocityParser(
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
+  private val timeFormatter = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, 
options.timeZone, options.locale)
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     
UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -100,7 +105,7 @@ class UnivocityParser(
   //
   //   output row - ["A", 2]
   private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, 
options)).toArray
+    requiredSchema.map(f => makeConverter(f.name, f.dataType, 
f.nullable)).toArray
   }
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
@@ -115,8 +120,7 @@ class UnivocityParser(
   def makeConverter(
       name: String,
       dataType: DataType,
-      nullable: Boolean = true,
-      options: CSVOptions): ValueConverter = dataType match {
+      nullable: Boolean = true): ValueConverter = dataType match {
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)
 
@@ -154,34 +158,16 @@ class UnivocityParser(
       }
 
     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
-          // If it fails to parse, then tries the way used in 2.0 and 1.x for 
backwards
-          // compatibility.
-          DateTimeUtils.stringToTime(datum).getTime * 1000L
-        }
-      }
+      nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
 
     case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
-          .getOrElse {
-          // If it fails to parse, then tries the way used in 2.0 and 1.x for 
backwards
-          // compatibility.
-          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-        }
-      }
+      nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
 
     case _: StringType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
 
     case udt: UserDefinedType[_] => (datum: String) =>
-      makeConverter(name, udt.sqlType, nullable, options)
+      makeConverter(name, udt.sqlType, nullable)
 
     // We don't actually hit this exception though, we keep it for 
understandability
     case _ => throw new RuntimeException(s"Unsupported type: 
${dataType.typeName}")

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
new file mode 100644
index 0000000..ad1f413
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time._
+import java.time.format.DateTimeFormatterBuilder
+import java.time.temporal.{ChronoField, TemporalQueries}
+import java.util.{Locale, TimeZone}
+
+import scala.util.Try
+
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.sql.internal.SQLConf
+
+sealed trait DateTimeFormatter {
+  def parse(s: String): Long // returns microseconds since epoch
+  def format(us: Long): String
+}
+
+class Iso8601DateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateTimeFormatter {
+  val formatter = new DateTimeFormatterBuilder()
+    .appendPattern(pattern)
+    .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+    .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+    .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+    .toFormatter(locale)
+
+  def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    if (temporalAccessor.query(TemporalQueries.offset()) == null) {
+      val localDateTime = LocalDateTime.from(temporalAccessor)
+      val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
+      Instant.from(zonedDateTime)
+    } else {
+      Instant.from(temporalAccessor)
+    }
+  }
+
+  private def instantToMicros(instant: Instant): Long = {
+    val sec = Math.multiplyExact(instant.getEpochSecond, 
DateTimeUtils.MICROS_PER_SECOND)
+    val result = Math.addExact(sec, instant.getNano / 
DateTimeUtils.NANOS_PER_MICROS)
+    result
+  }
+
+  def parse(s: String): Long = instantToMicros(toInstant(s))
+
+  def format(us: Long): String = {
+    val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
+    val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
+    val instant = Instant.ofEpochSecond(secs, mos * 
DateTimeUtils.NANOS_PER_MICROS)
+
+    formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+class LegacyDateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateTimeFormatter {
+  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+
+  protected def toMillis(s: String): Long = format.parse(s).getTime
+
+  def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS
+
+  def format(us: Long): String = {
+    format.format(DateTimeUtils.toJavaTimestamp(us))
+  }
+}
+
+class LegacyFallbackDateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) 
{
+  override def toMillis(s: String): Long = {
+    Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
+  }
+}
+
+object DateTimeFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): 
DateTimeFormatter = {
+    if (SQLConf.get.legacyTimeParserEnabled) {
+      new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+    } else {
+      new Iso8601DateTimeFormatter(format, timeZone, locale)
+    }
+  }
+}
+
+sealed trait DateFormatter {
+  def parse(s: String): Int // returns days since epoch
+  def format(days: Int): String
+}
+
+class Iso8601DateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateFormatter {
+
+  val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, 
locale)
+
+  override def parse(s: String): Int = {
+    val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+    val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
+
+    days.toInt
+  }
+
+  override def format(days: Int): String = {
+    val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
+    dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+class LegacyDateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateFormatter {
+  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+
+  def parse(s: String): Int = {
+    val milliseconds = format.parse(s).getTime
+    DateTimeUtils.millisToDays(milliseconds)
+  }
+
+  def format(days: Int): String = {
+    val date = DateTimeUtils.toJavaDate(days)
+    format.format(date)
+  }
+}
+
+class LegacyFallbackDateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+  override def parse(s: String): Int = {
+    Try(super.parse(s)).orElse {
+      // If it fails to parse, then tries the way used in 2.0 and 1.x for 
backwards
+      // compatibility.
+      Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
+    }.getOrElse {
+      // In Spark 1.5.0, we store the data as number of days since epoch in 
string.
+      // So, we just convert it to Int.
+      s.toInt
+    }
+  }
+}
+
+object DateFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter 
= {
+    if (SQLConf.get.legacyTimeParserEnabled) {
+      new LegacyFallbackDateFormatter(format, timeZone, locale)
+    } else {
+      new Iso8601DateFormatter(format, timeZone, locale)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5ae75dc..c6dfdbf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -50,7 +50,7 @@ object DateTimeUtils {
   final val MILLIS_PER_SECOND = 1000L
   final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
   final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
-
+  final val NANOS_PER_MICROS = 1000L
   final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
 
   // number of days in 400 years

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c4f00d7..451b051 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1618,6 +1618,13 @@ object SQLConf {
         "a SparkConf entry.")
       .booleanConf
       .createWithDefault(true)
+
+  val LEGACY_TIME_PARSER_ENABLED = 
buildConf("spark.sql.legacy.timeParser.enabled")
+    .doc("When set to true, java.text.SimpleDateFormat is used for formatting 
and parsing " +
+      " dates/timestamps in a locale-sensitive manner. When set to false, 
classes from " +
+      "java.time.* packages are used for the same purpose.")
+    .booleanConf
+    .createWithDefault(false)
 }
 
 /**
@@ -2040,6 +2047,8 @@ class SQLConf extends Serializable with Logging {
 
   def setCommandRejectsSparkConfs: Boolean = 
getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CONFS)
 
+  def legacyTimeParserEnabled: Boolean = 
getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index 1a020e6..c2b525a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -22,13 +22,12 @@ import java.util.Locale
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-class CSVInferSchemaSuite extends SparkFunSuite  with SQLHelper {
+class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
 
   test("String fields types are inferred correctly from null types") {
-    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd 
HH:mm:ss"), false, "GMT")
     val inferSchema = new CSVInferSchema(options)
 
     assert(inferSchema.inferField(NullType, "") == NullType)
@@ -48,7 +47,7 @@ class CSVInferSchemaSuite extends SparkFunSuite  with 
SQLHelper {
   }
 
   test("String fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map("timestampFormat" -> "yyyy-MM-dd 
HH:mm:ss"), false, "GMT")
     val inferSchema = new CSVInferSchema(options)
 
     assert(inferSchema.inferField(LongType, "1.0") == DoubleType)

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 7212402..2d0b0d3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,20 +19,17 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
-import java.util.Locale
+import java.util.{Locale, TimeZone}
+
+import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
-  private val parser = new UnivocityParser(
-    StructType(Seq.empty),
-    new CSVOptions(Map.empty[String, String], false, "GMT"))
-
   private def assertNull(v: Any) = assert(v == null)
 
   test("Can parse decimal type values") {
@@ -43,7 +40,8 @@ class UnivocityParserSuite extends SparkFunSuite with 
SQLHelper {
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
       val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-      assert(parser.makeConverter("_1", decimalType, options = 
options).apply(strVal) ===
+      val parser = new UnivocityParser(StructType(Seq.empty), options)
+      assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
         Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
   }
@@ -56,22 +54,23 @@ class UnivocityParserSuite extends SparkFunSuite with 
SQLHelper {
     types.foreach { t =>
       // Tests that a custom nullValue.
       val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, 
"GMT")
-      val converter =
-        parser.makeConverter("_1", t, nullable = true, options = 
nullValueOptions)
+      var parser = new UnivocityParser(StructType(Seq.empty), nullValueOptions)
+      val converter = parser.makeConverter("_1", t, nullable = true)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
 
       // Tests that the default nullValue is empty string.
       val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-      assertNull(parser.makeConverter("_1", t, nullable = true, options = 
options).apply(""))
+      parser = new UnivocityParser(StructType(Seq.empty), options)
+      assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
     }
 
     // Not nullable field with nullValue option.
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
       val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", t, nullable = false, options = options)
+      val parser = new UnivocityParser(StructType(Seq.empty), options)
+      val converter = parser.makeConverter("_1", t, nullable = false)
       var message = intercept[RuntimeException] {
         converter.apply("-")
       }.getMessage
@@ -86,62 +85,74 @@ class UnivocityParserSuite extends SparkFunSuite with 
SQLHelper {
     // null.
     Seq(true, false).foreach { b =>
       val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", StringType, nullable = b, options = options)
+      val parser = new UnivocityParser(StructType(Seq.empty), options)
+      val converter = parser.makeConverter("_1", StringType, nullable = b)
       assert(converter.apply("") == UTF8String.fromString(""))
     }
   }
 
   test("Throws exception for empty string with non null type") {
-      val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val exception = intercept[RuntimeException]{
-      parser.makeConverter("_1", IntegerType, nullable = false, options = 
options).apply("")
+      parser.makeConverter("_1", IntegerType, nullable = false).apply("")
     }
     assert(exception.getMessage.contains("null value found but field _1 is not 
nullable."))
   }
 
   test("Types are cast correctly") {
     val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-    assert(parser.makeConverter("_1", ByteType, options = options).apply("10") 
== 10)
-    assert(parser.makeConverter("_1", ShortType, options = 
options).apply("10") == 10)
-    assert(parser.makeConverter("_1", IntegerType, options = 
options).apply("10") == 10)
-    assert(parser.makeConverter("_1", LongType, options = options).apply("10") 
== 10)
-    assert(parser.makeConverter("_1", FloatType, options = 
options).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", DoubleType, options = 
options).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", BooleanType, options = 
options).apply("true") == true)
-
-    val timestampsOptions =
+    var parser = new UnivocityParser(StructType(Seq.empty), options)
+    assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
+    assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
+    assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
+    assert(parser.makeConverter("_1", LongType).apply("10") == 10)
+    assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
+
+    var timestampsOptions =
       new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, 
"GMT")
+    parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
     val customTimestamp = "31/01/2015 00:00"
-    val expectedTime = 
timestampsOptions.timestampFormat.parse(customTimestamp).getTime
-    val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, options = 
timestampsOptions)
+    var format = FastDateFormat.getInstance(
+      timestampsOptions.timestampFormat, timestampsOptions.timeZone, 
timestampsOptions.locale)
+    val expectedTime = format.parse(customTimestamp).getTime
+    val castedTimestamp = parser.makeConverter("_1", TimestampType, nullable = 
true)
         .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
 
     val customDate = "31/01/2015"
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, 
"GMT")
-    val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
-    val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, options = 
dateOptions)
-        .apply(customTimestamp)
-    assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
+    parser = new UnivocityParser(StructType(Seq.empty), dateOptions)
+    format = FastDateFormat.getInstance(
+      dateOptions.dateFormat, dateOptions.timeZone, dateOptions.locale)
+    val expectedDate = format.parse(customDate).getTime
+    val castedDate = parser.makeConverter("_1", DateType, nullable = true)
+        .apply(customDate)
+    assert(castedDate == DateTimeUtils.millisToDays(expectedDate, 
TimeZone.getTimeZone("GMT")))
 
     val timestamp = "2015-01-01 00:00:00"
-    assert(parser.makeConverter("_1", TimestampType, options = 
options).apply(timestamp) ==
-      DateTimeUtils.stringToTime(timestamp).getTime  * 1000L)
-    assert(parser.makeConverter("_1", DateType, options = 
options).apply("2015-01-01") ==
-      
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
+    timestampsOptions = new CSVOptions(Map(
+      "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
+      "dateFormat" -> "yyyy-MM-dd"), false, "UTC")
+    parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
+    val expected = 1420070400 * DateTimeUtils.MICROS_PER_SECOND
+    assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+      expected)
+    assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+      expected / DateTimeUtils.MICROS_PER_DAY)
   }
 
   test("Throws exception for casting an invalid string to Float and Double 
Types") {
     val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val types = Seq(DoubleType, FloatType)
     val input = Seq("10u000", "abc", "1 2/3")
     types.foreach { dt =>
       input.foreach { v =>
         val message = intercept[NumberFormatException] {
-          parser.makeConverter("_1", dt, options = options).apply(v)
+          parser.makeConverter("_1", dt).apply(v)
         }.getMessage
         assert(message.contains(v))
       }
@@ -150,9 +161,9 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
 
   test("Float NaN values are parsed correctly") {
     val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val floatVal: Float = parser.makeConverter(
-      "_1", FloatType, nullable = true, options = options
-    ).apply("nn").asInstanceOf[Float]
+      "_1", FloatType, nullable = true).apply("nn").asInstanceOf[Float]
 
     // Java implements the IEEE-754 floating point standard which guarantees that any comparison
     // against NaN will return false (except != which returns true)
@@ -161,41 +172,41 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
 
   test("Double NaN values are parsed correctly") {
     val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT")
+    val parser = new UnivocityParser(StructType(Seq.empty), options)
     val doubleVal: Double = parser.makeConverter(
-      "_1", DoubleType, nullable = true, options = options
-    ).apply("-").asInstanceOf[Double]
+      "_1", DoubleType, nullable = true).apply("-").asInstanceOf[Double]
 
     assert(doubleVal.isNaN)
   }
 
   test("Float infinite values can be parsed") {
     val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
+    var parser = new UnivocityParser(StructType(Seq.empty), negativeInfOptions)
     val floatVal1 = parser.makeConverter(
-      "_1", FloatType, nullable = true, options = negativeInfOptions
-    ).apply("max").asInstanceOf[Float]
+      "_1", FloatType, nullable = true).apply("max").asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
     val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
+    parser = new UnivocityParser(StructType(Seq.empty), positiveInfOptions)
     val floatVal2 = parser.makeConverter(
-      "_1", FloatType, nullable = true, options = positiveInfOptions
-    ).apply("max").asInstanceOf[Float]
+      "_1", FloatType, nullable = true).apply("max").asInstanceOf[Float]
 
     assert(floatVal2 == Float.PositiveInfinity)
   }
 
   test("Double infinite values can be parsed") {
     val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
+    var parser = new UnivocityParser(StructType(Seq.empty), negativeInfOptions)
     val doubleVal1 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, options = negativeInfOptions
-    ).apply("max").asInstanceOf[Double]
+      "_1", DoubleType, nullable = true).apply("max").asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
     val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
+    parser = new UnivocityParser(StructType(Seq.empty), positiveInfOptions)
     val doubleVal2 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, options = positiveInfOptions
-    ).apply("max").asInstanceOf[Double]
+      "_1", DoubleType, nullable = true).apply("max").asInstanceOf[Double]
 
     assert(doubleVal2 == Double.PositiveInfinity)
   }
@@ -211,7 +222,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       val options = new CSVOptions(Map("locale" -> langTag), false, "GMT")
       val parser = new UnivocityParser(new StructType().add("d", decimalType), options)
 
-      assert(parser.makeConverter("_1", decimalType, options = options).apply(input) === expected)
+      assert(parser.makeConverter("_1", decimalType).apply(input) === expected)
     }
 
     Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing)

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index dfa0fe9..66d8d28 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -26,7 +26,7 @@ object DateTimeTestUtils {
 
   val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone)
 
-  val outstandingTimezones: Seq[TimeZone] = Seq(
+  val outstandingTimezonesIds: Seq[String] = Seq(
     "UTC",
     "PST",
     "CET",
@@ -34,7 +34,8 @@ object DateTimeTestUtils {
     "America/Los_Angeles",
     "Antarctica/Vostok",
     "Asia/Hong_Kong",
-    "Europe/Amsterdam").map(TimeZone.getTimeZone)
+    "Europe/Amsterdam")
+  val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone)
 
   def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = {
     val originalDefaultTimeZone = TimeZone.getDefault

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
new file mode 100644
index 0000000..02d4ee0
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils}
+
+class DateTimeFormatterSuite  extends SparkFunSuite {
+  test("parsing dates using time zones") {
+    val localDate = "2018-12-02"
+    val expectedDays = Map(
+      "UTC" -> 17867,
+      "PST" -> 17867,
+      "CET" -> 17866,
+      "Africa/Dakar" -> 17867,
+      "America/Los_Angeles" -> 17867,
+      "Antarctica/Vostok" -> 17866,
+      "Asia/Hong_Kong" -> 17866,
+      "Europe/Amsterdam" -> 17866)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
+      val daysSinceEpoch = formatter.parse(localDate)
+      assert(daysSinceEpoch === expectedDays(timeZone))
+    }
+  }
+
+  test("parsing timestamps using time zones") {
+    val localDate = "2018-12-02T10:11:12.001234"
+    val expectedMicros = Map(
+      "UTC" -> 1543745472001234L,
+      "PST" -> 1543774272001234L,
+      "CET" -> 1543741872001234L,
+      "Africa/Dakar" -> 1543745472001234L,
+      "America/Los_Angeles" -> 1543774272001234L,
+      "Antarctica/Vostok" -> 1543723872001234L,
+      "Asia/Hong_Kong" -> 1543716672001234L,
+      "Europe/Amsterdam" -> 1543741872001234L)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateTimeFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val microsSinceEpoch = formatter.parse(localDate)
+      assert(microsSinceEpoch === expectedMicros(timeZone))
+    }
+  }
+
+  test("format dates using time zones") {
+    val daysSinceEpoch = 17867
+    val expectedDate = Map(
+      "UTC" -> "2018-12-02",
+      "PST" -> "2018-12-01",
+      "CET" -> "2018-12-02",
+      "Africa/Dakar" -> "2018-12-02",
+      "America/Los_Angeles" -> "2018-12-01",
+      "Antarctica/Vostok" -> "2018-12-02",
+      "Asia/Hong_Kong" -> "2018-12-02",
+      "Europe/Amsterdam" -> "2018-12-02")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
+      val date = formatter.format(daysSinceEpoch)
+      assert(date === expectedDate(timeZone))
+    }
+  }
+
+  test("format timestamps using time zones") {
+    val microsSinceEpoch = 1543745472001234L
+    val expectedTimestamp = Map(
+      "UTC" -> "2018-12-02T10:11:12.001234",
+      "PST" -> "2018-12-02T02:11:12.001234",
+      "CET" -> "2018-12-02T11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = DateTimeFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val timestamp = formatter.format(microsSinceEpoch)
+      assert(timestamp === expectedTimestamp(timeZone))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 537d13b1..6b67fcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -53,7 +53,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
   test("checking the columnNameOfCorruptRecord option") {
     val columnNameOfCorruptRecord = "_unparsed"
     val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS()
-    val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+    val schema = new StructType().add("a", IntegerType).add("b", DateType)
     val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
     val df2 = df
       .select(from_csv($"value", schemaWithCorrField1, Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/f982ca07/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index c927319..3b977d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -586,6 +586,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val results = spark.read
       .format("csv")
       .options(Map("comment" -> "~", "header" -> "false", "inferSchema" -> "true"))
+      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
       .load(testFile(commentsFile))
       .collect()
 
@@ -622,10 +623,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val options = Map(
       "header" -> "true",
       "inferSchema" -> "false",
-      "dateFormat" -> "dd/MM/yyyy hh:mm")
+      "dateFormat" -> "dd/MM/yyyy HH:mm")
     val results = spark.read
       .format("csv")
       .options(options)
+      .option("timeZone", "UTC")
       .schema(customSchema)
       .load(testFile(datesFile))
       .select("date")
@@ -893,36 +895,38 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   }
 
   test("Write dates correctly in ISO8601 format by default") {
-    withTempDir { dir =>
-      val customSchema = new StructType(Array(StructField("date", DateType, true)))
-      val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
-      val dates = spark.read
-        .format("csv")
-        .schema(customSchema)
-        .option("header", "true")
-        .option("inferSchema", "false")
-        .option("dateFormat", "dd/MM/yyyy HH:mm")
-        .load(testFile(datesFile))
-      dates.write
-        .format("csv")
-        .option("header", "true")
-        .save(iso8601datesPath)
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      withTempDir { dir =>
+        val customSchema = new StructType(Array(StructField("date", DateType, true)))
+        val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+        val dates = spark.read
+          .format("csv")
+          .schema(customSchema)
+          .option("header", "true")
+          .option("inferSchema", "false")
+          .option("dateFormat", "dd/MM/yyyy HH:mm")
+          .load(testFile(datesFile))
+        dates.write
+          .format("csv")
+          .option("header", "true")
+          .save(iso8601datesPath)
 
-      // This will load back the dates as string.
-      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
-      val iso8601dates = spark.read
-        .format("csv")
-        .schema(stringSchema)
-        .option("header", "true")
-        .load(iso8601datesPath)
+        // This will load back the dates as string.
+        val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+        val iso8601dates = spark.read
+          .format("csv")
+          .schema(stringSchema)
+          .option("header", "true")
+          .load(iso8601datesPath)
+
+        val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
+        val expectedDates = dates.collect().map { r =>
+          // This should be ISO8601 formatted string.
+          Row(iso8501.format(r.toSeq.head))
+        }
 
-      val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
-      val expectedDates = dates.collect().map { r =>
-        // This should be ISO8601 formatted string.
-        Row(iso8501.format(r.toSeq.head))
+        checkAnswer(iso8601dates, expectedDates)
       }
-
-      checkAnswer(iso8601dates, expectedDates)
     }
   }
 
@@ -1107,7 +1111,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
   test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
     Seq(false, true).foreach { multiLine =>
-      val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+      val schema = new StructType().add("a", IntegerType).add("b", DateType)
       // We use `PERMISSIVE` mode by default if invalid string is given.
       val df1 = spark
         .read
@@ -1139,7 +1143,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       val schemaWithCorrField2 = new StructType()
         .add("a", IntegerType)
         .add(columnNameOfCorruptRecord, StringType)
-        .add("b", TimestampType)
+        .add("b", DateType)
       val df3 = spark
         .read
         .option("mode", "permissive")
@@ -1325,7 +1329,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val columnNameOfCorruptRecord = "_corrupt_record"
     val schema = new StructType()
       .add("a", IntegerType)
-      .add("b", TimestampType)
+      .add("b", DateType)
       .add(columnNameOfCorruptRecord, StringType)
     // negative cases
     val msg = intercept[AnalysisException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to