This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 13babf369 [VL] Pass file size and modification time in split  (#6029)
13babf369 is described below

commit 13babf369d4a5f5dc9833bea7ec22bfa682f8ffe
Author: Ankita Victor <[email protected]>
AuthorDate: Tue Jun 11 07:19:57 2024 +0530

    [VL] Pass file size and modification time in split  (#6029)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  5 ++++-
 .../backendsapi/velox/VeloxIteratorApi.scala       | 23 ++++++++++++++++++++--
 cpp/velox/compute/VeloxPlanConverter.cc            |  3 +++
 cpp/velox/compute/WholeStageResultIterator.cc      |  8 ++++++--
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |  5 +++++
 .../gluten/substrait/rel/LocalFilesBuilder.java    |  4 ++++
 .../gluten/substrait/rel/LocalFilesNode.java       | 18 +++++++++++++++++
 .../substrait/proto/substrait/algebra.proto        |  7 +++++++
 .../substrait/rel/IcebergLocalFilesNode.java       |  2 ++
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  3 +++
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |  5 +++++
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |  5 +++++
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  5 +++++
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  5 +++++
 14 files changed, 93 insertions(+), 5 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 63f7eeb79..1221710bc 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -131,10 +131,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
           paths,
           starts,
           lengths,
+          new JArrayList[JLong](),
+          new JArrayList[JLong](),
           partitionColumns,
           new JArrayList[JMap[String, String]](),
           fileFormat,
-          preferredLocations.toList.asJava)
+          preferredLocations.toList.asJava
+        )
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported input partition: $partition.")
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 5f9b5afa9..b20eccafb 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -56,7 +56,14 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       metadataColumnNames: Seq[String]): SplitInfo = {
     partition match {
       case f: FilePartition =>
-        val (paths, starts, lengths, partitionColumns, metadataColumns) =
+        val (
+          paths,
+          starts,
+          lengths,
+          fileSizes,
+          modificationTimes,
+          partitionColumns,
+          metadataColumns) =
           constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
@@ -65,6 +72,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           paths,
           starts,
           lengths,
+          fileSizes,
+          modificationTimes,
           partitionColumns,
           metadataColumns,
           fileFormat,
@@ -100,6 +109,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
+    val fileSizes = new JArrayList[JLong]()
+    val modificationTimes = new JArrayList[JLong]()
     val partitionColumns = new JArrayList[JMap[String, String]]
     var metadataColumns = new JArrayList[JMap[String, String]]
     files.foreach {
@@ -111,6 +122,14 @@ class VeloxIteratorApi extends IteratorApi with Logging {
             .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
+        val (fileSize, modificationTime) =
+          SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
+        (fileSize, modificationTime) match {
+          case (Some(size), Some(time)) =>
+            fileSizes.add(JLong.valueOf(size))
+            modificationTimes.add(JLong.valueOf(time))
+          case _ => // Do nothing
+        }
         val metadataColumn =
           SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
         metadataColumns.add(metadataColumn)
@@ -138,7 +157,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         }
         partitionColumns.add(partitionColumn)
     }
-    (paths, starts, lengths, partitionColumns, metadataColumns)
+    (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns)
   }
 
   override def injectWriteFilesTempPath(path: String): Unit = {
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc 
b/cpp/velox/compute/VeloxPlanConverter.cc
index ed42cb15a..bcd03b110 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -60,6 +60,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
   splitInfo->starts.reserve(fileList.size());
   splitInfo->lengths.reserve(fileList.size());
   splitInfo->partitionColumns.reserve(fileList.size());
+  splitInfo->properties.reserve(fileList.size());
   splitInfo->metadataColumns.reserve(fileList.size());
   for (const auto& file : fileList) {
     // Expect all Partitions share the same index.
@@ -80,6 +81,8 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
     splitInfo->paths.emplace_back(file.uri_file());
     splitInfo->starts.emplace_back(file.start());
     splitInfo->lengths.emplace_back(file.length());
+    facebook::velox::FileProperties fileProps = {file.properties().filesize(), file.properties().modificationtime()};
+    splitInfo->properties.emplace_back(fileProps);
     switch (file.file_format_case()) {
       case SubstraitFileFormatCase::kOrc:
         splitInfo->format = dwio::common::FileFormat::ORC;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index f719c119c..867d347cd 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -109,6 +109,7 @@ WholeStageResultIterator::WholeStageResultIterator(
     const auto& paths = scanInfo->paths;
     const auto& starts = scanInfo->starts;
     const auto& lengths = scanInfo->lengths;
+    const auto& properties = scanInfo->properties;
     const auto& format = scanInfo->format;
     const auto& partitionColumns = scanInfo->partitionColumns;
     const auto& metadataColumns = scanInfo->metadataColumns;
@@ -135,7 +136,9 @@ WholeStageResultIterator::WholeStageResultIterator(
             std::nullopt,
             customSplitInfo,
             nullptr,
-            deleteFiles);
+            deleteFiles,
+            std::unordered_map<std::string, std::string>(),
+            properties[idx]);
       } else {
         split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
             kHiveConnectorId,
@@ -149,7 +152,8 @@ WholeStageResultIterator::WholeStageResultIterator(
             nullptr,
             std::unordered_map<std::string, std::string>(),
             0,
-            metadataColumn);
+            metadataColumn,
+            properties[idx]);
       }
       connectorSplits.emplace_back(split);
     }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h 
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 1bda6435e..567ebb215 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -19,6 +19,7 @@
 
 #include "SubstraitToVeloxExpr.h"
 #include "TypeUtils.h"
+#include "velox/connectors/hive/FileProperties.h"
 #include "velox/connectors/hive/TableHandle.h"
 #include "velox/core/PlanNode.h"
 #include "velox/dwio/common/Options.h"
@@ -51,6 +52,9 @@ struct SplitInfo {
   /// The file format of the files to be scanned.
   dwio::common::FileFormat format;
 
+  /// The file sizes and modification times of the files to be scanned.
+  std::vector<std::optional<facebook::velox::FileProperties>> properties;
+
   /// Make SplitInfo polymorphic
   virtual ~SplitInfo() = default;
 };
@@ -111,6 +115,7 @@ class SubstraitToVeloxPlanConverter {
   /// Index: the index of the partition this item belongs to.
   /// Starts: the start positions in byte to read from the items.
   /// Lengths: the lengths in byte to read from the items.
+  /// FileProperties: the file sizes and modification times of the files to be scanned.
   core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);
 
   core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
index 94acc8336..7e085f81f 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
@@ -27,6 +27,8 @@ public class LocalFilesBuilder {
       List<String> paths,
       List<Long> starts,
       List<Long> lengths,
+      List<Long> fileSizes,
+      List<Long> modificationTimes,
       List<Map<String, String>> partitionColumns,
       List<Map<String, String>> metadataColumns,
       LocalFilesNode.ReadFileFormat fileFormat,
@@ -36,6 +38,8 @@ public class LocalFilesBuilder {
         paths,
         starts,
         lengths,
+        fileSizes,
+        modificationTimes,
         partitionColumns,
         metadataColumns,
         fileFormat,
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index cbcda72dd..fa9f3d516 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -34,6 +34,8 @@ public class LocalFilesNode implements SplitInfo {
   private final List<String> paths = new ArrayList<>();
   private final List<Long> starts = new ArrayList<>();
   private final List<Long> lengths = new ArrayList<>();
+  private final List<Long> fileSizes = new ArrayList<>();
+  private final List<Long> modificationTimes = new ArrayList<>();
   private final List<Map<String, String>> partitionColumns = new ArrayList<>();
   private final List<Map<String, String>> metadataColumns = new ArrayList<>();
   private final List<String> preferredLocations = new ArrayList<>();
@@ -60,6 +62,8 @@ public class LocalFilesNode implements SplitInfo {
       List<String> paths,
       List<Long> starts,
       List<Long> lengths,
+      List<Long> fileSizes,
+      List<Long> modificationTimes,
       List<Map<String, String>> partitionColumns,
       List<Map<String, String>> metadataColumns,
       ReadFileFormat fileFormat,
@@ -68,6 +72,8 @@ public class LocalFilesNode implements SplitInfo {
     this.paths.addAll(paths);
     this.starts.addAll(starts);
     this.lengths.addAll(lengths);
+    this.fileSizes.addAll(fileSizes);
+    this.modificationTimes.addAll(modificationTimes);
     this.fileFormat = fileFormat;
     this.partitionColumns.addAll(partitionColumns);
     this.metadataColumns.addAll(metadataColumns);
@@ -153,6 +159,18 @@ public class LocalFilesNode implements SplitInfo {
       }
       fileBuilder.setLength(lengths.get(i));
       fileBuilder.setStart(starts.get(i));
+
+      if (!fileSizes.isEmpty()
+          && !modificationTimes.isEmpty()
+          && fileSizes.size() == modificationTimes.size()
+          && fileSizes.size() == paths.size()) {
+        ReadRel.LocalFiles.FileOrFiles.fileProperties.Builder filePropsBuilder =
+            ReadRel.LocalFiles.FileOrFiles.fileProperties.newBuilder();
+        filePropsBuilder.setFileSize(fileSizes.get(i));
+        filePropsBuilder.setModificationTime(modificationTimes.get(i));
+        fileBuilder.setProperties(filePropsBuilder.build());
+      }
+
       if (!metadataColumns.isEmpty()) {
         Map<String, String> metadataColumn = metadataColumns.get(i);
         if (!metadataColumn.isEmpty()) {
diff --git 
a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto 
b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index 266aba4b0..877493439 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -198,6 +198,13 @@ message ReadRel {
         string value = 2;
       }
       repeated metadataColumn metadata_columns = 19;
+
+      // File properties contained in split
+      message fileProperties {
+        int64 fileSize = 1;
+        int64 modificationTime = 2;
+      }
+      fileProperties properties = 20;
     }
   }
 }
diff --git 
a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
 
b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
index 7d065f105..ba6b0ac4a 100644
--- 
a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
+++ 
b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
@@ -42,6 +42,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
         paths,
         starts,
         lengths,
+        new ArrayList<>(),
+        new ArrayList<>(),
         partitionColumns,
         new ArrayList<>(),
         fileFormat,
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index d6acc8c27..8bbc6d3d1 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -207,6 +207,9 @@ trait SparkShims {
 
   def attributesFromStruct(structType: StructType): Seq[Attribute]
 
+  // Spark 3.3 and later only have file size and modification time in PartitionedFile
+  def getFileSizeAndModificationTime(file: PartitionedFile): (Option[Long], Option[Long])
+
   def generateMetadataColumns(
       file: PartitionedFile,
       metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 29fddc697..f24aef66a 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -217,6 +217,11 @@ class Spark32Shims extends SparkShims {
     }
   }
 
+  override def getFileSizeAndModificationTime(
+      file: PartitionedFile): (Option[Long], Option[Long]) = {
+    (None, None)
+  }
+
   override def generateMetadataColumns(
       file: PartitionedFile,
       metadataColumnNames: Seq[String]): JMap[String, String] =
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 7c6ce644d..68fc4ad0d 100644
--- 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -202,6 +202,11 @@ class Spark33Shims extends SparkShims {
     case other => other
   }
 
+  override def getFileSizeAndModificationTime(
+      file: PartitionedFile): (Option[Long], Option[Long]) = {
+    (Some(file.fileSize), Some(file.modificationTime))
+  }
+
   override def generateMetadataColumns(
       file: PartitionedFile,
       metadataColumnNames: Seq[String]): JMap[String, String] = {
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index f2c248294..7d9fc389b 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -208,6 +208,11 @@ class Spark34Shims extends SparkShims {
     case other => other
   }
 
+  override def getFileSizeAndModificationTime(
+      file: PartitionedFile): (Option[Long], Option[Long]) = {
+    (Some(file.fileSize), Some(file.modificationTime))
+  }
+
   override def generateMetadataColumns(
       file: PartitionedFile,
       metadataColumnNames: Seq[String]): JMap[String, String] = {
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index e0835c306..54cea6993 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -206,6 +206,11 @@ class Spark35Shims extends SparkShims {
     case other => other
   }
 
+  override def getFileSizeAndModificationTime(
+      file: PartitionedFile): (Option[Long], Option[Long]) = {
+    (Some(file.fileSize), Some(file.modificationTime))
+  }
+
   override def generateMetadataColumns(
       file: PartitionedFile,
       metadataColumnNames: Seq[String]): JMap[String, String] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to