This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e3a63a8 [SPARK-31885][SQL][3.0] Fix filter push down for old millis timestamps to Parquet e3a63a8 is described below commit e3a63a89c2de7c9bb3af20728ebe0081b0df1ffa Author: Max Gekk <max.g...@gmail.com> AuthorDate: Tue Jun 2 02:29:20 2020 +0000 [SPARK-31885][SQL][3.0] Fix filter push down for old millis timestamps to Parquet ### What changes were proposed in this pull request? Fixed conversions of `java.sql.Timestamp` to milliseconds in `ParquetFilters` by using the existing `DateTimeUtils` functions `fromJavaTimestamp()` and `toMillis()`. ### Why are the changes needed? The changes fix the following bug, where a pushed-down equality filter on an old millisecond timestamp wrongly returns no rows: ```scala scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS") scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") scala> Seq(java.sql.Timestamp.valueOf("1000-06-14 08:28:53.123")).toDF("ts").write.mode("overwrite").parquet("/Users/maximgekk/tmp/ts_millis_old_filter") scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false) +---+ |ts | +---+ +---+ ``` ### Does this PR introduce _any_ user-facing change? Yes, after the changes (for the example above): ```scala scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false) +-----------------------+ |ts | +-----------------------+ |1000-06-14 08:28:53.123| +-----------------------+ ``` ### How was this patch tested? Modified tests in `ParquetFilterSuite` to check old timestamps. Closes #28699 from MaxGekk/parquet-ts-millis-filter-3.0. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../datasources/parquet/ParquetFilters.scala | 27 +++++++++++----------- .../datasources/parquet/ParquetFilterSuite.scala | 16 +++++++------ .../datasources/parquet/ParquetTest.scala | 4 +++- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index d89186a..d0977ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -148,6 +148,13 @@ class ParquetFilters( Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes) } + private def timestampToMillis(v: Any): JLong = { + val timestamp = v.asInstanceOf[Timestamp] + val micros = DateTimeUtils.fromJavaTimestamp(timestamp) + val millis = DateTimeUtils.toMillis(micros) + millis.asInstanceOf[JLong] + } + private val makeEq: PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { case ParquetBooleanType => @@ -184,7 +191,7 @@ class ParquetFilters( case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), - Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull) + Option(v).map(timestampToMillis).orNull) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.eq( @@ -235,7 +242,7 @@ class ParquetFilters( case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), - Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull) + Option(v).map(timestampToMillis).orNull) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => (n: 
Array[String], v: Any) => FilterApi.notEq( @@ -277,9 +284,7 @@ class ParquetFilters( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.lt( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v)) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => (n: Array[String], v: Any) => @@ -318,9 +323,7 @@ class ParquetFilters( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.ltEq( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v)) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => (n: Array[String], v: Any) => @@ -359,9 +362,7 @@ class ParquetFilters( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.gt( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v)) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => (n: Array[String], v: Any) => @@ -400,9 +401,7 @@ class ParquetFilters( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.gtEq( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v)) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => (n: 
Array[String], v: Any) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 7b33cef..c4cf511 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -589,19 +589,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared test("filter pushdown - timestamp") { // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS - val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"), - Timestamp.valueOf("2018-06-15 08:28:53.123"), - Timestamp.valueOf("2018-06-16 08:28:53.123"), - Timestamp.valueOf("2018-06-17 08:28:53.123")) + val millisData = Seq( + Timestamp.valueOf("1000-06-14 08:28:53.123"), + Timestamp.valueOf("1582-06-15 08:28:53.001"), + Timestamp.valueOf("1900-06-16 08:28:53.0"), + Timestamp.valueOf("2018-06-17 08:28:53.999")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { testTimestampPushdown(millisData) } // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS - val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"), - Timestamp.valueOf("2018-06-15 08:28:53.123456"), - Timestamp.valueOf("2018-06-16 08:28:53.123456"), + val microsData = Seq( + Timestamp.valueOf("1000-06-14 08:28:53.123456"), + Timestamp.valueOf("1582-06-15 08:28:53.123456"), + Timestamp.valueOf("1900-06-16 08:28:53.123456"), Timestamp.valueOf("2018-06-17 08:28:53.123456")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index f572697..105f025 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -69,7 +69,9 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true) (f: DataFrame => Unit): Unit = { withTempPath { file => - df.write.format(dataSourceName).save(file.getCanonicalPath) + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { + df.write.format(dataSourceName).save(file.getCanonicalPath) + } readFile(file.getCanonicalPath, testVectorized)(f) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org