This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_rec2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a0931099bfaa88139aee7a12d5072241128e3f51
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Sep 18 21:02:33 2025 +0800

    add rec cte thrift struct
    
    update
    
    update thrift
    
    add base
    
    update
    
    update thrift
    
    update _send_data_to_targets
    
    add fragment reset interface
    
    update
    
    reset format
    
    update send block
    
    clean
    
    fix
    
    add result_expr_lists to TRecCTENode
    
    update
    
    format
    
    add TRecCTEScanNode
    
    update
    
    format
    
    update
    
    update uniq
    
    update
    
    Revert "update"
    
    This reverts commit f464ed4fac5089db27bc479d73646f542c26bddc.
    
    update
    
    update
    
    update
    
    fix
    
    update
    
    update
    
    update
    
    update
    
    update
    
    update
    
    update
    
    update
    
    update
    
    format
    
    update
    
    tmp delete case
    
    Revert "tmp delete case"
    
    This reverts commit 20329c58a04ebf9cdce366026b6403f563b1092d.
    
    update
    
    fix
    
    update
    
    update
    
    update
    
    fix
    
    fix
    
    update
    
    update
    
    clean
    
    fix
    
    update
---
 be/src/pipeline/exec/exchange_source_operator.cpp  |   5 +
 be/src/pipeline/exec/exchange_source_operator.h    |  19 +
 be/src/pipeline/exec/operator.cpp                  |  10 +
 be/src/pipeline/exec/operator.h                    |   2 +
 .../pipeline/exec/rec_cte_anchor_sink_operator.cpp |  56 ++
 .../pipeline/exec/rec_cte_anchor_sink_operator.h   | 114 +++
 be/src/pipeline/exec/rec_cte_scan_operator.h       |  87 ++
 be/src/pipeline/exec/rec_cte_sink_operator.cpp     |  55 ++
 be/src/pipeline/exec/rec_cte_sink_operator.h       | 101 +++
 be/src/pipeline/exec/rec_cte_source_operator.cpp   |  86 ++
 be/src/pipeline/exec/rec_cte_source_operator.h     | 228 +++++
 be/src/pipeline/exec/union_sink_operator.h         |  42 +-
 be/src/pipeline/pipeline.h                         |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 236 +++--
 be/src/pipeline/pipeline_fragment_context.h        |  18 +-
 be/src/pipeline/pipeline_task.cpp                  |  25 +-
 be/src/pipeline/rec_cte_shared_state.h             | 175 ++++
 be/src/runtime/fragment_mgr.cpp                    |  60 ++
 be/src/runtime/fragment_mgr.h                      |  11 +
 be/src/runtime/query_context.cpp                   |  47 +
 be/src/runtime/query_context.h                     |  28 +-
 be/src/runtime/runtime_predicate.h                 |   1 +
 be/src/runtime/runtime_state.cpp                   |  16 +-
 be/src/runtime/runtime_state.h                     |  19 +-
 be/src/runtime_filter/runtime_filter_consumer.h    |  15 +-
 be/src/runtime_filter/runtime_filter_merger.h      |   2 +
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  22 +-
 be/src/runtime_filter/runtime_filter_mgr.h         |  26 +-
 be/src/service/internal_service.cpp                |  51 ++
 be/src/service/internal_service.h                  |  10 +
 .../runtime_filter_consumer_test.cpp               |  20 +-
 be/test/runtime_filter/runtime_filter_mgr_test.cpp |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 +-
 gensrc/proto/internal_service.proto                |  42 +
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 .../data/rec_cte_p0/rec_cte/rec_cte.out            | 953 +++++++++++++++++++++
 .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.out    |  29 +
 .../rec_cte_from_duckdb_doc.out                    |  30 +
 .../rec_cte_from_mysql_doc.out                     |  42 +
 .../suites/rec_cte_p0/rec_cte/rec_cte.groovy       | 271 ++++++
 .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy | 144 ++++
 .../rec_cte_from_duckdb_doc.groovy                 | 185 ++++
 .../rec_cte_from_mysql_doc.groovy                  | 140 +++
 43 files changed, 3316 insertions(+), 121 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index b31b193aff2..8762b40fa40 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) {
     _is_closed = true;
     return OperatorX<ExchangeLocalState>::close(state);
 }
+
+// Operator-level entry point: forwards the reset request to the
+// ExchangeLocalState that belongs to `state`.
+Status ExchangeSourceOperatorX::reset(RuntimeState* state) {
+    return get_local_state(state).reset(state);
+}
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 3008217e130..03f2a288cdf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -48,6 +48,23 @@ public:
     Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
