This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 76f40eef8b9 [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support
in Parquet for Spark 3.3
76f40eef8b9 is described below
commit 76f40eef8b97e23f4a16e471366ae410a3e6cc20
Author: Ivan Sadikov <[email protected]>
AuthorDate: Wed Apr 13 17:06:03 2022 +0800
[SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for
Spark 3.3
### What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/36094.
I added a `Utils.isTesting` check wherever we perform schema conversion or row
conversion for TimestampNTZType, so that Parquet support for the type is only enabled in testing mode.
I verified that the tests, e.g. ParquetIOSuite, fail with an "Unsupported data
type" error when running in non-testing mode:
```
[info] Cause: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 1 in stage 40.0 failed 1 times, most recent failure: Lost task
1.0 in stage 40.0 (TID 66) (ip-10-110-16-208.us-west-2.compute.internal
executor driver): org.apache.spark.sql.AnalysisException: Unsupported data type
timestamp_ntz
[info] at
org.apache.spark.sql.errors.QueryCompilationErrors$.cannotConvertDataTypeToParquetTypeError(QueryCompilationErrors.scala:1304)
[info] at
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:707)
[info] at
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:479)
[info] at
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.$anonfun$convert$1(ParquetSchemaConverter.scala:471)
```
### Why are the changes needed?
We have to disable TimestampNTZType in Parquet because other parts of the codebase do not
yet support this type.
### Does this PR introduce _any_ user-facing change?
No, the TimestampNTZ type has not been released yet.
### How was this patch tested?
I tested the changes manually by rerunning the test suites that verify
TimestampNTZType in non-testing mode.
Closes #36137 from sadikovi/SPARK-38829-parquet-ntz-off.
Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../sql/execution/datasources/parquet/ParquetRowConverter.scala | 5 ++++-
.../sql/execution/datasources/parquet/ParquetSchemaConverter.scala | 7 +++++--
.../sql/execution/datasources/parquet/ParquetWriteSupport.scala | 4 +++-
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index a955dd6fc76..ffd90fd722b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -41,6 +41,7 @@ import
org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
/**
* A [[ParentContainerUpdater]] is used by a Parquet converter to set
converted values to some
@@ -487,7 +488,9 @@ private[parquet] class ParquetRowConverter(
parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation]
&&
!parquetType.getLogicalTypeAnnotation
- .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+ .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC &&
+ // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+ Utils.isTesting
/**
* Parquet converter for strings. A dictionary is used to minimize string
decoding cost.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 0e065f19a88..3419bf15f8e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* This converter class is used to convert Parquet [[MessageType]] to Spark
SQL [[StructType]]
@@ -253,7 +254,8 @@ class ParquetToSparkSchemaConverter(
if (timestamp.isAdjustedToUTC) {
TimestampType
} else {
- TimestampNTZType
+ // SPARK-38829: Remove TimestampNTZ type support in Parquet for
Spark 3.3
+ if (Utils.isTesting) TimestampNTZType else TimestampType
}
case _ => illegalType()
}
@@ -547,7 +549,8 @@ class SparkToParquetSchemaConverter(
.as(LogicalTypeAnnotation.timestampType(true,
TimeUnit.MILLIS)).named(field.name)
}
- case TimestampNTZType =>
+ // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+ case TimestampNTZType if Utils.isTesting =>
Types.primitive(INT64, repetition)
.as(LogicalTypeAnnotation.timestampType(false,
TimeUnit.MICROS)).named(field.name)
case BinaryType =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 9d38c967cb0..e71863657dd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -39,6 +39,7 @@ import
org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* A Parquet [[WriteSupport]] implementation that writes Catalyst
[[InternalRow]]s as Parquet
@@ -227,7 +228,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow]
with Logging {
recordConsumer.addLong(millis)
}
- case TimestampNTZType =>
+ // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+ case TimestampNTZType if Utils.isTesting =>
// For TimestampNTZType column, Spark always output as INT64 with
Timestamp annotation in
// MICROS time unit.
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]