This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 13babf369 [VL] Pass file size and modification time in split (#6029)
13babf369 is described below
commit 13babf369d4a5f5dc9833bea7ec22bfa682f8ffe
Author: Ankita Victor <[email protected]>
AuthorDate: Tue Jun 11 07:19:57 2024 +0530
[VL] Pass file size and modification time in split (#6029)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 5 ++++-
.../backendsapi/velox/VeloxIteratorApi.scala | 23 ++++++++++++++++++++--
cpp/velox/compute/VeloxPlanConverter.cc | 3 +++
cpp/velox/compute/WholeStageResultIterator.cc | 8 ++++++--
cpp/velox/substrait/SubstraitToVeloxPlan.h | 5 +++++
.../gluten/substrait/rel/LocalFilesBuilder.java | 4 ++++
.../gluten/substrait/rel/LocalFilesNode.java | 18 +++++++++++++++++
.../substrait/proto/substrait/algebra.proto | 7 +++++++
.../substrait/rel/IcebergLocalFilesNode.java | 2 ++
.../org/apache/gluten/sql/shims/SparkShims.scala | 3 +++
.../gluten/sql/shims/spark32/Spark32Shims.scala | 5 +++++
.../gluten/sql/shims/spark33/Spark33Shims.scala | 5 +++++
.../gluten/sql/shims/spark34/Spark34Shims.scala | 5 +++++
.../gluten/sql/shims/spark35/Spark35Shims.scala | 5 +++++
14 files changed, 93 insertions(+), 5 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 63f7eeb79..1221710bc 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -131,10 +131,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
paths,
starts,
lengths,
+ new JArrayList[JLong](),
+ new JArrayList[JLong](),
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
- preferredLocations.toList.asJava)
+ preferredLocations.toList.asJava
+ )
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition:
$partition.")
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 5f9b5afa9..b20eccafb 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -56,7 +56,14 @@ class VeloxIteratorApi extends IteratorApi with Logging {
metadataColumnNames: Seq[String]): SplitInfo = {
partition match {
case f: FilePartition =>
- val (paths, starts, lengths, partitionColumns, metadataColumns) =
+ val (
+ paths,
+ starts,
+ lengths,
+ fileSizes,
+ modificationTimes,
+ partitionColumns,
+ metadataColumns) =
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
@@ -65,6 +72,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
paths,
starts,
lengths,
+ fileSizes,
+ modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
@@ -100,6 +109,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
+ val fileSizes = new JArrayList[JLong]()
+ val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
var metadataColumns = new JArrayList[JMap[String, String]]
files.foreach {
@@ -111,6 +122,14 @@ class VeloxIteratorApi extends IteratorApi with Logging {
.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
+ val (fileSize, modificationTime) =
+ SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
+ (fileSize, modificationTime) match {
+ case (Some(size), Some(time)) =>
+ fileSizes.add(JLong.valueOf(size))
+ modificationTimes.add(JLong.valueOf(time))
+ case _ => // Do nothing
+ }
val metadataColumn =
SparkShimLoader.getSparkShims.generateMetadataColumns(file,
metadataColumnNames)
metadataColumns.add(metadataColumn)
@@ -138,7 +157,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
partitionColumns.add(partitionColumn)
}
- (paths, starts, lengths, partitionColumns, metadataColumns)
+ (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns,
metadataColumns)
}
override def injectWriteFilesTempPath(path: String): Unit = {
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index ed42cb15a..bcd03b110 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -60,6 +60,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
splitInfo->starts.reserve(fileList.size());
splitInfo->lengths.reserve(fileList.size());
splitInfo->partitionColumns.reserve(fileList.size());
+ splitInfo->properties.reserve(fileList.size());
splitInfo->metadataColumns.reserve(fileList.size());
for (const auto& file : fileList) {
// Expect all Partitions share the same index.
@@ -80,6 +81,8 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
splitInfo->paths.emplace_back(file.uri_file());
splitInfo->starts.emplace_back(file.start());
splitInfo->lengths.emplace_back(file.length());
+ facebook::velox::FileProperties fileProps = {file.properties().filesize(),
file.properties().modificationtime()};
+ splitInfo->properties.emplace_back(fileProps);
switch (file.file_format_case()) {
case SubstraitFileFormatCase::kOrc:
splitInfo->format = dwio::common::FileFormat::ORC;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index f719c119c..867d347cd 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -109,6 +109,7 @@ WholeStageResultIterator::WholeStageResultIterator(
const auto& paths = scanInfo->paths;
const auto& starts = scanInfo->starts;
const auto& lengths = scanInfo->lengths;
+ const auto& properties = scanInfo->properties;
const auto& format = scanInfo->format;
const auto& partitionColumns = scanInfo->partitionColumns;
const auto& metadataColumns = scanInfo->metadataColumns;
@@ -135,7 +136,9 @@ WholeStageResultIterator::WholeStageResultIterator(
std::nullopt,
customSplitInfo,
nullptr,
- deleteFiles);
+ deleteFiles,
+ std::unordered_map<std::string, std::string>(),
+ properties[idx]);
} else {
split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
kHiveConnectorId,
@@ -149,7 +152,8 @@ WholeStageResultIterator::WholeStageResultIterator(
nullptr,
std::unordered_map<std::string, std::string>(),
0,
- metadataColumn);
+ metadataColumn,
+ properties[idx]);
}
connectorSplits.emplace_back(split);
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 1bda6435e..567ebb215 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -19,6 +19,7 @@
#include "SubstraitToVeloxExpr.h"
#include "TypeUtils.h"
+#include "velox/connectors/hive/FileProperties.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/PlanNode.h"
#include "velox/dwio/common/Options.h"
@@ -51,6 +52,9 @@ struct SplitInfo {
/// The file format of the files to be scanned.
dwio::common::FileFormat format;
+ /// The file sizes and modification times of the files to be scanned.
+ std::vector<std::optional<facebook::velox::FileProperties>> properties;
+
/// Make SplitInfo polymorphic
virtual ~SplitInfo() = default;
};
@@ -111,6 +115,7 @@ class SubstraitToVeloxPlanConverter {
/// Index: the index of the partition this item belongs to.
/// Starts: the start positions in byte to read from the items.
/// Lengths: the lengths in byte to read from the items.
+ /// FileProperties: the file sizes and modification times of the files to be scanned.
core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);
core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel&
sRead, int32_t streamIdx);
diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
index 94acc8336..7e085f81f 100644
--- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
+++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java
@@ -27,6 +27,8 @@ public class LocalFilesBuilder {
List<String> paths,
List<Long> starts,
List<Long> lengths,
+ List<Long> fileSizes,
+ List<Long> modificationTimes,
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
@@ -36,6 +38,8 @@ public class LocalFilesBuilder {
paths,
starts,
lengths,
+ fileSizes,
+ modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index cbcda72dd..fa9f3d516 100644
--- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -34,6 +34,8 @@ public class LocalFilesNode implements SplitInfo {
private final List<String> paths = new ArrayList<>();
private final List<Long> starts = new ArrayList<>();
private final List<Long> lengths = new ArrayList<>();
+ private final List<Long> fileSizes = new ArrayList<>();
+ private final List<Long> modificationTimes = new ArrayList<>();
private final List<Map<String, String>> partitionColumns = new ArrayList<>();
private final List<Map<String, String>> metadataColumns = new ArrayList<>();
private final List<String> preferredLocations = new ArrayList<>();
@@ -60,6 +62,8 @@ public class LocalFilesNode implements SplitInfo {
List<String> paths,
List<Long> starts,
List<Long> lengths,
+ List<Long> fileSizes,
+ List<Long> modificationTimes,
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
@@ -68,6 +72,8 @@ public class LocalFilesNode implements SplitInfo {
this.paths.addAll(paths);
this.starts.addAll(starts);
this.lengths.addAll(lengths);
+ this.fileSizes.addAll(fileSizes);
+ this.modificationTimes.addAll(modificationTimes);
this.fileFormat = fileFormat;
this.partitionColumns.addAll(partitionColumns);
this.metadataColumns.addAll(metadataColumns);
@@ -153,6 +159,18 @@ public class LocalFilesNode implements SplitInfo {
}
fileBuilder.setLength(lengths.get(i));
fileBuilder.setStart(starts.get(i));
+
+ if (!fileSizes.isEmpty()
+ && !modificationTimes.isEmpty()
+ && fileSizes.size() == modificationTimes.size()
+ && fileSizes.size() == paths.size()) {
+ ReadRel.LocalFiles.FileOrFiles.fileProperties.Builder filePropsBuilder
=
+ ReadRel.LocalFiles.FileOrFiles.fileProperties.newBuilder();
+ filePropsBuilder.setFileSize(fileSizes.get(i));
+ filePropsBuilder.setModificationTime(modificationTimes.get(i));
+ fileBuilder.setProperties(filePropsBuilder.build());
+ }
+
if (!metadataColumns.isEmpty()) {
Map<String, String> metadataColumn = metadataColumns.get(i);
if (!metadataColumn.isEmpty()) {
diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index 266aba4b0..877493439 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -198,6 +198,13 @@ message ReadRel {
string value = 2;
}
repeated metadataColumn metadata_columns = 19;
+
+ // File properties contained in split
+ message fileProperties {
+ int64 fileSize = 1;
+ int64 modificationTime = 2;
+ }
+ fileProperties properties = 20;
}
}
}
diff --git a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
index 7d065f105..ba6b0ac4a 100644
--- a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
+++ b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
@@ -42,6 +42,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
paths,
starts,
lengths,
+ new ArrayList<>(),
+ new ArrayList<>(),
partitionColumns,
new ArrayList<>(),
fileFormat,
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index d6acc8c27..8bbc6d3d1 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -207,6 +207,9 @@ trait SparkShims {
def attributesFromStruct(structType: StructType): Seq[Attribute]
+ // Spark 3.3 and later only have file size and modification time in PartitionedFile
+ def getFileSizeAndModificationTime(file: PartitionedFile): (Option[Long],
Option[Long])
+
def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 29fddc697..f24aef66a 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -217,6 +217,11 @@ class Spark32Shims extends SparkShims {
}
}
+ override def getFileSizeAndModificationTime(
+ file: PartitionedFile): (Option[Long], Option[Long]) = {
+ (None, None)
+ }
+
override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] =
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 7c6ce644d..68fc4ad0d 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -202,6 +202,11 @@ class Spark33Shims extends SparkShims {
case other => other
}
+ override def getFileSizeAndModificationTime(
+ file: PartitionedFile): (Option[Long], Option[Long]) = {
+ (Some(file.fileSize), Some(file.modificationTime))
+ }
+
override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] = {
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index f2c248294..7d9fc389b 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -208,6 +208,11 @@ class Spark34Shims extends SparkShims {
case other => other
}
+ override def getFileSizeAndModificationTime(
+ file: PartitionedFile): (Option[Long], Option[Long]) = {
+ (Some(file.fileSize), Some(file.modificationTime))
+ }
+
override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] = {
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index e0835c306..54cea6993 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -206,6 +206,11 @@ class Spark35Shims extends SparkShims {
case other => other
}
+ override def getFileSizeAndModificationTime(
+ file: PartitionedFile): (Option[Long], Option[Long]) = {
+ (Some(file.fileSize), Some(file.modificationTime))
+ }
+
override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]