This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 748a27245b [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250629) (#10080)
748a27245b is described below

commit 748a27245b7c04ecf0dcaea7ff4dca77c07aa897
Author: Kyligence Git <[email protected]>
AuthorDate: Sun Jun 29 08:48:42 2025 -0500

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250629) (#10080)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250629)
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/80931
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/81976
    
    * Fix Build due to https://github.com/ClickHouse/ClickHouse/pull/82508
    
    * Try to Fix issue caused by https://github.com/ClickHouse/ClickHouse/pull/81754
    
    see https://github.com/ClickHouse/ClickHouse/pull/82379
    
    * Fix UT due to https://github.com/ClickHouse/ClickHouse/pull/82358
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang chen <[email protected]>
---
 cpp-ch/clickhouse.version                                 |  4 ++--
 cpp-ch/local-engine/Common/DebugUtils.cpp                 |  2 +-
 cpp-ch/local-engine/Disks/registerGlutenDisks.cpp         |  4 ++--
 cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp    | 14 +++++++-------
 cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h      |  2 +-
 cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp  |  4 ----
 .../Storages/Parquet/VectorizedParquetRecordReader.h      |  3 ---
 .../Storages/SubstraitSource/ExcelTextFormatFile.cpp      |  3 ++-
 .../Storages/SubstraitSource/ExcelTextFormatFile.h        |  3 ++-
 .../local-engine/Storages/SubstraitSource/FileReader.cpp  | 15 ++++++---------
 cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h |  2 +-
 cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h |  6 ++++--
 .../Storages/SubstraitSource/Iceberg/IcebergReader.h      |  6 ++++++
 .../Storages/SubstraitSource/JSONFormatFile.cpp           |  3 ++-
 .../Storages/SubstraitSource/JSONFormatFile.h             |  3 ++-
 .../Storages/SubstraitSource/ORCFormatFile.cpp            |  6 ++++--
 .../local-engine/Storages/SubstraitSource/ORCFormatFile.h |  3 ++-
 .../Storages/SubstraitSource/ParquetFormatFile.cpp        | 13 ++++---------
 .../Storages/SubstraitSource/ParquetFormatFile.h          |  3 ++-
 .../Storages/SubstraitSource/SubstraitFileSource.cpp      | 12 ++++++------
 .../Storages/SubstraitSource/SubstraitFileSource.h        | 13 ++++++++++---
 .../Storages/SubstraitSource/SubstraitFileSourceStep.cpp  | 12 +++++++-----
 .../Storages/SubstraitSource/SubstraitFileSourceStep.h    |  5 ++---
 .../Storages/SubstraitSource/TextFormatFile.cpp           |  3 ++-
 .../Storages/SubstraitSource/TextFormatFile.h             |  3 ++-
 cpp-ch/local-engine/tests/gtest_parquet_read.cpp          |  5 ++++-
 26 files changed, 83 insertions(+), 69 deletions(-)

diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index af948e3a3c..b2dd1231a9 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250621
-CH_COMMIT=8960b38d0bf
+CH_BRANCH=rebase_ch/20250629
+CH_COMMIT=addbf00cfd7
diff --git a/cpp-ch/local-engine/Common/DebugUtils.cpp 
b/cpp-ch/local-engine/Common/DebugUtils.cpp
index 09e2f7f5d6..1dea6f508f 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.cpp
+++ b/cpp-ch/local-engine/Common/DebugUtils.cpp
@@ -361,7 +361,7 @@ void dumpMessage(const google::protobuf::Message & message, 
const char * type, b
 
     if (!force && !logger->debug())
         return;
-    pb_util::JsonOptions options;
+    pb_util::JsonPrintOptions options;
     std::string json;
     if (auto s = MessageToJsonString(message, &json, options); !s.ok())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} 
to Json", type);
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp 
b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index 20029a4121..2edf5b1679 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -87,7 +87,7 @@ void registerGlutenDisks(bool global_skip_access_check)
             config_prefix,
             object_storage_creator);
 
-        disk->startup(context, skip_access_check);
+        disk->startup(skip_access_check);
         return disk;
     };
 
