Repository: spark
Updated Branches:
  refs/heads/master 6a9a85b84 -> 865b2fd84


[SPARK-18937][SQL] Timezone support in CSV/JSON parsing

## What changes were proposed in this pull request?

This is a follow-up PR of #16308.

This PR enables timezone support in CSV/JSON parsing.

This introduces a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone).

The datasources use the `timeZone` option to format timestamp values when writing and to parse them when reading.
Note that when reading, if the `timestampFormat` contains timezone info, the `timeZone` option is not used because the timezone embedded in the values takes precedence.

For example, if you have the timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` here because the session local timezone is `"GMT"`, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas with the option set to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can read these files correctly even if the timezone option is wrong, because the timestamp values contain timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", 
"PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if the `timestampFormat` doesn't contain timezone info, we can still read the values correctly by setting the correct timezone option:

```scala
scala> df.write.option("timestampFormat", 
"yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", 
"yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", 
"yyyy-MM-dd'T'HH:mm:ss").option("timeZone", 
"JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```
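
The same `timeZone` and `timestampFormat` options apply to the CSV datasource. As a rough sketch (assuming the same `df` and `schema` as above, and a hypothetical path `/path/to/jstcsv`), the analogous CSV round trip looks like:

```scala
// Sketch only: writes timestamps formatted in JST without timezone info,
// then reads them back with the matching timeZone option.
df.write
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("timeZone", "JST")
  .csv("/path/to/jstcsv")

spark.read
  .schema(schema)
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("timeZone", "JST")
  .csv("/path/to/jstcsv")
  .show()
```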

This PR also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression`s so that they can evaluate timestamp values with the timezone option.
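
As a rough illustration of what the expression-level change enables (a sketch, not from the original description): the `to_json`/`from_json` functions backed by these expressions accept an options map, so the `timeZone` option can be passed there as well, assuming the `df` and `schema` from the examples above:

```scala
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}

// Hypothetical usage of the expression-backed functions; the options map
// carries the timeZone option honored by JsonToStruct/StructToJson.
val asJson = df.select(to_json(struct(col("ts")), Map("timeZone" -> "PST")).as("json"))
val roundTripped = asJson.select(from_json(col("json"), schema, Map("timeZone" -> "PST")).as("parsed"))
```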

## How was this patch tested?

Existing tests and some new tests.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #16750 from ueshin/issues/SPARK-18937.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865b2fd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865b2fd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865b2fd8

Branch: refs/heads/master
Commit: 865b2fd84c6f82de147540c8f17bbe0f0d9fb69c
Parents: 6a9a85b
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Wed Feb 15 13:26:34 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Feb 15 13:26:34 2017 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  43 ++++---
 python/pyspark/sql/streaming.py                 |  20 ++--
 .../catalyst/expressions/jsonExpressions.scala  |  30 ++++-
 .../spark/sql/catalyst/json/JSONOptions.scala   |  11 +-
 .../sql/catalyst/json/JacksonGenerator.scala    |   2 +-
 .../expressions/JsonExpressionsSuite.scala      | 113 ++++++++++++++++---
 .../org/apache/spark/sql/DataFrameReader.scala  |   8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   4 +
 .../scala/org/apache/spark/sql/Dataset.scala    |   6 +-
 .../datasources/csv/CSVFileFormat.scala         |   8 +-
 .../execution/datasources/csv/CSVOptions.scala  |  21 ++--
 .../datasources/csv/UnivocityGenerator.scala    |   2 +-
 .../datasources/csv/UnivocityParser.scala       |   2 +-
 .../datasources/json/JsonFileFormat.scala       |   9 +-
 .../spark/sql/streaming/DataStreamReader.scala  |   4 +
 .../datasources/csv/CSVInferSchemaSuite.scala   |  22 ++--
 .../execution/datasources/csv/CSVSuite.scala    |  44 +++++++-
 .../datasources/csv/UnivocityParserSuite.scala  |  73 +++++++-----
 .../execution/datasources/json/JsonSuite.scala  |  46 +++++++-
 .../sql/sources/ResolvedDataSourceSuite.scala   |   6 +-
 20 files changed, 351 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d31f3fb..1678334 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -158,7 +158,8 @@ class DataFrameReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, 
prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
              allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
+             timeZone=None):
         """
         Loads a JSON file (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects 
(one object per
@@ -204,11 +205,13 @@ class DataFrameReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. 
Custom date formats
                            follow the formats at 
``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp 
format. Custom date
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
-                                default value value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used 
to parse timestamps.
+                         If None is set, it uses the default value, session 
local timezone.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -225,7 +228,7 @@ class DataFrameReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, 
allowNumericLeadingZero=allowNumericLeadingZero,
             
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -298,7 +301,7 @@ class DataFrameReader(OptionUtils):
             comment=None, header=None, inferSchema=None, 
ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, 
positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, 
maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None, timeZone=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input 
schema if
@@ -341,11 +344,11 @@ class DataFrameReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. 
Custom date formats
                            follow the formats at 
``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp 
format. Custom date
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
-                                default value value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record 
can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters 
allowed for any given
@@ -357,6 +360,8 @@ class DataFrameReader(OptionUtils):
                                             uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
+        :param timeZone: sets the string that indicates a timezone to be used 
to parse timestamps.
+                         If None is set, it uses the default value, session 
local timezone.
 
                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets 
a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra 
fields.
@@ -374,7 +379,7 @@ class DataFrameReader(OptionUtils):
             nanValue=nanValue, positiveInf=positiveInf, 
negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, 
maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, 
mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         return 
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -591,7 +596,8 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None, 
timestampFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, 
timestampFormat=None,
+             timeZone=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the 
specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ class DataFrameWriter(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. 
Custom date formats
                            follow the formats at 
``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp 
format. Custom date
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
-                                default value value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used 
to format timestamps.
+                         If None is set, it uses the default value, session 
local timezone.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(
-            compression=compression, dateFormat=dateFormat, 
timestampFormat=timestampFormat)
+            compression=compression, dateFormat=dateFormat, 
timestampFormat=timestampFormat,
+            timeZone=timeZone)
         self._jwrite.json(path)
 
     @since(1.4)
@@ -664,7 +673,7 @@ class DataFrameWriter(OptionUtils):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, 
escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, 
dateFormat=None,
-            timestampFormat=None):
+            timestampFormat=None, timeZone=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the 
specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ class DataFrameWriter(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. 
Custom date formats
                            follow the formats at 
``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp 
format. Custom date
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
-                                default value value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used 
to parse timestamps.
+                         If None is set, it uses the default value, session 
local timezone.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, 
escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, 
quoteAll=quoteAll,
-                       dateFormat=dateFormat, timestampFormat=timestampFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat, 
timeZone=timeZone)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a10b185..d988e59 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -429,7 +429,7 @@ class DataStreamReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
              allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
-             timestampFormat=None):
+             timestampFormat=None, timeZone=None):
         """
         Loads a JSON file stream (`JSON Lines text format or newline-delimited 
JSON
         <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
@@ -476,11 +476,13 @@ class DataStreamReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. 
Custom date formats
                            follow the formats at 
``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp 
format. Custom date
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
-                                default value value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used 
to parse timestamps.
+                         If None is set, it uses the default value, session 
local timezone.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ class DataStreamReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, 
allowNumericLeadingZero=allowNumericLeadingZero,
             
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -552,7 +554,7 @@ class DataStreamReader(OptionUtils):
             comment=None, header=None, inferSchema=None, 
ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, 
positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, 
maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None, timeZone=None):
         """Loads a CSV file stream and returns the result as a  
:class:`DataFrame`.
 
         This function will go through the input once to determine the input 
schema if
@@ -597,11 +599,11 @@ class DataStreamReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. 
Custom date formats
                            follow the formats at 
``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp 
format. Custom date
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
-                                default value value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record 
can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters 
allowed for any given
@@ -609,6 +611,8 @@ class DataStreamReader(OptionUtils):
                                   ``-1`` meaning unlimited length.
         :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
+        :param timeZone: sets the string that indicates a timezone to be used 
to parse timestamps.
+                         If None is set, it uses the default value, session 
local timezone.
 
                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets 
a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra 
fields.
@@ -628,7 +632,7 @@ class DataStreamReader(OptionUtils):
             nanValue=nanValue, positiveInf=positiveInf, 
negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, 
maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, 
mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c410e79..bd852a5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts an json input string to a [[StructType]] with the specified schema.
  */
-case class JsonToStruct(schema: StructType, options: Map[String, String], 
child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class JsonToStruct(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback 
with ExpectsInputTypes {
   override def nullable: Boolean = true
 
+  def this(schema: StructType, options: Map[String, String], child: 
Expression) =
+    this(schema, options, child, None)
+
   @transient
   lazy val parser =
     new JacksonParser(
       schema,
       "invalid", // Not used since we force fail fast.  Invalid rows will be 
set to `null`.
-      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), 
timeZoneId.get))
 
   override def dataType: DataType = schema
 
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(json: Any): Any = {
     try parser.parse(json.toString).headOption.orNull catch {
       case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: 
Map[String, String], child:
 /**
  * Converts a [[StructType]] to a json output string.
  */
-case class StructToJson(options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class StructToJson(
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback 
with ExpectsInputTypes {
   override def nullable: Boolean = true
 
+  def this(options: Map[String, String], child: Expression) = this(options, 
child, None)
+
   @transient
   lazy val writer = new CharArrayWriter()
 
@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], 
child: Expression)
     new JacksonGenerator(
       child.dataType.asInstanceOf[StructType],
       writer,
-      new JSONOptions(options))
+      new JSONOptions(options, timeZoneId.get))
 
   override def dataType: DataType = StringType
 
@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], 
child: Expression)
     }
   }
 
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(row: Any): Any = {
     gen.write(row.asInstanceOf[InternalRow])
     gen.flush()

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02bd8de..5307ce1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
@@ -31,10 +31,11 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap[String])
+    @transient private val parameters: CaseInsensitiveMap[String], 
defaultTimeZoneId: String)
   extends Logging with Serializable  {
 
-  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
 
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -58,13 +59,15 @@ private[sql] class JSONOptions(
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
+  val timeZone: TimeZone = 
TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for 
`SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), Locale.US)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), 
Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), 
timeZone, Locale.US)
 
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index bf8e3c8..dec5527 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
 private[sql] class JacksonGenerator(
     schema: StructType,
     writer: Writer,
-    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+    options: JSONOptions) {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to 
appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than 
`InternalRow` so that
   // we can directly access data in `ArrayData` without the help of 
`SpecificMutableRow`.

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 8e20bd1..0c46819 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.util.Calendar
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, 
ParseModes}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -305,51 +307,53 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   test("json_tuple - hive key 4 - null json") {
     checkJsonTuple(
       JsonTuple(Literal(null) :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - hive key 5 - null and empty fields") {
     checkJsonTuple(
       JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, 
null)))
+      InternalRow(UTF8String.fromString(""), null, null, null, null))
   }
 
   test("json_tuple - hive key 6 - invalid json (array)") {
     checkJsonTuple(
       JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - invalid json (object start only)") {
     checkJsonTuple(
       JsonTuple(Literal("{") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - invalid json (no object end)") {
     checkJsonTuple(
       JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - invalid json (invalid json)") {
     checkJsonTuple(
       JsonTuple(Literal("\\") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }
 
   test("json_tuple - preserve newlines") {
     checkJsonTuple(
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
-      InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
+      InternalRow(UTF8String.fromString("b\nc")))
   }
 
+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
   test("from_json") {
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData)),
-      InternalRow.fromSeq(1 :: Nil)
+      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      InternalRow(1)
     )
   }
 
@@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
       null
     )
 
     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), 
Literal(jsonData)),
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), 
Literal(jsonData), gmtId),
       null
     )
   }
@@ -371,15 +375,58 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
+      JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
       null
     )
   }
 
+  test("from_json with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+    val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}"""
+    var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 123)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+    // The result doesn't change because the json string includes timezone 
string ("Z" here),
+    // which means the string represents the timestamp string in the timezone 
regardless of
+    // the timeZoneId parameter.
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+
+    val jsonData2 = """{"t": "2016-01-01T00:00:00"}"""
+    for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+      c = Calendar.getInstance(tz)
+      c.set(2016, 0, 1, 0, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkEvaluation(
+        JsonToStruct(
+          schema,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+          Literal(jsonData2),
+          Option(tz.getID)),
+        InternalRow(c.getTimeInMillis * 1000L)
+      )
+      checkEvaluation(
+        JsonToStruct(
+          schema,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> 
tz.getID),
+          Literal(jsonData2),
+          gmtId),
+        InternalRow(c.getTimeInMillis * 1000L)
+      )
+    }
+  }
+
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType)),
+      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
       null
     )
   }
@@ -388,7 +435,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(create_row(1), schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct),
+      StructToJson(Map.empty, struct, gmtId),
       """{"a":1}"""
     )
   }
@@ -397,8 +444,40 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(null, schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct),
+      StructToJson(Map.empty, struct, gmtId),
       null
     )
   }
+
+  test("to_json with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+    val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 0)
+    val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
+
+    checkEvaluation(
+      StructToJson(Map.empty, struct, gmtId),
+      """{"t":"2016-01-01T00:00:00.000Z"}"""
+    )
+    checkEvaluation(
+      StructToJson(Map.empty, struct, Option("PST")),
+      """{"t":"2015-12-31T16:00:00.000-08:00"}"""
+    )
+
+    checkEvaluation(
+      StructToJson(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> 
gmtId.get),
+        struct,
+        gmtId),
+      """{"t":"2016-01-01T00:00:00"}"""
+    )
+    checkEvaluation(
+      StructToJson(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"),
+        struct,
+        gmtId),
+      """{"t":"2015-12-31T16:00:00"}"""
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839..780fe51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that 
indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(extraOptions.toMap, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord =
       parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that 
indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many 
columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of 
characters allowed

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 748ebba..1d834b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that 
indicates a timezone
+   * to be used to format timestamps.</li>
    * </ul>
    *
    * @since 1.4.0
@@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that 
indicates a timezone
+   * to be used to format timestamps.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b80ff4..e62cd9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonGenerator
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans._
@@ -2678,10 +2678,12 @@ class Dataset[T] private[sql](
    */
   def toJSON: Dataset[String] = {
     val rowSchema = this.schema
+    val sessionLocalTimeZone = 
sparkSession.sessionState.conf.sessionLocalTimeZone
     val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
-      val gen = new JacksonGenerator(rowSchema, writer)
+      val gen = new JacksonGenerator(rowSchema, writer,
+        new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
 
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2bf07..566f40f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CompressionCodecs}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.sources._
@@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       files: Seq[FileStatus]): Option[StructType] = {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
 
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
     val paths = files.map(_.getPath.toString)
     val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, 
paths)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       dataSchema: StructType): OutputWriterFactory = {
     CSVUtils.verifySchema(dataSchema)
     val conf = job.getConfiguration
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
     csvOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = 
{
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9d79ea6..b7fbaa4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, 
UnescapedQuoteHandling}
 import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CompressionCodecs, ParseModes}
 
-private[csv] class CSVOptions(@transient private val parameters: 
CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+    @transient private val parameters: CaseInsensitiveMap[String], 
defaultTimeZoneId: String)
   extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
 
   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val 
parameters: CaseInsensitive
     name.map(CompressionCodecs.getCodecClassName)
   }
 
+  val timeZone: TimeZone = 
TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for 
`SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), Locale.US)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), 
Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), 
timeZone, Locale.US)
 
   val maxColumns = getInt("maxColumns", 20480)
 
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val 
parameters: CaseInsensitive
     settings
   }
 }
-
-object CSVOptions {
-
-  def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
-  def apply(paramName: String, paramValue: String): CSVOptions = {
-    new CSVOptions(Map(paramName -> paramValue))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138..4082a0d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 private[csv] class UnivocityGenerator(
     schema: StructType,
     writer: Writer,
-    options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+    options: CSVOptions) {
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa6..2e409b3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
       name: String,
       dataType: DataType,
       nullable: Boolean = true,
-      options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+      options: CSVOptions): ValueConverter = dataType match {
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d2..b4a8ff2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
     if (files.isEmpty) {
       None
     } else {
-      val parsedOptions: JSONOptions = new JSONOptions(options)
+      val parsedOptions: JSONOptions =
+        new JSONOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
       val columnNameOfCorruptRecord =
         parsedOptions.columnNameOfCorruptRecord
           .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
     parsedOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3c..4e706da 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that 
indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that 
indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many 
columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of 
characters allowed

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index d8c6c25..6617420 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
 class CSVInferSchemaSuite extends SparkFunSuite {
 
   test("String fields types are inferred correctly from null types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(NullType, "", options) == NullType)
     assert(CSVInferSchema.inferField(NullType, null, options) == NullType)
     assert(CSVInferSchema.inferField(NullType, "100000000000", options) == 
LongType)
@@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("String fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType)
     assert(CSVInferSchema.inferField(LongType, "test", options) == StringType)
     assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == 
DoubleType)
@@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == 
TimestampType)
-    options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == 
TimestampType)
   }
 
   test("Timestamp field types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == 
StringType)
     assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) 
== StringType)
     assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == 
StringType)
   }
 
   test("Boolean fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == 
StringType)
   }
@@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("Null fields are handled properly when a nullValue is specified") {
-    var options = new CSVOptions(Map("nullValue" -> "null"))
+    var options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
     assert(CSVInferSchema.inferField(NullType, "null", options) == NullType)
     assert(CSVInferSchema.inferField(StringType, "null", options) == 
StringType)
     assert(CSVInferSchema.inferField(LongType, "null", options) == LongType)
 
-    options = new CSVOptions(Map("nullValue" -> "\\N"))
+    options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == 
IntegerType)
     assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
     assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == 
TimestampType)
@@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more 
case-insensitive") {
-    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == 
TimestampType)
   }
 
   test("SPARK-18877: `inferField` on DecimalType should find a common type 
with `typeSoFar`") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
 
     // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9).
     assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) 
==
@@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
 
   test("DoubleType should be infered when user defined nan/inf are provided") {
     val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> 
"-inf",
-      "positiveInf" -> "inf"))
+      "positiveInf" -> "inf"), "GMT")
     assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9cebb..0c9a729 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -839,7 +839,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils {
     }
   }
 
-  test("Write timestamps correctly with dateFormat option") {
+  test("Write timestamps correctly with timestampFormat option") {
     withTempDir { dir =>
       // With dateFormat option.
       val timestampsWithFormatPath = 
s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
@@ -870,6 +870,48 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
     }
   }
 
+  test("Write timestamps correctly with timestampFormat option and timeZone 
option") {
+    withTempDir { dir =>
+      // With timestampFormat option and timeZone option.
+      val timestampsWithFormatPath = 
s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+      val timestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestampsWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringTimestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(timestampsWithFormatPath)
+      val expectedStringTimestampsWithFormat = Seq(
+        Row("2015/08/27 01:00"),
+        Row("2014/10/28 01:30"),
+        Row("2016/01/29 04:00"))
+
+      checkAnswer(stringTimestampsWithFormat, 
expectedStringTimestampsWithFormat)
+
+      val readBack = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .load(timestampsWithFormatPath)
+
+      checkAnswer(readBack, timestampsWithFormat)
+    }
+  }
+
   test("load duplicated field names consistently with null or empty strings - 
case sensitive") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       withTempPath { path =>

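(Sketch of the round trip the new CSV test above exercises, restated through the public reader/writer API; the paths and the input file contents are hypothetical, and only the `timestampFormat`/`timeZone` options come from this change.)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("csv-tz-sketch").getOrCreate()

// Hypothetical input CSV with a header and timestamps such as "27/08/2015 18:00".
val timestamps = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "dd/MM/yyyy HH:mm")
  .csv("/path/to/dates.csv")

// Write the values back out rendered in GMT with a different pattern ...
timestamps.write
  .option("header", "true")
  .option("timestampFormat", "yyyy/MM/dd HH:mm")
  .option("timeZone", "GMT")
  .csv("/path/to/output")

// ... and reading with the same pattern and timezone should give back the
// original timestamps.
val readBack = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy/MM/dd HH:mm")
  .option("timeZone", "GMT")
  .csv("/path/to/output")
```
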
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 62dae08..a74b22a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite {
   private val parser =
-    new UnivocityParser(StructType(Seq.empty), new 
CSVOptions(Map.empty[String, String]))
+    new UnivocityParser(StructType(Seq.empty), new 
CSVOptions(Map.empty[String, String], "GMT"))
 
   private def assertNull(v: Any) = assert(v == null)
 
@@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite {
 
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
-      assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assert(parser.makeConverter("_1", decimalType, options = 
options).apply(strVal) ===
         Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
   }
@@ -50,20 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite {
     // Nullable field with nullValue option.
     types.foreach { t =>
       // Tests that a custom nullValue.
+      val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT")
       val converter =
-        parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", 
"-"))
+        parser.makeConverter("_1", t, nullable = true, options = 
nullValueOptions)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
 
       // Tests that the default nullValue is empty string.
-      assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assertNull(parser.makeConverter("_1", t, nullable = true, options = 
options).apply(""))
     }
 
     // Not nullable field with nullValue option.
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
+      val options = new CSVOptions(Map("nullValue" -> "-"), "GMT")
       val converter =
-        parser.makeConverter("_1", t, nullable = false, 
CSVOptions("nullValue", "-"))
+        parser.makeConverter("_1", t, nullable = false, options = options)
       var message = intercept[RuntimeException] {
         converter.apply("-")
       }.getMessage
@@ -77,48 +81,52 @@ class UnivocityParserSuite extends SparkFunSuite {
     // If nullValue is different with empty string, then, empty string should 
not be casted into
     // null.
     Seq(true, false).foreach { b =>
+      val options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
       val converter =
-        parser.makeConverter("_1", StringType, nullable = b, 
CSVOptions("nullValue", "null"))
+        parser.makeConverter("_1", StringType, nullable = b, options = options)
       assert(converter.apply("") == UTF8String.fromString(""))
     }
   }
 
   test("Throws exception for empty string with non null type") {
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     val exception = intercept[RuntimeException]{
-      parser.makeConverter("_1", IntegerType, nullable = false, 
CSVOptions()).apply("")
+      parser.makeConverter("_1", IntegerType, nullable = false, options = 
options).apply("")
     }
     assert(exception.getMessage.contains("null value found but field _1 is not 
nullable."))
   }
 
   test("Types are cast correctly") {
-    assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
-    assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
-    assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
-    assert(parser.makeConverter("_1", LongType).apply("10") == 10)
-    assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
-
-    val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    assert(parser.makeConverter("_1", ByteType, options = options).apply("10") 
== 10)
+    assert(parser.makeConverter("_1", ShortType, options = 
options).apply("10") == 10)
+    assert(parser.makeConverter("_1", IntegerType, options = 
options).apply("10") == 10)
+    assert(parser.makeConverter("_1", LongType, options = options).apply("10") 
== 10)
+    assert(parser.makeConverter("_1", FloatType, options = 
options).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", DoubleType, options = 
options).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", BooleanType, options = 
options).apply("true") == true)
+
+    val timestampsOptions =
+      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
     val customTimestamp = "31/01/2015 00:00"
     val expectedTime = 
timestampsOptions.timestampFormat.parse(customTimestamp).getTime
     val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, 
timestampsOptions)
+      parser.makeConverter("_1", TimestampType, nullable = true, options = 
timestampsOptions)
         .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
 
     val customDate = "31/01/2015"
-    val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+    val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
     val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
     val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, dateOptions)
+      parser.makeConverter("_1", DateType, nullable = true, options = 
dateOptions)
         .apply(customTimestamp)
     assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"
-    assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+    assert(parser.makeConverter("_1", TimestampType, options = 
options).apply(timestamp) ==
       DateTimeUtils.stringToTime(timestamp).getTime  * 1000L)
-    assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+    assert(parser.makeConverter("_1", DateType, options = 
options).apply("2015-01-01") ==
       
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
   }
 
@@ -127,16 +135,18 @@ class UnivocityParserSuite extends SparkFunSuite {
     try {
       Locale.setDefault(new Locale("fr", "FR"))
       // Would parse as 1.0 in fr-FR
-      assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0)
-      assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0)
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assert(parser.makeConverter("_1", FloatType, options = 
options).apply("1,00") == 100.0)
+      assert(parser.makeConverter("_1", DoubleType, options = 
options).apply("1,00") == 100.0)
     } finally {
       Locale.setDefault(originalLocale)
     }
   }
 
   test("Float NaN values are parsed correctly") {
+    val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT")
     val floatVal: Float = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn")
+      "_1", FloatType, nullable = true, options = options
     ).apply("nn").asInstanceOf[Float]
 
     // Java implements the IEEE-754 floating point standard which guarantees 
that any comparison
@@ -145,36 +155,41 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
 
   test("Double NaN values are parsed correctly") {
+    val options = new CSVOptions(Map("nanValue" -> "-"), "GMT")
     val doubleVal: Double = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-")
+      "_1", DoubleType, nullable = true, options = options
     ).apply("-").asInstanceOf[Double]
 
     assert(doubleVal.isNaN)
   }
 
   test("Float infinite values can be parsed") {
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
     val floatVal1 = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max")
+      "_1", FloatType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
     val floatVal2 = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max")
+      "_1", FloatType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Float]
 
     assert(floatVal2 == Float.PositiveInfinity)
   }
 
   test("Double infinite values can be parsed") {
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
     val doubleVal1 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max")
+      "_1", DoubleType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
     val doubleVal2 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max")
+      "_1", DoubleType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal2 == Double.PositiveInfinity)

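(A plain-JDK illustration, independent of Spark's internal formatter, of why the parser suites above now construct their options with an explicit timezone: the same pattern and input string map to different instants depending on the timezone the formatter is configured with, which is what the `timeZone` option controls.)

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

val pattern = "yyyy-MM-dd'T'HH:mm:ss"

val gmt = new SimpleDateFormat(pattern)
gmt.setTimeZone(TimeZone.getTimeZone("GMT"))

val jst = new SimpleDateFormat(pattern)
jst.setTimeZone(TimeZone.getTimeZone("JST"))

val input = "2016-01-01T09:00:00"

// Interpreting the same wall-clock string in GMT vs. JST shifts the resulting
// instant by nine hours.
val diffHours = (gmt.parse(input).getTime - jst.parse(input).getTime) / 3600000L
assert(diffHours == 9)
```
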
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 156fd96..9344aed 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with 
TestJsonData {
         generator.flush()
       }
 
-      val dummyOption = new JSONOptions(Map.empty[String, String])
+      val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
       val dummySchema = StructType(Seq.empty)
       val parser = new JacksonParser(dummySchema, "", dummyOption)
 
@@ -1366,7 +1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
 
   test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
     // This is really a test that it doesn't throw an exception
-    val emptySchema = JsonInferSchema.infer(empty, "", new 
JSONOptions(Map.empty[String, String]))
+    val emptySchema = JsonInferSchema.infer(
+      empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
 
   test("SPARK-8093 Erase empty structs") {
     val emptySchema = JsonInferSchema.infer(
-      emptyRecords, "", new JSONOptions(Map.empty[String, String]))
+      emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1723,7 +1724,7 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
     }
   }
 
-  test("Write timestamps correctly with dateFormat option") {
+  test("Write timestamps correctly with timestampFormat option") {
     val customSchema = new StructType(Array(StructField("date", TimestampType, 
true)))
     withTempDir { dir =>
       // With dateFormat option.
@@ -1751,6 +1752,43 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
     }
   }
 
+  test("Write timestamps correctly with timestampFormat option and timeZone 
option") {
+    val customSchema = new StructType(Array(StructField("date", TimestampType, 
true)))
+    withTempDir { dir =>
+      // With timestampFormat option and timeZone option.
+      val timestampsWithFormatPath = 
s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+      val timestampsWithFormat = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+      timestampsWithFormat.write
+        .format("json")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: 
Nil)
+      val stringTimestampsWithFormat = spark.read
+        .schema(stringSchema)
+        .json(timestampsWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/27 01:00"),
+        Row("2014/10/28 01:30"),
+        Row("2016/01/29 04:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+
+      val readBack = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .json(timestampsWithFormatPath)
+
+      checkAnswer(readBack, timestampsWithFormat)
+    }
+  }
+
   test("SPARK-18433: Improve DataSource option keys to be more 
case-insensitive") {
     val records = sparkContext
       .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" 
:: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 76ffb94..9b5e364e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
   private def getProvidingClass(name: String): Class[_] =
-    DataSource(sparkSession = null, className = name).providingClass
+    DataSource(
+      sparkSession = null,
+      className = name,
+      options = Map("timeZone" -> 
DateTimeUtils.defaultTimeZone().getID)).providingClass
 
   test("jdbc") {
     assert(

