This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c16d8217 [GLUTEN-7145][CH][PART]refactor for rel parsers (#7193)
8c16d8217 is described below

commit 8c16d82174fb77da042ea9d124fe57dcfc4b1831
Author: lgbo <[email protected]>
AuthorDate: Thu Sep 12 14:00:48 2024 +0800

    [GLUTEN-7145][CH][PART]refactor for rel parsers (#7193)
    
    What changes were proposed in this pull request?
    (Please fill in changes proposed in this fix)
    
    Fixes: #7145
    
    Refactor SerializedPlanParser::parseOp. RelParsers no longer need to call 
back up into SerializedPlanParser during the parsing process
    
    How was this patch tested?
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    
    unit tests
    
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
---
 cpp-ch/local-engine/CMakeLists.txt                 |   1 +
 cpp-ch/local-engine/Common/CHUtil.cpp              |   2 +-
 cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp  |   2 +-
 .../Parser/AggregateFunctionParser.cpp             |   2 +-
 .../Parser/{ => RelParsers}/AggregateRelParser.cpp |   0
 .../Parser/{ => RelParsers}/AggregateRelParser.h   |   4 +-
 .../Parser/{ => RelParsers}/CrossRelParser.cpp     |  28 +-
 .../Parser/{ => RelParsers}/CrossRelParser.h       |  18 +-
 .../Parser/{ => RelParsers}/ExpandRelParser.cpp    |   2 +-
 .../Parser/{ => RelParsers}/ExpandRelParser.h      |   5 +-
 .../Parser/RelParsers/FetchRelParser.cpp           |  49 +++
 .../Parser/{ => RelParsers}/FilterRelParser.cpp    |   0
 .../Parser/{ => RelParsers}/FilterRelParser.h      |   8 +-
 .../Parser/{ => RelParsers}/JoinRelParser.cpp      |  47 ++-
 .../Parser/{ => RelParsers}/JoinRelParser.h        |  12 +-
 .../Parser/{ => RelParsers}/MergeTreeRelParser.cpp |   0
 .../Parser/{ => RelParsers}/MergeTreeRelParser.h   |   5 +-
 .../Parser/{ => RelParsers}/ProjectRelParser.cpp   |   0
 .../Parser/{ => RelParsers}/ProjectRelParser.h     |  15 +-
 .../Parser/RelParsers/ReadRelParser.cpp            | 136 +++++++
 .../local-engine/Parser/RelParsers/ReadRelParser.h |  71 ++++
 .../Parser/{ => RelParsers}/RelParser.cpp          |  26 +-
 .../Parser/{ => RelParsers}/RelParser.h            |  18 +-
 .../Parser/{ => RelParsers}/SortRelParser.cpp      |   2 +-
 .../Parser/{ => RelParsers}/SortRelParser.h        |   5 +-
 .../{ => RelParsers}/WindowGroupLimitRelParser.cpp |   4 -
 .../{ => RelParsers}/WindowGroupLimitRelParser.h   |  11 +-
 .../Parser/{ => RelParsers}/WindowRelParser.cpp    |   4 +-
 .../Parser/{ => RelParsers}/WindowRelParser.h      |   4 +-
 .../Parser/{ => RelParsers}/WriteRelParser.cpp     |   0
 .../Parser/{ => RelParsers}/WriteRelParser.h       |   0
 .../local-engine/Parser/SerializedPlanParser.cpp   | 408 +++------------------
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |  45 +--
 cpp-ch/local-engine/Parser/TypeParser.cpp          |   4 +-
 cpp-ch/local-engine/local_engine_jni.cpp           |  51 ++-
 35 files changed, 475 insertions(+), 514 deletions(-)

diff --git a/cpp-ch/local-engine/CMakeLists.txt 
b/cpp-ch/local-engine/CMakeLists.txt
index 85045941c..ca25692bf 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -47,6 +47,7 @@ add_subdirectory(proto)
 add_headers_and_sources(builder Builder)
 add_headers_and_sources(join Join)
 add_headers_and_sources(parser Parser)
+add_headers_and_sources(parser Parser/RelParsers)
 add_headers_and_sources(rewriter Rewriter)
 add_headers_and_sources(storages Storages)
 add_headers_and_sources(storages Storages/Output)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 94a214e5e..b51683197 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -48,7 +48,7 @@
 #include <Functions/registerFunctions.h>
 #include <IO/SharedThreadPools.h>
 #include <Interpreters/JIT/CompiledExpressionCache.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Parser/SubstraitParserUtils.h>
 #include <Planner/PlannerActionsVisitor.h>
diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp 
b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
index 5f20c17bf..bccdbded5 100644
--- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
+++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp
@@ -19,7 +19,7 @@
 #include <Compression/CompressedReadBuffer.h>
 #include <Interpreters/TableJoin.h>
 #include <Join/StorageJoinFromReadBuffer.h>
-#include <Parser/JoinRelParser.h>
+#include <Parser/RelParsers/JoinRelParser.h>
 #include <Parser/TypeParser.h>
 #include <QueryPipeline/ProfileInfo.h>
 #include <Shuffle/ShuffleReader.h>
diff --git a/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp 
b/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
index b843d1565..d7f8ff6c0 100644
--- a/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
+++ b/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
@@ -20,7 +20,7 @@
 #include <DataTypes/DataTypeAggregateFunction.h>
 #include <DataTypes/DataTypeTuple.h>
 #include <Functions/FunctionHelpers.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Parser/TypeParser.h>
 #include <Common/CHUtil.h>
 #include <Common/Exception.h>
diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
similarity index 100%
rename from cpp-ch/local-engine/Parser/AggregateRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.h
similarity index 93%
rename from cpp-ch/local-engine/Parser/AggregateRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.h
index 8f68f858f..8b4ec9c4a 100644
--- a/cpp-ch/local-engine/Parser/AggregateRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.h
@@ -16,7 +16,7 @@
  */
 #pragma once
 #include <Parser/AggregateFunctionParser.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Poco/Logger.h>
 #include <Common/logger_useful.h>
 
@@ -30,7 +30,7 @@ public:
     ~AggregateRelParser() override = default;
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override 
{ return rel.aggregate().input(); }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.aggregate().input(); }
 
 private:
     struct AggregateInfo
diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
similarity index 94%
rename from cpp-ch/local-engine/Parser/CrossRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
index 3cb6ff7ed..99ade1802 100644
--- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "CrossRelParser.h"
+#include <optional>
 
 #include <Interpreters/CollectJoinOnKeysVisitor.h>
 #include <Interpreters/GraceHashJoin.h>
@@ -73,12 +74,7 @@ CrossRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const 
substrait::Rel & /*
     throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call parse().");
 }
 
-const substrait::Rel & CrossRelParser::getSingleInput(const substrait::Rel & 
/*rel*/)
-{
-    throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call getSingleInput().");
-}
-
-DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack)
+std::vector<const substrait::Rel *> CrossRelParser::getInputs(const 
substrait::Rel & rel)
 {
     const auto & join = rel.cross();
     if (!join.has_left() || !join.has_right())
@@ -86,12 +82,19 @@ DB::QueryPlanPtr CrossRelParser::parseOp(const 
substrait::Rel & rel, std::list<c
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "left table or right table 
is missing.");
     }
 
-    rel_stack.push_back(&rel);
-    auto left_plan = getPlanParser()->parseOp(join.left(), rel_stack);
-    auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack);
-    rel_stack.pop_back();
+    return {&join.left(), &join.right()};
+}
+std::optional<const substrait::Rel *> CrossRelParser::getSingleInput(const 
substrait::Rel & /*rel*/)
+{
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call getSingleInput().");
+}
 
-    return parseJoin(join, std::move(left_plan), std::move(right_plan));
+DB::QueryPlanPtr
+CrossRelParser::parse(std::vector<DB::QueryPlanPtr> & input_plans_, const 
substrait::Rel & rel, std::list<const substrait::Rel *> &)
+{
+    assert(input_plans_.size() == 2);
+    const auto & join = rel.cross();
+    return parseJoin(join, std::move(input_plans_[0]), 
std::move(input_plans_[1]));
 }
 
 void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & 
right, const StorageJoinFromReadBuffer & storage_join)
@@ -194,7 +197,8 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const 
substrait::CrossRel & join, DB:
     else
     {
         JoinPtr hash_join = std::make_shared<HashJoin>(table_join, 
right->getCurrentDataStream().header.cloneEmpty());
-        QueryPlanStepPtr join_step = 
std::make_unique<DB::JoinStep>(left->getCurrentDataStream(), 
right->getCurrentDataStream(), hash_join, 8192, 1, false);
+        QueryPlanStepPtr join_step
+            = std::make_unique<DB::JoinStep>(left->getCurrentDataStream(), 
right->getCurrentDataStream(), hash_join, 8192, 1, false);
         join_step->setStepDescription("CROSS_JOIN");
         steps.emplace_back(join_step.get());
         std::vector<QueryPlanPtr> plans;
diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h
similarity index 70%
rename from cpp-ch/local-engine/Parser/CrossRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h
index f1cd60385..635a2fb62 100644
--- a/cpp-ch/local-engine/Parser/CrossRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h
@@ -17,7 +17,8 @@
 #pragma once
 
 #include <memory>
-#include <Parser/RelParser.h>
+#include <optional>
+#include <Parser/RelParsers/RelParser.h>
 #include <substrait/algebra.pb.h>
 
 namespace DB
@@ -37,12 +38,13 @@ public:
     explicit CrossRelParser(SerializedPlanParser * plan_paser_);
     ~CrossRelParser() override = default;
 
+    DB::QueryPlanPtr parse(
+        std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & 
rel, std::list<const substrait::Rel *> & rel_stack_) override;
     DB::QueryPlanPtr
-    parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
+    parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
 
-    DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const 
substrait::Rel *> & rel_stack) override;
-
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override;
+    std::vector<const substrait::Rel *> getInputs(const substrait::Rel & rel) 
override;
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override;
 
 private:
     std::unordered_map<std::string, std::string> & function_mapping;
