This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 970adf6af3 [GLUTEN-11175] Refactor generateMetadataColumns for SparkShims (#11176)
970adf6af3 is described below
commit 970adf6af3656d734ee4c6ddad38735c188785b8
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Nov 27 16:13:25 2025 +0800
[GLUTEN-11175] Refactor generateMetadataColumns for SparkShims (#11176)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 4 +++-
.../backendsapi/velox/VeloxIteratorApi.scala | 3 ++-
.../org/apache/gluten/sql/shims/SparkShims.scala | 10 +++++++--
.../gluten/sql/shims/spark32/Spark32Shims.scala | 12 -----------
.../gluten/sql/shims/spark33/Spark33Shims.scala | 21 +++++++++---------
.../gluten/sql/shims/spark34/Spark34Shims.scala | 24 ++++++++++-----------
.../gluten/sql/shims/spark35/Spark35Shims.scala | 25 +++++++++++-----------
.../gluten/sql/shims/spark40/Spark40Shims.scala | 25 +++++++++++-----------
8 files changed, 58 insertions(+), 66 deletions(-)
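
Note on the shape of the change: the common SparkShims trait now supplies a default implementation of generateMetadataColumns that returns a Scala Map[String, String], so the per-version shims only add version-specific entries, and the ClickHouse and Velox backends convert to a Java map at the call site with .asJava. A minimal sketch of that conversion pattern, using a made-up metadata map in place of a real PartitionedFile:

    import scala.collection.JavaConverters._

    object AsJavaSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the map generateMetadataColumns now returns.
        val metadata: Map[String, String] =
          Map("input_file_name" -> "/tmp/part-00000.parquet")
        // Scala Map -> java.util.Map at the backend boundary.
        val javaMetadata: java.util.Map[String, String] = metadata.asJava
        println(javaMetadata.get("input_file_name"))
      }
    }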
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 5fff386d72..2cd9d85164 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -169,7 +169,9 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val metadataColumn =
- SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
+ SparkShimLoader.getSparkShims
+ .generateMetadataColumns(file, metadataColumnNames)
+ .asJava
metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 80c33efe5e..8c496b7419 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -91,7 +91,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
val metadataColumns = partitionFiles
- .map(f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames))
+ .map(
+ f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
val otherMetadataColumns = partitionFiles
.map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index bb7b55fdae..2ffb1fff60 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression, Expression, RaiseError, UnBase64}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BinaryExpression, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RaiseError, UnBase64}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -233,7 +233,13 @@ trait SparkShims {
def generateMetadataColumns(
file: PartitionedFile,
- metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
+ metadataColumnNames: Seq[String] = Seq.empty): Map[String, String] = {
+ Map(
+ InputFileName().prettyName -> file.filePath.toString,
+ InputFileBlockStart().prettyName -> file.start.toString,
+ InputFileBlockLength().prettyName -> file.length.toString
+ )
+ }
// For compatibility with Spark-3.5.
def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan]
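
The default above covers the three input-file metadata columns available on every supported Spark version. The map keys are the prettyName values of the corresponding expressions; a standalone sketch with those names written out as literals and a hypothetical stand-in for PartitionedFile:

    // Hypothetical stand-in for Spark's PartitionedFile.
    case class FileSlice(filePath: String, start: Long, length: Long)

    // Mirrors the new trait default; "input_file_name" etc. are the
    // prettyName values of InputFileName, InputFileBlockStart and
    // InputFileBlockLength.
    def defaultMetadataColumns(file: FileSlice): Map[String, String] = Map(
      "input_file_name" -> file.filePath,
      "input_file_block_start" -> file.start.toString,
      "input_file_block_length" -> file.length.toString
    )

    // e.g. defaultMetadataColumns(FileSlice("/tmp/part-00000.parquet", 0L, 1024L))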
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 007a962884..6c4cb7ff62 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -59,8 +59,6 @@ import org.apache.parquet.crypto.ParquetCryptoRuntimeException
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
-import java.util.{HashMap => JHashMap, Map => JMap}
-
class Spark32Shims extends SparkShims {
override def getDistribution(
@@ -238,16 +236,6 @@ class Spark32Shims extends SparkShims {
(None, None)
}
- override def generateMetadataColumns(
- file: PartitionedFile,
- metadataColumnNames: Seq[String]): JMap[String, String] = {
- val metadataColumn = new JHashMap[String, String]()
- metadataColumn.put(InputFileName().prettyName, file.filePath)
- metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
- metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
- metadataColumn
- }
-
def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan] = {
ae.plan
}
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 2c345c65aa..c18e7dae42 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -61,7 +61,8 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+
+import scala.collection.mutable
class Spark33Shims extends SparkShims {
override def getDistribution(
@@ -226,27 +227,25 @@ class Spark33Shims extends SparkShims {
override def generateMetadataColumns(
file: PartitionedFile,
- metadataColumnNames: Seq[String]): JMap[String, String] = {
- val metadataColumn = new JHashMap[String, String]()
+ metadataColumnNames: Seq[String]): Map[String, String] = {
+ val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+ val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
val path = new Path(file.filePath)
for (columnName <- metadataColumnNames) {
columnName match {
- case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
- case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+ case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
case FileFormat.FILE_SIZE =>
- metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
case FileFormat.FILE_MODIFICATION_TIME =>
val fileModifyTime = TimestampFormatter
.getFractionFormatter(ZoneOffset.UTC)
.format(file.modificationTime * 1000L)
- metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
case _ =>
}
}
- metadataColumn.put(InputFileName().prettyName, file.filePath)
- metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
- metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
- metadataColumn
+ metadataColumn.toMap
}
private def invalidBucketFile(path: String): Throwable = {
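
Spark 3.2 predates the hidden _metadata column, so its bespoke override could simply be deleted in favor of the trait default, while the Spark 3.3+ overrides now layer the requested _metadata fields on top of super's map: copy it into a mutable.Map, add entries per requested column name, then freeze with toMap. A simplified standalone sketch of that pattern (the constants mirror Spark's FileFormat.FILE_PATH and FileFormat.FILE_NAME literals):

    import scala.collection.mutable

    val FILE_PATH = "file_path"
    val FILE_NAME = "file_name"

    def withRequestedColumns(
        base: Map[String, String],
        path: String,
        requested: Seq[String]): Map[String, String] = {
      // Start from the trait's default map, then add requested entries.
      val metadataColumn = mutable.Map(base.toSeq: _*)
      requested.foreach {
        case FILE_PATH => metadataColumn += (FILE_PATH -> path)
        case FILE_NAME =>
          metadataColumn += (FILE_NAME -> path.substring(path.lastIndexOf('/') + 1))
        case _ => // unrecognized names are ignored, as in the shims
      }
      metadataColumn.toMap
    }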
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 890ee8c592..ac15aff6e7 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -64,8 +64,8 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.mutable
import scala.reflect.ClassTag
class Spark34Shims extends SparkShims {
@@ -244,31 +244,29 @@ class Spark34Shims extends SparkShims {
override def generateMetadataColumns(
file: PartitionedFile,
- metadataColumnNames: Seq[String]): JMap[String, String] = {
- val metadataColumn = new JHashMap[String, String]()
+ metadataColumnNames: Seq[String]): Map[String, String] = {
+ val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+ val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
val path = new Path(file.filePath.toString)
for (columnName <- metadataColumnNames) {
columnName match {
- case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
- case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+ case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
case FileFormat.FILE_SIZE =>
- metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
case FileFormat.FILE_MODIFICATION_TIME =>
val fileModifyTime = TimestampFormatter
.getFractionFormatter(ZoneOffset.UTC)
.format(file.modificationTime * 1000L)
- metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
case FileFormat.FILE_BLOCK_START =>
- metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+ metadataColumn += (FileFormat.FILE_BLOCK_START -> file.start.toString)
case FileFormat.FILE_BLOCK_LENGTH =>
- metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+ metadataColumn += (FileFormat.FILE_BLOCK_LENGTH -> file.length.toString)
case _ =>
}
}
- metadataColumn.put(InputFileName().prettyName, file.filePath.toString)
- metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
- metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
- metadataColumn
+ metadataColumn.toMap
}
// https://issues.apache.org/jira/browse/SPARK-40400
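
One detail in the FILE_MODIFICATION_TIME branch above: PartitionedFile.modificationTime is in milliseconds, while TimestampFormatter.format expects microseconds, hence the * 1000L. A small sketch of that call (requires spark-catalyst on the classpath; the millisecond value is made up):

    import java.time.ZoneOffset
    import org.apache.spark.sql.catalyst.util.TimestampFormatter

    val modificationTimeMs = 1732694005000L // hypothetical mtime in millis
    val fileModifyTime: String = TimestampFormatter
      .getFractionFormatter(ZoneOffset.UTC)
      .format(modificationTimeMs * 1000L) // millis -> micros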
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index ee52e8b2b9..4d373b1cc3 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -66,9 +66,10 @@ import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.reflect.ClassTag
class Spark35Shims extends SparkShims {
@@ -248,31 +249,29 @@ class Spark35Shims extends SparkShims {
override def generateMetadataColumns(
file: PartitionedFile,
- metadataColumnNames: Seq[String]): JMap[String, String] = {
- val metadataColumn = new JHashMap[String, String]()
+ metadataColumnNames: Seq[String]): Map[String, String] = {
+ val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+ val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
val path = new Path(file.filePath.toString)
for (columnName <- metadataColumnNames) {
columnName match {
- case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
- case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+ case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
case FileFormat.FILE_SIZE =>
- metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
case FileFormat.FILE_MODIFICATION_TIME =>
val fileModifyTime = TimestampFormatter
.getFractionFormatter(ZoneOffset.UTC)
.format(file.modificationTime * 1000L)
- metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
case FileFormat.FILE_BLOCK_START =>
- metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+ metadataColumn += (FileFormat.FILE_BLOCK_START -> file.start.toString)
case FileFormat.FILE_BLOCK_LENGTH =>
- metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+ metadataColumn += (FileFormat.FILE_BLOCK_LENGTH -> file.length.toString)
case _ =>
}
}
- metadataColumn.put(InputFileName().prettyName, file.filePath.toString)
- metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
- metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
- metadataColumn
+ metadataColumn.toMap
}
// https://issues.apache.org/jira/browse/SPARK-40400
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 41478c589f..87fce3de0d 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -66,9 +66,10 @@ import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.reflect.ClassTag
class Spark40Shims extends SparkShims {
@@ -249,31 +250,29 @@ class Spark40Shims extends SparkShims {
override def generateMetadataColumns(
file: PartitionedFile,
- metadataColumnNames: Seq[String]): JMap[String, String] = {
- val metadataColumn = new JHashMap[String, String]()
+ metadataColumnNames: Seq[String]): Map[String, String] = {
+ val originMetadataColumn = super.generateMetadataColumns(file, metadataColumnNames)
+ val metadataColumn: mutable.Map[String, String] = mutable.Map(originMetadataColumn.toSeq: _*)
val path = new Path(file.filePath.toString)
for (columnName <- metadataColumnNames) {
columnName match {
- case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
- case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_PATH => metadataColumn += (FileFormat.FILE_PATH -> path.toString)
+ case FileFormat.FILE_NAME => metadataColumn += (FileFormat.FILE_NAME -> path.getName)
case FileFormat.FILE_SIZE =>
- metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ metadataColumn += (FileFormat.FILE_SIZE -> file.fileSize.toString)
case FileFormat.FILE_MODIFICATION_TIME =>
val fileModifyTime = TimestampFormatter
.getFractionFormatter(ZoneOffset.UTC)
.format(file.modificationTime * 1000L)
- metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ metadataColumn += (FileFormat.FILE_MODIFICATION_TIME -> fileModifyTime)
case FileFormat.FILE_BLOCK_START =>
- metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+ metadataColumn += (FileFormat.FILE_BLOCK_START -> file.start.toString)
case FileFormat.FILE_BLOCK_LENGTH =>
- metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+ metadataColumn += (FileFormat.FILE_BLOCK_LENGTH -> file.length.toString)
case _ =>
}
}
- metadataColumn.put(InputFileName().prettyName, file.filePath.toString)
- metadataColumn.put(InputFileBlockStart().prettyName, file.start.toString)
- metadataColumn.put(InputFileBlockLength().prettyName, file.length.toString)
- metadataColumn
+ metadataColumn.toMap
}
// https://issues.apache.org/jira/browse/SPARK-40400
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]