This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a8bb8b05d41e [SPARK-53625][SS] Propagate metadata columns through projections to address ApplyCharTypePadding incompatibility
a8bb8b05d41e is described below

commit a8bb8b05d41e1aae60b72905f5429e170c82e5ca
Author: Livia Zhu <livia....@databricks.com>
AuthorDate: Fri Sep 19 09:30:07 2025 +0900

    [SPARK-53625][SS] Propagate metadata columns through projections to address ApplyCharTypePadding incompatibility
    
    ### What changes were proposed in this pull request?
    
    Modify the streaming MicroBatchExecution to propagate metadata columns through projections. This resolves an incompatibility with the ApplyCharTypePadding rule, which is applied by default in serverless environments and previously resulted in an `assertion failed: Invalid batch: ACTV_IND#130290,_metadata#130291 != ACTV_IND#130307` error.
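    
    For illustration, a minimal sketch of the kind of streaming query that hit the assertion, adapted from the new test in this patch (`charSchema` and `sourcePath` are placeholder names):
    
    ```scala
    // A char-typed column read together with the _metadata column: when
    // read-side char padding applies, ApplyCharTypePadding wraps the scan in
    // a Project, which previously dropped _metadata and broke the per-batch
    // plan check.
    val streamDf = spark.readStream
      .format("json")
      .schema(charSchema)        // schema containing a char(1) field (placeholder)
      .load(sourcePath)          // placeholder source path
      .select("*", "_metadata")  // requesting file metadata triggers the bug
    ```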
    
    ### Why are the changes needed?
    
    Bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52375 from liviazhu/liviazhu-db/col-metadata.
    
    Authored-by: Livia Zhu <livia....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/runtime/MicroBatchExecution.scala    | 10 +++
 .../datasources/FileMetadataStructSuite.scala      | 96 +++++++++++++++++++++-
 2 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 461936b40218..174421fcf835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -801,6 +801,16 @@ class MicroBatchExecution(
             case _ => false
           }
           val finalDataPlan = dataPlan transformUp {
+            // SPARK-53625: Propagate metadata columns through Projects
+            case p: Project if hasFileMetadata =>
+              // Check if there are any metadata fields not in the output list
+              val newMetadata = p.metadataOutput.filterNot(p.outputSet.contains)
+              if (newMetadata.nonEmpty) {
+                // If so, add them to the projection
+                p.copy(projectList = p.projectList ++ newMetadata)
+              } else {
+                p
+              }
             case l: LogicalRelation =>
               var newRelation = l
               if (hasFileMetadata) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 880f1dd9af8f..c38113f50558 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
 
 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       assert(selectSingleRowDf.count() === 1)
     }
   }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + 
"/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of /source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata"))
+          // Verify the data is expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a json file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)),
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)))
+          )
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_PATH)),
+              Row(sourceFileMetadata(METADATA_FILE_PATH)))
+          )
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
