This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 78cc2ef  [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
78cc2ef is described below

commit 78cc2ef5b663d6d605e3d4febc6fb99e20b7f165
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Thu Mar 26 13:14:28 2020 -0700

[SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource

### What changes were proposed in this pull request?

This PR (SPARK-31238) aims to do the following:

1. Modify the ORC vectorized reader, in particular `OrcColumnVector` for v1.2 and v2.3. After the changes, it uses `DateTimeUtils.rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915. The method rebases days from the hybrid calendar (Julian + Gregorian) to the Proleptic Gregorian calendar: it builds a local date in the original calendar, extracts the date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...]
2. Introduce rebasing of dates while saving ORC files. In particular, `OrcShimUtils.getDateWritable` for v1.2 and v2.3 now returns `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). `DaysWritable` is moved from `sql/hive` to `sql/core` to re-use it in the ORC datasource.

A minimal sketch of the rebasing idea in both directions is included after this message.

### Why are the changes needed?

For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same results.

### Does this PR introduce any user-facing change?

Yes. Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following:
```scala
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt        |
+----------+
|1200-01-08|
+----------+
```
After the changes:
```scala
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt        |
+----------+
|1200-01-01|
+----------+
```

### How was this patch tested?

- By running `OrcSourceSuite` and `HiveOrcSourceSuite`.
- Added a new test, `SPARK-31238: compatibility with Spark 2.4 in reading dates`, to `OrcSuite`. It reads an ORC file saved by Spark 2.4.5 via the commands:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc")
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt        |
+----------+
|1200-01-01|
+----------+
```
- Added a round-trip test, `SPARK-31238: rebasing dates in write`. The test `SPARK-31238: compatibility with Spark 2.4 in reading dates` confirms rebasing on read, so the round-trip test checks rebasing on write.

Closes #28016 from MaxGekk/rebase-date-orc.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit d72ec8574113f9a7e87f3d7ec56c8447267b0506)
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
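To make items 1 and 2 above more concrete, here is a minimal, self-contained sketch of the two rebases in Scala. It is an illustration only, under simplifying assumptions (UTC, no era handling for years <= 0); the object `RebaseSketch` and its method names are hypothetical, and this is not the actual `DateTimeUtils` or `DaysWritable` implementation.

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}
import java.util.concurrent.TimeUnit

// Hypothetical illustration of the rebasing idea; not Spark's actual code.
object RebaseSketch {
  private val UTC = TimeZone.getTimeZone("UTC")

  // Read path (item 1): interpret `days` since 1970-01-01 in the legacy hybrid
  // Julian/Gregorian calendar (java.util.GregorianCalendar switches calendars
  // at 1582-10-15), read back its year/month/day fields, and rebuild the same
  // local date in the Proleptic Gregorian calendar used by java.time.
  def julianToGregorianDays(days: Int): Int = {
    val hybrid = new GregorianCalendar(UTC)
    hybrid.clear()
    hybrid.setTimeInMillis(TimeUnit.DAYS.toMillis(days))
    val proleptic = LocalDate.of(
      hybrid.get(Calendar.YEAR),
      hybrid.get(Calendar.MONTH) + 1, // java.util.Calendar months are 0-based
      hybrid.get(Calendar.DAY_OF_MONTH))
    Math.toIntExact(proleptic.toEpochDay)
  }

  // Write path (item 2): the inverse rebase, conceptually what the DaysWritable
  // wrapper performs before handing the day count to DateWritable.
  def gregorianToJulianDays(days: Int): Int = {
    val local = LocalDate.ofEpochDay(days)
    val hybrid = new GregorianCalendar(UTC)
    hybrid.clear()
    hybrid.set(local.getYear, local.getMonthValue - 1, local.getDayOfMonth)
    Math.toIntExact(Math.floorDiv(hybrid.getTimeInMillis, TimeUnit.DAYS.toMillis(1)))
  }
}
```

With this kind of read-side rebase, the day count that Spark 2.4 wrote for `1200-01-01` is displayed again as `1200-01-01` instead of `1200-01-08`, matching the before/after query output above.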
---
 .../sql/execution/datasources}/DaysWritable.scala  | 10 ++++++--
 .../test-data/before_1582_date_v2_4.snappy.orc     | Bin 0 -> 201 bytes
 .../execution/datasources/orc/OrcSourceSuite.scala | 28 ++++++++++++++++++++-
 .../sql/execution/datasources/orc/OrcTest.scala    |  5 ++++
 .../execution/datasources/orc/OrcColumnVector.java | 15 ++++++++++-
 .../execution/datasources/orc}/DaysWritable.scala  | 17 ++++++++++---
 .../execution/datasources/orc/OrcShimUtils.scala   |  4 +--
 .../execution/datasources/orc/OrcColumnVector.java | 15 ++++++++++-
 .../execution/datasources/orc/OrcShimUtils.scala   |  5 ++--
 .../org/apache/spark/sql/hive/HiveInspectors.scala |  1 +
 10 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
similarity index 92%
copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
index 1eec8d7..00b710f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources
 
 import java.io.{DataInput, DataOutput, IOException}
 import java.sql.Date
@@ -35,11 +35,12 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulian
  * @param julianDays The number of days since the epoch 1970-01-01 in
  *                   Julian calendar.
  */
-private[hive] class DaysWritable(
+class DaysWritable(
     var gregorianDays: Int,
     var julianDays: Int)
   extends DateWritable {
 
+  def this() = this(0, 0)
   def this(gregorianDays: Int) =
     this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
   def this(dateWritable: DateWritable) = {
@@ -55,6 +56,11 @@ private[hive] class DaysWritable(
   override def getDays: Int = julianDays
   override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
 
+  override def set(d: Int): Unit = {
+    gregorianDays = d
+    julianDays = rebaseGregorianToJulianDays(d)
+  }
+
   @throws[IOException]
   override def write(out: DataOutput): Unit = {
     WritableUtils.writeVInt(out, julianDays)
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc
new file mode 100644
index 0000000..ebe0174
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 1e27593..b5e002f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
 import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
@@ -482,6 +482,32 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  test("SPARK-31238: compatibility with Spark 2.4 in reading dates") {
+    Seq(false, true).foreach { vectorized =>
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        checkAnswer(
+          readResourceOrcFile("test-data/before_1582_date_v2_4.snappy.orc"),
+          Row(java.sql.Date.valueOf("1200-01-01")))
+      }
+    }
+  }
+
+  test("SPARK-31238: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      Seq("1001-01-01").toDF("dateS")
+        .select($"dateS".cast("date").as("date"))
+        .write
+        .orc(path)
+
+      Seq(false, true).foreach { vectorized =>
+        withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+          checkAnswer(spark.read.orc(path), Row(Date.valueOf("1001-01-01")))
+        }
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 388744b..16772fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -133,4 +133,9 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
       throw new AnalysisException("Can not match OrcTable in the query.")
     }
   }
+
+  protected def readResourceOrcFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.orc(url.toString)
+  }
 }
diff --git a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 0dfed76..5dc3f37 100644
--- a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   private DecimalColumnVector decimalData;
   private TimestampColumnVector timestampData;
   private final boolean isTimestamp;
+  private final boolean isDate;
 
   private int batchSize;
 
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
       isTimestamp = false;
     }
 
+    if (type instanceof DateType) {
+      isDate = true;
+    } else {
+      isDate = false;
+    }
+
     baseData = vector;
     if (vector instanceof LongColumnVector) {
       longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
 
   @Override
   public int getInt(int rowId) {
-    return (int) longData.vector[getRowIndex(rowId)];
+    int value = (int) longData.vector[getRowIndex(rowId)];
+    if (isDate) {
+      return DateTimeUtils.rebaseJulianToGregorianDays(value);
+    } else {
+      return value;
+    }
   }
 
   @Override
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
similarity index 81%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
rename to sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
index 1eec8d7..4934a96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.{DataInput, DataOutput, IOException}
 import java.sql.Date
 
-import org.apache.hadoop.hive.serde2.io.DateWritable
 import org.apache.hadoop.io.WritableUtils
+import org.apache.orc.storage.serde2.io.DateWritable
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
 
@@ -30,16 +30,22 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulian
  * via conversion to local date in Julian calendar for dates before 1582-10-15
 * in read/write for backward compatibility with Spark 2.4 and earlier versions.
 *
+ * This is a clone of `org.apache.spark.sql.execution.datasources.DaysWritable`.
+ * The class is cloned because Hive ORC v1.2 uses different `DateWritable`:
+ * - v1.2: `org.apache.orc.storage.serde2.io.DateWritable`
+ * - v2.3 and `HiveInspectors`: `org.apache.hadoop.hive.serde2.io.DateWritable`
+ *
  * @param gregorianDays The number of days since the epoch 1970-01-01 in
  *                      Gregorian calendar.
  * @param julianDays The number of days since the epoch 1970-01-01 in
  *                   Julian calendar.
  */
-private[hive] class DaysWritable(
+class DaysWritable(
     var gregorianDays: Int,
     var julianDays: Int)
   extends DateWritable {
 
+  def this() = this(0, 0)
   def this(gregorianDays: Int) =
     this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
   def this(dateWritable: DateWritable) = {
@@ -55,6 +61,11 @@ private[hive] class DaysWritable(
   override def getDays: Int = julianDays
   override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
 
+  override def set(d: Int): Unit = {
+    gregorianDays = d
+    julianDays = rebaseGregorianToJulianDays(d)
+  }
+
   @throws[IOException]
   override def write(out: DataOutput): Unit = {
     WritableUtils.writeVInt(out, julianDays)
diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 68503ab..ece5280 100644
--- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -47,13 +47,13 @@ private[sql] object OrcShimUtils {
 
   def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
     if (reuseObj) {
-      val result = new DateWritable()
+      val result = new DaysWritable()
       (getter, ordinal) =>
         result.set(getter.getInt(ordinal))
         result
     } else {
       (getter: SpecializedGetters, ordinal: Int) =>
-        new DateWritable(getter.getInt(ordinal))
+        new DaysWritable(getter.getInt(ordinal))
     }
   }
 
diff --git a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 35447fe..7be4a6f 100644
--- a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   private DecimalColumnVector decimalData;
   private TimestampColumnVector timestampData;
   private final boolean isTimestamp;
+  private final boolean isDate;
 
   private int batchSize;
 
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
       isTimestamp = false;
     }
 
+    if (type instanceof DateType) {
+      isDate = true;
+    } else {
+      isDate = false;
+    }
+
     baseData = vector;
     if (vector instanceof LongColumnVector) {
       longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
 
   @Override
   public int getInt(int rowId) {
-    return (int) longData.vector[getRowIndex(rowId)];
+    int value = (int) longData.vector[getRowIndex(rowId)];
+    if (isDate) {
+      return DateTimeUtils.rebaseJulianToGregorianDays(value);
+    } else {
+      return value;
+    }
   }
 
   @Override
diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index c32f024..5666d31 100644
--- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.execution.datasources.DaysWritable
 import org.apache.spark.sql.types.Decimal
 
 /**
@@ -47,13 +48,13 @@ private[sql] object OrcShimUtils {
 
   def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
     if (reuseObj) {
-      val result = new DateWritable()
+      val result = new DaysWritable()
       (getter, ordinal) =>
         result.set(getter.getInt(ordinal))
         result
     } else {
       (getter: SpecializedGetters, ordinal: Int) =>
-        new DateWritable(getter.getInt(ordinal))
+        new DaysWritable(getter.getInt(ordinal))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index e3e9a31..16e9014 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.DaysWritable
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String