This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_rec2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ef0ff57babbaa0d510e9f636fe596ed8d5494dcf
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Sep 18 21:02:33 2025 +0800

    recursive cte be part
---
 be/src/pipeline/exec/exchange_source_operator.cpp  |   5 +
 be/src/pipeline/exec/exchange_source_operator.h    |  19 +
 be/src/pipeline/exec/operator.cpp                  |  10 +
 be/src/pipeline/exec/operator.h                    |   2 +
 .../pipeline/exec/rec_cte_anchor_sink_operator.cpp |  56 ++
 .../pipeline/exec/rec_cte_anchor_sink_operator.h   | 114 +++
 be/src/pipeline/exec/rec_cte_scan_operator.h       |  87 ++
 be/src/pipeline/exec/rec_cte_sink_operator.cpp     |  55 ++
 be/src/pipeline/exec/rec_cte_sink_operator.h       | 101 +++
 be/src/pipeline/exec/rec_cte_source_operator.cpp   |  86 ++
 be/src/pipeline/exec/rec_cte_source_operator.h     | 228 +++++
 be/src/pipeline/exec/union_sink_operator.h         |  42 +-
 be/src/pipeline/pipeline.h                         |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 236 +++--
 be/src/pipeline/pipeline_fragment_context.h        |  18 +-
 be/src/pipeline/pipeline_task.cpp                  |  25 +-
 be/src/pipeline/rec_cte_shared_state.h             | 175 ++++
 be/src/runtime/fragment_mgr.cpp                    |  60 ++
 be/src/runtime/fragment_mgr.h                      |  11 +
 be/src/runtime/query_context.cpp                   |  47 +
 be/src/runtime/query_context.h                     |  28 +-
 be/src/runtime/runtime_predicate.h                 |   1 +
 be/src/runtime/runtime_state.cpp                   |  16 +-
 be/src/runtime/runtime_state.h                     |  19 +-
 be/src/runtime_filter/runtime_filter_consumer.h    |  15 +-
 be/src/runtime_filter/runtime_filter_merger.h      |   2 +
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  22 +-
 be/src/runtime_filter/runtime_filter_mgr.h         |  26 +-
 be/src/service/internal_service.cpp                |  51 ++
 be/src/service/internal_service.h                  |  10 +
 .../runtime_filter_consumer_test.cpp               |  20 +-
 be/test/runtime_filter/runtime_filter_mgr_test.cpp |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 +-
 gensrc/proto/internal_service.proto                |  42 +
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 .../data/rec_cte_p0/rec_cte/rec_cte.out            | 953 +++++++++++++++++++++
 .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.out    |  29 +
 .../rec_cte_from_duckdb_doc.out                    |  30 +
 .../rec_cte_from_mysql_doc.out                     |  42 +
 .../suites/rec_cte_p0/rec_cte/rec_cte.groovy       | 271 ++++++
 .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy | 144 ++++
 .../rec_cte_from_duckdb_doc.groovy                 | 185 ++++
 .../rec_cte_from_mysql_doc.groovy                  | 140 +++
 43 files changed, 3316 insertions(+), 121 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index b31b193aff2..8762b40fa40 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) {
     _is_closed = true;
     return OperatorX<ExchangeLocalState>::close(state);
 }
+
+Status ExchangeSourceOperatorX::reset(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    return local_state.reset(state);
+}
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 3008217e130..03f2a288cdf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -48,6 +48,23 @@ public:
     Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
+    Status reset(RuntimeState* state) {
+        if (stream_recvr) {
+            stream_recvr->close();
+        }
+        create_stream_recvr(state);
+
+        is_ready = false;
+        num_rows_skipped = 0;
+
+        const auto& queues = stream_recvr->sender_queues();
+        for (size_t i = 0; i < queues.size(); i++) {
+            deps[i]->block();
+            queues[i]->set_dependency(deps[i]);
+        }
+        return Status::OK();
+    }
+
     std::vector<Dependency*> dependencies() const override {
         std::vector<Dependency*> dep_vec;
         std::for_each(deps.begin(), deps.end(),
@@ -83,6 +100,8 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
 
+    Status reset(RuntimeState* state);
+
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
 
     std::string debug_string(int indentation_level = 0) const override;
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 0eea8c7e157..ceaa9148b23 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -62,6 +62,10 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
+#include "pipeline/exec/rec_cte_sink_operator.h"
+#include "pipeline/exec/rec_cte_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
 DECLARE_OPERATOR(CacheSinkLocalState)
 DECLARE_OPERATOR(DictSinkLocalState)
+DECLARE_OPERATOR(RecCTESinkLocalState)
+DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
 
 #undef DECLARE_OPERATOR
 
@@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
 DECLARE_OPERATOR(CacheSourceLocalState)
+DECLARE_OPERATOR(RecCTESourceLocalState)
+DECLARE_OPERATOR(RecCTEScanLocalState)
 
 #ifdef BE_TEST
 DECLARE_OPERATOR(MockLocalState)
@@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>;
 template class PipelineXSinkLocalState<LocalExchangeSharedState>;
 template class PipelineXSinkLocalState<BasicSharedState>;
 template class PipelineXSinkLocalState<DataQueueSharedState>;
+template class PipelineXSinkLocalState<RecCTESharedState>;
 
 template class PipelineXLocalState<HashJoinSharedState>;
 template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -888,6 +897,7 @@ template class PipelineXLocalState<PartitionSortNodeSharedState>;
 template class PipelineXLocalState<SetSharedState>;
 template class PipelineXLocalState<LocalExchangeSharedState>;
 template class PipelineXLocalState<BasicSharedState>;
+template class PipelineXLocalState<RecCTESharedState>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d5630fd4aa6..7492ee75ae9 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -610,6 +610,8 @@ public:
     // For agg/sort/join sink.
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
+    virtual bool need_rerun(RuntimeState* state) const { return false; }
+
     Status init(const TDataSink& tsink) override;
     [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
                                       const bool use_global_hash_shuffle,
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp
new file mode 100644
index 00000000000..f1552ed8e5e
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    auto& p = _parent->cast<Parent>();
+    _child_expr.resize(p._child_expr.size());
+    for (size_t i = 0; i < p._child_expr.size(); i++) {
+        RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
+    }
+    return Status::OK();
+}
+
+Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+    {
+        const auto& texprs = tnode.rec_cte_node.result_expr_lists[0];
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    _name = "REC_CTE_ANCHOR_SINK_OPERATOR";
+    return Status::OK();
+}
+
+Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
new file mode 100644
index 00000000000..340949a2f79
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTEAnchorSinkOperatorX;
+class RecCTEAnchorSinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState);
+    RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+    Status open(RuntimeState* state) override;
+
+    bool is_blockable() const override { return true; }
+
+private:
+    friend class RecCTEAnchorSinkOperatorX;
+    using Base = PipelineXSinkLocalState<RecCTESharedState>;
+    using Parent = RecCTEAnchorSinkOperatorX;
+
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
+        : public DataSinkOperatorX<RecCTEAnchorSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>;
+
+    friend class RecCTEAnchorSinkLocalState;
+    RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
+                              const DescriptorTbl& descs)
+            : Base(sink_id, tnode.node_id, dest_id),
+              _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
+
+    ~RecCTEAnchorSinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override {
+        auto& local_state = get_local_state(state);
+
+        if (_need_notify_rec_side_ready) {
+            RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0));
+            _need_notify_rec_side_ready = false;
+        }
+
+        COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
+        if (input_block->rows() != 0) {
+            vectorized::Block block;
+            RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true));
+            RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block)));
+        }
+
+        if (eos) {
+            local_state._shared_state->anchor_dep->set_ready();
+        }
+        return Status::OK();
+    }
+
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        std::shared_ptr<BasicSharedState> ss = std::make_shared<RecCTESharedState>();
+        ss->id = operator_id();
+        for (const auto& dest : dests_id()) {
+            ss->related_op_ids.insert(dest);
+        }
+        return ss;
+    }
+
+private:
+    const RowDescriptor _row_descriptor;
+    vectorized::VExprContextSPtrs _child_expr;
+
+    bool _need_notify_rec_side_ready = true;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h b/be/src/pipeline/exec/rec_cte_scan_operator.h
new file mode 100644
index 00000000000..661d97927d1
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_scan_operator.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "pipeline/exec/operator.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+class RecCTEScanOperatorX;
+class RecCTEScanLocalState final : public PipelineXLocalState<> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTEScanLocalState);
+
+    RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent)
+            : PipelineXLocalState<>(state, parent) {
+        _scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                     _parent->get_name() + "_DEPENDENCY");
+        state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(), parent->node_id(),
+                                                 this);
+    }
+    ~RecCTEScanLocalState() override {
+        state()->get_query_ctx()->deregiste_cte_scan(state()->fragment_instance_id(),
+                                                     parent()->node_id());
+    }
+
+    Status add_block(const PBlock& pblock) {
+        vectorized::Block block;
+        RETURN_IF_ERROR(block.deserialize(pblock));
+        _blocks.emplace_back(std::move(block));
+        return Status::OK();
+    }
+
+    void set_ready() { _scan_dependency->set_ready(); }
+
+    std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }
+
+private:
+    friend class RecCTEScanOperatorX;
+    std::vector<vectorized::Block> _blocks;
+    DependencySPtr _scan_dependency = nullptr;
+};
+
+class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> {
+public:
+    RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
+                        const DescriptorTbl& descs)
+            : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) {}
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override {
+        auto& local_state = get_local_state(state);
+
+        if (local_state._blocks.empty()) {
+            *eos = true;
+            return Status::OK();
+        }
+        *block = std::move(local_state._blocks.back());
+        RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns()));
+        local_state._blocks.pop_back();
+        return Status::OK();
+    }
+
+    bool is_source() const override { return true; }
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_sink_operator.cpp
new file mode 100644
index 00000000000..1508b478bca
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.cpp
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_sink_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+Status RecCTESinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    auto& p = _parent->cast<Parent>();
+    _child_expr.resize(p._child_expr.size());
+    for (size_t i = 0; i < p._child_expr.size(); i++) {
+        RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
+    }
+    return Status::OK();
+}
+
+Status RecCTESinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+    {
+        const auto& texprs = tnode.rec_cte_node.result_expr_lists[1];
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+Status RecCTESinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h b/be/src/pipeline/exec/rec_cte_sink_operator.h
new file mode 100644
index 00000000000..2321ba16ea3
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESinkOperatorX;
+class RecCTESinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESinkLocalState);
+    RecCTESinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+    Status open(RuntimeState* state) override;
+
+private:
+    friend class RecCTESinkOperatorX;
+    using Base = PipelineXSinkLocalState<RecCTESharedState>;
+    using Parent = RecCTESinkOperatorX;
+
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+class RecCTESinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<RecCTESinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<RecCTESinkLocalState>;
+
+    friend class RecCTESinkLocalState;
+    RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
+                        const DescriptorTbl& descs)
+            : Base(sink_id, tnode.node_id, dest_id),
+              _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
+
+    ~RecCTESinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    bool need_rerun(RuntimeState* state) const override {
+        return get_local_state(state)._shared_state->ready_to_return == false;
+    }
+
+    std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; }
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override {
+        auto& local_state = get_local_state(state);
+
+        COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
+        if (input_block->rows() != 0) {
+            vectorized::Block block;
+            RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true));
+            RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block)));
+        }
+
+        if (eos) {
+            local_state._shared_state->source_dep->set_ready();
+        }
+        return Status::OK();
+    }
+
+private:
+    const RowDescriptor _row_descriptor;
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.cpp b/be/src/pipeline/exec/rec_cte_source_operator.cpp
new file mode 100644
index 00000000000..9a02bb947b8
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_source_operator.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_source_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+RecCTESourceLocalState::RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent)
+        : Base(state, parent) {}
+
+Status RecCTESourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    return Status::OK();
+}
+
+Status RecCTESourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    _shared_state->targets = _parent->cast<RecCTESourceOperatorX>()._targets;
+    _shared_state->max_recursion_depth =
+            _parent->cast<RecCTESourceOperatorX>()._max_recursion_depth;
+    _shared_state->source_dep = _dependency;
+    _anchor_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                   _parent->get_name() + "_ANCHOR_DEPENDENCY");
+    _shared_state->anchor_dep = _anchor_dependency.get();
+
+    auto& p = _parent->cast<Parent>();
+    _child_expr.resize(p._child_expr.size());
+    for (size_t i = 0; i < p._child_expr.size(); i++) {
+        RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
+    }
+    if (!_parent->cast<RecCTESourceOperatorX>()._is_union_all) {
+        _shared_state->agg_data = std::make_unique<DistinctDataVariants>();
+        RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(_shared_state->agg_data.get(),
+                                                               get_data_types(_child_expr), false));
+    }
+
+    _shared_state->hash_table_compute_timer =
+            ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
+    _shared_state->hash_table_emplace_timer =
+            ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
+    _shared_state->hash_table_input_counter =
+            ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT);
+    return Status::OK();
+}
+
+Status RecCTESourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+
+    _max_recursion_depth = state->cte_max_recursion_depth();
+
+    {
+        const auto& texprs = tnode.rec_cte_node.result_expr_lists[1];
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+Status RecCTESourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h b/be/src/pipeline/exec/rec_cte_source_operator.h
new file mode 100644
index 00000000000..e36358161d2
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_source_operator.h
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <gen_cpp/internal_service.pb.h>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "util/brpc_client_cache.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESourceOperatorX;
+// Per-instance state of RecCTESourceOperatorX, backed by a RecCTESharedState.
+class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESourceLocalState);
+    using Base = PipelineXLocalState<RecCTESharedState>;
+    using Parent = RecCTESourceOperatorX;
+
+    RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent);
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+
+    bool is_blockable() const override { return true; }
+
+    // Exposes both the base `_dependency` and the anchor-side dependency.
+    std::vector<Dependency*> dependencies() const override {
+        return {_dependency, _anchor_dependency.get()};
+    }
+
+private:
+    friend class RecCTESourceOperatorX;
+    friend class OperatorX<RecCTESourceLocalState>;
+
+    // Per-instance clones of the operator's `_child_expr`.
+    vectorized::VExprContextSPtrs _child_expr;
+
+    std::shared_ptr<Dependency> _anchor_dependency;
+};
+
+// Serial source operator of a recursive CTE. Each get_block() call either drives one more
+// recursion round (by sending rerun RPCs to the fragments listed in `_fragments_to_reset`)
+// or, once the shared state reports it is ready to return, emits the buffered result blocks.
+class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
+public:
+    using Base = OperatorX<RecCTESourceLocalState>;
+    RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
+                          const DescriptorTbl& descs)
+            : Base(pool, tnode, operator_id, descs),
+              _is_union_all(tnode.rec_cte_node.is_union_all),
+              _targets(tnode.rec_cte_node.targets),
+              _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset),
+              _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids),
+              _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) {
+        DCHECK(tnode.__isset.rec_cte_node);
+    }
+    ~RecCTESourceOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    // Sends the `close` stage to the reset fragments (at most once), then terminates the
+    // base operator. Base::terminate() runs even if the notification fails so local
+    // resources are still released; the first error encountered is returned.
+    Status terminate(RuntimeState* state) override {
+        Status send_status = _send_close(state);
+        Status base_status = Base::terminate(state);
+        return send_status.ok() ? base_status : send_status;
+    }
+
+    // Same contract as terminate(): Base::close() always runs, first error wins.
+    Status close(RuntimeState* state) override {
+        Status send_status = _send_close(state);
+        Status base_status = Base::close(state);
+        return send_status.ok() ? base_status : send_status;
+    }
+
+    Status set_child(OperatorPtr child) override {
+        Base::_child = std::move(child);
+        return Status::OK();
+    }
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    // While the shared state is not ready to return, each call runs one recursion round
+    // (failing once `_max_recursion_depth` would be exceeded). Once ready, blocks are
+    // popped from the back of `ctx->blocks`, filtered by the conjuncts, and returned;
+    // `*eos` is set when no blocks remain.
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override {
+        auto& local_state = get_local_state(state);
+        auto& ctx = local_state._shared_state;
+        ctx->update_ready_to_return();
+
+        if (!ctx->ready_to_return) {
+            if (ctx->current_round + 1 > _max_recursion_depth) {
+                return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth);
+            }
+
+            ctx->source_dep->block();
+            // ctx->blocks.size() may be changed after _recursive_process
+            int current_blocks_size = int(ctx->blocks.size());
+            RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset));
+            ctx->current_round++;
+            ctx->last_round_offset = current_blocks_size;
+        } else {
+            if (ctx->blocks.empty()) {
+                *eos = true;
+            } else {
+                block->swap(ctx->blocks.back());
+                RETURN_IF_ERROR(
+                        local_state.filter_block(local_state.conjuncts(), block, block->columns()));
+                ctx->blocks.pop_back();
+            }
+        }
+        return Status::OK();
+    }
+
+    bool is_source() const override { return true; }
+
+private:
+    // Sends the `close` stage to the reset fragments once (skipped when another recursive
+    // CTE uses this one) and records the number of rounds in the "RecursiveRound" counter.
+    // The flag is only set on success, so a failed attempt is retried by a later call.
+    Status _send_close(RuntimeState* state) {
+        if (!_aready_send_close && !_is_used_by_other_rec_cte) {
+            RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close));
+            _aready_send_close = true;
+
+            auto* round_counter = ADD_COUNTER(get_local_state(state).Base::custom_profile(),
+                                              "RecursiveRound", TUnit::UNIT);
+            round_counter->set(int64_t(get_local_state(state)._shared_state->current_round));
+        }
+        return Status::OK();
+    }
+
+    // One recursion round: wait for the reset fragments, reset the global runtime filters,
+    // then release, rebuild and resubmit the fragments, and finally hand the data starting
+    // at `last_round_offset` to the targets via the shared state.
+    Status _recursive_process(RuntimeState* state, size_t last_round_offset) const {
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::wait));
+        RETURN_IF_ERROR(_send_reset_global_rf(state));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::release));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::rebuild));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::submit));
+        RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(
+                state, last_round_offset));
+        return Status::OK();
+    }
+
+    // Synchronously asks the runtime filter merge node to reset `_global_rf_ids`.
+    Status _send_reset_global_rf(RuntimeState* state) const {
+        TNetworkAddress addr;
+        RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr));
+        auto stub =
+                state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr);
+        if (stub == nullptr) {
+            return Status::InternalError("Failed to get brpc stub for runtime filter merge {}:{}",
+                                         addr.hostname, addr.port);
+        }
+        PResetGlobalRfParams request;
+        request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+        for (auto filter_id : _global_rf_ids) {
+            request.add_filter_ids(filter_id);
+        }
+
+        PResetGlobalRfResult result;
+        brpc::Controller controller;
+        controller.set_timeout_ms(
+                get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+        stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing());
+        brpc::Join(controller.call_id());
+        // On a transport failure the server never filled `result`, so its status must not
+        // be trusted; check the controller first (as _send_rerun_fragments does).
+        if (controller.Failed()) {
+            return Status::InternalError(controller.ErrorText());
+        }
+        return Status::create(result.status());
+    }
+
+    // Synchronously sends `stage` to every fragment in `_fragments_to_reset`, stopping at
+    // the first failure. A fragment may not reset itself.
+    Status _send_rerun_fragments(RuntimeState* state, PRerunFragmentParams_Opcode stage) const {
+        for (const auto& fragment : _fragments_to_reset) {
+            if (state->fragment_id() == fragment.fragment_id) {
+                return Status::InternalError("Fragment {} contains a recursive CTE node",
+                                             fragment.fragment_id);
+            }
+            auto stub =
+                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
+                            fragment.addr);
+            if (stub == nullptr) {
+                return Status::InternalError("Failed to get brpc stub for fragment {} at {}:{}",
+                                             fragment.fragment_id, fragment.addr.hostname,
+                                             fragment.addr.port);
+            }
+
+            PRerunFragmentParams request;
+            request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+            request.set_fragment_id(fragment.fragment_id);
+            request.set_stage(stage);
+
+            PRerunFragmentResult result;
+            brpc::Controller controller;
+            controller.set_timeout_ms(
+                    get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+            stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing());
+            brpc::Join(controller.call_id());
+            if (controller.Failed()) {
+                return Status::InternalError(controller.ErrorText());
+            }
+
+            RETURN_IF_ERROR(Status::create(result.status()));
+        }
+        return Status::OK();
+    }
+
+    friend class RecCTESourceLocalState;
+
+    vectorized::VExprContextSPtrs _child_expr;
+
+    const bool _is_union_all = false;
+    std::vector<TRecCTETarget> _targets;
+    std::vector<TRecCTEResetInfo> _fragments_to_reset;
+    std::vector<int> _global_rf_ids;
+
+    int _max_recursion_depth = 0;
+
+    // Set once the `close` stage was sent successfully (name kept as-is: "aready" = "already").
+    bool _aready_send_close = false;
+
+    bool _is_used_by_other_rec_cte = false;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 06b95cdf92d..764f8099427 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -29,6 +29,24 @@ namespace doris {
 #include "common/compile_check_begin.h"
 class RuntimeState;
 
+// Evaluates every expr of `exprs` on `src_block` and replaces `*res_block` with a block
+// made of the result columns, one per expr and in the same order. When `need_clone` is
+// true each result column is copied via clone_resized() to the source row count, so
+// `res_block` does not reuse the column objects of `src_block`; otherwise they are shared.
+inline Status materialize_block(const vectorized::VExprContextSPtrs& exprs,
+                                vectorized::Block* src_block, vectorized::Block* res_block,
+                                bool need_clone) {
+    vectorized::ColumnsWithTypeAndName columns;
+    columns.reserve(exprs.size());
+    const auto rows = src_block->rows();
+    for (const auto& expr : exprs) {
+        int result_column_id = -1;
+        RETURN_IF_ERROR(expr->execute(src_block, &result_column_id));
+        const auto& src_col_with_type = src_block->get_by_position(result_column_id);
+        vectorized::ColumnPtr result_col = need_clone
+                                                   ? src_col_with_type.column->clone_resized(rows)
+                                                   : src_col_with_type.column;
+        columns.emplace_back(result_col, src_col_with_type.type, src_col_with_type.name);
+    }
+    *res_block = {columns};
+    return Status::OK();
+}
+
 namespace pipeline {
 class DataQueue;
 
@@ -153,27 +171,17 @@ private:
                     
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                                
row_descriptor());
             vectorized::Block res;
-            RETURN_IF_ERROR(materialize_block(state, input_block, child_id, 
&res));
+            auto& local_state = get_local_state(state);
+            {
+                SCOPED_TIMER(local_state._expr_timer);
+                RETURN_IF_ERROR(
+                        materialize_block(local_state._child_expr, 
input_block, &res, false));
+            }
+            local_state._child_row_idx += res.rows();
             RETURN_IF_ERROR(mblock.merge(res));
         }
         return Status::OK();
     }
