This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1fbd9e6c4 [VL] parquet file metadata columns support in velox (#3870)
1fbd9e6c4 is described below
commit 1fbd9e6c4144a5df4d6e5b73951791841480f1a0
Author: 高阳阳 <[email protected]>
AuthorDate: Thu Mar 14 15:23:05 2024 +0800
[VL] parquet file metadata columns support in velox (#3870)
[VL] Support Parquet file metadata columns in the Velox backend.
Co-authored-by: Zhen Li <[email protected]>
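For context, this change lets scans that reference Spark's hidden _metadata struct stay on the native Velox scan instead of falling back to vanilla Spark. A minimal sketch of an affected query, assuming the Velox backend is loaded (the table path is a placeholder; the metadata field names and the "age" column come from the test suite added in this commit):

    // Sketch only: project Parquet file metadata columns alongside user data.
    val df = spark.read.parquet("/tmp/example_parquet") // hypothetical path
      .select(
        "_metadata.file_name",
        "_metadata.file_path",
        "_metadata.file_size",
        "_metadata.file_modification_time",
        "age")
    df.show()
    // Expected to be planned as FileSourceScanExecTransformer (native scan)
    // rather than falling back to FileSourceScanExec.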
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 4 +-
.../backendsapi/velox/IteratorApiImpl.scala | 21 ++-
.../backendsapi/velox/VeloxBackend.scala | 2 +
cpp/velox/compute/VeloxPlanConverter.cc | 7 +
cpp/velox/compute/WholeStageResultIterator.cc | 15 ++-
cpp/velox/substrait/SubstraitParser.cc | 20 ++-
cpp/velox/substrait/SubstraitParser.h | 7 +-
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 15 ++-
cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 +
.../substrait/SubstraitToVeloxPlanValidator.cc | 4 +-
.../substrait/rel/LocalFilesBuilder.java | 10 +-
.../substrait/rel/LocalFilesNode.java | 20 ++-
.../substrait/proto/substrait/algebra.proto | 6 +
.../resources/substrait/proto/substrait/type.proto | 1 +
.../backendsapi/BackendSettingsApi.scala | 1 +
.../io/glutenproject/backendsapi/IteratorApi.scala | 3 +-
.../execution/BasicScanExecTransformer.scala | 7 +-
.../execution/BatchScanExecTransformer.scala | 2 +
.../execution/FileSourceScanExecTransformer.scala | 18 ++-
.../sql/hive/HiveTableScanExecTransformer.scala | 4 +-
.../substrait/rel/IcebergLocalFilesNode.java | 11 +-
.../GlutenFileMetadataStructSuite.scala | 141 ++++++++++++++++++++-
.../GlutenFileMetadataStructSuite.scala | 141 ++++++++++++++++++++-
.../io/glutenproject/sql/shims/SparkShims.scala | 6 +
.../sql/shims/spark32/Spark32Shims.scala | 7 +
.../sql/execution/FileSourceScanExecShim.scala | 9 +-
.../sql/shims/spark33/Spark33Shims.scala | 27 +++-
.../sql/execution/FileSourceScanExecShim.scala | 19 ++-
.../sql/shims/spark34/Spark34Shims.scala | 34 ++++-
.../sql/execution/FileSourceScanExecShim.scala | 17 ++-
.../sql/shims/spark35/Spark35Shims.scala | 34 ++++-
.../sql/execution/FileSourceScanExecShim.scala | 23 +++-
32 files changed, 591 insertions(+), 48 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index 1841faccf..6fecb2c5f 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -75,7 +75,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
override def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
- fileFormat: ReadFileFormat): SplitInfo = {
+ fileFormat: ReadFileFormat,
+ metadataColumnNames: Seq[String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
val partLists = new JArrayList[String]()
@@ -128,6 +129,7 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
starts,
lengths,
partitionColumns,
+ new JArrayList[JMap[String, String]](),
fileFormat,
preferredLocations.toList.asJava)
case _ =>
diff --git
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
index f2943d31d..84198a6f8 100644
---
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
+++
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
@@ -20,6 +20,7 @@ import io.glutenproject.GlutenNumaBindingInfo
import io.glutenproject.backendsapi.IteratorApi
import io.glutenproject.execution._
import io.glutenproject.metrics.IMetrics
+import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.plan.PlanNode
import io.glutenproject.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo}
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -53,11 +54,12 @@ class IteratorApiImpl extends IteratorApi with Logging {
override def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
- fileFormat: ReadFileFormat): SplitInfo = {
+ fileFormat: ReadFileFormat,
+ metadataColumnNames: Seq[String]): SplitInfo = {
partition match {
case f: FilePartition =>
- val (paths, starts, lengths, partitionColumns) =
- constructSplitInfo(partitionSchema, f.files)
+ val (paths, starts, lengths, partitionColumns, metadataColumns) =
+ constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
@@ -66,6 +68,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
starts,
lengths,
partitionColumns,
+ metadataColumns,
fileFormat,
preferredLocations.toList.asJava)
case _ =>
@@ -92,11 +95,15 @@ class IteratorApiImpl extends IteratorApi with Logging {
}
}
- private def constructSplitInfo(schema: StructType, files: Array[PartitionedFile]) = {
+ private def constructSplitInfo(
+ schema: StructType,
+ files: Array[PartitionedFile],
+ metadataColumnNames: Seq[String]) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
+ var metadataColumns = new JArrayList[JMap[String, String]]
files.foreach {
file =>
// The "file.filePath" in PartitionedFile is not the original encoded
path, so the decoded
@@ -106,7 +113,9 @@ class IteratorApiImpl extends IteratorApi with Logging {
.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
-
+ val metadataColumn =
+ SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
+ metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
@@ -131,7 +140,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
}
partitionColumns.add(partitionColumn)
}
- (paths, starts, lengths, partitionColumns)
+ (paths, starts, lengths, partitionColumns, metadataColumns)
}
override def injectWriteFilesTempPath(path: String): Unit = {
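To make the new return shape concrete: constructSplitInfo now produces five index-aligned collections, and the added metadataColumns list carries one string map per PartitionedFile. A hedged sketch of one such per-file map, with invented values (keys mirror Spark's FileFormat.FILE_PATH / FILE_NAME / FILE_SIZE / FILE_MODIFICATION_TIME constants):

    import java.util.{HashMap => JHashMap}

    // Sketch only: the kind of map generateMetadataColumns returns for one file,
    // which constructSplitInfo appends to metadataColumns at the same index as
    // the file's entry in paths, starts, lengths and partitionColumns.
    val metadataColumn = new JHashMap[String, String]()
    metadataColumn.put("file_path", "hdfs://ns/warehouse/t/part-00000.parquet") // illustrative value
    metadataColumn.put("file_name", "part-00000.parquet")
    metadataColumn.put("file_size", "12345")
    metadataColumn.put("file_modification_time", "2024-03-14 07:23:05")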
diff --git
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index bab5e68ec..22a0f4ebc 100644
---
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -248,6 +248,8 @@ object BackendSettings extends BackendSettingsApi {
}
}
+ override def supportNativeMetadataColumns(): Boolean = true
+
override def supportExpandExec(): Boolean = true
override def supportSortExec(): Boolean = true
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index 47738cff9..8ca9f85cd 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -60,6 +60,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
splitInfo->starts.reserve(fileList.size());
splitInfo->lengths.reserve(fileList.size());
splitInfo->partitionColumns.reserve(fileList.size());
+ splitInfo->metadataColumns.reserve(fileList.size());
for (const auto& file : fileList) {
// Expect all Partitions share the same index.
splitInfo->partitionIndex = file.partition_index();
@@ -70,6 +71,12 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
}
splitInfo->partitionColumns.emplace_back(partitionColumnMap);
+ std::unordered_map<std::string, std::string> metadataColumnMap;
+ for (const auto& metadataColumn : file.metadata_columns()) {
+ metadataColumnMap[metadataColumn.key()] = metadataColumn.value();
+ }
+ splitInfo->metadataColumns.emplace_back(metadataColumnMap);
+
splitInfo->paths.emplace_back(file.uri_file());
splitInfo->starts.emplace_back(file.start());
splitInfo->lengths.emplace_back(file.length());
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index b10c643c8..86431819b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -145,15 +145,28 @@ WholeStageResultIterator::WholeStageResultIterator(
const auto& lengths = scanInfo->lengths;
const auto& format = scanInfo->format;
const auto& partitionColumns = scanInfo->partitionColumns;
+ const auto& metadataColumns = scanInfo->metadataColumns;
std::vector<std::shared_ptr<velox::connector::ConnectorSplit>> connectorSplits;
connectorSplits.reserve(paths.size());
for (int idx = 0; idx < paths.size(); idx++) {
auto partitionColumn = partitionColumns[idx];
+ auto metadataColumn = metadataColumns[idx];
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
constructPartitionColumns(partitionKeys, partitionColumn);
auto split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
- kHiveConnectorId, paths[idx], format, starts[idx], lengths[idx], partitionKeys);
+ kHiveConnectorId,
+ paths[idx],
+ format,
+ starts[idx],
+ lengths[idx],
+ partitionKeys,
+ std::nullopt,
+ std::unordered_map<std::string, std::string>(),
+ nullptr,
+ std::unordered_map<std::string, std::string>(),
+ 0,
+ metadataColumn);
connectorSplits.emplace_back(split);
}
diff --git a/cpp/velox/substrait/SubstraitParser.cc
b/cpp/velox/substrait/SubstraitParser.cc
index 36fe84558..35f130076 100644
--- a/cpp/velox/substrait/SubstraitParser.cc
+++ b/cpp/velox/substrait/SubstraitParser.cc
@@ -104,31 +104,41 @@ std::vector<TypePtr>
SubstraitParser::parseNamedStruct(const ::substrait::NamedS
return typeList;
}
-std::vector<bool> SubstraitParser::parsePartitionColumns(const ::substrait::NamedStruct& namedStruct) {
+void SubstraitParser::parsePartitionAndMetadataColumns(
+ const ::substrait::NamedStruct& namedStruct,
+ std::vector<bool>& isPartitionColumns,
+ std::vector<bool>& isMetadataColumns) {
const auto& columnsTypes = namedStruct.column_types();
- std::vector<bool> isPartitionColumns;
if (columnsTypes.size() == 0) {
- // Regard all columns as non-partitioned columns.
+ // Regard all columns as regular columns.
isPartitionColumns.resize(namedStruct.names().size(), false);
- return isPartitionColumns;
+ isMetadataColumns.resize(namedStruct.names().size(), false);
+ return;
} else {
VELOX_CHECK_EQ(columnsTypes.size(), namedStruct.names().size(), "Wrong size for column types and column names.");
}
isPartitionColumns.reserve(columnsTypes.size());
+ isMetadataColumns.reserve(columnsTypes.size());
for (const auto& columnType : columnsTypes) {
switch (columnType) {
case ::substrait::NamedStruct::NORMAL_COL:
isPartitionColumns.emplace_back(false);
+ isMetadataColumns.emplace_back(false);
break;
case ::substrait::NamedStruct::PARTITION_COL:
isPartitionColumns.emplace_back(true);
+ isMetadataColumns.emplace_back(false);
+ break;
+ case ::substrait::NamedStruct::METADATA_COL:
+ isPartitionColumns.emplace_back(false);
+ isMetadataColumns.emplace_back(true);
break;
default:
VELOX_FAIL("Unspecified column type.");
}
}
- return isPartitionColumns;
+ return;
}
int32_t SubstraitParser::parseReferenceSegment(const ::substrait::Expression::ReferenceSegment& refSegment) {
diff --git a/cpp/velox/substrait/SubstraitParser.h
b/cpp/velox/substrait/SubstraitParser.h
index 80d4f2ee2..4aaac5a71 100644
--- a/cpp/velox/substrait/SubstraitParser.h
+++ b/cpp/velox/substrait/SubstraitParser.h
@@ -41,8 +41,11 @@ class SubstraitParser {
const ::substrait::NamedStruct& namedStruct,
bool asLowerCase = false);
- /// Used to parse partition columns from Substrait NamedStruct.
- static std::vector<bool> parsePartitionColumns(const ::substrait::NamedStruct& namedStruct);
+ /// Used to parse partition & metadata columns from Substrait NamedStruct.
+ static void parsePartitionAndMetadataColumns(
+ const ::substrait::NamedStruct& namedStruct,
+ std::vector<bool>& isPartitionColumns,
+ std::vector<bool>& isMetadataColumns);
/// Parse Substrait Type to Velox type.
static facebook::velox::TypePtr parseType(const ::substrait::Type& substraitType, bool asLowerCase = false);
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index b8ca25a43..aa19b7f6a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -596,11 +596,12 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
std::vector<std::string> tableColumnNames;
std::vector<std::string> partitionedKey;
std::vector<bool> isPartitionColumns;
+ std::vector<bool> isMetadataColumns;
tableColumnNames.reserve(writeRel.table_schema().names_size());
VELOX_CHECK(writeRel.has_table_schema(), "WriteRel should have the table schema to store the column information");
const auto& tableSchema = writeRel.table_schema();
- isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema);
+ SubstraitParser::parsePartitionAndMetadataColumns(tableSchema, isPartitionColumns, isMetadataColumns);
for (const auto& name : tableSchema.names()) {
tableColumnNames.emplace_back(name);
@@ -1040,6 +1041,7 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
std::vector<std::string> colNameList;
std::vector<TypePtr> veloxTypeList;
std::vector<bool> isPartitionColumns;
+ std::vector<bool> isMetadataColumns;
// Convert field names into lower case when not case-sensitive.
std::shared_ptr<const facebook::velox::Config> veloxCfg =
std::make_shared<const facebook::velox::core::MemConfigMutable>(confMap_);
@@ -1055,7 +1057,7 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
colNameList.emplace_back(fieldName);
}
veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema, asLowerCase);
- isPartitionColumns = SubstraitParser::parsePartitionColumns(baseSchema);
+ SubstraitParser::parsePartitionAndMetadataColumns(baseSchema, isPartitionColumns, isMetadataColumns);
}
// Do not hard-code connector ID and allow for connectors other than Hive.
@@ -1110,8 +1112,13 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>> assignments;
for (int idx = 0; idx < colNameList.size(); idx++) {
auto outName = SubstraitParser::makeNodeName(planNodeId_, idx);
- auto columnType = isPartitionColumns[idx] ? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey
- : connector::hive::HiveColumnHandle::ColumnType::kRegular;
+ auto columnType = connector::hive::HiveColumnHandle::ColumnType::kRegular;
+ if (isPartitionColumns[idx]) {
+ columnType = connector::hive::HiveColumnHandle::ColumnType::kPartitionKey;
+ }
+ if (isMetadataColumns[idx]) {
+ columnType = connector::hive::HiveColumnHandle::ColumnType::kSynthesized;
+ }
assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
colNameList[idx], columnType, veloxTypeList[idx], veloxTypeList[idx]);
outNames.emplace_back(outName);
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 21a318b91..895c1d24e 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -36,6 +36,9 @@ struct SplitInfo {
/// The partition columns associated with partitioned table.
std::vector<std::unordered_map<std::string, std::string>> partitionColumns;
+ /// The metadata columns associated with partitioned table.
+ std::vector<std::unordered_map<std::string, std::string>> metadataColumns;
+
/// The file paths to be scanned.
std::vector<std::string> paths;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 3826465b3..5df59a348 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -366,7 +366,9 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::WriteRel& writeR
// Validate partition key type.
if (writeRel.has_table_schema()) {
const auto& tableSchema = writeRel.table_schema();
- auto isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema);
+ std::vector<bool> isMetadataColumns;
+ std::vector<bool> isPartitionColumns;
+ SubstraitParser::parsePartitionAndMetadataColumns(tableSchema, isPartitionColumns, isMetadataColumns);
for (auto i = 0; i < types.size(); i++) {
if (isPartitionColumns[i]) {
switch (types[i]->kind()) {
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
index c86c90cc6..1f6eaabce 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
@@ -28,10 +28,18 @@ public class LocalFilesBuilder {
List<Long> starts,
List<Long> lengths,
List<Map<String, String>> partitionColumns,
+ List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations) {
return new LocalFilesNode(
- index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
+ index,
+ paths,
+ starts,
+ lengths,
+ partitionColumns,
+ metadataColumns,
+ fileFormat,
+ preferredLocations);
}
public static LocalFilesNode makeLocalFiles(String iterPath) {
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
index 32e81f2c6..e0700ded2 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
@@ -35,6 +35,7 @@ public class LocalFilesNode implements SplitInfo {
private final List<Long> starts = new ArrayList<>();
private final List<Long> lengths = new ArrayList<>();
private final List<Map<String, String>> partitionColumns = new ArrayList<>();
+ private final List<Map<String, String>> metadataColumns = new ArrayList<>();
private final List<String> preferredLocations = new ArrayList<>();
// The format of file to read.
@@ -60,6 +61,7 @@ public class LocalFilesNode implements SplitInfo {
List<Long> starts,
List<Long> lengths,
List<Map<String, String>> partitionColumns,
+ List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations) {
this.index = index;
@@ -68,6 +70,7 @@ public class LocalFilesNode implements SplitInfo {
this.lengths.addAll(lengths);
this.fileFormat = fileFormat;
this.partitionColumns.addAll(partitionColumns);
+ this.metadataColumns.addAll(metadataColumns);
this.preferredLocations.addAll(preferredLocations);
}
@@ -141,7 +144,22 @@ public class LocalFilesNode implements SplitInfo {
}
fileBuilder.setLength(lengths.get(i));
fileBuilder.setStart(starts.get(i));
-
+ if (!metadataColumns.isEmpty()) {
+ Map<String, String> metadataColumn = metadataColumns.get(i);
+ if (!metadataColumn.isEmpty()) {
+ metadataColumn.forEach(
+ (key, value) -> {
+ ReadRel.LocalFiles.FileOrFiles.metadataColumn.Builder mcBuilder =
+ ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder();
+ mcBuilder.setKey(key).setValue(value);
+ fileBuilder.addMetadataColumns(mcBuilder.build());
+ });
+ }
+ } else {
+ ReadRel.LocalFiles.FileOrFiles.metadataColumn.Builder mcBuilder =
+ ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder();
+ fileBuilder.addMetadataColumns(mcBuilder.build());
+ }
NamedStruct namedStruct = buildNamedStruct();
fileBuilder.setSchema(namedStruct);
diff --git
a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index b72bcbb01..63a0f36ea 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -169,6 +169,12 @@ message ReadRel {
/// File schema
NamedStruct schema = 17;
+
+ message metadataColumn {
+ string key = 1;
+ string value = 2;
+ }
+ repeated metadataColumn metadata_columns = 18;
}
}
}
diff --git
a/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
b/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
index 6130f2d76..5c7ee6a38 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
@@ -237,5 +237,6 @@ message NamedStruct {
enum ColumnType {
NORMAL_COL = 0;
PARTITION_COL = 1;
+ METADATA_COL = 2;
}
}
diff --git
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 83db8a8da..25d71f0fc 100644
---
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -41,6 +41,7 @@ trait BackendSettingsApi {
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = ValidationResult.ok
+ def supportNativeMetadataColumns(): Boolean = false
def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
def supportSortMergeJoinExec(): Boolean = true
diff --git
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
index 2f506f483..cd8995211 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
@@ -36,7 +36,8 @@ trait IteratorApi {
def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
- fileFormat: ReadFileFormat): SplitInfo
+ fileFormat: ReadFileFormat,
+ metadataColumnNames: Seq[String]): SplitInfo
/** Generate native row partition. */
def genPartitions(
diff --git
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
index 625ab6e97..2be1a162c 100644
---
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
@@ -37,12 +37,15 @@ import com.google.protobuf.StringValue
import scala.collection.JavaConverters._
trait BasicScanExecTransformer extends LeafTransformSupport with
BaseDataSource {
+ import org.apache.spark.sql.catalyst.util._
/** Returns the filters that can be pushed down to native file scan */
def filterExprs(): Seq[Expression]
def outputAttributes(): Seq[Attribute]
+ def getMetadataColumns(): Seq[AttributeReference]
+
/** This can be used to report FileFormat for a file based scan operator. */
val fileFormat: ReadFileFormat
@@ -63,7 +66,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
def getSplitInfos: Seq[SplitInfo] = {
getPartitions.map(
BackendsApiManager.getIteratorApiInstance
- .genSplitInfo(_, getPartitionSchema, fileFormat))
+ .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
}
def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
@@ -112,6 +115,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
attr =>
if (getPartitionSchema.exists(_.name.equals(attr.name))) {
new ColumnTypeNode(1)
+ } else if (attr.isMetadataCol) {
+ new ColumnTypeNode(2)
} else {
new ColumnTypeNode(0)
}
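The integer passed to ColumnTypeNode mirrors the NamedStruct.ColumnType enum extended in type.proto earlier in this diff: 0 = NORMAL_COL, 1 = PARTITION_COL, 2 = METADATA_COL, which SubstraitParser::parsePartitionAndMetadataColumns decodes on the native side. A hedged helper sketch of that mapping (the helper name substraitColumnType is illustrative, not part of this commit):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.util._ // provides attr.isMetadataCol
    import org.apache.spark.sql.types.StructType

    // Sketch only: 0 -> NORMAL_COL, 1 -> PARTITION_COL, 2 -> METADATA_COL.
    def substraitColumnType(attr: Attribute, partitionSchema: StructType): Int =
      if (partitionSchema.exists(_.name.equals(attr.name))) 1
      else if (attr.isMetadataCol) 2
      else 0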
diff --git
a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
index afa0ce0e2..dfb448f13 100644
---
a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
@@ -108,6 +108,8 @@ abstract class BatchScanExecTransformerBase(
throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
}
+ override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
+
override def outputAttributes(): Seq[Attribute] = output
override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions
diff --git
a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
index fb8cd31cf..2c8bee43f 100644
---
a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
@@ -23,7 +23,7 @@ import
io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.FileSourceScanExecShim
@@ -99,9 +99,11 @@ abstract class FileSourceScanExecTransformerBase(
.genFileSourceScanTransformerMetrics(sparkContext)
.filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias
- def getPartitionFilters(): Seq[Expression] = partitionFilters
+ override def filterExprs(): Seq[Expression] = dataFiltersInScan
+
+ override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
- override def filterExprs(): Seq[Expression] = dataFilters
+ def getPartitionFilters(): Seq[Expression] = partitionFilters
override def outputAttributes(): Seq[Attribute] = output
@@ -125,8 +127,14 @@ abstract class FileSourceScanExecTransformerBase(
}
override protected def doValidateInternal(): ValidationResult = {
- if (hasMetadataColumns) {
- return ValidationResult.notOk(s"Unsupported metadataColumns scan in
native.")
+ if (
+ !metadataColumns.isEmpty &&
!BackendsApiManager.getSettings.supportNativeMetadataColumns()
+ ) {
+ return ValidationResult.notOk(s"Unsupported metadata columns scan in
native.")
+ }
+
+ if (hasUnsupportedColumns) {
+ return ValidationResult.notOk(s"Unsupported columns scan in native.")
}
if (hasFieldIds) {
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index dc3beeb0d..1f32ee8b4 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -25,7 +25,7 @@ import
io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
@@ -63,6 +63,8 @@ case class HiveTableScanExecTransformer(
override def filterExprs(): Seq[Expression] = Seq.empty
+ override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
+
override def outputAttributes(): Seq[Attribute] = output
override def getPartitions: Seq[InputPartition] = partitions
diff --git
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
index c763a46b1..98cc0d90e 100644
---
a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
+++
b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
@@ -16,6 +16,7 @@
*/
package io.glutenproject.substrait.rel;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -58,6 +59,14 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Map<String, String>> partitionColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations) {
- super(index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
+ super(
+ index,
+ paths,
+ starts,
+ lengths,
+ partitionColumns,
+ new ArrayList<>(),
+ fileFormat,
+ preferredLocations);
}
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
index af15f7386..794c0089b 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
@@ -16,6 +16,145 @@
*/
package org.apache.spark.sql.execution.datasources
+import io.glutenproject.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
+import io.glutenproject.utils.BackendTestUtils
+
+import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+import java.io.File
+import java.sql.Timestamp
+
+import scala.reflect.ClassTag
+
+class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {
+
+ val schemaWithFilePathField: StructType = new StructType()
+ .add(StructField("file_path", StringType))
+ .add(StructField("age", IntegerType))
+ .add(
+ StructField(
+ "info",
+ new StructType()
+ .add(StructField("id", LongType))
+ .add(StructField("university", StringType))))
+
+ private val METADATA_FILE_PATH = "_metadata.file_path"
+ private val METADATA_FILE_NAME = "_metadata.file_name"
+ private val METADATA_FILE_SIZE = "_metadata.file_size"
+ private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
+
+ private def getMetadataForFile(f: File): Map[String, Any] = {
+ Map(
+ METADATA_FILE_PATH -> f.toURI.toString,
+ METADATA_FILE_NAME -> f.getName,
+ METADATA_FILE_SIZE -> f.length(),
+ METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified())
+ )
+ }
+
+ private def metadataColumnsNativeTest(testName: String, fileSchema: StructType)(
+ f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+ Seq("parquet").foreach {
+ testFileFormat =>
+ test(s"$GLUTEN_TEST metadata struct ($testFileFormat): " + testName) {
+ withTempDir {
+ dir =>
+ import scala.collection.JavaConverters._
+
+ // 1. create df0 and df1 and save under /data/f0 and /data/f1
+ val df0 = spark.createDataFrame(data0.asJava, fileSchema)
+ val f0 = new File(dir, "data/f0").getCanonicalPath
+ df0.coalesce(1).write.format(testFileFormat).save(f0)
+
+ val df1 = spark.createDataFrame(data1.asJava, fileSchema)
+ val f1 = new File(dir, "data/f1 gluten").getCanonicalPath
+ df1.coalesce(1).write.format(testFileFormat).save(f1)
+
+ // 2. read both f0 and f1
+ val df = spark.read
+ .format(testFileFormat)
+ .schema(fileSchema)
+ .load(new File(dir, "data").getCanonicalPath + "/*")
+ val realF0 = new File(dir, "data/f0")
+ .listFiles()
+ .filter(_.getName.endsWith(s".$testFileFormat"))
+ .head
+ val realF1 = new File(dir, "data/f1 gluten")
+ .listFiles()
+ .filter(_.getName.endsWith(s".$testFileFormat"))
+ .head
+ f(df, getMetadataForFile(realF0), getMetadataForFile(realF1))
+ }
+ }
+ }
+ }
+
+ def checkOperatorMatch[T](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
+ val executedPlan = getExecutedPlan(df)
+ assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+ }
+
+ metadataColumnsNativeTest(
+ "plan check with metadata and user data select",
+ schemaWithFilePathField) {
+ (df, f0, f1) =>
+ var dfWithMetadata = df.select(
+ METADATA_FILE_NAME,
+ METADATA_FILE_PATH,
+ METADATA_FILE_SIZE,
+ METADATA_FILE_MODIFICATION_TIME,
+ "age")
+ dfWithMetadata.collect
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
+ } else {
+ checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+ }
+
+ // would fallback
+ dfWithMetadata = df.select(METADATA_FILE_PATH, "file_path")
+ checkAnswer(
+ dfWithMetadata,
+ Seq(
+ Row(f0(METADATA_FILE_PATH), "jack"),
+ Row(f1(METADATA_FILE_PATH), "lily")
+ )
+ )
+ checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+ }
+
+ metadataColumnsNativeTest("plan check with metadata filter",
schemaWithFilePathField) {
+ (df, f0, f1) =>
+ var filterDF = df
+ .select("file_path", "age", METADATA_FILE_NAME)
+ .where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
+ val ret = filterDF.collect
+ assert(ret.size == 1)
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+ } else {
+ checkOperatorMatch[FileSourceScanExec](filterDF)
+ }
+ checkOperatorMatch[FilterExecTransformer](filterDF)
-class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {}
+ // case to check that file_path is returned as a URI string
+ filterDF =
+ df.select(METADATA_FILE_PATH).where(Column(METADATA_FILE_NAME) === f1((METADATA_FILE_NAME)))
+ checkAnswer(
+ filterDF,
+ Seq(
+ Row(f1(METADATA_FILE_PATH))
+ )
+ )
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+ } else {
+ checkOperatorMatch[FileSourceScanExec](filterDF)
+ }
+ checkOperatorMatch[FilterExecTransformer](filterDF)
+ }
+}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
index af15f7386..794c0089b 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
@@ -16,6 +16,145 @@
*/
package org.apache.spark.sql.execution.datasources
+import io.glutenproject.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
+import io.glutenproject.utils.BackendTestUtils
+
+import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+import java.io.File
+import java.sql.Timestamp
+
+import scala.reflect.ClassTag
+
+class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {
+
+ val schemaWithFilePathField: StructType = new StructType()
+ .add(StructField("file_path", StringType))
+ .add(StructField("age", IntegerType))
+ .add(
+ StructField(
+ "info",
+ new StructType()
+ .add(StructField("id", LongType))
+ .add(StructField("university", StringType))))
+
+ private val METADATA_FILE_PATH = "_metadata.file_path"
+ private val METADATA_FILE_NAME = "_metadata.file_name"
+ private val METADATA_FILE_SIZE = "_metadata.file_size"
+ private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
+
+ private def getMetadataForFile(f: File): Map[String, Any] = {
+ Map(
+ METADATA_FILE_PATH -> f.toURI.toString,
+ METADATA_FILE_NAME -> f.getName,
+ METADATA_FILE_SIZE -> f.length(),
+ METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified())
+ )
+ }
+
+ private def metadataColumnsNativeTest(testName: String, fileSchema: StructType)(
+ f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+ Seq("parquet").foreach {
+ testFileFormat =>
+ test(s"$GLUTEN_TEST metadata struct ($testFileFormat): " + testName) {
+ withTempDir {
+ dir =>
+ import scala.collection.JavaConverters._
+
+ // 1. create df0 and df1 and save under /data/f0 and /data/f1
+ val df0 = spark.createDataFrame(data0.asJava, fileSchema)
+ val f0 = new File(dir, "data/f0").getCanonicalPath
+ df0.coalesce(1).write.format(testFileFormat).save(f0)
+
+ val df1 = spark.createDataFrame(data1.asJava, fileSchema)
+ val f1 = new File(dir, "data/f1 gluten").getCanonicalPath
+ df1.coalesce(1).write.format(testFileFormat).save(f1)
+
+ // 2. read both f0 and f1
+ val df = spark.read
+ .format(testFileFormat)
+ .schema(fileSchema)
+ .load(new File(dir, "data").getCanonicalPath + "/*")
+ val realF0 = new File(dir, "data/f0")
+ .listFiles()
+ .filter(_.getName.endsWith(s".$testFileFormat"))
+ .head
+ val realF1 = new File(dir, "data/f1 gluten")
+ .listFiles()
+ .filter(_.getName.endsWith(s".$testFileFormat"))
+ .head
+ f(df, getMetadataForFile(realF0), getMetadataForFile(realF1))
+ }
+ }
+ }
+ }
+
+ def checkOperatorMatch[T](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
+ val executedPlan = getExecutedPlan(df)
+ assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+ }
+
+ metadataColumnsNativeTest(
+ "plan check with metadata and user data select",
+ schemaWithFilePathField) {
+ (df, f0, f1) =>
+ var dfWithMetadata = df.select(
+ METADATA_FILE_NAME,
+ METADATA_FILE_PATH,
+ METADATA_FILE_SIZE,
+ METADATA_FILE_MODIFICATION_TIME,
+ "age")
+ dfWithMetadata.collect
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
+ } else {
+ checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+ }
+
+ // would fallback
+ dfWithMetadata = df.select(METADATA_FILE_PATH, "file_path")
+ checkAnswer(
+ dfWithMetadata,
+ Seq(
+ Row(f0(METADATA_FILE_PATH), "jack"),
+ Row(f1(METADATA_FILE_PATH), "lily")
+ )
+ )
+ checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+ }
+
+ metadataColumnsNativeTest("plan check with metadata filter",
schemaWithFilePathField) {
+ (df, f0, f1) =>
+ var filterDF = df
+ .select("file_path", "age", METADATA_FILE_NAME)
+ .where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
+ val ret = filterDF.collect
+ assert(ret.size == 1)
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+ } else {
+ checkOperatorMatch[FileSourceScanExec](filterDF)
+ }
+ checkOperatorMatch[FilterExecTransformer](filterDF)
-class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {}
+ // case to check that file_path is returned as a URI string
+ filterDF =
+ df.select(METADATA_FILE_PATH).where(Column(METADATA_FILE_NAME) === f1((METADATA_FILE_NAME)))
+ checkAnswer(
+ filterDF,
+ Seq(
+ Row(f1(METADATA_FILE_PATH))
+ )
+ )
+ if (BackendTestUtils.isVeloxBackendLoaded()) {
+ checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+ } else {
+ checkOperatorMatch[FileSourceScanExec](filterDF)
+ }
+ checkOperatorMatch[FilterExecTransformer](filterDF)
+ }
+}
diff --git
a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index ab560a060..64ed1b866 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -44,6 +44,8 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
+import java.util.{ArrayList => JArrayList, Map => JMap}
+
sealed abstract class ShimDescriptor
case class SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends ShimDescriptor {
@@ -159,6 +161,10 @@ trait SparkShims {
def attributesFromStruct(structType: StructType): Seq[Attribute]
+ def generateMetadataColumns(
+ file: PartitionedFile,
+ metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
+
// For compatibility with Spark-3.5.
def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan]
diff --git
a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 5a3579b25..8f028968e 100644
---
a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -45,6 +45,8 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
+import java.util.{HashMap => JHashMap, Map => JMap}
+
class Spark32Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -185,6 +187,11 @@ class Spark32Shims extends SparkShims {
}
}
+ override def generateMetadataColumns(
+ file: PartitionedFile,
+ metadataColumnNames: Seq[String]): JMap[String, String] =
+ new JHashMap[String, String]()
+
def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan] = {
ae.plan
}
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 654bc4924..e27f4bc28 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -53,7 +53,14 @@ abstract class FileSourceScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
- def hasMetadataColumns: Boolean = false
+ def dataFiltersInScan: Seq[Expression] = dataFilters
+
+ def metadataColumns: Seq[AttributeReference] = Seq.empty
+
+ def hasUnsupportedColumns: Boolean = {
+ // Below name has special meaning in Velox.
+ output.exists(a => a.name == "$path" || a.name == "$bucket")
+ }
def isMetadataColumn(attr: Attribute): Boolean = false
diff --git
a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
index 5f8134f7e..8e770325f 100644
---
a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
@@ -33,10 +33,11 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec,
PartitionedFileUtil, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
@@ -48,6 +49,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
+import java.time.ZoneOffset
+import java.util.{HashMap => JHashMap, Map => JMap}
+
class Spark33Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -152,6 +156,27 @@ class Spark33Shims extends SparkShims {
case _ => None
}
}
+ override def generateMetadataColumns(
+ file: PartitionedFile,
+ metadataColumnNames: Seq[String]): JMap[String, String] = {
+ val metadataColumn = new JHashMap[String, String]()
+ val path = new Path(file.filePath.toString)
+ for (columnName <- metadataColumnNames) {
+ columnName match {
+ case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
+ case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_SIZE =>
+ metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ case FileFormat.FILE_MODIFICATION_TIME =>
+ val fileModifyTime = TimestampFormatter
+ .getFractionFormatter(ZoneOffset.UTC)
+ .format(file.modificationTime * 1000L)
+ metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ case _ =>
+ }
+ }
+ metadataColumn
+ }
private def invalidBucketFile(path: String): Throwable = {
new SparkException(
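A small usage sketch of the new shim hook: given a PartitionedFile it returns a name-to-string map for the requested metadata columns; the multiplication by 1000L above converts the file's modification time from milliseconds to the microsecond value TimestampFormatter.format expects. The partitionedFile value below is a placeholder, not defined in this excerpt:

    import io.glutenproject.sql.shims.SparkShimLoader
    import org.apache.spark.sql.execution.datasources.FileFormat

    // Sketch only: ask the shim layer for the metadata values of one split.
    val requested = Seq(
      FileFormat.FILE_PATH,
      FileFormat.FILE_NAME,
      FileFormat.FILE_SIZE,
      FileFormat.FILE_MODIFICATION_TIME)
    val metadata =
      SparkShimLoader.getSparkShims.generateMetadataColumns(partitionedFile, requested)
    // Expected keys on Spark 3.3: file_path, file_name, file_size, file_modification_time;
    // the Spark 3.4/3.5 shims below additionally handle file_block_start and file_block_length.
    metadata.forEach((k, v) => println(s"$k -> $v"))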
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index e07899793..92f32f847 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution
import io.glutenproject.metrics.GlutenTimeMetric
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
@@ -54,7 +54,20 @@ abstract class FileSourceScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
- def hasMetadataColumns: Boolean = metadataColumns.nonEmpty
+ def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
+ case FileSourceMetadataAttribute(attr) if attr.name == "_metadata" => true
+ case _ => false
+ })
+
+ def hasUnsupportedColumns: Boolean = {
+ // TODO: fall back when a user-defined column has the same name as a metadata column,
+ // since we currently cannot tell metadata columns and user-defined columns apart.
+ val metadataColumnsNames = metadataColumns.map(_.name)
+ output
+ .filterNot(metadataColumns.toSet)
+ .exists(v => metadataColumnsNames.contains(v.name)) ||
+ output.exists(a => a.name == "$path" || a.name == "$bucket")
+ }
def isMetadataColumn(attr: Attribute): Boolean =
metadataColumns.contains(attr)
diff --git
a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index cd8449bb3..c98def5da 100644
---
a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -34,11 +34,12 @@ import
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec,
GlutenFileFormatWriter, PartitionedFileUtil, SparkPlan,
TakeOrderedAndProjectExec}
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -49,6 +50,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
+import java.time.ZoneOffset
+import java.util.{HashMap => JHashMap, Map => JMap}
+
class Spark34Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -155,6 +159,34 @@ class Spark34Shims extends SparkShims {
}
}
+ override def generateMetadataColumns(
+ file: PartitionedFile,
+ metadataColumnNames: Seq[String]): JMap[String, String] = {
+ val metadataColumn = new JHashMap[String, String]()
+ val path = new Path(file.filePath.toString)
+ for (columnName <- metadataColumnNames) {
+ columnName match {
+ case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
+ case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_SIZE =>
+ metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ case FileFormat.FILE_MODIFICATION_TIME =>
+ val fileModifyTime = TimestampFormatter
+ .getFractionFormatter(ZoneOffset.UTC)
+ .format(file.modificationTime * 1000L)
+ metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ case FileFormat.FILE_BLOCK_START =>
+ metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+ case FileFormat.FILE_BLOCK_LENGTH =>
+ metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+ case _ =>
+ }
+ }
+
+ // TODO: row_index metadata support
+ metadataColumn
+ }
+
// https://issues.apache.org/jira/browse/SPARK-40400
private def invalidBucketFile(path: String): Throwable = {
new SparkException(
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index f24ab8d57..57d1bd06a 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution
import io.glutenproject.metrics.GlutenTimeMetric
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
@@ -55,7 +55,18 @@ abstract class FileSourceScanExecShim(
case FileSourceGeneratedMetadataAttribute(attr) => attr
}
- def hasMetadataColumns: Boolean = metadataColumns.nonEmpty
+ def dataFiltersInScan: Seq[Expression] = dataFilters
+
+ def hasUnsupportedColumns: Boolean = {
+ val metadataColumnsNames = metadataColumns.map(_.name)
+ // row_index metadata is not supported yet
+ metadataColumnsNames.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) ||
+ output
+ .filterNot(metadataColumns.toSet)
+ .exists(v => metadataColumnsNames.contains(v.name)) ||
+ // Below name has special meaning in Velox.
+ output.exists(a => a.name == "$path" || a.name == "$bucket")
+ }
def isMetadataColumn(attr: Attribute): Boolean =
metadataColumns.contains(attr)
diff --git
a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index a33801653..6a6f3b2c8 100644
---
a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -34,10 +34,11 @@ import
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec,
GlutenFileFormatWriter, PartitionedFileUtil, SparkPlan,
TakeOrderedAndProjectExec}
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, FileStatusWithMetadata, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, FileStatusWithMetadata, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -48,6 +49,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.hadoop.fs.{FileStatus, Path}
+import java.time.ZoneOffset
+import java.util.{HashMap => JHashMap, Map => JMap}
+
class Spark35Shims extends SparkShims {
override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -152,6 +156,34 @@ class Spark35Shims extends SparkShims {
}
}
+ override def generateMetadataColumns(
+ file: PartitionedFile,
+ metadataColumnNames: Seq[String]): JMap[String, String] = {
+ val metadataColumn = new JHashMap[String, String]()
+ val path = new Path(file.filePath.toString)
+ for (columnName <- metadataColumnNames) {
+ columnName match {
+ case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
+ case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+ case FileFormat.FILE_SIZE =>
+ metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+ case FileFormat.FILE_MODIFICATION_TIME =>
+ val fileModifyTime = TimestampFormatter
+ .getFractionFormatter(ZoneOffset.UTC)
+ .format(file.modificationTime * 1000L)
+ metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+ case FileFormat.FILE_BLOCK_START =>
+ metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+ case FileFormat.FILE_BLOCK_LENGTH =>
+ metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+ case _ =>
+ }
+ }
+
+ // TODO row_index metadata support
+ metadataColumn
+ }
+
// https://issues.apache.org/jira/browse/SPARK-40400
private def invalidBucketFile(path: String): Throwable = {
new SparkException(
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index acede96da..efcb9dbad 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution
import io.glutenproject.metrics.GlutenTimeMetric
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
@@ -50,14 +50,27 @@ abstract class FileSourceScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
- lazy val metadataColumns = output.collect {
+ lazy val metadataColumns: Seq[AttributeReference] = output.collect {
case FileSourceConstantMetadataAttribute(attr) => attr
- case FileSourceGeneratedMetadataAttribute(attr) => attr
+ case FileSourceGeneratedMetadataAttribute(attr, _) => attr
}
protected lazy val driverMetricsAlias = driverMetrics
- def hasMetadataColumns: Boolean = metadataColumns.nonEmpty
+ def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
+ case FileSourceMetadataAttribute(_) => true
+ case _ => false
+ })
+
+ def hasUnsupportedColumns: Boolean = {
+ // TODO: fall back when a user-defined column has the same name as a metadata column,
+ // since we currently cannot tell metadata columns and user-defined columns apart.
+ val metadataColumnsNames = metadataColumns.map(_.name)
+ output
+ .filterNot(metadataColumns.toSet)
+ .exists(v => metadataColumnsNames.contains(v.name)) ||
+ output.exists(a => a.name == "$path" || a.name == "$bucket")
+ }
def isMetadataColumn(attr: Attribute): Boolean =
metadataColumns.contains(attr)