This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 748a27245b [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250629) (#10080)
748a27245b is described below
commit 748a27245b7c04ecf0dcaea7ff4dca77c07aa897
Author: Kyligence Git <[email protected]>
AuthorDate: Sun Jun 29 08:48:42 2025 -0500
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20250629) (#10080)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250629)
* Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/80931
* Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/81976
* Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/82508
* Try to Fix issue caused by https://github.com/ClickHouse/ClickHouse/pull/81754, see https://github.com/ClickHouse/ClickHouse/pull/82379
* Fix UT due to https://github.com/ClickHouse/ClickHouse/pull/82358
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang chen <[email protected]>
---
cpp-ch/clickhouse.version | 4 ++--
cpp-ch/local-engine/Common/DebugUtils.cpp | 2 +-
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp | 4 ++--
cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp | 14 +++++++-------
cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h | 2 +-
cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp | 4 ----
.../Storages/Parquet/VectorizedParquetRecordReader.h | 3 ---
.../Storages/SubstraitSource/ExcelTextFormatFile.cpp | 3 ++-
.../Storages/SubstraitSource/ExcelTextFormatFile.h | 3 ++-
.../local-engine/Storages/SubstraitSource/FileReader.cpp | 15 ++++++---------
cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h | 2 +-
cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h | 6 ++++--
.../Storages/SubstraitSource/Iceberg/IcebergReader.h | 6 ++++++
.../Storages/SubstraitSource/JSONFormatFile.cpp | 3 ++-
.../Storages/SubstraitSource/JSONFormatFile.h | 3 ++-
.../Storages/SubstraitSource/ORCFormatFile.cpp | 6 ++++--
.../local-engine/Storages/SubstraitSource/ORCFormatFile.h | 3 ++-
.../Storages/SubstraitSource/ParquetFormatFile.cpp | 13 ++++---------
.../Storages/SubstraitSource/ParquetFormatFile.h | 3 ++-
.../Storages/SubstraitSource/SubstraitFileSource.cpp | 12 ++++++------
.../Storages/SubstraitSource/SubstraitFileSource.h | 13 ++++++++++---
.../Storages/SubstraitSource/SubstraitFileSourceStep.cpp | 12 +++++++-----
.../Storages/SubstraitSource/SubstraitFileSourceStep.h | 5 ++---
.../Storages/SubstraitSource/TextFormatFile.cpp | 3 ++-
.../Storages/SubstraitSource/TextFormatFile.h | 3 ++-
cpp-ch/local-engine/tests/gtest_parquet_read.cpp | 5 ++++-
26 files changed, 83 insertions(+), 69 deletions(-)
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index af948e3a3c..b2dd1231a9 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250621
-CH_COMMIT=8960b38d0bf
+CH_BRANCH=rebase_ch/20250629
+CH_COMMIT=addbf00cfd7
diff --git a/cpp-ch/local-engine/Common/DebugUtils.cpp b/cpp-ch/local-engine/Common/DebugUtils.cpp
index 09e2f7f5d6..1dea6f508f 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.cpp
+++ b/cpp-ch/local-engine/Common/DebugUtils.cpp
@@ -361,7 +361,7 @@ void dumpMessage(const google::protobuf::Message & message,
const char * type, b
if (!force && !logger->debug())
return;
- pb_util::JsonOptions options;
+ pb_util::JsonPrintOptions options;
std::string json;
if (auto s = MessageToJsonString(message, &json, options); !s.ok())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index 20029a4121..2edf5b1679 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -87,7 +87,7 @@ void registerGlutenDisks(bool global_skip_access_check)
config_prefix,
object_storage_creator);
- disk->startup(context, skip_access_check);
+ disk->startup(skip_access_check);
return disk;
};
@@ -134,7 +134,7 @@ void registerGlutenDisks(bool global_skip_access_check)
config_prefix,
object_storage_creator);
- disk->startup(context, skip_access_check);
+ disk->startup(skip_access_check);
return disk;
};
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
index 710e12950a..fdab797c79 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -101,13 +101,13 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
collectAllInputs(data, right_sample_block);
}
-void StorageJoinFromReadBuffer::buildJoin(Blocks & data, const Block header,
std::shared_ptr<DB::TableJoin> analyzed_join)
+void StorageJoinFromReadBuffer::buildJoin(const Blocks & data, const Block &
header, std::shared_ptr<DB::TableJoin> analyzed_join)
{
auto build_join = [&]
{
- join = std::make_shared<HashJoin>(analyzed_join, header, overwrite,
row_count);
- for (Block block : data)
- join->addBlockToJoin(std::move(block), true);
+ join = std::make_shared<HashJoin>(analyzed_join, header, overwrite,
row_count, "", false, true);
+ for (const Block& block : data)
+ join->addBlockToJoin(block, true);
};
/// Record memory usage in Total Memory Tracker
ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join);
@@ -132,7 +132,7 @@ void StorageJoinFromReadBuffer::buildJoinLazily(DB::Block
header, std::shared_pt
std::unique_lock lock(join_mutex);
if (join)
return;
- join = std::make_shared<HashJoin>(analyzed_join, header, overwrite,
row_count);
+ join = std::make_shared<HashJoin>(analyzed_join, header, overwrite,
row_count, "", false, true);
while (!input_blocks.empty())
{
auto & block = *input_blocks.begin();
@@ -154,11 +154,11 @@ void StorageJoinFromReadBuffer::buildJoinLazily(DB::Block
header, std::shared_pt
}
-/// The column names of 'rgiht_header' could be different from the ones in `input_blocks`, and we must
+/// The column names of 'right_header' could be different from the ones in `input_blocks`, and we must
/// use 'right_header' to build the HashJoin. Otherwise, it will cause exceptions with name mismatches.
///
/// In most cases, 'getJoinLocked' is called only once, and the input_blocks should not be too large.
-/// This is will be OK.
+/// This will be OK.
DB::JoinPtr
StorageJoinFromReadBuffer::getJoinLocked(std::shared_ptr<DB::TableJoin>
analyzed_join, DB::ContextPtr /*context*/)
{
if ((analyzed_join->forceNullableRight() && !use_nulls)
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
index e000952a45..5bb29f7d4b 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
@@ -72,7 +72,7 @@ private:
bool is_null_aware_anti_join;
void readAllBlocksFromInput(DB::ReadBuffer & in);
- void buildJoin(DB::Blocks & data, const DB::Block header,
std::shared_ptr<DB::TableJoin> analyzed_join);
+ void buildJoin(const DB::Blocks & data, const DB::Block & header,
std::shared_ptr<DB::TableJoin> analyzed_join);
void collectAllInputs(DB::Blocks & data, const DB::Block header);
void buildJoinLazily(DB::Block header, std::shared_ptr<DB::TableJoin>
analyzed_join);
};
diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
index 107203590a..67494555ca 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
@@ -171,10 +171,6 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const
substrait::CrossRel & join, DB:
QueryPlanPtr query_plan;
if (storage_join)
{
- /// FIXME: There is mistake in HashJoin::needUsedFlagsForPerRightTableRow which returns true when
- /// join clauses is empty. But in fact there should not be any join clause in cross join.
- table_join->addDisjunct();
-
auto broadcast_hash_join = storage_join->getJoinLocked(table_join,
context);
// table_join->resetKeys();
QueryPlanStepPtr join_step =
std::make_unique<FilledJoinStep>(left->getCurrentHeader(), broadcast_hash_join,
8192);
diff --git a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
index 512cc91bed..ce1a139014 100644
--- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
+++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
@@ -214,9 +214,6 @@ class VectorizedParquetBlockInputFormat final : public
DB::IInputFormat
protected:
void onCancel() noexcept override { is_stopped = 1; }
- // TODO: create ColumnIndexFilter here, currently disable it now.
- void setKeyCondition(const std::shared_ptr<const DB::KeyCondition> &
key_condition_) override { }
-
public:
VectorizedParquetBlockInputFormat(
DB::ReadBuffer & in_,
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
index f094bf915d..dd191831c3 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
@@ -62,7 +62,8 @@ bool ExcelTextFormatFile::useThis(const DB::ContextPtr &
context)
return settingsEqual(context->getSettingsRef(), USE_EXCEL_PARSER, "true");
}
-FormatFile::InputFormatPtr ExcelTextFormatFile::createInputFormat(const
DB::Block & header)
+FormatFile::InputFormatPtr
+ExcelTextFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
{
auto read_buffer = read_buffer_builder->build(file_info);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
index 5e70d22eeb..0b92f86219 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
@@ -47,7 +47,8 @@ public:
~ExcelTextFormatFile() override = default;
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
bool supportSplit() const override { return true; }
String getFileFormat() const override { return "ExcelText"; }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index e5d1f8d8a0..8de7f83509 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -274,17 +274,12 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
const FormatFilePtr & file,
const DB::Block & to_read_header_,
const DB::Block & output_header_,
- const std::shared_ptr<const DB::KeyCondition> & key_condition = nullptr,
+ const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag = nullptr,
const ColumnIndexFilterPtr & column_index_filter = nullptr)
{
file->initialize(column_index_filter);
auto createInputFormat = [&](const DB::Block & new_read_header_) ->
FormatFile::InputFormatPtr
- {
- auto input_format = file->createInputFormat(new_read_header_);
- if (key_condition && input_format)
- input_format->inputFormat().setKeyCondition(key_condition);
- return input_format;
- };
+ { return file->createInputFormat(new_read_header_, filter_actions_dag); };
if (file->getFileInfo().has_iceberg())
return iceberg::IcebergReader::create(file, to_read_header_,
output_header_, createInputFormat);
@@ -316,11 +311,13 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
return std::make_unique<NormalFileReader>(file, to_read_header_,
output_header_, input_format);
}
}
+
+/// TODO Remove ColumnIndexFilterPtr
std::unique_ptr<BaseReader> BaseReader::create(
const FormatFilePtr & current_file,
const DB::Block & readHeader,
const DB::Block & outputHeader,
- const std::shared_ptr<const DB::KeyCondition> & key_condition,
+ const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
const ColumnIndexFilterPtr & column_index_filter)
{
if (!readHeader)
@@ -335,7 +332,7 @@ std::unique_ptr<BaseReader> BaseReader::create(
}
}
- return createNormalFileReader(current_file, readHeader, outputHeader,
key_condition, column_index_filter);
+ return createNormalFileReader(current_file, readHeader, outputHeader,
filter_actions_dag, column_index_filter);
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
index bdb6772ab5..467824610e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
@@ -77,7 +77,7 @@ public:
const FormatFilePtr & current_file,
const DB::Block & readHeader,
const DB::Block & outputHeader,
- const std::shared_ptr<const DB::KeyCondition> & key_condition,
+ const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
const ColumnIndexFilterPtr & column_index_filter);
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
index 262aeff6fa..60d28f5bba 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -30,6 +30,7 @@
namespace DB
{
+class ActionsDAG;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
@@ -115,7 +116,9 @@ public:
virtual ~FormatFile() = default;
/// Create a new input format for reading this file
- virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;
+ virtual InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr)
+ = 0;
/// Spark would split a large file into small segements and read in
different tasks
/// If this file doesn't support the split feacture, only the task with
offset 0 will generate data.
@@ -149,7 +152,6 @@ protected:
std::map<String, String> partition_values;
/// partition keys are normalized to lower cases for partition column
case-insensitive matching
std::map<String, String> normalized_partition_values;
- std::shared_ptr<const DB::KeyCondition> key_condition;
const FileMetaColumns meta_columns;
/// Currently, it is used to read an iceberg format, and initialized in
the constructor of child class
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
index f8845c1367..ddfd0954db 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
@@ -18,6 +18,12 @@
#include <Storages/SubstraitSource/FileReader.h>
+namespace DB
+{
+class ExpressionActions;
+using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
+}
+
namespace local_engine
{
class DeltaDVRoaringBitmapArray;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
index 5cbab2bbda..b5728abbf9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
@@ -31,7 +31,8 @@ JSONFormatFile::JSONFormatFile(
{
}
-FormatFile::InputFormatPtr JSONFormatFile::createInputFormat(const DB::Block &
header)
+FormatFile::InputFormatPtr
+JSONFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
{
auto read_buffer =
read_buffer_builder->buildWithCompressionWrapper(file_info);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
index 97b3a694de..a98af115a2 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
@@ -28,7 +28,8 @@ public:
bool supportSplit() const override { return true; }
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
String getFileFormat() const override { return "JSONEachRow"; }
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
index f78cca5d87..9b87c6528e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
@@ -36,7 +36,8 @@ ORCFormatFile::ORCFormatFile(
{
}
-FormatFile::InputFormatPtr ORCFormatFile::createInputFormat(const DB::Block &
header)
+FormatFile::InputFormatPtr
+ORCFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
{
auto read_buffer = read_buffer_builder->build(file_info);
@@ -70,7 +71,8 @@ FormatFile::InputFormatPtr
ORCFormatFile::createInputFormat(const DB::Block & he
format_settings.orc.reader_time_zone_name = mapped_timezone;
}
//TODO: support prefetch
- auto input_format =
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header,
format_settings, false, 0);
+ auto parser_group =
std::make_shared<DB::FormatParserGroup>(context->getSettingsRef(), 1,
filter_actions_dag, context);
+ auto input_format =
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header,
format_settings, false, 0, parser_group);
return std::make_shared<InputFormat>(std::move(read_buffer), input_format);
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
index b6f6fd2321..0d8beb5d3b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
@@ -44,7 +44,8 @@ public:
DB::ContextPtr context_, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr
read_buffer_builder_);
~ORCFormatFile() override = default;
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
std::optional<size_t> getTotalRows() override;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index a4ccfa93b7..50e84e60f9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -142,7 +142,8 @@ void ParquetFormatFile::initialize(const
ColumnIndexFilterPtr & filter)
file_schema = ParquetMetaBuilder::collectFileSchema(context,
*read_buffer_);
}
-FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const Block &
header)
+FormatFile::InputFormatPtr
+ParquetFormatFile::createInputFormat(const Block & header, const
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
{
assert(read_buffer_);
@@ -205,14 +206,8 @@ FormatFile::InputFormatPtr
ParquetFormatFile::createInputFormat(const Block & he
// We need to disable fiter push down and read all row groups, so
that we can get correct row index.
format_settings.parquet.filter_push_down = false;
}
-
- auto input = std::make_shared<ParquetBlockInputFormat>(
- *read_buffer_,
- read_header,
- format_settings,
- settings[Setting::max_parsing_threads],
- settings[Setting::max_download_threads],
- 8192);
+ auto parser_group =
std::make_shared<FormatParserGroup>(context->getSettingsRef(), 1,
filter_actions_dag, context);
+ auto input = std::make_shared<ParquetBlockInputFormat>(*read_buffer_,
read_header, format_settings, parser_group, 8192);
return std::make_shared<ParquetInputFormat>(
std::move(read_buffer_), input, std::move(provider),
std::move(read_header), std::move(output_header));
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 2014e1d643..587b788f79 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -35,7 +35,8 @@ public:
bool use_local_format_);
~ParquetFormatFile() override = default;
- InputFormatPtr createInputFormat(const DB::Block & header) override;
+ InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
std::optional<size_t> getTotalRows() override;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index a5df941a48..51c041fa3c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -54,7 +54,7 @@ static DB::Block initReadHeader(const DB::Block & block,
const FormatFiles & fil
SubstraitFileSource::SubstraitFileSource(
const DB::ContextPtr & context_, const DB::Block & outputHeader_, const
substrait::ReadRel::LocalFiles & file_infos)
- :
DB::SourceWithKeyCondition(BaseReader::buildRowCountHeader(outputHeader_),
false)
+ : DB::ISource(BaseReader::buildRowCountHeader(outputHeader_), false)
, files(initializeFiles(file_infos, context_))
, outputHeader(outputHeader_)
, readHeader(initReadHeader(outputHeader, files))
@@ -63,11 +63,11 @@ SubstraitFileSource::SubstraitFileSource(
SubstraitFileSource::~SubstraitFileSource() = default;
-void SubstraitFileSource::setKeyCondition(const std::optional<DB::ActionsDAG>
& filter_actions_dag, DB::ContextPtr context_)
+void SubstraitFileSource::setKeyCondition(const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag_, DB::ContextPtr context_)
{
- setKeyConditionImpl(filter_actions_dag, context_, readHeader);
- if (filter_actions_dag)
- column_index_filter =
std::make_shared<ColumnIndexFilter>(filter_actions_dag.value(), context_);
+ assert(filter_actions_dag_);
+ filter_actions_dag = filter_actions_dag_;
+ column_index_filter =
std::make_shared<ColumnIndexFilter>(*filter_actions_dag, context_);
}
DB::Chunk SubstraitFileSource::generate()
@@ -105,7 +105,7 @@ bool SubstraitFileSource::tryPrepareReader()
if (!current_file->supportSplit() && current_file->getStartOffset())
continue;
- file_reader = BaseReader::create(current_file, readHeader,
outputHeader, key_condition, column_index_filter);
+ file_reader = BaseReader::create(current_file, readHeader,
outputHeader, filter_actions_dag, column_index_filter);
if (file_reader)
return true;
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index a2280da0d2..b40dd9a82d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -17,9 +17,15 @@
#pragma once
#include <memory>
-#include <Processors/SourceWithKeyCondition.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/ISource.h>
#include <substrait/algebra.pb.h>
+namespace DB
+{
+class ActionsDAG;
+}
+
namespace local_engine
{
class ColumnIndexFilter;
@@ -29,7 +35,7 @@ class FormatFile;
using FormatFilePtr = std::shared_ptr<FormatFile>;
using FormatFiles = std::vector<FormatFilePtr>;
-class SubstraitFileSource : public DB::SourceWithKeyCondition
+class SubstraitFileSource : public DB::ISource
{
public:
SubstraitFileSource(const DB::ContextPtr & context_, const DB::Block &
header_, const substrait::ReadRel::LocalFiles & file_infos);
@@ -37,7 +43,7 @@ public:
String getName() const override { return "SubstraitFileSource"; }
- void setKeyCondition(const std::optional<DB::ActionsDAG> &
filter_actions_dag, DB::ContextPtr context_) override;
+ void setKeyCondition(const std::shared_ptr<const DB::ActionsDAG> &
filter_actions_dag_, DB::ContextPtr context_);
protected:
DB::Chunk generate() override;
@@ -54,5 +60,6 @@ private:
std::unique_ptr<BaseReader> file_reader;
ColumnIndexFilterPtr column_index_filter;
+ std::shared_ptr<const DB::ActionsDAG> filter_actions_dag;
};
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
index 91e50b4275..220d181226 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-#include <Interpreters/Context_fwd.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -66,9 +65,12 @@ void
SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipe
void SubstraitFileSourceStep::applyFilters(const DB::ActionDAGNodes
added_filter_nodes)
{
- filter_actions_dag =
DB::ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
- for (const auto & processor : pipe.getProcessors())
- if (auto * source = dynamic_cast<DB::SourceWithKeyCondition
*>(processor.get()))
- source->setKeyCondition(filter_actions_dag, context);
+ SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
+ if (filter_actions_dag)
+ {
+ for (const auto & processor : pipe.getProcessors())
+ if (auto * source = dynamic_cast<SubstraitFileSource
*>(processor.get()))
+ source->setKeyCondition(filter_actions_dag, context);
+ }
}
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
index 78fcb767f0..e1175507f5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
@@ -17,11 +17,10 @@
#pragma once
+#include <Interpreters/Context_fwd.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
-#include <Storages/MergeTree/KeyCondition.h>
-#include <Interpreters/Context_fwd.h>
-#include <Core/NamesAndTypes.h>
+
namespace local_engine
{
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
index fcbe650d46..9e40209eea 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
@@ -34,7 +34,8 @@ TextFormatFile::TextFormatFile(
{
}
-FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block &
header)
+FormatFile::InputFormatPtr
+TextFormatFile::createInputFormat(const DB::Block & header, const
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
{
auto read_buffer =
read_buffer_builder->buildWithCompressionWrapper(file_info);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
index 62e60af4a8..0e6827e653 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
@@ -32,7 +32,8 @@ public:
DB::ContextPtr context_, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr
read_buffer_builder_);
~TextFormatFile() override = default;
- FormatFile::InputFormatPtr createInputFormat(const DB::Block & header)
override;
+ FormatFile::InputFormatPtr
+ createInputFormat(const DB::Block & header, const std::shared_ptr<const
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
DB::NamesAndTypesList getSchema() const
{
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 57b6b86ae0..57b4eca6cf 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -45,6 +45,7 @@
#include <tests/utils/gluten_test_util.h>
#include <Common/BlockTypeUtils.h>
#include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
@@ -111,8 +112,10 @@ void readData(const String & path, const std::map<String,
Field> & fields)
ReadBufferFromFile in(full_path);
InputFormatPtr format;
+ auto parser_group
+ =
std::make_shared<FormatParserGroup>(QueryContext::globalContext()->getSettingsRef(),
1, nullptr, QueryContext::globalContext());
if constexpr (std::is_same_v<InputFormat, DB::ParquetBlockInputFormat>)
- format = std::make_shared<InputFormat>(in, header, settings, 1, 1,
8192);
+ format = std::make_shared<InputFormat>(in, header, settings,
parser_group, 8192);
else
format = std::make_shared<InputFormat>(in, header, settings);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]