@@ -134,7 +134,7 @@ void registerGlutenDisks(bool global_skip_access_check)
             config_prefix,
             object_storage_creator);
 
-        disk->startup(context, skip_access_check);
+        disk->startup(skip_access_check);
         return disk;
     };
 
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp 
b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
index 710e12950a..fdab797c79 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp
@@ -101,13 +101,13 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer(
         collectAllInputs(data, right_sample_block);
 }
 
-void StorageJoinFromReadBuffer::buildJoin(Blocks & data, const Block header, 
std::shared_ptr<DB::TableJoin> analyzed_join)
+void StorageJoinFromReadBuffer::buildJoin(const Blocks & data, const Block & 
header, std::shared_ptr<DB::TableJoin> analyzed_join)
 {
     auto build_join = [&]
     {
-        join = std::make_shared<HashJoin>(analyzed_join, header, overwrite, 
row_count);
-        for (Block block : data)
-            join->addBlockToJoin(std::move(block), true);
+        join = std::make_shared<HashJoin>(analyzed_join, header, overwrite, 
row_count, "", false, true);
+        for (const Block& block : data)
+            join->addBlockToJoin(block, true);
     };
     /// Record memory usage in Total Memory Tracker
     ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join);
@@ -132,7 +132,7 @@ void StorageJoinFromReadBuffer::buildJoinLazily(DB::Block 
header, std::shared_pt
         std::unique_lock lock(join_mutex);
         if (join)
             return;
-        join = std::make_shared<HashJoin>(analyzed_join, header, overwrite, 
row_count);
+        join = std::make_shared<HashJoin>(analyzed_join, header, overwrite, 
row_count, "", false, true);
         while (!input_blocks.empty())
         {
             auto & block = *input_blocks.begin();
@@ -154,11 +154,11 @@ void StorageJoinFromReadBuffer::buildJoinLazily(DB::Block 
header, std::shared_pt
 }
 
 
-/// The column names of 'rgiht_header' could be different from the ones in 
`input_blocks`, and we must
+/// The column names of 'right_header' could be different from the ones in 
`input_blocks`, and we must
 /// use 'right_header' to build the HashJoin. Otherwise, it will cause 
exceptions with name mismatches.
 ///
 /// In most cases, 'getJoinLocked' is called only once, and the input_blocks 
should not be too large.
-/// This is will be OK.
+/// This will be OK.
 DB::JoinPtr 
StorageJoinFromReadBuffer::getJoinLocked(std::shared_ptr<DB::TableJoin> 
analyzed_join, DB::ContextPtr /*context*/)
 {
     if ((analyzed_join->forceNullableRight() && !use_nulls)
diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h 
b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
index e000952a45..5bb29f7d4b 100644
--- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
+++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h
@@ -72,7 +72,7 @@ private:
     bool is_null_aware_anti_join;
 
     void readAllBlocksFromInput(DB::ReadBuffer & in);
-    void buildJoin(DB::Blocks & data, const DB::Block header, 
std::shared_ptr<DB::TableJoin> analyzed_join);
+    void buildJoin(const DB::Blocks & data, const DB::Block & header, 
std::shared_ptr<DB::TableJoin> analyzed_join);
     void collectAllInputs(DB::Blocks & data, const DB::Block header);
     void buildJoinLazily(DB::Block header, std::shared_ptr<DB::TableJoin> 
analyzed_join);
 };
diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
index 107203590a..67494555ca 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
@@ -171,10 +171,6 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const 
substrait::CrossRel & join, DB:
     QueryPlanPtr query_plan;
     if (storage_join)
     {
-        /// FIXME: There is mistake in 
HashJoin::needUsedFlagsForPerRightTableRow which returns true when
-        /// join clauses is empty. But in fact there should not be any join 
clause in cross join.
-        table_join->addDisjunct();
-
         auto broadcast_hash_join = storage_join->getJoinLocked(table_join, 
context);
         // table_join->resetKeys();
         QueryPlanStepPtr join_step = 
std::make_unique<FilledJoinStep>(left->getCurrentHeader(), broadcast_hash_join, 
8192);
diff --git 
a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h 
b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
index 512cc91bed..ce1a139014 100644
--- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
+++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.h
@@ -214,9 +214,6 @@ class VectorizedParquetBlockInputFormat final : public 
DB::IInputFormat
 protected:
     void onCancel() noexcept override { is_stopped = 1; }
 
-    // TODO: create ColumnIndexFilter here, currently disable it now.
-    void setKeyCondition(const std::shared_ptr<const DB::KeyCondition> & 
key_condition_) override { }
-
 public:
     VectorizedParquetBlockInputFormat(
         DB::ReadBuffer & in_,
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
index f094bf915d..dd191831c3 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.cpp
@@ -62,7 +62,8 @@ bool ExcelTextFormatFile::useThis(const DB::ContextPtr & 
context)
     return settingsEqual(context->getSettingsRef(), USE_EXCEL_PARSER, "true");
 }
 
-FormatFile::InputFormatPtr ExcelTextFormatFile::createInputFormat(const 
DB::Block & header)
+FormatFile::InputFormatPtr
+ExcelTextFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
 {
     auto read_buffer = read_buffer_builder->build(file_info);
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
index 5e70d22eeb..0b92f86219 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ExcelTextFormatFile.h
@@ -47,7 +47,8 @@ public:
 
     ~ExcelTextFormatFile() override = default;
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     bool supportSplit() const override { return true; }
     String getFileFormat() const override { return "ExcelText"; }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index e5d1f8d8a0..8de7f83509 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -274,17 +274,12 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
     const FormatFilePtr & file,
     const DB::Block & to_read_header_,
     const DB::Block & output_header_,
-    const std::shared_ptr<const DB::KeyCondition> & key_condition = nullptr,
+    const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag = nullptr,
     const ColumnIndexFilterPtr & column_index_filter = nullptr)
 {
     file->initialize(column_index_filter);
     auto createInputFormat = [&](const DB::Block & new_read_header_) -> 
FormatFile::InputFormatPtr
-    {
-        auto input_format = file->createInputFormat(new_read_header_);
-        if (key_condition && input_format)
-            input_format->inputFormat().setKeyCondition(key_condition);
-        return input_format;
-    };
+    { return file->createInputFormat(new_read_header_, filter_actions_dag); };
 
     if (file->getFileInfo().has_iceberg())
         return iceberg::IcebergReader::create(file, to_read_header_, 
output_header_, createInputFormat);
@@ -316,11 +311,13 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
     return std::make_unique<NormalFileReader>(file, to_read_header_, 
output_header_, input_format);
 }
 }
+
+/// TODO Remove ColumnIndexFilterPtr
 std::unique_ptr<BaseReader> BaseReader::create(
     const FormatFilePtr & current_file,
     const DB::Block & readHeader,
     const DB::Block & outputHeader,
-    const std::shared_ptr<const DB::KeyCondition> & key_condition,
+    const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
     const ColumnIndexFilterPtr & column_index_filter)
 {
     if (!readHeader)
@@ -335,7 +332,7 @@ std::unique_ptr<BaseReader> BaseReader::create(
         }
     }
 
-    return createNormalFileReader(current_file, readHeader, outputHeader, 
key_condition, column_index_filter);
+    return createNormalFileReader(current_file, readHeader, outputHeader, 
filter_actions_dag, column_index_filter);
 }
 
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
index bdb6772ab5..467824610e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.h
@@ -77,7 +77,7 @@ public:
         const FormatFilePtr & current_file,
         const DB::Block & readHeader,
         const DB::Block & outputHeader,
-        const std::shared_ptr<const DB::KeyCondition> & key_condition,
+        const std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag,
         const ColumnIndexFilterPtr & column_index_filter);
 };
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
index 262aeff6fa..60d28f5bba 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -30,6 +30,7 @@
 
 namespace DB
 {
+class ActionsDAG;
 namespace ErrorCodes
 {
 extern const int NOT_IMPLEMENTED;
@@ -115,7 +116,9 @@ public:
     virtual ~FormatFile() = default;
 
     /// Create a new input format for reading this file
-    virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;
+    virtual InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr)
+        = 0;
 
     /// Spark would split a large file into small segements and read in 
different tasks
     /// If this file doesn't support the split feacture, only the task with 
offset 0 will generate data.
@@ -149,7 +152,6 @@ protected:
     std::map<String, String> partition_values;
     /// partition keys are normalized to lower cases for partition column 
case-insensitive matching
     std::map<String, String> normalized_partition_values;
-    std::shared_ptr<const DB::KeyCondition> key_condition;
     const FileMetaColumns meta_columns;
 
     /// Currently, it is used to read an iceberg format, and initialized in 
the constructor of child class
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
index f8845c1367..ddfd0954db 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
@@ -18,6 +18,12 @@
 
 #include <Storages/SubstraitSource/FileReader.h>
 
+namespace DB
+{
+class ExpressionActions;
+using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
+}
+
 namespace local_engine
 {
 class DeltaDVRoaringBitmapArray;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
index 5cbab2bbda..b5728abbf9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.cpp
@@ -31,7 +31,8 @@ JSONFormatFile::JSONFormatFile(
 {
 }
 
-FormatFile::InputFormatPtr JSONFormatFile::createInputFormat(const DB::Block & 
header)
+FormatFile::InputFormatPtr
+JSONFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
 {
     auto read_buffer = 
read_buffer_builder->buildWithCompressionWrapper(file_info);
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
index 97b3a694de..a98af115a2 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/JSONFormatFile.h
@@ -28,7 +28,8 @@ public:
 
     bool supportSplit() const override { return true; }
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     String getFileFormat() const override { return "JSONEachRow"; }
 };
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
index f78cca5d87..9b87c6528e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
@@ -36,7 +36,8 @@ ORCFormatFile::ORCFormatFile(
 {
 }
 
-FormatFile::InputFormatPtr ORCFormatFile::createInputFormat(const DB::Block & 
header)
+FormatFile::InputFormatPtr
+ORCFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
 {
     auto read_buffer = read_buffer_builder->build(file_info);
 
@@ -70,7 +71,8 @@ FormatFile::InputFormatPtr 
ORCFormatFile::createInputFormat(const DB::Block & he
         format_settings.orc.reader_time_zone_name = mapped_timezone;
     }
     //TODO: support prefetch
-    auto input_format = 
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header, 
format_settings, false, 0);
+    auto parser_group = 
std::make_shared<DB::FormatParserGroup>(context->getSettingsRef(), 1, 
filter_actions_dag, context);
+    auto input_format = 
std::make_shared<DB::NativeORCBlockInputFormat>(*read_buffer, header, 
format_settings, false, 0, parser_group);
     return std::make_shared<InputFormat>(std::move(read_buffer), input_format);
 }
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
index b6f6fd2321..0d8beb5d3b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.h
@@ -44,7 +44,8 @@ public:
         DB::ContextPtr context_, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr 
read_buffer_builder_);
     ~ORCFormatFile() override = default;
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     std::optional<size_t> getTotalRows() override;
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index a4ccfa93b7..50e84e60f9 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -142,7 +142,8 @@ void ParquetFormatFile::initialize(const 
ColumnIndexFilterPtr & filter)
         file_schema = ParquetMetaBuilder::collectFileSchema(context, 
*read_buffer_);
 }
 
-FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const Block & 
header)
+FormatFile::InputFormatPtr
+ParquetFormatFile::createInputFormat(const Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & filter_actions_dag)
 {
     assert(read_buffer_);
 
@@ -205,14 +206,8 @@ FormatFile::InputFormatPtr 
ParquetFormatFile::createInputFormat(const Block & he
             // We need to disable fiter push down and read all row groups, so 
that we can get correct row index.
             format_settings.parquet.filter_push_down = false;
         }
-
-        auto input = std::make_shared<ParquetBlockInputFormat>(
-            *read_buffer_,
-            read_header,
-            format_settings,
-            settings[Setting::max_parsing_threads],
-            settings[Setting::max_download_threads],
-            8192);
+        auto parser_group = 
std::make_shared<FormatParserGroup>(context->getSettingsRef(), 1, 
filter_actions_dag, context);
+        auto input = std::make_shared<ParquetBlockInputFormat>(*read_buffer_, 
read_header, format_settings, parser_group, 8192);
         return std::make_shared<ParquetInputFormat>(
             std::move(read_buffer_), input, std::move(provider), 
std::move(read_header), std::move(output_header));
     };
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 2014e1d643..587b788f79 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -35,7 +35,8 @@ public:
         bool use_local_format_);
     ~ParquetFormatFile() override = default;
 