-
-    Status materialize_block(RuntimeState* state, vectorized::Block* 
src_block, int child_idx,
-                             vectorized::Block* res_block) {
-        auto& local_state = get_local_state(state);
-        SCOPED_TIMER(local_state._expr_timer);
-        const auto& child_exprs = local_state._child_expr;
-        vectorized::ColumnsWithTypeAndName colunms;
-        for (size_t i = 0; i < child_exprs.size(); ++i) {
-            int result_column_id = -1;
-            RETURN_IF_ERROR(child_exprs[i]->execute(src_block, 
&result_column_id));
-            colunms.emplace_back(src_block->get_by_position(result_column_id));
-        }
-        local_state._child_row_idx += src_block->rows();
-        *res_block = {colunms};
-        return Status::OK();
-    }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 1d0ddcf178e..2a20a5cd73d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -133,7 +133,7 @@ public:
             fmt::format_to(debug_string_buffer, "\n{}", 
_operators[i]->debug_string(i));
         }
         fmt::format_to(debug_string_buffer, "\n{}",
-                       _sink->debug_string(cast_set<int>(_operators.size())));
+                       _sink ? 
_sink->debug_string(cast_set<int>(_operators.size())) : "null");
         return fmt::to_string(debug_string_buffer);
     }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b5c2f7084dc..ba7937e9522 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -82,6 +82,10 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
+#include "pipeline/exec/rec_cte_sink_operator.h"
+#include "pipeline/exec/rec_cte_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -134,7 +138,9 @@ PipelineFragmentContext::PipelineFragmentContext(
           _is_report_on_cancel(true),
           _report_status_cb(std::move(report_status_cb)),
           _params(request),
-          _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0) {
+          _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0),
+          _need_notify_close(request.__isset.need_notify_close ? 
request.need_notify_close
+                                                               : false) {
     _fragment_watcher.start();
 }
 
@@ -142,23 +148,8 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id);
-    // The memory released by the query end is recorded in the query mem 
tracker.
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
-    auto st = _query_ctx->exec_status();
-    for (size_t i = 0; i < _tasks.size(); i++) {
-        if (!_tasks[i].empty()) {
-            _call_back(_tasks[i].front().first->runtime_state(), &st);
-        }
-    }
-    _tasks.clear();
-    _dag.clear();
-    _pip_id_to_pipeline.clear();
-    _pipelines.clear();
-    _sink.reset();
-    _root_op.reset();
+    _release_resource();
     _runtime_state.reset();
-    _runtime_filter_mgr_map.clear();
-    _op_id_to_shared_state.clear();
     _query_ctx.reset();
 }
 
@@ -253,6 +244,54 @@ PipelinePtr 
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
+// Builds and prepares the whole pipeline DAG of this fragment from `_params`: operators and
+// the output sink, local exchangers (when local shuffle is enabled), per-pipeline prepare(),
+// and finally the pipeline tasks. Used by both prepare() and rebuild().
+Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
+    // 2-3. Build the operators of this fragment, then its output sink.
+    {
+        SCOPED_TIMER(_build_pipelines_timer);
+        auto root_pipe = add_pipeline();
+        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
+                                         &_root_op, root_pipe));
+
+        if (!_params.fragment.__isset.output_sink) {
+            return Status::InternalError("No output sink in this fragment!");
+        }
+        const auto& output_sink = _params.fragment.output_sink;
+        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), output_sink,
+                                          _params.fragment.output_exprs, _params,
+                                          root_pipe->output_row_desc(), _runtime_state.get(),
+                                          *_desc_tbl, root_pipe->id()));
+        RETURN_IF_ERROR(_sink->init(output_sink));
+        RETURN_IF_ERROR(root_pipe->set_sink(_sink));
+
+        // Every pipeline must own a sink by now; connect it to the pipeline's last operator.
+        for (auto& pipe : _pipelines) {
+            DCHECK(pipe->sink() != nullptr) << pipe->operators().size();
+            RETURN_IF_ERROR(pipe->sink()->set_child(pipe->operators().back()));
+        }
+    }
+
+    // 4. Plan local exchangers only when local shuffle is enabled.
+    if (_runtime_state->enable_local_shuffle()) {
+        SCOPED_TIMER(_plan_local_exchanger_timer);
+        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
+                                             _params.bucket_seq_to_instance_idx,
+                                             _params.shuffle_idx_to_instance_idx));
+    }
+
+    // 5. Initialize the global state of each pipeline.
+    for (auto& pipe : _pipelines) {
+        SCOPED_TIMER(_prepare_all_pipelines_timer);
+        pipe->children().clear();
+        RETURN_IF_ERROR(pipe->prepare(_runtime_state.get()));
+    }
+
+    // 6. Build the pipeline tasks and their local states.
+    SCOPED_TIMER(_build_tasks_timer);
+    return _build_pipeline_tasks(thread_pool);
+}
+
 Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
@@ -277,7 +316,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
 
         auto* fragment_context = this;
 