@@ -55,7 +57,11 @@ private:
     void addConvertStep(TableJoin & table_join, DB::QueryPlan & left, 
DB::QueryPlan & right);
     void addPostFilter(DB::QueryPlan & query_plan, const substrait::CrossRel & 
join);
     bool applyJoinFilter(
-        DB::TableJoin & table_join, const substrait::CrossRel & join_rel, 
DB::QueryPlan & left, DB::QueryPlan & right, bool allow_mixed_condition);
+        DB::TableJoin & table_join,
+        const substrait::CrossRel & join_rel,
+        DB::QueryPlan & left,
+        DB::QueryPlan & right,
+        bool allow_mixed_condition);
 };
 
 }
diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
similarity index 99%
rename from cpp-ch/local-engine/Parser/ExpandRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
index c621332db..aaf98baf2 100644
--- a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
@@ -20,7 +20,7 @@
 #include <Core/ColumnWithTypeAndName.h>
 #include <Operator/ExpandStep.h>
 #include <Parser/ExpandField.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Common/logger_useful.h>
diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
similarity index 86%
rename from cpp-ch/local-engine/Parser/ExpandRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
index 1ca7cc814..4cfa358b7 100644
--- a/cpp-ch/local-engine/Parser/ExpandRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
@@ -15,7 +15,8 @@
  * limitations under the License.
  */
 #pragma once
-#include <Parser/RelParser.h>
+#include <optional>
+#include <Parser/RelParsers/RelParser.h>
 
 
 namespace local_engine
@@ -29,6 +30,6 @@ public:
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
 
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override 
{ return rel.expand().input(); }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.expand().input(); }
 };
 }
diff --git a/cpp-ch/local-engine/Parser/RelParsers/FetchRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/FetchRelParser.cpp
new file mode 100644
index 000000000..9594741a5
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/FetchRelParser.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <optional>
+#include <Parser/SerializedPlanParser.h>
+#include <Processors/QueryPlan/LimitStep.h>
+#include "RelParser.h"
+namespace local_engine
+{
+class FetchRelParser : public RelParser
+{
+public:
+    explicit FetchRelParser(SerializedPlanParser * plan_parser_) : 
RelParser(plan_parser_) { }
+    ~FetchRelParser() override = default;
+
+    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & 
rel, std::list<const substrait::Rel *> &)
+    {
+        const auto & limit = rel.fetch();
+        auto limit_step = 
std::make_unique<DB::LimitStep>(query_plan->getCurrentDataStream(), 
limit.count(), limit.offset());
+        limit_step->setStepDescription("LIMIT");
+        steps.push_back(limit_step.get());
+        query_plan->addStep(std::move(limit_step));
+        return query_plan;
+    }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.fetch().input(); }
+};
+
+void registerFetchRelParser(RelParserFactory & factory)
+{
+    auto builder = [](SerializedPlanParser * plan_parser_) { return 
std::make_unique<FetchRelParser>(plan_parser_); };
+    factory.registerBuilder(substrait::Rel::RelTypeCase::kFetch, builder);
+}
+
+}
diff --git a/cpp-ch/local-engine/Parser/FilterRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp
similarity index 100%
rename from cpp-ch/local-engine/Parser/FilterRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp
diff --git a/cpp-ch/local-engine/Parser/FilterRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.h
similarity index 86%
rename from cpp-ch/local-engine/Parser/FilterRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.h
index a71515952..cf0e633d5 100644
--- a/cpp-ch/local-engine/Parser/FilterRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.h
@@ -17,7 +17,8 @@
 
 #pragma once
 
-#include <Parser/RelParser.h>
+#include <optional>
+#include <Parser/RelParsers/RelParser.h>
 #include <Poco/Logger.h>
 #include <Common/logger_useful.h>
 
@@ -29,11 +30,12 @@ public:
     explicit FilterRelParser(SerializedPlanParser * plan_paser_);
     ~FilterRelParser() override = default;
 
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override 
{ return rel.filter().input(); }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.filter().input(); }
 
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
+
 private:
     // Poco::Logger * logger = &Poco::Logger::get("ProjectRelParser");
-};   
+};
 }
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
similarity index 95%
rename from cpp-ch/local-engine/Parser/JoinRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
index 46b68a4d3..39212cf8d 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "JoinRelParser.h"
+#include <optional>
 
 #include <Core/Block.h>
 #include <Functions/FunctionFactory.h>
@@ -27,10 +28,10 @@
 #include <Interpreters/TableJoin.h>
 #include <Join/BroadCastJoinBuilder.h>
 #include <Join/StorageJoinFromReadBuffer.h>
+#include <Operator/EarlyStopStep.h>
 #include <Parser/AdvancedParametersParseUtil.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Parsers/ASTIdentifier.h>
-#include <Operator/EarlyStopStep.h>
 #include <Processors/QueryPlan/ExpressionStep.h>
 #include <Processors/QueryPlan/FilterStep.h>
 #include <Processors/QueryPlan/JoinStep.h>
@@ -56,8 +57,8 @@ namespace local_engine
 {
 std::shared_ptr<DB::TableJoin> 
createDefaultTableJoin(substrait::JoinRel_JoinType join_type, bool 
is_existence_join, ContextPtr & context)
 {
-    auto table_join = std::make_shared<TableJoin>(
-        context->getSettingsRef(), context->getGlobalTemporaryVolume(), 
context->getTempDataOnDisk());
+    auto table_join
+        = std::make_shared<TableJoin>(context->getSettingsRef(), 
context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());
 
     std::pair<DB::JoinKind, DB::JoinStrictness> kind_and_strictness = 
JoinUtil::getJoinKindAndStrictness(join_type, is_existence_join);
     table_join->setKind(kind_and_strictness.first);
@@ -79,12 +80,7 @@ JoinRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const 
substrait::Rel & /*r
     throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call parse().");
 }
 
-const substrait::Rel & JoinRelParser::getSingleInput(const substrait::Rel & 
/*rel*/)
-{
-    throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call getSingleInput().");
-}
-
-DB::QueryPlanPtr JoinRelParser::parseOp(const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack)
+std::vector<const substrait::Rel *> JoinRelParser::getInputs(const 
substrait::Rel & rel)
 {
     const auto & join = rel.join();
     if (!join.has_left() || !join.has_right())
@@ -92,12 +88,19 @@ DB::QueryPlanPtr JoinRelParser::parseOp(const 
substrait::Rel & rel, std::list<co
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "left table or right table 
is missing.");
     }
 
-    rel_stack.push_back(&rel);
-    auto left_plan = getPlanParser()->parseOp(join.left(), rel_stack);
-    auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack);
-    rel_stack.pop_back();
+    return {&join.left(), &join.right()};
+}
+std::optional<const substrait::Rel *> JoinRelParser::getSingleInput(const 
substrait::Rel & /*rel*/)
+{
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't 
call getSingleInput().");
+}
 
-    return parseJoin(join, std::move(left_plan), std::move(right_plan));
+DB::QueryPlanPtr JoinRelParser::parse(
+    std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_)
+{
+    assert(input_plans_.size() == 2);
+    const auto & join = rel.join();
+    return parseJoin(join, std::move(input_plans_[0]), 
std::move(input_plans_[1]));
 }
 
 std::unordered_set<DB::JoinTableSide> 
JoinRelParser::extractTableSidesFromExpression(
@@ -282,13 +285,22 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const 
substrait::JoinRel & join, DB::Q
                 auto input_header = left->getCurrentDataStream().header;
                 DB::ActionsDAG 
filter_is_not_null_dag{input_header.getColumnsWithTypeAndName()};
                 // when is_null_aware_anti_join is true, there is only one 
join key
-                const auto * key_field = 
filter_is_not_null_dag.getInputs()[join.expression().scalar_function().arguments().at(0).value().selection().direct_reference().struct_field().field()];
+                const auto * key_field = 
filter_is_not_null_dag.getInputs()[join.expression()
+                                                                               
 .scalar_function()
+                                                                               
 .arguments()
+                                                                               
 .at(0)
+                                                                               
 .value()
+                                                                               
 .selection()
+                                                                               
 .direct_reference()
+                                                                               
 .struct_field()
+                                                                               
 .field()];
 
                 auto result_node = 
filter_is_not_null_dag.tryFindInOutputs(key_field->result_name);
                 // add a function isNotNull to filter the null key on the left 
side
                 const auto * cond_node = 
plan_parser->toFunctionNode(filter_is_not_null_dag, "isNotNull", {result_node});
                 filter_is_not_null_dag.addOrReplaceInOutputs(*cond_node);
-                auto filter_step = 
std::make_unique<FilterStep>(left->getCurrentDataStream(), 
std::move(filter_is_not_null_dag), cond_node->result_name, true);
+                auto filter_step = std::make_unique<FilterStep>(
+                    left->getCurrentDataStream(), 
std::move(filter_is_not_null_dag), cond_node->result_name, true);
                 left->addStep(std::move(filter_step));
             }
             // other case: is_empty_hash_table, don't need to handle