-    InputFormatPtr createInputFormat(const DB::Block & header) override;
+    InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     std::optional<size_t> getTotalRows() override;
 
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index a5df941a48..51c041fa3c 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -54,7 +54,7 @@ static DB::Block initReadHeader(const DB::Block & block, 
const FormatFiles & fil
 
 SubstraitFileSource::SubstraitFileSource(
     const DB::ContextPtr & context_, const DB::Block & outputHeader_, const 
substrait::ReadRel::LocalFiles & file_infos)
-    : 
DB::SourceWithKeyCondition(BaseReader::buildRowCountHeader(outputHeader_), 
false)
+    : DB::ISource(BaseReader::buildRowCountHeader(outputHeader_), false)
     , files(initializeFiles(file_infos, context_))
     , outputHeader(outputHeader_)
     , readHeader(initReadHeader(outputHeader, files))
@@ -63,11 +63,11 @@ SubstraitFileSource::SubstraitFileSource(
 
 SubstraitFileSource::~SubstraitFileSource() = default;
 
-void SubstraitFileSource::setKeyCondition(const std::optional<DB::ActionsDAG> 
& filter_actions_dag, DB::ContextPtr context_)
+void SubstraitFileSource::setKeyCondition(const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag_, DB::ContextPtr context_)
 {
-    setKeyConditionImpl(filter_actions_dag, context_, readHeader);
-    if (filter_actions_dag)
-        column_index_filter = 
std::make_shared<ColumnIndexFilter>(filter_actions_dag.value(), context_);
+    assert(filter_actions_dag_);
+    filter_actions_dag = filter_actions_dag_;
+    column_index_filter = 
std::make_shared<ColumnIndexFilter>(*filter_actions_dag, context_);
 }
 
 DB::Chunk SubstraitFileSource::generate()
@@ -105,7 +105,7 @@ bool SubstraitFileSource::tryPrepareReader()
         if (!current_file->supportSplit() && current_file->getStartOffset())
             continue;
 
-        file_reader = BaseReader::create(current_file, readHeader, 
outputHeader, key_condition, column_index_filter);
+        file_reader = BaseReader::create(current_file, readHeader, 
outputHeader, filter_actions_dag, column_index_filter);
         if (file_reader)
             return true;
     }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index a2280da0d2..b40dd9a82d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -17,9 +17,15 @@
 #pragma once
 
 #include <memory>
-#include <Processors/SourceWithKeyCondition.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/ISource.h>
 #include <substrait/algebra.pb.h>
 
+namespace DB
+{
+class ActionsDAG;
+}
+
 namespace local_engine
 {
 class ColumnIndexFilter;
@@ -29,7 +35,7 @@ class FormatFile;
 using FormatFilePtr = std::shared_ptr<FormatFile>;
 using FormatFiles = std::vector<FormatFilePtr>;
 
-class SubstraitFileSource : public DB::SourceWithKeyCondition
+class SubstraitFileSource : public DB::ISource
 {
 public:
     SubstraitFileSource(const DB::ContextPtr & context_, const DB::Block & 
header_, const substrait::ReadRel::LocalFiles & file_infos);
@@ -37,7 +43,7 @@ public:
 
     String getName() const override { return "SubstraitFileSource"; }
 
-    void setKeyCondition(const std::optional<DB::ActionsDAG> & 
filter_actions_dag, DB::ContextPtr context_) override;
+    void setKeyCondition(const std::shared_ptr<const DB::ActionsDAG> & 
filter_actions_dag_, DB::ContextPtr context_);
 
 protected:
     DB::Chunk generate() override;
@@ -54,5 +60,6 @@ private:
 
     std::unique_ptr<BaseReader> file_reader;
     ColumnIndexFilterPtr column_index_filter;
+    std::shared_ptr<const DB::ActionsDAG> filter_actions_dag;
 };
 }
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
index 91e50b4275..220d181226 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-#include <Interpreters/Context_fwd.h>
 #include <Processors/QueryPlan/IQueryPlanStep.h>
 #include <QueryPipeline/Pipe.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
@@ -66,9 +65,12 @@ void 
SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipe
 
 void SubstraitFileSourceStep::applyFilters(const DB::ActionDAGNodes 
added_filter_nodes)
 {
-    filter_actions_dag = 
DB::ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
-    for (const auto & processor : pipe.getProcessors())
-        if (auto * source = dynamic_cast<DB::SourceWithKeyCondition 
*>(processor.get()))
-            source->setKeyCondition(filter_actions_dag, context);
+    SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
+    if (filter_actions_dag)
+    {
+        for (const auto & processor : pipe.getProcessors())
+            if (auto * source = dynamic_cast<SubstraitFileSource 
*>(processor.get()))
+                source->setKeyCondition(filter_actions_dag, context);
+    }
 }
 }
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
index 78fcb767f0..e1175507f5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
@@ -17,11 +17,10 @@
 
 #pragma once
 