+    // Closes the current stream receiver (when one exists), creates a new
+    // one, clears is_ready / num_rows_skipped, then blocks each dependency
+    // and hands it to the sender queue with the same index.
+    // Always returns Status::OK().
+    Status reset(RuntimeState* state) {
+        if (stream_recvr != nullptr) {
+            stream_recvr->close();
+        }
+        create_stream_recvr(state);
+
+        is_ready = false;
+        num_rows_skipped = 0;
+
+        const auto& sender_queues = stream_recvr->sender_queues();
+        const size_t queue_count = sender_queues.size();
+        for (size_t idx = 0; idx < queue_count; ++idx) {
+            // Block first so the queue never sees a ready dependency.
+            deps[idx]->block();
+            sender_queues[idx]->set_dependency(deps[idx]);
+        }
+        return Status::OK();
+    }
+
     std::vector<Dependency*> dependencies() const override {
         std::vector<Dependency*> dep_vec;
         std::for_each(deps.begin(), deps.end(),
@@ -83,6 +100,8 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
 
+    Status reset(RuntimeState* state);
+
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     std::string debug_string(int indentation_level = 0) const override;
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 0eea8c7e157..ceaa9148b23 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -62,6 +62,10 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
+#include "pipeline/exec/rec_cte_sink_operator.h"
+#include "pipeline/exec/rec_cte_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
 DECLARE_OPERATOR(CacheSinkLocalState)
 DECLARE_OPERATOR(DictSinkLocalState)
+DECLARE_OPERATOR(RecCTESinkLocalState)
+DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
 
 #undef DECLARE_OPERATOR
 
@@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
 DECLARE_OPERATOR(CacheSourceLocalState)
+DECLARE_OPERATOR(RecCTESourceLocalState)
+DECLARE_OPERATOR(RecCTEScanLocalState)
 
 #ifdef BE_TEST
 DECLARE_OPERATOR(MockLocalState)
@@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>;
 template class PipelineXSinkLocalState<LocalExchangeSharedState>;
 template class PipelineXSinkLocalState<BasicSharedState>;
 template class PipelineXSinkLocalState<DataQueueSharedState>;
+template class PipelineXSinkLocalState<RecCTESharedState>;
 
 template class PipelineXLocalState<HashJoinSharedState>;
 template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -888,6 +897,7 @@ template class 
PipelineXLocalState<PartitionSortNodeSharedState>;
 template class PipelineXLocalState<SetSharedState>;
 template class PipelineXLocalState<LocalExchangeSharedState>;
 template class PipelineXLocalState<BasicSharedState>;
+template class PipelineXLocalState<RecCTESharedState>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d5630fd4aa6..7492ee75ae9 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -610,6 +610,8 @@ public:
     // For agg/sort/join sink.
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
+    // Default answer is "no rerun". Sinks that need one override this;
+    // RecCTESinkOperatorX does so based on its shared state.
+    virtual bool need_rerun(RuntimeState* state) const {
+        return false;
+    }
+
     Status init(const TDataSink& tsink) override;
     [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
                                       const bool use_global_hash_shuffle,
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp 
b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp
new file mode 100644
index 00000000000..f1552ed8e5e
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+// Opens the base local state, then gives this state its own clone of every
+// expression context owned by the parent operator.
+Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& parent_exprs = _parent->cast<Parent>()._child_expr;
+    const size_t expr_count = parent_exprs.size();
+    _child_expr.resize(expr_count);
+    for (size_t idx = 0; idx < expr_count; ++idx) {
+        RETURN_IF_ERROR(parent_exprs[idx]->clone(state, _child_expr[idx]));
+    }
+    return Status::OK();
+}
+
+// Builds the anchor sink's output expressions from the first entry of the
+// plan node's rec_cte_node.result_expr_lists and sets the operator name.
+// Returns an error status when that list is empty instead of indexing out
+// of range.
+Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode,
+                                       RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+    const auto& expr_lists = tnode.rec_cte_node.result_expr_lists;
+    // DCHECK is a debug-only check, so the index needs a real guard too.
+    if (expr_lists.empty()) {
+        return Status::InternalError(
+                "RecCTEAnchorSink: result_expr_lists is empty");
+    }
+    {
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(
+                vectorized::VExpr::create_expr_trees(expr_lists[0], ctxs));
+        _child_expr = ctxs;
+    }
+    _name = "REC_CTE_ANCHOR_SINK_OPERATOR";
+    return Status::OK();
+}
+
+// Prepares, type-checks and opens the operator-level expressions against
+// the child's row descriptor, after the base class has been prepared.
+Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    const auto& child_row_desc = _child->row_desc();
+    RETURN_IF_ERROR(
+            vectorized::VExpr::prepare(_child_expr, state, child_row_desc));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(
+            _child_expr, child_row_desc));
+    return vectorized::VExpr::open(_child_expr, state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h 
b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
new file mode 100644
index 00000000000..340949a2f79
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTEAnchorSinkOperatorX;
+class RecCTEAnchorSinkLocalState final : public 
PipelineXSinkLocalState<RecCTESharedState> { // local state paired with RecCTEAnchorSinkOperatorX
+public:
+    ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState);
+    RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state)
+            : Base(parent, state) {}
+    Status open(RuntimeState* state) override; // clones Parent::_child_expr into _child_expr
+
+    bool is_blockable() const override { return true; }
+
+private:
+    friend class RecCTEAnchorSinkOperatorX;
+    using Base = PipelineXSinkLocalState<RecCTESharedState>;
+    using Parent = RecCTEAnchorSinkOperatorX;
+
+    vectorized::VExprContextSPtrs _child_expr; // filled by open(); used by Parent::sink()
+};
+
+// Sink operator for the anchor input of a recursive CTE: it runs the local
+// expressions over incoming blocks and stores the results in
+// RecCTESharedState.
+class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
+        : public DataSinkOperatorX<RecCTEAnchorSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>;
+    friend class RecCTEAnchorSinkLocalState;
+
+    RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
+                              const DescriptorTbl& descs)
+            : Base(sink_id, tnode.node_id, dest_id),
+              _row_descriptor(descs, tnode.row_tuples,
+                              tnode.nullable_tuples) {}
+
+    ~RecCTEAnchorSinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(
+            RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    // The first call invokes send_data_to_targets(state, 0) on the shared
+    // state. Non-empty input is materialized through the local expressions
+    // and appended to the shared state; eos marks anchor_dep ready.
+    Status sink(RuntimeState* state, vectorized::Block* input_block,
+                bool eos) override {
+        auto& local_state = get_local_state(state);
+        const auto& shared_state = local_state._shared_state;
+
+        if (_need_notify_rec_side_ready) {
+            RETURN_IF_ERROR(shared_state->send_data_to_targets(state, 0));
+            _need_notify_rec_side_ready = false;
+        }
+
+        const auto input_rows = input_block->rows();
+        COUNTER_UPDATE(local_state.rows_input_counter(),
+                       static_cast<int64_t>(input_rows));
+        if (input_rows != 0) {
+            vectorized::Block materialized;
+            RETURN_IF_ERROR(materialize_block(local_state._child_expr,
+                                              input_block, &materialized,
+                                              true));
+            RETURN_IF_ERROR(shared_state->emplace_block(
+                    state, std::move(materialized)));
+        }
+
+        if (eos) {
+            shared_state->anchor_dep->set_ready();
+        }
+        return Status::OK();
+    }
+
+    // Creates the RecCTESharedState, tagged with this operator's id and
+    // linked to every destination operator id.
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        std::shared_ptr<BasicSharedState> shared =
+                std::make_shared<RecCTESharedState>();
+        shared->id = operator_id();
+        for (const auto& dest_op_id : dests_id()) {
+            shared->related_op_ids.insert(dest_op_id);
+        }
+        return shared;
+    }
+
+private:
+    const RowDescriptor _row_descriptor;
+    vectorized::VExprContextSPtrs _child_expr;
+
+    // True until sink() has called send_data_to_targets() once.
+    bool _need_notify_rec_side_ready = true;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h 
b/be/src/pipeline/exec/rec_cte_scan_operator.h
new file mode 100644
index 00000000000..661d97927d1
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_scan_operator.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "pipeline/exec/operator.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+class RecCTEScanOperatorX;
+class RecCTEScanLocalState final : public PipelineXLocalState<> { // local state of RecCTEScanOperatorX; buffers blocks given to add_block()
+public:
+    ENABLE_FACTORY_CREATOR(RecCTEScanLocalState);
+
+    RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent)
+            : PipelineXLocalState<>(state, parent) {
+        _scan_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                     _parent->get_name() + 
"_DEPENDENCY");
+        
state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(), 
parent->node_id(),
+                                                 this); // registers `this` with the query context under (instance id, node id)
+    }
+    ~RecCTEScanLocalState() override { // deregisters with the same (instance id, node id) pair used in the constructor
+        
state()->get_query_ctx()->deregiste_cte_scan(state()->fragment_instance_id(),
+                                                     parent()->node_id());
+    }
+
+    Status add_block(const PBlock& pblock) { // deserializes pblock and appends it to _blocks
+        vectorized::Block block;
+        RETURN_IF_ERROR(block.deserialize(pblock));
+        _blocks.emplace_back(std::move(block));
+        return Status::OK();
+    }
+
+    void set_ready() { _scan_dependency->set_ready(); }
+
+    std::vector<Dependency*> dependencies() const override { return 
{_scan_dependency.get()}; }
+
+private:
+    friend class RecCTEScanOperatorX;
+    std::vector<vectorized::Block> _blocks; // popped from the back by RecCTEScanOperatorX::get_block(). NOTE(review): no lock here - confirm add_block() and get_block() never overlap
+    DependencySPtr _scan_dependency = nullptr;
+};
+
+// Source operator that drains the blocks buffered in RecCTEScanLocalState.
+class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> {
+public:
+    RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                        int operator_id, const DescriptorTbl& descs)
+            : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id,
+                                              descs) {}
+
+    // Returns one buffered block per call, taken from the back of the
+    // buffer and filtered by the local conjuncts. When nothing is buffered
+    // it only sets *eos.
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     bool* eos) override {
+        auto& local_state = get_local_state(state);
+        auto& pending = local_state._blocks;
+
+        if (!pending.empty()) {
+            *block = std::move(pending.back());
+            // The entry is dropped only after filtering succeeded.
+            RETURN_IF_ERROR(local_state.filter_block(
+                    local_state.conjuncts(), block, block->columns()));
+            pending.pop_back();
+            return Status::OK();
+        }
+
+        *eos = true;
+        return Status::OK();
+    }
+
+    bool is_source() const override { return true; }
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.cpp 
b/be/src/pipeline/exec/rec_cte_sink_operator.cpp
new file mode 100644
index 00000000000..1508b478bca
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.cpp
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_sink_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+// Opens the base local state, then gives this state its own clone of every
+// expression context owned by the parent operator.
+Status RecCTESinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& parent_exprs = _parent->cast<Parent>()._child_expr;
+    const size_t expr_count = parent_exprs.size();
+    _child_expr.resize(expr_count);
+    for (size_t idx = 0; idx < expr_count; ++idx) {
+        RETURN_IF_ERROR(parent_exprs[idx]->clone(state, _child_expr[idx]));
+    }
+    return Status::OK();
+}
+
+// Builds this sink's output expressions from the second entry of the plan
+// node's rec_cte_node.result_expr_lists.
+// Returns an error status when fewer than two lists are present instead of
+// indexing out of range.
+Status RecCTESinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+    const auto& expr_lists = tnode.rec_cte_node.result_expr_lists;
+    // DCHECK is a debug-only check, so the index needs a real guard too.
+    if (expr_lists.size() < 2) {
+        return Status::InternalError(
+                "RecCTESink: result_expr_lists has fewer than 2 entries");
+    }
+    {
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(
+                vectorized::VExpr::create_expr_trees(expr_lists[1], ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+// Prepares, type-checks and opens the operator-level expressions against
+// the child's row descriptor, after the base class has been prepared.
+Status RecCTESinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    const auto& child_row_desc = _child->row_desc();
+    RETURN_IF_ERROR(
+            vectorized::VExpr::prepare(_child_expr, state, child_row_desc));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(
+            _child_expr, child_row_desc));
+    return vectorized::VExpr::open(_child_expr, state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h 
b/be/src/pipeline/exec/rec_cte_sink_operator.h
new file mode 100644
index 00000000000..2321ba16ea3
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESinkOperatorX;
+class RecCTESinkLocalState final : public 
PipelineXSinkLocalState<RecCTESharedState> { // local state paired with RecCTESinkOperatorX
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESinkLocalState);
+    RecCTESinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+    Status open(RuntimeState* state) override; // clones Parent::_child_expr into _child_expr
+
+private:
+    friend class RecCTESinkOperatorX;
+    using Base = PipelineXSinkLocalState<RecCTESharedState>;
+    using Parent = RecCTESinkOperatorX;
+
+    vectorized::VExprContextSPtrs _child_expr; // filled by open(); used by Parent::sink()
+};
+
+// Sink operator that runs the local expressions over incoming blocks and
+// stores the results in RecCTESharedState. It does not create the shared
+// state itself (create_shared_state() returns nullptr).
+class RecCTESinkOperatorX MOCK_REMOVE(final)
+        : public DataSinkOperatorX<RecCTESinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<RecCTESinkLocalState>;
+    friend class RecCTESinkLocalState;
+
+    RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
+                        const DescriptorTbl& descs)
+            : Base(sink_id, tnode.node_id, dest_id),
+              _row_descriptor(descs, tnode.row_tuples,
+                              tnode.nullable_tuples) {}
+
+    ~RecCTESinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    // A rerun is requested for as long as the shared state has not set
+    // ready_to_return.
+    bool need_rerun(RuntimeState* state) const override {
+        return !get_local_state(state)._shared_state->ready_to_return;
+    }
+
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        return nullptr;
+    }
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(
+            RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    // Non-empty input is materialized through the local expressions and
+    // appended to the shared state; eos marks source_dep ready.
+    Status sink(RuntimeState* state, vectorized::Block* input_block,
+                bool eos) override {
+        auto& local_state = get_local_state(state);
+        const auto& shared_state = local_state._shared_state;
+
+        const auto input_rows = input_block->rows();
+        COUNTER_UPDATE(local_state.rows_input_counter(),
+                       static_cast<int64_t>(input_rows));
+        if (input_rows != 0) {
+            vectorized::Block materialized;
+            RETURN_IF_ERROR(materialize_block(local_state._child_expr,
+                                              input_block, &materialized,
+                                              true));
+            RETURN_IF_ERROR(shared_state->emplace_block(
+                    state, std::move(materialized)));
+        }
+
+        if (eos) {
+            shared_state->source_dep->set_ready();
+        }
+        return Status::OK();
+    }
+
+private:
+    const RowDescriptor _row_descriptor;
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.cpp 
b/be/src/pipeline/exec/rec_cte_source_operator.cpp
new file mode 100644
index 00000000000..9a02bb947b8
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_source_operator.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_source_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+RecCTESourceLocalState::RecCTESourceLocalState(RuntimeState* state, 
OperatorXBase* parent)
+        : Base(state, parent) {} // only forwards to Base; the shared state is wired up in init()
+
+// Opens the base local state under the exec/open timers; nothing else is
+// done here.
+Status RecCTESourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    return Base::open(state);
+}
+
+// Wires this local state into the RecCTESharedState: publishes the parent's
+// targets and recursion limit, hands over the source and anchor
+// dependencies, clones the parent's expressions, initializes agg_data when
+// the parent is not UNION ALL, and creates the hash-table profile counters.
+Status RecCTESourceLocalState::init(RuntimeState* state,
+                                    LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& p = _parent->cast<Parent>();
+
+    _shared_state->targets = p._targets;
+    _shared_state->max_recursion_depth = p._max_recursion_depth;
+    _shared_state->source_dep = _dependency;
+
+    _anchor_dependency = Dependency::create_shared(
+            _parent->operator_id(), _parent->node_id(),
+            _parent->get_name() + "_ANCHOR_DEPENDENCY");
+    _shared_state->anchor_dep = _anchor_dependency.get();
+
+    const size_t expr_count = p._child_expr.size();
+    _child_expr.resize(expr_count);
+    for (size_t idx = 0; idx < expr_count; ++idx) {
+        RETURN_IF_ERROR(p._child_expr[idx]->clone(state, _child_expr[idx]));
+    }
+
+    if (!p._is_union_all) {
+        _shared_state->agg_data = std::make_unique<DistinctDataVariants>();
+        RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(
+                _shared_state->agg_data.get(), get_data_types(_child_expr),
+                false));
+    }
+
+    _shared_state->hash_table_compute_timer =
+            ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
+    _shared_state->hash_table_emplace_timer =
+            ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
+    _shared_state->hash_table_input_counter = ADD_COUNTER(
+            Base::custom_profile(), "HashTableInputCount", TUnit::UNIT);
+    return Status::OK();
+}
+
+// Reads the recursion-depth limit from the runtime state and builds the
+// operator's expressions from the second entry of the plan node's
+// rec_cte_node.result_expr_lists.
+// Returns an error status when fewer than two lists are present instead of
+// indexing out of range.
+Status RecCTESourceOperatorX::init(const TPlanNode& tnode,
+                                   RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+
+    _max_recursion_depth = state->cte_max_recursion_depth();
+
+    const auto& expr_lists = tnode.rec_cte_node.result_expr_lists;
+    // DCHECK is a debug-only check, so the index needs a real guard too.
+    if (expr_lists.size() < 2) {
+        return Status::InternalError(
+                "RecCTESource: result_expr_lists has fewer than 2 entries");
+    }
+    {
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(
+                vectorized::VExpr::create_expr_trees(expr_lists[1], ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+// Prepares, type-checks and opens the operator-level expressions against
+// the child's row descriptor, after the base class has been prepared.
+Status RecCTESourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    const auto& child_row_desc = _child->row_desc();
+    RETURN_IF_ERROR(
+            vectorized::VExpr::prepare(_child_expr, state, child_row_desc));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(
+            _child_expr, child_row_desc));
+    return vectorized::VExpr::open(_child_expr, state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h 
b/be/src/pipeline/exec/rec_cte_source_operator.h
new file mode 100644
index 00000000000..e36358161d2
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_source_operator.h
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <gen_cpp/internal_service.pb.h>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "util/brpc_client_cache.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESourceOperatorX;
+// Per-task local state of RecCTESourceOperatorX. It is parameterized on
+// RecCTESharedState, which is the state the operator reads through _shared_state.
+class RecCTESourceLocalState final : public 
PipelineXLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESourceLocalState);
+    using Base = PipelineXLocalState<RecCTESharedState>;
+    using Parent = RecCTESourceOperatorX;
+    RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent);
+
+    Status open(RuntimeState* state) override;
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    // This local state always reports itself as blockable.
+    bool is_blockable() const override { return true; }
+
+    // Exposes two dependencies: the base-class _dependency and the anchor dependency
+    // owned by this local state.
+    std::vector<Dependency*> dependencies() const override {
+        return std::vector<Dependency*> {_dependency, 
_anchor_dependency.get()};
+    }
+
+private:
+    friend class RecCTESourceOperatorX;
+    friend class OperatorX<RecCTESourceLocalState>;
+
+    // Expression contexts held by this local state.
+    // NOTE(review): presumably per-state clones of the operator's _child_expr - confirm in init().
+    vectorized::VExprContextSPtrs _child_expr;
+
+    // Second dependency returned by dependencies(); starts out null.
+    // NOTE(review): where it is created is not visible in this header - confirm in the .cpp.
+    std::shared_ptr<Dependency> _anchor_dependency = nullptr;
+};
+
+// Source operator of a recursive CTE.
+//
+// get_block() drives the recursion: while the shared state is not ready to return,
+// each call runs one more round by asking the fragments listed in
+// _fragments_to_reset to rerun (over brpc) and sending the previous round's data to
+// the targets; once ready, buffered blocks are handed out one per call.
+class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
+public:
+    using Base = OperatorX<RecCTESourceLocalState>;
+    // Copies the recursive-CTE settings (union-all flag, targets, fragments to reset,
+    // runtime filter ids, "used by other rec cte" flag) out of tnode.rec_cte_node.
+    RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
+                          const DescriptorTbl& descs)
+            : Base(pool, tnode, operator_id, descs),
+              _is_union_all(tnode.rec_cte_node.is_union_all),
+              _targets(tnode.rec_cte_node.targets),
+              _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset),
+              _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids),
+              _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) {
+        DCHECK(tnode.__isset.rec_cte_node);
+    }
+    ~RecCTESourceOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    // Both terminate() and close() first try to send the close stage to the rerun
+    // fragments; _send_close() makes sure it is only sent successfully once.
+    Status terminate(RuntimeState* state) override {
+        RETURN_IF_ERROR(_send_close(state));
+        return Base::terminate(state);
+    }
+
+    Status close(RuntimeState* state) override {
+        RETURN_IF_ERROR(_send_close(state));
+        return Base::close(state);
+    }
+
+    Status set_child(OperatorPtr child) override {
+        Base::_child = child;
+        return Status::OK();
+    }
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    // Produces output for one call.
+    //  - Not ready to return: fails with Aborted if another round would exceed
+    //    _max_recursion_depth; otherwise blocks the source dependency, runs one
+    //    recursive round and advances current_round / last_round_offset. No rows are
+    //    written to `block` in this case.
+    //  - Ready to return: sets *eos when no blocks are buffered; otherwise moves the
+    //    last buffered block into `block`, applies the conjuncts and drops it from
+    //    the buffer.
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override {
+        auto& local_state = get_local_state(state);
+        auto& ctx = local_state._shared_state;
+        ctx->update_ready_to_return();
+
+        if (!ctx->ready_to_return) {
+            if (ctx->current_round + 1 > _max_recursion_depth) {
+                return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth);
+            }
+
+            ctx->source_dep->block();
+            // ctx->blocks.size() may be changed after _recursive_process
+            int current_blocks_size = int(ctx->blocks.size());
+            RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset));
+            ctx->current_round++;
+            ctx->last_round_offset = current_blocks_size;
+        } else {
+            if (ctx->blocks.empty()) {
+                *eos = true;
+            } else {
+                block->swap(ctx->blocks.back());
+                RETURN_IF_ERROR(
+                        local_state.filter_block(local_state.conjuncts(), block, block->columns()));
+                ctx->blocks.pop_back();
+            }
+        }
+        return Status::OK();
+    }
+
+    bool is_source() const override { return true; }
+
+private:
+    // Sends the `close` stage to every fragment in _fragments_to_reset and records
+    // the number of rounds in the "RecursiveRound" profile counter. Does nothing if
+    // the close was already sent or if this CTE is used by another recursive CTE.
+    Status _send_close(RuntimeState* state) {
+        if (!_aready_send_close && !_is_used_by_other_rec_cte) {
+            RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close));
+            _aready_send_close = true;
+
+            auto* round_counter = ADD_COUNTER(get_local_state(state).Base::custom_profile(),
+                                              "RecursiveRound", TUnit::UNIT);
+            round_counter->set(int64_t(get_local_state(state)._shared_state->current_round));
+        }
+        return Status::OK();
+    }
+
+    // Runs one recursion round: wait -> reset global runtime filters -> release ->
+    // rebuild -> submit on the rerun fragments, then sends the blocks starting at
+    // `last_round_offset` to the targets. Stops at the first failing step.
+    Status _recursive_process(RuntimeState* state, size_t last_round_offset) const {
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::wait));
+        RETURN_IF_ERROR(_send_reset_global_rf(state));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::release));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::rebuild));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::submit));
+        RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(
+                state, last_round_offset));
+        return Status::OK();
+    }
+
+    // Synchronously asks the runtime-filter merge node to reset the filters listed in
+    // _global_rf_ids for this query. Returns an error if the merge address cannot be
+    // resolved, if the RPC itself fails, or if the remote side reports a failure.
+    Status _send_reset_global_rf(RuntimeState* state) const {
+        TNetworkAddress addr;
+        RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr));
+        auto stub =
+                state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr);
+        PResetGlobalRfParams request;
+        request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+        for (auto filter_id : _global_rf_ids) {
+            request.add_filter_ids(filter_id);
+        }
+
+        PResetGlobalRfResult result;
+        brpc::Controller controller;
+        controller.set_timeout_ms(
+                get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+        stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing());
+        brpc::Join(controller.call_id());
+        // A transport-level failure (timeout, connection error) leaves `result`
+        // unfilled, so it must be checked before result.status() is inspected; this
+        // mirrors the handling in _send_rerun_fragments.
+        if (controller.Failed()) {
+            return Status::InternalError(controller.ErrorText());
+        }
+        return Status::create(result.status());
+    }
+
+    // Synchronously sends `stage` to every fragment in _fragments_to_reset, one RPC
+    // per fragment, and stops at the first failure. Fails with an internal error if
+    // one of those fragments is the fragment this operator itself runs in.
+    Status _send_rerun_fragments(RuntimeState* state, PRerunFragmentParams_Opcode stage) const {
+        // Iterate by reference: each entry carries an address and need not be copied.
+        for (const auto& fragment : _fragments_to_reset) {
+            if (state->fragment_id() == fragment.fragment_id) {
+                return Status::InternalError("Fragment {} contains a recursive CTE node",
+                                             fragment.fragment_id);
+            }
+            auto stub =
+                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
+                            fragment.addr);
+
+            PRerunFragmentParams request;
+            request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+            request.set_fragment_id(fragment.fragment_id);
+            request.set_stage(stage);
+
+            PRerunFragmentResult result;
+            brpc::Controller controller;
+            controller.set_timeout_ms(
+                    get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+            stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing());
+            brpc::Join(controller.call_id());
+            if (controller.Failed()) {
+                return Status::InternalError(controller.ErrorText());
+            }
+
+            RETURN_IF_ERROR(Status::create(result.status()));
+        }
+        return Status::OK();
+    }
+
+    friend class RecCTESourceLocalState;
+
+    // Expression contexts built in init() and prepared/opened in prepare().
+    vectorized::VExprContextSPtrs _child_expr;
+
+    // Settings copied from tnode.rec_cte_node in the constructor.
+    const bool _is_union_all = false;
+    std::vector<TRecCTETarget> _targets;
+    std::vector<TRecCTEResetInfo> _fragments_to_reset;
+    std::vector<int> _global_rf_ids;
+
+    // Upper bound on recursion rounds; set in init() from the runtime state.
+    int _max_recursion_depth = 0;
+
+    // Set once the close stage has been sent successfully (see _send_close).
+    bool _aready_send_close = false;
+
+    // When true, _send_close() never sends the close stage from this operator.
+    bool _is_used_by_other_rec_cte = false;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 06b95cdf92d..764f8099427 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -29,6 +29,24 @@ namespace doris {
 #include "common/compile_check_begin.h"
 class RuntimeState;
 
+/// Evaluates every expression in `exprs` against `src_block` and stores the result
+/// columns (with their type and name) in `res_block`, one column per expression and
+/// in expression order.
+///
+/// @param exprs      expressions to execute; each may append its result to `src_block`.
+/// @param src_block  input block the expressions are executed on.
+/// @param res_block  output; overwritten with a block built from the result columns.
+/// @param need_clone when true each result column is cloned (resized to the row count
+///                   `src_block` had on entry); when false the column pointer taken
+///                   from `src_block` is reused as is.
+/// @return the first error returned by an expression, otherwise OK.
+inline Status materialize_block(const vectorized::VExprContextSPtrs& exprs,
+                                vectorized::Block* src_block, vectorized::Block* res_block,
+                                bool need_clone) {
+    vectorized::ColumnsWithTypeAndName columns;
+    // Exactly one output column per expression: allocate once instead of growing.
+    columns.reserve(exprs.size());
+    auto rows = src_block->rows();
+    for (const auto& expr : exprs) {
+        int result_column_id = -1;
+        RETURN_IF_ERROR(expr->execute(src_block, &result_column_id));
+        const auto& src_col_with_type = src_block->get_by_position(result_column_id);
+        vectorized::ColumnPtr cloned_col = need_clone
+                                                   ? src_col_with_type.column->clone_resized(rows)
+                                                   : src_col_with_type.column;
+        columns.emplace_back(cloned_col, src_col_with_type.type, src_col_with_type.name);
+    }
+    *res_block = {columns};
+    return Status::OK();
+}
+
 namespace pipeline {
 class DataQueue;
 
@@ -153,27 +171,17 @@ private:
                     
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                                
row_descriptor());
             vectorized::Block res;
-            RETURN_IF_ERROR(materialize_block(state, input_block, child_id, 
&res));
+            auto& local_state = get_local_state(state);
+            {
+                SCOPED_TIMER(local_state._expr_timer);
+                RETURN_IF_ERROR(
+                        materialize_block(local_state._child_expr, 
input_block, &res, false));
+            }
+            local_state._child_row_idx += res.rows();
             RETURN_IF_ERROR(mblock.merge(res));
         }
         return Status::OK();
     }
