[ https://issues.apache.org/jira/browse/SPARK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004607#comment-16004607 ]
Zoltan Ivanfi edited comment on SPARK-12297 at 5/10/17 6:26 PM: ---------------------------------------------------------------- bq. It'd be great to consider this more holistically and think about alternatives in fixing them As Ryan mentioned, the Parquet community discussed this timestamp incompatibilty problem with the aim of avoiding similar problems in the future. It was decided that the specification needs to include two separate types with well-defined semantics: one for timezone-agnostic (aka. TIMESTAMP WITHOUT TIMEZONE) and one for UTC-normalized (aka. TIMESTAMP WITH TIMEZONE) timestamps. (Otherwise implementors would be tempted to misuse the single existing type for storing timestamps of different semantics, as it already happened with the int96 timestamp type). Using these two types, SQL engines will be able to unambiguously store their timestamp type regardless of its semantics. However, the TIMESTAMP type should follow TIMESTAMP WITHOUT TIMEZONE semantics for consistency with other SQL engines. The TIMESTAMP WITH TIMEZONE semantics should be implemented as a new SQL type with a matching name. While this is a nice and clean long-term solution, a short-term fix is also desired until the new types become widely supported and/or to allow dealing with existing data. The commit in question is a part of this short-term fix and it allows getting correct values when reading int96 timestamps, even for data written by other components. bq. it completely changes the behavior of one of the most important data types. A very important aspect of this fix is that it does not change SparkSQL's behavior unless the user sets a table property, so it's a completely safe and non-breaking change. bq. One of the fundamental problem is that Spark treats timestamp as timestamp with timezone, whereas impala treats timestamp as timestamp without timezone. The parquet storage is only a small piece here. The fix only addresses Parquet timestamps indeed. This, however, is intentional and is not a limitation, neither an inconsistency as the problem seems to be specific to Parquet. My understanding is that for other file formats, SparkSQL follows timezone-agnostic (TIMESTAMP WITHOUT TIMEZONE) semantics and my experiments with the CSV and Avro formats seem to confirm this. So using UTC-normalized (TIMESTAMP WITH TIMEZONE) semantics in Parquet is not only incompatible with Impala but is also inconsistent within SparkSQL itself. bq. Also this is not just a Parquet issue. The same issue could happen to all data formats. It is going to be really confusing to have something that only works for Parquet The current behavior of SparkSQL already seems to be different for Parquet than for other formats. The fix allows the user to choose a consistent and less confusing behaviour instead. It also makes Impala, Hive and SparkSQL compatible with each other regarding int96 timestamps. bq. It seems like the purpose of this patch can be accomplished by just setting the session local timezone to UTC? Unfortunately that would not suffice. The problem has to addressed in all SQL engines. As of today, Hive and Impala already contains the changes that allow interoperability using the parquet.mr.int96.write.zone table property: * Hive: ** https://github.com/apache/hive/commit/84fdc1c7c8ff0922aa44f829dbfa9659935c503e ** https://github.com/apache/hive/commit/a1cbccb8dad1824f978205a1e93ec01e87ed8ed5 ** https://github.com/apache/hive/commit/2dfcea5a95b7d623484b8be50755b817fbc91ce0 ** https://github.com/apache/hive/commit/78e29fc70dacec498c35dc556dd7403e4c9f48fe * Impala: ** https://github.com/apache/incubator-impala/commit/5803a0b0744ddaee6830d4a1bc8dba8d3f2caa26 was (Author: zi): bq. It'd be great to consider this more holistically and think about alternatives in fixing them As Ryan mentioned, the Parquet community discussed this timestamp incompatibilty problem with the aim of avoiding similar problems in the future. It was decided that the specification needs to include two separate types with well-defined semantics: one for timezone-agnostic (aka. TIMESTAMP WITHOUT TIMEZONE) and one for UTC-normalized (aka. TIMESTAMP WITH TIMEZONE) timestamps. (Otherwise implementors would be tempted to misuse the single existing type for storing timestamps of different semantics, as it already happened with the int96 timestamp type). While this is a nice and clean long-term solution, a short-term fix is also desired until the new types become widely supported and/or to allow dealing with existing data. The commit in question is a part of this short-term fix and it allows getting correct values when reading int96 timestamps, even for data written by other components. bq. it completely changes the behavior of one of the most important data types. A very important aspect of this fix is that it does not change SparkSQL's behavior unless the user sets a table property, so it's a completely safe and non-breaking change. bq. One of the fundamental problem is that Spark treats timestamp as timestamp with timezone, whereas impala treats timestamp as timestamp without timezone. The parquet storage is only a small piece here. The fix only addresses Parquet timestamps indeed. This, however, is intentional and is not a limitation, neither an inconsistency. The problem in fact is specific to Parquet. For other file formats (for example CSV or Avro), SparkSQL follows timezone-agnostic (TIMESTAMP WITHOUT TIMEZONE) semantics. So using UTC-normalized (TIMESTAMP WITH TIMEZONE) semantics in Parquet is not only incompatible with Impala but is also inconsistent within SparkSQL itself. bq. Also this is not just a Parquet issue. The same issue could happen to all data formats. It is going to be really confusing to have something that only works for Parquet In fact the current behavior of SparkSQL is different for Parquet than for other formats. The fix allows the user to choose a consistent and less confusing behaviour instead. It also makes Impala, Hive and SparkSQL compatible with each other regarding int96 timestamps. bq. It seems like the purpose of this patch can be accomplished by just setting the session local timezone to UTC? Unfortunately that would not suffice. The problem has to addressed in all SQL engines. As of today, Hive and Impala already contains the changes that allow interoperability using the parquet.mr.int96.write.zone table property: * Hive: ** https://github.com/apache/hive/commit/84fdc1c7c8ff0922aa44f829dbfa9659935c503e ** https://github.com/apache/hive/commit/a1cbccb8dad1824f978205a1e93ec01e87ed8ed5 ** https://github.com/apache/hive/commit/2dfcea5a95b7d623484b8be50755b817fbc91ce0 ** https://github.com/apache/hive/commit/78e29fc70dacec498c35dc556dd7403e4c9f48fe * Impala: ** https://github.com/apache/incubator-impala/commit/5803a0b0744ddaee6830d4a1bc8dba8d3f2caa26 > Add work-around for Parquet/Hive int96 timestamp bug. > ----------------------------------------------------- > > Key: SPARK-12297 > URL: https://issues.apache.org/jira/browse/SPARK-12297 > Project: Spark > Issue Type: Task > Components: Spark Core > Reporter: Ryan Blue > > Spark copied Hive's behavior for parquet, but this was inconsistent with > other file formats, and inconsistent with Impala (which is the original > source of putting a timestamp as an int96 in parquet, I believe). This made > timestamps in parquet act more like timestamps with timezones, while in other > file formats, timestamps have no time zone, they are a "floating time". > The easiest way to see this issue is to write out a table with timestamps in > multiple different formats from one timezone, then try to read them back in > another timezone. Eg., here I write out a few timestamps to parquet and > textfile hive tables, and also just as a json file, all in the > "America/Los_Angeles" timezone: > {code} > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val tblPrefix = args(0) > val schema = new StructType().add("ts", TimestampType) > val rows = sc.parallelize(Seq( > "2015-12-31 23:50:59.123", > "2015-12-31 22:49:59.123", > "2016-01-01 00:39:59.123", > "2016-01-01 01:29:59.123" > ).map { x => Row(java.sql.Timestamp.valueOf(x)) }) > val rawData = spark.createDataFrame(rows, schema).toDF() > rawData.show() > Seq("parquet", "textfile").foreach { format => > val tblName = s"${tblPrefix}_$format" > spark.sql(s"DROP TABLE IF EXISTS $tblName") > spark.sql( > raw"""CREATE TABLE $tblName ( > | ts timestamp > | ) > | STORED AS $format > """.stripMargin) > rawData.write.insertInto(tblName) > } > rawData.write.json(s"${tblPrefix}_json") > {code} > Then I start a spark-shell in "America/New_York" timezone, and read the data > back from each table: > {code} > scala> spark.sql("select * from la_parquet").collect().foreach{println} > [2016-01-01 02:50:59.123] > [2016-01-01 01:49:59.123] > [2016-01-01 03:39:59.123] > [2016-01-01 04:29:59.123] > scala> spark.sql("select * from la_textfile").collect().foreach{println} > [2015-12-31 23:50:59.123] > [2015-12-31 22:49:59.123] > [2016-01-01 00:39:59.123] > [2016-01-01 01:29:59.123] > scala> spark.read.json("la_json").collect().foreach{println} > [2015-12-31 23:50:59.123] > [2015-12-31 22:49:59.123] > [2016-01-01 00:39:59.123] > [2016-01-01 01:29:59.123] > scala> spark.read.json("la_json").join(spark.sql("select * from > la_textfile"), "ts").show() > +--------------------+ > | ts| > +--------------------+ > |2015-12-31 23:50:...| > |2015-12-31 22:49:...| > |2016-01-01 00:39:...| > |2016-01-01 01:29:...| > +--------------------+ > scala> spark.read.json("la_json").join(spark.sql("select * from la_parquet"), > "ts").show() > +---+ > | ts| > +---+ > +---+ > {code} > The textfile and json based data shows the same times, and can be joined > against each other, while the times from the parquet data have changed (and > obviously joins fail). > This is a big problem for any organization that may try to read the same data > (say in S3) with clusters in multiple timezones. It can also be a nasty > surprise as an organization tries to migrate file formats. Finally, its a > source of incompatibility between Hive, Impala, and Spark. > HIVE-12767 aims to fix this by introducing a table property which indicates > the "storage timezone" for the table. Spark should add the same to ensure > consistency between file formats, and with Hive & Impala. -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org