This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 50520fe  [SPARK-38314][SQL] Fix failing to read parquet files after writing the hidden file metadata
50520fe is described below

commit 50520fe3eb8237adefdb30938736536148110be1
Author: yaohua <yaohua.z...@databricks.com>
AuthorDate: Mon Feb 28 19:47:17 2022 +0800

    [SPARK-38314][SQL] Fix failing to read parquet files after writing the hidden file metadata

    ### What changes were proposed in this pull request?
    Selecting and then writing a df containing the hidden file metadata column `_metadata` to a file format such as `parquet` or `delta` will still keep the internal `Attribute` metadata information of the column. Reading those `parquet` or `delta` files back then breaks, because Spark wrongly treats the user data column `_metadata` as a hidden file source metadata column.
    ```
    // prepare a file source df
    df.select("*", "_metadata").write.format("parquet").save(path)
    spark.read.format("parquet").load(path).select("*").show()
    ```
    This PR fixes this by cleaning up any remaining internal metadata information on the output columns before they are written out.

    ### Why are the changes needed?
    Bugfix

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    A new UT

    Closes #35650 from Yaohua628/spark-38314.

    Authored-by: yaohua <yaohua.z...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/expressions/namedExpressions.scala    | 15 +++++++++
 .../execution/datasources/FileFormatWriter.scala   | 14 ++++++---
 .../datasources/FileMetadataStructSuite.scala      | 36 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 248584b..d5df6a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -475,4 +475,19 @@ object FileSourceMetadataAttribute {
       && attr.metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY) => Some(attr)
     case _ => None
   }
+
+  /**
+   * Cleans up the internal metadata information of an attribute if it is
+   * a [[FileSourceMetadataAttribute]]: removes both [[METADATA_COL_ATTR_KEY]] and
+   * [[FILE_SOURCE_METADATA_COL_ATTR_KEY]] from the attribute's [[Metadata]].
+   */
+  def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute = attr match {
+    case FileSourceMetadataAttribute(attr) => attr.withMetadata(
+      new MetadataBuilder().withMetadata(attr.metadata)
+        .remove(METADATA_COL_ATTR_KEY)
+        .remove(FILE_SOURCE_METADATA_COL_ATTR_KEY)
+        .build()
+    )
+    case attr => attr
+  }
 }
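As an aside, here is a minimal, self-contained sketch of what the new `cleanupFileSourceMetadataInformation` helper does to a tagged attribute. The marker key strings below are illustrative placeholders, not necessarily the exact Spark constants (the real ones live in namedExpressions.scala):
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{MetadataBuilder, StringType}

object CleanupSketch {
  // Assumed marker keys; placeholders for the constants in namedExpressions.scala.
  val MetadataColKey = "__metadata_col"
  val FileSourceMetadataColKey = "__file_source_metadata_col"

  def main(args: Array[String]): Unit = {
    // An attribute still tagged as a hidden file source metadata column.
    val tagged = AttributeReference("_metadata", StringType, nullable = true,
      new MetadataBuilder()
        .putBoolean(MetadataColKey, true)
        .putBoolean(FileSourceMetadataColKey, true)
        .build())()

    // The fix: rebuild the attribute's Metadata without the internal marker keys,
    // so the written schema no longer advertises `_metadata` as hidden.
    val cleaned = tagged.withMetadata(
      new MetadataBuilder().withMetadata(tagged.metadata)
        .remove(MetadataColKey)
        .remove(FileSourceMetadataColKey)
        .build())

    println(cleaned.metadata) // prints {} -- no internal markers survive
  }
}
```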
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 409e334..fe48ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -111,7 +111,11 @@ object FileFormatWriter extends Logging {
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
+    // clean up the internal metadata information of
+    // the file source metadata attribute, if any, before writing out
+    val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns
+      .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
+    val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)

     var needConvert = false
     val projectList: Seq[NamedExpression] = plan.output.map {
@@ -167,12 +171,12 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID.toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = outputSpec.outputColumns,
+      allColumns = finalOutputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketSpec = writerBucketSpec,
-      path = outputSpec.outputPath,
-      customPartitionLocations = outputSpec.customPartitionLocations,
+      path = finalOutputSpec.outputPath,
+      customPartitionLocations = finalOutputSpec.customPartitionLocations,
       maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
       timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -212,7 +216,7 @@ object FileFormatWriter extends Logging {
       // the physical plan may have different attribute ids due to optimizer removing some
       // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
       val orderingExpr = bindReferences(
-        requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
+        requiredOrdering.map(SortOrder(_, Ascending)), finalOutputSpec.outputColumns)
       val sortPlan = SortExec(
         orderingExpr,
         global = false,
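For context, the write-path change above can be exercised end to end in `spark-shell`. A minimal sketch of the reproduction from the PR description, with placeholder paths:
```
val base = "/tmp/spark-38314-demo"  // placeholder path
// `_metadata` only resolves on file source relations, so materialize one first.
spark.range(3).toDF("id").write.format("parquet").save(base + "/src")
val df = spark.read.format("parquet").load(base + "/src")

// Write out the user data plus the hidden `_metadata` column ...
df.select("*", "_metadata").write.format("parquet").save(base + "/out")

// ... and read it back. Before this fix, the read failed because the written
// schema still carried the internal file-source-metadata markers on `_metadata`.
spark.read.format("parquet").load(base + "/out").show(truncate = false)
```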
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 0d391e0..175b420 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -474,4 +474,40 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       Seq(Row("jack", 24, 12345L, f0(METADATA_FILE_SIZE)))
     )
   }
+
+  metadataColumnsTest("write _metadata in parquet and read back", schema) { (df, f0, f1) =>
+    // SPARK-38314: Selecting and then writing a df containing the hidden file
+    // metadata column `_metadata` into parquet files will keep the internal `Attribute`
+    // metadata information of the column, which then fails when read back.
+    withTempDir { dir =>
+      df.select("*", "_metadata")
+        .write.format("parquet").save(dir.getCanonicalPath + "/new-data")
+
+      val newDF = spark.read.format("parquet").load(dir.getCanonicalPath + "/new-data")
+
+      // SELECT * will have: name, age, info, _metadata of f0 and f1
+      checkAnswer(
+        newDF.select("*"),
+        Seq(
+          Row("jack", 24, Row(12345L, "uom"),
+            Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
+              f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
+          Row("lily", 31, Row(54321L, "ucb"),
+            Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
+              f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+        )
+      )
+
+      // SELECT _metadata won't override the existing user data (_metadata of f0 and f1)
+      checkAnswer(
+        newDF.select("_metadata"),
+        Seq(
+          Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
+            f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
+          Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
+            f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+        )
+      )
+    }
+  }
 }
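As a follow-up to the test above: once round-tripped, `_metadata` is ordinary user data, so its persisted struct fields can be selected like any other nested column. A hypothetical `spark-shell` snippet, reusing the placeholder path from the earlier sketch and assuming the standard `file_name`/`file_size` field names:
```
// After the round trip, `_metadata` is a plain user struct column, not the
// hidden file source metadata, so nested field access works as usual.
spark.read.format("parquet").load("/tmp/spark-38314-demo/out")
  .select("_metadata.file_name", "_metadata.file_size")
  .show(truncate = false)
```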