This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 674540f928 [GLUTEN-11182][VL] Refactor genSplitInfo for VeloxIteratorApi (#11183)
674540f928 is described below
commit 674540f92821f1ba75dc0eb59f8e63f694a801d1
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Nov 26 14:20:20 2025 +0800
[GLUTEN-11182][VL] Refactor genSplitInfo for VeloxIteratorApi (#11183)
---
.../backendsapi/velox/VeloxIteratorApi.scala | 112 +++++++++------------
1 file changed, 46 insertions(+), 66 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index fdb859bfee..80c33efe5e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -78,32 +78,37 @@ class VeloxIteratorApi extends IteratorApi with Logging {
throw new UnsupportedOperationException(
s"Unsupported input partition: ${o.getClass.getName}")
}
- val partitionFiles = filePartitions.flatMap(_.files).toArray
+ val partitionFiles = filePartitions.flatMap(_.files)
val locations = filePartitions.flatMap(p => SoftAffinity.getFilePartitionLocations(p))
- val (
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
- otherMetadataColumns) =
- constructSplitInfo(partitionSchema, partitionFiles, metadataColumnNames)
+ val (paths, starts, lengths) = getPartitionedFileInfo(partitionFiles).unzip3
+ val (fileSizes, modificationTimes) = partitionFiles
+ .map(f => SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(f))
+ .collect {
+ case (Some(size), Some(time)) =>
+ (JLong.valueOf(size), JLong.valueOf(time))
+ }
+ .unzip
+
+ val partitionColumns = getPartitionColumns(partitionSchema, partitionFiles)
+ val metadataColumns = partitionFiles
+ .map(f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames))
+ val otherMetadataColumns = partitionFiles
+ .map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f))
+
setFileSchemaForLocalFiles(
LocalFilesBuilder.makeLocalFiles(
partitionIndex,
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
+ paths.asJava,
+ starts.asJava,
+ lengths.asJava,
+ fileSizes.asJava,
+ modificationTimes.asJava,
+ partitionColumns.asJava,
+ metadataColumns.asJava,
fileFormat,
locations.toList.asJava,
mapAsJavaMap(properties),
- otherMetadataColumns
+ otherMetadataColumns.asJava
),
dataSchema,
fileFormat
@@ -128,69 +133,44 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
}
- private def constructSplitInfo(
+ private def getPartitionedFileInfo(
+ partitionedFiles: Seq[PartitionedFile]): Seq[(String, JLong, JLong)] = {
+ partitionedFiles.map {
+ partitionedFile =>
+ val path = unescapePathName(partitionedFile.filePath.toString)
+ (path, JLong.valueOf(partitionedFile.start), JLong.valueOf(partitionedFile.length))
+ }
+ }
+
+ private def getPartitionColumns(
schema: StructType,
- files: Array[PartitionedFile],
- metadataColumnNames: Seq[String]) = {
- val paths = new JArrayList[String]()
- val starts = new JArrayList[JLong]
- val lengths = new JArrayList[JLong]()
- val fileSizes = new JArrayList[JLong]()
- val modificationTimes = new JArrayList[JLong]()
- val partitionColumns = new JArrayList[JMap[String, String]]
- val metadataColumns = new JArrayList[JMap[String, String]]
- val otherMetadataColumns = new JArrayList[JMap[String, Object]]
- files.foreach {
- file =>
- paths.add(unescapePathName(file.filePath.toString))
- starts.add(JLong.valueOf(file.start))
- lengths.add(JLong.valueOf(file.length))
- val (fileSize, modificationTime) =
- SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
- (fileSize, modificationTime) match {
- case (Some(size), Some(time)) =>
- fileSizes.add(JLong.valueOf(size))
- modificationTimes.add(JLong.valueOf(time))
- case _ => // Do nothing
- }
- val metadataColumn =
- SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
- metadataColumns.add(metadataColumn)
+ partitionedFiles: Seq[PartitionedFile]): Seq[JMap[String, String]] = {
+ partitionedFiles.map {
+ partitionedFile =>
val partitionColumn = new JHashMap[String, String]()
- for (i <- 0 until file.partitionValues.numFields) {
- val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
+ for (i <- 0 until partitionedFile.partitionValues.numFields) {
+ val partitionColumnValue = if (partitionedFile.partitionValues.isNullAt(i)) {
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
} else {
- val pn = file.partitionValues.get(i, schema.fields(i).dataType)
+ val pv = partitionedFile.partitionValues.get(i, schema.fields(i).dataType)
schema.fields(i).dataType match {
case _: BinaryType =>
- new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
+ new String(pv.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
case _: DateType =>
- DateFormatter.apply().format(pn.asInstanceOf[Integer])
+ DateFormatter.apply().format(pv.asInstanceOf[Integer])
case _: DecimalType =>
- pn.asInstanceOf[Decimal].toJavaBigInteger.toString
+ pv.asInstanceOf[Decimal].toJavaBigInteger.toString
case _: TimestampType =>
TimestampFormatter
.getFractionFormatter(ZoneOffset.UTC)
- .format(pn.asInstanceOf[java.lang.Long])
- case _ => pn.toString
+ .format(pv.asInstanceOf[java.lang.Long])
+ case _ => pv.toString
}
}
partitionColumn.put(schema.names(i), partitionColumnValue)
}
- partitionColumns.add(partitionColumn)
- otherMetadataColumns.add(
- SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file))
+ partitionColumn
}
- (
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
- otherMetadataColumns)
}
override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]