This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b10ee0d6b2 [Gluten-9334][CH] Support delta metadata column `file_path` 
and `row_index` for mergetree (#9340)
b10ee0d6b2 is described below

commit b10ee0d6b2722ca691475cd76b7f708504965651
Author: Shuai li <[email protected]>
AuthorDate: Wed Apr 16 21:39:39 2025 +0800

    [Gluten-9334][CH] Support delta metadata column `file_path` and `row_index` 
for mergetree (#9340)
    
    [Gluten-9334][CH] Support delta metadata column `file_path` and `row_index` 
for mergetree
---
 .../GlutenDeltaMergetreeDeletionVectorSuite.scala  |  85 ++++++
 .../GlutenClickHouseTPCHAbstractSuite.scala        |  83 +++---
 .../Parser/RelParsers/MergeTreeRelParser.cpp       | 286 +++++++++++++--------
 .../Parser/RelParsers/MergeTreeRelParser.h         |  37 ++-
 .../Storages/SubstraitSource/FormatFile.h          |   9 -
 5 files changed, 336 insertions(+), 164 deletions(-)

diff --git 
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergetreeDeletionVectorSuite.scala
 
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergetreeDeletionVectorSuite.scala
new file mode 100644
index 0000000000..1378e52d5e
--- /dev/null
+++ 
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergetreeDeletionVectorSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.gluten.delta
+
+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenDeltaMergetreeDeletionVectorSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + 
"queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + 
"mergetree-queries-output"
+
+  // import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+
+  /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.files.maxPartitionBytes", "20000000")
+      .set("spark.sql.storeAssignmentPolicy", "legacy")
+      .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  test("Gluten-9334: column `_tmp_metadata_row_index` and `file_path` not 
found") {
+    val tableName = "delta_metadata_column"
+    withTable(tableName) {
+      withTempDir {
+        dirName =>
+          val deltaPath = s"$dirName/$tableName"
+          spark.sql(s"""
+                       |CREATE TABLE IF NOT EXISTS $tableName
+                       |($lineitemNullableSchema)
+                       |USING Clickhouse
+                       |TBLPROPERTIES (delta.enableDeletionVectors='true')
+                       |LOCATION '$deltaPath'
+                       |""".stripMargin)
+
+          spark.sql(s"""insert into table $tableName select * from lineitem 
""".stripMargin)
+
+          val df = sql(s"""
+                          | select
+                          |   _metadata.file_path,
+                          |   _metadata.row_index
+                          | from $tableName
+                          | limit 1
+                          |""".stripMargin)
+
+          checkFallbackOperators(df, 0)
+      }
+    }
+  }
+}
+// scalastyle:off line.size.limit
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
index a299795ad7..b876ab80bb 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
@@ -44,6 +44,9 @@ abstract class GlutenClickHouseTPCHAbstractSuite
   protected val tpchQueries: String
   protected val queriesResults: String
 
+  protected val lineitemNullableSchema: String = lineitemSchema()
+  protected val lineitemNotNullSchema: String = lineitemSchema(false)
+
   override def beforeAll(): Unit = {
 
     super.beforeAll()
@@ -98,22 +101,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
     spark.sql(s"DROP TABLE IF EXISTS lineitem")
     spark.sql(s"""
                  | CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
-                 | l_orderkey      bigint not null,
-                 | l_partkey       bigint not null,
-                 | l_suppkey       bigint not null,
-                 | l_linenumber    bigint not null,
-                 | l_quantity      double not null,
-                 | l_extendedprice double not null,
-                 | l_discount      double not null,
-                 | l_tax           double not null,
-                 | l_returnflag    string not null,
-                 | l_linestatus    string not null,
-                 | l_shipdate      date not null,
-                 | l_commitdate    date not null,
-                 | l_receiptdate   date not null,
-                 | l_shipinstruct  string not null,
-                 | l_shipmode      string not null,
-                 | l_comment       string not null)
+                 | $lineitemNotNullSchema)
                  | USING clickhouse
                  | TBLPROPERTIES (engine='MergeTree'
                  |                )
@@ -269,22 +257,8 @@ abstract class GlutenClickHouseTPCHAbstractSuite
     spark.sql(s"DROP TABLE IF EXISTS lineitem")
     spark.sql(s"""
                  | CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
-                 | l_orderkey      bigint,
-                 | l_partkey       bigint,
-                 | l_suppkey       bigint,
-                 | l_linenumber    bigint,
-                 | l_quantity      double,
-                 | l_extendedprice double,
-                 | l_discount      double,
-                 | l_tax           double,
-                 | l_returnflag    string,
-                 | l_linestatus    string,
-                 | l_shipdate      date,
-                 | l_commitdate    date,
-                 | l_receiptdate   date,
-                 | l_shipinstruct  string,
-                 | l_shipmode      string,
-                 | l_comment       string)
+                 | $lineitemNullableSchema
+                 | )
                  | USING clickhouse
                  | TBLPROPERTIES (engine='MergeTree'
                  |                )
@@ -445,22 +419,8 @@ abstract class GlutenClickHouseTPCHAbstractSuite
     spark.sql(s"DROP TABLE IF EXISTS lineitem")
     spark.sql(s"""
                  | CREATE TABLE IF NOT EXISTS lineitem (
-                 | l_orderkey      bigint,
-                 | l_partkey       bigint,
-                 | l_suppkey       bigint,
-                 | l_linenumber    bigint,
-                 | l_quantity      double,
-                 | l_extendedprice double,
-                 | l_discount      double,
-                 | l_tax           double,
-                 | l_returnflag    string,
-                 | l_linestatus    string,
-                 | l_shipdate      date,
-                 | l_commitdate    date,
-                 | l_receiptdate   date,
-                 | l_shipinstruct  string,
-                 | l_shipmode      string,
-                 | l_comment       string)
+                 | $lineitemNullableSchema
+                 | )
                  | USING PARQUET LOCATION '$lineitemData'
                  |""".stripMargin)
 
@@ -657,4 +617,31 @@ abstract class GlutenClickHouseTPCHAbstractSuite
        |    AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
        |    AND l_quantity < 24
        |""".stripMargin
+
+  private def lineitemSchema(nullable: Boolean = true): String = {
+    val nullableSql = if (nullable) {
+      ""
+    } else {
+      " not null "
+    }
+
+    s"""
+       | l_orderkey      bigint $nullableSql,
+       | l_partkey       bigint $nullableSql,
+       | l_suppkey       bigint $nullableSql,
+       | l_linenumber    bigint $nullableSql,
+       | l_quantity      double $nullableSql,
+       | l_extendedprice double $nullableSql,
+       | l_discount      double $nullableSql,
+       | l_tax           double $nullableSql,
+       | l_returnflag    string $nullableSql,
+       | l_linestatus    string $nullableSql,
+       | l_shipdate      date   $nullableSql,
+       | l_commitdate    date   $nullableSql,
+       | l_receiptdate   date   $nullableSql,
+       | l_shipinstruct  string $nullableSql,
+       | l_shipmode      string $nullableSql,
+       | l_comment       string $nullableSql
+       |""".stripMargin
+  }
 }
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
index 428313ecc7..d41599adb5 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
@@ -16,6 +16,7 @@
  */
 
 #include "MergeTreeRelParser.h"
+
 #include <Core/Settings.h>
 #include <Parser/ExpressionParser.h>
 #include <Parser/FunctionParser.h>
@@ -51,6 +52,58 @@ namespace local_engine
 {
 using namespace DB;
 
+void replaceFilePathNodeCommon(
+    const String & alias_name, DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
+{
+    auto concat_func = DB::FunctionFactory::instance().get("concat", context);
+    DB::ActionsDAG::NodeRawConstPtrs args;
+    const auto string_type = std::make_shared<DB::DataTypeString>();
+    const auto * path_node = &actions_dag.addColumn(
+        DB::ColumnWithTypeAndName(string_type->createColumnConst(1, 
merge_tree_table.absolute_path + "/"), string_type, "path"));
+    args.emplace_back(path_node);
+    const auto & part_name = 
actions_dag.findInOutputs(MergeTreeRelParser::VIRTUAL_COLUMN_PART);
+    args.emplace_back(&part_name);
+    actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func, 
args, alias_name));
+}
+
+void replaceInputFileNameNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
+{
+    replaceFilePathNodeCommon(FileMetaColumns::INPUT_FILE_NAME, actions_dag, 
merge_tree_table, context);
+}
+
+void replaceFilePathNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
+{
+    replaceFilePathNodeCommon(FileMetaColumns::FILE_PATH, actions_dag, 
merge_tree_table, context);
+}
+
+void replaceFileNameNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance &, DB::ContextPtr)
+{
+    const auto & part_name = 
actions_dag.findInOutputs(MergeTreeRelParser::VIRTUAL_COLUMN_PART);
+    const auto & alias = actions_dag.addAlias(part_name, 
FileMetaColumns::FILE_NAME);
+    actions_dag.addOrReplaceInOutputs(alias);
+}
+
+void replaceInputFileBlockStartNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance &, DB::ContextPtr)
+{
+    const auto int64_type = std::make_shared<DB::DataTypeInt64>();
+    actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
+        DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), 
int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
+}
+
+void replaceInputFileBlockLengthNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance &, DB::ContextPtr)
+{
+    const auto int64_type = std::make_shared<DB::DataTypeInt64>();
+    actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
+        DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), 
int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
+}
+
+void replaceTmpRowIndexNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance &, DB::ContextPtr)
+{
+    const auto & tmp_metadata_row_index = 
actions_dag.findInOutputs(DB::BlockOffsetColumn::name);
+    const auto & alias = actions_dag.addAlias(tmp_metadata_row_index, 
ParquetVirtualMeta::TMP_ROWINDEX);
+    actions_dag.addOrReplaceInOutputs(alias);
+}
+
 /// Find minimal position of the column in primary key.
 static Int64 findMinPosition(const NameSet & condition_table_columns, const 
NameToIndexMap & primary_key_positions)
 {
@@ -66,52 +119,140 @@ static Int64 findMinPosition(const NameSet & 
condition_table_columns, const Name
     return min_position;
 }
 
-DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
-    DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const 
substrait::ReadRel::ExtensionTable & extension_table)
-{
-    MergeTreeTableInstance merge_tree_table(extension_table);
-    // ignore snapshot id for a query
-    merge_tree_table.snapshot_id = "";
-    auto storage = 
merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
 
-    DB::Block input;
-    DB::Block original_input;
+DB::Block MergeTreeRelParser::parseMergeTreeOutput(const substrait::ReadRel & 
rel, SparkStorageMergeTreePtr storage)
+{
     if (rel.has_base_schema() && rel.base_schema().names_size())
+        return TypeParser::buildBlockFromNamedStruct(rel.base_schema());
+
+    NamesAndTypesList one_column_name_type;
+    
one_column_name_type.push_back(storage->getInMemoryMetadataPtr()->getColumns().getAll().front());
+    LOG_DEBUG(getLogger("SerializedPlanParser"), "Try to read ({}) instead of 
empty header", one_column_name_type.front().dump());
+    return toSampleBlock(one_column_name_type);
+}
+
+
+DB::Block MergeTreeRelParser::replaceDeltaNameIfNeeded(const DB::Block & 
output)
+{
+    DB::ColumnsWithTypeAndName read_block;
+    for (const auto & column : output)
     {
-        input = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
-        if (input.findByName(FileMetaColumns::INPUT_FILE_NAME) != nullptr)
+        if (DELTA_META_COLUMN_MAP.contains(column.name))
+        {
+            if (auto tuple = DELTA_META_COLUMN_MAP.at(column.name); 
std::get<0>(tuple).has_value())
+                
read_block.emplace_back(ColumnWithTypeAndName(std::get<1>(tuple), 
std::get<0>(tuple).value()));
+        }
+        else
         {
-            // mergetree use concat(path, _part) instead of input_file_name
-            
input.insert(ColumnWithTypeAndName(ColumnWithTypeAndName(std::make_shared<DataTypeString>(),
 VIRTUAL_COLUMN_PART)));
+            read_block.emplace_back(column);
         }
-        // remove input_file_name, input_file_block_start, 
input_file_block_size due to mergetree doesn't have them
-        input = FileMetaColumns::removeVirtualColumns(input);
+    }
+    return DB::Block(std::move(read_block));
+}
 
-        SparkSQLConfig sql_config = SparkSQLConfig::loadFromContext(context);
-        // case_insensitive_matching
-        if (!sql_config.caseSensitive)
+void MergeTreeRelParser::recoverDeltaNameIfNeeded(
+    DB::QueryPlan & plan, const DB::Block & output, const 
MergeTreeTableInstance & merge_tree_table)
+{
+    const auto & header = plan.getCurrentHeader();
+    DB::ActionsDAG actions_dag(header.getNamesAndTypesList());
+    NameSet names;
+    bool need_recover = false;
+    for (const auto & column : output)
+    {
+        if (DELTA_META_COLUMN_MAP.contains(column.name))
         {
-            original_input = input;
-            auto all = 
storage->getInMemoryMetadataPtr()->getColumns().getNamesOfPhysical();
-            std::ranges::for_each(
-                input,
-                [&all](ColumnWithTypeAndName & column)
-                {
-                    const auto found
-                        = std::ranges::find_if(all, [&column](const auto & 
name) -> bool { return boost::iequals(column.name, name); });
-                    if (found != all.end())
-                        column.name = *found;
-                });
+            need_recover = true;
+            auto tuple = DELTA_META_COLUMN_MAP.at(column.name);
+            ReplaceDeltaNodeFunc func = std::get<2>(tuple);
+            func(actions_dag, merge_tree_table, context);
         }
+
+        names.insert(column.name);
     }
-    else
+
+    if (!need_recover)
+        return;
+
+    actions_dag.removeUnusedActions(names);
+    auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), 
std::move(actions_dag));
+    step->setStepDescription("Recover virtual columns");
+    steps.emplace_back(step.get());
+    plan.addStep(std::move(step));
+}
+
+void MergeTreeRelParser::replaceNodeWithCaseSensitive(DB::Block & read_block, 
SparkStorageMergeTreePtr storage)
+{
+    // case_insensitive_matching
+    if (spark_sql_config.caseSensitive)
+        return;
+
+    auto all = 
storage->getInMemoryMetadataPtr()->getColumns().getNamesOfPhysical();
+    std::ranges::for_each(
+        read_block,
+        [&all](ColumnWithTypeAndName & column)
+        {
+            const auto found
+                = std::ranges::find_if(all, [&column](const auto & name) -> 
bool { return boost::iequals(column.name, name); });
+            if (found != all.end())
+                column.name = *found;
+        });
+}
+
+
+void MergeTreeRelParser::recoverNodeWithCaseSensitive(DB::QueryPlan & 
query_plan, const DB::Block & output)
+{
+    if (spark_sql_config.caseSensitive)
+        return;
+
+    auto read_Header = query_plan.getCurrentHeader();
+    NameToNameMap names;
+    names.reserve(output.columns());
+    for (const auto & elem : output.getColumnsWithTypeAndName())
+        names[Poco::toLower(elem.name)] = elem.name;
+
+    DB::NamesWithAliases aliases;
+    aliases.reserve(read_Header.columns());
+    bool need_alias = false;
+    for (const auto & elem : read_Header)
     {
-        NamesAndTypesList one_column_name_type;
-        
one_column_name_type.push_back(storage->getInMemoryMetadataPtr()->getColumns().getAll().front());
-        input = toSampleBlock(one_column_name_type);
-        LOG_DEBUG(getLogger("SerializedPlanParser"), "Try to read ({}) instead 
of empty header", one_column_name_type.front().dump());
+        if (auto lower_name = Poco::toLower(elem.name); 
names.contains(lower_name))
+        {
+            if (!need_alias && !boost::equals(elem.name, names[lower_name]))
+                need_alias = true;
+
+            aliases.emplace_back(DB::NameWithAlias(elem.name, 
names[lower_name]));
+        }
+        else
+        {
+            aliases.emplace_back(DB::NameWithAlias(elem.name, elem.name));
+        }
     }
 
+    if (!need_alias)
+        return;
+
+    DB::ActionsDAG actions_dag{blockToRowType(query_plan.getCurrentHeader())};
+    actions_dag.project(aliases);
+    auto expression_step = 
std::make_unique<DB::ExpressionStep>(query_plan.getCurrentHeader(), 
std::move(actions_dag));
+    expression_step->setStepDescription("Rename MergeTree Output(Cause: case 
sensitive)");
+    steps.emplace_back(expression_step.get());
+    query_plan.addStep(std::move(expression_step));
+}
+
+
+DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
+    DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const 
substrait::ReadRel::ExtensionTable & extension_table)
+{
+    MergeTreeTableInstance merge_tree_table(extension_table);
+    // ignore snapshot id for a query
+    merge_tree_table.snapshot_id = "";
+    auto storage = 
merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
+
+    const DB::Block output = parseMergeTreeOutput(rel, storage);
+    DB::Block read_block = replaceDeltaNameIfNeeded(output);
+    replaceNodeWithCaseSensitive(read_block, storage);
+
+
     std::vector<DataPartPtr> selected_parts = 
StorageMergeTreeFactory::getDataPartsByNames(
         storage->getStorageID(), merge_tree_table.snapshot_id, 
merge_tree_table.getPartNames());
 
@@ -119,16 +260,15 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
         column_sizes[name] = sizes.data_compressed;
 
     auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, 
storage->getInMemoryMetadataPtr());
-    auto names_and_types_list = input.getNamesAndTypesList();
-
+    auto names_and_types_list = read_block.getNamesAndTypesList();
     auto query_info = buildQueryInfo(names_and_types_list);
 
     std::set<String> non_nullable_columns;
     if (rel.has_filter())
     {
-        NonNullableColumnsResolver non_nullable_columns_resolver(input, 
parser_context, rel.filter());
+        NonNullableColumnsResolver non_nullable_columns_resolver(read_block, 
parser_context, rel.filter());
         non_nullable_columns = non_nullable_columns_resolver.resolve();
-        query_info->prewhere_info = parsePreWhereInfo(rel.filter(), input);
+        query_info->prewhere_info = parsePreWhereInfo(rel.filter(), 
read_block);
     }
 
     auto read_step = storage->reader.readFromParts(
@@ -142,9 +282,9 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
         1);
 
     auto * source_step_with_filter = static_cast<SourceStepWithFilterBase 
*>(read_step.get());
-    if (const auto & storage_prewhere_info = query_info->prewhere_info)
+    if (const auto & storage_preWhere_info = query_info->prewhere_info)
     {
-        
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions.clone(),
 storage_prewhere_info->prewhere_column_name);
+        
source_step_with_filter->addFilter(storage_preWhere_info->prewhere_actions.clone(),
 storage_preWhere_info->prewhere_column_name);
         source_step_with_filter->applyFilters();
     }
 
@@ -164,74 +304,12 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
             steps.emplace_back(remove_null_step);
     }
 
-    if (original_input.columns() == input.columns() && !sameName(input, 
original_input))
-    {
-        steps.emplace_back(PlanUtil::renamePlanHeader(
-            *query_plan,
-            [&original_input](const Block & input, NamesWithAliases & aliases) 
{ aliases = buildNamesWithAliases(input, original_input); },
-            "Rename MergeTree Output"));
-    }
+    recoverNodeWithCaseSensitive(*query_plan, output);
+    recoverDeltaNameIfNeeded(*query_plan, output, merge_tree_table);
 
-    // add virtual columns step
-    if (auto step = 
MergeTreeRelParser::addVirtualColumnsProjectStep(*query_plan, rel, 
merge_tree_table.absolute_path); step.has_value())
-        steps.emplace_back(step.value());
     return query_plan;
 }
 
-std::optional<DB::IQueryPlanStep *> 
MergeTreeRelParser::addVirtualColumnsProjectStep(DB::QueryPlan & plan, const 
substrait::ReadRel & rel, const std::string & path)
-{
-    if (!rel.has_base_schema() || rel.base_schema().names_size() < 1)
-        return std::nullopt;
-    bool contains_input_file_name = false;
-    bool contains_input_file_block_start = false;
-    bool contains_input_file_block_length = false;
-    for (const auto & name : rel.base_schema().names())
-    {
-        if (name == FileMetaColumns::INPUT_FILE_NAME)
-            contains_input_file_name = true;
-        if (name == FileMetaColumns::INPUT_FILE_BLOCK_START)
-            contains_input_file_block_start = true;
-        if (name == FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)
-            contains_input_file_block_length = true;
-    }
-    if (!contains_input_file_name && !contains_input_file_block_start && 
!contains_input_file_block_length)
-        return std::nullopt;
-
-    const auto & header = plan.getCurrentHeader();
-    DB::ActionsDAG actions_dag(header.getNamesAndTypesList());
-    if (contains_input_file_name)
-    {
-        auto concat_func = FunctionFactory::instance().get("concat", context);
-        DB::ActionsDAG::NodeRawConstPtrs args;
-        const auto string_type = std::make_shared<DB::DataTypeString>();
-        const auto * path_node = 
&actions_dag.addColumn(DB::ColumnWithTypeAndName(string_type->createColumnConst(1,
 path + "/"), string_type, "path"));
-        args.emplace_back(path_node);
-        const auto & part_name = 
actions_dag.findInOutputs(VIRTUAL_COLUMN_PART);
-        args.emplace_back(&part_name);
-        actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func, 
args, FileMetaColumns::INPUT_FILE_NAME));
-    }
-    if (contains_input_file_block_start)
-    {
-        const auto int64_type = std::make_shared<DB::DataTypeInt64>();
-        
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1,
 -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
-    }
-    if (contains_input_file_block_length)
-    {
-        const auto int64_type = std::make_shared<DB::DataTypeInt64>();
-        
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1,
 -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
-    }
-
-    if (contains_input_file_name)
-        actions_dag.removeUnusedResult(VIRTUAL_COLUMN_PART);
-    auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), 
std::move(actions_dag));
-    step->setStepDescription("Add virtual columns");
-    std::optional<DB::IQueryPlanStep *> result = step.get();
-    plan.addStep(std::move(step));
-    return result;
-}
-
-
-
 PrewhereInfoPtr MergeTreeRelParser::parsePreWhereInfo(const 
substrait::Expression & rel, const Block & input)
 {
     std::string filter_name;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
index d3bf29e1f6..739f6db557 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
@@ -18,9 +18,14 @@
 
 #include <memory>
 #include <optional>
+#include <DataTypes/DataTypesNumber.h>
 #include <Parser/RelParsers/RelParser.h>
-#include <Storages/SelectQueryInfo.h>
+#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
+#include <Storages/MergeTree/SparkMergeTreeMeta.h>
+#include <Storages/Parquet/ParquetMeta.h>
+#include <Storages/SubstraitSource/FormatFile.h>
 #include <substrait/algebra.pb.h>
+#include <Common/GlutenConfig.h>
 
 namespace DB
 {
@@ -33,6 +38,27 @@ extern const int LOGICAL_ERROR;
 namespace local_engine
 {
 
+using ReplaceDeltaNodeFunc = std::function<void(DB::ActionsDAG &, const 
MergeTreeTableInstance &, DB::ContextPtr)>;
+
+void replaceInputFileNameNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceFilePathNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceFileNameNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceInputFileBlockStartNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceInputFileBlockLengthNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceTmpRowIndexNode(DB::ActionsDAG & actions_dag, const 
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+
+static const std::unordered_map<String, std::tuple<std::optional<String>, 
DB::DataTypePtr, ReplaceDeltaNodeFunc>> DELTA_META_COLUMN_MAP
+    = {{FileMetaColumns::INPUT_FILE_NAME, std::tuple("_part", 
std::make_shared<DB::DataTypeString>(), replaceInputFileNameNode)},
+       {FileMetaColumns::INPUT_FILE_BLOCK_START,
+        std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), 
replaceInputFileBlockStartNode)},
+       {FileMetaColumns::INPUT_FILE_BLOCK_LENGTH,
+        std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(), 
replaceInputFileBlockLengthNode)},
+       {ParquetVirtualMeta::TMP_ROWINDEX,
+        std::tuple(DB::BlockOffsetColumn::name, 
std::make_shared<DB::DataTypeUInt64>(), replaceTmpRowIndexNode)},
+       {FileMetaColumns::FILE_PATH, std::tuple("_part", 
std::make_shared<DB::DataTypeString>(), replaceFilePathNode)},
+       {FileMetaColumns::FILE_NAME, std::tuple("_part", 
std::make_shared<DB::DataTypeString>(), replaceFileNameNode)}
+    };
+
 class MergeTreeRelParser : public RelParser
 {
 public:
@@ -41,6 +67,7 @@ public:
     explicit MergeTreeRelParser(ParserContextPtr parser_context_, const 
DB::ContextPtr & context_)
         : RelParser(parser_context_), context(context_)
     {
+        spark_sql_config = SparkSQLConfig::loadFromContext(context);
     }
 
     ~MergeTreeRelParser() override = default;
@@ -58,8 +85,6 @@ public:
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser 
can't call getSingleInput().");
     }
 
-    std::optional<DB::IQueryPlanStep *> 
addVirtualColumnsProjectStep(DB::QueryPlan & plan, const substrait::ReadRel & 
rel, const std::string & path);
-
     String filterRangesOnDriver(const substrait::ReadRel & read_rel);
 
     struct Condition
@@ -84,6 +109,11 @@ public:
     std::unordered_map<std::string, UInt64> column_sizes;
 
 private:
+    DB::Block parseMergeTreeOutput(const substrait::ReadRel & rel, 
SparkStorageMergeTreePtr storage);
+    DB::Block replaceDeltaNameIfNeeded(const DB::Block & output);
+    void replaceNodeWithCaseSensitive(DB::Block & read_block, 
SparkStorageMergeTreePtr storage);
+    void recoverDeltaNameIfNeeded(DB::QueryPlan & plan, const DB::Block & 
output, const MergeTreeTableInstance & merge_tree_table);
+    void recoverNodeWithCaseSensitive(DB::QueryPlan & query_plan, const 
DB::Block & output);
     void parseToAction(DB::ActionsDAG & filter_action, const 
substrait::Expression & rel, std::string & filter_name) const;
     DB::PrewhereInfoPtr parsePreWhereInfo(const substrait::Expression & rel, 
const DB::Block & input);
     DB::ActionsDAG optimizePrewhereAction(const substrait::Expression & rel, 
std::string & filter_name, const DB::Block & block);
@@ -92,6 +122,7 @@ private:
     UInt64 getColumnsSize(const DB::NameSet & columns);
 
     DB::ContextPtr context;
+    SparkSQLConfig spark_sql_config;
 };
 
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
index 25243d2c4d..262aeff6fa 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -72,15 +72,6 @@ public:
         return std::ranges::any_of(block, [](const auto & column) { return 
isVirtualColumn(column.name); });
     }
 
-    static DB::Block removeVirtualColumns(const DB::Block & block)
-    {
-        DB::ColumnsWithTypeAndName result_columns;
-        std::ranges::copy_if(
-            block.getColumnsWithTypeAndName(),
-            std::back_inserter(result_columns),
-            [](const auto & column) { return !isVirtualColumn(column.name); });
-        return result_columns;
-    }
     ///
 
     explicit FileMetaColumns(const SubstraitInputFile & file);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to