This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fbd9e6c4 [VL] Support Parquet file metadata columns in Velox (#3870)
1fbd9e6c4 is described below

commit 1fbd9e6c4144a5df4d6e5b73951791841480f1a0
Author: 高阳阳 <[email protected]>
AuthorDate: Thu Mar 14 15:23:05 2024 +0800

    [VL] Support Parquet file metadata columns in Velox (#3870)
    
    Adds native support for reading Spark's hidden _metadata file columns
    (file_path, file_name, file_size, file_modification_time) in Parquet scans
    on the Velox backend; scans that request unsupported metadata columns
    (e.g. row_index) still fall back to vanilla Spark.
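
    For reference, a minimal sketch of the kind of query this change allows
    the Velox backend to run natively (the dataset path is hypothetical; the
    _metadata field names are Spark's built-in hidden file metadata columns):

        // Assumes a SparkSession `spark` and an existing Parquet dataset.
        val df = spark.read
          .parquet("/tmp/people") // hypothetical path
          .select(
            "age",
            "_metadata.file_path",
            "_metadata.file_name",
            "_metadata.file_size",
            "_metadata.file_modification_time")
        // With this change the scan is planned as FileSourceScanExecTransformer
        // rather than falling back to vanilla Spark's FileSourceScanExec.
        df.show(truncate = false)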
    
    Co-authored-by: Zhen Li <[email protected]>
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   4 +-
 .../backendsapi/velox/IteratorApiImpl.scala        |  21 ++-
 .../backendsapi/velox/VeloxBackend.scala           |   2 +
 cpp/velox/compute/VeloxPlanConverter.cc            |   7 +
 cpp/velox/compute/WholeStageResultIterator.cc      |  15 ++-
 cpp/velox/substrait/SubstraitParser.cc             |  20 ++-
 cpp/velox/substrait/SubstraitParser.h              |   7 +-
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  15 ++-
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |   3 +
 .../substrait/SubstraitToVeloxPlanValidator.cc     |   4 +-
 .../substrait/rel/LocalFilesBuilder.java           |  10 +-
 .../substrait/rel/LocalFilesNode.java              |  20 ++-
 .../substrait/proto/substrait/algebra.proto        |   6 +
 .../resources/substrait/proto/substrait/type.proto |   1 +
 .../backendsapi/BackendSettingsApi.scala           |   1 +
 .../io/glutenproject/backendsapi/IteratorApi.scala |   3 +-
 .../execution/BasicScanExecTransformer.scala       |   7 +-
 .../execution/BatchScanExecTransformer.scala       |   2 +
 .../execution/FileSourceScanExecTransformer.scala  |  18 ++-
 .../sql/hive/HiveTableScanExecTransformer.scala    |   4 +-
 .../substrait/rel/IcebergLocalFilesNode.java       |  11 +-
 .../GlutenFileMetadataStructSuite.scala            | 141 ++++++++++++++++++++-
 .../GlutenFileMetadataStructSuite.scala            | 141 ++++++++++++++++++++-
 .../io/glutenproject/sql/shims/SparkShims.scala    |   6 +
 .../sql/shims/spark32/Spark32Shims.scala           |   7 +
 .../sql/execution/FileSourceScanExecShim.scala     |   9 +-
 .../sql/shims/spark33/Spark33Shims.scala           |  27 +++-
 .../sql/execution/FileSourceScanExecShim.scala     |  19 ++-
 .../sql/shims/spark34/Spark34Shims.scala           |  34 ++++-
 .../sql/execution/FileSourceScanExecShim.scala     |  17 ++-
 .../sql/shims/spark35/Spark35Shims.scala           |  34 ++++-
 .../sql/execution/FileSourceScanExecShim.scala     |  23 +++-
 32 files changed, 591 insertions(+), 48 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index 1841faccf..6fecb2c5f 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -75,7 +75,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
   override def genSplitInfo(
       partition: InputPartition,
       partitionSchema: StructType,
-      fileFormat: ReadFileFormat): SplitInfo = {
+      fileFormat: ReadFileFormat,
+      metadataColumnNames: Seq[String]): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         val partLists = new JArrayList[String]()
@@ -128,6 +129,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
           starts,
           lengths,
           partitionColumns,
+          new JArrayList[JMap[String, String]](),
           fileFormat,
           preferredLocations.toList.asJava)
       case _ =>
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
index f2943d31d..84198a6f8 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
@@ -20,6 +20,7 @@ import io.glutenproject.GlutenNumaBindingInfo
 import io.glutenproject.backendsapi.IteratorApi
 import io.glutenproject.execution._
 import io.glutenproject.metrics.IMetrics
+import io.glutenproject.sql.shims.SparkShimLoader
 import io.glutenproject.substrait.plan.PlanNode
 import io.glutenproject.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo}
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -53,11 +54,12 @@ class IteratorApiImpl extends IteratorApi with Logging {
   override def genSplitInfo(
       partition: InputPartition,
       partitionSchema: StructType,
-      fileFormat: ReadFileFormat): SplitInfo = {
+      fileFormat: ReadFileFormat,
+      metadataColumnNames: Seq[String]): SplitInfo = {
     partition match {
       case f: FilePartition =>
-        val (paths, starts, lengths, partitionColumns) =
-          constructSplitInfo(partitionSchema, f.files)
+        val (paths, starts, lengths, partitionColumns, metadataColumns) =
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -66,6 +68,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
           starts,
           lengths,
           partitionColumns,
+          metadataColumns,
           fileFormat,
           preferredLocations.toList.asJava)
       case _ =>
@@ -92,11 +95,15 @@ class IteratorApiImpl extends IteratorApi with Logging {
     }
   }
 
-  private def constructSplitInfo(schema: StructType, files: Array[PartitionedFile]) = {
+  private def constructSplitInfo(
+      schema: StructType,
+      files: Array[PartitionedFile],
+      metadataColumnNames: Seq[String]) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
     val partitionColumns = new JArrayList[JMap[String, String]]
+    var metadataColumns = new JArrayList[JMap[String, String]]
     files.foreach {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded 
path, so the decoded
@@ -106,7 +113,9 @@ class IteratorApiImpl extends IteratorApi with Logging {
             .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
-
+        val metadataColumn =
+          SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
+        metadataColumns.add(metadataColumn)
         val partitionColumn = new JHashMap[String, String]()
         for (i <- 0 until file.partitionValues.numFields) {
           val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
@@ -131,7 +140,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
         }
         partitionColumns.add(partitionColumn)
     }
-    (paths, starts, lengths, partitionColumns)
+    (paths, starts, lengths, partitionColumns, metadataColumns)
   }
 
   override def injectWriteFilesTempPath(path: String): Unit = {
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index bab5e68ec..22a0f4ebc 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -248,6 +248,8 @@ object BackendSettings extends BackendSettingsApi {
     }
   }
 
+  override def supportNativeMetadataColumns(): Boolean = true
+
   override def supportExpandExec(): Boolean = true
 
   override def supportSortExec(): Boolean = true
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 47738cff9..8ca9f85cd 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -60,6 +60,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
   splitInfo->starts.reserve(fileList.size());
   splitInfo->lengths.reserve(fileList.size());
   splitInfo->partitionColumns.reserve(fileList.size());
+  splitInfo->metadataColumns.reserve(fileList.size());
   for (const auto& file : fileList) {
     // Expect all Partitions share the same index.
     splitInfo->partitionIndex = file.partition_index();
@@ -70,6 +71,12 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
     }
     splitInfo->partitionColumns.emplace_back(partitionColumnMap);
 
+    std::unordered_map<std::string, std::string> metadataColumnMap;
+    for (const auto& metadataColumn : file.metadata_columns()) {
+      metadataColumnMap[metadataColumn.key()] = metadataColumn.value();
+    }
+    splitInfo->metadataColumns.emplace_back(metadataColumnMap);
+
     splitInfo->paths.emplace_back(file.uri_file());
     splitInfo->starts.emplace_back(file.start());
     splitInfo->lengths.emplace_back(file.length());
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index b10c643c8..86431819b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -145,15 +145,28 @@ WholeStageResultIterator::WholeStageResultIterator(
     const auto& lengths = scanInfo->lengths;
     const auto& format = scanInfo->format;
     const auto& partitionColumns = scanInfo->partitionColumns;
+    const auto& metadataColumns = scanInfo->metadataColumns;
 
     std::vector<std::shared_ptr<velox::connector::ConnectorSplit>> connectorSplits;
     connectorSplits.reserve(paths.size());
     for (int idx = 0; idx < paths.size(); idx++) {
       auto partitionColumn = partitionColumns[idx];
+      auto metadataColumn = metadataColumns[idx];
       std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
       constructPartitionColumns(partitionKeys, partitionColumn);
       auto split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
-          kHiveConnectorId, paths[idx], format, starts[idx], lengths[idx], partitionKeys);
+          kHiveConnectorId,
+          paths[idx],
+          format,
+          starts[idx],
+          lengths[idx],
+          partitionKeys,
+          std::nullopt,
+          std::unordered_map<std::string, std::string>(),
+          nullptr,
+          std::unordered_map<std::string, std::string>(),
+          0,
+          metadataColumn);
       connectorSplits.emplace_back(split);
     }
 
diff --git a/cpp/velox/substrait/SubstraitParser.cc b/cpp/velox/substrait/SubstraitParser.cc
index 36fe84558..35f130076 100644
--- a/cpp/velox/substrait/SubstraitParser.cc
+++ b/cpp/velox/substrait/SubstraitParser.cc
@@ -104,31 +104,41 @@ std::vector<TypePtr> SubstraitParser::parseNamedStruct(const ::substrait::NamedS
   return typeList;
 }
 
-std::vector<bool> SubstraitParser::parsePartitionColumns(const ::substrait::NamedStruct& namedStruct) {
+void SubstraitParser::parsePartitionAndMetadataColumns(
+    const ::substrait::NamedStruct& namedStruct,
+    std::vector<bool>& isPartitionColumns,
+    std::vector<bool>& isMetadataColumns) {
   const auto& columnsTypes = namedStruct.column_types();
-  std::vector<bool> isPartitionColumns;
   if (columnsTypes.size() == 0) {
-    // Regard all columns as non-partitioned columns.
+    // Regard all columns as regular columns.
     isPartitionColumns.resize(namedStruct.names().size(), false);
-    return isPartitionColumns;
+    isMetadataColumns.resize(namedStruct.names().size(), false);
+    return;
   } else {
    VELOX_CHECK_EQ(columnsTypes.size(), namedStruct.names().size(), "Wrong size for column types and column names.");
   }
 
   isPartitionColumns.reserve(columnsTypes.size());
+  isMetadataColumns.reserve(columnsTypes.size());
   for (const auto& columnType : columnsTypes) {
     switch (columnType) {
       case ::substrait::NamedStruct::NORMAL_COL:
         isPartitionColumns.emplace_back(false);
+        isMetadataColumns.emplace_back(false);
         break;
       case ::substrait::NamedStruct::PARTITION_COL:
         isPartitionColumns.emplace_back(true);
+        isMetadataColumns.emplace_back(false);
+        break;
+      case ::substrait::NamedStruct::METADATA_COL:
+        isPartitionColumns.emplace_back(false);
+        isMetadataColumns.emplace_back(true);
         break;
       default:
         VELOX_FAIL("Unspecified column type.");
     }
   }
-  return isPartitionColumns;
+  return;
 }
 
 int32_t SubstraitParser::parseReferenceSegment(const ::substrait::Expression::ReferenceSegment& refSegment) {
diff --git a/cpp/velox/substrait/SubstraitParser.h b/cpp/velox/substrait/SubstraitParser.h
index 80d4f2ee2..4aaac5a71 100644
--- a/cpp/velox/substrait/SubstraitParser.h
+++ b/cpp/velox/substrait/SubstraitParser.h
@@ -41,8 +41,11 @@ class SubstraitParser {
       const ::substrait::NamedStruct& namedStruct,
       bool asLowerCase = false);
 
-  /// Used to parse partition columns from Substrait NamedStruct.
-  static std::vector<bool> parsePartitionColumns(const ::substrait::NamedStruct& namedStruct);
+  /// Used to parse partition & metadata columns from Substrait NamedStruct.
+  static void parsePartitionAndMetadataColumns(
+      const ::substrait::NamedStruct& namedStruct,
+      std::vector<bool>& isPartitionColumns,
+      std::vector<bool>& isMetadataColumns);
 
   /// Parse Substrait Type to Velox type.
   static facebook::velox::TypePtr parseType(const ::substrait::Type& substraitType, bool asLowerCase = false);
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index b8ca25a43..aa19b7f6a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -596,11 +596,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   std::vector<std::string> tableColumnNames;
   std::vector<std::string> partitionedKey;
   std::vector<bool> isPartitionColumns;
+  std::vector<bool> isMetadataColumns;
   tableColumnNames.reserve(writeRel.table_schema().names_size());
 
   VELOX_CHECK(writeRel.has_table_schema(), "WriteRel should have the table schema to store the column information");
   const auto& tableSchema = writeRel.table_schema();
-  isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema);
+  SubstraitParser::parsePartitionAndMetadataColumns(tableSchema, isPartitionColumns, isMetadataColumns);
 
   for (const auto& name : tableSchema.names()) {
     tableColumnNames.emplace_back(name);
@@ -1040,6 +1041,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   std::vector<std::string> colNameList;
   std::vector<TypePtr> veloxTypeList;
   std::vector<bool> isPartitionColumns;
+  std::vector<bool> isMetadataColumns;
   // Convert field names into lower case when not case-sensitive.
   std::shared_ptr<const facebook::velox::Config> veloxCfg =
       std::make_shared<const facebook::velox::core::MemConfigMutable>(confMap_);
@@ -1055,7 +1057,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
       colNameList.emplace_back(fieldName);
     }
     veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema, asLowerCase);
-    isPartitionColumns = SubstraitParser::parsePartitionColumns(baseSchema);
+    SubstraitParser::parsePartitionAndMetadataColumns(baseSchema, isPartitionColumns, isMetadataColumns);
   }
 
   // Do not hard-code connector ID and allow for connectors other than Hive.
@@ -1110,8 +1112,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>> assignments;
   for (int idx = 0; idx < colNameList.size(); idx++) {
     auto outName = SubstraitParser::makeNodeName(planNodeId_, idx);
-    auto columnType = isPartitionColumns[idx] ? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey
-                                              : connector::hive::HiveColumnHandle::ColumnType::kRegular;
+    auto columnType = connector::hive::HiveColumnHandle::ColumnType::kRegular;
+    if (isPartitionColumns[idx]) {
+      columnType = connector::hive::HiveColumnHandle::ColumnType::kPartitionKey;
+    }
+    if (isMetadataColumns[idx]) {
+      columnType = connector::hive::HiveColumnHandle::ColumnType::kSynthesized;
+    }
     assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
         colNameList[idx], columnType, veloxTypeList[idx], veloxTypeList[idx]);
     outNames.emplace_back(outName);
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 21a318b91..895c1d24e 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -36,6 +36,9 @@ struct SplitInfo {
   /// The partition columns associated with partitioned table.
   std::vector<std::unordered_map<std::string, std::string>> partitionColumns;
 
+  /// The metadata columns associated with the scanned files.
+  std::vector<std::unordered_map<std::string, std::string>> metadataColumns;
+
   /// The file paths to be scanned.
   std::vector<std::string> paths;
 
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 3826465b3..5df59a348 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -366,7 +366,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR
   // Validate partition key type.
   if (writeRel.has_table_schema()) {
     const auto& tableSchema = writeRel.table_schema();
-    auto isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema);
+    std::vector<bool> isMetadataColumns;
+    std::vector<bool> isPartitionColumns;
+    SubstraitParser::parsePartitionAndMetadataColumns(tableSchema, isPartitionColumns, isMetadataColumns);
     for (auto i = 0; i < types.size(); i++) {
       if (isPartitionColumns[i]) {
         switch (types[i]->kind()) {
diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
index c86c90cc6..1f6eaabce 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java
@@ -28,10 +28,18 @@ public class LocalFilesBuilder {
       List<Long> starts,
       List<Long> lengths,
       List<Map<String, String>> partitionColumns,
+      List<Map<String, String>> metadataColumns,
       LocalFilesNode.ReadFileFormat fileFormat,
       List<String> preferredLocations) {
     return new LocalFilesNode(
-        index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
+        index,
+        paths,
+        starts,
+        lengths,
+        partitionColumns,
+        metadataColumns,
+        fileFormat,
+        preferredLocations);
   }
 
   public static LocalFilesNode makeLocalFiles(String iterPath) {
diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
index 32e81f2c6..e0700ded2 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java
@@ -35,6 +35,7 @@ public class LocalFilesNode implements SplitInfo {
   private final List<Long> starts = new ArrayList<>();
   private final List<Long> lengths = new ArrayList<>();
   private final List<Map<String, String>> partitionColumns = new ArrayList<>();
+  private final List<Map<String, String>> metadataColumns = new ArrayList<>();
   private final List<String> preferredLocations = new ArrayList<>();
 
   // The format of file to read.
@@ -60,6 +61,7 @@ public class LocalFilesNode implements SplitInfo {
       List<Long> starts,
       List<Long> lengths,
       List<Map<String, String>> partitionColumns,
+      List<Map<String, String>> metadataColumns,
       ReadFileFormat fileFormat,
       List<String> preferredLocations) {
     this.index = index;
@@ -68,6 +70,7 @@ public class LocalFilesNode implements SplitInfo {
     this.lengths.addAll(lengths);
     this.fileFormat = fileFormat;
     this.partitionColumns.addAll(partitionColumns);
+    this.metadataColumns.addAll(metadataColumns);
     this.preferredLocations.addAll(preferredLocations);
   }
 
@@ -141,7 +144,22 @@ public class LocalFilesNode implements SplitInfo {
       }
       fileBuilder.setLength(lengths.get(i));
       fileBuilder.setStart(starts.get(i));
-
+      if (!metadataColumns.isEmpty()) {
+        Map<String, String> metadataColumn = metadataColumns.get(i);
+        if (!metadataColumn.isEmpty()) {
+          metadataColumn.forEach(
+              (key, value) -> {
+                ReadRel.LocalFiles.FileOrFiles.metadataColumn.Builder mcBuilder =
+                    ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder();
+                mcBuilder.setKey(key).setValue(value);
+                fileBuilder.addMetadataColumns(mcBuilder.build());
+              });
+        }
+      } else {
+        ReadRel.LocalFiles.FileOrFiles.metadataColumn.Builder mcBuilder =
+            ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder();
+        fileBuilder.addMetadataColumns(mcBuilder.build());
+      }
       NamedStruct namedStruct = buildNamedStruct();
       fileBuilder.setSchema(namedStruct);
 
diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index b72bcbb01..63a0f36ea 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -169,6 +169,12 @@ message ReadRel {
 
       /// File schema
       NamedStruct schema = 17;
+
+      message metadataColumn {
+        string key = 1;
+        string value = 2;
+      }
+      repeated metadataColumn metadata_columns = 18;
     }
   }
 }
diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/type.proto b/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
index 6130f2d76..5c7ee6a38 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/type.proto
@@ -237,5 +237,6 @@ message NamedStruct {
   enum ColumnType {
     NORMAL_COL = 0;
     PARTITION_COL = 1;
+    METADATA_COL = 2;
   }
 }
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 83db8a8da..25d71f0fc 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -41,6 +41,7 @@ trait BackendSettingsApi {
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String]): ValidationResult = ValidationResult.ok
+  def supportNativeMetadataColumns(): Boolean = false
   def supportExpandExec(): Boolean = false
   def supportSortExec(): Boolean = false
   def supportSortMergeJoinExec(): Boolean = true
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
index 2f506f483..cd8995211 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
@@ -36,7 +36,8 @@ trait IteratorApi {
   def genSplitInfo(
       partition: InputPartition,
       partitionSchema: StructType,
-      fileFormat: ReadFileFormat): SplitInfo
+      fileFormat: ReadFileFormat,
+      metadataColumnNames: Seq[String]): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
index 625ab6e97..2be1a162c 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
@@ -37,12 +37,15 @@ import com.google.protobuf.StringValue
 import scala.collection.JavaConverters._
 
 trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource {
+  import org.apache.spark.sql.catalyst.util._
 
   /** Returns the filters that can be pushed down to native file scan */
   def filterExprs(): Seq[Expression]
 
   def outputAttributes(): Seq[Attribute]
 
+  def getMetadataColumns(): Seq[AttributeReference]
+
   /** This can be used to report FileFormat for a file based scan operator. */
   val fileFormat: ReadFileFormat
 
@@ -63,7 +66,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   def getSplitInfos: Seq[SplitInfo] = {
     getPartitions.map(
       BackendsApiManager.getIteratorApiInstance
-        .genSplitInfo(_, getPartitionSchema, fileFormat))
+        .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
   }
 
   def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
@@ -112,6 +115,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
       attr =>
         if (getPartitionSchema.exists(_.name.equals(attr.name))) {
           new ColumnTypeNode(1)
+        } else if (attr.isMetadataCol) {
+          new ColumnTypeNode(2)
         } else {
           new ColumnTypeNode(0)
         }
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
index afa0ce0e2..dfb448f13 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
@@ -108,6 +108,8 @@ abstract class BatchScanExecTransformerBase(
       throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
   }
 
+  override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
+
   override def outputAttributes(): Seq[Attribute] = output
 
   override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
index fb8cd31cf..2c8bee43f 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
@@ -23,7 +23,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.FileSourceScanExecShim
@@ -99,9 +99,11 @@ abstract class FileSourceScanExecTransformerBase(
       .genFileSourceScanTransformerMetrics(sparkContext)
       .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias
 
-  def getPartitionFilters(): Seq[Expression] = partitionFilters
+  override def filterExprs(): Seq[Expression] = dataFiltersInScan
+
+  override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
 
-  override def filterExprs(): Seq[Expression] = dataFilters
+  def getPartitionFilters(): Seq[Expression] = partitionFilters
 
   override def outputAttributes(): Seq[Attribute] = output
 
@@ -125,8 +127,14 @@ abstract class FileSourceScanExecTransformerBase(
   }
 
   override protected def doValidateInternal(): ValidationResult = {
-    if (hasMetadataColumns) {
-      return ValidationResult.notOk(s"Unsupported metadataColumns scan in 
native.")
+    if (
+      !metadataColumns.isEmpty && !BackendsApiManager.getSettings.supportNativeMetadataColumns()
+    ) {
+      return ValidationResult.notOk(s"Unsupported metadata columns scan in 
native.")
+    }
+
+    if (hasUnsupportedColumns) {
+      return ValidationResult.notOk(s"Unsupported columns scan in native.")
     }
 
     if (hasFieldIds) {
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index dc3beeb0d..1f32ee8b4 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -25,7 +25,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.SparkPlan
@@ -63,6 +63,8 @@ case class HiveTableScanExecTransformer(
 
   override def filterExprs(): Seq[Expression] = Seq.empty
 
+  override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
+
   override def outputAttributes(): Seq[Attribute] = output
 
   override def getPartitions: Seq[InputPartition] = partitions
diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
index c763a46b1..98cc0d90e 100644
--- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
+++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java
@@ -16,6 +16,7 @@
  */
 package io.glutenproject.substrait.rel;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -58,6 +59,14 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
       List<Map<String, String>> partitionColumns,
       ReadFileFormat fileFormat,
       List<String> preferredLocations) {
-    super(index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations);
+    super(
+        index,
+        paths,
+        starts,
+        lengths,
+        partitionColumns,
+        new ArrayList<>(),
+        fileFormat,
+        preferredLocations);
   }
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
index af15f7386..794c0089b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
@@ -16,6 +16,145 @@
  */
 package org.apache.spark.sql.execution.datasources
 
+import io.glutenproject.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
+import io.glutenproject.utils.BackendTestUtils
+
+import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+import java.io.File
+import java.sql.Timestamp
+
+import scala.reflect.ClassTag
+
+class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {
+
+  val schemaWithFilePathField: StructType = new StructType()
+    .add(StructField("file_path", StringType))
+    .add(StructField("age", IntegerType))
+    .add(
+      StructField(
+        "info",
+        new StructType()
+          .add(StructField("id", LongType))
+          .add(StructField("university", StringType))))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+  private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
+
+  private def getMetadataForFile(f: File): Map[String, Any] = {
+    Map(
+      METADATA_FILE_PATH -> f.toURI.toString,
+      METADATA_FILE_NAME -> f.getName,
+      METADATA_FILE_SIZE -> f.length(),
+      METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified())
+    )
+  }
+
+  private def metadataColumnsNativeTest(testName: String, fileSchema: StructType)(
+      f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+    Seq("parquet").foreach {
+      testFileFormat =>
+        test(s"$GLUTEN_TEST metadata struct ($testFileFormat): " + testName) {
+          withTempDir {
+            dir =>
+              import scala.collection.JavaConverters._
+
+              // 1. create df0 and df1 and save under /data/f0 and /data/f1
+              val df0 = spark.createDataFrame(data0.asJava, fileSchema)
+              val f0 = new File(dir, "data/f0").getCanonicalPath
+              df0.coalesce(1).write.format(testFileFormat).save(f0)
+
+              val df1 = spark.createDataFrame(data1.asJava, fileSchema)
+              val f1 = new File(dir, "data/f1 gluten").getCanonicalPath
+              df1.coalesce(1).write.format(testFileFormat).save(f1)
+
+              // 2. read both f0 and f1
+              val df = spark.read
+                .format(testFileFormat)
+                .schema(fileSchema)
+                .load(new File(dir, "data").getCanonicalPath + "/*")
+              val realF0 = new File(dir, "data/f0")
+                .listFiles()
+                .filter(_.getName.endsWith(s".$testFileFormat"))
+                .head
+              val realF1 = new File(dir, "data/f1 gluten")
+                .listFiles()
+                .filter(_.getName.endsWith(s".$testFileFormat"))
+                .head
+              f(df, getMetadataForFile(realF0), getMetadataForFile(realF1))
+          }
+        }
+    }
+  }
+
+  def checkOperatorMatch[T](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
+    val executedPlan = getExecutedPlan(df)
+    assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+  }
+
+  metadataColumnsNativeTest(
+    "plan check with metadata and user data select",
+    schemaWithFilePathField) {
+    (df, f0, f1) =>
+      var dfWithMetadata = df.select(
+        METADATA_FILE_NAME,
+        METADATA_FILE_PATH,
+        METADATA_FILE_SIZE,
+        METADATA_FILE_MODIFICATION_TIME,
+        "age")
+      dfWithMetadata.collect
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
+      } else {
+        checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+      }
+
+      // This select would fall back to vanilla Spark.
+      dfWithMetadata = df.select(METADATA_FILE_PATH, "file_path")
+      checkAnswer(
+        dfWithMetadata,
+        Seq(
+          Row(f0(METADATA_FILE_PATH), "jack"),
+          Row(f1(METADATA_FILE_PATH), "lily")
+        )
+      )
+      checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+  }
+
+  metadataColumnsNativeTest("plan check with metadata filter", 
schemaWithFilePathField) {
+    (df, f0, f1) =>
+      var filterDF = df
+        .select("file_path", "age", METADATA_FILE_NAME)
+        .where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
+      val ret = filterDF.collect
+      assert(ret.size == 1)
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+      } else {
+        checkOperatorMatch[FileSourceScanExec](filterDF)
+      }
+      checkOperatorMatch[FilterExecTransformer](filterDF)
 
-class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {}
+      // Check that file_path is returned as a URI string.
+      filterDF =
+        df.select(METADATA_FILE_PATH).where(Column(METADATA_FILE_NAME) === f1((METADATA_FILE_NAME)))
+      checkAnswer(
+        filterDF,
+        Seq(
+          Row(f1(METADATA_FILE_PATH))
+        )
+      )
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+      } else {
+        checkOperatorMatch[FileSourceScanExec](filterDF)
+      }
+      checkOperatorMatch[FilterExecTransformer](filterDF)
+  }
+}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
index af15f7386..794c0089b 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala
@@ -16,6 +16,145 @@
  */
 package org.apache.spark.sql.execution.datasources
 
+import io.glutenproject.execution.{FileSourceScanExecTransformer, FilterExecTransformer}
+import io.glutenproject.utils.BackendTestUtils
+
+import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+import java.io.File
+import java.sql.Timestamp
+
+import scala.reflect.ClassTag
+
+class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {
+
+  val schemaWithFilePathField: StructType = new StructType()
+    .add(StructField("file_path", StringType))
+    .add(StructField("age", IntegerType))
+    .add(
+      StructField(
+        "info",
+        new StructType()
+          .add(StructField("id", LongType))
+          .add(StructField("university", StringType))))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+  private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
+
+  private def getMetadataForFile(f: File): Map[String, Any] = {
+    Map(
+      METADATA_FILE_PATH -> f.toURI.toString,
+      METADATA_FILE_NAME -> f.getName,
+      METADATA_FILE_SIZE -> f.length(),
+      METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified())
+    )
+  }
+
+  private def metadataColumnsNativeTest(testName: String, fileSchema: StructType)(
+      f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+    Seq("parquet").foreach {
+      testFileFormat =>
+        test(s"$GLUTEN_TEST metadata struct ($testFileFormat): " + testName) {
+          withTempDir {
+            dir =>
+              import scala.collection.JavaConverters._
+
+              // 1. create df0 and df1 and save under /data/f0 and /data/f1
+              val df0 = spark.createDataFrame(data0.asJava, fileSchema)
+              val f0 = new File(dir, "data/f0").getCanonicalPath
+              df0.coalesce(1).write.format(testFileFormat).save(f0)
+
+              val df1 = spark.createDataFrame(data1.asJava, fileSchema)
+              val f1 = new File(dir, "data/f1 gluten").getCanonicalPath
+              df1.coalesce(1).write.format(testFileFormat).save(f1)
+
+              // 2. read both f0 and f1
+              val df = spark.read
+                .format(testFileFormat)
+                .schema(fileSchema)
+                .load(new File(dir, "data").getCanonicalPath + "/*")
+              val realF0 = new File(dir, "data/f0")
+                .listFiles()
+                .filter(_.getName.endsWith(s".$testFileFormat"))
+                .head
+              val realF1 = new File(dir, "data/f1 gluten")
+                .listFiles()
+                .filter(_.getName.endsWith(s".$testFileFormat"))
+                .head
+              f(df, getMetadataForFile(realF0), getMetadataForFile(realF1))
+          }
+        }
+    }
+  }
+
+  def checkOperatorMatch[T](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
+    val executedPlan = getExecutedPlan(df)
+    assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+  }
+
+  metadataColumnsNativeTest(
+    "plan check with metadata and user data select",
+    schemaWithFilePathField) {
+    (df, f0, f1) =>
+      var dfWithMetadata = df.select(
+        METADATA_FILE_NAME,
+        METADATA_FILE_PATH,
+        METADATA_FILE_SIZE,
+        METADATA_FILE_MODIFICATION_TIME,
+        "age")
+      dfWithMetadata.collect
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata)
+      } else {
+        checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+      }
+
+      // This select would fall back to vanilla Spark.
+      dfWithMetadata = df.select(METADATA_FILE_PATH, "file_path")
+      checkAnswer(
+        dfWithMetadata,
+        Seq(
+          Row(f0(METADATA_FILE_PATH), "jack"),
+          Row(f1(METADATA_FILE_PATH), "lily")
+        )
+      )
+      checkOperatorMatch[FileSourceScanExec](dfWithMetadata)
+  }
+
+  metadataColumnsNativeTest("plan check with metadata filter", 
schemaWithFilePathField) {
+    (df, f0, f1) =>
+      var filterDF = df
+        .select("file_path", "age", METADATA_FILE_NAME)
+        .where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME)))
+      val ret = filterDF.collect
+      assert(ret.size == 1)
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+      } else {
+        checkOperatorMatch[FileSourceScanExec](filterDF)
+      }
+      checkOperatorMatch[FilterExecTransformer](filterDF)
 
-class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {}
+      // Check that file_path is returned as a URI string.
+      filterDF =
+        df.select(METADATA_FILE_PATH).where(Column(METADATA_FILE_NAME) === f1((METADATA_FILE_NAME)))
+      checkAnswer(
+        filterDF,
+        Seq(
+          Row(f1(METADATA_FILE_PATH))
+        )
+      )
+      if (BackendTestUtils.isVeloxBackendLoaded()) {
+        checkOperatorMatch[FileSourceScanExecTransformer](filterDF)
+      } else {
+        checkOperatorMatch[FileSourceScanExec](filterDF)
+      }
+      checkOperatorMatch[FilterExecTransformer](filterDF)
+  }
+}
diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index ab560a060..64ed1b866 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -44,6 +44,8 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import java.util.{ArrayList => JArrayList, Map => JMap}
+
 sealed abstract class ShimDescriptor
 
 case class SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends ShimDescriptor {
@@ -159,6 +161,10 @@ trait SparkShims {
 
   def attributesFromStruct(structType: StructType): Seq[Attribute]
 
+  def generateMetadataColumns(
+      file: PartitionedFile,
+      metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
+
   // For compatibility with Spark-3.5.
   def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan]
 
diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 5a3579b25..8f028968e 100644
--- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -45,6 +45,8 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import java.util.{HashMap => JHashMap, Map => JMap}
+
 class Spark32Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
 
@@ -185,6 +187,11 @@ class Spark32Shims extends SparkShims {
     }
   }
 
+  override def generateMetadataColumns(
+      file: PartitionedFile,
+      metadataColumnNames: Seq[String]): JMap[String, String] =
+    new JHashMap[String, String]()
+
   def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan] = {
     ae.plan
   }
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 654bc4924..e27f4bc28 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -53,7 +53,14 @@ abstract class FileSourceScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
-  def hasMetadataColumns: Boolean = false
+  def dataFiltersInScan: Seq[Expression] = dataFilters
+
+  def metadataColumns: Seq[AttributeReference] = Seq.empty
+
+  def hasUnsupportedColumns: Boolean = {
+    // The names below have special meaning in Velox.
+    output.exists(a => a.name == "$path" || a.name == "$bucket")
+  }
 
   def isMetadataColumn(attr: Attribute): Boolean = false
 
diff --git a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
index 5f8134f7e..8e770325f 100644
--- a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
@@ -33,10 +33,11 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
@@ -48,6 +49,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import java.time.ZoneOffset
+import java.util.{HashMap => JHashMap, Map => JMap}
+
 class Spark33Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
 
@@ -152,6 +156,27 @@ class Spark33Shims extends SparkShims {
       case _ => None
     }
   }
+  override def generateMetadataColumns(
+      file: PartitionedFile,
+      metadataColumnNames: Seq[String]): JMap[String, String] = {
+    val metadataColumn = new JHashMap[String, String]()
+    val path = new Path(file.filePath.toString)
+    for (columnName <- metadataColumnNames) {
+      columnName match {
+        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
+        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_SIZE =>
+          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+        case FileFormat.FILE_MODIFICATION_TIME =>
+          val fileModifyTime = TimestampFormatter
+            .getFractionFormatter(ZoneOffset.UTC)
+            .format(file.modificationTime * 1000L)
+          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+        case _ =>
+      }
+    }
+    metadataColumn
+  }
 
   private def invalidBucketFile(path: String): Throwable = {
     new SparkException(
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index e07899793..92f32f847 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution
 import io.glutenproject.metrics.GlutenTimeMetric
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
@@ -54,7 +54,20 @@ abstract class FileSourceScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
-  def hasMetadataColumns: Boolean = metadataColumns.nonEmpty
+  def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
+    case FileSourceMetadataAttribute(attr) if attr.name == "_metadata" => true
+    case _ => false
+  })
+
+  def hasUnsupportedColumns: Boolean = {
+    // TODO: fall back when a user-defined column shares a name with a metadata
+    // column, since we currently cannot tell the two apart.
+    val metadataColumnsNames = metadataColumns.map(_.name)
+    output
+      .filterNot(metadataColumns.toSet)
+      .exists(v => metadataColumnsNames.contains(v.name)) ||
+    output.exists(a => a.name == "$path" || a.name == "$bucket")
+  }
 
   def isMetadataColumn(attr: Attribute): Boolean = metadataColumns.contains(attr)
 
diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index cd8449bb3..c98def5da 100644
--- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -34,11 +34,12 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, KeyGroupedPartitioning, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
 import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, GlutenFileFormatWriter, PartitionedFileUtil, SparkPlan, TakeOrderedAndProjectExec}
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -49,6 +50,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import java.time.ZoneOffset
+import java.util.{HashMap => JHashMap, Map => JMap}
+
 class Spark34Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
 
@@ -155,6 +159,34 @@ class Spark34Shims extends SparkShims {
     }
   }
 
+  override def generateMetadataColumns(
+      file: PartitionedFile,
+      metadataColumnNames: Seq[String]): JMap[String, String] = {
+    val metadataColumn = new JHashMap[String, String]()
+    val path = new Path(file.filePath.toString)
+    for (columnName <- metadataColumnNames) {
+      columnName match {
+        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
+        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_SIZE =>
+          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+        case FileFormat.FILE_MODIFICATION_TIME =>
+          val fileModifyTime = TimestampFormatter
+            .getFractionFormatter(ZoneOffset.UTC)
+            .format(file.modificationTime * 1000L)
+          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+        case FileFormat.FILE_BLOCK_START =>
+          metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+        case FileFormat.FILE_BLOCK_LENGTH =>
+          metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+        case _ =>
+      }
+    }
+
+    // TODO: row_index metadata support
+    metadataColumn
+  }
+
   // https://issues.apache.org/jira/browse/SPARK-40400
   private def invalidBucketFile(path: String): Throwable = {
     new SparkException(
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index f24ab8d57..57d1bd06a 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution
 import io.glutenproject.metrics.GlutenTimeMetric
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
@@ -55,7 +55,18 @@ abstract class FileSourceScanExecShim(
     case FileSourceGeneratedMetadataAttribute(attr) => attr
   }
 
-  def hasMetadataColumns: Boolean = metadataColumns.nonEmpty
+  def dataFiltersInScan: Seq[Expression] = dataFilters
+
+  def hasUnsupportedColumns: Boolean = {
+    val metadataColumnsNames = metadataColumns.map(_.name)
+    // row_index metadata is not supported yet
+    metadataColumnsNames.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) ||
+    output
+      .filterNot(metadataColumns.toSet)
+      .exists(v => metadataColumnsNames.contains(v.name)) ||
+    // Below name has special meaning in Velox.
+    output.exists(a => a.name == "$path" || a.name == "$bucket")
+  }
 
   def isMetadataColumn(attr: Attribute): Boolean = metadataColumns.contains(attr)
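
A hedged sketch of the rule hasUnsupportedColumns encodes above, using simplified stand-ins for Spark's attribute types (Col and the sample column names are hypothetical). Identity, not just the name, distinguishes a real metadata column from a user column that reuses its name, which is why the shim filters with metadataColumns.toSet before comparing names:

    object UnsupportedColumnsSketch {
      // Simplified stand-in for Spark's Attribute; the flag marks which
      // columns are real metadata columns rather than same-named user columns.
      final case class Col(name: String, isMetadata: Boolean)

      // Names with special meaning in Velox, per the comment in the shim.
      val veloxReserved = Set("$path", "$bucket")

      def hasUnsupportedColumns(output: Seq[Col]): Boolean = {
        val metadataNames = output.filter(_.isMetadata).map(_.name).toSet
        // Fall back if a user column collides with a metadata column name,
        // or if any output column uses a Velox-reserved name.
        output.filterNot(_.isMetadata).exists(c => metadataNames.contains(c.name)) ||
        output.exists(c => veloxReserved.contains(c.name))
      }

      def main(args: Array[String]): Unit = {
        val ok = Seq(Col("id", isMetadata = false), Col("file_name", isMetadata = true))
        println(hasUnsupportedColumns(ok))                                         // false
        println(hasUnsupportedColumns(ok :+ Col("file_name", isMetadata = false))) // true
        println(hasUnsupportedColumns(Seq(Col("$path", isMetadata = false))))      // true
      }
    }
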
 
diff --git a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index a33801653..6a6f3b2c8 100644
--- a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, GlutenFileFormatWriter, PartitionedFileUtil, SparkPlan, TakeOrderedAndProjectExec}
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, FileStatusWithMetadata, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, FileStatusWithMetadata, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -48,6 +49,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
+import java.time.ZoneOffset
+import java.util.{HashMap => JHashMap, Map => JMap}
+
 class Spark35Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
 
@@ -152,6 +156,34 @@ class Spark35Shims extends SparkShims {
     }
   }
 
+  override def generateMetadataColumns(
+      file: PartitionedFile,
+      metadataColumnNames: Seq[String]): JMap[String, String] = {
+    val metadataColumn = new JHashMap[String, String]()
+    val path = new Path(file.filePath.toString)
+    for (columnName <- metadataColumnNames) {
+      columnName match {
+        case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString)
+        case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName)
+        case FileFormat.FILE_SIZE =>
+          metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString)
+        case FileFormat.FILE_MODIFICATION_TIME =>
+          val fileModifyTime = TimestampFormatter
+            .getFractionFormatter(ZoneOffset.UTC)
+            .format(file.modificationTime * 1000L)
+          metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime)
+        case FileFormat.FILE_BLOCK_START =>
+          metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString)
+        case FileFormat.FILE_BLOCK_LENGTH =>
+          metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString)
+        case _ =>
+      }
+    }
+
+    // TODO: row_index metadata support
+    metadataColumn
+  }
+
   // https://issues.apache.org/jira/browse/SPARK-40400
   private def invalidBucketFile(path: String): Throwable = {
     new SparkException(
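
End to end, the feature these shims serve is Spark's hidden _metadata struct on file sources. A hedged usage sketch follows (the local path and the "id" column are illustrative assumptions; it presumes a Parquet table already exists at that path). Selecting _metadata fields is what drives the generateMetadataColumns path above:

    import org.apache.spark.sql.SparkSession

    object MetadataQuerySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("metadata-columns")
          .getOrCreate()
        // _metadata is Spark's hidden per-file metadata struct; its fields
        // (file_name, file_size, ...) match the names handled in the shims.
        spark.read.parquet("/tmp/t")
          .selectExpr("id", "_metadata.file_name", "_metadata.file_size")
          .show(truncate = false)
        spark.stop()
      }
    }
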
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index acede96da..efcb9dbad 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution
 import io.glutenproject.metrics.GlutenTimeMetric
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
@@ -50,14 +50,27 @@ abstract class FileSourceScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
-  lazy val metadataColumns = output.collect {
+  lazy val metadataColumns: Seq[AttributeReference] = output.collect {
     case FileSourceConstantMetadataAttribute(attr) => attr
-    case FileSourceGeneratedMetadataAttribute(attr) => attr
+    case FileSourceGeneratedMetadataAttribute(attr, _) => attr
   }
 
   protected lazy val driverMetricsAlias = driverMetrics
 
-  def hasMetadataColumns: Boolean = metadataColumns.nonEmpty
+  def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
+    case FileSourceMetadataAttribute(_) => true
+    case _ => false
+  })
+
+  def hasUnsupportedColumns: Boolean = {
+    // TODO: fall back when a user-defined column shares a metadata column's name,
+    // since we currently cannot distinguish metadata columns from user-defined ones.
+    val metadataColumnsNames = metadataColumns.map(_.name)
+    output
+      .filterNot(metadataColumns.toSet)
+      .exists(v => metadataColumnsNames.contains(v.name)) ||
+    output.exists(a => a.name == "$path" || a.name == "$bucket")
+  }
 
   def isMetadataColumn(attr: Attribute): Boolean = metadataColumns.contains(attr)
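
A small sketch of the dataFiltersInScan behavior above, with simplified stand-in types (Attr, Filter, and the sample predicates are hypothetical): filters that reference a metadata attribute are kept out of the native scan and left to be evaluated elsewhere.

    object DataFiltersSketch {
      // Simplified stand-ins for Spark's Attribute and Expression types.
      final case class Attr(name: String, isMetadata: Boolean)
      final case class Filter(sql: String, references: Set[Attr])

      // Mirrors dataFiltersInScan: drop any filter touching a metadata attribute.
      def dataFiltersInScan(dataFilters: Seq[Filter]): Seq[Filter] =
        dataFilters.filterNot(_.references.exists(_.isMetadata))

      def main(args: Array[String]): Unit = {
        val filters = Seq(
          Filter("id > 10", Set(Attr("id", isMetadata = false))),
          Filter("file_name LIKE 'part%'", Set(Attr("file_name", isMetadata = true))))
        // Prints only "id > 10"; the metadata-based filter is excluded.
        dataFiltersInScan(filters).foreach(f => println(f.sql))
      }
    }
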
 

