This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5a70fa8 [SPARK-31361][SQL] Rebase datetime in parquet/avro according
to file metadata
5a70fa8 is described below
commit 5a70fa8f09bae59b3ad900b9ece8fbc455425b7f
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Apr 22 00:26:23 2020 +0900
[SPARK-31361][SQL] Rebase datetime in parquet/avro according to file
metadata
### What changes were proposed in this pull request?
This PR adds a new parquet/avro file metadata key:
`org.apache.spark.legacyDateTime`. Its presence indicates that the file was written
with the "rebaseInWrite" config enabled, so Spark needs to rebase the datetime
values when reading it. This lets Spark decide whether to rebase more smartly
(a minimal sketch of the decision logic follows the list):
1. If we don't know which Spark version wrote the file, rebase if the
"rebaseInRead" config is true.
2. If the file was written by Spark 2.4 or earlier, always rebase.
3. If the file was written by Spark 3.0 or later, rebase only if
`org.apache.spark.legacyDateTime` exists in the file metadata.
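A minimal Scala sketch of that decision logic, mirroring the
`DataSourceUtils.needRebaseDateTime` helper added in this PR (the `shouldRebase`
name and the `rebaseInReadConf` parameter are illustrative, not part of the change):

```scala
// lookupFileMeta reads a key from the parquet/avro file metadata;
// rebaseInReadConf stands in for the data-source-specific
// LEGACY_*_REBASE_DATETIME_IN_READ config.
def shouldRebase(
    lookupFileMeta: String => String,
    rebaseInReadConf: => Boolean): Boolean = {
  Option(lookupFileMeta("org.apache.spark.version")).map { version =>
    // Files written by Spark 2.4 and earlier always need rebasing; files
    // written by Spark 3.0+ need it only if the legacy datetime marker exists.
    version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null
  }.getOrElse {
    // Unknown writer: fall back to the "rebaseInRead" config.
    rebaseInReadConf
  }
}
```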
### Why are the changes needed?
It's very easy to end up with mixed-calendar parquet/avro files: e.g. a user
upgrades to Spark 3.0 and writes some parquet files to an existing directory,
then realizes that the directory contains legacy datetime values before 1582.
At that point it's too late: they have to find all the legacy files manually
and read them separately.
To support mixed-calendar parquet/avro files, we need to decide whether to
rebase based on the file metadata. A sketch of the resulting behavior follows.
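For illustration, a hypothetical session based on the mixed-file tests updated in
this PR (the `/tmp/mixed/*` paths are placeholders, and `/tmp/mixed/spark2_4` is
assumed to already hold a file written by Spark 2.4 with the same column):

```scala
import org.apache.spark.sql.internal.SQLConf
import spark.implicits._

val df = Seq("1001-01-01").toDF("str").select($"str".cast("date").as("date"))

// Spark 3.0 default: Proleptic Gregorian values, no legacy marker in metadata.
df.write.parquet("/tmp/mixed/spark3_0")

// Spark 3.0 with rebase-in-write enabled: values are rebased and the
// org.apache.spark.legacyDateTime marker is written into the file metadata.
spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key, "true")
df.write.parquet("/tmp/mixed/spark3_0_rebased")

// With this change, all three files can be read together; each one is rebased
// (or not) according to its own metadata, so every row reads back as 1001-01-01.
spark.read.parquet(
  "/tmp/mixed/spark2_4", "/tmp/mixed/spark3_0", "/tmp/mixed/spark3_0_rebased"
).show()
```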
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Updated test
Closes #28137 from cloud-fan/datetime.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit a5ebbacf538cd78f3edc81542a8514c60d340109)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 12 ++-
.../org/apache/spark/sql/avro/AvroFileFormat.scala | 11 ++-
.../apache/spark/sql/avro/AvroOutputWriter.scala | 17 +++-
.../org/apache/spark/sql/avro/AvroSerializer.scala | 14 ++-
.../sql/v2/avro/AvroPartitionReaderFactory.scala | 10 +-
.../sql/avro/AvroCatalystDataConversionSuite.scala | 2 +-
.../org/apache/spark/sql/avro/AvroSuite.scala | 103 ++++++++++++++++-----
.../parquet/VectorizedColumnReader.java | 6 +-
.../parquet/VectorizedParquetRecordReader.java | 18 +++-
.../execution/datasources/DataSourceUtils.scala | 17 ++++
.../datasources/parquet/ParquetFileFormat.scala | 13 ++-
.../datasources/parquet/ParquetReadSupport.scala | 15 +--
.../parquet/ParquetRecordMaterializer.scala | 10 +-
.../datasources/parquet/ParquetRowConverter.scala | 10 +-
.../datasources/parquet/ParquetWriteSupport.scala | 6 +-
.../v2/parquet/ParquetPartitionReaderFactory.scala | 26 ++++--
.../main/scala/org/apache/spark/sql/package.scala | 6 ++
.../benchmark/DataSourceReadBenchmark.scala | 6 +-
.../datasources/parquet/ParquetEncodingSuite.scala | 6 +-
.../datasources/parquet/ParquetIOSuite.scala | 89 ++++++++++++++----
.../datasources/parquet/ParquetTest.scala | 7 +-
21 files changed, 299 insertions(+), 105 deletions(-)
diff --git
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index ce8f976..27206ed 100644
---
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -41,12 +41,14 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* A deserializer to deserialize data in avro format to data in catalyst
format.
*/
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
- private lazy val decimalConversions = new DecimalConversion()
+class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType,
rebaseDateTime: Boolean) {
+
+ def this(rootAvroType: Schema, rootCatalystType: DataType) {
+ this(rootAvroType, rootCatalystType,
+ SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+ }
- // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
- private val rebaseDateTime =
- SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+ private lazy val decimalConversions = new DecimalConversion()
private val converter: Any => Any = rootCatalystType match {
// A shortcut for empty schema.
diff --git
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 123669b..e69c95b 100755
---
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -34,7 +34,8 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileFormat,
OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -123,8 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
reader.sync(file.start)
val stop = file.start + file.length
- val deserializer =
- new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema),
requiredSchema)
+ val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+ reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
+ SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+ }
+ val deserializer = new AvroDeserializer(
+ userProvidedSchema.getOrElse(reader.getSchema), requiredSchema,
rebaseDateTime)
new Iterator[InternalRow] {
private[this] var completed = false
diff --git
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 2cfa3a4..82a5680 100644
---
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -29,9 +29,10 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
// NOTE: This class is instantiated and used on executor side only, no need to
be serializable.
@@ -41,16 +42,24 @@ private[avro] class AvroOutputWriter(
schema: StructType,
avroSchema: Schema) extends OutputWriter {
+ // Whether to rebase datetimes from Gregorian to Julian calendar in write
+ private val rebaseDateTime: Boolean =
+ SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+
// The input rows will never be null.
- private lazy val serializer = new AvroSerializer(schema, avroSchema,
nullable = false)
+ private lazy val serializer =
+ new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
/**
* Overrides the couple of methods responsible for generating the output
streams / files so
* that the data can be correctly partitioned
*/
private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable]
= {
- val sparkVersion = Map(SPARK_VERSION_METADATA_KEY ->
SPARK_VERSION_SHORT).asJava
- new SparkAvroKeyOutputFormat(sparkVersion) {
+ val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
+ if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+ }
+
+ new SparkAvroKeyOutputFormat(fileMeta.asJava) {
override def getDefaultWorkFile(context: TaskAttemptContext, extension:
String): Path = {
new Path(path)
diff --git
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 350d7a3..dc23216 100644
---
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -42,12 +42,16 @@ import org.apache.spark.sql.types._
/**
* A serializer to serialize data in catalyst format to data in avro format.
*/
-class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean)
- extends Logging {
+class AvroSerializer(
+ rootCatalystType: DataType,
+ rootAvroType: Schema,
+ nullable: Boolean,
+ rebaseDateTime: Boolean) extends Logging {
- // Whether to rebase datetimes from Gregorian to Julian calendar in write
- private val rebaseDateTime: Boolean =
- SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+ def this(rootCatalystType: DataType, rootAvroType: Schema, nullable:
Boolean) {
+ this(rootCatalystType, rootAvroType, nullable,
+ SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+ }
def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
diff --git
a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 8230dba..712aec6 100644
---
a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++
b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader,
FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -88,8 +88,12 @@ case class AvroPartitionReaderFactory(
reader.sync(partitionedFile.start)
val stop = partitionedFile.start + partitionedFile.length
- val deserializer =
- new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema),
readDataSchema)
+ val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+ reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
+ SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+ }
+ val deserializer = new AvroDeserializer(
+ userProvidedSchema.getOrElse(reader.getSchema), readDataSchema,
rebaseDateTime)
val fileReader = new PartitionReader[InternalRow] {
private[this] var completed = false
diff --git
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index c8a1f67..64d790b 100644
---
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
""".stripMargin
val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
val dataType = SchemaConverters.toSqlType(avroSchema).dataType
- val deserializer = new AvroDeserializer(avroSchema, dataType)
+ val deserializer = new AvroDeserializer(avroSchema, dataType,
rebaseDateTime = false)
def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
assert(checkResult(
diff --git
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index a5224fd1..3e754f0 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -84,9 +84,8 @@ abstract class AvroSuite extends QueryTest with
SharedSparkSession {
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}
- private def readResourceAvroFile(name: String): DataFrame = {
- val url = Thread.currentThread().getContextClassLoader.getResource(name)
- spark.read.format("avro").load(url.toString)
+ private def getResourceAvroFilePath(name: String): String = {
+ Thread.currentThread().getContextClassLoader.getResource(name).toString
}
test("resolve avro data source") {
@@ -1530,16 +1529,50 @@ abstract class AvroSuite extends QueryTest with
SharedSparkSession {
}
test("SPARK-31183: compatibility with Spark 2.4 in reading
dates/timestamps") {
+ // test reading the existing 2.4 files and new 3.0 files (with rebase
on/off) together.
+ def checkReadMixedFiles(fileName: String, dt: String, dataStr: String):
Unit = {
+ withTempPaths(2) { paths =>
+ paths.foreach(_.delete())
+ val path2_4 = getResourceAvroFilePath(fileName)
+ val path3_0 = paths(0).getCanonicalPath
+ val path3_0_rebase = paths(1).getCanonicalPath
+ if (dt == "date") {
+ val df =
Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+ df.write.format("avro").save(path3_0)
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key ->
"true") {
+ df.write.format("avro").save(path3_0_rebase)
+ }
+ checkAnswer(
+ spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+ 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+ } else {
+ val df =
Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+ val avroSchema =
+ s"""
+ |{
+ | "type" : "record",
+ | "name" : "test_schema",
+ | "fields" : [
+ | {"name": "ts", "type": {"type": "long", "logicalType":
"$dt"}}
+ | ]
+ |}""".stripMargin
+ df.write.format("avro").option("avroSchema",
avroSchema).save(path3_0)
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key ->
"true") {
+ df.write.format("avro").option("avroSchema",
avroSchema).save(path3_0_rebase)
+ }
+ checkAnswer(
+ spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+ 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+ }
+ }
+ }
+
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
- checkAnswer(
- readResourceAvroFile("before_1582_date_v2_4.avro"),
- Row(java.sql.Date.valueOf("1001-01-01")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+ checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
+ checkReadMixedFiles(
+ "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01
01:02:03.123456")
+ checkReadMixedFiles(
+ "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01
01:02:03.124")
}
}
@@ -1554,10 +1587,18 @@ abstract class AvroSuite extends QueryTest with
SharedSparkSession {
.write.format("avro")
.save(path)
}
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
- checkAnswer(spark.read.format("avro").load(path),
Row(Timestamp.valueOf(tsStr)))
+
+ // The file metadata indicates if it needs rebase or not, so we can
always get the correct
+ // result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key ->
rebase.toString) {
+ checkAnswer(spark.read.format("avro").load(path),
Row(Timestamp.valueOf(tsStr)))
+ }
}
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+ // Force to not rebase to prove the written datetime values are rebased
and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(spark.read.format("avro").load(path),
Row(Timestamp.valueOf(nonRebased)))
}
}
@@ -1589,12 +1630,20 @@ abstract class AvroSuite extends QueryTest with
SharedSparkSession {
.format("avro")
.save(path)
}
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true")
{
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(rebased)))
+
+ // The file metadata indicates if it needs rebase or not, so we can
always get the correct
+ // result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key ->
rebase.toString) {
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(rebased)))
+ }
}
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key ->
"false") {
+
+ // Force to not rebase to prove the written datetime values are
rebased and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(
spark.read.schema("ts timestamp").format("avro").load(path),
Row(Timestamp.valueOf(nonRebased)))
@@ -1612,10 +1661,18 @@ abstract class AvroSuite extends QueryTest with
SharedSparkSession {
.write.format("avro")
.save(path)
}
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
- checkAnswer(spark.read.format("avro").load(path),
Row(Date.valueOf("1001-01-01")))
+
+ // The file metadata indicates if it needs rebase or not, so we can
always get the correct
+ // result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key ->
rebase.toString) {
+ checkAnswer(spark.read.format("avro").load(path),
Row(Date.valueOf("1001-01-01")))
+ }
}
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+ // Force to not rebase to prove the written datetime values are rebased
and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(spark.read.format("avro").load(path),
Row(Date.valueOf("1001-01-07")))
}
}
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 79edec3..7d35e96 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -109,7 +108,8 @@ public class VectorizedColumnReader {
ColumnDescriptor descriptor,
OriginalType originalType,
PageReader pageReader,
- ZoneId convertTz) throws IOException {
+ ZoneId convertTz,
+ boolean rebaseDateTime) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.convertTz = convertTz;
@@ -132,7 +132,7 @@ public class VectorizedColumnReader {
if (totalValueCount == 0) {
throw new IOException("totalValueCount == 0");
}
- this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeInReadEnabled();
+ this.rebaseDateTime = rebaseDateTime;
}
/**
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 7306709..c9590b9 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -86,7 +86,12 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
* The timezone that timestamp INT96 values should be converted to. Null if
no conversion. Here to
* workaround incompatibilities between different engines when writing
timestamp values.
*/
- private ZoneId convertTz = null;
+ private final ZoneId convertTz;
+
+ /**
+ * true if need to rebase date/timestamp from Julian to Proleptic Gregorian
calendar.
+ */
+ private final boolean rebaseDateTime;
/**
* columnBatch object that is used for batch decoding. This is created on
first use and triggers
@@ -116,12 +121,19 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
*/
private final MemoryMode MEMORY_MODE;
- public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap,
int capacity) {
+ public VectorizedParquetRecordReader(
+ ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int
capacity) {
this.convertTz = convertTz;
+ this.rebaseDateTime = rebaseDateTime;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.capacity = capacity;
}
+ // For test only.
+ public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
+ this(null, false, useOffHeap, capacity);
+ }
+
/**
* Implementation of RecordReader API.
*/
@@ -309,7 +321,7 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i),
types.get(i).getOriginalType(),
- pages.getPageReader(columns.get(i)), convertTz);
+ pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime);
}
totalCountLoadedSoFar += pages.getRowCount();
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index bd56635..b19de6d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -21,8 +21,11 @@ import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
object DataSourceUtils {
@@ -64,4 +67,18 @@ object DataSourceUtils {
private[sql] def isDataFile(fileName: String) =
!(fileName.startsWith("_") || fileName.startsWith("."))
+
+ def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+ if (Utils.isTesting &&
SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+ return Some(false)
+ }
+ // If there is no version, we return None and let the caller side to
decide.
+ Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+ // Files written by Spark 2.4 and earlier follow the legacy hybrid
calendar and we need to
+ // rebase the datetime values.
+ // Files written by Spark 3.0 and latter may also need the rebase if
they were written with
+ // the "rebaseInWrite" config enabled.
+ version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
+ }
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 29dbd8d..c6d9ddf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -300,6 +300,11 @@ class ParquetFileFormat
None
}
+ val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+ footerFileMetaData.getKeyValueMetaData.get).getOrElse {
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
+ }
+
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP,
0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value,
attemptId)
@@ -312,7 +317,10 @@ class ParquetFileFormat
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
- convertTz.orNull, enableOffHeapColumnVector &&
taskContext.isDefined, capacity)
+ convertTz.orNull,
+ rebaseDateTime,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before
`initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ =>
iter.close()))
@@ -328,7 +336,8 @@ class ParquetFileFormat
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
- val readSupport = new ParquetReadSupport(convertTz,
enableVectorizedReader = false)
+ val readSupport = new ParquetReadSupport(
+ convertTz, enableVectorizedReader = false, rebaseDateTime)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index c05ecf1..28165e0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -50,16 +50,18 @@ import org.apache.spark.sql.types._
* Due to this reason, we no longer rely on [[ReadContext]] to pass requested
schema from [[init()]]
* to [[prepareForRead()]], but use a private `var` for simplicity.
*/
-class ParquetReadSupport(val convertTz: Option[ZoneId],
- enableVectorizedReader: Boolean)
+class ParquetReadSupport(
+ val convertTz: Option[ZoneId],
+ enableVectorizedReader: Boolean,
+ rebaseDateTime: Boolean)
extends ReadSupport[InternalRow] with Logging {
private var catalystRequestedSchema: StructType = _
def this() {
// We need a zero-arg constructor for SpecificParquetRecordReaderBase.
But that is only
- // used in the vectorized reader, where we get the convertTz value
directly, and the value here
- // is ignored.
- this(None, enableVectorizedReader = true)
+ // used in the vectorized reader, where we get the
convertTz/rebaseDateTime value directly,
+ // and the values here are ignored.
+ this(None, enableVectorizedReader = true, rebaseDateTime = false)
}
/**
@@ -127,7 +129,8 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetToSparkSchemaConverter(conf),
- convertTz)
+ convertTz,
+ rebaseDateTime)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 5622169..ec03713 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -31,16 +31,20 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps
initializing row converters
+ * @param convertTz the optional time zone to convert to int96 data
+ * @param rebaseDateTime true if need to rebase date/timestamp from Julian to
Proleptic Gregorian
+ * calendar
*/
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
- convertTz: Option[ZoneId])
+ convertTz: Option[ZoneId],
+ rebaseDateTime: Boolean)
extends RecordMaterializer[InternalRow] {
- private val rootConverter =
- new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema,
convertTz, NoopUpdater)
+ private val rootConverter = new ParquetRowConverter(
+ schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime,
NoopUpdater)
override def getCurrentRecord: InternalRow = rootConverter.currentRecord
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index a5e769c..08fbca2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -120,7 +120,9 @@ private[parquet] class ParquetPrimitiveConverter(val
updater: ParentContainerUpd
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record
type. User-defined
* types should have been expanded.
- * @param convertTz the optional time zone to convert to for int96 data
+ * @param convertTz the optional time zone to convert to int96 data
+ * @param rebaseDateTime true if need to rebase date/timestamp from Julian to
Proleptic Gregorian
+ * calendar
* @param updater An updater which propagates converted field values to the
parent container
*/
private[parquet] class ParquetRowConverter(
@@ -128,12 +130,10 @@ private[parquet] class ParquetRowConverter(
parquetType: GroupType,
catalystType: StructType,
convertTz: Option[ZoneId],
+ rebaseDateTime: Boolean,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {
- // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
- private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeInReadEnabled
-
assert(
parquetType.getFieldCount <= catalystType.length,
s"""Field count of the Parquet schema is greater than the field count of
the Catalyst schema:
@@ -386,7 +386,7 @@ private[parquet] class ParquetRowConverter(
}
}
new ParquetRowConverter(
- schemaConverter, parquetType.asGroupType(), t, convertTz,
wrappedUpdater)
+ schemaConverter, parquetType.asGroupType(), t, convertTz,
rebaseDateTime, wrappedUpdater)
case t =>
throw new RuntimeException(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 409ea88..e367b9c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -31,7 +31,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -103,7 +103,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow]
with Logging {
val metadata = Map(
SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
- ).asJava
+ ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None)
logInfo(
s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -112,7 +112,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow]
with Logging {
|$messageType
""".stripMargin)
- new WriteContext(messageType, metadata)
+ new WriteContext(messageType, metadata.asJava)
}
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 047bc74..1925fa1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
-import org.apache.spark.sql.execution.datasources.{PartitionedFile,
RecordReaderIterator}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils,
PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf
@@ -117,7 +117,7 @@ case class ParquetPartitionReaderFactory(
file: PartitionedFile,
buildReaderFunc: (
ParquetInputSplit, InternalRow, TaskAttemptContextImpl,
Option[FilterPredicate],
- Option[ZoneId]) => RecordReader[Void, T]): RecordReader[Void, T] = {
+ Option[ZoneId], Boolean) => RecordReader[Void, T]):
RecordReader[Void, T] = {
val conf = broadcastedConf.value.value
val filePath = new Path(new URI(file.filePath))
@@ -169,8 +169,12 @@ case class ParquetPartitionReaderFactory(
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration,
pushed.get)
}
- val reader =
- buildReaderFunc(split, file.partitionValues, hadoopAttemptContext,
pushed, convertTz)
+ val rebaseDatetime = DataSourceUtils.needRebaseDateTime(
+ footerFileMetaData.getKeyValueMetaData.get).getOrElse {
+ SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
+ }
+ val reader = buildReaderFunc(
+ split, file.partitionValues, hadoopAttemptContext, pushed, convertTz,
rebaseDatetime)
reader.initialize(split, hadoopAttemptContext)
reader
}
@@ -184,11 +188,13 @@ case class ParquetPartitionReaderFactory(
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
- convertTz: Option[ZoneId]): RecordReader[Void, InternalRow] = {
+ convertTz: Option[ZoneId],
+ needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = {
logDebug(s"Falling back to parquet-mr")
val taskContext = Option(TaskContext.get())
// ParquetRecordReader returns InternalRow
- val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader
= false)
+ val readSupport = new ParquetReadSupport(
+ convertTz, enableVectorizedReader = false, needDateTimeRebase)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -213,10 +219,14 @@ case class ParquetPartitionReaderFactory(
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
- convertTz: Option[ZoneId]): VectorizedParquetRecordReader = {
+ convertTz: Option[ZoneId],
+ rebaseDatetime: Boolean): VectorizedParquetRecordReader = {
val taskContext = Option(TaskContext.get())
val vectorizedReader = new VectorizedParquetRecordReader(
- convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined,
capacity)
+ convertTz.orNull,
+ rebaseDatetime,
+ enableOffHeapColumnVector && taskContext.isDefined,
+ capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 58de675..c039701 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -54,4 +54,10 @@ package object sql {
* Note that Hive table property `spark.sql.create.version` also has Spark
version.
*/
private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
+
+ /**
+ * Parquet/Avro file metadata key to indicate that the file was written with
legacy datetime
+ * values.
+ */
+ private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index a084bec..d29c5e3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -169,7 +169,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new VectorizedParquetRecordReader(
- null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+ enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -203,7 +203,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new VectorizedParquetRecordReader(
- null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+ enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -458,7 +458,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
var sum = 0
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new VectorizedParquetRecordReader(
- null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+ enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 6d681af..fbfedf0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -42,7 +42,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest
with SharedSparkSess
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -69,7 +69,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest
with SharedSparkSess
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -100,7 +100,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest
with SharedSparkSess
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
reader.initialize(file, null /* set columns to null to project all
columns */)
val column = reader.resultBatch().column(0)
assert(reader.nextBatch())
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f901ce1..239db7d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.sql.{Date, Timestamp}
+import java.time._
import java.util.Locale
import scala.collection.JavaConverters._
@@ -720,7 +721,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
{
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, null)
val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -739,7 +740,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
{
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, ("_2" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String)]
@@ -757,7 +758,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
{
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -776,7 +777,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
{
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, List[String]().asJava)
var result = 0
@@ -817,7 +818,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
val schema = StructType(StructField("pcol", dt) :: Nil)
val conf = sqlContext.conf
val vectorizedReader = new VectorizedParquetRecordReader(
- null, conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
+ conf.offHeapColumnVectorEnabled,
conf.parquetVectorizedReaderBatchSize)
val partitionValues = new GenericInternalRow(Array(v))
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
@@ -882,19 +883,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
}
test("SPARK-31159: compatibility with Spark 2.4 in reading
dates/timestamps") {
+ // test reading the existing 2.4 files and new 3.0 files (with rebase
on/off) together.
+ def checkReadMixedFiles(fileName: String, dt: String, dataStr: String):
Unit = {
+ withTempPaths(2) { paths =>
+ paths.foreach(_.delete())
+ val path2_4 = getResourceParquetFilePath("test-data/" + fileName)
+ val path3_0 = paths(0).getCanonicalPath
+ val path3_0_rebase = paths(1).getCanonicalPath
+ if (dt == "date") {
+ val df =
Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+ df.write.parquet(path3_0)
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key ->
"true") {
+ df.write.parquet(path3_0_rebase)
+ }
+ checkAnswer(
+ spark.read.format("parquet").load(path2_4, path3_0,
path3_0_rebase),
+ 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+ } else {
+ val df =
Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
+ df.write.parquet(path3_0)
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key ->
"true") {
+ df.write.parquet(path3_0_rebase)
+ }
+ }
+ checkAnswer(
+ spark.read.format("parquet").load(path2_4, path3_0,
path3_0_rebase),
+ 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+ }
+ }
+ }
+
Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
vectorized.toString) {
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key ->
"true") {
- checkAnswer(
-
readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
- Row(java.sql.Date.valueOf("1001-01-01")))
- checkAnswer(readResourceParquetFile(
- "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
- checkAnswer(readResourceParquetFile(
- "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
+ checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date",
"1001-01-01")
+ checkReadMixedFiles(
+ "before_1582_timestamp_micros_v2_4.snappy.parquet",
+ "TIMESTAMP_MICROS",
+ "1001-01-01 01:02:03.123456")
+ checkReadMixedFiles(
+ "before_1582_timestamp_millis_v2_4.snappy.parquet",
+ "TIMESTAMP_MILLIS",
+ "1001-01-01 01:02:03.123")
}
+
+ // INT96 is a legacy timestamp format and we always rebase the seconds
for it.
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
@@ -918,10 +952,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
.write
.parquet(path)
}
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key ->
"true") {
- checkAnswer(spark.read.parquet(path),
Row(Timestamp.valueOf(tsStr)))
+ // The file metadata indicates if it needs rebase or not, so we
can always get the
+ // correct result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key ->
rebase.toString) {
+ checkAnswer(spark.read.parquet(path),
Row(Timestamp.valueOf(tsStr)))
+ }
}
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key ->
"false") {
+
+ // Force to not rebase to prove the written datetime values are
rebased and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(spark.read.parquet(path),
Row(Timestamp.valueOf(nonRebased)))
}
}
@@ -939,10 +980,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
.write
.parquet(path)
}
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key ->
"true") {
- checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+
+ // The file metadata indicates if it needs rebase or not, so we can
always get the correct
+ // result regardless of the "rebaseInRead" config.
+ Seq(true, false).foreach { rebase =>
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key ->
rebase.toString) {
+ checkAnswer(spark.read.parquet(path),
Row(Date.valueOf("1001-01-01")))
+ }
}
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key ->
"false") {
+
+ // Force to not rebase to prove the written datetime values are rebased
and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index f2dbc53..c833d5f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -156,7 +156,10 @@ private[sql] trait ParquetTest extends
FileBasedDataSourceTest {
}
protected def readResourceParquetFile(name: String): DataFrame = {
- val url = Thread.currentThread().getContextClassLoader.getResource(name)
- spark.read.parquet(url.toString)
+ spark.read.parquet(getResourceParquetFilePath(name))
+ }
+
+ protected def getResourceParquetFilePath(name: String): String = {
+ Thread.currentThread().getContextClassLoader.getResource(name).toString
}
}