This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 76f40eef8b9 [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support 
in Parquet for Spark 3.3
76f40eef8b9 is described below

commit 76f40eef8b97e23f4a16e471366ae410a3e6cc20
Author: Ivan Sadikov <[email protected]>
AuthorDate: Wed Apr 13 17:06:03 2022 +0800

    [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for 
Spark 3.3
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up for https://github.com/apache/spark/pull/36094.
    I added a `Utils.isTesting` check wherever we perform schema conversion or 
row conversion for TimestampNTZType.
    
    I verified that the tests, e.g. ParquetIOSuite, fail with an unsupported 
data type error when run in non-testing mode:
    ```
    [info]   Cause: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 1 in stage 40.0 failed 1 times, most recent failure: Lost task 
1.0 in stage 40.0 (TID 66) (ip-10-110-16-208.us-west-2.compute.internal 
executor driver): org.apache.spark.sql.AnalysisException: Unsupported data type 
timestamp_ntz
    [info]  at 
org.apache.spark.sql.errors.QueryCompilationErrors$.cannotConvertDataTypeToParquetTypeError(QueryCompilationErrors.scala:1304)
    [info]  at 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:707)
    [info]  at 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:479)
    [info]  at 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.$anonfun$convert$1(ParquetSchemaConverter.scala:471)
    ```
    
    ### Why are the changes needed?
    We have to disable TimestampNTZType as other parts of the codebase do not 
yet support this type.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the TimestampNTZ type is not released yet.
    
    ### How was this patch tested?
    
    I tested the changes manually by rerunning the test suites that verify 
TimestampNTZType in the non-testing mode.
    
    Closes #36137 from sadikovi/SPARK-38829-parquet-ntz-off.
    
    Authored-by: Ivan Sadikov <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../sql/execution/datasources/parquet/ParquetRowConverter.scala    | 5 ++++-
 .../sql/execution/datasources/parquet/ParquetSchemaConverter.scala | 7 +++++--
 .../sql/execution/datasources/parquet/ParquetWriteSupport.scala    | 4 +++-
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index a955dd6fc76..ffd90fd722b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set 
converted values to some
@@ -487,7 +488,9 @@ private[parquet] class ParquetRowConverter(
     parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
       
parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation]
 &&
       !parquetType.getLogicalTypeAnnotation
-        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC &&
+      // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+      Utils.isTesting
 
   /**
    * Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 0e065f19a88..3419bf15f8e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * This converter class is used to convert Parquet [[MessageType]] to Spark 
SQL [[StructType]]
@@ -253,7 +254,8 @@ class ParquetToSparkSchemaConverter(
             if (timestamp.isAdjustedToUTC) {
               TimestampType
             } else {
-              TimestampNTZType
+              // SPARK-38829: Remove TimestampNTZ type support in Parquet for 
Spark 3.3
+              if (Utils.isTesting) TimestampNTZType else TimestampType
             }
           case _ => illegalType()
         }
@@ -547,7 +549,8 @@ class SparkToParquetSchemaConverter(
               .as(LogicalTypeAnnotation.timestampType(true, 
TimeUnit.MILLIS)).named(field.name)
         }
 
-      case TimestampNTZType =>
+      // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+      case TimestampNTZType if Utils.isTesting =>
         Types.primitive(INT64, repetition)
           .as(LogicalTypeAnnotation.timestampType(false, 
TimeUnit.MICROS)).named(field.name)
       case BinaryType =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 9d38c967cb0..e71863657dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -39,6 +39,7 @@ import 
org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A Parquet [[WriteSupport]] implementation that writes Catalyst 
[[InternalRow]]s as Parquet
@@ -227,7 +228,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
               recordConsumer.addLong(millis)
         }
 
-      case TimestampNTZType =>
+      // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+      case TimestampNTZType if Utils.isTesting =>
         // For TimestampNTZType column, Spark always output as INT64 with 
Timestamp annotation in
         // MICROS time unit.
         (row: SpecializedGetters, ordinal: Int) => 
recordConsumer.addLong(row.getLong(ordinal))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to