@@ -342,8 +354,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const 
substrait::JoinRel & join, DB::Q
             = couldRewriteToMultiJoinOnClauses(table_join->getOnlyClause(), 
join_on_clauses, join, left_header, right_header);
         if (is_multi_join_on_clauses && 
join_config.prefer_multi_join_on_clauses && join_opt_info.right_table_rows > 0
             && join_opt_info.partitions_num > 0
-            && join_opt_info.right_table_rows / join_opt_info.partitions_num
-                < join_config.multi_join_on_clauses_build_side_rows_limit)
+            && join_opt_info.right_table_rows / join_opt_info.partitions_num < 
join_config.multi_join_on_clauses_build_side_rows_limit)
         {
             query_plan = buildMultiOnClauseHashJoin(table_join, 
std::move(left), std::move(right), join_on_clauses);
         }
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h
similarity index 84%
rename from cpp-ch/local-engine/Parser/JoinRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h
index 7e43187be..42ebc5c0a 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h
@@ -20,7 +20,7 @@
 #include <unordered_set>
 #include <Core/Joins.h>
 #include <Interpreters/TableJoin.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <substrait/algebra.pb.h>
 
 namespace DB
@@ -42,9 +42,11 @@ public:
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
 
-    DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const 
substrait::Rel *> & rel_stack) override;
+    DB::QueryPlanPtr parse(
+        std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & 
rel, std::list<const substrait::Rel *> & rel_stack_) override;
 
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override;
+    std::vector<const substrait::Rel *> getInputs(const substrait::Rel & rel) 
override;
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override;
 
 private:
     std::unordered_map<std::string, std::string> & function_mapping;
@@ -69,8 +71,8 @@ private:
 
     void existenceJoinPostProject(DB::QueryPlan & plan, const DB::Names & 
left_input_cols);
 
-    static std::unordered_set<DB::JoinTableSide> 
extractTableSidesFromExpression(
-        const substrait::Expression & expr, const DB::Block & left_header, 
const DB::Block & right_header);
+    static std::unordered_set<DB::JoinTableSide>
+    extractTableSidesFromExpression(const substrait::Expression & expr, const 
DB::Block & left_header, const DB::Block & right_header);
 
     bool couldRewriteToMultiJoinOnClauses(
         const DB::TableJoin::JoinOnClause & prefix_clause,
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
similarity index 100%
rename from cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
similarity index 95%
rename from cpp-ch/local-engine/Parser/MergeTreeRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
index 94b4809d3..5e7be5fa4 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
@@ -17,9 +17,10 @@
 #pragma once
 
 #include <memory>
+#include <optional>
 #include <substrait/algebra.pb.h>
 
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 
 namespace DB
 {
@@ -50,7 +51,7 @@ public:
     DB::QueryPlanPtr parseReadRel(
         DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel, 
const substrait::ReadRel::ExtensionTable & extension_table);
 
-    const substrait::Rel & getSingleInput(const substrait::Rel &) override
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
&) override
     {
         throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't 
call getSingleInput().");
     }
diff --git a/cpp-ch/local-engine/Parser/ProjectRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
similarity index 100%
rename from cpp-ch/local-engine/Parser/ProjectRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
diff --git a/cpp-ch/local-engine/Parser/ProjectRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h
similarity index 86%
rename from cpp-ch/local-engine/Parser/ProjectRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h
index 94accff2d..87f5e20ed 100644
--- a/cpp-ch/local-engine/Parser/ProjectRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 #pragma once
+#include <optional>
 #include <Core/Block.h>
 #include <Core/SortDescription.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Poco/Logger.h>
 
 namespace local_engine
@@ -29,7 +30,7 @@ public:
     {
         ActionsDAG before_array_join; /// Optional
         ActionsDAG array_join;
-        ActionsDAG after_array_join;  /// Optional
+        ActionsDAG after_array_join; /// Optional
     };
 
     explicit ProjectRelParser(SerializedPlanParser * plan_paser_);
@@ -44,21 +45,21 @@ private:
     DB::QueryPlanPtr parseProject(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
     DB::QueryPlanPtr parseGenerate(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
 
-    static const DB::ActionsDAG::Node * findArrayJoinNode(const ActionsDAG& 
actions_dag);
+    static const DB::ActionsDAG::Node * findArrayJoinNode(const ActionsDAG & 
actions_dag);
 
     /// Split actions_dag of generate rel into 3 parts: before array join + 
during array join + after array join
-    static SplittedActionsDAGs splitActionsDAGInGenerate(const ActionsDAG& 
actions_dag);
+    static SplittedActionsDAGs splitActionsDAGInGenerate(const ActionsDAG & 
actions_dag);
 
     bool isReplicateRows(substrait::GenerateRel rel);
 
     DB::QueryPlanPtr parseReplicateRows(QueryPlanPtr query_plan, 
substrait::GenerateRel generate_rel);
 
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override
     {
         if (rel.has_generate())
-            return rel.generate().input();
+            return &rel.generate().input();
 
-        return rel.project().input();
+        return &rel.project().input();
     }
 };
 }
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
new file mode 100644
index 000000000..5532baca5
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReadRelParser.h"
+#include <memory>
+
+namespace DB::ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
+namespace local_engine
+{
+/// Builds the query plan for a read (source) rel. A source has no upstream plan, so a
+/// non-null `query_plan` is reported as a logical error. Every step added to the plan is
+/// also recorded in `steps`.
+DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> &)
+{
+    if (query_plan)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null");
+
+    const auto & read_rel = rel.read();
+
+    /// MergeTree path: the rel carries no local files, and either inlines an extension table
+    /// or is flagged as a MergeTree read by isReadFromMergeTree().
+    if (!read_rel.has_local_files() && (read_rel.has_extension_table() || isReadFromMergeTree(read_rel)))
+    {
+        substrait::ReadRel::ExtensionTable table;
+        if (read_rel.has_extension_table())
+        {
+            table = read_rel.extension_table();
+        }
+        else
+        {
+            /// The table is not inlined in the rel: decode it from the split info set on this parser.
+            table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_info);
+            logDebugMessage(table, "extension_table");
+        }
+
+        MergeTreeRelParser merge_tree_parser(getPlanParser(), getContext());
+        auto merge_tree_plan = merge_tree_parser.parseReadRel(std::make_unique<DB::QueryPlan>(), read_rel, table);
+        steps = merge_tree_parser.getSteps();
+        return merge_tree_plan;
+    }
+
+    /// Local-file path: the source is either a Java iterator or a list of files.
+    assert(read_rel.has_base_schema());
+    DB::QueryPlanStepPtr source_step = isReadRelFromJava(read_rel) ? parseReadRelWithJavaIter(read_rel) : parseReadRelWithLocalFile(read_rel);
+
+    auto plan = std::make_unique<DB::QueryPlan>();
+    steps.emplace_back(source_step.get());
+    plan->addStep(std::move(source_step));
+
+    /// Add a buffer after the source: it tries to preload data from the source and reduce
+    /// the waiting time of downstream nodes.
+    if (getContext()->getSettingsRef().max_threads > 1)
+    {
+        auto buffer_step = std::make_unique<BlocksBufferPoolStep>(plan->getCurrentDataStream());
+        steps.emplace_back(buffer_step.get());
+        plan->addStep(std::move(buffer_step));
+    }
+    return plan;
+}
+
+/// A read rel is served by a Java iterator when it lists exactly one local file whose
+/// URI starts with "iterator".
+bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel)
+{
+    if (!rel.has_local_files())
+        return false;
+    if (rel.local_files().items().size() != 1)
+        return false;
+    return rel.local_files().items().at(0).uri_file().starts_with("iterator");
+}
+
+/// Reads the MergeTree flag from the rel's advanced-extension optimization payload.
+/// The payload is parsed as a protobuf StringValue; if `checkString` does not find the
+/// "isMergeTree=" key at the start of its text, the rel is reported as a non-MergeTree
+/// read. Otherwise the boolean following the key is returned, and a '\n' must follow it.
+bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel)
+{
+    assert(rel.has_advanced_extension());
+    google::protobuf::StringValue optimization_text;
+    optimization_text.ParseFromString(rel.advanced_extension().optimization().value());
+    ReadBufferFromString buf(optimization_text.value());
+    if (!checkString("isMergeTree=", buf))
+        return false;
+
+    bool merge_tree_flag;
+    readBoolText(merge_tree_flag, buf);
+    assertChar('\n', buf);
+    return merge_tree_flag;
+}
+
+/// Builds the source step that reads blocks from the Java-side iterator held in
+/// `input_iter` (provided through setInputIter()). `rel` is only consulted for its base
+/// schema, and only when no first block could be peeked from the iterator.
+DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const 
substrait::ReadRel & rel)
+{
+    GET_JNIENV(env)
+    SCOPE_EXIT({CLEAN_JNIENV});
+    /// Peek the first block up front; it is handed over to the source below.
+    auto first_block = SourceFromJavaIter::peekBlock(env, input_iter);
+
+    /// Try to decide header from the first block read from Java iterator. 
Thus AggregateFunction with parameters has more precise types.
+    auto header = first_block.has_value() ? first_block->cloneEmpty() : 
TypeParser::buildBlockFromNamedStruct(rel.base_schema());
+    auto source = std::make_shared<SourceFromJavaIter>(
+        getContext(), std::move(header), input_iter, 
is_input_iter_materialize, std::move(first_block));
+
+    QueryPlanStepPtr source_step = 
std::make_unique<ReadFromPreparedSource>(Pipe(source));
+    source_step->setStepDescription("Read From Java Iter");
+    return source_step;
+}
+
+/// Builds the source step that reads the files of a read rel. The file list comes from
+/// the rel itself when it is inlined, otherwise from the plan parser's next split info.
+/// If the rel has a filter, it is parsed against the header columns and attached to the step.
+QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadRel & rel)
+{
+    auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
+
+    substrait::ReadRel::LocalFiles files;
+    if (rel.has_local_files())
+    {
+        files = rel.local_files();
+    }
+    else
+    {
+        files = BinaryToMessage<substrait::ReadRel::LocalFiles>(getPlanParser()->nextSplitInfo());
+        logDebugMessage(files, "local_files");
+    }
+
+    auto file_source = std::make_shared<SubstraitFileSource>(getContext(), header, files);
+    auto file_pipe = Pipe(file_source);
+    auto file_step = std::make_unique<SubstraitFileSourceStep>(getContext(), std::move(file_pipe), "substrait local files");
+    file_step->setStepDescription("read local files");
+    if (!rel.has_filter())
+        return file_step;
+
+    DB::ActionsDAG filter_dag{blockToNameAndTypeList(header)};
+    const DB::ActionsDAG::Node * filter_node = parseExpression(filter_dag, rel.filter());
+    filter_dag.addOrReplaceInOutputs(*filter_node);
+    assert(filter_node == &(filter_dag.findInOutputs(filter_node->result_name)));
+    file_step->addFilter(std::move(filter_dag), filter_node->result_name);
+    return file_step;
+}
+
+/// Registers the builder that creates a ReadRelParser for substrait `kRead` rels.
+void registerReadRelParser(RelParserFactory & factory)
+{
+    auto make_read_rel_parser = [](SerializedPlanParser * parser) { return std::make_unique<ReadRelParser>(parser); };
+    factory.registerBuilder(substrait::Rel::RelTypeCase::kRead, make_read_rel_parser);
+}
+}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
new file mode 100644
index 000000000..8f9c578fb
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Core/Block.h>
+#include <Core/Settings.h>
+#include <Interpreters/Context.h>
+#include <Operator/BlocksBufferPoolTransform.h>
+#include <Parser/RelParsers/MergeTreeRelParser.h>
+#include <Parser/SerializedPlanParser.h>
+#include <Parser/SubstraitParserUtils.h>
+#include <Parser/TypeParser.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Storages/SourceFromJavaIter.h>
+#include <Storages/SubstraitSource/SubstraitFileSource.h>
+#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
+#include <google/protobuf/wrappers.pb.h>
+#include <Common/BlockTypeUtils.h>
+#include <Common/JNIUtils.h>
+
+namespace local_engine
+{
+/// Parser for substrait read rels, i.e. the source nodes of a plan.
+///
+/// Some reads need input that is not part of the rel itself. The caller provides it before
+/// calling parse(): setInputIter() for reads backed by a Java iterator, and setSplitInfo()
+/// for MergeTree reads whose extension table is not inlined in the rel.
+class ReadRelParser : public RelParser
+{
+public:
+    explicit ReadRelParser(SerializedPlanParser * plan_parser_) : RelParser(plan_parser_) { }
+    ~ReadRelParser() override = default;
+
+    /// A source node has no input plans: the vector is ignored and the single-plan
+    /// overload is called with a null plan.
+    DB::QueryPlanPtr
+    parse(std::vector<DB::QueryPlanPtr> &, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override
+    {
+        DB::QueryPlanPtr query_plan;
+        return parse(std::move(query_plan), rel, rel_stack_);
+    }
+
+    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> &) override;
+    // This is source node, there is no input
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel &) override { return std::nullopt; }
+
+    bool isReadRelFromJava(const substrait::ReadRel & rel);
+    bool isReadFromMergeTree(const substrait::ReadRel & rel);
+
+    /// Sets the Java iterator to read from, and whether its input is to be materialized.
+    void setInputIter(jobject input_iter_, bool is_materialze)
+    {
+        input_iter = input_iter_;
+        is_input_iter_materialize = is_materialze;
+    }
+
+    /// Sets the serialized split info used when the rel does not inline its extension table.
+    void setSplitInfo(String split_info_) { split_info = std::move(split_info_); }
+
+private:
+    /// Default-initialized so that a parser on which setInputIter() was never called holds
+    /// a null handle instead of an indeterminate one.
+    jobject input_iter = nullptr;
+    bool is_input_iter_materialize = false;
+    String split_info;
+    DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel);
+    QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel);
+};
+}
diff --git a/cpp-ch/local-engine/Parser/RelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp
similarity index 89%
rename from cpp-ch/local-engine/Parser/RelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp
index a7f6d0586..c5e7d4b47 100644
--- a/cpp-ch/local-engine/Parser/RelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp
@@ -22,9 +22,10 @@
 #include <AggregateFunctions/AggregateFunctionFactory.h>
 #include <DataTypes/DataTypeAggregateFunction.h>
 #include <DataTypes/IDataType.h>