-
-    Status materialize_block(RuntimeState* state, vectorized::Block* 
src_block, int child_idx,
-                             vectorized::Block* res_block) {
-        auto& local_state = get_local_state(state);
-        SCOPED_TIMER(local_state._expr_timer);
-        const auto& child_exprs = local_state._child_expr;
-        vectorized::ColumnsWithTypeAndName colunms;
-        for (size_t i = 0; i < child_exprs.size(); ++i) {
-            int result_column_id = -1;
-            RETURN_IF_ERROR(child_exprs[i]->execute(src_block, 
&result_column_id));
-            colunms.emplace_back(src_block->get_by_position(result_column_id));
-        }
-        local_state._child_row_idx += src_block->rows();
-        *res_block = {colunms};
-        return Status::OK();
-    }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 1d0ddcf178e..2a20a5cd73d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -133,7 +133,7 @@ public:
             fmt::format_to(debug_string_buffer, "\n{}", 
_operators[i]->debug_string(i));
         }
         fmt::format_to(debug_string_buffer, "\n{}",
-                       _sink->debug_string(cast_set<int>(_operators.size())));
+                       _sink ? 
_sink->debug_string(cast_set<int>(_operators.size())) : "null");
         return fmt::to_string(debug_string_buffer);
     }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b5c2f7084dc..ba7937e9522 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -82,6 +82,10 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
+#include "pipeline/exec/rec_cte_sink_operator.h"
+#include "pipeline/exec/rec_cte_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -134,7 +138,9 @@ PipelineFragmentContext::PipelineFragmentContext(
           _is_report_on_cancel(true),
           _report_status_cb(std::move(report_status_cb)),
           _params(request),
-          _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0) {
+          _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0),
+          _need_notify_close(request.__isset.need_notify_close ? 
request.need_notify_close
+                                                               : false) {
     _fragment_watcher.start();
 }
 
@@ -142,23 +148,8 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id);
-    // The memory released by the query end is recorded in the query mem 
tracker.
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
-    auto st = _query_ctx->exec_status();
-    for (size_t i = 0; i < _tasks.size(); i++) {
-        if (!_tasks[i].empty()) {
-            _call_back(_tasks[i].front().first->runtime_state(), &st);
-        }
-    }
-    _tasks.clear();
-    _dag.clear();
-    _pip_id_to_pipeline.clear();
-    _pipelines.clear();
-    _sink.reset();
-    _root_op.reset();
+    _release_resource();
     _runtime_state.reset();
-    _runtime_filter_mgr_map.clear();
-    _op_id_to_shared_state.clear();
     _query_ctx.reset();
 }
 
@@ -253,6 +244,54 @@ PipelinePtr 
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
+// Builds the whole pipeline structure of this fragment and prepares it for
+// execution: operators and pipelines (step 2), the output sink (step 3), local
+// exchange planning when local shuffle is enabled (step 4), pipeline preparation
+// (step 5) and finally the pipeline tasks (step 6). Returns the first error hit.
+// Called from prepare() and from rebuild().
+// NOTE(review): step 2 passes *_query_ctx->desc_tbl while step 3 passes *_desc_tbl -
+// confirm both refer to the same descriptor table.
+Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* 
thread_pool) {
+    {
+        SCOPED_TIMER(_build_pipelines_timer);
+        // 2. Build pipelines with operators in this fragment.
+        auto root_pipeline = add_pipeline();
+        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), 
*_query_ctx->desc_tbl,
+                                         &_root_op, root_pipeline));
+
+        // 3. Create sink operator
+        if (!_params.fragment.__isset.output_sink) {
+            return Status::InternalError("No output sink in this fragment!");
+        }
+        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), 
_params.fragment.output_sink,
+                                          _params.fragment.output_exprs, 
_params,
+                                          root_pipeline->output_row_desc(), 
_runtime_state.get(),
+                                          *_desc_tbl, root_pipeline->id()));
+        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
+        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
+
+        // Every pipeline must have a sink by now; wire each sink to the last
+        // operator of its pipeline.
+        for (PipelinePtr& pipeline : _pipelines) {
+            DCHECK(pipeline->sink() != nullptr) << 
pipeline->operators().size();
+            
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
+        }
+    }
+    // 4. Build local exchanger
+    if (_runtime_state->enable_local_shuffle()) {
+        SCOPED_TIMER(_plan_local_exchanger_timer);
+        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
+                                             
_params.bucket_seq_to_instance_idx,
+                                             
_params.shuffle_idx_to_instance_idx));
+    }
+
+    // 5. Initialize global states in pipelines.
+    for (PipelinePtr& pipeline : _pipelines) {
+        SCOPED_TIMER(_prepare_all_pipelines_timer);
+        pipeline->children().clear();
+        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
+    }
+
+    {
+        SCOPED_TIMER(_build_tasks_timer);
+        // 6. Build pipeline tasks and initialize local state.
+        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
+    }
+
+    return Status::OK();
+}
+
 Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
@@ -277,7 +316,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
 
         auto* fragment_context = this;
 
-        if (_params.query_options.__isset.is_report_success) {
+        if (!_need_notify_close && 
_params.query_options.__isset.is_report_success) {
             
fragment_context->set_is_report_success(_params.query_options.is_report_success);
         }
 
@@ -322,49 +361,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
         }
     }
 
-    {
-        SCOPED_TIMER(_build_pipelines_timer);
-        // 2. Build pipelines with operators in this fragment.
-        auto root_pipeline = add_pipeline();
-        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), 
*_query_ctx->desc_tbl,
-                                         &_root_op, root_pipeline));
-
-        // 3. Create sink operator
-        if (!_params.fragment.__isset.output_sink) {
-            return Status::InternalError("No output sink in this fragment!");
-        }
-        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), 
_params.fragment.output_sink,
-                                          _params.fragment.output_exprs, 
_params,
-                                          root_pipeline->output_row_desc(), 
_runtime_state.get(),
-                                          *_desc_tbl, root_pipeline->id()));
-        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
-        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
-
-        for (PipelinePtr& pipeline : _pipelines) {
-            DCHECK(pipeline->sink() != nullptr) << 
pipeline->operators().size();
-            
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
-        }
-    }
-    // 4. Build local exchanger
-    if (_runtime_state->enable_local_shuffle()) {
-        SCOPED_TIMER(_plan_local_exchanger_timer);
-        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
-                                             
_params.bucket_seq_to_instance_idx,
-                                             
_params.shuffle_idx_to_instance_idx));
-    }
-
-    // 5. Initialize global states in pipelines.
-    for (PipelinePtr& pipeline : _pipelines) {
-        SCOPED_TIMER(_prepare_all_pipelines_timer);
-        pipeline->children().clear();
-        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
-    }
-
-    {
-        SCOPED_TIMER(_build_tasks_timer);
-        // 6. Build pipeline tasks and initialize local state.
-        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
-    }
+    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
 
     _init_next_report_time();
 
@@ -374,6 +371,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
 
 Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) 
{
     _total_tasks = 0;
+    _closed_tasks = 0;
     const auto target_size = _params.local_params.size();
     _tasks.resize(target_size);
     _runtime_filter_mgr_map.resize(target_size);
@@ -422,6 +420,10 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
 
                     
task_runtime_state->set_task_execution_context(shared_from_this());
                     
task_runtime_state->set_be_number(local_params.backend_num);
+                    if (_need_notify_close) {
+                        // rec cte require child rf to wait infinitely to make 
sure all rpc done
+                        task_runtime_state->set_force_make_rf_wait_infinite();
+                    }
 
                     if (_params.__isset.backend_id) {
                         task_runtime_state->set_backend_id(_params.backend_id);
@@ -1645,6 +1647,42 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         break;
     }
+    case TPlanNodeType::REC_CTE_NODE: {
+        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (!_dag.contains(downstream_pipeline_id)) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+
+        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
+
+        DataSinkOperatorPtr anchor_sink;
+        anchor_sink = 
std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
+                                                                  
op->operator_id(), tnode, descs);
+        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
+        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
+        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
+
+        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
+
+        DataSinkOperatorPtr rec_sink;
+        rec_sink = 
std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), 
op->operator_id(),
+                                                         tnode, descs);
+        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
+        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
+        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
+
+        break;
+    }
+    case TPlanNodeType::REC_CTE_SCAN_NODE: {
+        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(tnode.node_type));
@@ -1750,7 +1788,10 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
     if (_is_fragment_instance_closed) {
         return;
     }
-    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
+    Defer defer_op {[&]() {
+        _is_fragment_instance_closed = true;
+        _close_cv.notify_all();
+    }};
     
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     static_cast<void>(send_report(true));
     // Print profile content in info log is a tempoeray solution for stream 
load and external_connector.
@@ -1785,8 +1826,10 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
                                          
collect_realtime_load_channel_profile());
     }
 
-    // all submitted tasks done
-    _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    if (!_need_notify_close) {
+        // all submitted tasks done
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    }
 }
 
 void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
@@ -1923,8 +1966,10 @@ std::vector<PipelineTask*> 
PipelineFragmentContext::get_revocable_tasks() const
 }
 
 std::string PipelineFragmentContext::debug_string() {
+    std::lock_guard<std::mutex> l(_task_mutex);
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
+    fmt::format_to(debug_string_buffer, "PipelineFragmentContext 
Info:\nneed_notify_close: {}\n",
+                   _need_notify_close);
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
@@ -1998,5 +2043,66 @@ 
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
                                                       
_runtime_state->profile_level());
     return load_channel_profile;
 }
+
+// Blocks the calling thread until this fragment instance has been closed, then
+// optionally removes the context from the fragment manager.
+// Returns an internal error, without waiting, when the query has an entry in the
+// stream load manager or when this context was not created with need_notify_close.
+// `close`: when true, the pipeline context is removed from the fragment manager
+// after the wait finishes.
+Status PipelineFragmentContext::wait_close(bool close) {
+    if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) {
+        return Status::InternalError("stream load do not support reset");
+    }
+    if (!_need_notify_close) {
+        return Status::InternalError("_need_notify_close is false, do not 
support reset");
+    }
+
+    {
+        // Wait on _close_cv under _task_mutex until the closed flag is observed.
+        std::unique_lock<std::mutex> lock(_task_mutex);
+        _close_cv.wait(lock, [this] { return 
_is_fragment_instance_closed.load(); });
+    }
+
+    if (close) {
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    }
+    return Status::OK();
+}
+
+// Puts this context into a state from which it can be rebuilt: resets the runtime
+// state of the first task of every instance (under _task_mutex), releases the
+// pipeline resources, and finally resets the fragment-level runtime state.
+Status PipelineFragmentContext::set_to_rerun() {
+    {
+        std::lock_guard<std::mutex> guard(_task_mutex);
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+        for (auto& instance_tasks : _tasks) {
+            if (instance_tasks.empty()) {
+                continue;
+            }
+            instance_tasks.front().first->runtime_state()->reset_to_rerun();
+        }
+    }
+    // _release_resource() takes _task_mutex itself, so the guard above must already be gone.
+    _release_resource();
+    _runtime_state->reset_to_rerun();
+    return Status::OK();
+}
+
+// Clears the submitted and closed flags and builds and prepares the full pipeline
+// again; returns the status of that build.
+Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
+    _submitted = false;
+    _is_fragment_instance_closed = false;
+    return _build_and_prepare_full_pipeline(thread_pool);
+}
+
+// Invokes _call_back once per non-empty instance (with the runtime state of its
+// first task and the query's exec status) and then drops all tasks, pipelines,
+// operators and per-instance state owned by this context. Holds _task_mutex for the
+// whole call.
+void PipelineFragmentContext::_release_resource() {
+    std::lock_guard<std::mutex> guard(_task_mutex);
+    // The memory released by the query end is recorded in the query mem tracker.
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+    auto exec_status = _query_ctx->exec_status();
+    for (auto& instance_tasks : _tasks) {
+        if (instance_tasks.empty()) {
+            continue;
+        }
+        _call_back(instance_tasks.front().first->runtime_state(), &exec_status);
+    }
+    _tasks.clear();
+    _dag.clear();
+    _pip_id_to_pipeline.clear();
+    _pipelines.clear();
+    _sink.reset();
+    _root_op.reset();
+    _runtime_filter_mgr_map.clear();
+    _op_id_to_shared_state.clear();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..40bb80f72ec 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -115,6 +115,9 @@ public:
     [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
 
     void clear_finished_tasks() {
+        if (_need_notify_close) {
+            return;
+        }
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
                 _tasks[j][i].first->stop_if_finished();
@@ -125,7 +128,17 @@ public:
     std::string get_load_error_url();
     std::string get_first_error_msg();
 
+    Status wait_close(bool close);
+    Status rebuild(ThreadPool* thread_pool);
+    Status set_to_rerun();
+
+    bool need_notify_close() const { return _need_notify_close; }
+
 private:
+    void _release_resource();
+
+    Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
+
     Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, 
OperatorPtr* root,
                             PipelinePtr cur_pipe);
     Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& 
tnodes,
@@ -182,6 +195,7 @@ private:
     Pipelines _pipelines;
     PipelineId _next_pipeline_id = 0;
     std::mutex _task_mutex;
+    std::condition_variable _close_cv;
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks 
submitted.
@@ -203,7 +217,7 @@ private:
     RuntimeProfile::Counter* _build_tasks_timer = nullptr;
 
     std::function<void(RuntimeState*, Status*)> _call_back;
-    bool _is_fragment_instance_closed = false;
+    std::atomic_bool _is_fragment_instance_closed = false;
 
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
@@ -323,6 +337,8 @@ private:
 
     TPipelineFragmentParams _params;
     int32_t _parallel_instances = 0;
+
+    bool _need_notify_close = false;
 };
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 930e885fc3b..edc56332058 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -30,6 +30,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/dependency.h"
+#include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/pipeline.h"
@@ -553,10 +554,6 @@ Status PipelineTask::execute(bool* done) {
                 }
             }
 