-        if (_params.query_options.__isset.is_report_success) {
+        if (!_need_notify_close && 
_params.query_options.__isset.is_report_success) {
             
fragment_context->set_is_report_success(_params.query_options.is_report_success);
         }
 
@@ -322,49 +361,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
         }
     }
 
-    {
-        SCOPED_TIMER(_build_pipelines_timer);
-        // 2. Build pipelines with operators in this fragment.
-        auto root_pipeline = add_pipeline();
-        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), 
*_query_ctx->desc_tbl,
-                                         &_root_op, root_pipeline));
-
-        // 3. Create sink operator
-        if (!_params.fragment.__isset.output_sink) {
-            return Status::InternalError("No output sink in this fragment!");
-        }
-        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), 
_params.fragment.output_sink,
-                                          _params.fragment.output_exprs, 
_params,
-                                          root_pipeline->output_row_desc(), 
_runtime_state.get(),
-                                          *_desc_tbl, root_pipeline->id()));
-        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
-        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
-
-        for (PipelinePtr& pipeline : _pipelines) {
-            DCHECK(pipeline->sink() != nullptr) << 
pipeline->operators().size();
-            
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
-        }
-    }
-    // 4. Build local exchanger
-    if (_runtime_state->enable_local_shuffle()) {
-        SCOPED_TIMER(_plan_local_exchanger_timer);
-        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
-                                             
_params.bucket_seq_to_instance_idx,
-                                             
_params.shuffle_idx_to_instance_idx));
-    }
-
-    // 5. Initialize global states in pipelines.
-    for (PipelinePtr& pipeline : _pipelines) {
-        SCOPED_TIMER(_prepare_all_pipelines_timer);
-        pipeline->children().clear();
-        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
-    }
-
-    {
-        SCOPED_TIMER(_build_tasks_timer);
-        // 6. Build pipeline tasks and initialize local state.
-        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
-    }
+    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
 
     _init_next_report_time();
 
@@ -374,6 +371,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
 
 Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) 
{
     _total_tasks = 0;
+    _closed_tasks = 0;
     const auto target_size = _params.local_params.size();
     _tasks.resize(target_size);
     _runtime_filter_mgr_map.resize(target_size);
@@ -422,6 +420,10 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
 
                     
task_runtime_state->set_task_execution_context(shared_from_this());
                     
task_runtime_state->set_be_number(local_params.backend_num);
+                    if (_need_notify_close) {
+                        // rec cte require child rf to wait infinitely to make 
sure all rpc done
+                        task_runtime_state->set_force_make_rf_wait_infinite();
+                    }
 
                     if (_params.__isset.backend_id) {
                         task_runtime_state->set_backend_id(_params.backend_id);
@@ -1645,6 +1647,42 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         break;
     }
+    case TPlanNodeType::REC_CTE_NODE: {
+        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (!_dag.contains(downstream_pipeline_id)) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+
+        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
+
+        DataSinkOperatorPtr anchor_sink;
+        anchor_sink = 
std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
+                                                                  
op->operator_id(), tnode, descs);
+        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
+        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
+        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
+
+        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
+
+        DataSinkOperatorPtr rec_sink;
+        rec_sink = 
std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), 
op->operator_id(),
+                                                         tnode, descs);
+        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
+        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
+        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
+
+        break;
+    }
+    case TPlanNodeType::REC_CTE_SCAN_NODE: {
+        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(tnode.node_type));
@@ -1750,7 +1788,10 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
     if (_is_fragment_instance_closed) {
         return;
     }
-    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
+    Defer defer_op {[&]() {
+        _is_fragment_instance_closed = true;
+        _close_cv.notify_all();
+    }};
     
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     static_cast<void>(send_report(true));
     // Print profile content in info log is a tempoeray solution for stream 
load and external_connector.
@@ -1785,8 +1826,10 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
                                          
collect_realtime_load_channel_profile());
     }
 
-    // all submitted tasks done
-    _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    if (!_need_notify_close) {
+        // all submitted tasks done
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    }
 }
 
 void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
@@ -1923,8 +1966,10 @@ std::vector<PipelineTask*> 
PipelineFragmentContext::get_revocable_tasks() const
 }
 
 std::string PipelineFragmentContext::debug_string() {
+    std::lock_guard<std::mutex> l(_task_mutex);
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
+    fmt::format_to(debug_string_buffer, "PipelineFragmentContext 
Info:\nneed_notify_close: {}\n",
+                   _need_notify_close);
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
@@ -1998,5 +2043,66 @@ 
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
                                                       
_runtime_state->profile_level());
     return load_channel_profile;
 }
+
+// Blocks until this fragment instance has been closed. When `close` is true, the fragment
+// context is then removed from the fragment manager. Only fragments created with
+// need_notify_close (and not belonging to a stream load) support this.
+Status PipelineFragmentContext::wait_close(bool close) {
+    const bool is_stream_load = _exec_env->new_load_stream_mgr()->get(_query_id) != nullptr;
+    if (is_stream_load) {
+        return Status::InternalError("stream load do not support reset");
+    }
+    if (!_need_notify_close) {
+        return Status::InternalError("_need_notify_close is false, do not support reset");
+    }
+
+    std::unique_lock<std::mutex> guard(_task_mutex);
+    _close_cv.wait(guard, [this] { return _is_fragment_instance_closed.load(); });
+    guard.unlock();
+
+    if (close) {
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
+    }
+    return Status::OK();
+}
+
+// Resets the runtime state reachable through the first task of each instance, releases all
+// tasks/pipelines via _release_resource(), and finally resets the fragment-level runtime state.
+Status PipelineFragmentContext::set_to_rerun() {
+    {
+        std::lock_guard<std::mutex> guard(_task_mutex);
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+        for (auto& instance_tasks : _tasks) {
+            if (instance_tasks.empty()) {
+                continue;
+            }
+            instance_tasks.front().first->runtime_state()->reset_to_rerun();
+        }
+    }
+    _release_resource();
+    _runtime_state->reset_to_rerun();
+    return Status::OK();
+}
+
+// Clears the submitted/closed flags and rebuilds pipelines and tasks from the retained
+// `_params` through _build_and_prepare_full_pipeline().
+Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
+    _submitted = false;
+    _is_fragment_instance_closed.store(false);
+    return _build_and_prepare_full_pipeline(thread_pool);
+}
+
+// Under `_task_mutex`: invokes `_call_back` once per non-empty instance (with its first
+// task's runtime state and the query's exec status), then drops tasks, the DAG, pipelines,
+// sink/root operators, runtime filter managers and operator shared states.
+void PipelineFragmentContext::_release_resource() {
+    std::lock_guard<std::mutex> guard(_task_mutex);
+    // The memory released by the query end is recorded in the query mem tracker.
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+    auto exec_status = _query_ctx->exec_status();
+    for (auto& instance_tasks : _tasks) {
+        if (!instance_tasks.empty()) {
+            _call_back(instance_tasks.front().first->runtime_state(), &exec_status);
+        }
+    }
+    // NOTE(review): release order kept exactly as in the original destructor (tasks first,
+    // then DAG, pipelines, sink/root, rf managers, shared states); keep it unless verified.
+    _tasks.clear();
+    _dag.clear();
+    _pip_id_to_pipeline.clear();
+    _pipelines.clear();
+    _sink.reset();
+    _root_op.reset();
+    _runtime_filter_mgr_map.clear();
+    _op_id_to_shared_state.clear();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..40bb80f72ec 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -115,6 +115,9 @@ public:
     [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
 
     void clear_finished_tasks() {
+        if (_need_notify_close) {
+            return;
+        }
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
                 _tasks[j][i].first->stop_if_finished();
@@ -125,7 +128,17 @@ public:
     std::string get_load_error_url();
     std::string get_first_error_msg();
 
+    Status wait_close(bool close);
+    Status rebuild(ThreadPool* thread_pool);
+    Status set_to_rerun();
+
+    // True when the fragment params set need_notify_close. Such fragments are not removed
+    // from the fragment manager when they close and are the only ones wait_close() accepts.
+    bool need_notify_close() const { return _need_notify_close; }
+
 private:
+    void _release_resource();
+
+    Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
+
     Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, 
OperatorPtr* root,
                             PipelinePtr cur_pipe);
     Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& 
tnodes,
@@ -182,6 +195,7 @@ private:
     Pipelines _pipelines;
     PipelineId _next_pipeline_id = 0;
     std::mutex _task_mutex;
+    std::condition_variable _close_cv;
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks 
submitted.
@@ -203,7 +217,7 @@ private:
     RuntimeProfile::Counter* _build_tasks_timer = nullptr;
 
     std::function<void(RuntimeState*, Status*)> _call_back;
-    bool _is_fragment_instance_closed = false;
+    std::atomic_bool _is_fragment_instance_closed = false;
 
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
@@ -323,6 +337,8 @@ private:
 
     TPipelineFragmentParams _params;
     int32_t _parallel_instances = 0;
+
+    bool _need_notify_close = false;
 };
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 930e885fc3b..edc56332058 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -30,6 +30,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/dependency.h"
+#include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/pipeline.h"
@@ -553,10 +554,6 @@ Status PipelineTask::execute(bool* done) {
                 }
             }
 