+#include <Poco/Logger.h>
 #include <Poco/StringTokenizer.h>
 #include <Common/Exception.h>
-
+#include <Common/logger_useful.h>
 
 namespace DB
 {
@@ -37,6 +38,14 @@ extern const int LOGICAL_ERROR;
 
 namespace local_engine
 {
+
+/// Default input list of a rel: the single input reported by getSingleInput(), or an
+/// empty list when there is none.
+std::vector<const substrait::Rel *> RelParser::getInputs(const substrait::Rel & rel)
+{
+    const auto single_input = getSingleInput(rel);
+    if (single_input.has_value())
+        return {single_input.value()};
+    return {};
+}
 AggregateFunctionPtr RelParser::getAggregateFunction(
     const String & name, DB::DataTypes arg_types, 
DB::AggregateFunctionProperties & properties, const DB::Array & parameters)
 {
@@ -80,13 +89,12 @@ std::optional<String> RelParser::parseFunctionName(UInt32 
function_ref, const su
     }
     return plan_parser->getFunctionName(*sigature_name, function);
 }
-DB::QueryPlanPtr RelParser::parseOp(const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack)
+
+/// Default multi-input entry point: a rel with a single input forwards its only input plan
+/// to the single-plan parse() overload.
+///
+/// Throws LOGICAL_ERROR unless exactly one input plan is given. The check is a runtime one
+/// so that a wrong number of inputs cannot index past the end of `input_plans_` in builds
+/// where assertions are compiled out.
+DB::QueryPlanPtr
+RelParser::parse(std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
 {
-    SerializedPlanParser & planParser = *getPlanParser();
-    rel_stack.push_back(&rel);
-    auto query_plan = planParser.parseOp(getSingleInput(rel), rel_stack);
-    rel_stack.pop_back();
-    return parse(std::move(query_plan), rel, rel_stack);
+    if (input_plans_.size() != 1)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected exactly one input plan, but got {}", input_plans_.size());
+    return parse(std::move(input_plans_[0]), rel, rel_stack_);
 }
 
 std::map<std::string, std::string>
@@ -160,6 +168,8 @@ void registerProjectRelParser(RelParserFactory & factory);
 void registerJoinRelParser(RelParserFactory & factory);
 void registerFilterRelParser(RelParserFactory & factory);
 void registerCrossRelParser(RelParserFactory & factory);
+void registerFetchRelParser(RelParserFactory & factory);
+void registerReadRelParser(RelParserFactory & factory);
 
 void registerRelParsers()
 {
@@ -173,5 +183,7 @@ void registerRelParsers()
     registerJoinRelParser(factory);
     registerCrossRelParser(factory);
     registerFilterRelParser(factory);
+    registerFetchRelParser(factory);
+    registerReadRelParser(factory);
 }
 }
diff --git a/cpp-ch/local-engine/Parser/RelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/RelParser.h
similarity index 78%
rename from cpp-ch/local-engine/Parser/RelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/RelParser.h
index 885622281..2e38c30c6 100644
--- a/cpp-ch/local-engine/Parser/RelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/RelParser.h
@@ -39,8 +39,10 @@ public:
     virtual DB::QueryPlanPtr
     parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_)
         = 0;
-    virtual DB::QueryPlanPtr parseOp(const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack);
-    virtual const substrait::Rel & getSingleInput(const substrait::Rel & rel) 
= 0;
+    virtual DB::QueryPlanPtr
+    parse(std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & 
rel, std::list<const substrait::Rel *> & rel_stack_);
+    virtual std::vector<const substrait::Rel *> getInputs(const substrait::Rel 
& rel);
+    virtual std::optional<const substrait::Rel *> getSingleInput(const 
substrait::Rel & rel) = 0;
     const std::vector<IQueryPlanStep *> & getSteps() const { return steps; }
 
     static AggregateFunctionPtr getAggregateFunction(
@@ -59,12 +61,12 @@ protected:
     // Get coresponding function name in ClickHouse.
     std::optional<String> parseFunctionName(UInt32 function_ref, const 
substrait::Expression_ScalarFunction & function);
 
-    const DB::ActionsDAG::Node * parseArgument(ActionsDAG& action_dag, const 
substrait::Expression & rel)
+    const DB::ActionsDAG::Node * parseArgument(ActionsDAG & action_dag, const 
substrait::Expression & rel)
     {
         return plan_parser->parseExpression(action_dag, rel);
     }
 
-    const DB::ActionsDAG::Node * parseExpression(ActionsDAG& action_dag, const 
substrait::Expression & rel)
+    const DB::ActionsDAG::Node * parseExpression(ActionsDAG & action_dag, 
const substrait::Expression & rel)
     {
         return plan_parser->parseExpression(action_dag, rel);
     }
@@ -77,13 +79,15 @@ protected:
     std::vector<IQueryPlanStep *> steps;
 
     const ActionsDAG::Node *
-    buildFunctionNode(ActionsDAG& action_dag, const String & function, const 
DB::ActionsDAG::NodeRawConstPtrs & args)
+    buildFunctionNode(ActionsDAG & action_dag, const String & function, const 
DB::ActionsDAG::NodeRawConstPtrs & args)
     {
         return plan_parser->toFunctionNode(action_dag, function, args);
     }
 
-    static std::map<std::string, std::string> 
parseFormattedRelAdvancedOptimization(const 
substrait::extensions::AdvancedExtension &advanced_extension);
-    static std::string getStringConfig(const std::map<std::string, 
std::string> & configs, const std::string & key, const std::string & 
default_value = "");
+    static std::map<std::string, std::string>
+    parseFormattedRelAdvancedOptimization(const 
substrait::extensions::AdvancedExtension & advanced_extension);
+    static std::string
+    getStringConfig(const std::map<std::string, std::string> & configs, const 
std::string & key, const std::string & default_value = "");
 
     SerializedPlanParser * plan_parser;
 };
diff --git a/cpp-ch/local-engine/Parser/SortRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.cpp
similarity index 99%
rename from cpp-ch/local-engine/Parser/SortRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/SortRelParser.cpp
index 8fb97d6da..48e29234e 100644
--- a/cpp-ch/local-engine/Parser/SortRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.cpp
@@ -17,7 +17,7 @@
 #include "SortRelParser.h"
 
 #include <Common/GlutenConfig.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Processors/QueryPlan/SortingStep.h>
 #include <Common/logger_useful.h>
 #include <Common/QueryContext.h>
diff --git a/cpp-ch/local-engine/Parser/SortRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.h
similarity index 88%
rename from cpp-ch/local-engine/Parser/SortRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/SortRelParser.h
index 5426b267b..2b9d0915f 100644
--- a/cpp-ch/local-engine/Parser/SortRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.h
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 #pragma once
+#include <optional>
 #include <Core/Block.h>
 #include <Core/SortDescription.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <google/protobuf/repeated_field.h>
 namespace local_engine
 {
@@ -32,7 +33,7 @@ public:
     static DB::SortDescription
     parseSortDescription(const 
google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields, const 
DB::Block & header);
 
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override 
{ return rel.sort().input(); }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.sort().input(); }
 
 private:
     size_t parseLimit(std::list<const substrait::Rel *> & rel_stack_);
diff --git a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp
similarity index 96%
rename from cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp
index f6c10386f..75488937e 100644
--- a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp
@@ -16,15 +16,11 @@
  */
 
 #include "WindowGroupLimitRelParser.h"
-#include <Interpreters/ActionsDAG.h>
 #include <Operator/WindowGroupLimitStep.h>
 #include <Parser/AdvancedParametersParseUtil.h>
-#include <Parser/SortRelParser.h>
-#include <Parser/WindowGroupLimitRelParser.h>
 #include <Processors/QueryPlan/ExpressionStep.h>
 #include <google/protobuf/repeated_field.h>
 #include <google/protobuf/wrappers.pb.h>
-#include "AdvancedParametersParseUtil.h"
 
 namespace DB::ErrorCodes
 {
diff --git a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h
similarity index 84%
rename from cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h
index c9c503ed4..7939d232c 100644
--- a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h
@@ -15,13 +15,8 @@
  * limitations under the License.
  */
 #pragma once
-#include <unordered_map>
-#include <Core/Field.h>
-#include <Core/SortDescription.h>
-#include <DataTypes/IDataType.h>
-#include <Interpreters/WindowDescription.h>
-#include <Parser/AggregateFunctionParser.h>
-#include <Parser/RelParser.h>
+#include <optional>
+#include <Parser/RelParsers/RelParser.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Poco/Logger.h>
@@ -40,7 +35,7 @@ public:
     ~WindowGroupLimitRelParser() override = default;
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override 
{ return rel.windowgrouplimit().input(); }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.windowgrouplimit().input(); }
 
 private:
     DB::QueryPlanPtr current_plan;
diff --git a/cpp-ch/local-engine/Parser/WindowRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.cpp
similarity index 99%
rename from cpp-ch/local-engine/Parser/WindowRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.cpp
index 0676924c1..7506daabf 100644
--- a/cpp-ch/local-engine/Parser/WindowRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.cpp
@@ -29,8 +29,8 @@
 #include <IO/WriteBufferFromString.h>
 #include <Interpreters/ActionsDAG.h>
 #include <Interpreters/WindowDescription.h>
-#include <Parser/RelParser.h>
-#include <Parser/SortRelParser.h>
+#include <Parser/RelParsers/RelParser.h>
+#include <Parser/RelParsers/SortRelParser.h>
 #include <Parser/TypeParser.h>
 #include <Processors/QueryPlan/ExpressionStep.h>
 #include <Processors/QueryPlan/WindowStep.h>
diff --git a/cpp-ch/local-engine/Parser/WindowRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.h
similarity index 95%
rename from cpp-ch/local-engine/Parser/WindowRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.h
index 6610915fb..6168bebe4 100644
--- a/cpp-ch/local-engine/Parser/WindowRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.h
@@ -21,7 +21,7 @@
 #include <DataTypes/IDataType.h>
 #include <Interpreters/WindowDescription.h>
 #include <Parser/AggregateFunctionParser.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Poco/Logger.h>
 #include <Common/logger_useful.h>
@@ -35,7 +35,7 @@ public:
     ~WindowRelParser() override = default;
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
-    const substrait::Rel & getSingleInput(const substrait::Rel & rel) override 
{ return rel.window().input(); }
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.window().input(); }
 
 private:
     struct WindowInfo
diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
similarity index 100%
rename from cpp-ch/local-engine/Parser/WriteRelParser.cpp
rename to cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.h
similarity index 100%
rename from cpp-ch/local-engine/Parser/WriteRelParser.h
rename to cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.h
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index e893dd35b..aa4bf5aac 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -52,14 +52,16 @@
 #include <Join/StorageJoinFromReadBuffer.h>
 #include <Operator/BlocksBufferPoolTransform.h>
 #include <Parser/FunctionParser.h>
-#include <Parser/MergeTreeRelParser.h>
-#include <Parser/RelParser.h>
+#include <Parser/InputFileNameParser.h>
+#include <Parser/LocalExecutor.h>
+#include <Parser/RelParsers/ReadRelParser.h>
+#include <Parser/RelParsers/RelParser.h>
+#include <Parser/RelParsers/WriteRelParser.h>
 #include <Parser/SubstraitParserUtils.h>
 #include <Parser/TypeParser.h>
-#include <Parser/WriteRelParser.h>
 #include <Parsers/ASTIdentifier.h>
 #include <Parsers/ExpressionListParsers.h>
-#include <Parser/LocalExecutor.h>
+#include <Processors/Executors/PipelineExecutor.h>
 #include <Processors/QueryPlan/AggregatingStep.h>
 #include <Processors/QueryPlan/ExpressionStep.h>
 #include <Processors/QueryPlan/LimitStep.h>
@@ -83,8 +85,7 @@
 #include <Common/JNIUtils.h>
 #include <Common/logger_useful.h>
 #include <Common/typeid_cast.h>
-#include <Processors/Executors/PipelineExecutor.h>
-#include <Parser/InputFileNameParser.h>
+#include "RelParsers/RelParser.h"
 
 namespace DB
 {
@@ -117,7 +118,7 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, 
char c)
     return res;
 }
 
