This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 674540f928 [GLUTEN-11182][VL] Refactor genSplitInfo for VeloxIteratorApi (#11183)
674540f928 is described below

commit 674540f92821f1ba75dc0eb59f8e63f694a801d1
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Nov 26 14:20:20 2025 +0800

    [GLUTEN-11182][VL] Refactor genSplitInfo for VeloxIteratorApi (#11183)
---
 .../backendsapi/velox/VeloxIteratorApi.scala       | 112 +++++++++------------
 1 file changed, 46 insertions(+), 66 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index fdb859bfee..80c33efe5e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -78,32 +78,37 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         throw new UnsupportedOperationException(
           s"Unsupported input partition: ${o.getClass.getName}")
     }
-    val partitionFiles = filePartitions.flatMap(_.files).toArray
+    val partitionFiles = filePartitions.flatMap(_.files)
     val locations = filePartitions.flatMap(p => SoftAffinity.getFilePartitionLocations(p))
-    val (
-      paths,
-      starts,
-      lengths,
-      fileSizes,
-      modificationTimes,
-      partitionColumns,
-      metadataColumns,
-      otherMetadataColumns) =
-      constructSplitInfo(partitionSchema, partitionFiles, metadataColumnNames)
+    val (paths, starts, lengths) = getPartitionedFileInfo(partitionFiles).unzip3
+    val (fileSizes, modificationTimes) = partitionFiles
+      .map(f => SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(f))
+      .collect {
+        case (Some(size), Some(time)) =>
+          (JLong.valueOf(size), JLong.valueOf(time))
+      }
+      .unzip
+
+    val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
+    val metadataColumns = partitionFiles
+      .map(f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames))
+    val otherMetadataColumns = partitionFiles
+      .map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))
+
     setFileSchemaForLocalFiles(
       LocalFilesBuilder.makeLocalFiles(
         partitionIndex,
-        paths,
-        starts,
-        lengths,
-        fileSizes,
-        modificationTimes,
-        partitionColumns,
-        metadataColumns,
+        paths.asJava,
+        starts.asJava,
+        lengths.asJava,
+        fileSizes.asJava,
+        modificationTimes.asJava,
+        partitionColumns.asJava,
+        metadataColumns.asJava,
         fileFormat,
         locations.toList.asJava,
         mapAsJavaMap(properties),
-        otherMetadataColumns
+        otherMetadataColumns.asJava
       ),
       dataSchema,
       fileFormat
@@ -128,69 +133,44 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     }
   }
 
-  private def constructSplitInfo(
+  private def getPartitionedFileInfo(
+      partitionedFiles: Seq[PartitionedFile]): Seq[(String, JLong, JLong)] = {
+    partitionedFiles.map {
+      partitionedFile =>
+        val path = unescapePathName(partitionedFile.filePath.toString)
+        (path, JLong.valueOf(partitionedFile.start), JLong.valueOf(partitionedFile.length))
+    }
+  }
+
+  private def getPartitionColumns(
       schema: StructType,
-      files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String]) = {
-    val paths = new JArrayList[String]()
-    val starts = new JArrayList[JLong]
-    val lengths = new JArrayList[JLong]()
-    val fileSizes = new JArrayList[JLong]()
-    val modificationTimes = new JArrayList[JLong]()
-    val partitionColumns = new JArrayList[JMap[String, String]]
-    val metadataColumns = new JArrayList[JMap[String, String]]
-    val otherMetadataColumns = new JArrayList[JMap[String, Object]]
-    files.foreach {
-      file =>
-        paths.add(unescapePathName(file.filePath.toString))
-        starts.add(JLong.valueOf(file.start))
-        lengths.add(JLong.valueOf(file.length))
-        val (fileSize, modificationTime) =
-          SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
-        (fileSize, modificationTime) match {
-          case (Some(size), Some(time)) =>
-            fileSizes.add(JLong.valueOf(size))
-            modificationTimes.add(JLong.valueOf(time))
-          case _ => // Do nothing
-        }
-        val metadataColumn =
-          SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
-        metadataColumns.add(metadataColumn)
+      partitionedFiles: Seq[PartitionedFile]): Seq[JMap[String, String]] = {
+    partitionedFiles.map {
+      partitionedFile =>
         val partitionColumn = new JHashMap[String, String]()
-        for (i <- 0 until file.partitionValues.numFields) {
-          val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
+        for (i <- 0 until partitionedFile.partitionValues.numFields) {
+          val partitionColumnValue = if (partitionedFile.partitionValues.isNullAt(i)) {
             ExternalCatalogUtils.DEFAULT_PARTITION_NAME
           } else {
-            val pn = file.partitionValues.get(i, schema.fields(i).dataType)
+            val pv = partitionedFile.partitionValues.get(i, schema.fields(i).dataType)
             schema.fields(i).dataType match {
               case _: BinaryType =>
-                new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
+                new String(pv.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
               case _: DateType =>
-                DateFormatter.apply().format(pn.asInstanceOf[Integer])
+                DateFormatter.apply().format(pv.asInstanceOf[Integer])
               case _: DecimalType =>
-                pn.asInstanceOf[Decimal].toJavaBigInteger.toString
+                pv.asInstanceOf[Decimal].toJavaBigInteger.toString
               case _: TimestampType =>
                 TimestampFormatter
                   .getFractionFormatter(ZoneOffset.UTC)
-                  .format(pn.asInstanceOf[java.lang.Long])
-              case _ => pn.toString
+                  .format(pv.asInstanceOf[java.lang.Long])
+              case _ => pv.toString
             }
           }
           partitionColumn.put(schema.names(i), partitionColumnValue)
         }
-        partitionColumns.add(partitionColumn)
-        otherMetadataColumns.add(
-          SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file))
+        partitionColumn
     }
-    (
-      paths,
-      starts,
-      lengths,
-      fileSizes,
-      modificationTimes,
-      partitionColumns,
-      metadataColumns,
-      otherMetadataColumns)
   }
 
  override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to