-            if (_eos) {
-                RETURN_IF_ERROR(close(Status::OK(), false));
-            }
-
             DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
                 auto required_pipeline_id =
                         
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -595,6 +592,22 @@ Status PipelineTask::execute(bool* done) {
             RETURN_IF_ERROR(block->check_type_and_column());
             status = _sink->sink(_state, block, _eos);
 
+            if (_eos) {
+                if (_sink->need_rerun(_state)) {
+                    if (auto* source = 
dynamic_cast<ExchangeSourceOperatorX*>(_root);
+                        source != nullptr) {
+                        RETURN_IF_ERROR(source->reset(_state));
+                        _eos = false;
+                    } else {
+                        return Status::InternalError(
+                                "Only ExchangeSourceOperatorX can be rerun, 
real is {}",
+                                _root->get_name());
+                    }
+                } else {
+                    RETURN_IF_ERROR(close(Status::OK(), false));
+                }
+            }
+
             if (status.is<ErrorCode::END_OF_FILE>()) {
                 set_wake_up_early();
                 return Status::OK();
@@ -857,7 +870,9 @@ Status PipelineTask::wake_up(Dependency* dep, 
std::unique_lock<std::mutex>& /* d
     _blocked_dep = nullptr;
     auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this());
     RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE));
-    
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
+    if (auto f = _fragment_context.lock(); f) {
+        
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/rec_cte_shared_state.h 
b/be/src/pipeline/rec_cte_shared_state.h
new file mode 100644
index 00000000000..17fb00c82f5
--- /dev/null
+++ b/be/src/pipeline/rec_cte_shared_state.h
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "dependency.h"
+#include "pipeline/common/distinct_agg_utils.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+// Shared state of the recursive-CTE operators. It accumulates the result blocks produced
+// across recursion rounds (optionally de-duplicating rows through `agg_data`) and can push
+// the blocks of a round to remote fragment instances via `send_data_to_targets`.
+struct RecCTESharedState : public BasicSharedState {
+    // Remote receivers of newly produced blocks (see send_data_to_targets).
+    std::vector<TRecCTETarget> targets;
+    // Every block kept so far, in append order.
+    std::vector<vectorized::Block> blocks;
+    // Scratch buffer reused by emplace_block: indices of input rows whose key was inserted.
+    vectorized::IColumn::Selector distinct_row;
+    Dependency* source_dep = nullptr;
+    Dependency* anchor_dep = nullptr;
+    // Backing memory for keys persisted into the hash table (passed to try_presis_key).
+    vectorized::Arena arena;
+    RuntimeProfile::Counter* hash_table_compute_timer = nullptr;
+    RuntimeProfile::Counter* hash_table_emplace_timer = nullptr;
+    RuntimeProfile::Counter* hash_table_input_counter = nullptr;
+
+    // When non-null, emplace_block keeps only rows whose key was not seen before; when null,
+    // every non-empty block is kept unchanged.
+    std::unique_ptr<DistinctDataVariants> agg_data = nullptr;
+
+    int current_round = 0;
+    // Position in `blocks` compared against blocks.size() by update_ready_to_return;
+    // presumably set by the operators at the start of each round — TODO confirm.
+    int last_round_offset = 0;
+    // NOTE(review): not read in this header; presumably the configured recursion limit.
+    int max_recursion_depth = 0;
+    bool ready_to_return = false;
+
+    // Sets ready_to_return once no block has been appended since last_round_offset.
+    void update_ready_to_return() {
+        if (static_cast<size_t>(last_round_offset) == blocks.size()) {
+            ready_to_return = true;
+        }
+    }
+
+    // Appends `block` to `blocks`. With `agg_data` set, only rows whose key is new to the hash
+    // table are kept, and nothing is appended when every row is a duplicate. Empty blocks are
+    // ignored. `state` is currently unused.
+    // Returns an error if copying the selected rows fails; throws doris::Exception if the hash
+    // table variant was never initialized.
+    Status emplace_block(RuntimeState* state, vectorized::Block&& block) {
+        // An empty block adds no rows, but appending it would still grow `blocks` and make
+        // update_ready_to_return() treat the round as having produced new data.
+        if (block.rows() == 0) {
+            return Status::OK();
+        }
+        if (!agg_data) {
+            blocks.emplace_back(std::move(block));
+            return Status::OK();
+        }
+
+        auto num_rows = uint32_t(block.rows());
+        vectorized::ColumnRawPtrs raw_columns;
+        std::vector<vectorized::ColumnPtr> columns = block.get_columns_and_convert();
+        raw_columns.reserve(columns.size());
+        for (auto& col : columns) {
+            raw_columns.push_back(col.get());
+        }
+
+        std::visit(vectorized::Overload {
+                           [&](std::monostate& arg) -> void {
+                               throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                      "uninited hash table");
+                           },
+                           [&](auto& agg_method) -> void {
+                               SCOPED_TIMER(hash_table_compute_timer);
+                               using HashMethodType = std::decay_t<decltype(agg_method)>;
+                               using AggState = typename HashMethodType::State;
+
+                               AggState agg_state(raw_columns);
+                               agg_method.init_serialized_keys(raw_columns, num_rows);
+                               distinct_row.clear();
+
+                               size_t row = 0;
+                               // Record the rows whose key gets inserted; lazy_emplace is
+                               // expected to call these only for keys not already present.
+                               auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+                                   HashMethodType::try_presis_key(key, origin, arena);
+                                   ctor(key);
+                                   distinct_row.push_back(row);
+                               };
+                               auto creator_for_null_key = [&]() {
+                                   distinct_row.push_back(row);
+                               };
+
+                               SCOPED_TIMER(hash_table_emplace_timer);
+                               for (; row < num_rows; ++row) {
+                                   agg_method.lazy_emplace(agg_state, row, creator,
+                                                           creator_for_null_key);
+                               }
+                               COUNTER_UPDATE(hash_table_input_counter, num_rows);
+                           }},
+                   agg_data->method_variant);
+
+        if (distinct_row.size() == block.rows()) {
+            // Every row is new: keep the whole block without copying.
+            blocks.emplace_back(std::move(block));
+        } else if (!distinct_row.empty()) {
+            // Only some rows are new: copy just those into a fresh block.
+            auto distinct_block = vectorized::MutableBlock(block.clone_empty());
+            RETURN_IF_ERROR(block.append_to_block_by_selector(&distinct_block, distinct_row));
+            blocks.emplace_back(distinct_block.to_block());
+        }
+        return Status::OK();
+    }
+
+    // Builds a request addressed to `target`'s node and fragment instance within this query.
+    PTransmitRecCTEBlockParams build_basic_param(RuntimeState* state,
+                                                 const TRecCTETarget& target) const {
+        PTransmitRecCTEBlockParams request;
+        request.set_node_id(target.node_id);
+        request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+        request.mutable_fragment_instance_id()->CopyFrom(
+                UniqueId(target.fragment_instance_id).to_proto());
+        return request;
+    }
+
+    // Sends blocks[round_offset..] to `targets`. The remaining blocks are split into
+    // contiguous runs of ceil(remaining / targets.size()) blocks, one run per target. Each run
+    // is batched into RPCs of roughly exchange_multi_blocks_byte_size compressed bytes, with
+    // at least one block per RPC. Every target then receives an eos request. All RPCs use
+    // the execution RPC timeout. Returns the first stub lookup, serialization, transport or
+    // remote error.
+    Status send_data_to_targets(RuntimeState* state, size_t round_offset) const {
+        if (targets.empty()) {
+            // Nothing to send to; also avoids dividing by zero below.
+            return Status::OK();
+        }
+        const auto byte_limit_opt = state->query_options().exchange_multi_blocks_byte_size;
+        const size_t byte_limit = byte_limit_opt > 0 ? static_cast<size_t>(byte_limit_opt) : 0;
+        const size_t remaining = blocks.size() > round_offset ? blocks.size() - round_offset : 0;
+        const size_t blocks_per_target = (remaining + targets.size() - 1) / targets.size();
+
+        for (const auto& target : targets) {
+            auto stub =
+                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
+                            target.addr);
+            if (!stub) {
+                return Status::InternalError(fmt::format("Get rpc stub failed, host={}, port={}",
+                                                         target.addr.hostname, target.addr.port));
+            }
+
+            // Issues one synchronous RPC and surfaces both transport and remote errors; a
+            // failed call leaves `result` unset, so the controller must be checked first.
+            auto send_request = [&](PTransmitRecCTEBlockParams& request) -> Status {
+                PTransmitRecCTEBlockResult result;
+                brpc::Controller controller;
+                controller.set_timeout_ms(
+                        get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+                stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing());
+                brpc::Join(controller.call_id());
+                if (controller.Failed()) {
+                    return Status::InternalError("transmit_rec_cte_block rpc to {}:{} failed: {}",
+                                                 target.addr.hostname, target.addr.port,
+                                                 controller.ErrorText());
+                }
+                return Status::create(result.status());
+            };
+
+            // send blocks
+            size_t step = blocks_per_target;
+            while (round_offset < blocks.size() && step > 0) {
+                PTransmitRecCTEBlockParams request = build_basic_param(state, target);
+                size_t current_bytes = 0;
+                // Always pack at least one block, so a tiny or non-positive byte limit still
+                // makes progress instead of looping forever on empty requests.
+                while (round_offset < blocks.size() && step > 0 &&
+                       (request.blocks_size() == 0 || current_bytes < byte_limit)) {
+                    auto* pblock = request.add_blocks();
+                    size_t uncompressed_bytes = 0;
+                    size_t compressed_bytes = 0;
+                    RETURN_IF_ERROR(blocks[round_offset].serialize(
+                            state->be_exec_version(), pblock, &uncompressed_bytes,
+                            &compressed_bytes, state->fragement_transmission_compression_type()));
+                    round_offset++;
+                    step--;
+                    current_bytes += compressed_bytes;
+                }
+                request.set_eos(false);
+                RETURN_IF_ERROR(send_request(request));
+            }
+
+            // send eos
+            PTransmitRecCTEBlockParams eos_request = build_basic_param(state, target);
+            eos_request.set_eos(true);
+            RETURN_IF_ERROR(send_request(eos_request));
+        }
+        return Status::OK();
+    }
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 47fd9e264bf..f531e1ef2b9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1403,6 +1403,66 @@ Status FragmentMgr::get_query_statistics(const 
TUniqueId& query_id, TQueryStatis
             print_id(query_id), query_stats);
 }
 
+// Delivers `pblocks` (and, when `eos` is set, the end-of-stream mark) to the recursive-CTE
+// scan registered for (instance_id, node_id) in query `query_id`. If the query context is
+// gone the query may already have finished, so EndOfFile is returned instead of an error.
+Status FragmentMgr::transmit_rec_cte_block(
+        const TUniqueId& query_id, const TUniqueId& instance_id, int node_id,
+        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
+    auto query_ctx = get_query_ctx(query_id);
+    if (query_ctx == nullptr) {
+        return Status::EndOfFile(
+                "Transmit rec cte block failed: Query context (query-id: {}) not found, maybe "
+                "finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(query_ctx.get());
+    return query_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos);
+}
+
+// Executes one step of re-running fragment `fragment` of query `query_id`, selected by `stage`:
+// `wait` and `close` call wait_close(false) and wait_close(true), `release` calls
+// set_to_rerun(), `rebuild` calls rebuild() with this manager's thread pool, and `submit`
+// calls submit().
+// Returns NotFound when the query or fragment context is gone, and InvalidArgument for an
+// opcode this backend does not recognize.
+Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+                                   PRerunFragmentParams_Opcode stage) {
+    auto q_ctx = get_query_ctx(query_id);
+    if (!q_ctx) {
+        return Status::NotFound(
+                "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(q_ctx.get());
+
+    auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+    if (!fragment_ctx) {
+        return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
+                                print_id(query_id), fragment);
+    }
+
+    switch (stage) {
+    case PRerunFragmentParams::wait:
+        return fragment_ctx->wait_close(false);
+    case PRerunFragmentParams::release:
+        return fragment_ctx->set_to_rerun();
+    case PRerunFragmentParams::rebuild:
+        return fragment_ctx->rebuild(_thread_pool.get());
+    case PRerunFragmentParams::submit:
+        return fragment_ctx->submit();
+    case PRerunFragmentParams::close:
+        return fragment_ctx->wait_close(true);
+    default:
+        // Reject unknown opcodes instead of silently reporting success to the caller.
+        return Status::InvalidArgument(
+                "rerun_fragment: unknown stage {} (query-id: {}, fragment-id: {})",
+                static_cast<int>(stage), print_id(query_id), fragment);
+    }
+}
+
+// Forwards a global runtime-filter reset for `filter_ids` to the query context of `query_id`.
+// Returns NotFound when that query context no longer exists.
+Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
+                                    const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    auto q_ctx = get_query_ctx(query_id);
+    if (!q_ctx) {
+        return Status::NotFound(
+                "reset_global_rf: Query context (query-id: {}) not found, maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(q_ctx.get());
+    return q_ctx->reset_global_rf(filter_ids);
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index b0c1a3ad592..58725a49a63 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -184,6 +184,17 @@ public:
 
     std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
 
+    Status transmit_rec_cte_block(const TUniqueId& query_id, const TUniqueId& 
instance_id,
+                                  int node_id,
+                                  const 
google::protobuf::RepeatedPtrField<PBlock>& pblocks,
+                                  bool eos);
+
+    Status rerun_fragment(const TUniqueId& query_id, int fragment,
+                          PRerunFragmentParams_Opcode stage);
+
+    Status reset_global_rf(const TUniqueId& query_id,
+                           const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     struct BrpcItem {
         TNetworkAddress network_address;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5332b886d58..b3097dde69f 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -34,6 +34,7 @@
 #include "common/status.h"
 #include "olap/olap_common.h"
 #include "pipeline/dependency.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -460,6 +461,10 @@ QueryContext::_collect_realtime_query_profile() {
                 continue;
             }
 
+            if (fragment_ctx->need_notify_close()) {
+                continue;
+            }
+
             auto profile = fragment_ctx->collect_realtime_profile();
 
             if (profile.empty()) {
@@ -497,4 +502,46 @@ TReportExecStatusParams 
QueryContext::get_realtime_exec_status() {
     return exec_status;
 }
 
+// Appends every block in `pblocks` to the recursive-CTE scan registered for
+// (instance_id, node_id) and, when `eos` is set, marks that scan ready. Returns
+// InternalError if no such scan is registered, or the first add_block error.
+Status QueryContext::send_block_to_cte_scan(
+        const TUniqueId& instance_id, int node_id,
+        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
+    // deregiste_cte_scan takes the same lock, so the entry cannot be removed mid-delivery.
+    std::unique_lock<std::mutex> guard(_cte_scan_lock);
+    const auto entry = _cte_scan.find(std::make_pair(instance_id, node_id));
+    if (entry == _cte_scan.end()) {
+        return Status::InternalError("RecCTEScan not found for instance {}, node {}",
+                                     print_id(instance_id), node_id);
+    }
+    auto* scan = entry->second;
+    for (const auto& pblock : pblocks) {
+        RETURN_IF_ERROR(scan->add_block(pblock));
+    }
+    if (eos) {
+        scan->set_ready();
+    }
+    return Status::OK();
+}
+
+// Registers `scan` as the receiver for (instance_id, node_id). The pointer is stored as-is;
+// registering the same pair twice trips a DCHECK in debug builds.
+void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id,
+                                    pipeline::RecCTEScanLocalState* scan) {
+    const auto scan_key = std::make_pair(instance_id, node_id);
+    std::lock_guard<std::mutex> guard(_cte_scan_lock);
+    DCHECK(!_cte_scan.contains(scan_key))
+            << "Duplicate registe cte scan for instance " << print_id(instance_id) << ", node "
+            << node_id;
+    _cte_scan.emplace(scan_key, scan);
+}
+
+// Removes the scan registered for (instance_id, node_id). Afterwards, blocks addressed to
+// that pair are rejected by send_block_to_cte_scan. Debug builds assert that the pair
+// was registered.
+void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) {
+    std::lock_guard<std::mutex> l(_cte_scan_lock);
+    [[maybe_unused]] const size_t erased = _cte_scan.erase(std::make_pair(instance_id, node_id));
+    // Fires when the pair was never registered or has already been removed.
+    DCHECK(erased == 1) << "Deregiste unregistered cte scan for instance "
+                        << print_id(instance_id) << ", node " << node_id;
+}
+
+// Delegates the reset of the given global runtime filters to this query's merge controller
+// handler. Without a handler there is nothing to reset, and OK is returned.
+Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    if (!_merge_controller_handler) {
+        return Status::OK();
+    }
+    return _merge_controller_handler->reset_global_rf(this, filter_ids);
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 32458cdb00e..874e889e368 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -32,6 +32,7 @@
 #include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/object_pool.h"
+#include "common/status.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/runtime_predicate.h"
@@ -48,6 +49,7 @@ namespace pipeline {
 class PipelineFragmentContext;
 class PipelineTask;
 class Dependency;
+class RecCTEScanLocalState;
 } // namespace pipeline
 
 struct ReportStatusRequest {
@@ -161,11 +163,6 @@ public:
         return _query_options.runtime_filter_wait_time_ms;
     }
 
-    bool runtime_filter_wait_infinitely() const {
-        return _query_options.__isset.runtime_filter_wait_infinitely &&
-               _query_options.runtime_filter_wait_infinitely;
-    }
-
     int be_exec_version() const {
         if (!_query_options.__isset.be_exec_version) {
             return 0;
@@ -299,6 +296,23 @@ public:
     void set_first_error_msg(std::string error_msg);
     std::string get_first_error_msg();
 
+    Status send_block_to_cte_scan(const TUniqueId& instance_id, int node_id,
+                                  const 
google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks,
+                                  bool eos);
+    void registe_cte_scan(const TUniqueId& instance_id, int node_id,
+                          pipeline::RecCTEScanLocalState* scan);
+    void deregiste_cte_scan(const TUniqueId& instance_id, int node_id);
+
+    // Returns the keys of _fragment_id_to_pipeline_ctx, in the map's iteration order.
+    // NOTE(review): iterates the map without taking any lock — confirm callers cannot race
+    // with fragment registration.
+    std::vector<int> get_fragment_ids() {
+        std::vector<int> ids;
+        for (const auto& [fragment_id, pipeline_ctx] : _fragment_id_to_pipeline_ctx) {
+            ids.push_back(fragment_id);
+        }
+        return ids;
+    }
+
+    Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     friend class QueryTaskController;
 
@@ -377,6 +391,10 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    // instance id + node id -> cte scan
+    std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> 
_cte_scan;
+    std::mutex _cte_scan_lock;
+
 public:
     // when fragment of pipeline is closed, it will register its profile to 
this map by using add_fragment_profile
     void add_fragment_profile(
diff --git a/be/src/runtime/runtime_predicate.h 
b/be/src/runtime/runtime_predicate.h
index 51c79e1b426..b3e19d089fc 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -55,6 +55,7 @@ public:
 
     void set_detected_source() {
         std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        _orderby_extrem = Field(PrimitiveType::TYPE_NULL); // reset the stored extreme to NULL (presumably so a rerun does not reuse a stale value)
         _detected_source = true;
     }
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 33db0f60e7c..faeda888cfb 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -503,7 +503,7 @@ Status RuntimeState::register_consumer_runtime_filter(
         std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
     bool need_merge = desc.has_remote_targets || need_local_merge;
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
-    return mgr->register_consumer_filter(_query_ctx, desc, node_id, 
consumer_filter);
+    return mgr->register_consumer_filter(this, desc, node_id, consumer_filter);
 }
 
 bool RuntimeState::is_nereids() const {
@@ -515,12 +515,22 @@ std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::pipeline_id_to_profil
     return _pipeline_id_to_profile;
 }
 
+// Prepares this runtime state for another run of its fragment. When a local runtime-filter
+// manager exists, every filter id known to it or to the global manager is removed from both.
+// The per-pipeline profiles are then cleared, so build_pipeline_profile() builds fresh ones.
+// NOTE(review): the global manager appears to be query-level (see the `_is_global` comment in
+// runtime_filter_mgr.h), so merging its ids here may also drop filters registered by other
+// fragments — confirm this is intended.
+void RuntimeState::reset_to_rerun() {
+    auto* local_mgr = local_runtime_filter_mgr();
+    if (local_mgr != nullptr) {
+        auto ids_to_drop = local_mgr->get_filter_ids();
+        auto* global_mgr = global_runtime_filter_mgr();
+        ids_to_drop.merge(global_mgr->get_filter_ids());
+        local_mgr->remove_filters(ids_to_drop);
+        global_mgr->remove_filters(ids_to_drop);
+    }
+
+    std::unique_lock profile_guard(_pipeline_profile_lock);
+    _pipeline_id_to_profile.clear();
+}
+
 std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::build_pipeline_profile(
         std::size_t pipeline_size) {
     std::unique_lock lc(_pipeline_profile_lock);
     if (!_pipeline_id_to_profile.empty()) {
-        throw Exception(ErrorCode::INTERNAL_ERROR,
-                        "build_pipeline_profile can only be called once.");
+        return _pipeline_id_to_profile;
     }
     _pipeline_id_to_profile.resize(pipeline_size);
     {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3d89f4aa0d4..907f2c50a61 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -503,7 +503,7 @@ public:
         _runtime_filter_mgr = runtime_filter_mgr;
     }
 
-    QueryContext* get_query_ctx() { return _query_ctx; }
+    QueryContext* get_query_ctx() const { return _query_ctx; } // const-qualified so it can be called through a const RuntimeState* (as RuntimeFilterConsumer does)
 
     [[nodiscard]] bool low_memory_mode() const;
 
@@ -520,6 +520,12 @@ public:
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
     }
 
+    // Recursive-CTE depth limit from the query options; 0 when the option is not set.
+    int cte_max_recursion_depth() const {
+        if (!_query_options.__isset.cte_max_recursion_depth) {
+            return 0;
+        }
+        return _query_options.cte_max_recursion_depth;
+    }
+
     int rpc_verbose_profile_max_instance_count() const {
         return _query_options.__isset.rpc_verbose_profile_max_instance_count
                        ? _query_options.rpc_verbose_profile_max_instance_count
@@ -702,6 +708,17 @@ public:
                                       _query_options.hnsw_bounded_queue);
     }
 
+    void reset_to_rerun();
+
+    void set_force_make_rf_wait_infinite() { // turns on runtime_filter_wait_infinitely in this RuntimeState's _query_options
+        _query_options.__set_runtime_filter_wait_infinitely(true);
+    }
+
+    // True only when runtime_filter_wait_infinitely is explicitly set and enabled in this
+    // state's query options.
+    bool runtime_filter_wait_infinitely() const {
+        if (!_query_options.__isset.runtime_filter_wait_infinitely) {
+            return false;
+        }
+        return _query_options.runtime_filter_wait_infinitely;
+    }
+
 private:
     Status create_error_log_file();
 
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h 
b/be/src/runtime_filter/runtime_filter_consumer.h
index e0e42e509d4..e9ef68f6d28 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -41,11 +41,11 @@ public:
         APPLIED, // The consumer will switch to this state after the 
expression is acquired
     };
 
-    static Status create(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc, int node_id,
+    static Status create(const RuntimeState* state, const TRuntimeFilterDesc* 
desc, int node_id,
                          std::shared_ptr<RuntimeFilterConsumer>* res) {
         *res = std::shared_ptr<RuntimeFilterConsumer>(
-                new RuntimeFilterConsumer(query_ctx, desc, node_id));
-        RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&query_ctx->query_options()));
+                new RuntimeFilterConsumer(state, desc, node_id));
+        RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&state->query_options())); // options now come from the RuntimeState rather than the QueryContext
         return Status::OK();
     }
 
@@ -86,17 +86,16 @@ public:
     }
 
 private:
-    RuntimeFilterConsumer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
-                          int node_id)
+    RuntimeFilterConsumer(const RuntimeState* state, const TRuntimeFilterDesc* 
desc, int node_id)
             : RuntimeFilter(desc),
               _probe_expr(desc->planId_to_target_expr.find(node_id)->second),
               _registration_time(MonotonicMillis()),
               _rf_state(State::NOT_READY) {
         // If bitmap filter is not applied, it will cause the query result to 
be incorrect
-        bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() ||
+        bool wait_infinitely = state->runtime_filter_wait_infinitely() || // per-RuntimeState option; set_force_make_rf_wait_infinite() can enable it
                                _runtime_filter_type == 
RuntimeFilterType::BITMAP_FILTER;
-        _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 
1000
-                                           : 
query_ctx->runtime_filter_wait_time_ms();
+        _rf_wait_time_ms = wait_infinitely ? 
state->get_query_ctx()->execution_timeout() * 1000
+                                           : 
state->get_query_ctx()->runtime_filter_wait_time_ms(); // the durations themselves still come from the QueryContext
         DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_merger.h 
b/be/src/runtime_filter/runtime_filter_merger.h
index d373bb8be05..42e26b3d9c3 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -78,6 +78,8 @@ public:
         _expected_producer_num = num;
     }
 
+    int get_expected_producer_num() const { return _expected_producer_num; } // lets a global RF reset carry the producer count over to a newly created merger
+
     bool add_rf_size(uint64_t size) {
         _received_rf_size_num++;
         if (_expected_producer_num < _received_rf_size_num) {
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index ba9e9f5bc9d..dcddc29f06c 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -62,13 +62,13 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consum
 }
 
 Status RuntimeFilterMgr::register_consumer_filter(
-        const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, int 
node_id,
+        const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id,
         std::shared_ptr<RuntimeFilterConsumer>* consumer) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
     std::lock_guard<std::mutex> l(_lock);
-    RETURN_IF_ERROR(RuntimeFilterConsumer::create(query_ctx, &desc, node_id, 
consumer));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, 
consumer)); // the consumer now reads its wait settings through this RuntimeState
     _consumer_map[key].push_back(*consumer);
     return Status::OK();
 }
@@ -446,6 +446,24 @@ void 
RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu
     _filter_map.clear();
 }
 
+// Re-arms the global merge state of every filter in `filter_ids` so it can be merged again.
+// A fresh merger is created from the stored descriptor with the same expected producer
+// count, and the recorded arrivals and source addresses are cleared.
+// Returns InternalError for an id that has no merge context (or no merger) here, and
+// propagates any RuntimeFilterMerger::create error.
+Status RuntimeFilterMergeControllerEntity::reset_global_rf(
+        QueryContext* query_ctx, const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    for (const auto& filter_id : filter_ids) {
+        GlobalMergeContext* cnt_val = nullptr;
+        {
+            // Use find() rather than operator[]: operator[] would insert a default entry for an
+            // unknown id, whose (presumably null) merger is then dereferenced below.
+            std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+            auto iter = _filter_map.find(filter_id);
+            if (iter == _filter_map.end() || !iter->second.merger) {
+                return Status::InternalError(
+                        "reset_global_rf: no merge context for runtime filter {}", filter_id);
+            }
+            cnt_val = &iter->second;
+        }
+        // NOTE(review): the entry is mutated after the map lock is released; confirm no merge
+        // for these filters can run concurrently with a reset.
+        const int producer_size = cnt_val->merger->get_expected_producer_num();
+        RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &cnt_val->runtime_filter_desc,
+                                                    &cnt_val->merger));
+        cnt_val->merger->set_expected_producer_num(producer_size);
+        cnt_val->arrive_id.clear();
+        cnt_val->source_addrs.clear();
+    }
+    return Status::OK();
+}
+
 std::string RuntimeFilterMergeControllerEntity::debug_string() {
     std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
     std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 6941e5e5631..a079f3b0ad9 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -83,7 +83,7 @@ public:
 
     // get/set consumer
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
get_consume_filters(int filter_id);
-    Status register_consumer_filter(const QueryContext* query_ctx, const 
TRuntimeFilterDesc& desc,
+    Status register_consumer_filter(const RuntimeState* state, const 
TRuntimeFilterDesc& desc,
                                     int node_id,
                                     std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter);
 
@@ -104,6 +104,27 @@ public:
 
     std::string debug_string();
 
+    // Returns every filter id this manager knows about, either as a producer or as a
+    // consumer, taken under _lock.
+    std::set<int32_t> get_filter_ids() {
+        std::lock_guard<std::mutex> guard(_lock);
+        std::set<int32_t> known_ids(_producer_id_set.begin(), _producer_id_set.end());
+        for (const auto& [filter_id, consumers] : _consumer_map) {
+            known_ids.insert(filter_id);
+        }
+        return known_ids;
+    }
+
+    // Erases each id in `filter_ids` from the consumer map, the local-merge map and the
+    // producer id set, all under _lock.
+    void remove_filters(const std::set<int32_t>& filter_ids) {
+        std::lock_guard<std::mutex> guard(_lock);
+        for (const int32_t filter_id : filter_ids) {
+            _consumer_map.erase(filter_id);
+            _local_merge_map.erase(filter_id);
+            _producer_id_set.erase(filter_id);
+        }
+    }
+
 private:
     /**
      * `_is_global = true` means this runtime filter manager menages 
query-level runtime filters.
@@ -152,6 +173,9 @@ public:
 
     void release_undone_filters(QueryContext* query_ctx);
 
+    Status reset_global_rf(QueryContext* query_ctx,
+                           const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,
                            const TRuntimeFilterDesc* runtime_filter_desc,
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index daf45179f79..f9f7d2e4460 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1597,6 +1597,57 @@ void 
PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
     }
 }
 
+void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller,
+                                              const PTransmitRecCTEBlockParams* request,
+                                              PTransmitRecCTEBlockResult* response,
+                                              google::protobuf::Closure* done) {
+    // Handle the request on the light work pool. The closure guard completes the RPC after
+    // the status is written; if the pool rejects the task, offer_failed reports it instead.
+    auto task = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        const TUniqueId query_id = UniqueId(request->query_id()).to_thrift();
+        const TUniqueId instance_id = UniqueId(request->fragment_instance_id()).to_thrift();
+        Status st = _exec_env->fragment_mgr()->transmit_rec_cte_block(
+                query_id, instance_id, request->node_id(), request->blocks(), request->eos());
+        st.to_protobuf(response->mutable_status());
+    };
+    if (!_light_work_pool.try_offer(std::move(task))) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
+void PInternalService::rerun_fragment(google::protobuf::RpcController* controller,
+                                      const PRerunFragmentParams* request,
+                                      PRerunFragmentResult* response,
+                                      google::protobuf::Closure* done) {
+    // Forward one rerun stage to FragmentMgr on the light work pool; the closure guard
+    // completes the RPC once the status is written into the response.
+    auto task = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        const TUniqueId query_id = UniqueId(request->query_id()).to_thrift();
+        Status st = _exec_env->fragment_mgr()->rerun_fragment(query_id, request->fragment_id(),
+                                                              request->stage());
+        st.to_protobuf(response->mutable_status());
+    };
+    if (!_light_work_pool.try_offer(std::move(task))) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
+void PInternalService::reset_global_rf(google::protobuf::RpcController* controller,
+                                       const PResetGlobalRfParams* request,
+                                       PResetGlobalRfResult* response,
+                                       google::protobuf::Closure* done) {
+    // Forward the global runtime-filter reset to FragmentMgr on the light work pool; if the
+    // pool rejects the task, offer_failed completes the RPC with an error.
+    auto task = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        const TUniqueId query_id = UniqueId(request->query_id()).to_thrift();
+        Status st = _exec_env->fragment_mgr()->reset_global_rf(query_id, request->filter_ids());
+        st.to_protobuf(response->mutable_status());
+    };
+    if (!_light_work_pool.try_offer(std::move(task))) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
 void PInternalService::transmit_block(google::protobuf::RpcController* 
controller,
                                       const PTransmitDataParams* request,
                                       PTransmitDataResult* response,
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index d73501bfc80..781a7def178 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -140,6 +140,16 @@ public:
                         const ::doris::PPublishFilterRequestV2* request,
                         ::doris::PPublishFilterResponse* response,
                         ::google::protobuf::Closure* done) override;
+    void transmit_rec_cte_block(google::protobuf::RpcController* controller,
+                                const PTransmitRecCTEBlockParams* request,
+                                PTransmitRecCTEBlockResult* response,
+                                google::protobuf::Closure* done) override; // rec-CTE data
+    void rerun_fragment(google::protobuf::RpcController* controller,
+                        const PRerunFragmentParams* request, PRerunFragmentResult* response,
+                        google::protobuf::Closure* done) override; // one Opcode stage
+    void reset_global_rf(google::protobuf::RpcController* controller,
+                         const PResetGlobalRfParams* request, PResetGlobalRfResult* response,
+                         google::protobuf::Closure* done) override; // by filter id
     void transmit_block(::google::protobuf::RpcController* controller,
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 77ee21368c0..e0e352645c3 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -31,7 +31,7 @@ public:
     void test_signal_aquire(TRuntimeFilterDesc desc) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+                RuntimeFilterConsumer::create(_runtime_states[0].get(), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -59,7 +59,7 @@ TEST_F(RuntimeFilterConsumerTest, basic) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
@@ -116,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -141,7 +141,7 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) {
     const_cast<TQueryOptions&>(_query_ctx->_query_options)
             .__set_runtime_filter_wait_infinitely(true);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
@@ -152,7 +152,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -174,7 +174,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
 
     {
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     desc.__set_src_expr(
@@ -195,13 +195,13 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
                     .build());
 
     {
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     {
         desc.__set_has_local_targets(false);
         desc.__set_has_remote_targets(true);
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
         desc.__set_has_local_targets(true);
         desc.__set_has_remote_targets(false);
@@ -209,7 +209,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
 
     
desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr());
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 }
 
 TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
@@ -217,7 +217,7 @@ TEST_F(RuntimeFilterConsumerTest, 
aquire_signal_at_same_time) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+                RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index d8222e201d9..02b1349c1a6 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -65,7 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
         
EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
         std::shared_ptr<RuntimeFilterConsumer> consumer_filter;
         EXPECT_TRUE(global_runtime_filter_mgr
-                            ->register_consumer_filter(ctx.get(), desc, 0, 
&consumer_filter)
+                            ->register_consumer_filter(&state, desc, 0, 
&consumer_filter)
                             .ok());
         
EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7582a7b6b6d..868263be5dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -786,6 +786,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String READ_HIVE_JSON_IN_ONE_COLUMN = 
"read_hive_json_in_one_column";
 
+    public static final String CTE_MAX_RECURSION_DEPTH = "cte_max_recursion_depth"; // recursive CTE depth limit
+
     /**
      * Inserting overwrite for auto partition table allows creating partition 
for
      * datas which cannot find partition to overwrite.
@@ -989,6 +991,11 @@ public class SessionVariable implements Serializable, 
Writable {
     })
     public int minScanSchedulerConcurrency = 0;
 
+    @VariableMgr.VarAttr(name = CTE_MAX_RECURSION_DEPTH, needForward = true, description = {
+            "CTE递归的最大深度,默认值100",
+            "The maximum depth of CTE recursion. Default is 100" })
+    public int cteMaxRecursionDepth = 100; // forwarded to BE via thrift cte_max_recursion_depth
+
     // By default, the number of Limit items after OrderBy is changed from 
65535 items
     // before v1.2.0 (not included), to return all items by default
     @VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT, affectQueryResult = 
true)
@@ -4783,7 +4790,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold);
 
         tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead);
-
+        tResult.setCteMaxRecursionDepth(cteMaxRecursionDepth);
         tResult.setEnableParallelScan(enableParallelScan);
         tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount);
         
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 1ddfbcf2502..5b9c3579590 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -60,6 +60,45 @@ message PTransmitDataResult {
     optional int64 receive_time = 2;
 };
 
+message PTransmitRecCTEBlockParams { // request of PBackendService.transmit_rec_cte_block
+    repeated PBlock blocks = 1;
+    optional bool eos = 2; // NOTE(review): presumably marks the sender's last message — confirm
+    optional PUniqueId query_id = 3;
+    optional PUniqueId fragment_instance_id = 4; // presumably the receiving instance — confirm
+    optional int32 node_id = 5; // presumably the receiving plan node id — confirm
+};
+
+message PTransmitRecCTEBlockResult { // response of transmit_rec_cte_block
+    optional PStatus status = 1;
+};
+
+
+message PRerunFragmentParams { // request of PBackendService.rerun_fragment
+    enum Opcode { // one step of re-running a fragment between recursion rounds
+        wait = 1;    // wait until the fragment has finished executing
+        release = 2; // release the current round's resources
+        rebuild = 3; // rebuild the pipeline tasks for the next round
+        submit = 4;  // submit the rebuilt tasks for execution
+        close = 5;   // close the fragment
+    }
+    optional PUniqueId query_id = 1;
+    optional int32 fragment_id = 2;
+    optional Opcode stage = 3; // which Opcode step to execute
+};
+
+message PRerunFragmentResult { // response of rerun_fragment
+    optional PStatus status = 1;
+};
+
+message PResetGlobalRfParams { // request of PBackendService.reset_global_rf
+    optional PUniqueId query_id = 1;
+    repeated int32 filter_ids = 2; // passed as-is to FragmentMgr::reset_global_rf
+};
+
+message PResetGlobalRfResult { // response of reset_global_rf
+    optional PStatus status = 1;
+};
+
 message PTabletWithPartition {
     required int64 partition_id = 1;
     required int64 tablet_id = 2;
@@ -1107,6 +1146,9 @@ service PBackendService {
     rpc sync_filter_size(PSyncFilterSizeRequest) returns 
(PSyncFilterSizeResponse);
     rpc apply_filterv2(PPublishFilterRequestV2) returns 
(PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
+    rpc rerun_fragment(PRerunFragmentParams) returns (PRerunFragmentResult);
+    rpc reset_global_rf(PResetGlobalRfParams) returns (PResetGlobalRfResult);
+    rpc transmit_rec_cte_block(PTransmitRecCTEBlockParams) returns 
(PTransmitRecCTEBlockResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns 
(PCheckRPCChannelResponse);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index cc018384f24..c4169768584 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -410,6 +410,7 @@ struct TQueryOptions {
 
   175: optional bool enable_fuzzy_blockable_task = false;
   176: optional list<i32> shuffled_agg_ids;
+  177: optional i32 cte_max_recursion_depth;
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
diff --git a/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out 
b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out
new file mode 100644
index 00000000000..ba843a71ee5
--- /dev/null
+++ b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out
@@ -0,0 +1,953 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+0.5403023058681398
+0.6542897904977791
+0.7013687736227565
+0.7221024250267077
+0.7314040424225098
+0.7356047404363474
+0.7375068905132428
+0.7383692041223232
+0.7387603198742113
+0.7389377567153445
+0.7390182624274122
+0.7390547907469174
+0.7390713652989449
+0.7390788859949921
+0.7390822985224024
+0.7390838469650002
+0.7390845495752126
+0.7390848683867142
+0.7390850130484203
+0.739085078689123
+0.7390851084737987
+0.7390851219886894
+0.7390851281211138
+0.7390851309037207
+0.7390851321663374
+0.7390851327392538
+0.7390851329992164
+0.7390851331171753
+0.7390851331706995
+0.7390851331949863
+0.7390851332060064
+0.7390851332110069
+0.7390851332132758
+0.7390851332143055
+0.7390851332147726
+0.7390851332149846
+0.7390851332150807
+0.7390851332151244
+0.7390851332151441
+0.7390851332151531
+0.7390851332151572
+0.7390851332151591
+0.7390851332151599
+0.7390851332151603
+0.7390851332151605
+0.7390851332151606
+0.7390851332151607
+0.7390851332151608
+0.7390851332151609
+0.7390851332151611
+0.7390851332151617
+0.7390851332151629
+0.7390851332151657
+0.7390851332151718
+0.7390851332151851
+0.7390851332152145
+0.7390851332152792
+0.739085133215422
+0.7390851332157367
+0.7390851332164302
+0.7390851332179587
+0.7390851332213271
+0.7390851332287504
+0.7390851332451103
+0.7390851332811648
+0.7390851333606233
+0.7390851335357372
+0.7390851339216605
+0.7390851347721744
+0.7390851366465718
+0.7390851407774467
+0.7390851498812394
+0.7390851699445544
+0.7390852141609171
+0.7390853116067619
+0.7390855263619245
+0.7390859996481299
+0.7390870426953322
+0.7390893414033927
+0.7390944073790913
+0.7391055719265363
+0.7391301765296711
+0.7391843997714936
+0.7393038923969059
+0.7395672022122561
+0.7401473355678757
+0.7414250866101092
+0.7442373549005569
+0.7504177617637605
+0.7639596829006542
+0.7934803587425656
+0.8575532158463934
+1
+
+-- !sql --
+55
+
+-- !sql --
+1      3
+1      5
+1      8
+2      4
+2      5
+2      10
+2      19
+3      1
+3      5
+3      8
+3      10
+3      24
+5      3
+5      4
+5      8
+5      15
+6      3
+6      4
+6      7
+7      4
+8      1
+9      4
+
+-- !sql --
+1      3
+1      5
+2      4
+2      5
+2      10
+3      1
+3      5
+3      8
+3      10
+5      3
+5      4
+5      8
+5      10
+6      3
+6      4
+6      5
+7      4
+8      1
+8      8
+9      4
+11     1
+12     3
+29     4
+
+-- !sql --
+1      3
+1      5
+2      4
+2      5
+2      10
+3      1
+3      5
+3      8
+3      10
+5      3
+5      4
+5      8
+6      3
+6      4
+7      4
+7      10
+8      1
+8      10
+9      4
+9      5
+9      10
+10     5
+10     8
+10     10
+11     5
+11     8
+11     10
+12     5
+12     8
+12     10
+13     1
+13     5
+13     8
+13     10
+14     1
+14     5
+14     8
+14     10
+15     1
+15     3
+15     5
+15     8
+15     10
+16     1
+16     3
+16     5
+16     8
+16     10
+17     1
+17     3
+17     5
+17     8
+17     10
+18     1
+18     3
+18     5
+18     8
+18     10
+19     1
+19     3
+19     5
+19     8
+19     10
+20     1
+20     3
+20     5
+20     8
+20     10
+21     1
+21     3
+21     5
+21     8
+21     10
+22     1
+22     3
+22     5
+22     8
+22     10
+23     1
+23     3
+23     5
+23     8
+23     10
+24     1
+24     3
+24     5
+24     8
+24     10
+25     1
+25     3
+25     5
+25     8
+25     10
+26     1
+26     3
+26     5
+26     8
+26     10
+27     1
+27     3
+27     5
+27     8
+27     10
+28     1
+28     3
+28     5
+28     8
+28     10
+29     1
+29     3
+29     5
+29     8
+29     10
+30     1
+30     3
+30     5
+30     8
+30     10
+31     1
+31     3
+31     5
+31     8
+31     10
+32     1
+32     3
+32     5
+32     8
+32     10
+33     1
+33     3
+33     5
+33     8
+33     10
+34     1
+34     3
+34     4
+34     5
+34     8
+34     10
+35     1
+35     3
+35     4
+35     5
+35     8
+35     10
+36     1
+36     3
+36     4
+36     5
+36     8
+36     10
+37     1
+37     3
+37     4
+37     5
+37     8
+37     10
+38     1
+38     3
+38     4
+38     5
+38     8
+38     10
+39     1
+39     3
+39     4
+39     5
+39     8
+39     10
+40     1
+40     3
+40     4
+40     5
+40     8
+40     10
+41     1
+41     3
+41     4
+41     5
+41     8
+41     10
+42     1
+42     3
+42     4
+42     5
+42     8
+42     10
+43     1
+43     3
+43     4
+43     5
+43     8
+43     10
+44     1
+44     3
+44     4
+44     5
+44     8
+44     10
+45     1
+45     3
+45     4
+45     5
+45     8
+45     10
+46     1
+46     3
+46     4
+46     5
+46     8
+46     10
+47     1
+47     3
+47     4
+47     5
+47     8
+47     10
+48     1
+48     3
+48     4
+48     5
+48     8
+48     10
+49     1
+49     3
+49     4
+49     5
+49     8
+49     10
+50     1
+50     3
+50     4
+50     5
+50     8
+50     10
+51     1
+51     3
+51     4
+51     5
+51     8
+51     10
+52     1
+52     3
+52     4
+52     5
+52     8
+52     10
+53     1
+53     3
+53     4
+53     5
+53     8
+53     10
+54     1
+54     3
+54     4
+54     5
+54     8
+54     10
+55     1
+55     3
+55     4
+55     5
+55     8
+55     10
+56     1
+56     3
+56     4
+56     5
+56     8
+56     10
+57     1
+57     3
+57     4
+57     5
+57     8
+57     10
+58     1
+58     3
+58     4
+58     5
+58     8
+58     10
+59     1
+59     3
+59     4
+59     5
+59     8
+59     10
+60     1
+60     3
+60     4
+60     5
+60     8
+60     10
+61     1
+61     3
+61     4
+61     5
+61     8
+61     10
+62     1
+62     3
+62     4
+62     5
+62     8
+62     10
+63     1
+63     3
+63     4
+63     5
+63     8
+63     10
+64     1
+64     3
+64     4
+64     5
+64     8
+64     10
+65     1
+65     3
+65     4
+65     5
+65     8
+65     10
+66     1
+66     3
+66     4
+66     5
+66     8
+66     10
+67     1
+67     3
+67     4
+67     5
+67     8
+67     10
+68     1
+68     3
+68     4
+68     5
+68     8
+68     10
+69     1
+69     3
+69     4
+69     5
+69     8
+69     10
+70     1
+70     3
+70     4
+70     5
+70     8
+70     10
+71     1
+71     3
+71     4
+71     5
+71     8
+71     10
+72     1
+72     3
+72     4
+72     5
+72     8
+72     10
+73     1
+73     3
+73     4
+73     5
+73     8
+73     10
+74     1
+74     3
+74     4
+74     5
+74     8
+74     10
+75     1
+75     3
+75     4
+75     5
+75     8
+75     10
+76     1
+76     3
+76     4
+76     5
+76     8
+76     10
+77     1
+77     3
+77     4
+77     5
+77     8
+77     10
+78     1
+78     3
+78     4
+78     5
+78     8
+78     10
+79     1
+79     3
+79     4
+79     5
+79     8
+79     10
+80     1
+80     3
+80     4
+80     5
+80     8
+80     10
+81     1
+81     3
+81     4
+81     5
+81     8
+81     10
+82     1
+82     3
+82     4
+82     5
+82     8
+82     10
+83     1
+83     3
+83     4
+83     5
+83     8
+83     10
+84     1
+84     3
+84     4
+84     5
+84     8
+84     10
+85     1
+85     3
+85     4
+85     5
+85     8
+85     10
+86     1
+86     3
+86     4
+86     5
+86     8
+86     10
+87     1
+87     3
+87     4
+87     5
+87     8
+87     10
+88     1
+88     3
+88     4
+88     5
+88     8
+88     10
+89     1
+89     3
+89     4
+89     5
+89     8
+89     10
+90     1
+90     3
+90     4
+90     5
+90     8
+90     10
+91     1
+91     3
+91     4
+91     5
+91     8
+91     10
+92     1
+92     3
+92     4
+92     5
+92     8
+92     10
+93     1
+93     3
+93     4
+93     5
+93     8
+93     10
+94     1
+94     3
+94     4
+94     5
+94     8
+94     10
+95     1
+95     3
+95     4
+95     5
+95     8
+95     10
+96     1
+96     3
+96     4
+96     5
+96     8
+96     10
+97     1
+97     3
+97     4
+97     5
+97     8
+97     10
+98     1
+98     3
+98     4
+98     5
+98     8
+98     10
+99     1
+99     3
+99     4
+99     5
+99     8
+99     10
+100    1
+100    3
+100    4
+100    5
+100    8
+100    10
+
+-- !sql --
+1      3
+1      5
+1      8
+2      \N
+2      4
+2      5
+2      9
+2      10
+2      15
+2      19
+2      28
+2      34
+2      43
+2      71
+2      77
+2      105
+2      176
+2      182
+2      253
+2      429
+2      435
+2      611
+2      1040
+2      1046
+2      1475
+2      2515
+2      2521
+2      3561
+2      6076
+2      6082
+2      8597
+2      14673
+2      14679
+2      20755
+2      35428
+2      35434
+2      50107
+2      85535
+2      85541
+2      120969
+2      206504
+2      206510
+2      292045
+2      498549
+2      498555
+2      705059
+2      1203608
+2      1203614
+2      1702163
+2      2905771
+2      2905777
+2      4109385
+2      7015156
+2      7015162
+2      9920933
+2      16936089
+2      16936095
+2      23951251
+2      40887340
+2      40887346
+2      57823435
+2      98710775
+2      98710781
+2      139598121
+2      238308896
+2      238308902
+2      337019677
+2      575328573
+2      575328579
+2      813637475
+2      1388966048
+2      1388966054
+2      1964294627
+3      \N
+3      1
+3      5
+3      6
+3      8
+3      10
+3      14
+3      18
+3      20
+3      23
+3      41
+3      43
+3      55
+3      63
+3      96
+3      118
+3      139
+3      181
+3      235
+3      320
+3      353
+3      501
+3      588
+3      854
+3      908
+3      1355
+3      1496
+3      2263
+3      2350
+3      3618
+3      3846
+3      5968
+3      6109
+3      9586
+3      9955
+3      15695
+3      15923
+3      25281
+3      25878
+3      41204
+3      41573
+3      66485
+3      67451
+3      108058
+3      108655
+3      174543
+3      176106
+3      283198
+3      284164
+3      457741
+3      460270
+3      741905
+3      743468
+3      1199646
+3      1203738
+3      1943114
+3      1945643
+3      3142760
+3      3149381
+3      5088403
+3      5092495
+3      8231163
+3      8241876
+3      13323658
+3      13330279
+3      21554821
+3      21572155
+3      34885100
+3      34895813
+3      56439921
+3      56467968
+3      91335734
+3      91353068
+3      147775655
+3      147821036
+3      239128723
+3      239156770
+3      386904378
+3      386977806
+3      626061148
+3      626106529
+3      1012965526
+3      1013084335
+3      1639072055
+3      1639145483
+5      \N
+5      3
+5      4
+5      7
+5      8
+5      12
+5      15
+5      22
+5      27
+5      34
+5      56
+5      61
+5      83
+5      139
+5      144
+5      200
+5      339
+5      344
+5      483
+5      822
+5      827
+5      1166
+5      1988
+5      1993
+5      2815
+5      4803
+5      4808
+5      6796
+5      11599
+5      11604
+5      16407
+5      28006
+5      28011
+5      39610
+5      67616
+5      67621
+5      95627
+5      163243
+5      163248
+5      230864
+5      394107
+5      394112
+5      557355
+5      951462
+5      951467
+5      1345574
+5      2297036
+5      2297041
+5      3248503
+5      5545539
+5      5545544
+5      7842580
+5      13388119
+5      13388124
+5      18933663
+5      32321782
+5      32321787
+5      45709906
+5      78031688
+5      78031693
+5      110353475
+5      188385163
+5      188385168
+5      266416856
+5      454802019
+5      454802024
+5      643187187
+5      1097989206
+5      1097989211
+5      1552791230
+6      3
+6      4
+6      7
+7      4
+8      1
+9      4
+
+-- !sql --
+1      2
+
+-- !sql --
+1      2
+3      4
+
+-- !sql --
+1      2
+3      4
+11     22
+33     44
+
+-- !sql --
+1      2
+3      4
+11     22
+
+-- !sql --
+1      22
+3      22
+11     22
+
+-- !sql --
+1      2
+2      3
+
+-- !sql --
+1      2
+3      4
+11     22
+
diff --git 
a/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out 
b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out
new file mode 100644
index 00000000000..3a055b01acc
--- /dev/null
+++ 
b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+5050
+
+-- !q2 --
+0      \N      ROOT
+1      0       Child_1
+2      0       Child_2
+3      1       Child_1_1
+
+-- !q3 --
+0      \N      ROOT    [0]
+1      0       Child_1 [0, 1]
+3      1       Child_1_1       [0, 1, 3]
+2      0       Child_2 [0, 2]
+
+-- !q4 --
+0      \N      ROOT    [0]     0
+1      0       Child_1 [0, 1]  1
+2      0       Child_2 [0, 2]  1
+3      1       Child_1_1       [0, 1, 3]       2
+
+-- !q5 --
+1      2       1 -> 2
+1      3       1 -> 3
+1      4       1 -> 4
+2      3       2 -> 3
+4      5       4 -> 5
+
diff --git 
a/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out
 
b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out
new file mode 100644
index 00000000000..958b6848838
--- /dev/null
+++ 
b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out
@@ -0,0 +1,30 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+0      0       1
+1      1       1
+2      1       2
+3      2       3
+4      3       5
+5      5       8
+6      8       13
+7      13      21
+8      21      34
+9      34      55
+
+-- !q2 --
+["Oasis", "Rock", "Music", "Art"]
+
+-- !q3 --
+1      3       [1, 3]
+1      5       [1, 5]
+1      5       [1, 3, 5]
+1      8       [1, 3, 8]
+1      10      [1, 3, 10]
+1      3       [1, 5, 3]
+1      4       [1, 5, 4]
+1      8       [1, 5, 8]
+1      4       [1, 3, 5, 4]
+1      8       [1, 3, 5, 8]
+1      8       [1, 5, 3, 8]
+1      10      [1, 5, 3, 10]
+
diff --git 
a/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out
 
b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out
new file mode 100644
index 00000000000..3bc4cf27785
--- /dev/null
+++ 
b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out
@@ -0,0 +1,42 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+1      abc
+2      abcabc
+3      abcabcabcabc
+
+-- !q2 --
+1      1       -1
+2      -2      2
+3      4       -4
+4      -8      8
+5      16      -16
+
+-- !q4 --
+2017-01-03
+2017-01-04
+2017-01-05
+2017-01-06
+2017-01-07
+2017-01-08
+2017-01-09
+2017-01-10
+
+-- !q5 --
+2017-01-03     300
+2017-01-04     0
+2017-01-05     0
+2017-01-06     50
+2017-01-07     0
+2017-01-08     180
+2017-01-09     0
+2017-01-10     5
+
+-- !q6 --
+333    Yasmina 333
+198    John    333,198
+29     Pedro   333,198,29
+4610   Sarah   333,198,29,4610
+72     Pierre  333,198,29,72
+692    Tarek   333,692
+123    Adil    333,692,123
+
diff --git a/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy 
b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy
new file mode 100644
index 00000000000..d456f6ba998
--- /dev/null
+++ b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite ("rec_cte") { // recursive CTE regression cases; expected output in rec_cte.out
+    qt_sql """
+    WITH RECURSIVE test_table AS (
+    SELECT
+        cast(1.0 as double) AS number
+    UNION
+    SELECT
+        cos(number)
+    FROM
+        test_table
+    )
+    SELECT
+        number
+    FROM
+        test_table order by number;
+    """ // UNION (not UNION ALL): iteration ends once cos() yields no new distinct value
+
+    qt_sql """
+    WITH RECURSIVE test_table AS (
+        SELECT cast(10 as int) AS number
+    UNION ALL
+        SELECT cast(number - 1 as int) FROM test_table WHERE number > 0
+    )
+    SELECT sum(number) FROM test_table;
+    """ // countdown 10..0 stopped by WHERE number > 0; expected sum is 55
+
+
+    sql "DROP TABLE IF EXISTS edge;" // directed edge list used by the aggregate/window cases
+    sql """
+    CREATE TABLE edge
+    (
+        node1id int,
+        node2id int
+    ) DUPLICATE KEY (node1id)
+    DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """
+    INSERT INTO edge VALUES
+    (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1),
+    (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8),
+    (6, 3), (6, 4), (7, 4), (8, 1), (9, 4);
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            k1,
+            cast(sum(k2) as int)
+        FROM t1 GROUP BY k1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """ // aggregate (GROUP BY k1) inside the recursive step
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            cast(sum(k1) as int),
+            k2
+        FROM t1 GROUP BY k2
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """ // aggregate (GROUP BY k2) inside the recursive step
+
+    test { // no upper bound: sum(k1 + 1) keeps growing, so no fixpoint is reached
+        sql """
+            WITH RECURSIVE t1(k1, k2) AS (
+                SELECT
+                    node1id AS k1,
+                    node2id AS k2
+                FROM edge
+                UNION
+                SELECT
+                    cast(sum(k1 + 1) as int),
+                    k2
+                FROM t1 GROUP BY k2
+            )
+            SELECT * FROM t1 ORDER BY 1,2;
+        """
+        exception "ABORTED" // NOTE(review): presumably the cte_max_recursion_depth limit — confirm
+    }
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            cast(sum(k1 + 1) as int),
+            k2
+        FROM t1 WHERE k1 < 100 GROUP BY k2
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """ // same shape as the ABORTED case above, but bounded by WHERE k1 < 100
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            k1,
+            cast(sum(k2) OVER (PARTITION BY k1 ORDER BY k1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as int)
+        FROM t1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """ // window function inside the recursive step
+
+    test { // UNION ALL re-emits (1, 2) every round, so no fixpoint is reached
+        sql """
+            WITH RECURSIVE t1(k1, k2) AS (
+                SELECT
+                    1,2
+                UNION ALL
+                SELECT
+                    1,2
+                FROM t1 GROUP BY k1
+            )
+            SELECT * FROM t1 ORDER BY 1,2;
+        """
+        exception "ABORTED" // NOTE(review): presumably the cte_max_recursion_depth limit — confirm
+    }
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            1,2
+        FROM t1 GROUP BY k1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """ // UNION deduplicates the repeated (1, 2) row, so the recursion terminates
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """ // recursive step adds (3, 4) once, then produces only duplicates
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT
+            33,44
+        FROM t2 GROUP BY k1
+    )
+    SELECT * FROM t1 UNION  select * from t2 ORDER BY 1,2;
+    """ // two independent recursive CTEs combined with UNION
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT t2.k1, t2.k2 FROM t1,t2
+    )
+    SELECT * FROM t1 UNION  select * from t2 ORDER BY 1,2;
+    """ // t2's recursive step joins t1 with t2
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT t1.k1, t2.k2 FROM t1,t2
+    )
+    select * from t2 ORDER BY 1,2;
+    """ // t2's recursive step takes k1 from t1 and k2 from t2
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            2,3
+        UNION
+        SELECT least(t1.k1,t2.k1), least(t1.k2,t2.k2) FROM t1,t2
+    )
+    select * from t2 ORDER BY 1,2;
+    """ // least() over the t1 x t2 cross join
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT t1.k1, t1.k2 FROM t1
+    )
+    SELECT * FROM t1 UNION  select * from t2 ORDER BY 1,2;
+    """ // t2's second branch reads only t1 (no self-reference)
+}
diff --git 
a/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
 
b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
new file mode 100644
index 00000000000..a01ab03db34
--- /dev/null
+++ 
b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Recursive CTE cases adapted from https://clickhouse.com/docs/sql-reference/statements/select/with
+suite ("rec_cte_from_ck_doc") {
+    qt_q1 """
+    WITH RECURSIVE test_table AS (
+        SELECT cast(1 as int) AS number
+    UNION ALL
+        SELECT cast(number + 1 as int) FROM test_table WHERE number < 100
+    )
+    SELECT sum(number) FROM test_table;
+    """
+
+    sql "DROP TABLE IF EXISTS tree;"
+    sql """
+        CREATE TABLE tree
+        (
+            id int,
+            parent_id int,
+            data varchar(100)
+        ) DUPLICATE KEY (id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO tree VALUES (0, NULL, 'ROOT'), (1, 0, 'Child_1'), (2, 0, 'Child_2'), (3, 1, 'Child_1_1');"""
+
+    qt_q2 """
+    WITH RECURSIVE search_tree AS (
+        SELECT id, parent_id, data
+        FROM tree t
+        WHERE t.id = 0
+    UNION ALL
+        SELECT t.id, t.parent_id, t.data
+        FROM tree t, search_tree st
+        WHERE t.parent_id = st.id
+    )
+    SELECT * FROM search_tree order BY id;
+    """
+
+    qt_q3 """
+    WITH RECURSIVE search_tree AS (
+        SELECT id, parent_id, data, array(t.id) AS path
+        FROM tree t
+        WHERE t.id = 0
+    UNION ALL
+        SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id))
+        FROM tree t, search_tree st
+        WHERE t.parent_id = st.id
+    )
+    SELECT * FROM search_tree ORDER BY path;
+    """
+
+    qt_q4 """
+    WITH RECURSIVE search_tree AS (
+        SELECT id, parent_id, data, array(t.id) AS path, cast(0 as int) AS depth
+        FROM tree t
+        WHERE t.id = 0
+    UNION ALL
+        SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id)), cast(depth + 1 as int)
+        FROM tree t, search_tree st
+        WHERE t.parent_id = st.id
+    )
+    SELECT * FROM search_tree ORDER BY depth, id;
+    """
+
+    sql "DROP TABLE IF EXISTS graph;"
+    sql """
+        CREATE TABLE graph
+        (
+            c_from int,
+            c_to int,
+            label varchar(100)
+        ) DUPLICATE KEY (c_from) DISTRIBUTED BY HASH(c_from) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO graph VALUES (1, 2, '1 -> 2'), (1, 3, '1 -> 3'), (2, 3, '2 -> 3'), (1, 4, '1 -> 4'), (4, 5, '4 -> 5');"""
+
+    qt_q5 """
+    WITH RECURSIVE search_graph AS (
+        SELECT c_from, c_to, label FROM graph g
+        UNION ALL
+        SELECT g.c_from, g.c_to, g.label
+        FROM graph g, search_graph sg
+        WHERE g.c_from = sg.c_to
+    )
+    SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+    """
+
+    sql "INSERT INTO graph VALUES (5, 1, '5 -> 1');"  // closes the cycle 1 -> 4 -> 5 -> 1
+    test {
+        sql """
+            WITH RECURSIVE search_graph AS (
+                SELECT c_from, c_to, label FROM graph g
+                UNION ALL
+                SELECT g.c_from, g.c_to, g.label
+                FROM graph g, search_graph sg
+                WHERE g.c_from = sg.c_to
+            )
+            SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+        """
+        exception "ABORTED"  // the cycle keeps producing rows; presumably stopped by a recursion limit (TODO confirm)
+    }
+
+    // Same cyclic query through a [shuffle] join with runtime-filter pruning disabled, to exercise a global runtime filter.
+    sql "set enable_runtime_filter_prune = false;"
+    test {
+        sql """
+        WITH RECURSIVE search_graph AS (
+            SELECT c_from, c_to, label FROM graph g
+            UNION ALL
+            SELECT g.c_from, g.c_to, g.label
+            FROM graph g join [shuffle] search_graph sg
+            on g.c_from = sg.c_to
+        )
+        SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+        """
+        exception "ABORTED"  // same non-terminating recursion as the previous case
+    }
+
+    // Disabled: stopping the recursion with LIMIT is not supported yet.
+    //qt_q6 """
+    //WITH RECURSIVE test_table AS (
+    //    SELECT cast(1 as int) AS number
+    //UNION ALL
+    //    SELECT cast(number + 1 as int) FROM test_table
+    //)
+    //SELECT sum(number) FROM test_table LIMIT 100;
+    //"""
+}
diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy
new file mode 100644
index 00000000000..62d70558163
--- /dev/null
+++ b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Recursive CTE cases adapted from https://duckdb.org/docs/stable/sql/query_syntax/with#recursive-ctes
+suite ("rec_cte_from_duckdb_doc") {
+    qt_q1 """
+    WITH RECURSIVE FibonacciNumbers (
+        RecursionDepth,
+        FibonacciNumber,
+        NextNumber
+    ) AS (
+        -- Base case
+        SELECT
+            cast(0 as int) AS RecursionDepth,
+            cast(0 as int) AS FibonacciNumber,
+            cast(1 as int) AS NextNumber
+        UNION
+        ALL -- Recursive step
+        SELECT
+            cast((fib.RecursionDepth + 1) as int) AS RecursionDepth,
+            fib.NextNumber AS FibonacciNumber,
+            cast((fib.FibonacciNumber + fib.NextNumber) as int) AS NextNumber
+        FROM
+            FibonacciNumbers fib
+        WHERE
+            cast((fib.RecursionDepth + 1) as int) < 10
+    )
+    SELECT
+        *
+    FROM
+        FibonacciNumbers fn ORDER BY fn.RecursionDepth;
+    """
+
+    sql "DROP TABLE IF EXISTS tag;"
+    sql """
+        CREATE TABLE tag
+        (
+            id int,
+            name varchar(100),
+            subclassof int
+        ) DUPLICATE KEY (id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO tag VALUES
+    (1, 'U2',     5),
+    (2, 'Blur',   5),
+    (3, 'Oasis',  5),
+    (4, '2Pac',   6),
+    (5, 'Rock',   7),
+    (6, 'Rap',    7),
+    (7, 'Music',  9),
+    (8, 'Movies', 9),
+    (9, 'Art', NULL);"""
+
+    qt_q2 """
+    WITH RECURSIVE tag_hierarchy(id, source, path) AS (
+            SELECT id, name, array(name) AS path
+            FROM tag
+            WHERE subclassof IS NULL
+        UNION ALL
+            SELECT tag.id, tag.name, array_concat(array(tag.name), tag_hierarchy.path)
+            FROM tag, tag_hierarchy
+            WHERE tag.subclassof = tag_hierarchy.id
+        )
+    SELECT path
+    FROM tag_hierarchy
+    WHERE source = 'Oasis';
+    """
+
+    sql "DROP TABLE IF EXISTS edge;"
+    sql """
+    CREATE TABLE edge
+    (
+        node1id int,
+        node2id int
+    ) DUPLICATE KEY (node1id)
+    DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """
+    INSERT INTO edge VALUES
+    (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1),
+    (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8),
+    (6, 3), (6, 4), (7, 4), (8, 1), (9, 4);
+    """
+
+    qt_q3 """
+    WITH RECURSIVE paths(startNode, endNode, path) AS (
+            SELECT -- Define the path as the first edge of the traversal
+                node1id AS startNode,
+                node2id AS endNode,
+                array_concat(array(node1id), array(node2id)) AS path
+            FROM edge
+            WHERE node1id = 1
+            UNION ALL
+            SELECT -- Concatenate new edge to the path
+                paths.startNode AS startNode,
+                node2id AS endNode,
+                array_concat(path, array(node2id)) AS path
+            FROM paths
+            JOIN edge ON paths.endNode = node1id
+            -- Prevent adding a repeated node to the path.
+            -- This ensures that no cycles occur.
+            WHERE array_contains(paths.path, node2id) = false
+        )
+    SELECT startNode, endNode, path
+    FROM paths
+    ORDER BY array_size(path), path;
+    """
+
+    // Disabled: a subquery (NOT EXISTS below) that references the recursive CTE is not supported yet.
+    //qt_q4 """
+    //WITH RECURSIVE paths(startNode, endNode, path) AS (
+    //        SELECT -- Define the path as the first edge of the traversal
+    //            node1id AS startNode,
+    //            node2id AS endNode,
+    //            array_concat(array(node1id), array(node2id)) AS path
+    //        FROM edge
+    //        WHERE startNode = 1
+    //        UNION ALL
+    //        SELECT -- Concatenate new edge to the path
+    //            paths.startNode AS startNode,
+    //            node2id AS endNode,
+    //            array_concat(path, array(node2id)) AS path
+    //        FROM paths
+    //        JOIN edge ON paths.endNode = node1id
+    //        -- Prevent adding a node that was visited previously by any path.
+    //        -- This ensures that (1) no cycles occur and (2) only nodes that
+    //        -- were not visited by previous (shorter) paths are added to a path.
+    //        WHERE NOT EXISTS (
+    //            SELECT 1 FROM paths previous_paths
+    //                WHERE array_contains(previous_paths.path, node2id)
+    //            )
+    //    )
+    //SELECT startNode, endNode, path
+    //FROM paths
+    //ORDER BY array_size(path), path;
+    //"""
+
+    //qt_q5 """
+    //WITH RECURSIVE paths(startNode, endNode, path, endReached) AS (
+    //SELECT -- Define the path as the first edge of the traversal
+    //        node1id AS startNode,
+    //        node2id AS endNode,
+    //        array_concat(array(node1id), array(node2id)) AS path,
+    //        (node2id = 8) AS endReached
+    //    FROM edge
+    //    WHERE startNode = 1
+    //UNION ALL
+    //SELECT -- Concatenate new edge to the path
+    //        paths.startNode AS startNode,
+    //        node2id AS endNode,
+    //        array_concat(path, array(node2id)) AS path,
+    //        max(CASE WHEN node2id = 8 THEN 1 ELSE 0 END)
+    //            OVER (ROWS BETWEEN UNBOUNDED PRECEDING
+    //                        AND UNBOUNDED FOLLOWING) AS endReached
+    //    FROM paths
+    //    JOIN edge ON paths.endNode = node1id
+    //    WHERE NOT EXISTS (
+    //            FROM paths previous_paths
+    //            WHERE array_contains(previous_paths.path, node2id)
+    //        )
+    //    AND paths.endReached = 0
+    //)
+    //SELECT startNode, endNode, path
+    //FROM paths
+    //WHERE endNode = 8
+    //ORDER BY array_size(path), path;
+    //"""
+}
diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy
new file mode 100644
index 00000000000..cee07dc6612
--- /dev/null
+++ b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Recursive CTE cases adapted from https://dev.mysql.com/doc/refman/8.4/en/with.html#common-table-expressions-recursive
+suite ("rec_cte_from_mysql_doc") {
+    qt_q1 """
+    WITH RECURSIVE cte AS
+    (
+    SELECT cast(1 as int) AS n, cast('abc' as varchar(65533)) AS str
+    UNION ALL
+    SELECT cast(n + 1 as int), cast(CONCAT(str, str) as varchar(65533)) FROM cte WHERE n < 3
+    )
+    SELECT * FROM cte order by n;
+    """
+
+    qt_q2 """
+    WITH RECURSIVE cte AS
+    (
+    SELECT cast(1 as int) AS n, cast(1 as int) AS p, cast(-1 as int) AS q
+    UNION ALL
+    SELECT cast(n + 1 as int), cast(q * 2 as int), cast(p * 2 as int) FROM cte WHERE n < 5
+    )
+    SELECT * FROM cte order by n;
+    """
+
+    test {
+        sql """
+            WITH RECURSIVE cte (n) AS
+            (
+            SELECT cast(1 as int)
+            UNION ALL
+            SELECT cast(n + 1 as int) FROM cte
+            )
+            SELECT n FROM cte order by n;
+        """
+        exception "ABORTED"  // the recursive step has no WHERE clause, so the recursion never terminates
+    }
+
+    // Disabled: stopping the recursion with LIMIT is not supported yet.
+    //qt_q3 """
+    //WITH RECURSIVE cte (n) AS
+    //(
+    //SELECT cast(1 as int)
+    //UNION ALL
+    //SELECT cast(n + 1 as int) FROM cte LIMIT 10000
+    //)
+    //SELECT n FROM cte order by n;
+    //"""
+
+    sql "DROP TABLE IF EXISTS sales;"
+    sql """
+        CREATE TABLE sales
+        (
+            c_date date,
+            c_price double
+        ) DUPLICATE KEY (c_date)
+        DISTRIBUTED BY HASH(c_date) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """insert into sales values
+        ('2017-01-03', 100.0),
+        ('2017-01-03', 200.0),
+        ('2017-01-06', 50.0),
+        ('2017-01-08', 10.0),
+        ('2017-01-08', 20.0),
+        ('2017-01-08', 150.0),
+        ('2017-01-10', 5.0);"""
+
+    qt_q4 """
+        WITH RECURSIVE dates (c_date) AS
+        (
+        SELECT MIN(c_date) FROM sales
+        UNION ALL
+        SELECT c_date + INTERVAL 1 DAY FROM dates
+        WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales)
+        )
+        SELECT * FROM dates order by 1;
+    """
+
+    qt_q5 """
+        WITH RECURSIVE dates (c_date) AS
+        (
+        SELECT MIN(c_date) FROM sales
+        UNION ALL
+        SELECT c_date + INTERVAL 1 DAY FROM dates
+        WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales)
+        )
+        SELECT dates.c_date, COALESCE(SUM(c_price), 0) AS sum_price
+        FROM dates LEFT JOIN sales ON dates.c_date = sales.c_date
+        GROUP BY dates.c_date
+        ORDER BY dates.c_date;
+    """
+
+    sql "DROP TABLE IF EXISTS employees;"
+    sql """
+        CREATE TABLE employees (
+        id         INT NOT NULL,
+        name       VARCHAR(100) NOT NULL,
+        manager_id INT NULL
+        ) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO employees VALUES
+    (333, "Yasmina", NULL),
+    (198, "John", 333),
+    (692, "Tarek", 333),
+    (29, "Pedro", 198),
+    (4610, "Sarah", 29),
+    (72, "Pierre", 29),
+    (123, "Adil", 692);
+    """
+
+    qt_q6 """
+        WITH RECURSIVE employee_paths (id, name, path) AS
+        (
+        SELECT id, name, CAST(id AS varchar(65533))
+            FROM employees
+            WHERE manager_id IS NULL
+        UNION ALL
+        SELECT e.id, e.name, cast(CONCAT(ep.path, ',', e.id) as varchar(65533))
+            FROM employee_paths AS ep JOIN employees AS e
+            ON ep.id = e.manager_id
+        )
+        SELECT * FROM employee_paths ORDER BY path;
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to