This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 6c55a6c0c68 [SPARK-45604][SQL] Add LogicalType checking on INT64 ->
DateTime conversion on Parquet Vectorized Reader
6c55a6c0c68 is described below
commit 6c55a6c0c680f80a6cdef7f1a83045b6400b4d09
Author: Zamil Majdy <[email protected]>
AuthorDate: Sun Oct 22 10:53:22 2023 +0500
[SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion
on Parquet Vectorized Reader
### What changes were proposed in this pull request?
Currently, the requested read (logical) type is not checked when converting the
Parquet physical type INT64 into a datetime value. One scenario where this
breaks is when the Parquet column is `timestamp_ntz` (physical INT64) but the
requested read type is `array<timestamp_ntz>`: because the logical type check
does not happen, the conversion is allowed, yet the vectorized reader does not
support it and produces an NPE in on-heap memory mode and a SEGFAULT in
off-heap memory mode. Segmentation fault on off-heap memory mode c [...]
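As a rough illustration, the scenario can be reproduced from a `spark-shell`
along these lines (a minimal sketch mirroring the regression test added below;
it assumes a running `SparkSession` named `spark`, and the output path is
purely illustrative):
```scala
import java.time.LocalDateTime
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType, TimestampNTZType}

import spark.implicits._

val path = "/tmp/spark-45604-repro"  // illustrative location
val ts = LocalDateTime.of(1, 2, 3, 4, 5)

// Write a plain timestamp_ntz column; Parquet stores it as physical INT64
// with a non-UTC-adjusted timestamp logical annotation.
Seq((1, ts)).toDF().write.mode("overwrite").parquet(path)

// Request a mismatched read schema: array<timestamp_ntz> instead of timestamp_ntz.
val readSchema = new StructType()
  .add("_1", IntegerType)
  .add("_2", ArrayType(TimestampNTZType))

// With the vectorized reader enabled (the default), this previously crashed with
// an NPE (on-heap) or a SEGFAULT (off-heap); with this change it fails with a
// SparkException caused by SchemaColumnConvertNotSupportedException.
spark.read.schema(readSchema).parquet(path).collect()
```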
### Why are the changes needed?
Prevent NPE or Segfault from happening.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
A new test is added in `ParquetSchemaSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43451 from majdyz/SPARK-45604.
Lead-authored-by: Zamil Majdy <[email protected]>
Co-authored-by: Zamil Majdy <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 13b67ee8cc377a5cc47d02b9addbc00eabfc8b6c)
Signed-off-by: Max Gekk <[email protected]>
---
.../parquet/ParquetVectorUpdaterFactory.java | 10 ++++++++--
.../datasources/parquet/ParquetSchemaSuite.scala | 21 +++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 15d58f0c757..42442cf8ea8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,7 +109,8 @@ public class ParquetVectorUpdaterFactory {
// For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
// fallbacks. We read them as decimal values.
return new UnsignedLongUpdater();
- } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+ } else if (isTimestamp(sparkType) &&
+ isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
validateTimestampType(sparkType);
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new LongUpdater();
@@ -117,7 +118,8 @@ public class ParquetVectorUpdaterFactory {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
}
- } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+ } else if (isTimestamp(sparkType) &&
+ isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
validateTimestampType(sparkType);
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new LongAsMicrosUpdater();
@@ -1150,6 +1152,10 @@ public class ParquetVectorUpdaterFactory {
return false;
}
+ private static boolean isTimestamp(DataType dt) {
+ return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
+ }
+
private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
DecimalType d = (DecimalType) dt;
LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index facc9b90ff7..3f47c5e506f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1087,6 +1087,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}
+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>") {
+ import testImplicits._
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val timestamp = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
+ val df1 = Seq((1, timestamp)).toDF()
+ val df2 = Seq((2, Array(timestamp))).toDF()
+ df1.write.mode("overwrite").parquet(s"$path/parquet")
+ df2.write.mode("append").parquet(s"$path/parquet")
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ val e = intercept[SparkException] {
+ spark.read.schema(df2.schema).parquet(s"$path/parquet").collect()
+ }
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
+ }
+ }
+
test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with
nanosAsLong=true)") {
val tsAttribute = "birthday"
withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]