This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 970adf6af3 [GLUTEN-11175] Refactor generateMetadataColumns for SparkShims (#11176)
970adf6af3 is described below

commit 970adf6af3656d734ee4c6ddad38735c188785b8
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Nov 27 16:13:25 2025 +0800

    [GLUTEN-11175] Refactor generateMetadataColumns for SparkShims (#11176)
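
    In outline: the common input_file_name / input_file_block_start /
    input_file_block_length entries move into a default implementation on
    the SparkShims trait, Spark32Shims drops its override entirely, and the
    Spark 3.3-4.0 shims build on the default via super before adding the
    _metadata columns their Spark version supports. The method now returns
    an immutable Scala Map[String, String]; callers such as CHIteratorApi
    and VeloxIteratorApi convert with .asJava where a Java map is needed.

    A minimal sketch of the pattern (simplified stand-in types, not the
    exact shim code; the real code uses Spark's PartitionedFile, the
    expression prettyNames, and the FileFormat.* constants):

        // FileSlice is a stand-in for Spark's PartitionedFile.
        case class FileSlice(filePath: String, start: Long, length: Long)

        trait Shims {
          // Shared default: the entries every Spark version provides.
          def generateMetadataColumns(
              file: FileSlice,
              metadataColumnNames: Seq[String] = Seq.empty): Map[String, String] =
            Map(
              "input_file_name" -> file.filePath,
              "input_file_block_start" -> file.start.toString,
              "input_file_block_length" -> file.length.toString
            )
        }

        class Spark33LikeShims extends Shims {
          // Version-specific override: start from the shared defaults,
          // then add the _metadata columns this version understands.
          override def generateMetadataColumns(
              file: FileSlice,
              metadataColumnNames: Seq[String]): Map[String, String] =
            super.generateMetadataColumns(file, metadataColumnNames) ++
              metadataColumnNames.collect {
                case name @ "file_path" => name -> file.filePath
              }
        }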
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  4 +++-
 .../backendsapi/velox/VeloxIteratorApi.scala       |  3 ++-
 .../org/apache/gluten/sql/shims/SparkShims.scala   | 10 +++++++--
 .../gluten/sql/shims/spark32/Spark32Shims.scala    | 12 -----------
 .../gluten/sql/shims/spark33/Spark33Shims.scala    | 21 +++++++++---------
 .../gluten/sql/shims/spark34/Spark34Shims.scala    | 24 ++++++++++-----------
 .../gluten/sql/shims/spark35/Spark35Shims.scala    | 25 +++++++++++-----------
 .../gluten/sql/shims/spark40/Spark40Shims.scala    | 25 +++++++++++-----------
 8 files changed, 58 insertions(+), 66 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 5fff386d72..2cd9d85164 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -169,7 +169,9 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
             starts.add(JLong.valueOf(file.start))
             lengths.add(JLong.valueOf(file.length))
             val metadataColumn =
-              SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
+              SparkShimLoader.getSparkShims
+                .generateMetadataColumns(file, metadataColumnNames)
+                .asJava
             metadataColumns.add(metadataColumn)
             val partitionColumn = new JHashMap[String, String]()
             for (i <- 0 until file.partitionValues.numFields) {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 80c33efe5e..8c496b7419 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -91,7 +91,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
 
     val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
     val metadataColumns = partitionFiles
-      .map(f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames))
+      .map(
+        f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
     val otherMetadataColumns = partitionFiles
       .map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))
 
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index bb7b55fdae..2ffb1fff60 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression, Expression, RaiseError, UnBase64}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RaiseError, UnBase64}
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -233,7 +233,13 @@ trait SparkShims {
 
   def generateMetadataColumns(
       file: PartitionedFile,
-      metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
+      metadataColumnNames: Seq[String] = Seq.empty): Map[String, String] = {
+    Map(
+      InputFileName().prettyName -> file.filePath.toString,
+      InputFileBlockStart().prettyName -> file.start.toString,
+      InputFileBlockLength().prettyName -> file.length.toString
+    )
+  }
 
   // For compatibility with Spark-3.5.
   def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan]
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 007a962884..6c4cb7ff62 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -59,8 +59,6 @@ import org.apache.parquet.crypto.ParquetCryptoRuntimeException
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
-import java.util.{HashMap => JHashMap, Map => JMap}
-
 class Spark32Shims extends SparkShims {
 
   override def getDistribution(
@@ -238,16 +236,6 @@ class Spark32Shims extends SparkShims {
     (None, None)
   }
 
-  override def generateMetadataColumns(
-      file: PartitionedFile,
-      metadataColumnNames: Seq[String]): JMap[String, String] = {
-    val metadataColumn = new JHashMap[String, String]()
-    metadataColumn.put(InputFileName().prettyName, file.filePath)
-    metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
-    metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
-    metadataColumn
-  }
-
   def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan] = {
     ae.plan
   }
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 2c345c65aa..c18e7dae42 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -61,7 +61,8 @@ import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+
+import scala.collection.mutable
 
 class Spark33Shims extends SparkShims {
   override def getDistribution(
@@ -226,27 +227,25 @@ class Spark33Shims extends SparkShims {
 
   override def generateMetadataColumns(
       file: PartitionedFile,
-      metadataColumnNames: Seq[String]): JMap[String, String] = {
-    val metadataColumn = new JHashMap[String, String]()
+      metadataColumnNames: Seq[String]): Map[String, String] = {
+    val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+    val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
     val path = new Path(file.filePath)
     for (columnName <- metadataColumnNames) {
       columnName match {
-        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
-        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+        case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
         case FileFormat.FILE_SIZE =>
-          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+          metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
         case FileFormat.FILE_MODIFICATION_TIME =>
           val fileModifyTime = TimestampFormatter
             .getFractionFormatter(ZoneOffset.UTC)
             .format(file.modificationTime * 1000L)
-          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+          metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
         case _ =>
       }
     }
-    metadataColumn.put(InputFileName().prettyName, file.filePath)
-    metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
-    metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
-    metadataColumn
+    metadataColumn.toMap
   }
 
   private def invalidBucketFile(path: String): Throwable = {
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 890ee8c592..ac15aff6e7 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -64,8 +64,8 @@ import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
 
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 class Spark34Shims extends SparkShims {
@@ -244,31 +244,29 @@ class Spark34Shims extends SparkShims {
 
   override def generateMetadataColumns(
       file: PartitionedFile,
-      metadataColumnNames: Seq[String]): JMap[String, String] = {
-    val metadataColumn = new JHashMap[String, String]()
+      metadataColumnNames: Seq[String]): Map[String, String] = {
+    val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+    val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
     val path = new Path(file.filePath.toString)
     for (columnName <- metadataColumnNames) {
       columnName match {
-        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
-        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+        case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
         case FileFormat.FILE_SIZE =>
-          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+          metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
         case FileFormat.FILE_MODIFICATION_TIME =>
           val fileModifyTime = TimestampFormatter
             .getFractionFormatter(ZoneOffset.UTC)
             .format(file.modificationTime * 1000L)
-          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+          metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
         case FileFormat.FILE_BLOCK_START =>
-          metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+          metadataColumn += (FileFormat.FILE_BLOCK_START -> file.start.toString)
         case FileFormat.FILE_BLOCK_LENGTH =>
-          metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+          metadataColumn += (FileFormat.FILE_BLOCK_LENGTH -> file.length.toString)
         case _ =>
       }
     }
-    metadataColumn.put(InputFileName().prettyName, file.filePath.toString)
-    metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
-    metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
-    metadataColumn
+    metadataColumn.toMap
   }
 
   // https://issues.apache.org/jira/browse/SPARK-40400
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index ee52e8b2b9..4d373b1cc3 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -66,9 +66,10 @@ import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 class Spark35Shims extends SparkShims {
@@ -248,31 +249,29 @@ class Spark35Shims extends SparkShims {
 
   override def generateMetadataColumns(
       file: PartitionedFile,
-      metadataColumnNames: Seq[String]): JMap[String, String] = {
-    val metadataColumn = new JHashMap[String, String]()
+      metadataColumnNames: Seq[String]): Map[String, String] = {
+    val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+    val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
     val path = new Path(file.filePath.toString)
     for (columnName <- metadataColumnNames) {
       columnName match {
-        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
-        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+        case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
         case FileFormat.FILE_SIZE =>
-          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+          metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
         case FileFormat.FILE_MODIFICATION_TIME =>
           val fileModifyTime = TimestampFormatter
             .getFractionFormatter(ZoneOffset.UTC)
             .format(file.modificationTime * 1000L)
-          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+          metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
         case FileFormat.FILE_BLOCK_START =>
-          metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+          metadataColumn += (FileFormat.FILE_BLOCK_START -> file.start.toString)
         case FileFormat.FILE_BLOCK_LENGTH =>
-          metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+          metadataColumn += (FileFormat.FILE_BLOCK_LENGTH -> file.length.toString)
         case _ =>
       }
     }
-    metadataColumn.put(InputFileName().prettyName, file.filePath.toString)
-    metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
-    metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
-    metadataColumn
+    metadataColumn.toMap
   }
 
   // https://issues.apache.org/jira/browse/SPARK-40400
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 41478c589f..87fce3de0d 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -66,9 +66,10 @@ import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 class Spark40Shims extends SparkShims {
@@ -249,31 +250,29 @@ class Spark40Shims extends SparkShims {
 
   override def generateMetadataColumns(
       file: PartitionedFile,
-      metadataColumnNames: Seq[String]): JMap[String, String] = {
-    val metadataColumn = new JHashMap[String, String]()
+      metadataColumnNames: Seq[String]): Map[String, String] = {
+    val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+    val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
     val path = new Path(file.filePath.toString)
     for (columnName <- metadataColumnNames) {
       columnName match {
-        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
-        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+        case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
         case FileFormat.FILE_SIZE =>
-          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+          metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
         case FileFormat.FILE_MODIFICATION_TIME =>
           val fileModifyTime = TimestampFormatter
             .getFractionFormatter(ZoneOffset.UTC)
             .format(file.modificationTime * 1000L)
-          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+          metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
         case FileFormat.FILE_BLOCK_START =>
-          metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+          metadataColumn += (FileFormat.FILE_BLOCK_START -> file.start.toString)
         case FileFormat.FILE_BLOCK_LENGTH =>
-          metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+          metadataColumn += (FileFormat.FILE_BLOCK_LENGTH -> file.length.toString)
         case _ =>
       }
     }
-    metadataColumn.put(InputFileName().prettyName, file.filePath.toString)
-    metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
-    metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
-    metadataColumn
+    metadataColumn.toMap
   }
 
   // https://issues.apache.org/jira/browse/SPARK-40400
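
Note: on the caller side, the Scala Map returned by generateMetadataColumns
is converted with .asJava wherever a java.util.Map is required, e.g.
(illustrative only, with a made-up path):

    import scala.collection.JavaConverters._
    val jMap: java.util.Map[String, String] =
      Map("input_file_name" -> "/tmp/part-00000.parquet").asJava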


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
