This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a8bb8b05d41e [SPARK-53625][SS] Propagate metadata columns through projections to address ApplyCharTypePadding incompatibility a8bb8b05d41e is described below commit a8bb8b05d41e1aae60b72905f5429e170c82e5ca Author: Livia Zhu <livia....@databricks.com> AuthorDate: Fri Sep 19 09:30:07 2025 +0900 [SPARK-53625][SS] Propagate metadata columns through projections to address ApplyCharTypePadding incompatibility ### What changes were proposed in this pull request? Modify streaming MicrobatchExecution to propagate metadata columns through projections to resolve an incompatibility with the ApplyCharTypePadding rule which is applied by default in serverless which previous resulted in an `assertion failed: Invalid batch: ACTV_IND#130290,_metadata#130291 != ACTV_IND#130307` error. ### Why are the changes needed? Bug fix ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #52375 from liviazhu/liviazhu-db/col-metadata. Authored-by: Livia Zhu <livia....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/runtime/MicroBatchExecution.scala | 10 +++ .../datasources/FileMetadataStructSuite.scala | 96 +++++++++++++++++++++- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 461936b40218..174421fcf835 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -801,6 +801,16 @@ class MicroBatchExecution( case _ => false } val finalDataPlan = dataPlan transformUp { + // SPARK-53625: Propagate metadata columns through Projects + case p: Project if hasFileMetadata => + // Check if there is any metadata fields not in the output list + val newMetadata = p.metadataOutput.filterNot(p.outputSet.contains) + if (newMetadata.nonEmpty) { + // If so, add it to projection + p.copy(projectList = p.projectList ++ newMetadata) + } else { + p + } case l: LogicalRelation => var newRelation = l if (hasFileMetadata) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 880f1dd9af8f..c38113f50558 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType} class FileMetadataStructSuite extends QueryTest with SharedSparkSession { @@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { assert(selectSingleRowDf.count() === 1) } } + + Seq("true", "false").foreach { sideCharPadding => + test(s"SPARK-53625: file metadata in streaming with char type, " + + s"sideCharPadding=$sideCharPadding") { + withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) { + withTempDir { dir => + import scala.jdk.CollectionConverters._ + + val metadata = new MetadataBuilder() + .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)") + .build() + val charSchemaStruct = new StructType() + .add(StructField("char_col", StringType, metadata = metadata)) + + val data = Seq(Row("A"), Row("B")) + val df = spark.createDataFrame(data.asJava, charSchemaStruct) + df.coalesce(1).write.format("json") + .save(dir.getCanonicalPath + "/source/new-streaming-data") + + val streamDf = spark.readStream.format("json") + .schema(charSchemaStruct) + .load(dir.getCanonicalPath + "/source/new-streaming-data") + .select("*", "_metadata") + + val streamQuery0 = streamDf + .writeStream.format("json") + .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint") + .trigger(Trigger.AvailableNow()) + .start(dir.getCanonicalPath + "/target/new-streaming-data") + + streamQuery0.awaitTermination() + assert(streamQuery0.lastProgress.numInputRows == 2L) + + val newDF = spark.read.format("json") + .load(dir.getCanonicalPath + "/target/new-streaming-data") + + val sourceFile = new File(dir, "/source/new-streaming-data").listFiles() + .filter(_.getName.endsWith(".json")).head + val sourceFileMetadata = Map( + METADATA_FILE_PATH -> sourceFile.toURI.toString, + METADATA_FILE_NAME -> sourceFile.getName, + METADATA_FILE_SIZE -> sourceFile.length(), + METADATA_FILE_BLOCK_START -> 0, + METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified()) + ) + + // SELECT * will have: char_col, _metadata of /source/new-streaming-data + assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata")) + // Verify the data is expected + checkAnswer( + newDF.select(col("char_col"), + col(METADATA_FILE_PATH), col(METADATA_FILE_NAME), + col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START), + col(METADATA_FILE_BLOCK_LENGTH), + // since we are writing _metadata to a json file, + // we should explicitly cast the column to timestamp type + to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))), + Seq( + Row( + "A", + sourceFileMetadata(METADATA_FILE_PATH), + sourceFileMetadata(METADATA_FILE_NAME), + sourceFileMetadata(METADATA_FILE_SIZE), + sourceFileMetadata(METADATA_FILE_BLOCK_START), + sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH), + sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)), + Row( + "B", + sourceFileMetadata(METADATA_FILE_PATH), + sourceFileMetadata(METADATA_FILE_NAME), + sourceFileMetadata(METADATA_FILE_SIZE), + sourceFileMetadata(METADATA_FILE_BLOCK_START), + sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH), + sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)) + ) + ) + + checkAnswer( + newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE), + Seq( + Row(sourceFileMetadata(METADATA_FILE_SIZE)), + Row(sourceFileMetadata(METADATA_FILE_SIZE))) + ) + checkAnswer( + newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH), + Seq( + Row(sourceFileMetadata(METADATA_FILE_PATH)), + Row(sourceFileMetadata(METADATA_FILE_PATH))) + ) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org