-const ActionsDAG::Node * SerializedPlanParser::addColumn(ActionsDAG& 
actions_dag, const DataTypePtr & type, const Field & field)
+const ActionsDAG::Node * SerializedPlanParser::addColumn(ActionsDAG & 
actions_dag, const DataTypePtr & type, const Field & field)
 {
     return &actions_dag.addColumn(
         ColumnWithTypeAndName(type->createColumnConst(1, field), type, 
getUniqueName(toString(field).substr(0, 10))));
@@ -242,74 +243,6 @@ std::string getDecimalFunction(const 
substrait::Type_Decimal & decimal, bool nul
     return ch_function_name;
 }
 
-bool SerializedPlanParser::isReadRelFromJava(const substrait::ReadRel & rel)
-{
-    return rel.has_local_files() && rel.local_files().items().size() == 1
-        && rel.local_files().items().at(0).uri_file().starts_with("iterator");
-}
-
-bool SerializedPlanParser::isReadFromMergeTree(const substrait::ReadRel & rel)
-{
-    assert(rel.has_advanced_extension());
-    bool is_read_from_merge_tree;
-    google::protobuf::StringValue optimization;
-    
optimization.ParseFromString(rel.advanced_extension().optimization().value());
-    ReadBufferFromString in(optimization.value());
-    assertString("isMergeTree=", in);
-    readBoolText(is_read_from_merge_tree, in);
-    assertChar('\n', in);
-    return is_read_from_merge_tree;
-}
-
-QueryPlanStepPtr SerializedPlanParser::parseReadRealWithLocalFile(const 
substrait::ReadRel & rel)
-{
-    auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
-    substrait::ReadRel::LocalFiles local_files;
-    if (rel.has_local_files())
-        local_files = rel.local_files();
-    else
-    {
-        local_files = 
BinaryToMessage<substrait::ReadRel::LocalFiles>(split_infos.at(nextSplitInfoIndex()));
-        logDebugMessage(local_files, "local_files");
-    }
-    auto source = std::make_shared<SubstraitFileSource>(context, header, 
local_files);
-    auto source_pipe = Pipe(source);
-    auto source_step = std::make_unique<SubstraitFileSourceStep>(context, 
std::move(source_pipe), "substrait local files");
-    source_step->setStepDescription("read local files");
-    if (rel.has_filter())
-    {
-        ActionsDAG actions_dag{blockToNameAndTypeList(header)};
-        const ActionsDAG::Node * filter_node = parseExpression(actions_dag, 
rel.filter());
-        actions_dag.addOrReplaceInOutputs(*filter_node);
-        assert(filter_node == 
&(actions_dag.findInOutputs(filter_node->result_name)));
-        source_step->addFilter(std::move(actions_dag), 
filter_node->result_name);
-    }
-    return source_step;
-}
-
-QueryPlanStepPtr SerializedPlanParser::parseReadRealWithJavaIter(const 
substrait::ReadRel & rel)
-{
-    assert(rel.has_local_files());
-    assert(rel.local_files().items().size() == 1);
-    auto iter = rel.local_files().items().at(0).uri_file();
-    auto pos = iter.find(':');
-    auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));
-    jobject input_iter = input_iters[iter_index];
-    bool materialize_input = materialize_inputs[iter_index];
-
-    GET_JNIENV(env)
-    SCOPE_EXIT({CLEAN_JNIENV});
-    auto first_block = SourceFromJavaIter::peekBlock(env, input_iter);
-
-    /// Try to decide header from the first block read from Java iterator. 
Thus AggregateFunction with parameters has more precise types.
-    auto header = first_block.has_value() ? first_block->cloneEmpty() : 
TypeParser::buildBlockFromNamedStruct(rel.base_schema());
-    auto source = std::make_shared<SourceFromJavaIter>(context, 
std::move(header), input_iter, materialize_input, std::move(first_block));
-
-    QueryPlanStepPtr source_step = 
std::make_unique<ReadFromPreparedSource>(Pipe(source));
-    source_step->setStepDescription("Read From Java Iter");
-    return source_step;
-}
-
 IQueryPlanStep * SerializedPlanParser::addRemoveNullableStep(QueryPlan & plan, 
const std::set<String> & columns)
 {
     if (columns.empty())
@@ -345,7 +278,11 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, 
const substrait::PlanRel
         NamesWithAliases aliases;
         auto cols = 
query_plan->getCurrentDataStream().header.getNamesAndTypesList();
         if (cols.getNames().size() != 
static_cast<size_t>(root_rel.root().names_size()))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Missmatch result 
columns size. plan column size {}, subtrait plan size {}.", 
cols.getNames().size(), root_rel.root().names_size());
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Missmatch result columns size. plan column size {}, subtrait 
plan size {}.",
+                cols.getNames().size(),
+                root_rel.root().names_size());
         for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
             aliases.emplace_back(NameWithAlias(cols.getNames()[i], 
root_rel.root().names(i)));
         actions_dag.project(aliases);
@@ -433,89 +370,55 @@ QueryPlanPtr SerializedPlanParser::parse(const 
substrait::Plan & plan)
 }
 
+/// Convenience overload: parses `plan` into a query plan and passes both the query plan
+/// and `plan` on to the two-argument createExecutor().
 std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const 
substrait::Plan & plan)
-{ return createExecutor(parse(plan), plan); }
+{
+    return createExecutor(parse(plan), plan);
+}
 
 QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack)
 {
-    QueryPlanPtr query_plan;
-    std::vector<IQueryPlanStep *> steps;
-    switch (rel.rel_type_case())
-    {
-        case substrait::Rel::RelTypeCase::kFetch: {
-            rel_stack.push_back(&rel);
-            const auto & limit = rel.fetch();
-            query_plan = parseOp(limit.input(), rel_stack);
-            rel_stack.pop_back();
-            auto limit_step = 
std::make_unique<LimitStep>(query_plan->getCurrentDataStream(), limit.count(), 
limit.offset());
-            limit_step->setStepDescription("LIMIT");
-            steps.emplace_back(limit_step.get());
-            query_plan->addStep(std::move(limit_step));
-            break;
-        }
-        case substrait::Rel::RelTypeCase::kRead: {
-            const auto & read = rel.read();
-            // TODO: We still maintain the old logic of parsing LocalFiles or 
ExtensionTable in RealRel
-            // to be compatiable with some suites about metrics.
-            // Remove this compatiability in later and then only java iter has 
local files in ReadRel.
-            if (read.has_local_files() || (!read.has_extension_table() && 
!isReadFromMergeTree(read)))
-            {
-                assert(read.has_base_schema());
-                QueryPlanStepPtr step;
-                if (isReadRelFromJava(read))
-                    step = parseReadRealWithJavaIter(read);
-                else
-                    step = parseReadRealWithLocalFile(read);
+    DB::QueryPlanPtr query_plan;
+    auto rel_parser = 
RelParserFactory::instance().getBuilder(rel.rel_type_case())(this);
 
-                query_plan = std::make_unique<QueryPlan>();
-                steps.emplace_back(step.get());
-                query_plan->addStep(std::move(step));
+    auto all_input_rels = rel_parser->getInputs(rel);
+    std::vector<DB::QueryPlanPtr> input_query_plans;
+    rel_stack.push_back(&rel);
+    for (const auto * input_rel : all_input_rels)
+    {
+        auto input_query_plan = parseOp(*input_rel, rel_stack);
+        input_query_plans.push_back(std::move(input_query_plan));
+    }
+    rel_stack.pop_back();
 
-                // Add a buffer after source, it try to preload data from 
source and reduce the
-                // waiting time of downstream nodes.
-                if (context->getSettingsRef().max_threads > 1)
-                {
-                    auto buffer_step = 
std::make_unique<BlocksBufferPoolStep>(query_plan->getCurrentDataStream());
-                    steps.emplace_back(buffer_step.get());
-                    query_plan->addStep(std::move(buffer_step));
-                }
-            }
-            else
+    // source node is special
+    if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead)
+    {
+        assert(all_input_rels.empty());
+        auto read_rel_parser = 
std::dynamic_pointer_cast<ReadRelParser>(rel_parser);
+        const auto & read = rel.read();
+        if (read.has_local_files())
+        {
+            if (read_rel_parser->isReadRelFromJava(read))
             {
-                substrait::ReadRel::ExtensionTable extension_table;
-                if (read.has_extension_table())
-                    extension_table = read.extension_table();
-                else
-                {
-                    extension_table = 
BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_infos.at(nextSplitInfoIndex()));
-                    logDebugMessage(extension_table, "extension_table");
-                }
-
-                MergeTreeRelParser mergeTreeParser(this, context);
-                query_plan = 
mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read, 
extension_table);
-                steps = mergeTreeParser.getSteps();
+                auto iter = read.local_files().items().at(0).uri_file();
+                auto pos = iter.find(':');
+                auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));
+                auto [input_iter, materalize_input] = 
getInputIter(static_cast<size_t>(iter_index));
+                read_rel_parser->setInputIter(input_iter, materalize_input);
             }
-            break;
         }
-        case substrait::Rel::RelTypeCase::kFilter:
-        case substrait::Rel::RelTypeCase::kGenerate:
-        case substrait::Rel::RelTypeCase::kProject:
-        case substrait::Rel::RelTypeCase::kAggregate:
-        case substrait::Rel::RelTypeCase::kSort:
-        case substrait::Rel::RelTypeCase::kWindow:
-        case substrait::Rel::RelTypeCase::kJoin:
-        case substrait::Rel::RelTypeCase::kCross:
-        case substrait::Rel::RelTypeCase::kWindowGroupLimit:
-        case substrait::Rel::RelTypeCase::kExpand: {
-            auto op_parser = 
RelParserFactory::instance().getBuilder(rel.rel_type_case())(this);
-            query_plan = op_parser->parseOp(rel, rel_stack);
-            auto parser_steps = op_parser->getSteps();
-            steps.insert(steps.end(), parser_steps.begin(), 
parser_steps.end());
-            break;
+        else if (read_rel_parser->isReadFromMergeTree(read))
+        {
+            if (!read.has_extension_table())
+            {
+                read_rel_parser->setSplitInfo(nextSplitInfo());
+            }
         }
-        default:
-            throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support 
relation type: {}.\n{}", rel.rel_type_case(), rel.DebugString());
     }
 
+    query_plan = rel_parser->parse(input_query_plans, rel, rel_stack);
+
+    std::vector<DB::IQueryPlanStep *> steps = rel_parser->getSteps();
+
     if (!context->getSettingsRef().query_plan_enable_optimizations)
     {
         if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead)
@@ -1321,215 +1224,6 @@ SerializedPlanParser::SerializedPlanParser(const 
ContextPtr & context_) : contex
 {
 }
 
