This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new e8a4fb8fad1 [SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file
_metadata fields nullable value consistent
e8a4fb8fad1 is described below
commit e8a4fb8fad1e203720ee54a5250425b4d71a0bd7
Author: yaohua <[email protected]>
AuthorDate: Mon Dec 5 16:23:07 2022 -0800
[SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file _metadata fields
nullable value consistent
### What changes were proposed in this pull request?
Cherry-pick https://github.com/apache/spark/pull/38777. Resolved conflicts
in
https://github.com/apache/spark/commit/ac2d027a768f50e279a1785ebf4dae1a37b7d3f4
### Why are the changes needed?
N/A
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
N/A
Closes #38910 from Yaohua628/spark-41151-follow-up-3-3.
Authored-by: yaohua <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/datasources/FileFormat.scala | 13 ++++++++-----
.../execution/datasources/FileMetadataStructSuite.scala | 16 ++++++++++++----
2 files changed, 20 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 0263de8525f..941d2cffe21 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -197,12 +197,15 @@ object FileFormat {
*/
val OPTION_RETURNING_BATCH = "returning_batch"
- // supported metadata struct fields for hadoop fs relation
+ /**
+ * Schema of the metadata struct that can be produced by every file format.
+ * Metadata fields for every file format must be *not* nullable.
+ */
val METADATA_STRUCT: StructType = new StructType()
- .add(StructField(FILE_PATH, StringType))
- .add(StructField(FILE_NAME, StringType))
- .add(StructField(FILE_SIZE, LongType))
- .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+ .add(StructField(FILE_PATH, StringType, nullable = false))
+ .add(StructField(FILE_NAME, StringType, nullable = false))
+ .add(StructField(FILE_SIZE, LongType, nullable = false))
+ .add(StructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
// create a file metadata struct col
def createFileMetadataCol: AttributeReference =
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 37016b58431..b53ac3a838f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -267,8 +267,8 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
val expectedSchema = new StructType()
.add(StructField("myName", StringType))
.add(StructField("myAge", IntegerType))
- .add(StructField("myFileName", StringType))
- .add(StructField("myFileSize", LongType))
+ .add(StructField("myFileName", StringType, nullable = false))
+ .add(StructField("myFileSize", LongType, nullable = false))
assert(aliasDF.schema.fields.toSet == expectedSchema.fields.toSet)
@@ -654,13 +654,21 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
val queryExecution = df.select("_metadata").queryExecution
val analyzedSchema = queryExecution.analyzed.schema
val executedSchema = queryExecution.executedPlan.schema
- assert(analyzedSchema.fields.head.name == "_metadata")
- assert(executedSchema.fields.head.name == "_metadata")
// For stateful streaming, we store the schema in the state store
// and check consistency across batches.
// To avoid state schema compatibility mismatches,
// we should keep nullability consistent for _metadata struct
+ assert(analyzedSchema.fields.head.name == "_metadata")
+ assert(executedSchema.fields.head.name == "_metadata")
+
+ // Metadata struct is not nullable
assert(!analyzedSchema.fields.head.nullable)
assert(analyzedSchema.fields.head.nullable ==
executedSchema.fields.head.nullable)
+
+ // All sub-fields are not nullable
+ val analyzedStruct =
analyzedSchema.fields.head.dataType.asInstanceOf[StructType]
+ val executedStruct =
executedSchema.fields.head.dataType.asInstanceOf[StructType]
+ assert(analyzedStruct.fields.forall(!_.nullable))
+ assert(executedStruct.fields.forall(!_.nullable))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]