[ 
https://issues.apache.org/jira/browse/SPARK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-12297:
---------------------------------
    Description: 
Spark copied Hive's behavior for parquet, but this was inconsistent with other 
file formats, and inconsistent with Impala (which is the original source of 
putting a timestamp as an int96 in parquet, I believe).  This made timestamps 
in parquet act more like timestamps with timezones, while in other file 
formats, timestamps have no time zone, they are a "floating time".

The easiest way to see this issue is to write out a table with timestamps in 
multiple different formats from one timezone, then try to read them back in 
another timezone.  Eg., here I write out a few timestamps to parquet and 
textfile hive tables, and also just as a json file, all in the 
"America/Los_Angeles" timezone:

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val tblPrefix = args(0)
val schema = new StructType().add("ts", TimestampType)
val rows = sc.parallelize(Seq(
  "2015-12-31 23:50:59.123",
  "2015-12-31 22:49:59.123",
  "2016-01-01 00:39:59.123",
  "2016-01-01 01:29:59.123"
).map { x => Row(java.sql.Timestamp.valueOf(x)) })
val rawData = spark.createDataFrame(rows, schema).toDF()

rawData.show()

Seq("parquet", "textfile").foreach { format =>
  val tblName = s"${tblPrefix}_$format"
  spark.sql(s"DROP TABLE IF EXISTS $tblName")
  spark.sql(
    raw"""CREATE TABLE $tblName (
          |  ts timestamp
          | )
          | STORED AS $format
     """.stripMargin)
  rawData.write.insertInto(tblName)
}

rawData.write.json(s"${tblPrefix}_json")
{code}

Then I start a spark-shell in "America/New_York" timezone, and read the data 
back from each table:

{code}
scala> spark.sql("select * from la_parquet").collect().foreach{println}
[2016-01-01 02:50:59.123]
[2016-01-01 01:49:59.123]
[2016-01-01 03:39:59.123]
[2016-01-01 04:29:59.123]

scala> spark.sql("select * from la_textfile").collect().foreach{println}
[2015-12-31 23:50:59.123]
[2015-12-31 22:49:59.123]
[2016-01-01 00:39:59.123]
[2016-01-01 01:29:59.123]

scala> spark.read.json("la_json").collect().foreach{println}
[2015-12-31 23:50:59.123]
[2015-12-31 22:49:59.123]
[2016-01-01 00:39:59.123]
[2016-01-01 01:29:59.123]

scala> spark.read.json("la_json").join(spark.sql("select * from la_textfile"), 
"ts").show()
+--------------------+
|                  ts|
+--------------------+
|2015-12-31 23:50:...|
|2015-12-31 22:49:...|
|2016-01-01 00:39:...|
|2016-01-01 01:29:...|
+--------------------+

scala> spark.read.json("la_json").join(spark.sql("select * from la_parquet"), 
"ts").show()
+---+
| ts|
+---+
+---+
{code}

The textfile and json based data shows the same times, and can be joined 
against each other, while the times from the parquet data have changed (and 
obviously joins fail).

This is a big problem for any organization that may try to read the same data 
(say in S3) with clusters in multiple timezones.  It can also be a nasty 
surprise as an organization tries to migrate file formats.  Finally, its a 
source of incompatibility between Hive, Impala, and Spark.

HIVE-12767 aims to fix this by introducing a table property which indicates the 
"storage timezone" for the table.  Spark should add the same to ensure 
consistency between file formats, and with Hive & Impala.

  was:Hive has a bug where timestamps in Parquet data are incorrectly adjusted 
as though they were in the SQL session time zone to UTC. This is incorrect 
behavior because timestamp values are SQL timestamp without time zone and 
should not be internally changed.


> Add work-around for Parquet/Hive int96 timestamp bug.
> -----------------------------------------------------
>
>                 Key: SPARK-12297
>                 URL: https://issues.apache.org/jira/browse/SPARK-12297
>             Project: Spark
>          Issue Type: Task
>          Components: Spark Core
>            Reporter: Ryan Blue
>
> Spark copied Hive's behavior for parquet, but this was inconsistent with 
> other file formats, and inconsistent with Impala (which is the original 
> source of putting a timestamp as an int96 in parquet, I believe).  This made 
> timestamps in parquet act more like timestamps with timezones, while in other 
> file formats, timestamps have no time zone, they are a "floating time".
> The easiest way to see this issue is to write out a table with timestamps in 
> multiple different formats from one timezone, then try to read them back in 
> another timezone.  Eg., here I write out a few timestamps to parquet and 
> textfile hive tables, and also just as a json file, all in the 
> "America/Los_Angeles" timezone:
> {code}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val tblPrefix = args(0)
> val schema = new StructType().add("ts", TimestampType)
> val rows = sc.parallelize(Seq(
>   "2015-12-31 23:50:59.123",
>   "2015-12-31 22:49:59.123",
>   "2016-01-01 00:39:59.123",
>   "2016-01-01 01:29:59.123"
> ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
> val rawData = spark.createDataFrame(rows, schema).toDF()
> rawData.show()
> Seq("parquet", "textfile").foreach { format =>
>   val tblName = s"${tblPrefix}_$format"
>   spark.sql(s"DROP TABLE IF EXISTS $tblName")
>   spark.sql(
>     raw"""CREATE TABLE $tblName (
>           |  ts timestamp
>           | )
>           | STORED AS $format
>      """.stripMargin)
>   rawData.write.insertInto(tblName)
> }
> rawData.write.json(s"${tblPrefix}_json")
> {code}
> Then I start a spark-shell in "America/New_York" timezone, and read the data 
> back from each table:
> {code}
> scala> spark.sql("select * from la_parquet").collect().foreach{println}
> [2016-01-01 02:50:59.123]
> [2016-01-01 01:49:59.123]
> [2016-01-01 03:39:59.123]
> [2016-01-01 04:29:59.123]
> scala> spark.sql("select * from la_textfile").collect().foreach{println}
> [2015-12-31 23:50:59.123]
> [2015-12-31 22:49:59.123]
> [2016-01-01 00:39:59.123]
> [2016-01-01 01:29:59.123]
> scala> spark.read.json("la_json").collect().foreach{println}
> [2015-12-31 23:50:59.123]
> [2015-12-31 22:49:59.123]
> [2016-01-01 00:39:59.123]
> [2016-01-01 01:29:59.123]
> scala> spark.read.json("la_json").join(spark.sql("select * from 
> la_textfile"), "ts").show()
> +--------------------+
> |                  ts|
> +--------------------+
> |2015-12-31 23:50:...|
> |2015-12-31 22:49:...|
> |2016-01-01 00:39:...|
> |2016-01-01 01:29:...|
> +--------------------+
> scala> spark.read.json("la_json").join(spark.sql("select * from la_parquet"), 
> "ts").show()
> +---+
> | ts|
> +---+
> +---+
> {code}
> The textfile and json based data shows the same times, and can be joined 
> against each other, while the times from the parquet data have changed (and 
> obviously joins fail).
> This is a big problem for any organization that may try to read the same data 
> (say in S3) with clusters in multiple timezones.  It can also be a nasty 
> surprise as an organization tries to migrate file formats.  Finally, its a 
> source of incompatibility between Hive, Impala, and Spark.
> HIVE-12767 aims to fix this by introducing a table property which indicates 
> the "storage timezone" for the table.  Spark should add the same to ensure 
> consistency between file formats, and with Hive & Impala.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to