-void SerializedPlanParser::collectJoinKeys(
-    const substrait::Expression & condition, std::vector<std::pair<int32_t, 
int32_t>> & join_keys, int32_t right_key_start)
-{
-    auto condition_name = getFunctionName(
-        
function_mapping.at(std::to_string(condition.scalar_function().function_reference())),
 condition.scalar_function());
-    if (condition_name == "and")
-    {
-        collectJoinKeys(condition.scalar_function().arguments(0).value(), 
join_keys, right_key_start);
-        collectJoinKeys(condition.scalar_function().arguments(1).value(), 
join_keys, right_key_start);
-    }
-    else if (condition_name == "equals")
-    {
-        const auto & function = condition.scalar_function();
-        auto left_key_idx = 
function.arguments(0).value().selection().direct_reference().struct_field().field();
-        auto right_key_idx = 
function.arguments(1).value().selection().direct_reference().struct_field().field()
 - right_key_start;
-        join_keys.emplace_back(std::pair(left_key_idx, right_key_idx));
-    }
-    else
-    {
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "doesn't support condition 
{}", condition_name);
-    }
-}
-
-ActionsDAG ASTParser::convertToActions(const NamesAndTypesList & 
name_and_types, const ASTPtr & ast) const
-{
-    NamesAndTypesList aggregation_keys;
-    ColumnNumbersList aggregation_keys_indexes_list;
-    AggregationKeysInfo info(aggregation_keys, aggregation_keys_indexes_list, 
GroupByKind::NONE);
-    SizeLimits size_limits_for_set;
-    ActionsMatcher::Data visitor_data(
-        context,
-        size_limits_for_set,
-        static_cast<size_t>(0),
-        name_and_types,
-        ActionsDAG(name_and_types),
-        std::make_shared<PreparedSets>(),
-        false /* no_subqueries */,
-        false /* no_makeset */,
-        false /* only_consts */,
-        info);
-    ActionsVisitor(visitor_data).visit(ast);
-    return visitor_data.getActions();
-}
-
-ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression 
& rel)
-{
-    LOG_DEBUG(&Poco::Logger::get("ASTParser"), "substrait plan:\n{}", 
rel.DebugString());
-    if (rel.has_scalar_function())
-    {
-        const auto & scalar_function = rel.scalar_function();
-        auto function_signature = 
function_mapping.at(std::to_string(rel.scalar_function().function_reference()));
-
-        auto substrait_name = function_signature.substr(0, 
function_signature.find(':'));
-        auto func_parser = 
FunctionParserFactory::instance().tryGet(substrait_name, plan_parser);
-        String function_name = func_parser->getName();
-
-        ASTs ast_args;
-        parseFunctionArgumentsToAST(names, scalar_function, ast_args);
-
-        return makeASTFunction(function_name, ast_args);
-    }
-    else
-        return parseArgumentToAST(names, rel);
-}
-
-void ASTParser::parseFunctionArgumentsToAST(
-    const Names & names, const substrait::Expression_ScalarFunction & 
scalar_function, ASTs & ast_args)
-{
-    const auto & args = scalar_function.arguments();
-
-    for (const auto & arg : args)
-    {
-        if (arg.value().has_scalar_function())
-        {
-            ast_args.emplace_back(parseToAST(names, arg.value()));
-        }
-        else
-        {
-            ast_args.emplace_back(parseArgumentToAST(names, arg.value()));
-        }
-    }
-}
-
-ASTPtr ASTParser::parseArgumentToAST(const Names & names, const 
substrait::Expression & rel)
-{
-    switch (rel.rex_type_case())
-    {
-        case substrait::Expression::RexTypeCase::kLiteral: {
-            DataTypePtr type;
-            Field field;
-            std::tie(type, field) = 
SerializedPlanParser::parseLiteral(rel.literal());
-            return std::make_shared<ASTLiteral>(field, type);
-        }
-        case substrait::Expression::RexTypeCase::kSelection: {
-            if (!rel.selection().has_direct_reference() || 
!rel.selection().direct_reference().has_struct_field())
-                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can only have 
direct struct references in selections");
-
-            const auto field = 
rel.selection().direct_reference().struct_field().field();
-            return std::make_shared<ASTIdentifier>(names[field]);
-        }
-        case substrait::Expression::RexTypeCase::kCast: {
-            if (!rel.cast().has_type() || !rel.cast().has_input())
-                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Doesn't have type 
or input in cast node.");
-
-            /// Append input to asts
-            ASTs args;
-            args.emplace_back(parseArgumentToAST(names, rel.cast().input()));
-
-            /// Append destination type to asts
-            const auto & substrait_type = rel.cast().type();
-            /// Spark cast(x as BINARY) -> CH reinterpretAsStringSpark(x)
-            if (substrait_type.has_binary())
-                return makeASTFunction("reinterpretAsStringSpark", args);
-            else
-            {
-                DataTypePtr ch_type = TypeParser::parseType(substrait_type);
-                
args.emplace_back(std::make_shared<ASTLiteral>(ch_type->getName()));
-
-                return makeASTFunction("CAST", args);
-            }
-        }
-        case substrait::Expression::RexTypeCase::kIfThen: {
-            const auto & if_then = rel.if_then();
-            auto condition_nums = if_then.ifs_size();
-            std::string ch_function_name = condition_nums == 1 ? "if" : 
"multiIf";
-            auto function_multi_if = 
FunctionFactory::instance().get(ch_function_name, context);
-            ASTs args;
-
-            for (int i = 0; i < condition_nums; ++i)
-            {
-                const auto & ifs = if_then.ifs(i);
-                auto if_node = parseArgumentToAST(names, ifs.if_());
-                args.emplace_back(if_node);
-
-                auto then_node = parseArgumentToAST(names, ifs.then());
-                args.emplace_back(then_node);
-            }
-
-            auto else_node = parseArgumentToAST(names, if_then.else_());
-            args.emplace_back(std::move(else_node));
-            return makeASTFunction(ch_function_name, args);
-        }
-        case substrait::Expression::RexTypeCase::kScalarFunction: {
-            return parseToAST(names, rel);
-        }
-        case substrait::Expression::RexTypeCase::kSingularOrList: {
-            const auto & options = rel.singular_or_list().options();
-            /// options is empty always return false
-            if (options.empty())
-                return std::make_shared<ASTLiteral>(0);
-            /// options should be literals
-            if (!options[0].has_literal())
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Options of 
SingularOrList must have literal type");
-
-            ASTs args;
-            args.emplace_back(parseArgumentToAST(names, 
rel.singular_or_list().value()));
-
-            bool nullable = false;
-            size_t options_len = options.size();
-            ASTs in_args;
-            in_args.reserve(options_len);
-
-            for (int i = 0; i < static_cast<int>(options_len); ++i)
-            {
-                if (!options[i].has_literal())
-                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "in expression 
values must be the literal!");
-                if (!nullable)
-                    nullable = options[i].literal().has_null();
-            }
-
-            auto elem_type_and_field = 
SerializedPlanParser::parseLiteral(options[0].literal());
-            DataTypePtr elem_type = wrapNullableType(nullable, 
elem_type_and_field.first);
-            for (int i = 0; i < static_cast<int>(options_len); ++i)
-            {
-                auto type_and_field = 
SerializedPlanParser::parseLiteral(options[i].literal());
-                auto option_type = wrapNullableType(nullable, 
type_and_field.first);
-                if (!elem_type->equals(*option_type))
-                    throw Exception(
-                        ErrorCodes::LOGICAL_ERROR,
-                        "SingularOrList options type mismatch:{} and {}",
-                        elem_type->getName(),
-                        option_type->getName());
-
-                
in_args.emplace_back(std::make_shared<ASTLiteral>(type_and_field.second));
-            }
-            auto array_ast = makeASTFunction("array", in_args);
-            args.emplace_back(array_ast);
-
-            auto ast = makeASTFunction("in", args);
-            if (nullable)
-            {
-                /// if sets has `null` and value not in sets
-                /// In Spark: return `null`, is the standard behaviour from 
ANSI.(SPARK-37920)
-                /// In CH: return `false`
-                /// So we used if(a, b, c) cast `false` to `null` if sets has 
`null`
-                ast = makeASTFunction("if", ast, 
std::make_shared<ASTLiteral>(true), std::make_shared<ASTLiteral>(Field()));
-            }
-
-            return ast;
-        }
-        default:
-            throw Exception(
-                ErrorCodes::UNKNOWN_TYPE,
-                "Join on condition error. Unsupported spark expression type {} 
: {}",
-                magic_enum::enum_name(rel.rex_type_case()),
-                rel.DebugString());
-    }
-}
-
 void SerializedPlanParser::removeNullableForRequiredColumns(const 
std::set<String> & require_columns, ActionsDAG & actions_dag) const
 {
     for (const auto & item : require_columns)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 88ebb0087..c9a48106a 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -90,18 +90,19 @@ public:
     ///
     std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
 
-    DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel & 
rel);
-    DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel & 
rel);
-
-    static bool isReadRelFromJava(const substrait::ReadRel & rel);
-    static bool isReadFromMergeTree(const substrait::ReadRel & rel);
-
     void addInputIter(jobject iter, bool materialize_input)
     {
         input_iters.emplace_back(iter);
         materialize_inputs.emplace_back(materialize_input);
     }
 
+    /// Returns the jobject iterator registered at position `index` together with its
+    /// materialize flag, i.e. the pair of values stored by addInputIter().
+    /// Throws LOGICAL_ERROR when `index` is not a valid position in input_iters.
+    std::pair<jobject, bool> getInputIter(size_t index)
+    {
+        /// Valid positions are [0, size). `index == size` must be rejected as well,
+        /// otherwise operator[] below would read one element past the end.
+        if (index >= input_iters.size())
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Index({}) is overflow input_iters's size({})", index, input_iters.size());
+        return {input_iters[index], materialize_inputs[index]};
+    }
+
     void addSplitInfo(std::string && split_info) { 
split_infos.emplace_back(std::move(split_info)); }
 
     int nextSplitInfoIndex()
