This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b10ee0d6b2 [Gluten-9334][CH] Support delta metadata column `file_path` and `row_index` for mergetree (#9340)
b10ee0d6b2 is described below
commit b10ee0d6b2722ca691475cd76b7f708504965651
Author: Shuai li <[email protected]>
AuthorDate: Wed Apr 16 21:39:39 2025 +0800
[Gluten-9334][CH] Support delta metadata column `file_path` and `row_index` for mergetree (#9340)
---
.../GlutenDeltaMergetreeDeletionVectorSuite.scala | 85 ++++++
.../GlutenClickHouseTPCHAbstractSuite.scala | 83 +++---
.../Parser/RelParsers/MergeTreeRelParser.cpp | 286 +++++++++++++--------
.../Parser/RelParsers/MergeTreeRelParser.h | 37 ++-
.../Storages/SubstraitSource/FormatFile.h | 9 -
5 files changed, 336 insertions(+), 164 deletions(-)
diff --git a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergetreeDeletionVectorSuite.scala b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergetreeDeletionVectorSuite.scala
new file mode 100644
index 0000000000..1378e52d5e
--- /dev/null
+++ b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaMergetreeDeletionVectorSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.gluten.delta
+
+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenDeltaMergetreeDeletionVectorSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true
+
+ override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+ // import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+
+ /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.sql.files.maxPartitionBytes", "20000000")
+ .set("spark.sql.storeAssignmentPolicy", "legacy")
+ .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+ }
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ test("Gluten-9334: column `_tmp_metadata_row_index` and `file_path` not
found") {
+ val tableName = "delta_metadata_column"
+ withTable(tableName) {
+ withTempDir {
+ dirName =>
+ val deltaPath = s"$dirName/$tableName"
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS $tableName
+ |($lineitemNullableSchema)
+ |USING Clickhouse
+ |TBLPROPERTIES (delta.enableDeletionVectors='true')
+ |LOCATION '$deltaPath'
+ |""".stripMargin)
+
+ spark.sql(s"""insert into table $tableName select * from lineitem
""".stripMargin)
+
+ val df = sql(s"""
+ | select
+ | _metadata.file_path,
+ | _metadata.row_index
+ | from $tableName
+ | limit 1
+ |""".stripMargin)
+
+ checkFallbackOperators(df, 0)
+ }
+ }
+ }
+}
+// scalastyle:on line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
index a299795ad7..b876ab80bb 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
@@ -44,6 +44,9 @@ abstract class GlutenClickHouseTPCHAbstractSuite
protected val tpchQueries: String
protected val queriesResults: String
+ protected val lineitemNullableSchema: String = lineitemSchema()
+ protected val lineitemNotNullSchema: String = lineitemSchema(false)
+
override def beforeAll(): Unit = {
super.beforeAll()
@@ -98,22 +101,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
spark.sql(s"DROP TABLE IF EXISTS lineitem")
spark.sql(s"""
| CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
- | l_orderkey bigint not null,
- | l_partkey bigint not null,
- | l_suppkey bigint not null,
- | l_linenumber bigint not null,
- | l_quantity double not null,
- | l_extendedprice double not null,
- | l_discount double not null,
- | l_tax double not null,
- | l_returnflag string not null,
- | l_linestatus string not null,
- | l_shipdate date not null,
- | l_commitdate date not null,
- | l_receiptdate date not null,
- | l_shipinstruct string not null,
- | l_shipmode string not null,
- | l_comment string not null)
+ | $lineitemNotNullSchema)
| USING clickhouse
| TBLPROPERTIES (engine='MergeTree'
| )
@@ -269,22 +257,8 @@ abstract class GlutenClickHouseTPCHAbstractSuite
spark.sql(s"DROP TABLE IF EXISTS lineitem")
spark.sql(s"""
| CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
- | l_orderkey bigint,
- | l_partkey bigint,
- | l_suppkey bigint,
- | l_linenumber bigint,
- | l_quantity double,
- | l_extendedprice double,
- | l_discount double,
- | l_tax double,
- | l_returnflag string,
- | l_linestatus string,
- | l_shipdate date,
- | l_commitdate date,
- | l_receiptdate date,
- | l_shipinstruct string,
- | l_shipmode string,
- | l_comment string)
+ | $lineitemNullableSchema
+ | )
| USING clickhouse
| TBLPROPERTIES (engine='MergeTree'
| )
@@ -445,22 +419,8 @@ abstract class GlutenClickHouseTPCHAbstractSuite
spark.sql(s"DROP TABLE IF EXISTS lineitem")
spark.sql(s"""
| CREATE TABLE IF NOT EXISTS lineitem (
- | l_orderkey bigint,
- | l_partkey bigint,
- | l_suppkey bigint,
- | l_linenumber bigint,
- | l_quantity double,
- | l_extendedprice double,
- | l_discount double,
- | l_tax double,
- | l_returnflag string,
- | l_linestatus string,
- | l_shipdate date,
- | l_commitdate date,
- | l_receiptdate date,
- | l_shipinstruct string,
- | l_shipmode string,
- | l_comment string)
+ | $lineitemNullableSchema
+ | )
| USING PARQUET LOCATION '$lineitemData'
|""".stripMargin)
@@ -657,4 +617,31 @@ abstract class GlutenClickHouseTPCHAbstractSuite
| AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
| AND l_quantity < 24
|""".stripMargin
+
+ private def lineitemSchema(nullable: Boolean = true): String = {
+ val nullableSql = if (nullable) {
+ ""
+ } else {
+ " not null "
+ }
+
+ s"""
+ | l_orderkey bigint $nullableSql,
+ | l_partkey bigint $nullableSql,
+ | l_suppkey bigint $nullableSql,
+ | l_linenumber bigint $nullableSql,
+ | l_quantity double $nullableSql,
+ | l_extendedprice double $nullableSql,
+ | l_discount double $nullableSql,
+ | l_tax double $nullableSql,
+ | l_returnflag string $nullableSql,
+ | l_linestatus string $nullableSql,
+ | l_shipdate date $nullableSql,
+ | l_commitdate date $nullableSql,
+ | l_receiptdate date $nullableSql,
+ | l_shipinstruct string $nullableSql,
+ | l_shipmode string $nullableSql,
+ | l_comment string $nullableSql
+ |""".stripMargin
+ }
}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
index 428313ecc7..d41599adb5 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
@@ -16,6 +16,7 @@
*/
#include "MergeTreeRelParser.h"
+
#include <Core/Settings.h>
#include <Parser/ExpressionParser.h>
#include <Parser/FunctionParser.h>
@@ -51,6 +52,58 @@ namespace local_engine
{
using namespace DB;
+void replaceFilePathNodeCommon(
+ const String & alias_name, DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
+{
+ auto concat_func = DB::FunctionFactory::instance().get("concat", context);
+ DB::ActionsDAG::NodeRawConstPtrs args;
+ const auto string_type = std::make_shared<DB::DataTypeString>();
+ const auto * path_node = &actions_dag.addColumn(
+ DB::ColumnWithTypeAndName(string_type->createColumnConst(1,
merge_tree_table.absolute_path + "/"), string_type, "path"));
+ args.emplace_back(path_node);
+ const auto & part_name =
actions_dag.findInOutputs(MergeTreeRelParser::VIRTUAL_COLUMN_PART);
+ args.emplace_back(&part_name);
+    actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func, args, alias_name));
+}
+
+void replaceInputFileNameNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
+{
+    replaceFilePathNodeCommon(FileMetaColumns::INPUT_FILE_NAME, actions_dag, merge_tree_table, context);
+}
+
+void replaceFilePathNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context)
+{
+    replaceFilePathNodeCommon(FileMetaColumns::FILE_PATH, actions_dag, merge_tree_table, context);
+}
+
+void replaceFileNameNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance &, DB::ContextPtr)
+{
+ const auto & part_name =
actions_dag.findInOutputs(MergeTreeRelParser::VIRTUAL_COLUMN_PART);
+ const auto & alias = actions_dag.addAlias(part_name,
FileMetaColumns::FILE_NAME);
+ actions_dag.addOrReplaceInOutputs(alias);
+}
+
+void replaceInputFileBlockStartNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance &, DB::ContextPtr)
+{
+ const auto int64_type = std::make_shared<DB::DataTypeInt64>();
+ actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
+ DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1),
int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
+}
+
+void replaceInputFileBlockLengthNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance &, DB::ContextPtr)
+{
+ const auto int64_type = std::make_shared<DB::DataTypeInt64>();
+ actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(
+ DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1),
int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
+}
+
+void replaceTmpRowIndexNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance &, DB::ContextPtr)
+{
+ const auto & tmp_metadata_row_index =
actions_dag.findInOutputs(DB::BlockOffsetColumn::name);
+ const auto & alias = actions_dag.addAlias(tmp_metadata_row_index,
ParquetVirtualMeta::TMP_ROWINDEX);
+ actions_dag.addOrReplaceInOutputs(alias);
+}
+
/// Find minimal position of the column in primary key.
static Int64 findMinPosition(const NameSet & condition_table_columns, const NameToIndexMap & primary_key_positions)
{
@@ -66,52 +119,140 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name
return min_position;
}
-DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
-    DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const substrait::ReadRel::ExtensionTable & extension_table)
-{
- MergeTreeTableInstance merge_tree_table(extension_table);
- // ignore snapshot id for a query
- merge_tree_table.snapshot_id = "";
-    auto storage = merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
- DB::Block input;
- DB::Block original_input;
+DB::Block MergeTreeRelParser::parseMergeTreeOutput(const substrait::ReadRel & rel, SparkStorageMergeTreePtr storage)
+{
if (rel.has_base_schema() && rel.base_schema().names_size())
+ return TypeParser::buildBlockFromNamedStruct(rel.base_schema());
+
+ NamesAndTypesList one_column_name_type;
+    one_column_name_type.push_back(storage->getInMemoryMetadataPtr()->getColumns().getAll().front());
+ LOG_DEBUG(getLogger("SerializedPlanParser"), "Try to read ({}) instead of
empty header", one_column_name_type.front().dump());
+ return toSampleBlock(one_column_name_type);
+}
+
+
+DB::Block MergeTreeRelParser::replaceDeltaNameIfNeeded(const DB::Block & output)
+{
+ DB::ColumnsWithTypeAndName read_block;
+ for (const auto & column : output)
{
- input = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
- if (input.findByName(FileMetaColumns::INPUT_FILE_NAME) != nullptr)
+ if (DELTA_META_COLUMN_MAP.contains(column.name))
+ {
+            if (auto tuple = DELTA_META_COLUMN_MAP.at(column.name); std::get<0>(tuple).has_value())
+                read_block.emplace_back(ColumnWithTypeAndName(std::get<1>(tuple), std::get<0>(tuple).value()));
+ }
+ else
{
- // mergetree use concat(path, _part) instead of input_file_name
-            input.insert(ColumnWithTypeAndName(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), VIRTUAL_COLUMN_PART)));
+ read_block.emplace_back(column);
}
-        // remove input_file_name, input_file_block_start, input_file_block_size due to mergetree doesn't have them
- input = FileMetaColumns::removeVirtualColumns(input);
+ }
+ return DB::Block(std::move(read_block));
+}
- SparkSQLConfig sql_config = SparkSQLConfig::loadFromContext(context);
- // case_insensitive_matching
- if (!sql_config.caseSensitive)
+void MergeTreeRelParser::recoverDeltaNameIfNeeded(
+ DB::QueryPlan & plan, const DB::Block & output, const
MergeTreeTableInstance & merge_tree_table)
+{
+ const auto & header = plan.getCurrentHeader();
+ DB::ActionsDAG actions_dag(header.getNamesAndTypesList());
+ NameSet names;
+ bool need_recover = false;
+ for (const auto & column : output)
+ {
+ if (DELTA_META_COLUMN_MAP.contains(column.name))
{
- original_input = input;
-        auto all = storage->getInMemoryMetadataPtr()->getColumns().getNamesOfPhysical();
- std::ranges::for_each(
- input,
- [&all](ColumnWithTypeAndName & column)
- {
-                const auto found
-                    = std::ranges::find_if(all, [&column](const auto & name) -> bool { return boost::iequals(column.name, name); });
- if (found != all.end())
- column.name = *found;
- });
+ need_recover = true;
+ auto tuple = DELTA_META_COLUMN_MAP.at(column.name);
+ ReplaceDeltaNodeFunc func = std::get<2>(tuple);
+ func(actions_dag, merge_tree_table, context);
}
+
+ names.insert(column.name);
}
- else
+
+ if (!need_recover)
+ return;
+
+ actions_dag.removeUnusedActions(names);
+ auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(),
std::move(actions_dag));
+ step->setStepDescription("Recover virtual columns");
+ steps.emplace_back(step.get());
+ plan.addStep(std::move(step));
+}
+
+void MergeTreeRelParser::replaceNodeWithCaseSensitive(DB::Block & read_block,
SparkStorageMergeTreePtr storage)
+{
+ // case_insensitive_matching
+ if (spark_sql_config.caseSensitive)
+ return;
+
+    auto all = storage->getInMemoryMetadataPtr()->getColumns().getNamesOfPhysical();
+ std::ranges::for_each(
+ read_block,
+ [&all](ColumnWithTypeAndName & column)
+ {
+            const auto found
+                = std::ranges::find_if(all, [&column](const auto & name) -> bool { return boost::iequals(column.name, name); });
+ if (found != all.end())
+ column.name = *found;
+ });
+}
+
+
+void MergeTreeRelParser::recoverNodeWithCaseSensitive(DB::QueryPlan &
query_plan, const DB::Block & output)
+{
+ if (spark_sql_config.caseSensitive)
+ return;
+
+ auto read_Header = query_plan.getCurrentHeader();
+ NameToNameMap names;
+ names.reserve(output.columns());
+ for (const auto & elem : output.getColumnsWithTypeAndName())
+ names[Poco::toLower(elem.name)] = elem.name;
+
+ DB::NamesWithAliases aliases;
+ aliases.reserve(read_Header.columns());
+ bool need_alias = false;
+ for (const auto & elem : read_Header)
{
- NamesAndTypesList one_column_name_type;
-        one_column_name_type.push_back(storage->getInMemoryMetadataPtr()->getColumns().getAll().front());
- input = toSampleBlock(one_column_name_type);
- LOG_DEBUG(getLogger("SerializedPlanParser"), "Try to read ({}) instead
of empty header", one_column_name_type.front().dump());
+        if (auto lower_name = Poco::toLower(elem.name); names.contains(lower_name))
+ {
+ if (!need_alias && !boost::equals(elem.name, names[lower_name]))
+ need_alias = true;
+
+            aliases.emplace_back(DB::NameWithAlias(elem.name, names[lower_name]));
+ }
+ else
+ {
+ aliases.emplace_back(DB::NameWithAlias(elem.name, elem.name));
+ }
}
+ if (!need_alias)
+ return;
+
+ DB::ActionsDAG actions_dag{blockToRowType(query_plan.getCurrentHeader())};
+ actions_dag.project(aliases);
+    auto expression_step = std::make_unique<DB::ExpressionStep>(query_plan.getCurrentHeader(), std::move(actions_dag));
+ expression_step->setStepDescription("Rename MergeTree Output(Cause: case
sensitive)");
+ steps.emplace_back(expression_step.get());
+ query_plan.addStep(std::move(expression_step));
+}
+
+
+DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
+    DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const substrait::ReadRel::ExtensionTable & extension_table)
+{
+ MergeTreeTableInstance merge_tree_table(extension_table);
+ // ignore snapshot id for a query
+ merge_tree_table.snapshot_id = "";
+    auto storage = merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
+
+ const DB::Block output = parseMergeTreeOutput(rel, storage);
+ DB::Block read_block = replaceDeltaNameIfNeeded(output);
+ replaceNodeWithCaseSensitive(read_block, storage);
+
+
std::vector<DataPartPtr> selected_parts =
StorageMergeTreeFactory::getDataPartsByNames(
storage->getStorageID(), merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
@@ -119,16 +260,15 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
column_sizes[name] = sizes.data_compressed;
auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage,
storage->getInMemoryMetadataPtr());
- auto names_and_types_list = input.getNamesAndTypesList();
-
+ auto names_and_types_list = read_block.getNamesAndTypesList();
auto query_info = buildQueryInfo(names_and_types_list);
std::set<String> non_nullable_columns;
if (rel.has_filter())
{
-        NonNullableColumnsResolver non_nullable_columns_resolver(input, parser_context, rel.filter());
+        NonNullableColumnsResolver non_nullable_columns_resolver(read_block, parser_context, rel.filter());
non_nullable_columns = non_nullable_columns_resolver.resolve();
- query_info->prewhere_info = parsePreWhereInfo(rel.filter(), input);
+ query_info->prewhere_info = parsePreWhereInfo(rel.filter(),
read_block);
}
auto read_step = storage->reader.readFromParts(
@@ -142,9 +282,9 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
1);
auto * source_step_with_filter = static_cast<SourceStepWithFilterBase
*>(read_step.get());
- if (const auto & storage_prewhere_info = query_info->prewhere_info)
+ if (const auto & storage_preWhere_info = query_info->prewhere_info)
{
-        source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions.clone(), storage_prewhere_info->prewhere_column_name);
+        source_step_with_filter->addFilter(storage_preWhere_info->prewhere_actions.clone(), storage_preWhere_info->prewhere_column_name);
source_step_with_filter->applyFilters();
}
@@ -164,74 +304,12 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
steps.emplace_back(remove_null_step);
}
- if (original_input.columns() == input.columns() && !sameName(input,
original_input))
- {
- steps.emplace_back(PlanUtil::renamePlanHeader(
- *query_plan,
- [&original_input](const Block & input, NamesWithAliases & aliases)
{ aliases = buildNamesWithAliases(input, original_input); },
- "Rename MergeTree Output"));
- }
+ recoverNodeWithCaseSensitive(*query_plan, output);
+ recoverDeltaNameIfNeeded(*query_plan, output, merge_tree_table);
- // add virtual columns step
-    if (auto step = MergeTreeRelParser::addVirtualColumnsProjectStep(*query_plan, rel, merge_tree_table.absolute_path); step.has_value())
- steps.emplace_back(step.value());
return query_plan;
}
-std::optional<DB::IQueryPlanStep *>
MergeTreeRelParser::addVirtualColumnsProjectStep(DB::QueryPlan & plan, const
substrait::ReadRel & rel, const std::string & path)
-{
- if (!rel.has_base_schema() || rel.base_schema().names_size() < 1)
- return std::nullopt;
- bool contains_input_file_name = false;
- bool contains_input_file_block_start = false;
- bool contains_input_file_block_length = false;
- for (const auto & name : rel.base_schema().names())
- {
- if (name == FileMetaColumns::INPUT_FILE_NAME)
- contains_input_file_name = true;
- if (name == FileMetaColumns::INPUT_FILE_BLOCK_START)
- contains_input_file_block_start = true;
- if (name == FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)
- contains_input_file_block_length = true;
- }
- if (!contains_input_file_name && !contains_input_file_block_start &&
!contains_input_file_block_length)
- return std::nullopt;
-
- const auto & header = plan.getCurrentHeader();
- DB::ActionsDAG actions_dag(header.getNamesAndTypesList());
- if (contains_input_file_name)
- {
- auto concat_func = FunctionFactory::instance().get("concat", context);
- DB::ActionsDAG::NodeRawConstPtrs args;
- const auto string_type = std::make_shared<DB::DataTypeString>();
-        const auto * path_node = &actions_dag.addColumn(DB::ColumnWithTypeAndName(string_type->createColumnConst(1, path + "/"), string_type, "path"));
- args.emplace_back(path_node);
- const auto & part_name =
actions_dag.findInOutputs(VIRTUAL_COLUMN_PART);
- args.emplace_back(&part_name);
-        actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func, args, FileMetaColumns::INPUT_FILE_NAME));
- }
- if (contains_input_file_block_start)
- {
- const auto int64_type = std::make_shared<DB::DataTypeInt64>();
-        actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
- }
- if (contains_input_file_block_length)
- {
- const auto int64_type = std::make_shared<DB::DataTypeInt64>();
-        actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1, -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
- }
-
- if (contains_input_file_name)
- actions_dag.removeUnusedResult(VIRTUAL_COLUMN_PART);
- auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(),
std::move(actions_dag));
- step->setStepDescription("Add virtual columns");
- std::optional<DB::IQueryPlanStep *> result = step.get();
- plan.addStep(std::move(step));
- return result;
-}
-
-
-
PrewhereInfoPtr MergeTreeRelParser::parsePreWhereInfo(const substrait::Expression & rel, const Block & input)
{
std::string filter_name;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
index d3bf29e1f6..739f6db557 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
@@ -18,9 +18,14 @@
#include <memory>
#include <optional>
+#include <DataTypes/DataTypesNumber.h>
#include <Parser/RelParsers/RelParser.h>
-#include <Storages/SelectQueryInfo.h>
+#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
+#include <Storages/MergeTree/SparkMergeTreeMeta.h>
+#include <Storages/Parquet/ParquetMeta.h>
+#include <Storages/SubstraitSource/FormatFile.h>
#include <substrait/algebra.pb.h>
+#include <Common/GlutenConfig.h>
namespace DB
{
@@ -33,6 +38,27 @@ extern const int LOGICAL_ERROR;
namespace local_engine
{
+using ReplaceDeltaNodeFunc = std::function<void(DB::ActionsDAG &, const
MergeTreeTableInstance &, DB::ContextPtr)>;
+
+void replaceInputFileNameNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceFilePathNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceFileNameNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceInputFileBlockStartNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceInputFileBlockLengthNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+void replaceTmpRowIndexNode(DB::ActionsDAG & actions_dag, const
MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
+
+static const std::unordered_map<String, std::tuple<std::optional<String>,
DB::DataTypePtr, ReplaceDeltaNodeFunc>> DELTA_META_COLUMN_MAP
+ = {{FileMetaColumns::INPUT_FILE_NAME, std::tuple("_part",
std::make_shared<DB::DataTypeString>(), replaceInputFileNameNode)},
+ {FileMetaColumns::INPUT_FILE_BLOCK_START,
+ std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(),
replaceInputFileBlockStartNode)},
+ {FileMetaColumns::INPUT_FILE_BLOCK_LENGTH,
+ std::tuple(std::nullopt, std::make_shared<DB::DataTypeInt64>(),
replaceInputFileBlockLengthNode)},
+ {ParquetVirtualMeta::TMP_ROWINDEX,
+ std::tuple(DB::BlockOffsetColumn::name,
std::make_shared<DB::DataTypeUInt64>(), replaceTmpRowIndexNode)},
+ {FileMetaColumns::FILE_PATH, std::tuple("_part",
std::make_shared<DB::DataTypeString>(), replaceFilePathNode)},
+ {FileMetaColumns::FILE_NAME, std::tuple("_part",
std::make_shared<DB::DataTypeString>(), replaceFileNameNode)}
+ };
+
class MergeTreeRelParser : public RelParser
{
public:
@@ -41,6 +67,7 @@ public:
    explicit MergeTreeRelParser(ParserContextPtr parser_context_, const DB::ContextPtr & context_)
: RelParser(parser_context_), context(context_)
{
+ spark_sql_config = SparkSQLConfig::loadFromContext(context);
}
~MergeTreeRelParser() override = default;
@@ -58,8 +85,6 @@ public:
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't call getSingleInput().");
}
- std::optional<DB::IQueryPlanStep *>
addVirtualColumnsProjectStep(DB::QueryPlan & plan, const substrait::ReadRel &
rel, const std::string & path);
-
String filterRangesOnDriver(const substrait::ReadRel & read_rel);
struct Condition
@@ -84,6 +109,11 @@ public:
std::unordered_map<std::string, UInt64> column_sizes;
private:
+ DB::Block parseMergeTreeOutput(const substrait::ReadRel & rel,
SparkStorageMergeTreePtr storage);
+ DB::Block replaceDeltaNameIfNeeded(const DB::Block & output);
+ void replaceNodeWithCaseSensitive(DB::Block & read_block,
SparkStorageMergeTreePtr storage);
+ void recoverDeltaNameIfNeeded(DB::QueryPlan & plan, const DB::Block &
output, const MergeTreeTableInstance & merge_tree_table);
+ void recoverNodeWithCaseSensitive(DB::QueryPlan & query_plan, const
DB::Block & output);
void parseToAction(DB::ActionsDAG & filter_action, const
substrait::Expression & rel, std::string & filter_name) const;
DB::PrewhereInfoPtr parsePreWhereInfo(const substrait::Expression & rel,
const DB::Block & input);
DB::ActionsDAG optimizePrewhereAction(const substrait::Expression & rel,
std::string & filter_name, const DB::Block & block);
@@ -92,6 +122,7 @@ private:
UInt64 getColumnsSize(const DB::NameSet & columns);
DB::ContextPtr context;
+ SparkSQLConfig spark_sql_config;
};
}
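
Taken together, the parser and header changes form a small dispatch table: DELTA_META_COLUMN_MAP maps each delta metadata column name to an optional physical source column, the column's data type, and a ReplaceDeltaNodeFunc that rebuilds the column after the scan. parseReadRel consults it twice: replaceDeltaNameIfNeeded swaps requested metadata columns for their physical sources before the MergeTree read (constant-only columns such as input_file_block_start read nothing), and recoverDeltaNameIfNeeded appends an ExpressionStep afterwards that re-derives them. Below is a minimal standalone C++17 sketch of that round-trip, not the ClickHouse API: Block and the recover functions are simplified stand-ins, a struct replaces the real std::tuple (the data-type element is elided), and "_part_offset" is a hypothetical stand-in for DB::BlockOffsetColumn::name.

#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

using Block = std::vector<std::string>;            // stand-in for DB::Block (column names only)
using RecoverFunc = std::function<void(Block &)>;  // stand-in for ReplaceDeltaNodeFunc

struct MetaColumnRule
{
    std::optional<std::string> physical; // column actually read from MergeTree, if any
    RecoverFunc recover;                 // rebuilds the metadata column after the read
};

// Simplified counterpart of DELTA_META_COLUMN_MAP.
static const std::unordered_map<std::string, MetaColumnRule> kDeltaMetaMap = {
    {"file_path", {"_part", [](Block & b) { b.push_back("file_path := concat(<table_path>/, _part)"); }}},
    {"_tmp_metadata_row_index", {"_part_offset", [](Block & b) { b.push_back("row_index := alias(_part_offset)"); }}},
    {"input_file_block_start", {std::nullopt, [](Block & b) { b.push_back("input_file_block_start := const(-1)"); }}},
};

// Counterpart of replaceDeltaNameIfNeeded: before the read, swap each requested
// metadata column for its physical source; constant-only columns read nothing.
Block replaceDeltaNames(const Block & output)
{
    Block read_block;
    for (const auto & name : output)
    {
        auto it = kDeltaMetaMap.find(name);
        if (it == kDeltaMetaMap.end())
            read_block.push_back(name); // ordinary column, read as-is
        else if (it->second.physical)
            read_block.push_back(*it->second.physical);
    }
    return read_block;
}

// Counterpart of recoverDeltaNameIfNeeded: after the read, run each registered
// function to re-derive the metadata columns the query originally asked for.
void recoverDeltaNames(const Block & output, Block & header)
{
    for (const auto & name : output)
        if (auto it = kDeltaMetaMap.find(name); it != kDeltaMetaMap.end())
            it->second.recover(header);
}

int main()
{
    const Block output = {"l_orderkey", "file_path", "_tmp_metadata_row_index"};
    Block header = replaceDeltaNames(output); // {l_orderkey, _part, _part_offset}
    // ... the MergeTree read would happen here, producing `header` ...
    recoverDeltaNames(output, header);        // appends the derived metadata columns
    for (const auto & col : header)
        std::cout << col << '\n';
}
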
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
index 25243d2c4d..262aeff6fa 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -72,15 +72,6 @@ public:
        return std::ranges::any_of(block, [](const auto & column) { return isVirtualColumn(column.name); });
}
- static DB::Block removeVirtualColumns(const DB::Block & block)
- {
- DB::ColumnsWithTypeAndName result_columns;
- std::ranges::copy_if(
- block.getColumnsWithTypeAndName(),
- std::back_inserter(result_columns),
- [](const auto & column) { return !isVirtualColumn(column.name); });
- return result_columns;
- }
///
explicit FileMetaColumns(const SubstraitInputFile & file);