This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new e8a4fb8fad1 [SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file _metadata fields nullable value consistent
e8a4fb8fad1 is described below

commit e8a4fb8fad1e203720ee54a5250425b4d71a0bd7
Author: yaohua <[email protected]>
AuthorDate: Mon Dec 5 16:23:07 2022 -0800

    [SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file _metadata fields nullable value consistent
    
    ### What changes were proposed in this pull request?
    Cherry-pick https://github.com/apache/spark/pull/38777. Resolved conflicts in https://github.com/apache/spark/commit/ac2d027a768f50e279a1785ebf4dae1a37b7d3f4
    
    ### Why are the changes needed?
    N/A
    
    ### Does this PR introduce _any_ user-facing change?
    N/A
    
    ### How was this patch tested?
    N/A
    
    Closes #38910 from Yaohua628/spark-41151-follow-up-3-3.
    
    Authored-by: yaohua <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/execution/datasources/FileFormat.scala     | 13 ++++++++-----
 .../execution/datasources/FileMetadataStructSuite.scala  | 16 ++++++++++++----
 2 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 0263de8525f..941d2cffe21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -197,12 +197,15 @@ object FileFormat {
    */
   val OPTION_RETURNING_BATCH = "returning_batch"
 
-  // supported metadata struct fields for hadoop fs relation
+  /**
+   * Schema of metadata struct that can be produced by every file format,
+   * metadata fields for every file format must be *not* nullable.
+   * */
   val METADATA_STRUCT: StructType = new StructType()
-    .add(StructField(FILE_PATH, StringType))
-    .add(StructField(FILE_NAME, StringType))
-    .add(StructField(FILE_SIZE, LongType))
-    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+    .add(StructField(FILE_PATH, StringType, nullable = false))
+    .add(StructField(FILE_NAME, StringType, nullable = false))
+    .add(StructField(FILE_SIZE, LongType, nullable = false))
+    .add(StructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   // create a file metadata struct col
   def createFileMetadataCol: AttributeReference =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 37016b58431..b53ac3a838f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -267,8 +267,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     val expectedSchema = new StructType()
       .add(StructField("myName", StringType))
       .add(StructField("myAge", IntegerType))
-      .add(StructField("myFileName", StringType))
-      .add(StructField("myFileSize", LongType))
+      .add(StructField("myFileName", StringType, nullable = false))
+      .add(StructField("myFileSize", LongType, nullable = false))
 
     assert(aliasDF.schema.fields.toSet == expectedSchema.fields.toSet)
 
@@ -654,13 +654,21 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     val queryExecution = df.select("_metadata").queryExecution
     val analyzedSchema = queryExecution.analyzed.schema
     val executedSchema = queryExecution.executedPlan.schema
-    assert(analyzedSchema.fields.head.name == "_metadata")
-    assert(executedSchema.fields.head.name == "_metadata")
     // For stateful streaming, we store the schema in the state store
     // and check consistency across batches.
     // To avoid state schema compatibility mismatched,
     // we should keep nullability consistent for _metadata struct
+    assert(analyzedSchema.fields.head.name == "_metadata")
+    assert(executedSchema.fields.head.name == "_metadata")
+
+    // Metadata struct is not nullable
     assert(!analyzedSchema.fields.head.nullable)
     assert(analyzedSchema.fields.head.nullable == executedSchema.fields.head.nullable)
+
+    // All sub-fields are not nullable
+    val analyzedStruct = analyzedSchema.fields.head.dataType.asInstanceOf[StructType]
+    val executedStruct = executedSchema.fields.head.dataType.asInstanceOf[StructType]
+    assert(analyzedStruct.fields.forall(!_.nullable))
+    assert(executedStruct.fields.forall(!_.nullable))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to