@@ -115,6 +116,12 @@ public:
         return split_info_index++;
     }
 
+    /// Obtains the next position from nextSplitInfoIndex() and returns a reference to the
+    /// entry of split_infos stored there, looked up with bounds-checked `at()`.
+    const String & nextSplitInfo()
+    {
+        return split_infos.at(nextSplitInfoIndex());
+    }
+
     void parseExtensions(const 
::google::protobuf::RepeatedPtrField<substrait::extensions::SimpleExtensionDeclaration>
 & extensions);
     DB::ActionsDAG expressionsToActionsDAG(
         const std::vector<substrait::Expression> & expressions, const 
DB::Block & header, const DB::Block & read_schema);
@@ -133,8 +140,6 @@ public:
 
 private:
     DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const 
substrait::Rel *> & rel_stack);
-    void
-    collectJoinKeys(const substrait::Expression & condition, 
std::vector<std::pair<int32_t, int32_t>> & join_keys, int32_t right_key_start);
 
     void parseFunctionOrExpression(
         const substrait::Expression & rel, std::string & result_name, 
DB::ActionsDAG & actions_dag, bool keep_result = false);
@@ -185,34 +190,10 @@ private:
     int split_info_index = 0;
     std::vector<bool> materialize_inputs;
     ContextPtr context;
-    // for parse rel node, collect steps from a rel node
-    std::vector<IQueryPlanStep *> temp_step_collection;
     std::vector<RelMetricPtr> metrics;
 
 public:
     const ActionsDAG::Node * addColumn(DB::ActionsDAG & actions_dag, const 
DataTypePtr & type, const Field & field);
 };
 
-class ASTParser
-{
-public:
-    explicit ASTParser(
-        const ContextPtr & context_, std::unordered_map<std::string, 
std::string> & function_mapping_, SerializedPlanParser * plan_parser_)
-        : context(context_), function_mapping(function_mapping_), 
plan_parser(plan_parser_)
-    {
-    }
-
-    ~ASTParser() = default;
-
-    ASTPtr parseToAST(const Names & names, const substrait::Expression & rel);
-    ActionsDAG convertToActions(const NamesAndTypesList & name_and_types, 
const ASTPtr & ast) const;
-
-private:
-    ContextPtr context;
-    std::unordered_map<std::string, std::string> function_mapping;
-    SerializedPlanParser * plan_parser;
-
-    void parseFunctionArgumentsToAST(const Names & names, const 
substrait::Expression_ScalarFunction & scalar_function, ASTs & ast_args);
-    ASTPtr parseArgumentToAST(const Names & names, const substrait::Expression 
& rel);
-};
 }
diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp 
b/cpp-ch/local-engine/Parser/TypeParser.cpp
index 84c936226..7796953e2 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.cpp
+++ b/cpp-ch/local-engine/Parser/TypeParser.cpp
@@ -31,7 +31,7 @@
 #include <DataTypes/DataTypesNumber.h>
 #include <Parser/AggregateFunctionParser.h>
 #include <Parser/FunctionParser.h>
-#include <Parser/RelParser.h>
+#include <Parser/RelParsers/RelParser.h>
 #include <Parser/SerializedPlanParser.h>
 #include <Parser/TypeParser.h>
 #include <Poco/StringTokenizer.h>
@@ -334,4 +334,4 @@ DB::DataTypePtr 
TypeParser::tryWrapNullable(substrait::Type_Nullability nullable
         return std::make_shared<DB::DataTypeNullable>(nested_type);
     return nested_type;
 }
-}
\ No newline at end of file
+}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index c1923ae59..68c445863 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -23,13 +23,14 @@
 #include <DataTypes/DataTypeNullable.h>
 #include <Join/BroadCastJoinBuilder.h>
 #include <Parser/CHColumnToSparkRow.h>
-#include <Parser/MergeTreeRelParser.h>
-#include <Parser/RelParser.h>
-#include <Parser/SerializedPlanParser.h>
 #include <Parser/LocalExecutor.h>
+#include <Parser/RelParsers/MergeTreeRelParser.h>
+#include <Parser/RelParsers/RelParser.h>
+#include <Parser/RelParsers/WriteRelParser.h>
+#include <Parser/SerializedPlanParser.h>
 #include <Parser/SparkRowToCHColumn.h>
 #include <Parser/SubstraitParserUtils.h>
-#include <Parser/WriteRelParser.h>
+#include <Processors/Executors/PipelineExecutor.h>
 #include <Shuffle/NativeSplitter.h>
 #include <Shuffle/NativeWriterInMemory.h>
 #include <Shuffle/PartitionWriter.h>
@@ -53,12 +54,10 @@
 #include <Poco/Logger.h>
 #include <Poco/StringTokenizer.h>
 #include <Common/CHUtil.h>
+#include <Common/ErrorCodes.h>
 #include <Common/ExceptionUtils.h>
 #include <Common/JNIUtils.h>
 #include <Common/QueryContext.h>
-#include <Common/ErrorCodes.h>
-#include <Processors/Executors/PipelineExecutor.h>
-#include <Storages/Cache/CacheManager.h>
 
 #ifdef __cplusplus
 namespace DB
@@ -515,18 +514,15 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
-JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, 
jobject obj, jlong block_address, jint column_position)
+JNIEXPORT jobject
+Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, 
jobject obj, jlong block_address, jint column_position)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
     auto col = getColumnFromColumnVector(env, obj, block_address, 
column_position);
     if (!col.column->isNullable())
     {
-        jobject block_stats = env->NewObject(
-            block_stats_class,
-            block_stats_constructor,
-            block->rows(),
-            false);
+        jobject block_stats = env->NewObject(block_stats_class, 
block_stats_constructor, block->rows(), false);
         return block_stats;
     }
     else
@@ -535,10 +531,7 @@ JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockSta
         const auto & null_map_data = nullable->getNullMapData();
 
         jobject block_stats = env->NewObject(
-            block_stats_class,
-            block_stats_constructor,
-            block->rows(),
-            !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()));
+            block_stats_class, block_stats_constructor, block->rows(), 
!DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()));
         return block_stats;
     }
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
@@ -573,12 +566,8 @@ JNIEXPORT void 
Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIE
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
 
-local_engine::SplitterHolder * buildAndExecuteShuffle(JNIEnv * env,
-        jobject iter,
-        const String & name,
-        const local_engine::SplitOptions& options,
-        jobject rss_pusher = nullptr
-        )
+local_engine::SplitterHolder * buildAndExecuteShuffle(
+    JNIEnv * env, jobject iter, const String & name, const 
local_engine::SplitOptions & options, jobject rss_pusher = nullptr)
 {
     auto current_executor = local_engine::LocalExecutor::getCurrentExecutor();
     local_engine::SplitterHolder * splitter = nullptr;
@@ -592,7 +581,8 @@ local_engine::SplitterHolder * 
buildAndExecuteShuffle(JNIEnv * env,
         {
             /// Try to decide header from the first block read from Java 
iterator.
             auto header = first_block.value().cloneEmpty();
-            splitter = new local_engine::SplitterHolder{.exchange_manager = 
std::make_unique<local_engine::SparkExchangeManager>(header, name, options, 
rss_pusher)};
+            splitter = new local_engine::SplitterHolder{
+                .exchange_manager = 
std::make_unique<local_engine::SparkExchangeManager>(header, name, options, 
rss_pusher)};
             splitter->exchange_manager->initSinks(1);
             splitter->exchange_manager->pushBlock(first_block.value());
             first_block = std::nullopt;
@@ -604,14 +594,18 @@ local_engine::SplitterHolder * 
buildAndExecuteShuffle(JNIEnv * env,
         }
         else
             // empty iterator
-            splitter = new local_engine::SplitterHolder{.exchange_manager = 
std::make_unique<local_engine::SparkExchangeManager>(DB::Block(), name, 
options, rss_pusher)};
+            splitter = new local_engine::SplitterHolder{
+                .exchange_manager = 
std::make_unique<local_engine::SparkExchangeManager>(DB::Block(), name, 
options, rss_pusher)};
     }
     else
     {
-        splitter = new local_engine::SplitterHolder{.exchange_manager = 
std::make_unique<local_engine::SparkExchangeManager>(current_executor.value()->getHeader().cloneEmpty(),
 name, options, rss_pusher)};
+        splitter = new local_engine::SplitterHolder{
+            .exchange_manager = 
std::make_unique<local_engine::SparkExchangeManager>(
+                current_executor.value()->getHeader().cloneEmpty(), name, 
options, rss_pusher)};
         // TODO support multiple sinks
         splitter->exchange_manager->initSinks(1);
-        current_executor.value()->setSinks([&](auto & pipeline_builder) { 
splitter->exchange_manager->setSinksToPipeline(pipeline_builder);});
+        current_executor.value()->setSinks([&](auto & pipeline_builder)
+                                           { 
splitter->exchange_manager->setSinksToPipeline(pipeline_builder); });
         // execute pipeline
         current_executor.value()->execute();
     }
@@ -775,8 +769,7 @@ JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
         result.total_serialize_time,
         result.total_rows,
         result.total_blocks,
-        result.wall_time
-        );
+        result.wall_time);
 
     return split_result;
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to