-            if (_eos) {
-                RETURN_IF_ERROR(close(Status::OK(), false));
-            }
-
             DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
                 auto required_pipeline_id =
                         
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -595,6 +592,22 @@ Status PipelineTask::execute(bool* done) {
             RETURN_IF_ERROR(block->check_type_and_column());
             status = _sink->sink(_state, block, _eos);
 
+            if (_eos) {
+                if (_sink->need_rerun(_state)) {
+                    if (auto* source = 
dynamic_cast<ExchangeSourceOperatorX*>(_root);
+                        source != nullptr) {
+                        RETURN_IF_ERROR(source->reset(_state));
+                        _eos = false;
+                    } else {
+                        return Status::InternalError(
+                                "Only ExchangeSourceOperatorX can be rerun, 
real is {}",
+                                _root->get_name());
+                    }
+                } else {
+                    RETURN_IF_ERROR(close(Status::OK(), false));
+                }
+            }
+
             if (status.is<ErrorCode::END_OF_FILE>()) {
                 set_wake_up_early();
                 return Status::OK();
@@ -857,7 +870,9 @@ Status PipelineTask::wake_up(Dependency* dep, 
std::unique_lock<std::mutex>& /* d
     _blocked_dep = nullptr;
     auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this());
     RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE));
-    
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
+    if (auto f = _fragment_context.lock(); f) {
+        
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/rec_cte_shared_state.h b/be/src/pipeline/rec_cte_shared_state.h
new file mode 100644
index 00000000000..17fb00c82f5
--- /dev/null
+++ b/be/src/pipeline/rec_cte_shared_state.h
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "dependency.h"
+#include "pipeline/common/distinct_agg_utils.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+/// Shared state of a recursive CTE. It accumulates the blocks handed to
+/// emplace_block() (optionally de-duplicated through a hash table) and ships
+/// them to the registered remote targets over brpc.
+struct RecCTESharedState : public BasicSharedState {
+    // Remote receivers that send_data_to_targets() transmits blocks to.
+    std::vector<TRecCTETarget> targets;
+    // Blocks accepted by emplace_block(), in arrival order.
+    std::vector<vectorized::Block> blocks;
+    // Scratch selector: rows of the block being emplaced that were new to the hash table.
+    vectorized::IColumn::Selector distinct_row;
+    Dependency* source_dep = nullptr;
+    Dependency* anchor_dep = nullptr;
+    // Arena handed to try_presis_key() whenever a new key is inserted.
+    vectorized::Arena arena;
+    RuntimeProfile::Counter* hash_table_compute_timer = nullptr;
+    RuntimeProfile::Counter* hash_table_emplace_timer = nullptr;
+    RuntimeProfile::Counter* hash_table_input_counter = nullptr;
+
+    // When set, emplace_block() keeps only rows whose key has not been seen before.
+    std::unique_ptr<DistinctDataVariants> agg_data = nullptr;
+
+    int current_round = 0;
+    int last_round_offset = 0;
+    int max_recursion_depth = 0;
+    bool ready_to_return = false;
+
+    /// Sets `ready_to_return` once `last_round_offset` equals the number of
+    /// stored blocks. The flag is never cleared by this method.
+    void update_ready_to_return() {
+        if (last_round_offset >= 0 && static_cast<size_t>(last_round_offset) == blocks.size()) {
+            ready_to_return = true;
+        }
+    }
+
+    /// Appends `block` to `blocks`.
+    ///
+    /// Without `agg_data` the block is stored as is. With `agg_data` every row
+    /// is probed against the hash table and only rows whose key was newly
+    /// inserted (or that hit the null-key path) are kept; a block contributing
+    /// no new row is dropped entirely.
+    ///
+    /// @param state  unused here, kept for interface compatibility.
+    /// @param block  block to store; moved from when it is kept unchanged.
+    /// @return error of append_to_block_by_selector(), otherwise OK.
+    /// @throws doris::Exception if the hash table variant is uninitialized.
+    Status emplace_block(RuntimeState* state, vectorized::Block&& block) {
+        if (!agg_data) {
+            blocks.emplace_back(std::move(block));
+            return Status::OK();
+        }
+
+        auto num_rows = uint32_t(block.rows());
+        std::vector<vectorized::ColumnPtr> columns = block.get_columns_and_convert();
+        vectorized::ColumnRawPtrs raw_columns;
+        raw_columns.reserve(columns.size());
+        for (auto& col : columns) {
+            raw_columns.push_back(col.get());
+        }
+
+        std::visit(vectorized::Overload {
+                           [&](std::monostate& arg) -> void {
+                               throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                      "uninited hash table");
+                           },
+                           [&](auto& agg_method) -> void {
+                               SCOPED_TIMER(hash_table_compute_timer);
+                               using HashMethodType = std::decay_t<decltype(agg_method)>;
+                               using AggState = typename HashMethodType::State;
+
+                               AggState agg_state(raw_columns);
+                               agg_method.init_serialized_keys(raw_columns, num_rows);
+                               distinct_row.clear();
+
+                               // `row` is captured by reference so the callbacks below
+                               // record the row currently being probed.
+                               size_t row = 0;
+                               auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+                                   HashMethodType::try_presis_key(key, origin, arena);
+                                   ctor(key);
+                                   distinct_row.push_back(row);
+                               };
+                               auto creator_for_null_key = [&]() { distinct_row.push_back(row); };
+
+                               SCOPED_TIMER(hash_table_emplace_timer);
+                               for (; row < num_rows; ++row) {
+                                   agg_method.lazy_emplace(agg_state, row, creator,
+                                                           creator_for_null_key);
+                               }
+                               COUNTER_UPDATE(hash_table_input_counter, num_rows);
+                           }},
+                   agg_data->method_variant);
+
+        if (distinct_row.size() == block.rows()) {
+            // Every row was new: keep the block untouched.
+            blocks.emplace_back(std::move(block));
+        } else if (!distinct_row.empty()) {
+            // Only some rows were new: materialize just those.
+            auto distinct_block = vectorized::MutableBlock(block.clone_empty());
+            RETURN_IF_ERROR(block.append_to_block_by_selector(&distinct_block, distinct_row));
+            blocks.emplace_back(distinct_block.to_block());
+        }
+        return Status::OK();
+    }
+
+    /// Builds a request pre-filled with the node id, query id and fragment
+    /// instance id of `target`; blocks and eos are left for the caller.
+    PTransmitRecCTEBlockParams build_basic_param(RuntimeState* state,
+                                                 const TRecCTETarget& target) const {
+        PTransmitRecCTEBlockParams request;
+        request.set_node_id(target.node_id);
+        request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+        request.mutable_fragment_instance_id()->CopyFrom(
+                UniqueId(target.fragment_instance_id).to_proto());
+        return request;
+    }
+
+    /// Distributes `blocks[round_offset..]` over `targets` and then sends an
+    /// eos marker to every target.
+    ///
+    /// Each target receives at most ceil(pending / targets.size()) blocks,
+    /// packed into requests until the compressed size reaches the query option
+    /// `exchange_multi_blocks_byte_size`. All RPCs are issued synchronously.
+    ///
+    /// @param state         provides query options, query context and serialization settings.
+    /// @param round_offset  index of the first block to send.
+    /// @return the first error met (missing stub, serialization failure, RPC
+    ///         transport failure or an error status returned by the peer), else OK.
+    Status send_data_to_targets(RuntimeState* state, size_t round_offset) const {
+        // Nothing to send to; this also keeps the division below well defined.
+        if (targets.empty()) {
+            return Status::OK();
+        }
+
+        const int byte_limit_per_rpc = state->query_options().exchange_multi_blocks_byte_size;
+        const auto rpc_timeout_ms =
+                get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout());
+
+        const size_t pending = blocks.size() > round_offset ? blocks.size() - round_offset : 0;
+        const size_t blocks_per_target = (pending + targets.size() - 1) / targets.size();
+
+        // Issues one synchronous RPC and folds both transport failures and the
+        // peer's status into a Status. The same timeout is used for data and eos.
+        auto transmit = [&](auto& stub, const TRecCTETarget& target,
+                            const PTransmitRecCTEBlockParams& request) -> Status {
+            PTransmitRecCTEBlockResult result;
+            brpc::Controller controller;
+            controller.set_timeout_ms(rpc_timeout_ms);
+            // Fetch the call id before starting the RPC, then wait on it.
+            const brpc::CallId call_id = controller.call_id();
+            stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing());
+            brpc::Join(call_id);
+            if (controller.Failed()) {
+                // Without this check `result` would still hold its default value
+                // and the failure would go unnoticed.
+                return Status::InternalError(
+                        "transmit_rec_cte_block rpc failed, host={}, port={}, error={}",
+                        target.addr.hostname, target.addr.port, controller.ErrorText());
+            }
+            return Status::create(result.status());
+        };
+
+        for (const auto& target : targets) {
+            auto stub =
+                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
+                            target.addr);
+            if (!stub) {
+                return Status::InternalError(fmt::format("Get rpc stub failed, host={}, port={}",
+                                                         target.addr.hostname, target.addr.port));
+            }
+
+            // send blocks
+            size_t remaining = blocks_per_target;
+            while (round_offset < blocks.size() && remaining > 0) {
+                PTransmitRecCTEBlockParams request = build_basic_param(state, target);
+                int64_t batched_bytes = 0;
+                // do/while: every request carries at least one block, so the outer
+                // loop always makes progress even if the byte limit is not positive.
+                do {
+                    auto* pblock = request.add_blocks();
+                    size_t uncompressed_bytes = 0;
+                    size_t compressed_bytes = 0;
+                    RETURN_IF_ERROR(blocks[round_offset].serialize(
+                            state->be_exec_version(), pblock, &uncompressed_bytes,
+                            &compressed_bytes, state->fragement_transmission_compression_type()));
+                    ++round_offset;
+                    --remaining;
+                    batched_bytes += static_cast<int64_t>(compressed_bytes);
+                } while (round_offset < blocks.size() && remaining > 0 &&
+                         batched_bytes < byte_limit_per_rpc);
+                request.set_eos(false);
+                RETURN_IF_ERROR(transmit(stub, target, request));
+            }
+
+            // send eos
+            PTransmitRecCTEBlockParams eos_request = build_basic_param(state, target);
+            eos_request.set_eos(true);
+            RETURN_IF_ERROR(transmit(stub, target, eos_request));
+        }
+        return Status::OK();
+    }
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 47fd9e264bf..f531e1ef2b9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1403,6 +1403,66 @@ Status FragmentMgr::get_query_statistics(const 
TUniqueId& query_id, TQueryStatis
             print_id(query_id), query_stats);
 }
 
+// Forwards blocks received for a recursive CTE scan to the owning query context.
+// Returns EndOfFile when the query context can no longer be found.
+Status FragmentMgr::transmit_rec_cte_block(
+        const TUniqueId& query_id, const TUniqueId& instance_id, int node_id,
+        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
+    auto query_ctx = get_query_ctx(query_id);
+    if (!query_ctx) {
+        return Status::EndOfFile(
+                "Transmit rec cte block failed: Query context (query-id: {}) not found, "
+                "maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(query_ctx.get());
+    return query_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos);
+}
+
+// Drives one stage of re-running a fragment of `query_id`.
+//
+// The stage selects which PipelineFragmentContext call is made:
+//   wait    -> wait_close(false)
+//   release -> set_to_rerun()
+//   rebuild -> rebuild(thread pool)
+//   submit  -> submit()
+//   close   -> wait_close(true)
+//
+// Returns NotFound if the query context or the fragment context is missing,
+// InternalError for a stage this function does not handle, otherwise the status
+// of the selected call.
+Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+                                   PRerunFragmentParams_Opcode stage) {
+    auto q_ctx = get_query_ctx(query_id);
+    if (!q_ctx) {
+        return Status::NotFound(
+                "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(q_ctx.get());
+
+    auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+    if (!fragment_ctx) {
+        return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
+                                print_id(query_id), fragment);
+    }
+
+    switch (stage) {
+    case PRerunFragmentParams::wait:
+        return fragment_ctx->wait_close(false);
+    case PRerunFragmentParams::release:
+        return fragment_ctx->set_to_rerun();
+    case PRerunFragmentParams::rebuild:
+        return fragment_ctx->rebuild(_thread_pool.get());
+    case PRerunFragmentParams::submit:
+        return fragment_ctx->submit();
+    case PRerunFragmentParams::close:
+        return fragment_ctx->wait_close(true);
+    default:
+        // An unhandled opcode must not be reported as success to the caller.
+        return Status::InternalError(
+                "rerun_fragment: unknown stage {} (query-id: {}, fragment-id: {})",
+                static_cast<int>(stage), print_id(query_id), fragment);
+    }
+}
+
+// Resets the global runtime filters identified by `filter_ids` for `query_id`
+// by delegating to the query context.
+// Returns NotFound when the query context no longer exists.
+Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
+                                    const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    auto q_ctx = get_query_ctx(query_id);
+    if (!q_ctx) {
+        return Status::NotFound(
+                "reset_global_rf: Query context (query-id: {}) not found, maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(q_ctx.get());
+    return q_ctx->reset_global_rf(filter_ids);
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index b0c1a3ad592..58725a49a63 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -184,6 +184,17 @@ public:
 
     std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
 
+    // Hands the received blocks (and the eos flag) to the query context of `query_id`,
+    // addressed by (instance_id, node_id). Returns EndOfFile if the query context is gone.
+    Status transmit_rec_cte_block(const TUniqueId& query_id, const TUniqueId& 
instance_id,
+                                  int node_id,
+                                  const 
google::protobuf::RepeatedPtrField<PBlock>& pblocks,
+                                  bool eos);
+
+    // Executes one stage (wait / release / rebuild / submit / close) of re-running
+    // the given fragment. Returns NotFound if the query or fragment context is missing.
+    Status rerun_fragment(const TUniqueId& query_id, int fragment,
+                          PRerunFragmentParams_Opcode stage);
+
+    // Asks the query context of `query_id` to reset the listed global runtime filters.
+    // Returns NotFound if the query context is missing.
+    Status reset_global_rf(const TUniqueId& query_id,
+                           const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     struct BrpcItem {
         TNetworkAddress network_address;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5332b886d58..b3097dde69f 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -34,6 +34,7 @@
 #include "common/status.h"
 #include "olap/olap_common.h"
 #include "pipeline/dependency.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -460,6 +461,10 @@ QueryContext::_collect_realtime_query_profile() {
                 continue;
             }
 
+            if (fragment_ctx->need_notify_close()) {
+                continue;
+            }
+
             auto profile = fragment_ctx->collect_realtime_profile();
 
             if (profile.empty()) {
@@ -497,4 +502,46 @@ TReportExecStatusParams 
QueryContext::get_realtime_exec_status() {
     return exec_status;
 }
 
+// Delivers `pblocks` to the RecCTEScan registered under (instance_id, node_id) and,
+// when `eos` is set, marks that scan as ready. Lookup and delivery both happen
+// while `_cte_scan_lock` is held.
+Status QueryContext::send_block_to_cte_scan(
+        const TUniqueId& instance_id, int node_id,
+        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
+    std::unique_lock<std::mutex> guard(_cte_scan_lock);
+
+    const auto scan_it = _cte_scan.find(std::make_pair(instance_id, node_id));
+    if (scan_it == _cte_scan.end()) {
+        return Status::InternalError("RecCTEScan not found for instance {}, node {}",
+                                     print_id(instance_id), node_id);
+    }
+
+    auto* scan = scan_it->second;
+    for (const auto& pblock : pblocks) {
+        RETURN_IF_ERROR(scan->add_block(pblock));
+    }
+    if (eos) {
+        scan->set_ready();
+    }
+    return Status::OK();
+}
+
+// Records `scan` as the receiver for (instance_id, node_id).
+// In debug builds, registering the same key twice trips a DCHECK.
+void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id,
+                                    pipeline::RecCTEScanLocalState* scan) {
+    std::unique_lock<std::mutex> guard(_cte_scan_lock);
+    const auto scan_key = std::make_pair(instance_id, node_id);
+    DCHECK(!_cte_scan.contains(scan_key))
+            << "Duplicate registe cte scan for instance " << print_id(instance_id) << ", node "
+            << node_id;
+    _cte_scan.emplace(scan_key, scan);
+}
+
+// Removes the receiver registered for (instance_id, node_id).
+// In debug builds, a missing key trips a DCHECK.
+void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) {
+    std::lock_guard<std::mutex> guard(_cte_scan_lock);
+    const auto scan_key = std::make_pair(instance_id, node_id);
+    DCHECK(_cte_scan.contains(scan_key))
+            << "Duplicate deregiste cte scan for instance " << print_id(instance_id) << ", node "
+            << node_id;
+    _cte_scan.erase(scan_key);
+}
+
+// Forwards the reset request to the merge controller handler; a query context
+// without such a handler has nothing to reset and reports OK.
+Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    if (!_merge_controller_handler) {
+        return Status::OK();
+    }
+    return _merge_controller_handler->reset_global_rf(this, filter_ids);
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 32458cdb00e..874e889e368 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -32,6 +32,7 @@
 #include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/object_pool.h"
+#include "common/status.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/runtime_predicate.h"
@@ -48,6 +49,7 @@ namespace pipeline {
 class PipelineFragmentContext;
 class PipelineTask;
 class Dependency;
+class RecCTEScanLocalState;
 } // namespace pipeline
 
 struct ReportStatusRequest {
@@ -161,11 +163,6 @@ public:
         return _query_options.runtime_filter_wait_time_ms;
     }
 
-    bool runtime_filter_wait_infinitely() const {
-        return _query_options.__isset.runtime_filter_wait_infinitely &&
-               _query_options.runtime_filter_wait_infinitely;
-    }
-
     int be_exec_version() const {
         if (!_query_options.__isset.be_exec_version) {
             return 0;
@@ -299,6 +296,23 @@ public:
     void set_first_error_msg(std::string error_msg);
     std::string get_first_error_msg();
 
+    // Delivers blocks to the RecCTEScan registered for (instance_id, node_id) and
+    // marks it ready when `eos` is true; fails if no such scan is registered.
+    Status send_block_to_cte_scan(const TUniqueId& instance_id, int node_id,
+                                  const 
google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks,
+                                  bool eos);
+    // Registers / removes the scan that receives blocks for (instance_id, node_id).
+    // The pointer is stored as is; this class does not take ownership of it.
+    void registe_cte_scan(const TUniqueId& instance_id, int node_id,
+                          pipeline::RecCTEScanLocalState* scan);
+    void deregiste_cte_scan(const TUniqueId& instance_id, int node_id);
+
+    // Returns the keys of `_fragment_id_to_pipeline_ctx` in iteration order.
+    // NOTE(review): the map is read here without taking a lock — confirm callers
+    // cannot race with fragment registration.
+    std::vector<int> get_fragment_ids() {
+        std::vector<int> ids;
+        for (const auto& [fragment_id, ctx] : _fragment_id_to_pipeline_ctx) {
+            ids.push_back(fragment_id);
+        }
+        return ids;
+    }
+
+    // Resets the listed global runtime filters through the merge controller handler;
+    // returns OK without doing anything when no handler is set.
+    Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     friend class QueryTaskController;
 
@@ -377,6 +391,10 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    // instance id + node id -> cte scan
+    std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> 
_cte_scan;
+    std::mutex _cte_scan_lock;
+
 public:
     // when fragment of pipeline is closed, it will register its profile to 
this map by using add_fragment_profile
     void add_fragment_profile(
diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h
index 51c79e1b426..b3e19d089fc 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -55,6 +55,7 @@ public:
 
     void set_detected_source() {
         std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        // Reset the stored order-by extreme to a NULL-typed field before flagging the
+        // source as detected. NOTE(review): presumably so that a re-run does not start
+        // from a stale bound — confirm.
+        _orderby_extrem = Field(PrimitiveType::TYPE_NULL);
         _detected_source = true;
     }
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 33db0f60e7c..faeda888cfb 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -503,7 +503,7 @@ Status RuntimeState::register_consumer_runtime_filter(
         std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
     bool need_merge = desc.has_remote_targets || need_local_merge;
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
-    return mgr->register_consumer_filter(_query_ctx, desc, node_id, 
consumer_filter);
+    return mgr->register_consumer_filter(this, desc, node_id, consumer_filter);
 }
 
 bool RuntimeState::is_nereids() const {
@@ -515,12 +515,22 @@ std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::pipeline_id_to_profil
     return _pipeline_id_to_profile;
 }
 
+// Prepares this state for another run: drops every runtime filter known to the
+// local or the global manager from both managers, then forgets the pipeline profiles.
+void RuntimeState::reset_to_rerun() {
+    // NOTE(review): only the local manager is null-checked; the global manager is
+    // dereferenced unconditionally — confirm it is always set when the local one is.
+    if (RuntimeFilterMgr* local_mgr = local_runtime_filter_mgr(); local_mgr != nullptr) {
+        RuntimeFilterMgr* global_mgr = global_runtime_filter_mgr();
+        auto all_filter_ids = local_mgr->get_filter_ids();
+        all_filter_ids.merge(global_mgr->get_filter_ids());
+        local_mgr->remove_filters(all_filter_ids);
+        global_mgr->remove_filters(all_filter_ids);
+    }
+
+    std::unique_lock profile_guard(_pipeline_profile_lock);
+    _pipeline_id_to_profile.clear();
+}
+
 std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::build_pipeline_profile(
         std::size_t pipeline_size) {
     std::unique_lock lc(_pipeline_profile_lock);
     if (!_pipeline_id_to_profile.empty()) {
-        throw Exception(ErrorCode::INTERNAL_ERROR,
-                        "build_pipeline_profile can only be called once.");
+        return _pipeline_id_to_profile;
     }
     _pipeline_id_to_profile.resize(pipeline_size);
     {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3d89f4aa0d4..907f2c50a61 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -503,7 +503,7 @@ public:
         _runtime_filter_mgr = runtime_filter_mgr;
     }
 
-    QueryContext* get_query_ctx() { return _query_ctx; }
+    QueryContext* get_query_ctx() const { return _query_ctx; }
 
     [[nodiscard]] bool low_memory_mode() const;
 
@@ -520,6 +520,12 @@ public:
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
     }
 
+    // Query option `cte_max_recursion_depth`, or 0 when the option is not set.
+    int cte_max_recursion_depth() const {
+        if (!_query_options.__isset.cte_max_recursion_depth) {
+            return 0;
+        }
+        return _query_options.cte_max_recursion_depth;
+    }
+
     int rpc_verbose_profile_max_instance_count() const {
         return _query_options.__isset.rpc_verbose_profile_max_instance_count
                        ? _query_options.rpc_verbose_profile_max_instance_count
@@ -702,6 +708,17 @@ public:
                                       _query_options.hnsw_bounded_queue);
     }
 
+    void reset_to_rerun();
+
+    // Forces the `runtime_filter_wait_infinitely` query option to true on this
+    // state's copy of the query options.
+    void set_force_make_rf_wait_infinite() {
+        _query_options.__set_runtime_filter_wait_infinitely(true);
+    }
+
+    // True only when the `runtime_filter_wait_infinitely` option is both set and enabled.
+    bool runtime_filter_wait_infinitely() const {
+        if (!_query_options.__isset.runtime_filter_wait_infinitely) {
+            return false;
+        }
+        return _query_options.runtime_filter_wait_infinitely;
+    }
+
 private:
     Status create_error_log_file();
 
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h
index e0e42e509d4..e9ef68f6d28 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -41,11 +41,11 @@ public:
         APPLIED, // The consumer will switch to this state after the 
expression is acquired
     };
 
-    static Status create(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc, int node_id,
+    static Status create(const RuntimeState* state, const TRuntimeFilterDesc* 
desc, int node_id,
                          std::shared_ptr<RuntimeFilterConsumer>* res) {
         *res = std::shared_ptr<RuntimeFilterConsumer>(
-                new RuntimeFilterConsumer(query_ctx, desc, node_id));
-        RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&query_ctx->query_options()));
+                new RuntimeFilterConsumer(state, desc, node_id));
+        RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&state->query_options()));
         return Status::OK();
     }
 
@@ -86,17 +86,16 @@ public:
     }
 
 private:
-    RuntimeFilterConsumer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
-                          int node_id)
+    RuntimeFilterConsumer(const RuntimeState* state, const TRuntimeFilterDesc* 
desc, int node_id)
             : RuntimeFilter(desc),
               _probe_expr(desc->planId_to_target_expr.find(node_id)->second),
               _registration_time(MonotonicMillis()),
               _rf_state(State::NOT_READY) {
         // If bitmap filter is not applied, it will cause the query result to 
be incorrect
-        bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() ||
+        bool wait_infinitely = state->runtime_filter_wait_infinitely() ||
                                _runtime_filter_type == 
RuntimeFilterType::BITMAP_FILTER;
-        _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 
1000
-                                           : 
query_ctx->runtime_filter_wait_time_ms();
+        _rf_wait_time_ms = wait_infinitely ? 
state->get_query_ctx()->execution_timeout() * 1000
+                                           : 
state->get_query_ctx()->runtime_filter_wait_time_ms();
         DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h
index d373bb8be05..42e26b3d9c3 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -78,6 +78,8 @@ public:
         _expected_producer_num = num;
     }
 
+    // Returns the producer count previously stored by set_expected_producer_num().
+    int get_expected_producer_num() const { return _expected_producer_num; }
+
     bool add_rf_size(uint64_t size) {
         _received_rf_size_num++;
         if (_expected_producer_num < _received_rf_size_num) {
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp
index ba9e9f5bc9d..dcddc29f06c 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -62,13 +62,13 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consum
 }
 
 Status RuntimeFilterMgr::register_consumer_filter(
-        const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, int 
node_id,
+        const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id,
         std::shared_ptr<RuntimeFilterConsumer>* consumer) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
     std::lock_guard<std::mutex> l(_lock);
-    RETURN_IF_ERROR(RuntimeFilterConsumer::create(query_ctx, &desc, node_id, 
consumer));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, 
consumer));
     _consumer_map[key].push_back(*consumer);
     return Status::OK();
 }
@@ -446,6 +446,24 @@ void 
RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu
     _filter_map.clear();
 }
 
+// Re-creates the merger of every filter in `filter_ids`, keeping its expected
+// producer count, and clears the recorded arrivals and source addresses.
+//
+// Returns InternalError if a filter id is unknown or has no merger yet (instead of
+// inserting an empty entry and dereferencing its null merger), or the error of
+// RuntimeFilterMerger::create().
+Status RuntimeFilterMergeControllerEntity::reset_global_rf(
+        QueryContext* query_ctx, const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    for (const auto& filter_id : filter_ids) {
+        GlobalMergeContext* cnt_val = nullptr;
+        {
+            std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+            // find() rather than operator[]: a reset must not create new entries.
+            auto it = _filter_map.find(filter_id);
+            if (it == _filter_map.end()) {
+                return Status::InternalError("reset_global_rf: unknown runtime filter id {}",
+                                             filter_id);
+            }
+            cnt_val = &it->second;
+        }
+        if (cnt_val->merger == nullptr) {
+            return Status::InternalError(
+                    "reset_global_rf: runtime filter {} has no merger to reset", filter_id);
+        }
+
+        // NOTE(review): the entry is modified after `_filter_map_mutex` is released —
+        // confirm no concurrent merge can touch this context while it is being reset.
+        int producer_size = cnt_val->merger->get_expected_producer_num();
+        RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &cnt_val->runtime_filter_desc,
+                                                    &cnt_val->merger));
+        cnt_val->merger->set_expected_producer_num(producer_size);
+        cnt_val->arrive_id.clear();
+        cnt_val->source_addrs.clear();
+    }
+    return Status::OK();
+}
+
 std::string RuntimeFilterMergeControllerEntity::debug_string() {
     std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
     std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h
index 6941e5e5631..a079f3b0ad9 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -83,7 +83,7 @@ public:
 
     // get/set consumer
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
get_consume_filters(int filter_id);
-    Status register_consumer_filter(const QueryContext* query_ctx, const 
TRuntimeFilterDesc& desc,
+    Status register_consumer_filter(const RuntimeState* state, const 
TRuntimeFilterDesc& desc,
                                     int node_id,
                                     std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter);
 
@@ -104,6 +104,27 @@ public:
 
     std::string debug_string();
 
+    // Returns the union of all producer filter ids and all consumer filter ids,
+    // collected while `_lock` is held.
+    std::set<int32_t> get_filter_ids() {
+        std::set<int32_t> all_ids;
+        std::lock_guard<std::mutex> guard(_lock);
+        all_ids.insert(_producer_id_set.begin(), _producer_id_set.end());
+        for (const auto& [filter_id, consumers] : _consumer_map) {
+            all_ids.insert(filter_id);
+        }
+        return all_ids;
+    }
+
+    // Erases every id in `filter_ids` from the consumer map, the local merge map
+    // and the producer id set, all under `_lock`. Unknown ids are ignored.
+    void remove_filters(const std::set<int32_t>& filter_ids) {
+        std::lock_guard<std::mutex> guard(_lock);
+        for (const int32_t filter_id : filter_ids) {
+            _consumer_map.erase(filter_id);
+            _local_merge_map.erase(filter_id);
+            _producer_id_set.erase(filter_id);
+        }
+    }
+
 private:
     /**
      * `_is_global = true` means this runtime filter manager menages 
query-level runtime filters.
@@ -152,6 +173,9 @@ public:
 
     void release_undone_filters(QueryContext* query_ctx);
 
+    // Re-creates the merger of each listed filter (keeping its expected producer
+    // count) and clears its recorded arrivals and source addresses.
+    Status reset_global_rf(QueryContext* query_ctx,
+                           const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
+
 private:
     Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,
                            const TRuntimeFilterDesc* runtime_filter_desc,
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index daf45179f79..f9f7d2e4460 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1597,6 +1597,57 @@ void 
PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
     }
 }
 
+// RPC entry point: hands the request to the light work pool, where the blocks are
+// passed to FragmentMgr::transmit_rec_cte_block() and the resulting status is written
+// into the response. If the pool rejects the task, offer_failed() answers the call.
+void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller,
+                                              const PTransmitRecCTEBlockParams* request,
+                                              PTransmitRecCTEBlockResult* response,
+                                              google::protobuf::Closure* done) {
+    auto task = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->fragment_mgr()->transmit_rec_cte_block(
+                UniqueId(request->query_id()).to_thrift(),
+                UniqueId(request->fragment_instance_id()).to_thrift(), request->node_id(),
+                request->blocks(), request->eos());
+        st.to_protobuf(response->mutable_status());
+    };
+    const bool accepted = _light_work_pool.try_offer(task);
+    if (!accepted) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
+// RPC entry point: hands the request to the light work pool, where the requested
+// stage is executed through FragmentMgr::rerun_fragment() and the resulting status is
+// written into the response. If the pool rejects the task, offer_failed() answers the call.
+void PInternalService::rerun_fragment(google::protobuf::RpcController* controller,
+                                      const PRerunFragmentParams* request,
+                                      PRerunFragmentResult* response,
+                                      google::protobuf::Closure* done) {
+    auto task = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->fragment_mgr()->rerun_fragment(
+                UniqueId(request->query_id()).to_thrift(), request->fragment_id(),
+                request->stage());
+        st.to_protobuf(response->mutable_status());
+    };
+    const bool accepted = _light_work_pool.try_offer(task);
+    if (!accepted) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
+// RPC entry point: hands the request to the light work pool, where the listed filters
+// are reset through FragmentMgr::reset_global_rf() and the resulting status is written
+// into the response. If the pool rejects the task, offer_failed() answers the call.
+void PInternalService::reset_global_rf(google::protobuf::RpcController* controller,
+                                       const PResetGlobalRfParams* request,
+                                       PResetGlobalRfResult* response,
+                                       google::protobuf::Closure* done) {
+    auto task = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->fragment_mgr()->reset_global_rf(
+                UniqueId(request->query_id()).to_thrift(), request->filter_ids());
+        st.to_protobuf(response->mutable_status());
+    };
+    const bool accepted = _light_work_pool.try_offer(task);
+    if (!accepted) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
 void PInternalService::transmit_block(google::protobuf::RpcController* 
controller,
                                       const PTransmitDataParams* request,
                                       PTransmitDataResult* response,
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index d73501bfc80..781a7def178 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -140,6 +140,16 @@ public:
                         const ::doris::PPublishFilterRequestV2* request,
                         ::doris::PPublishFilterResponse* response,
                         ::google::protobuf::Closure* done) override;
+    void transmit_rec_cte_block(google::protobuf::RpcController* controller,
+                                const PTransmitRecCTEBlockParams* request,
+                                PTransmitRecCTEBlockResult* response,
+                                google::protobuf::Closure* done) override;
+    void rerun_fragment(google::protobuf::RpcController* controller,
+                        const PRerunFragmentParams* request, 
PRerunFragmentResult* response,
+                        google::protobuf::Closure* done) override;
+    void reset_global_rf(google::protobuf::RpcController* controller,
+                         const PResetGlobalRfParams* request, 
PResetGlobalRfResult* response,
+                         google::protobuf::Closure* done) override;
     void transmit_block(::google::protobuf::RpcController* controller,
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 77ee21368c0..e0e352645c3 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -31,7 +31,7 @@ public:
     void test_signal_aquire(TRuntimeFilterDesc desc) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+                RuntimeFilterConsumer::create(_runtime_states[0].get(), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -59,7 +59,7 @@ TEST_F(RuntimeFilterConsumerTest, basic) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
@@ -116,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -141,7 +141,7 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) {
     const_cast<TQueryOptions&>(_query_ctx->_query_options)
             .__set_runtime_filter_wait_infinitely(true);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
@@ -152,7 +152,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -174,7 +174,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
 
     {
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     desc.__set_src_expr(
@@ -195,13 +195,13 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
                     .build());
 
     {
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     {
         desc.__set_has_local_targets(false);
         desc.__set_has_remote_targets(true);
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
         desc.__set_has_local_targets(true);
         desc.__set_has_remote_targets(false);
@@ -209,7 +209,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
 
     
desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr());
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 }
 
 TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
@@ -217,7 +217,7 @@ TEST_F(RuntimeFilterConsumerTest, 
aquire_signal_at_same_time) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+                RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index d8222e201d9..02b1349c1a6 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -65,7 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
         
EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
         std::shared_ptr<RuntimeFilterConsumer> consumer_filter;
         EXPECT_TRUE(global_runtime_filter_mgr
-                            ->register_consumer_filter(ctx.get(), desc, 0, 
&consumer_filter)
+                            ->register_consumer_filter(&state, desc, 0, 
&consumer_filter)
                             .ok());
         
EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7582a7b6b6d..868263be5dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -786,6 +786,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String READ_HIVE_JSON_IN_ONE_COLUMN = 
"read_hive_json_in_one_column";
 
+    public static final String CTE_MAX_RECURSION_DEPTH = 
"cte_max_recursion_depth";
+
     /**
      * Inserting overwrite for auto partition table allows creating partition 
for
      * datas which cannot find partition to overwrite.
@@ -989,6 +991,11 @@ public class SessionVariable implements Serializable, 
Writable {
     })
     public int minScanSchedulerConcurrency = 0;
 
+    @VariableMgr.VarAttr(name = CTE_MAX_RECURSION_DEPTH, needForward = true, 
description = {
+            "CTE递归的最大深度,默认值100",
+            "The maximum depth of CTE recursion. Default is 100" })
+    public int cteMaxRecursionDepth = 100;
+
     // By default, the number of Limit items after OrderBy is changed from 
65535 items
     // before v1.2.0 (not included), to return all items by default
     @VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT, affectQueryResult = 
true)
@@ -4783,7 +4790,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold);
 
         tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead);
-
+        tResult.setCteMaxRecursionDepth(cteMaxRecursionDepth);
         tResult.setEnableParallelScan(enableParallelScan);
         tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount);
         
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 1ddfbcf2502..5b9c3579590 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -60,6 +60,45 @@ message PTransmitDataResult {
     optional int64 receive_time = 2;
 };
 
+message PTransmitRecCTEBlockParams {
+    repeated PBlock blocks = 1;
+    optional bool eos = 2;
+    optional PUniqueId query_id = 3;
+    optional PUniqueId fragment_instance_id = 4;
+    optional int32 node_id = 5;
+};
+
+message PTransmitRecCTEBlockResult {
+    optional PStatus status = 1;
+};
+
+
+message PRerunFragmentParams {
+    enum Opcode {
+    wait = 1;    // wait fragment execute done
+    release = 2; // release current round's resource
+    rebuild = 3; // rebuild next round pipeline tasks
+    submit = 4;  // submit tasks to execute
+    close = 5;   // close fragment
+    }
+    optional PUniqueId query_id = 1;
+    optional int32 fragment_id = 2;
+    optional Opcode stage = 3;
+};
+
+message PRerunFragmentResult {
+    optional PStatus status = 1;
+};
+
+message PResetGlobalRfParams {
+    optional PUniqueId query_id = 1;
+    repeated int32 filter_ids = 2;
+};
+
+message PResetGlobalRfResult {
+    optional PStatus status = 1;
+};
+
 message PTabletWithPartition {
     required int64 partition_id = 1;
     required int64 tablet_id = 2;
@@ -1107,6 +1146,9 @@ service PBackendService {
     rpc sync_filter_size(PSyncFilterSizeRequest) returns 
(PSyncFilterSizeResponse);
     rpc apply_filterv2(PPublishFilterRequestV2) returns 
(PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
+    rpc rerun_fragment(PRerunFragmentParams) returns (PRerunFragmentResult);
+    rpc reset_global_rf(PResetGlobalRfParams) returns (PResetGlobalRfResult);
+    rpc transmit_rec_cte_block(PTransmitRecCTEBlockParams) returns 
(PTransmitRecCTEBlockResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns 
(PCheckRPCChannelResponse);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index cc018384f24..c4169768584 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -410,6 +410,7 @@ struct TQueryOptions {
 
   175: optional bool enable_fuzzy_blockable_task = false;
   176: optional list<i32> shuffled_agg_ids;
+  177: optional i32 cte_max_recursion_depth;
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
diff --git a/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out 
b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out
new file mode 100644
index 00000000000..ba843a71ee5
--- /dev/null
+++ b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out
@@ -0,0 +1,953 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+0.5403023058681398
+0.6542897904977791
+0.7013687736227565
+0.7221024250267077
+0.7314040424225098
+0.7356047404363474
+0.7375068905132428
+0.7383692041223232
+0.7387603198742113
+0.7389377567153445
+0.7390182624274122
+0.7390547907469174
+0.7390713652989449
+0.7390788859949921
+0.7390822985224024
+0.7390838469650002
+0.7390845495752126
+0.7390848683867142
+0.7390850130484203
+0.739085078689123
+0.7390851084737987
+0.7390851219886894
+0.7390851281211138
+0.7390851309037207
+0.7390851321663374
+0.7390851327392538
+0.7390851329992164
+0.7390851331171753
+0.7390851331706995
+0.7390851331949863
+0.7390851332060064
+0.7390851332110069
+0.7390851332132758
+0.7390851332143055
+0.7390851332147726
+0.7390851332149846
+0.7390851332150807
+0.7390851332151244
+0.7390851332151441
+0.7390851332151531
+0.7390851332151572
+0.7390851332151591
+0.7390851332151599
+0.7390851332151603
+0.7390851332151605
+0.7390851332151606
+0.7390851332151607
+0.7390851332151608
+0.7390851332151609
+0.7390851332151611
+0.7390851332151617
+0.7390851332151629
+0.7390851332151657
+0.7390851332151718
+0.7390851332151851
+0.7390851332152145
+0.7390851332152792
+0.739085133215422
+0.7390851332157367
+0.7390851332164302
+0.7390851332179587
+0.7390851332213271
+0.7390851332287504
+0.7390851332451103
+0.7390851332811648
+0.7390851333606233
+0.7390851335357372
+0.7390851339216605
+0.7390851347721744
+0.7390851366465718
+0.7390851407774467
+0.7390851498812394
+0.7390851699445544
+0.7390852141609171
+0.7390853116067619
+0.7390855263619245
+0.7390859996481299
+0.7390870426953322
+0.7390893414033927
+0.7390944073790913
+0.7391055719265363
+0.7391301765296711
+0.7391843997714936
+0.7393038923969059
+0.7395672022122561
+0.7401473355678757
+0.7414250866101092
+0.7442373549005569
+0.7504177617637605
+0.7639596829006542
+0.7934803587425656
+0.8575532158463934
+1
+
+-- !sql --
+55
+
+-- !sql --
+1      3
+1      5
+1      8
+2      4
+2      5
+2      10
+2      19
+3      1
+3      5
+3      8
+3      10
+3      24
+5      3
+5      4
+5      8
+5      15
+6      3
+6      4
+6      7
+7      4
+8      1
+9      4
+
+-- !sql --
+1      3
+1      5
+2      4
+2      5
+2      10
+3      1
+3      5
+3      8
+3      10
+5      3
+5      4
+5      8
+5      10
+6      3
+6      4
+6      5
+7      4
+8      1
+8      8
+9      4
+11     1
+12     3
+29     4
+
+-- !sql --
+1      3
+1      5
+2      4
+2      5
+2      10
+3      1
+3      5
+3      8
+3      10
+5      3
+5      4
+5      8
+6      3
+6      4
+7      4
+7      10
+8      1
+8      10
+9      4
+9      5
+9      10
+10     5
+10     8
+10     10
+11     5
+11     8
+11     10
+12     5
+12     8
+12     10
+13     1
+13     5
+13     8
+13     10
+14     1
+14     5
+14     8
+14     10
+15     1
+15     3
+15     5
+15     8
+15     10
+16     1
+16     3
+16     5
+16     8
+16     10
+17     1
+17     3
+17     5
+17     8
+17     10
+18     1
+18     3
+18     5
+18     8
+18     10
+19     1
+19     3
+19     5
+19     8
+19     10
+20     1
+20     3
+20     5
+20     8
+20     10
+21     1
+21     3
+21     5
+21     8
+21     10
+22     1
+22     3
+22     5
+22     8
+22     10
+23     1
+23     3
+23     5
+23     8
+23     10
+24     1
+24     3
+24     5
+24     8
+24     10
+25     1
+25     3
+25     5
+25     8
+25     10
+26     1
+26     3
+26     5
+26     8
+26     10
+27     1
+27     3
+27     5
+27     8
+27     10
+28     1
+28     3
+28     5
+28     8
+28     10
+29     1
+29     3
+29     5
+29     8
+29     10
+30     1
+30     3
+30     5
+30     8
+30     10
+31     1
+31     3
+31     5
+31     8
+31     10
+32     1
+32     3
+32     5
+32     8
+32     10
+33     1
+33     3
+33     5
+33     8
+33     10
+34     1
+34     3
+34     4
+34     5
+34     8
+34     10
+35     1
+35     3
+35     4
+35     5
+35     8
+35     10
+36     1
+36     3
+36     4
+36     5
+36     8
+36     10
+37     1
+37     3
+37     4
+37     5
+37     8
+37     10
+38     1
+38     3
+38     4
+38     5
+38     8
+38     10
+39     1
+39     3
+39     4
+39     5
+39     8
+39     10
+40     1
+40     3
+40     4
+40     5
+40     8
+40     10
+41     1
+41     3
+41     4
+41     5
+41     8
+41     10
+42     1
+42     3
+42     4
+42     5
+42     8
+42     10
+43     1
+43     3
+43     4
+43     5
+43     8
+43     10
+44     1
+44     3
+44     4
+44     5
+44     8
+44     10
+45     1
+45     3
+45     4
+45     5
+45     8
+45     10
+46     1
+46     3
+46     4
+46     5
+46     8
+46     10
+47     1
+47     3
+47     4
+47     5
+47     8
+47     10
+48     1
+48     3
+48     4
+48     5
+48     8
+48     10
+49     1
+49     3
+49     4
+49     5
+49     8
+49     10
+50     1
+50     3
+50     4
+50     5
+50     8
+50     10
+51     1
+51     3
+51     4
+51     5
+51     8
+51     10
+52     1
+52     3
+52     4
+52     5
+52     8
+52     10
+53     1
+53     3
+53     4
+53     5
+53     8
+53     10
+54     1
+54     3
+54     4
+54     5
+54     8
+54     10
+55     1
+55     3
+55     4
+55     5
+55     8
+55     10
+56     1
+56     3
+56     4
+56     5
+56     8
+56     10
+57     1
+57     3
+57     4
+57     5
+57     8
+57     10
+58     1
+58     3
+58     4
+58     5
+58     8
+58     10
+59     1
+59     3
+59     4
+59     5
+59     8
+59     10
+60     1
+60     3
+60     4
+60     5
+60     8
+60     10
+61     1
+61     3
+61     4
+61     5
+61     8
+61     10
+62     1
+62     3
+62     4
+62     5
+62     8
+62     10
+63     1
+63     3
+63     4
+63     5
+63     8
+63     10
+64     1
+64     3
+64     4
+64     5
+64     8
+64     10
+65     1
+65     3
+65     4
+65     5
+65     8
+65     10
+66     1
+66     3
+66     4
+66     5
+66     8
+66     10
+67     1
+67     3
+67     4
+67     5
+67     8
+67     10
+68     1
+68     3
+68     4
+68     5
+68     8
+68     10
+69     1
+69     3
+69     4
+69     5
+69     8
+69     10
+70     1
+70     3
+70     4
+70     5
+70     8
+70     10
+71     1
+71     3
+71     4
+71     5
+71     8
+71     10
+72     1
+72     3
+72     4
+72     5
+72     8
+72     10
+73     1
+73     3
+73     4
+73     5
+73     8
+73     10
+74     1
+74     3
+74     4
+74     5
+74     8
+74     10
+75     1
+75     3
+75     4
+75     5
+75     8
+75     10
+76     1
+76     3
+76     4
+76     5
+76     8
+76     10
+77     1
+77     3
+77     4
+77     5
+77     8
+77     10
+78     1
+78     3
+78     4
+78     5
+78     8
+78     10
+79     1
+79     3
+79     4
+79     5
+79     8
+79     10
+80     1
+80     3
+80     4
+80     5
+80     8
+80     10
+81     1
+81     3
+81     4
+81     5
+81     8
+81     10
+82     1
+82     3
+82     4
+82     5
+82     8
+82     10
+83     1
+83     3
+83     4
+83     5
+83     8
+83     10
+84     1
+84     3
+84     4
+84     5
+84     8
+84     10
+85     1
+85     3
+85     4
+85     5
+85     8
+85     10
+86     1
+86     3
+86     4
+86     5
+86     8
+86     10
+87     1
+87     3
+87     4
+87     5
+87     8
+87     10
+88     1
+88     3
+88     4
+88     5
+88     8
+88     10
+89     1
+89     3
+89     4
+89     5
+89     8
+89     10
+90     1
+90     3
+90     4
+90     5
+90     8
+90     10
+91     1
+91     3
+91     4
+91     5
+91     8
+91     10
+92     1
+92     3
+92     4
+92     5
+92     8
+92     10
+93     1
+93     3
+93     4
+93     5
+93     8
+93     10
+94     1
+94     3
+94     4
+94     5
+94     8
+94     10
+95     1
+95     3
+95     4
+95     5
+95     8
+95     10
+96     1
+96     3
+96     4
+96     5
+96     8
+96     10
+97     1
+97     3
+97     4
+97     5
+97     8
+97     10
+98     1
+98     3
+98     4
+98     5
+98     8
+98     10
+99     1
+99     3
+99     4
+99     5
+99     8
+99     10
+100    1
+100    3
+100    4
+100    5
+100    8
+100    10
+
+-- !sql --
+1      3
+1      5
+1      8
+2      \N
+2      4
+2      5
+2      9
+2      10
+2      15
+2      19
+2      28
+2      34
+2      43
+2      71
+2      77
+2      105
+2      176
+2      182
+2      253
+2      429
+2      435
+2      611
+2      1040
+2      1046
+2      1475
+2      2515
+2      2521
+2      3561
+2      6076
+2      6082
+2      8597
+2      14673
+2      14679
+2      20755
+2      35428
+2      35434
+2      50107
+2      85535
+2      85541
+2      120969
+2      206504
+2      206510
+2      292045
+2      498549
+2      498555
+2      705059
+2      1203608
+2      1203614
+2      1702163
+2      2905771
+2      2905777
+2      4109385
+2      7015156
+2      7015162
+2      9920933
+2      16936089
+2      16936095
+2      23951251
+2      40887340
+2      40887346
+2      57823435
+2      98710775
+2      98710781
+2      139598121
+2      238308896
+2      238308902
+2      337019677
+2      575328573
+2      575328579
+2      813637475
+2      1388966048
+2      1388966054
+2      1964294627
+3      \N
+3      1
+3      5
+3      6
+3      8
+3      10
+3      14
+3      18
+3      20
+3      23
+3      41
+3      43
+3      55
+3      63
+3      96
+3      118
+3      139
+3      181
+3      235
+3      320
+3      353
+3      501
+3      588
+3      854
+3      908
+3      1355
+3      1496
+3      2263
+3      2350
+3      3618
+3      3846
+3      5968
+3      6109
+3      9586
+3      9955
+3      15695
+3      15923
+3      25281
+3      25878
+3      41204
+3      41573
+3      66485
+3      67451
+3      108058
+3      108655
+3      174543
+3      176106
+3      283198
+3      284164
+3      457741
+3      460270
+3      741905
+3      743468
+3      1199646
+3      1203738
+3      1943114
+3      1945643
+3      3142760
+3      3149381
+3      5088403
+3      5092495
+3      8231163
+3      8241876
+3      13323658
+3      13330279
+3      21554821
+3      21572155
+3      34885100
+3      34895813
+3      56439921
+3      56467968
+3      91335734
+3      91353068
+3      147775655
+3      147821036
+3      239128723
+3      239156770
+3      386904378
+3      386977806
+3      626061148
+3      626106529
+3      1012965526
+3      1013084335
+3      1639072055
+3      1639145483
+5      \N
+5      3
+5      4
+5      7
+5      8
+5      12
+5      15
+5      22
+5      27
+5      34
+5      56
+5      61
+5      83
+5      139
+5      144
+5      200
+5      339
+5      344
+5      483
+5      822
+5      827
+5      1166
+5      1988
+5      1993
+5      2815
+5      4803
+5      4808
+5      6796
+5      11599
+5      11604
+5      16407
+5      28006
+5      28011
+5      39610
+5      67616
+5      67621
+5      95627
+5      163243
+5      163248
+5      230864
+5      394107
+5      394112
+5      557355
+5      951462
+5      951467
+5      1345574
+5      2297036
+5      2297041
+5      3248503
+5      5545539
+5      5545544
+5      7842580
+5      13388119
+5      13388124
+5      18933663
+5      32321782
+5      32321787
+5      45709906
+5      78031688
+5      78031693
+5      110353475
+5      188385163
+5      188385168
+5      266416856
+5      454802019
+5      454802024
+5      643187187
+5      1097989206
+5      1097989211
+5      1552791230
+6      3
+6      4
+6      7
+7      4
+8      1
+9      4
+
+-- !sql --
+1      2
+
+-- !sql --
+1      2
+3      4
+
+-- !sql --
+1      2
+3      4
+11     22
+33     44
+
+-- !sql --
+1      2
+3      4
+11     22
+
+-- !sql --
+1      22
+3      22
+11     22
+
+-- !sql --
+1      2
+2      3
+
+-- !sql --
+1      2
+3      4
+11     22
+
diff --git 
a/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out 
b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out
new file mode 100644
index 00000000000..3a055b01acc
--- /dev/null
+++ 
b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q1 --
+5050
+
+-- !q2 --
+0      \N      ROOT
+1      0       Child_1
+2      0       Child_2
+3      1       Child_1_1
+
+-- !q3 --
+0      \N      ROOT    [0]
+1      0       Child_1 [0, 1]
+3      1       Child_1_1       [0, 1, 3]
+2      0       Child_2 [0, 2]
+
+-- !q4 --
+0      \N      ROOT    [0]     0
+1      0       Child_1 [0, 1]  1
+2      0       Child_2 [0, 2]  1
+3      1       Child_1_1       [0, 1, 3]       2
+
+-- !q5 --
+1      2       1 -> 2
+1      3       1 -> 3
+1      4       1 -> 4
+2      3       2 -> 3
+4      5       4 -> 5
+
diff --git 
a/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out
 
b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out
new file mode 100644
index 00000000000..958b6848838
--- /dev/null
+++ 
b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out
@@ -0,0 +1,30 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q1 --
+0      0       1
+1      1       1
+2      1       2
+3      2       3
+4      3       5
+5      5       8
+6      8       13
+7      13      21
+8      21      34
+9      34      55
+
+-- !q2 --
+["Oasis", "Rock", "Music", "Art"]
+
+-- !q3 --
+1      3       [1, 3]
+1      5       [1, 5]
+1      5       [1, 3, 5]
+1      8       [1, 3, 8]
+1      10      [1, 3, 10]
+1      3       [1, 5, 3]
+1      4       [1, 5, 4]
+1      8       [1, 5, 8]
+1      4       [1, 3, 5, 4]
+1      8       [1, 3, 5, 8]
+1      8       [1, 5, 3, 8]
+1      10      [1, 5, 3, 10]
+
diff --git 
a/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out
 
b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out
new file mode 100644
index 00000000000..3bc4cf27785
--- /dev/null
+++ 
b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out
@@ -0,0 +1,42 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q1 --
+1      abc
+2      abcabc
+3      abcabcabcabc
+
+-- !q2 --
+1      1       -1
+2      -2      2
+3      4       -4
+4      -8      8
+5      16      -16
+
+-- !q4 --
+2017-01-03
+2017-01-04
+2017-01-05
+2017-01-06
+2017-01-07
+2017-01-08
+2017-01-09
+2017-01-10
+
+-- !q5 --
+2017-01-03     300
+2017-01-04     0
+2017-01-05     0
+2017-01-06     50
+2017-01-07     0
+2017-01-08     180
+2017-01-09     0
+2017-01-10     5
+
+-- !q6 --
+333    Yasmina 333
+198    John    333,198
+29     Pedro   333,198,29
+4610   Sarah   333,198,29,4610
+72     Pierre  333,198,29,72
+692    Tarek   333,692
+123    Adil    333,692,123
+
diff --git a/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy 
b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy
new file mode 100644
index 00000000000..d456f6ba998
--- /dev/null
+++ b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite ("rec_cte") {
+    qt_sql """
+    WITH RECURSIVE test_table AS (
+    SELECT
+        cast(1.0 as double) AS number
+    UNION
+    SELECT
+        cos(number)
+    FROM
+        test_table
+    )
+    SELECT
+        number
+    FROM
+        test_table order by number;
+    """
+
+    qt_sql """
+    WITH RECURSIVE test_table AS (
+        SELECT cast(10 as int) AS number
+    UNION ALL
+        SELECT cast(number - 1 as int) FROM test_table WHERE number > 0
+    )
+    SELECT sum(number) FROM test_table;
+    """
+
+
+    sql "DROP TABLE IF EXISTS edge;"
+    sql """
+    CREATE TABLE edge
+    (
+        node1id int,
+        node2id int
+    ) DUPLICATE KEY (node1id)
+    DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = 
'1');
+    """
+    sql """
+    INSERT INTO edge VALUES
+    (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1),
+    (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8),
+    (6, 3), (6, 4), (7, 4), (8, 1), (9, 4);
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            k1,
+            cast(sum(k2) as int)
+        FROM t1 GROUP BY k1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            cast(sum(k1) as int),
+            k2
+        FROM t1 GROUP BY k2
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """
+
+    test {
+        sql """
+            WITH RECURSIVE t1(k1, k2) AS (
+                SELECT
+                    node1id AS k1,
+                    node2id AS k2
+                FROM edge
+                UNION
+                SELECT
+                    cast(sum(k1 + 1) as int),
+                    k2
+                FROM t1 GROUP BY k2
+            )
+            SELECT * FROM t1 ORDER BY 1,2;
+        """
+        exception "ABORTED"
+    }  
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            cast(sum(k1 + 1) as int),
+            k2
+        FROM t1 WHERE k1 < 100 GROUP BY k2
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            node1id AS k1,
+            node2id AS k2
+        FROM edge
+        UNION
+        SELECT
+            k1,
+            cast(sum(k2) OVER (PARTITION BY k1 ORDER BY k1 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING) as int)
+        FROM t1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """
+
+    test {
+        sql """
+            WITH RECURSIVE t1(k1, k2) AS (
+                SELECT
+                    1,2
+                UNION ALL
+                SELECT
+                    1,2
+                FROM t1 GROUP BY k1
+            )
+            SELECT * FROM t1 ORDER BY 1,2;
+        """
+        exception "ABORTED"
+    }  
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            1,2
+        FROM t1 GROUP BY k1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    )
+    SELECT * FROM t1 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT
+            33,44
+        FROM t2 GROUP BY k1
+    )
+    SELECT * FROM t1 UNION  select * from t2 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT t2.k1, t2.k2 FROM t1,t2
+    )
+    SELECT * FROM t1 UNION  select * from t2 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT t1.k1, t2.k2 FROM t1,t2
+    )
+    select * from t2 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            2,3
+        UNION
+        SELECT least(t1.k1,t2.k1), least(t1.k2,t2.k2) FROM t1,t2
+    )
+    select * from t2 ORDER BY 1,2;
+    """
+
+    qt_sql """
+    WITH RECURSIVE t1(k1, k2) AS (
+        SELECT
+            1,2
+        UNION
+        SELECT
+            3,4
+        FROM t1 GROUP BY k1
+    ),
+    t2(k1, k2) AS (
+        SELECT
+            11,22
+        UNION
+        SELECT t1.k1, t1.k2 FROM t1
+    )
+    SELECT * FROM t1 UNION  select * from t2 ORDER BY 1,2;
+    """
+}
diff --git 
a/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
 
b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
new file mode 100644
index 00000000000..a01ab03db34
--- /dev/null
+++ 
b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Recursive CTE cases taken from https://clickhouse.com/docs/sql-reference/statements/select/with
+suite ("rec_cte_from_ck_doc") {
+    qt_q1 """
+    WITH RECURSIVE test_table AS (
+        SELECT cast(1 as int) AS number
+    UNION ALL
+        SELECT cast(number + 1 as int) FROM test_table WHERE number < 100
+    )
+    SELECT sum(number) FROM test_table;
+    """
+
+    sql "DROP TABLE IF EXISTS tree;"
+    sql """
+        CREATE TABLE tree
+        (
+            id int,
+            parent_id int,
+            data varchar(100)
+        ) DUPLICATE KEY (id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO tree VALUES (0, NULL, 'ROOT'), (1, 0, 'Child_1'), (2, 0, 'Child_2'), (3, 1, 'Child_1_1');"""
+
+    qt_q2 """
+    WITH RECURSIVE search_tree AS (
+        SELECT id, parent_id, data
+        FROM tree t
+        WHERE t.id = 0
+    UNION ALL
+        SELECT t.id, t.parent_id, t.data
+        FROM tree t, search_tree st
+        WHERE t.parent_id = st.id
+    )
+    SELECT * FROM search_tree order BY id;
+    """
+
+    qt_q3 """
+    WITH RECURSIVE search_tree AS (
+        SELECT id, parent_id, data, array(t.id) AS path
+        FROM tree t
+        WHERE t.id = 0
+    UNION ALL
+        SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id))
+        FROM tree t, search_tree st
+        WHERE t.parent_id = st.id
+    )
+    SELECT * FROM search_tree ORDER BY path;
+    """
+
+    qt_q4 """
+    WITH RECURSIVE search_tree AS (
+        SELECT id, parent_id, data, array(t.id) AS path, cast(0 as int) AS depth
+        FROM tree t
+        WHERE t.id = 0
+    UNION ALL
+        SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id)), cast(depth + 1 as int)
+        FROM tree t, search_tree st
+        WHERE t.parent_id = st.id
+    )
+    SELECT * FROM search_tree ORDER BY depth, id;
+    """
+
+    sql "DROP TABLE IF EXISTS graph;"
+    sql """
+        CREATE TABLE graph
+        (
+            c_from int,
+            c_to int,
+            label varchar(100)
+        ) DUPLICATE KEY (c_from) DISTRIBUTED BY HASH(c_from) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO graph VALUES (1, 2, '1 -> 2'), (1, 3, '1 -> 3'), (2, 3, '2 -> 3'), (1, 4, '1 -> 4'), (4, 5, '4 -> 5');"""
+
+    qt_q5 """
+    WITH RECURSIVE search_graph AS (
+        SELECT c_from, c_to, label FROM graph g
+        UNION ALL
+        SELECT g.c_from, g.c_to, g.label
+        FROM graph g, search_graph sg
+        WHERE g.c_from = sg.c_to
+    )
+    SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+    """
+
+    sql "INSERT INTO graph VALUES (5, 1, '5 -> 1');" // closes the cycle 1 -> 4 -> 5 -> 1
+    test {
+        sql """
+            WITH RECURSIVE search_graph AS (
+                SELECT c_from, c_to, label FROM graph g
+                UNION ALL
+                SELECT g.c_from, g.c_to, g.label
+                FROM graph g, search_graph sg
+                WHERE g.c_from = sg.c_to
+            )
+            SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+        """
+        exception "ABORTED"
+    }
+
+    // test global runtime filter (rf): disable runtime filter pruning, then rerun the cyclic query with a shuffle join
+    sql "set enable_runtime_filter_prune = false;"
+    test {
+        sql """
+        WITH RECURSIVE search_graph AS (
+            SELECT c_from, c_to, label FROM graph g
+            UNION ALL
+            SELECT g.c_from, g.c_to, g.label
+            FROM graph g join [shuffle] search_graph sg
+            on g.c_from = sg.c_to
+        )
+        SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to;
+        """
+        exception "ABORTED"
+    }
+
+    // using LIMIT to stop the recursion is not supported yet, so q6 stays commented out
+    //qt_q6 """
+    //WITH RECURSIVE test_table AS (
+    //    SELECT cast(1 as int) AS number
+    //UNION ALL
+    //    SELECT cast(number + 1 as int) FROM test_table
+    //)
+    //SELECT sum(number) FROM test_table LIMIT 100;
+    //"""
+}
diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy
new file mode 100644
index 00000000000..62d70558163
--- /dev/null
+++ b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Recursive CTE cases taken from https://duckdb.org/docs/stable/sql/query_syntax/with#recursive-ctes
+suite ("rec_cte_from_duckdb_doc") {
+    qt_q1 """
+    WITH RECURSIVE FibonacciNumbers (
+        RecursionDepth,
+        FibonacciNumber,
+        NextNumber
+    ) AS (
+        -- Base case
+        SELECT
+            cast(0 as int) AS RecursionDepth,
+            cast(0 as int) AS FibonacciNumber,
+            cast(1 as int) AS NextNumber
+        UNION
+        ALL -- Recursive step
+        SELECT
+            cast((fib.RecursionDepth + 1) as int) AS RecursionDepth,
+            fib.NextNumber AS FibonacciNumber,
+            cast((fib.FibonacciNumber + fib.NextNumber) as int) AS NextNumber
+        FROM
+            FibonacciNumbers fib
+        WHERE
+            cast((fib.RecursionDepth + 1) as int) < 10
+    )
+    SELECT
+        *
+    FROM
+        FibonacciNumbers fn ORDER BY fn.RecursionDepth;
+    """
+
+    sql "DROP TABLE IF EXISTS tag;"
+    sql """
+        CREATE TABLE tag
+        (
+            id int,
+            name varchar(100),
+            subclassof int
+        ) DUPLICATE KEY (id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO tag VALUES
+    (1, 'U2',     5),
+    (2, 'Blur',   5),
+    (3, 'Oasis',  5),
+    (4, '2Pac',   6),
+    (5, 'Rock',   7),
+    (6, 'Rap',    7),
+    (7, 'Music',  9),
+    (8, 'Movies', 9),
+    (9, 'Art', NULL);"""
+
+    qt_q2 """
+    WITH RECURSIVE tag_hierarchy(id, source, path) AS (
+            SELECT id, name, array(name) AS path
+            FROM tag
+            WHERE subclassof IS NULL
+        UNION ALL
+            SELECT tag.id, tag.name, array_concat(array(tag.name), tag_hierarchy.path)
+            FROM tag, tag_hierarchy
+            WHERE tag.subclassof = tag_hierarchy.id
+        )
+    SELECT path
+    FROM tag_hierarchy
+    WHERE source = 'Oasis';
+    """
+
+    sql "DROP TABLE IF EXISTS edge;"
+    sql """
+    CREATE TABLE edge
+    (
+        node1id int,
+        node2id int
+    ) DUPLICATE KEY (node1id)
+    DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """
+    INSERT INTO edge VALUES
+    (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1),
+    (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8),
+    (6, 3), (6, 4), (7, 4), (8, 1), (9, 4);
+    """
+
+    qt_q3 """
+    WITH RECURSIVE paths(startNode, endNode, path) AS (
+            SELECT -- Define the path as the first edge of the traversal
+                node1id AS startNode,
+                node2id AS endNode,
+                array_concat(array(node1id), array(node2id)) AS path
+            FROM edge
+            WHERE node1id = 1
+            UNION ALL
+            SELECT -- Concatenate new edge to the path
+                paths.startNode AS startNode,
+                node2id AS endNode,
+                array_concat(path, array(node2id)) AS path
+            FROM paths
+            JOIN edge ON paths.endNode = node1id
+            -- Prevent adding a repeated node to the path.
+            -- This ensures that no cycles occur.
+            WHERE array_contains(paths.path, node2id) = false
+        )
+    SELECT startNode, endNode, path
+    FROM paths
+    ORDER BY array_size(path), path;
+    """
+
+    // subqueries that reference the recursive CTE are not supported yet, so q4 and q5 stay commented out
+    //qt_q4 """
+    //WITH RECURSIVE paths(startNode, endNode, path) AS (
+    //        SELECT -- Define the path as the first edge of the traversal
+    //            node1id AS startNode,
+    //            node2id AS endNode,
+    //            array_concat(array(node1id), array(node2id)) AS path
+    //        FROM edge
+    //        WHERE startNode = 1
+    //        UNION ALL
+    //        SELECT -- Concatenate new edge to the path
+    //            paths.startNode AS startNode,
+    //            node2id AS endNode,
+    //            array_concat(path, array(node2id)) AS path
+    //        FROM paths
+    //        JOIN edge ON paths.endNode = node1id
+    //        -- Prevent adding a node that was visited previously by any path.
+    //        -- This ensures that (1) no cycles occur and (2) only nodes that
+    //        -- were not visited by previous (shorter) paths are added to a path.
+    //        WHERE NOT EXISTS (
+    //            SELECT 1 FROM paths previous_paths
+    //                WHERE array_contains(previous_paths.path, node2id)
+    //            )
+    //    )
+    //SELECT startNode, endNode, path
+    //FROM paths
+    //ORDER BY array_size(path), path;
+    //"""
+
+    //qt_q5 """
+    //WITH RECURSIVE paths(startNode, endNode, path, endReached) AS (
+    //SELECT -- Define the path as the first edge of the traversal
+    //        node1id AS startNode,
+    //        node2id AS endNode,
+    //        array_concat(array(node1id), array(node2id)) AS path,
+    //        (node2id = 8) AS endReached
+    //    FROM edge
+    //    WHERE startNode = 1
+    //UNION ALL
+    //SELECT -- Concatenate new edge to the path
+    //        paths.startNode AS startNode,
+    //        node2id AS endNode,
+    //        array_concat(path, array(node2id)) AS path,
+    //        max(CASE WHEN node2id = 8 THEN 1 ELSE 0 END)
+    //            OVER (ROWS BETWEEN UNBOUNDED PRECEDING
+    //                        AND UNBOUNDED FOLLOWING) AS endReached
+    //    FROM paths
+    //    JOIN edge ON paths.endNode = node1id
+    //    WHERE NOT EXISTS (
+    //            FROM paths previous_paths
+    //            WHERE array_contains(previous_paths.path, node2id)
+    //        )
+    //    AND paths.endReached = 0
+    //)
+    //SELECT startNode, endNode, path
+    //FROM paths
+    //WHERE endNode = 8
+    //ORDER BY array_size(path), path;
+    //"""
+}
diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy
new file mode 100644
index 00000000000..cee07dc6612
--- /dev/null
+++ b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Recursive CTE cases taken from https://dev.mysql.com/doc/refman/8.4/en/with.html#common-table-expressions-recursive
+suite ("rec_cte_from_mysql_doc") {
+    qt_q1 """
+    WITH RECURSIVE cte AS
+    (
+    SELECT cast(1 as int) AS n, cast('abc' as varchar(65533)) AS str
+    UNION ALL
+    SELECT cast(n + 1 as int), cast(CONCAT(str, str) as varchar(65533)) FROM cte WHERE n < 3
+    )
+    SELECT * FROM cte order by n;
+    """
+
+    qt_q2 """
+    WITH RECURSIVE cte AS
+    (
+    SELECT cast(1 as int) AS n, cast(1 as int) AS p, cast(-1 as int) AS q
+    UNION ALL
+    SELECT cast(n + 1 as int), cast(q * 2 as int), cast(p * 2 as int) FROM cte WHERE n < 5
+    )
+    SELECT * FROM cte order by n;
+    """
+
+    test {
+        sql """
+            WITH RECURSIVE cte (n) AS
+            (
+            SELECT cast(1 as int)
+            UNION ALL
+            SELECT cast(n + 1 as int) FROM cte
+            )
+            SELECT n FROM cte order by n;
+        """
+        exception "ABORTED" // the recursive member has no WHERE clause; the test expects an ABORTED error
+    }
+    
+    // using LIMIT to stop the recursion is not supported yet, so q3 stays commented out
+    //qt_q3 """
+    //WITH RECURSIVE cte (n) AS
+    //(
+    //SELECT cast(1 as int)
+    //UNION ALL
+    //SELECT cast(n + 1 as int) FROM cte LIMIT 10000
+    //)
+    //SELECT n FROM cte order by n;
+    //"""
+
+    sql "DROP TABLE IF EXISTS sales;"
+    sql """
+        CREATE TABLE sales
+        (
+            c_date date,
+            c_price double
+        ) DUPLICATE KEY (c_date)
+        DISTRIBUTED BY HASH(c_date) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """insert into sales values
+        ('2017-01-03', 100.0),
+        ('2017-01-03', 200.0),
+        ('2017-01-06', 50.0),
+        ('2017-01-08', 10.0),
+        ('2017-01-08', 20.0),
+        ('2017-01-08', 150.0),
+        ('2017-01-10', 5.0);"""
+
+    qt_q4 """
+        WITH RECURSIVE dates (c_date) AS
+        (
+        SELECT MIN(c_date) FROM sales
+        UNION ALL
+        SELECT c_date + INTERVAL 1 DAY FROM dates
+        WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales)
+        )
+        SELECT * FROM dates order by 1;
+    """
+
+    qt_q5 """
+        WITH RECURSIVE dates (c_date) AS
+        (
+        SELECT MIN(c_date) FROM sales
+        UNION ALL
+        SELECT c_date + INTERVAL 1 DAY FROM dates
+        WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales)
+        )
+        SELECT dates.c_date, COALESCE(SUM(c_price), 0) AS sum_price
+        FROM dates LEFT JOIN sales ON dates.c_date = sales.c_date
+        GROUP BY dates.c_date
+        ORDER BY dates.c_date;
+    """
+
+    sql "DROP TABLE IF EXISTS employees;"
+    sql """
+        CREATE TABLE employees (
+        id         INT NOT NULL,
+        name       VARCHAR(100) NOT NULL,
+        manager_id INT NULL
+        ) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
+    """
+    sql """INSERT INTO employees VALUES
+    (333, "Yasmina", NULL),
+    (198, "John", 333),
+    (692, "Tarek", 333),
+    (29, "Pedro", 198),
+    (4610, "Sarah", 29),
+    (72, "Pierre", 29),
+    (123, "Adil", 692);
+    """
+
+    qt_q6 """
+        WITH RECURSIVE employee_paths (id, name, path) AS
+        (
+        SELECT id, name, CAST(id AS varchar(65533))
+            FROM employees
+            WHERE manager_id IS NULL
+        UNION ALL
+        SELECT e.id, e.name, cast(CONCAT(ep.path, ',', e.id) as varchar(65533))
+            FROM employee_paths AS ep JOIN employees AS e
+            ON ep.id = e.manager_id
+        )
+        SELECT * FROM employee_paths ORDER BY path;
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to