+#include <Interpreters/Context_fwd.h>
 #include <Processors/QueryPlan/ReadFromPreparedSource.h>
 #include <Processors/QueryPlan/SourceStepWithFilter.h>
-#include <Storages/MergeTree/KeyCondition.h>
-#include <Interpreters/Context_fwd.h>
-#include <Core/NamesAndTypes.h>
+
 
 namespace local_engine
 {
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
index fcbe650d46..9e40209eea 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
@@ -34,7 +34,8 @@ TextFormatFile::TextFormatFile(
 {
 }
 
-FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block & 
header)
+FormatFile::InputFormatPtr
+TextFormatFile::createInputFormat(const DB::Block & header, const 
std::shared_ptr<const DB::ActionsDAG> & /*filter_actions_dag*/)
 {
     auto read_buffer = 
read_buffer_builder->buildWithCompressionWrapper(file_info);
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
index 62e60af4a8..0e6827e653 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.h
@@ -32,7 +32,8 @@ public:
         DB::ContextPtr context_, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info_, ReadBufferBuilderPtr 
read_buffer_builder_);
     ~TextFormatFile() override = default;
 
-    FormatFile::InputFormatPtr createInputFormat(const DB::Block & header) 
override;
+    FormatFile::InputFormatPtr
+    createInputFormat(const DB::Block & header, const std::shared_ptr<const 
DB::ActionsDAG> & filter_actions_dag = nullptr) override;
 
     DB::NamesAndTypesList getSchema() const
     {
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp 
b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 57b6b86ae0..57b4eca6cf 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -45,6 +45,7 @@
 #include <tests/utils/gluten_test_util.h>
 #include <Common/BlockTypeUtils.h>
 #include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
 
 using namespace DB;
 using namespace local_engine;
@@ -111,8 +112,10 @@ void readData(const String & path, const std::map<String, 
Field> & fields)
     ReadBufferFromFile in(full_path);
 
     InputFormatPtr format;
+    auto parser_group
+        = 
std::make_shared<FormatParserGroup>(QueryContext::globalContext()->getSettingsRef(),
 1, nullptr, QueryContext::globalContext());
     if constexpr (std::is_same_v<InputFormat, DB::ParquetBlockInputFormat>)
-        format = std::make_shared<InputFormat>(in, header, settings, 1, 1, 
8192);
+        format = std::make_shared<InputFormat>(in, header, settings, 
parser_group, 8192);
     else
         format = std::make_shared<InputFormat>(in, header, settings);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to