This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_rec4
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 21beaee3c1c46450cb69eeb56acbefc243ff80fd
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Wed Dec 10 20:19:14 2025 +0800

    support rec cte (be part / proto part)
---
 be/src/pipeline/exec/exchange_source_operator.cpp  |   5 +
 be/src/pipeline/exec/exchange_source_operator.h    |  19 ++
 be/src/pipeline/exec/operator.cpp                  |  10 +
 be/src/pipeline/exec/operator.h                    |   2 +
 .../pipeline/exec/rec_cte_anchor_sink_operator.cpp |  56 +++++
 .../pipeline/exec/rec_cte_anchor_sink_operator.h   | 114 ++++++++++
 be/src/pipeline/exec/rec_cte_scan_operator.h       |  89 ++++++++
 be/src/pipeline/exec/rec_cte_sink_operator.cpp     |  55 +++++
 be/src/pipeline/exec/rec_cte_sink_operator.h       | 101 +++++++++
 be/src/pipeline/exec/rec_cte_source_operator.cpp   |  86 ++++++++
 be/src/pipeline/exec/rec_cte_source_operator.h     | 228 +++++++++++++++++++
 be/src/pipeline/exec/union_sink_operator.h         |  42 ++--
 be/src/pipeline/pipeline.h                         |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 242 +++++++++++++++------
 be/src/pipeline/pipeline_fragment_context.h        |  18 +-
 be/src/pipeline/pipeline_task.cpp                  |  25 ++-
 be/src/pipeline/rec_cte_shared_state.h             | 177 +++++++++++++++
 be/src/runtime/fragment_mgr.cpp                    |  60 +++++
 be/src/runtime/fragment_mgr.h                      |  11 +
 be/src/runtime/query_context.cpp                   |  47 ++++
 be/src/runtime/query_context.h                     |  28 ++-
 be/src/runtime/runtime_predicate.h                 |   1 +
 be/src/runtime/runtime_state.cpp                   |  16 +-
 be/src/runtime/runtime_state.h                     |  19 +-
 be/src/runtime_filter/runtime_filter_consumer.h    |  15 +-
 be/src/runtime_filter/runtime_filter_merger.h      |   2 +
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  23 +-
 be/src/runtime_filter/runtime_filter_mgr.h         |  26 ++-
 be/src/service/internal_service.cpp                |  51 +++++
 be/src/service/internal_service.h                  |  10 +
 .../runtime_filter_consumer_test.cpp               |  20 +-
 be/test/runtime_filter/runtime_filter_mgr_test.cpp |   2 +-
 gensrc/proto/internal_service.proto                |  42 ++++
 gensrc/thrift/PaloInternalService.thrift           |   4 +-
 gensrc/thrift/PlanNodes.thrift                     |  30 ++-
 35 files changed, 1554 insertions(+), 124 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 76c2c4ab093..b6ccc910a64 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) {
     _is_closed = true;
     return OperatorX<ExchangeLocalState>::close(state);
 }
+
+// Forwards the reset request to the local state that belongs to `state`.
+Status ExchangeSourceOperatorX::reset(RuntimeState* state) {
+    return get_local_state(state).reset(state);
+}
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 3008217e130..03f2a288cdf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -48,6 +48,23 @@ public:
     Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
+    // Closes the existing stream receiver (when there is one), calls
+    // create_stream_recvr() again, clears the ready flag and the skipped-row
+    // counter, then blocks each dependency and attaches it to the sender
+    // queue that has the same index.
+    Status reset(RuntimeState* state) {
+        if (stream_recvr) {
+            stream_recvr->close();
+        }
+        create_stream_recvr(state);
+
+        is_ready = false;
+        num_rows_skipped = 0;
+
+        const auto& recvr_queues = stream_recvr->sender_queues();
+        const size_t queue_count = recvr_queues.size();
+        for (size_t idx = 0; idx != queue_count; ++idx) {
+            auto& dep = deps[idx];
+            dep->block();
+            recvr_queues[idx]->set_dependency(dep);
+        }
+        return Status::OK();
+    }
+
     std::vector<Dependency*> dependencies() const override {
         std::vector<Dependency*> dep_vec;
         std::for_each(deps.begin(), deps.end(),
@@ -83,6 +100,8 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override;
 
+    Status reset(RuntimeState* state);
+
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     std::string debug_string(int indentation_level = 0) const override;
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 37ad694199c..4879a06915a 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -62,6 +62,10 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
+#include "pipeline/exec/rec_cte_sink_operator.h"
+#include "pipeline/exec/rec_cte_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
 DECLARE_OPERATOR(CacheSinkLocalState)
 DECLARE_OPERATOR(DictSinkLocalState)
+DECLARE_OPERATOR(RecCTESinkLocalState)
+DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
 
 #undef DECLARE_OPERATOR
 
@@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
 DECLARE_OPERATOR(CacheSourceLocalState)
+DECLARE_OPERATOR(RecCTESourceLocalState)
+DECLARE_OPERATOR(RecCTEScanLocalState)
 
 #ifdef BE_TEST
 DECLARE_OPERATOR(MockLocalState)
@@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>;
 template class PipelineXSinkLocalState<LocalExchangeSharedState>;
 template class PipelineXSinkLocalState<BasicSharedState>;
 template class PipelineXSinkLocalState<DataQueueSharedState>;
+template class PipelineXSinkLocalState<RecCTESharedState>;
 
 template class PipelineXLocalState<HashJoinSharedState>;
 template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -888,6 +897,7 @@ template class 
PipelineXLocalState<PartitionSortNodeSharedState>;
 template class PipelineXLocalState<SetSharedState>;
 template class PipelineXLocalState<LocalExchangeSharedState>;
 template class PipelineXLocalState<BasicSharedState>;
+template class PipelineXLocalState<RecCTESharedState>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b6328876008..543c303ce80 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -613,6 +613,8 @@ public:
     // For agg/sort/join sink.
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
+    virtual bool need_rerun(RuntimeState* state) const { return false; }
+
     Status init(const TDataSink& tsink) override;
     [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
                                       const bool use_global_hash_shuffle,
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp 
b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp
new file mode 100644
index 00000000000..f1552ed8e5e
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+// Opens the base sink state, then clones every expression context of the
+// parent operator into this local state's _child_expr.
+Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& parent_exprs = _parent->cast<Parent>()._child_expr;
+    const size_t expr_count = parent_exprs.size();
+    _child_expr.resize(expr_count);
+    for (size_t idx = 0; idx != expr_count; ++idx) {
+        RETURN_IF_ERROR(parent_exprs[idx]->clone(state, _child_expr[idx]));
+    }
+    return Status::OK();
+}
+
+// Initializes the base sink, builds the expression trees from
+// rec_cte_node.result_expr_lists[0] and sets the operator name.
+// Returns InternalError when the plan node carries no result expr list.
+Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode,
+                                       RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+    const auto& expr_lists = tnode.rec_cte_node.result_expr_lists;
+    // Indexing an empty vector is undefined behavior, so fail with a status
+    // instead of relying only on the debug check above.
+    if (expr_lists.empty()) {
+        return Status::InternalError(
+                "rec cte node {} has no result expr list for the anchor sink",
+                tnode.node_id);
+    }
+    {
+        const auto& texprs = expr_lists[0];
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    _name = "REC_CTE_ANCHOR_SINK_OPERATOR";
+    return Status::OK();
+}
+
+// Prepares the base sink, then prepares the expressions against the child's
+// row descriptor, checks their output types and opens them.
+Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(
+            vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(
+            _child_expr, _child->row_desc()));
+    return vectorized::VExpr::open(_child_expr, state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h 
b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
new file mode 100644
index 00000000000..340949a2f79
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTEAnchorSinkOperatorX;
+// Local state of RecCTEAnchorSinkOperatorX. It is backed by a
+// RecCTESharedState and keeps its own clones of the operator's expression
+// contexts, which open() fills in.
+class RecCTEAnchorSinkLocalState final : public 
PipelineXSinkLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState);
+    RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state)
+            : Base(parent, state) {}
+    // Opens the base state and clones the parent's expressions.
+    Status open(RuntimeState* state) override;
+
+    // Always reports true.
+    bool is_blockable() const override { return true; }
+
+private:
+    friend class RecCTEAnchorSinkOperatorX;
+    using Base = PipelineXSinkLocalState<RecCTESharedState>;
+    using Parent = RecCTEAnchorSinkOperatorX;
+
+    // Clones of Parent::_child_expr, used by the operator's sink().
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+// Anchor-side sink of a recursive CTE. It materializes incoming blocks
+// through its expressions, stores them in the RecCTESharedState it creates,
+// and marks the shared anchor dependency ready once the input is exhausted.
+class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final)
+        : public DataSinkOperatorX<RecCTEAnchorSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>;
+
+    friend class RecCTEAnchorSinkLocalState;
+    RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
+                              const DescriptorTbl& descs)
+            : Base(sink_id, tnode.node_id, dest_id),
+              _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {
+    }
+
+    ~RecCTEAnchorSinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(
+            RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    // Consumes one input block. The first call also invokes
+    // send_data_to_targets() with offset 0 before any data is stored.
+    Status sink(RuntimeState* state, vectorized::Block* input_block,
+                bool eos) override {
+        auto& local_state = get_local_state(state);
+        auto& shared = local_state._shared_state;
+
+        if (_need_notify_rec_side_ready) {
+            RETURN_IF_ERROR(shared->send_data_to_targets(state, 0));
+            _need_notify_rec_side_ready = false;
+        }
+
+        const auto input_rows = input_block->rows();
+        COUNTER_UPDATE(local_state.rows_input_counter(),
+                       static_cast<int64_t>(input_rows));
+        if (input_rows != 0) {
+            vectorized::Block materialized;
+            RETURN_IF_ERROR(materialize_block(local_state._child_expr,
+                                              input_block, &materialized, true));
+            RETURN_IF_ERROR(
+                    shared->emplace_block(state, std::move(materialized)));
+        }
+
+        if (eos) {
+            shared->anchor_dep->set_ready();
+        }
+        return Status::OK();
+    }
+
+    // Creates the RecCTESharedState, tagged with this operator's id and
+    // related to every destination operator id.
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        std::shared_ptr<BasicSharedState> shared_state =
+                std::make_shared<RecCTESharedState>();
+        shared_state->id = operator_id();
+        for (const auto& dest_op_id : dests_id()) {
+            shared_state->related_op_ids.insert(dest_op_id);
+        }
+        return shared_state;
+    }
+
+private:
+    const RowDescriptor _row_descriptor;
+    vectorized::VExprContextSPtrs _child_expr;
+
+    // True until the first sink() call has invoked send_data_to_targets().
+    bool _need_notify_rec_side_ready = true;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h 
b/be/src/pipeline/exec/rec_cte_scan_operator.h
new file mode 100644
index 00000000000..5b03766c163
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_scan_operator.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "pipeline/exec/operator.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+class RecCTEScanOperatorX;
+// Local state of RecCTEScanOperatorX. It registers itself with the query
+// context on construction and deregisters on destruction, buffers the blocks
+// handed to add_block(), and exposes one dependency that set_ready() marks
+// ready.
+class RecCTEScanLocalState final : public PipelineXLocalState<> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTEScanLocalState);
+
+    RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent)
+            : PipelineXLocalState<>(state, parent) {
+        _scan_dependency = Dependency::create_shared(
+                _parent->operator_id(), _parent->node_id(),
+                _parent->get_name() + "_DEPENDENCY");
+        state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(),
+                                                 parent->node_id(), this);
+    }
+    ~RecCTEScanLocalState() override {
+        state()->get_query_ctx()->deregiste_cte_scan(
+                state()->fragment_instance_id(), parent()->node_id());
+    }
+
+    // Deserializes `pblock` and appends the result to the buffered blocks.
+    // Returns the deserialization error, if any, without buffering anything.
+    Status add_block(const PBlock& pblock) {
+        vectorized::Block block;
+        // Zero-initialized: deserialize() receives these by pointer and they
+        // must not hold indeterminate values if it reads or accumulates.
+        size_t uncompressed_bytes = 0;
+        int64_t decompress_time = 0;
+        RETURN_IF_ERROR(block.deserialize(pblock, &uncompressed_bytes,
+                                          &decompress_time));
+        _blocks.emplace_back(std::move(block));
+        return Status::OK();
+    }
+
+    // Marks the scan dependency ready.
+    void set_ready() { _scan_dependency->set_ready(); }
+
+    std::vector<Dependency*> dependencies() const override {
+        return {_scan_dependency.get()};
+    }
+
+private:
+    friend class RecCTEScanOperatorX;
+    // Blocks received through add_block(), consumed from the back.
+    std::vector<vectorized::Block> _blocks;
+    DependencySPtr _scan_dependency = nullptr;
+};
+
+// Source operator that hands out the blocks buffered in its local state.
+class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> {
+public:
+    RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                        int operator_id, const DescriptorTbl& descs)
+            : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) {
+    }
+
+    // Moves the last buffered block into `block` and filters it with the
+    // local conjuncts; sets `*eos` when no buffered block is left.
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     bool* eos) override {
+        auto& local_state = get_local_state(state);
+        auto& pending_blocks = local_state._blocks;
+
+        if (!pending_blocks.empty()) {
+            *block = std::move(pending_blocks.back());
+            RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(),
+                                                     block, block->columns()));
+            pending_blocks.pop_back();
+            return Status::OK();
+        }
+
+        *eos = true;
+        return Status::OK();
+    }
+
+    bool is_source() const override { return true; }
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.cpp 
b/be/src/pipeline/exec/rec_cte_sink_operator.cpp
new file mode 100644
index 00000000000..1508b478bca
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.cpp
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_sink_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+// Opens the base sink state and gives this local state its own clone of each
+// expression context held by the parent operator.
+Status RecCTESinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& source_exprs = _parent->cast<Parent>()._child_expr;
+    const size_t total = source_exprs.size();
+    _child_expr.resize(total);
+    for (size_t pos = 0; pos != total; ++pos) {
+        RETURN_IF_ERROR(source_exprs[pos]->clone(state, _child_expr[pos]));
+    }
+    return Status::OK();
+}
+
+// Initializes the base sink and builds the expression trees from
+// rec_cte_node.result_expr_lists[1].
+// Returns InternalError when the plan node has fewer than two lists.
+Status RecCTESinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+    const auto& expr_lists = tnode.rec_cte_node.result_expr_lists;
+    // Index 1 is read below; reading past the end of the vector is undefined
+    // behavior, so reject such a plan with a status instead.
+    if (expr_lists.size() < 2) {
+        return Status::InternalError(
+                "rec cte node {} has {} result expr lists, expected at least 2",
+                tnode.node_id, expr_lists.size());
+    }
+    {
+        const auto& texprs = expr_lists[1];
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+// Runs the base prepare, then prepares, type-checks and opens the
+// expressions against the child's row descriptor.
+Status RecCTESinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(
+            vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(
+            _child_expr, _child->row_desc()));
+    return vectorized::VExpr::open(_child_expr, state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h 
b/be/src/pipeline/exec/rec_cte_sink_operator.h
new file mode 100644
index 00000000000..2321ba16ea3
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_sink_operator.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESinkOperatorX;
+// Local state of RecCTESinkOperatorX. It is backed by a RecCTESharedState
+// and keeps its own clones of the operator's expression contexts, which
+// open() fills in.
+class RecCTESinkLocalState final : public 
PipelineXSinkLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESinkLocalState);
+    RecCTESinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+    // Opens the base state and clones the parent's expressions.
+    Status open(RuntimeState* state) override;
+
+private:
+    friend class RecCTESinkOperatorX;
+    using Base = PipelineXSinkLocalState<RecCTESharedState>;
+    using Parent = RecCTESinkOperatorX;
+
+    // Clones of Parent::_child_expr, used by the operator's sink().
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+// Sink operator that materializes incoming blocks through its expressions,
+// appends them to the RecCTESharedState and marks the shared source
+// dependency ready once the input is exhausted.
+class RecCTESinkOperatorX MOCK_REMOVE(final)
+        : public DataSinkOperatorX<RecCTESinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<RecCTESinkLocalState>;
+
+    friend class RecCTESinkLocalState;
+    RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode,
+                        const DescriptorTbl& descs)
+            : Base(sink_id, tnode.node_id, dest_id),
+              _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {
+    }
+
+    ~RecCTESinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    // A rerun is needed for as long as the shared state is not ready to
+    // return.
+    bool need_rerun(RuntimeState* state) const override {
+        return !get_local_state(state)._shared_state->ready_to_return;
+    }
+
+    // This operator does not create a shared state of its own.
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        return nullptr;
+    }
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(
+            RuntimeState* /*state*/) const override {
+        return {ExchangeType::NOOP};
+    }
+
+    // Consumes one input block; empty blocks only update the row counter.
+    Status sink(RuntimeState* state, vectorized::Block* input_block,
+                bool eos) override {
+        auto& local_state = get_local_state(state);
+        auto& shared = local_state._shared_state;
+
+        const auto input_rows = input_block->rows();
+        COUNTER_UPDATE(local_state.rows_input_counter(),
+                       static_cast<int64_t>(input_rows));
+        if (input_rows != 0) {
+            vectorized::Block materialized;
+            RETURN_IF_ERROR(materialize_block(local_state._child_expr,
+                                              input_block, &materialized, true));
+            RETURN_IF_ERROR(
+                    shared->emplace_block(state, std::move(materialized)));
+        }
+
+        if (eos) {
+            shared->source_dep->set_ready();
+        }
+        return Status::OK();
+    }
+
+private:
+    const RowDescriptor _row_descriptor;
+    vectorized::VExprContextSPtrs _child_expr;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.cpp 
b/be/src/pipeline/exec/rec_cte_source_operator.cpp
new file mode 100644
index 00000000000..9a02bb947b8
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_source_operator.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/rec_cte_source_operator.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+// Forwards both arguments to the base local state; nothing else is set up
+// here.
+RecCTESourceLocalState::RecCTESourceLocalState(RuntimeState* state, 
OperatorXBase* parent)
+        : Base(state, parent) {}
+
+// Opens the base local state while the exec and open timers are running.
+Status RecCTESourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    return Base::open(state);
+}
+
+// Initializes the base state, copies the operator's targets and recursion
+// limit into the shared state, wires the source and anchor dependencies,
+// clones the parent's expressions, initializes agg_data's hash method unless
+// the node is UNION ALL, and registers the hash-table profile counters.
+Status RecCTESourceLocalState::init(RuntimeState* state,
+                                    LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& p = _parent->cast<Parent>();
+
+    _shared_state->targets = p._targets;
+    _shared_state->max_recursion_depth = p._max_recursion_depth;
+    _shared_state->source_dep = _dependency;
+    _anchor_dependency = Dependency::create_shared(
+            _parent->operator_id(), _parent->node_id(),
+            _parent->get_name() + "_ANCHOR_DEPENDENCY");
+    _shared_state->anchor_dep = _anchor_dependency.get();
+
+    const size_t expr_count = p._child_expr.size();
+    _child_expr.resize(expr_count);
+    for (size_t idx = 0; idx != expr_count; ++idx) {
+        RETURN_IF_ERROR(p._child_expr[idx]->clone(state, _child_expr[idx]));
+    }
+
+    if (!p._is_union_all) {
+        _shared_state->agg_data = std::make_unique<DistinctDataVariants>();
+        RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(
+                _shared_state->agg_data.get(), get_data_types(_child_expr),
+                false));
+    }
+
+    _shared_state->hash_table_compute_timer =
+            ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
+    _shared_state->hash_table_emplace_timer =
+            ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
+    _shared_state->hash_table_input_counter = ADD_COUNTER(
+            Base::custom_profile(), "HashTableInputCount", TUnit::UNIT);
+    return Status::OK();
+}
+
+// Initializes the base operator, reads the recursion limit from the runtime
+// state and builds the expression trees from
+// rec_cte_node.result_expr_lists[1].
+// Returns InternalError when the plan node has fewer than two lists.
+Status RecCTESourceOperatorX::init(const TPlanNode& tnode,
+                                   RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.rec_cte_node);
+
+    _max_recursion_depth = state->cte_max_recursion_depth();
+
+    const auto& expr_lists = tnode.rec_cte_node.result_expr_lists;
+    // Index 1 is read below; reading past the end of the vector is undefined
+    // behavior, so reject such a plan with a status instead.
+    if (expr_lists.size() < 2) {
+        return Status::InternalError(
+                "rec cte node {} has {} result expr lists, expected at least 2",
+                tnode.node_id, expr_lists.size());
+    }
+    {
+        const auto& texprs = expr_lists[1];
+        vectorized::VExprContextSPtrs ctxs;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs));
+        _child_expr = ctxs;
+    }
+    return Status::OK();
+}
+
+// Prepares the base operator, then prepares the expressions against the
+// child's row descriptor, validates their output types and opens them.
+Status RecCTESourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(
+            vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(
+            _child_expr, _child->row_desc()));
+    return vectorized::VExpr::open(_child_expr, state);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h 
b/be/src/pipeline/exec/rec_cte_source_operator.h
new file mode 100644
index 00000000000..e36358161d2
--- /dev/null
+++ b/be/src/pipeline/exec/rec_cte_source_operator.h
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <gen_cpp/internal_service.pb.h>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "util/brpc_client_cache.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESourceOperatorX;
+// Local state of RecCTESourceOperatorX, backed by a RecCTESharedState.
+// Besides the dependency inherited from the base it owns a second, "anchor"
+// dependency that init() creates and publishes to the shared state.
+class RecCTESourceLocalState final : public 
PipelineXLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESourceLocalState);
+    using Base = PipelineXLocalState<RecCTESharedState>;
+    using Parent = RecCTESourceOperatorX;
+    RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent);
+
+    Status open(RuntimeState* state) override;
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    // Always reports true.
+    bool is_blockable() const override { return true; }
+
+    // Returns the base dependency followed by the anchor dependency.
+    std::vector<Dependency*> dependencies() const override {
+        return std::vector<Dependency*> {_dependency, 
_anchor_dependency.get()};
+    }
+
+private:
+    friend class RecCTESourceOperatorX;
+    friend class OperatorX<RecCTESourceLocalState>;
+
+    // Clones of Parent::_child_expr, filled in init().
+    vectorized::VExprContextSPtrs _child_expr;
+
+    // Created in init(); the shared state keeps a raw pointer to it.
+    std::shared_ptr<Dependency> _anchor_dependency = nullptr;
+};
+
+class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
+public:
+    using Base = OperatorX<RecCTESourceLocalState>;
+    // Copies the recursive-CTE settings (union-all flag, targets, fragments
+    // to reset, runtime filter ids, used-by-other-rec-cte flag) out of
+    // tnode.rec_cte_node. The DCHECK runs after those fields have already
+    // been read by the member initializers.
+    RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
+                          const DescriptorTbl& descs)
+            : Base(pool, tnode, operator_id, descs),
+              _is_union_all(tnode.rec_cte_node.is_union_all),
+              _targets(tnode.rec_cte_node.targets),
+              _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset),
+              _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids),
+              
_is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) {
+        DCHECK(tnode.__isset.rec_cte_node);
+    }
+    ~RecCTESourceOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    // Sends the close request first (see _send_close); the base terminate
+    // only runs when that succeeded.
+    Status terminate(RuntimeState* state) override {
+        RETURN_IF_ERROR(_send_close(state));
+        return Base::terminate(state);
+    }
+
+    // Sends the close request first (see _send_close); the base close only
+    // runs when that succeeded.
+    Status close(RuntimeState* state) override {
+        RETURN_IF_ERROR(_send_close(state));
+        return Base::close(state);
+    }
+
+    // Stores `child` as this operator's child and always returns OK.
+    Status set_child(OperatorPtr child) override {
+        Base::_child = child;
+        return Status::OK();
+    }
+
+    // Always reports this operator as serial.
+    bool is_serial_operator() const override { return true; }
+
+    // Always answers with ExchangeType::NOOP; the state argument is unused.
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
+        return {ExchangeType::NOOP};
+    }
+
+    // Drives the recursion and, once the shared state is ready to return,
+    // hands out the accumulated blocks.
+    //
+    // While the shared state is not ready: fails with Aborted when another
+    // round would exceed _max_recursion_depth, otherwise blocks the source
+    // dependency, runs _recursive_process() for the blocks starting at
+    // last_round_offset, and advances the round bookkeeping. No block is
+    // produced by such a call.
+    //
+    // Once ready: swaps the last stored block into `block`, filters it with
+    // the local conjuncts and removes it from the shared state; sets `*eos`
+    // when no block is left.
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override {
+        auto& local_state = get_local_state(state);
+        auto& ctx = local_state._shared_state;
+        ctx->update_ready_to_return();
+
+        if (!ctx->ready_to_return) {
+            if (ctx->current_round + 1 > _max_recursion_depth) {
+                return Status::Aborted("reach cte_max_recursion_depth {}", 
_max_recursion_depth);
+            }
+
+            ctx->source_dep->block();
+            // ctx->blocks.size() may be changed after _recursive_process
+            int current_blocks_size = int(ctx->blocks.size());
+            RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset));
+            ctx->current_round++;
+            // The size captured before the call becomes the start offset of
+            // the next round.
+            ctx->last_round_offset = current_blocks_size;
+        } else {
+            if (ctx->blocks.empty()) {
+                *eos = true;
+            } else {
+                block->swap(ctx->blocks.back());
+                RETURN_IF_ERROR(
+                        local_state.filter_block(local_state.conjuncts(), 
block, block->columns()));
+                ctx->blocks.pop_back();
+            }
+        }
+        return Status::OK();
+    }
+
+    // Always reports this operator as a source.
+    bool is_source() const override { return true; }
+
+private:
+    // Sends the `close` rerun request at most once and then records the
+    // number of rounds in the "RecursiveRound" profile counter. Does nothing
+    // when the request was already sent or when this CTE is used by another
+    // recursive CTE.
+    Status _send_close(RuntimeState* state) {
+        if (_aready_send_close || _is_used_by_other_rec_cte) {
+            return Status::OK();
+        }
+
+        RETURN_IF_ERROR(
+                _send_rerun_fragments(state, PRerunFragmentParams::close));
+        _aready_send_close = true;
+
+        auto* round_counter =
+                ADD_COUNTER(get_local_state(state).Base::custom_profile(),
+                            "RecursiveRound", TUnit::UNIT);
+        round_counter->set(
+                int64_t(get_local_state(state)._shared_state->current_round));
+        return Status::OK();
+    }
+
+    // Runs one recursion round: sends the wait request, resets the global
+    // runtime filters, sends the release, rebuild and submit requests in
+    // that order, then sends the data starting at `last_round_offset` to the
+    // targets. Stops at the first step that fails.
+    Status _recursive_process(RuntimeState* state,
+                              size_t last_round_offset) const {
+        RETURN_IF_ERROR(
+                _send_rerun_fragments(state, PRerunFragmentParams::wait));
+        RETURN_IF_ERROR(_send_reset_global_rf(state));
+        RETURN_IF_ERROR(
+                _send_rerun_fragments(state, PRerunFragmentParams::release));
+        RETURN_IF_ERROR(
+                _send_rerun_fragments(state, PRerunFragmentParams::rebuild));
+        RETURN_IF_ERROR(
+                _send_rerun_fragments(state, PRerunFragmentParams::submit));
+        return get_local_state(state)._shared_state->send_data_to_targets(
+                state, last_round_offset);
+    }
+
+    // Requests a reset of the global runtime filters listed in _global_rf_ids for this query,
+    // addressed to the endpoint returned by the global runtime filter manager's
+    // get_merge_addr(). The call is synchronous: the brpc request is issued and joined here.
+    //
+    // Returns an error when the merge address cannot be resolved, when no client stub is
+    // available for it, when the RPC itself fails, or when the response carries a non-OK status.
+    Status _send_reset_global_rf(RuntimeState* state) const {
+        TNetworkAddress addr;
+        RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr));
+        auto stub =
+                state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr);
+        if (!stub) {
+            return Status::InternalError("Get rpc stub failed, host={}, port={}", addr.hostname,
+                                         addr.port);
+        }
+
+        PResetGlobalRfParams request;
+        request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+        for (auto filter_id : _global_rf_ids) {
+            request.add_filter_ids(filter_id);
+        }
+
+        PResetGlobalRfResult result;
+        brpc::Controller controller;
+        controller.set_timeout_ms(
+                get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+        stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing());
+        brpc::Join(controller.call_id());
+        // Check the transport result before trusting `result`: on an RPC failure (timeout,
+        // connection error, ...) `result` was not filled in by a response, so reading
+        // result.status() alone would hide the failure.
+        if (controller.Failed()) {
+            return Status::InternalError(controller.ErrorText());
+        }
+        return Status::create(result.status());
+    }
+
+    // Sends one rerun_fragment RPC carrying `stage` to every fragment in _fragments_to_reset,
+    // one after another; each call is joined before the next one is issued.
+    //
+    // Stops at and returns the first failure: a target fragment id equal to this operator's own
+    // fragment id, a missing client stub, a failed RPC, or a non-OK status in the response.
+    // Fragments earlier in the list have already received `stage` by then.
+    Status _send_rerun_fragments(RuntimeState* state, PRerunFragmentParams_Opcode stage) const {
+        // Iterate by const reference so the reset info (including its address) is not copied
+        // on every iteration.
+        for (const auto& fragment : _fragments_to_reset) {
+            if (state->fragment_id() == fragment.fragment_id) {
+                return Status::InternalError("Fragment {} contains a recursive CTE node",
+                                             fragment.fragment_id);
+            }
+            auto stub =
+                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
+                            fragment.addr);
+            if (!stub) {
+                return Status::InternalError("Get rpc stub failed, host={}, port={}",
+                                             fragment.addr.hostname, fragment.addr.port);
+            }
+
+            PRerunFragmentParams request;
+            request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+            request.set_fragment_id(fragment.fragment_id);
+            request.set_stage(stage);
+
+            PRerunFragmentResult result;
+            brpc::Controller controller;
+            controller.set_timeout_ms(
+                    get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+            stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing());
+            brpc::Join(controller.call_id());
+            // Transport-level failure first, then the status reported by the remote side.
+            if (controller.Failed()) {
+                return Status::InternalError(controller.ErrorText());
+            }
+
+            RETURN_IF_ERROR(Status::create(result.status()));
+        }
+        return Status::OK();
+    }
+
+    friend class RecCTESourceLocalState;
+
+    vectorized::VExprContextSPtrs _child_expr;
+
+    const bool _is_union_all = false;
+    std::vector<TRecCTETarget> _targets;
+    std::vector<TRecCTEResetInfo> _fragments_to_reset;
+    std::vector<int> _global_rf_ids;
+
+    int _max_recursion_depth = 0;
+
+    bool _aready_send_close = false;
+
+    bool _is_used_by_other_rec_cte = false;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 06b95cdf92d..764f8099427 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -29,6 +29,24 @@ namespace doris {
 #include "common/compile_check_begin.h"
 class RuntimeState;
 
+// Evaluates every expression in `exprs` against `src_block` and assembles the resulting columns
+// (with their type and name) into `*res_block`, one column per expression, in order.
+//
+// need_clone == true : each result column is copied with clone_resized(rows), where `rows` is
+//                      the row count of `src_block` read before any expression is executed.
+// need_clone == false: the result block refers to the same column objects as `src_block`.
+//
+// Returns the first error reported by an expression; `*res_block` is not assigned in that case.
+inline Status materialize_block(const vectorized::VExprContextSPtrs& exprs,
+                                vectorized::Block* src_block, vectorized::Block* res_block,
+                                bool need_clone) {
+    vectorized::ColumnsWithTypeAndName columns;
+    // Exactly one output column per expression: allocate once instead of growing repeatedly.
+    columns.reserve(exprs.size());
+    auto rows = src_block->rows();
+    for (const auto& expr : exprs) {
+        int result_column_id = -1;
+        RETURN_IF_ERROR(expr->execute(src_block, &result_column_id));
+        const auto& src_col_with_type = src_block->get_by_position(result_column_id);
+        vectorized::ColumnPtr cloned_col = need_clone
+                                                   ? src_col_with_type.column->clone_resized(rows)
+                                                   : src_col_with_type.column;
+        columns.emplace_back(cloned_col, src_col_with_type.type, src_col_with_type.name);
+    }
+    *res_block = {columns};
+    return Status::OK();
+}
+
 namespace pipeline {
 class DataQueue;
 
@@ -153,27 +171,17 @@ private:
                     
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                                
row_descriptor());
             vectorized::Block res;
-            RETURN_IF_ERROR(materialize_block(state, input_block, child_id, 
&res));
+            auto& local_state = get_local_state(state);
+            {
+                SCOPED_TIMER(local_state._expr_timer);
+                RETURN_IF_ERROR(
+                        materialize_block(local_state._child_expr, 
input_block, &res, false));
+            }
+            local_state._child_row_idx += res.rows();
             RETURN_IF_ERROR(mblock.merge(res));
         }
         return Status::OK();
     }
-
-    Status materialize_block(RuntimeState* state, vectorized::Block* 
src_block, int child_idx,
-                             vectorized::Block* res_block) {
-        auto& local_state = get_local_state(state);
-        SCOPED_TIMER(local_state._expr_timer);
-        const auto& child_exprs = local_state._child_expr;
-        vectorized::ColumnsWithTypeAndName colunms;
-        for (size_t i = 0; i < child_exprs.size(); ++i) {
-            int result_column_id = -1;
-            RETURN_IF_ERROR(child_exprs[i]->execute(src_block, 
&result_column_id));
-            colunms.emplace_back(src_block->get_by_position(result_column_id));
-        }
-        local_state._child_row_idx += src_block->rows();
-        *res_block = {colunms};
-        return Status::OK();
-    }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 1d0ddcf178e..2a20a5cd73d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -133,7 +133,7 @@ public:
             fmt::format_to(debug_string_buffer, "\n{}", 
_operators[i]->debug_string(i));
         }
         fmt::format_to(debug_string_buffer, "\n{}",
-                       _sink->debug_string(cast_set<int>(_operators.size())));
+                       _sink ? 
_sink->debug_string(cast_set<int>(_operators.size())) : "null");
         return fmt::to_string(debug_string_buffer);
     }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b5c2f7084dc..312bfce2be4 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -82,6 +82,10 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
+#include "pipeline/exec/rec_cte_sink_operator.h"
+#include "pipeline/exec/rec_cte_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
@@ -134,7 +138,9 @@ PipelineFragmentContext::PipelineFragmentContext(
           _is_report_on_cancel(true),
           _report_status_cb(std::move(report_status_cb)),
           _params(request),
-          _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0) {
+          _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0),
+          _need_notify_close(request.__isset.need_notify_close ? 
request.need_notify_close
+                                                               : false) {
     _fragment_watcher.start();
 }
 
@@ -142,24 +148,13 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id);
-    // The memory released by the query end is recorded in the query mem 
tracker.
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
-    auto st = _query_ctx->exec_status();
-    for (size_t i = 0; i < _tasks.size(); i++) {
-        if (!_tasks[i].empty()) {
-            _call_back(_tasks[i].front().first->runtime_state(), &st);
-        }
+    _release_resource();
+    {
+        // The memory released by the query end is recorded in the query mem 
tracker.
+        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+        _runtime_state.reset();
+        _query_ctx.reset();
     }
-    _tasks.clear();
-    _dag.clear();
-    _pip_id_to_pipeline.clear();
-    _pipelines.clear();
-    _sink.reset();
-    _root_op.reset();
-    _runtime_state.reset();
-    _runtime_filter_mgr_map.clear();
-    _op_id_to_shared_state.clear();
-    _query_ctx.reset();
 }
 
 bool PipelineFragmentContext::is_timeout(timespec now) const {
@@ -253,6 +248,54 @@ PipelinePtr 
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
+// Builds everything needed to run this fragment: the operator pipelines, the output sink, the
+// optional local-exchange plan, the prepared pipelines and finally the pipeline tasks.
+// Returns the first error encountered; fails with InternalError when the fragment carries no
+// output sink.
+Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
+    {
+        SCOPED_TIMER(_build_pipelines_timer);
+        // Step 2: create the root pipeline and build the pipelines for this fragment's operators.
+        auto root_pipe = add_pipeline();
+        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
+                                         &_root_op, root_pipe));
+
+        // Step 3: create the sink operator and attach it to the root pipeline.
+        auto& fragment = _params.fragment;
+        if (!fragment.__isset.output_sink) {
+            return Status::InternalError("No output sink in this fragment!");
+        }
+        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), fragment.output_sink,
+                                          fragment.output_exprs, _params,
+                                          root_pipe->output_row_desc(), _runtime_state.get(),
+                                          *_desc_tbl, root_pipe->id()));
+        RETURN_IF_ERROR(_sink->init(fragment.output_sink));
+        RETURN_IF_ERROR(root_pipe->set_sink(_sink));
+
+        // Every pipeline's sink gets the pipeline's last operator as its child.
+        for (PipelinePtr& pipe : _pipelines) {
+            DCHECK(pipe->sink() != nullptr) << pipe->operators().size();
+            RETURN_IF_ERROR(pipe->sink()->set_child(pipe->operators().back()));
+        }
+    }
+
+    // Step 4: plan the local exchangers, only when local shuffle is enabled.
+    if (_runtime_state->enable_local_shuffle()) {
+        SCOPED_TIMER(_plan_local_exchanger_timer);
+        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
+                                             _params.bucket_seq_to_instance_idx,
+                                             _params.shuffle_idx_to_instance_idx));
+    }
+
+    // Step 5: initialize global states in pipelines (children are cleared before prepare).
+    for (PipelinePtr& pipe : _pipelines) {
+        SCOPED_TIMER(_prepare_all_pipelines_timer);
+        pipe->children().clear();
+        RETURN_IF_ERROR(pipe->prepare(_runtime_state.get()));
+    }
+
+    {
+        SCOPED_TIMER(_build_tasks_timer);
+        // Step 6: build pipeline tasks and initialize local state.
+        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
+    }
+
+    return Status::OK();
+}
+
 Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
@@ -277,7 +320,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
 
         auto* fragment_context = this;
 
-        if (_params.query_options.__isset.is_report_success) {
+        if (!_need_notify_close && 
_params.query_options.__isset.is_report_success) {
             
fragment_context->set_is_report_success(_params.query_options.is_report_success);
         }
 
@@ -322,49 +365,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
         }
     }
 
-    {
-        SCOPED_TIMER(_build_pipelines_timer);
-        // 2. Build pipelines with operators in this fragment.
-        auto root_pipeline = add_pipeline();
-        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), 
*_query_ctx->desc_tbl,
-                                         &_root_op, root_pipeline));
-
-        // 3. Create sink operator
-        if (!_params.fragment.__isset.output_sink) {
-            return Status::InternalError("No output sink in this fragment!");
-        }
-        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), 
_params.fragment.output_sink,
-                                          _params.fragment.output_exprs, 
_params,
-                                          root_pipeline->output_row_desc(), 
_runtime_state.get(),
-                                          *_desc_tbl, root_pipeline->id()));
-        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
-        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
-
-        for (PipelinePtr& pipeline : _pipelines) {
-            DCHECK(pipeline->sink() != nullptr) << 
pipeline->operators().size();
-            
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
-        }
-    }
-    // 4. Build local exchanger
-    if (_runtime_state->enable_local_shuffle()) {
-        SCOPED_TIMER(_plan_local_exchanger_timer);
-        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
-                                             
_params.bucket_seq_to_instance_idx,
-                                             
_params.shuffle_idx_to_instance_idx));
-    }
-
-    // 5. Initialize global states in pipelines.
-    for (PipelinePtr& pipeline : _pipelines) {
-        SCOPED_TIMER(_prepare_all_pipelines_timer);
-        pipeline->children().clear();
-        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
-    }
-
-    {
-        SCOPED_TIMER(_build_tasks_timer);
-        // 6. Build pipeline tasks and initialize local state.
-        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
-    }
+    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
 
     _init_next_report_time();
 
@@ -374,6 +375,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* 
thread_pool) {
 
 Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) 
{
     _total_tasks = 0;
+    _closed_tasks = 0;
     const auto target_size = _params.local_params.size();
     _tasks.resize(target_size);
     _runtime_filter_mgr_map.resize(target_size);
@@ -422,6 +424,10 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
 
                     
task_runtime_state->set_task_execution_context(shared_from_this());
                     
task_runtime_state->set_be_number(local_params.backend_num);
+                    if (_need_notify_close) {
+                        // rec cte require child rf to wait infinitely to make 
sure all rpc done
+                        task_runtime_state->set_force_make_rf_wait_infinite();
+                    }
 
                     if (_params.__isset.backend_id) {
                         task_runtime_state->set_backend_id(_params.backend_id);
@@ -1645,6 +1651,42 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         break;
     }
+    case TPlanNodeType::REC_CTE_NODE: {
+        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (!_dag.contains(downstream_pipeline_id)) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+
+        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
+
+        DataSinkOperatorPtr anchor_sink;
+        anchor_sink = 
std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
+                                                                  
op->operator_id(), tnode, descs);
+        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
+        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
+        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
+
+        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
+
+        DataSinkOperatorPtr rec_sink;
+        rec_sink = 
std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), 
op->operator_id(),
+                                                         tnode, descs);
+        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
+        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
+        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
+
+        break;
+    }
+    case TPlanNodeType::REC_CTE_SCAN_NODE: {
+        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(tnode.node_type));
@@ -1750,7 +1792,10 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
     if (_is_fragment_instance_closed) {
         return;
     }
-    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
+    Defer defer_op {[&]() {
+        _is_fragment_instance_closed = true;
+        _close_cv.notify_all();
+    }};
     
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     static_cast<void>(send_report(true));
     // Print profile content in info log is a tempoeray solution for stream 
load and external_connector.
@@ -1785,8 +1830,10 @@ void PipelineFragmentContext::_close_fragment_instance() 
{
                                          
collect_realtime_load_channel_profile());
     }
 
-    // all submitted tasks done
-    _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    if (!_need_notify_close) {
+        // all submitted tasks done
+        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+    }
 }
 
 void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
@@ -1923,8 +1970,10 @@ std::vector<PipelineTask*> 
PipelineFragmentContext::get_revocable_tasks() const
 }
 
 std::string PipelineFragmentContext::debug_string() {
+    std::lock_guard<std::mutex> l(_task_mutex);
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
+    fmt::format_to(debug_string_buffer, "PipelineFragmentContext 
Info:\nneed_notify_close: {}\n",
+                   _need_notify_close);
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
@@ -1998,5 +2047,66 @@ 
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
                                                       
_runtime_state->profile_level());
     return load_channel_profile;
 }
+
+// Blocks the caller until this fragment instance has been marked closed, then, if `close` is
+// true, removes this context from the fragment manager.
+// Rejected with InternalError when a load stream is registered for the query id, or when this
+// context was not created with need_notify_close.
+Status PipelineFragmentContext::wait_close(bool close) {
+    const bool has_load_stream = _exec_env->new_load_stream_mgr()->get(_query_id) != nullptr;
+    if (has_load_stream) {
+        return Status::InternalError("stream load do not support reset");
+    }
+    if (!_need_notify_close) {
+        return Status::InternalError("_need_notify_close is false, do not support reset");
+    }
+
+    {
+        // Wait (no timeout) on _close_cv until the closed flag is observed; the loop re-checks
+        // the flag after every wake-up, so spurious wake-ups are harmless.
+        std::unique_lock<std::mutex> guard(_task_mutex);
+        while (!_is_fragment_instance_closed.load()) {
+            _close_cv.wait(guard);
+        }
+    }
+
+    if (!close) {
+        return Status::OK();
+    }
+    _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
+    return Status::OK();
+}
+
+// Prepares this context for another run: calls reset_to_rerun() on the runtime state of the
+// first task of every non-empty instance, releases the built pipelines and tasks through
+// _release_resource(), and then resets the fragment-level runtime state. Always returns OK.
+Status PipelineFragmentContext::set_to_rerun() {
+    {
+        // Own scope: _task_mutex is released at the closing brace, before _release_resource().
+        std::lock_guard<std::mutex> guard(_task_mutex);
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+        for (auto& instance_tasks : _tasks) {
+            if (instance_tasks.empty()) {
+                continue;
+            }
+            instance_tasks.front().first->runtime_state()->reset_to_rerun();
+        }
+    }
+
+    _release_resource();
+    _runtime_state->reset_to_rerun();
+    return Status::OK();
+}
+
+// Clears the "submitted" and "fragment instance closed" flags, then builds and prepares the
+// full pipeline again. Returns whatever _build_and_prepare_full_pipeline() returns.
+Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
+    _submitted = false;
+    _is_fragment_instance_closed.store(false);
+
+    Status build_status = _build_and_prepare_full_pipeline(thread_pool);
+    return build_status;
+}
+
+// Tears down everything built for the current run while holding _task_mutex: invokes
+// _call_back with the query's exec status on the runtime state of the first task of every
+// non-empty instance, then drops tasks, DAG, pipelines, sink, root operator, runtime filter
+// managers and shared states.
+void PipelineFragmentContext::_release_resource() {
+    std::lock_guard<std::mutex> guard(_task_mutex);
+    // The memory released by the query end is recorded in the query mem tracker.
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
+
+    auto exec_status = _query_ctx->exec_status();
+    for (auto& instance_tasks : _tasks) {
+        if (instance_tasks.empty()) {
+            continue;
+        }
+        _call_back(instance_tasks.front().first->runtime_state(), &exec_status);
+    }
+
+    _tasks.clear();
+    _dag.clear();
+    _pip_id_to_pipeline.clear();
+    _pipelines.clear();
+    _sink.reset();
+    _root_op.reset();
+    _runtime_filter_mgr_map.clear();
+    _op_id_to_shared_state.clear();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..40bb80f72ec 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -115,6 +115,9 @@ public:
     [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
 
     void clear_finished_tasks() {
+        if (_need_notify_close) {
+            return;
+        }
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
                 _tasks[j][i].first->stop_if_finished();
@@ -125,7 +128,17 @@ public:
     std::string get_load_error_url();
     std::string get_first_error_msg();
 
+    Status wait_close(bool close);
+    Status rebuild(ThreadPool* thread_pool);
+    Status set_to_rerun();
+
+    bool need_notify_close() const { return _need_notify_close; }
+
 private:
+    void _release_resource();
+
+    Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
+
     Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, 
OperatorPtr* root,
                             PipelinePtr cur_pipe);
     Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& 
tnodes,
@@ -182,6 +195,7 @@ private:
     Pipelines _pipelines;
     PipelineId _next_pipeline_id = 0;
     std::mutex _task_mutex;
+    std::condition_variable _close_cv;
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks 
submitted.
@@ -203,7 +217,7 @@ private:
     RuntimeProfile::Counter* _build_tasks_timer = nullptr;
 
     std::function<void(RuntimeState*, Status*)> _call_back;
-    bool _is_fragment_instance_closed = false;
+    std::atomic_bool _is_fragment_instance_closed = false;
 
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
@@ -323,6 +337,8 @@ private:
 
     TPipelineFragmentParams _params;
     int32_t _parallel_instances = 0;
+
+    bool _need_notify_close = false;
 };
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 930e885fc3b..edc56332058 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -30,6 +30,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/dependency.h"
+#include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/pipeline.h"
@@ -553,10 +554,6 @@ Status PipelineTask::execute(bool* done) {
                 }
             }
 
-            if (_eos) {
-                RETURN_IF_ERROR(close(Status::OK(), false));
-            }
-
             DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
                 auto required_pipeline_id =
                         
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
@@ -595,6 +592,22 @@ Status PipelineTask::execute(bool* done) {
             RETURN_IF_ERROR(block->check_type_and_column());
             status = _sink->sink(_state, block, _eos);
 
+            if (_eos) {
+                if (_sink->need_rerun(_state)) {
+                    if (auto* source = 
dynamic_cast<ExchangeSourceOperatorX*>(_root);
+                        source != nullptr) {
+                        RETURN_IF_ERROR(source->reset(_state));
+                        _eos = false;
+                    } else {
+                        return Status::InternalError(
+                                "Only ExchangeSourceOperatorX can be rerun, 
real is {}",
+                                _root->get_name());
+                    }
+                } else {
+                    RETURN_IF_ERROR(close(Status::OK(), false));
+                }
+            }
+
             if (status.is<ErrorCode::END_OF_FILE>()) {
                 set_wake_up_early();
                 return Status::OK();
@@ -857,7 +870,9 @@ Status PipelineTask::wake_up(Dependency* dep, 
std::unique_lock<std::mutex>& /* d
     _blocked_dep = nullptr;
     auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this());
     RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE));
-    
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
+    if (auto f = _fragment_context.lock(); f) {
+        
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/rec_cte_shared_state.h 
b/be/src/pipeline/rec_cte_shared_state.h
new file mode 100644
index 00000000000..effd50395a5
--- /dev/null
+++ b/be/src/pipeline/rec_cte_shared_state.h
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "dependency.h"
+#include "pipeline/common/distinct_agg_utils.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+struct RecCTESharedState : public BasicSharedState {
+    std::vector<TRecCTETarget> targets;
+    std::vector<vectorized::Block> blocks;
+    vectorized::IColumn::Selector distinct_row;
+    Dependency* source_dep = nullptr;
+    Dependency* anchor_dep = nullptr;
+    vectorized::Arena arena;
+    RuntimeProfile::Counter* hash_table_compute_timer = nullptr;
+    RuntimeProfile::Counter* hash_table_emplace_timer = nullptr;
+    RuntimeProfile::Counter* hash_table_input_counter = nullptr;
+
+    std::unique_ptr<DistinctDataVariants> agg_data = nullptr;
+
+    int current_round = 0;
+    int last_round_offset = 0;
+    int max_recursion_depth = 0;
+    bool ready_to_return = false;
+
+    // Sets ready_to_return once last_round_offset equals the number of buffered blocks.
+    // The flag is only ever set here, never cleared.
+    void update_ready_to_return() {
+        const bool offset_at_end = static_cast<size_t>(last_round_offset) == blocks.size();
+        if (offset_at_end) {
+            ready_to_return = true;
+        }
+    }
+
+    // Buffers `block` into `blocks`.
+    // Without `agg_data` the block is appended as is. With `agg_data`, every row is offered to
+    // the hash method held in `agg_data`; only rows for which the creator or the null-key
+    // callback fires are recorded in `distinct_row` and kept. If all rows are kept the block is
+    // moved in unchanged, if some are kept a new block with just those rows is appended, and if
+    // none are kept nothing is appended.
+    // Throws doris::Exception when the hash method variant holds std::monostate.
+    Status emplace_block(RuntimeState* state, vectorized::Block&& block) {
+        if (!agg_data) {
+            blocks.emplace_back(std::move(block));
+            return Status::OK();
+        }
+
+        auto row_count = uint32_t(block.rows());
+        vectorized::ColumnRawPtrs key_columns;
+        // `columns` keeps the converted columns alive; `key_columns` only borrows raw pointers.
+        std::vector<vectorized::ColumnPtr> columns = block.get_columns_and_convert();
+        for (auto& col : columns) {
+            key_columns.push_back(col.get());
+        }
+
+        std::visit(
+                vectorized::Overload {
+                        [&](std::monostate& arg) -> void {
+                            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                                   "uninited hash table");
+                        },
+                        [&](auto& agg_method) -> void {
+                            SCOPED_TIMER(hash_table_compute_timer);
+                            using HashMethodType = std::decay_t<decltype(agg_method)>;
+                            using AggState = typename HashMethodType::State;
+
+                            AggState agg_state(key_columns);
+                            agg_method.init_serialized_keys(key_columns, row_count);
+                            distinct_row.clear();
+
+                            // Captured by reference below, so the callbacks always see the row
+                            // currently being emplaced.
+                            size_t row_idx = 0;
+                            auto on_new_key = [&](const auto& ctor, auto& key, auto& origin) {
+                                HashMethodType::try_presis_key(key, origin, arena);
+                                ctor(key);
+                                distinct_row.push_back(row_idx);
+                            };
+                            auto on_null_key = [&]() { distinct_row.push_back(row_idx); };
+
+                            SCOPED_TIMER(hash_table_emplace_timer);
+                            for (; row_idx < row_count; ++row_idx) {
+                                agg_method.lazy_emplace(agg_state, row_idx, on_new_key,
+                                                        on_null_key);
+                            }
+                            COUNTER_UPDATE(hash_table_input_counter, row_count);
+                        }},
+                agg_data->method_variant);
+
+        const bool every_row_kept = distinct_row.size() == block.rows();
+        if (every_row_kept) {
+            blocks.emplace_back(std::move(block));
+        } else if (!distinct_row.empty()) {
+            auto distinct_block = vectorized::MutableBlock(block.clone_empty());
+            RETURN_IF_ERROR(block.append_to_block_by_selector(&distinct_block, distinct_row));
+            blocks.emplace_back(distinct_block.to_block());
+        }
+        return Status::OK();
+    }
+
+    // Creates a transmit request pre-filled with the routing fields for `target`: its node id,
+    // the query id taken from `state`, and the target's fragment instance id. No blocks are
+    // attached here.
+    PTransmitRecCTEBlockParams build_basic_param(RuntimeState* state,
+                                                 const TRecCTETarget& target) const {
+        PTransmitRecCTEBlockParams params;
+        params.set_node_id(target.node_id);
+
+        const auto query_id_pb = UniqueId(state->query_id()).to_proto();
+        params.mutable_query_id()->CopyFrom(query_id_pb);
+
+        const auto instance_id_pb = UniqueId(target.fragment_instance_id).to_proto();
+        params.mutable_fragment_instance_id()->CopyFrom(instance_id_pb);
+        return params;
+    }
+
+    // Distributes the blocks stored at index >= `round_offset` across all scan targets and
+    // then tells every target that the round is complete.
+    //
+    // Each target receives at most ceil(pending / targets.size()) consecutive blocks, sent in
+    // batches of roughly `exchange_multi_blocks_byte_size` compressed bytes per RPC (always at
+    // least one block per RPC), followed by one block-less request with eos=true.
+    // Every RPC is issued synchronously with the query's execution RPC timeout.
+    //
+    // Returns the first error encountered: missing stub, block serialization failure,
+    // transport-level RPC failure, or a non-OK status reported by the receiver.
+    Status send_data_to_targets(RuntimeState* state, size_t round_offset) const {
+        if (targets.empty()) {
+            // Nobody to send to; this also keeps the per-target split below from dividing by 0.
+            return Status::OK();
+        }
+
+        const int64_t batch_byte_limit = state->query_options().exchange_multi_blocks_byte_size;
+        const size_t pending_blocks =
+                round_offset < blocks.size() ? blocks.size() - round_offset : 0;
+        const size_t blocks_per_target = (pending_blocks + targets.size() - 1) / targets.size();
+        const auto rpc_timeout_ms =
+                get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout());
+
+        // Issues one synchronous transmit_rec_cte_block call. When the transport itself fails
+        // `result` is never filled in, so the controller has to be checked before its status.
+        auto send_request = [&](auto& stub, const auto& target,
+                                const PTransmitRecCTEBlockParams& request) -> Status {
+            PTransmitRecCTEBlockResult result;
+            brpc::Controller controller;
+            controller.set_timeout_ms(rpc_timeout_ms);
+            stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing());
+            brpc::Join(controller.call_id());
+            if (controller.Failed()) {
+                return Status::InternalError(
+                        fmt::format("transmit_rec_cte_block rpc failed, host={}, port={}, error={}",
+                                    target.addr.hostname, target.addr.port,
+                                    controller.ErrorText()));
+            }
+            return Status::create(result.status());
+        };
+
+        for (const auto& target : targets) {
+            auto stub =
+                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
+                            target.addr);
+            if (!stub) {
+                return Status::InternalError(fmt::format("Get rpc stub failed, host={}, port={}",
+                                                         target.addr.hostname, target.addr.port));
+            }
+
+            // send blocks
+            size_t quota = blocks_per_target;
+            while (round_offset < blocks.size() && quota > 0) {
+                PTransmitRecCTEBlockParams request = build_basic_param(state, target);
+                int64_t batch_bytes = 0;
+                // do/while: every request carries at least one block, so the outer loop always
+                // makes progress even if the configured byte limit is zero or negative.
+                do {
+                    auto* pblock = request.add_blocks();
+                    size_t uncompressed_bytes = 0;
+                    size_t compressed_bytes = 0;
+                    int64_t compress_time = 0;
+                    RETURN_IF_ERROR(blocks[round_offset].serialize(
+                            state->be_exec_version(), pblock, &uncompressed_bytes,
+                            &compressed_bytes, &compress_time,
+                            state->fragement_transmission_compression_type()));
+                    ++round_offset;
+                    --quota;
+                    batch_bytes += static_cast<int64_t>(compressed_bytes);
+                } while (round_offset < blocks.size() && quota > 0 &&
+                         batch_bytes < batch_byte_limit);
+                request.set_eos(false);
+                RETURN_IF_ERROR(send_request(stub, target, request));
+            }
+
+            // send eos
+            {
+                PTransmitRecCTEBlockParams request = build_basic_param(state, target);
+                request.set_eos(true);
+                RETURN_IF_ERROR(send_request(stub, target, request));
+            }
+        }
+        return Status::OK();
+    }
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fed6c3931fe..ad858d8deb9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1411,6 +1411,66 @@ Status FragmentMgr::get_query_statistics(const 
TUniqueId& query_id, TQueryStatis
             print_id(query_id), query_stats);
 }
 
+// Hands a batch of serialized recursive-CTE blocks (plus the eos flag) to the query context
+// of `query_id`, which routes them to the scan registered under (instance_id, node_id).
+// Returns EndOfFile when the query context no longer exists.
+Status FragmentMgr::transmit_rec_cte_block(
+        const TUniqueId& query_id, const TUniqueId& instance_id, int node_id,
+        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
+    auto query_ctx = get_query_ctx(query_id);
+    if (!query_ctx) {
+        return Status::EndOfFile(
+                "Transmit rec cte block failed: Query context (query-id: {}) not found, maybe "
+                "finished",
+                print_id(query_id));
+    }
+
+    SCOPED_ATTACH_TASK(query_ctx.get());
+    return query_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos);
+}
+
+// Executes one stage of the fragment rerun protocol on fragment `fragment` of `query_id`.
+//
+// Stage dispatch:
+//   wait    -> wait_close(false)
+//   release -> set_to_rerun()
+//   rebuild -> rebuild(thread pool)
+//   submit  -> submit()
+//   close   -> wait_close(true)
+//
+// Returns NotFound when the query context or the fragment context is missing, and
+// InvalidArgument for a stage value that is not one of the above (previously such a request
+// was silently acknowledged with OK).
+Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+                                   PRerunFragmentParams_Opcode stage) {
+    auto q_ctx = get_query_ctx(query_id);
+    if (!q_ctx) {
+        return Status::NotFound(
+                "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(q_ctx.get());
+
+    auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+    if (!fragment_ctx) {
+        return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
+                                print_id(query_id), fragment);
+    }
+
+    switch (stage) {
+    case PRerunFragmentParams::wait:
+        return fragment_ctx->wait_close(false);
+    case PRerunFragmentParams::release:
+        return fragment_ctx->set_to_rerun();
+    case PRerunFragmentParams::rebuild:
+        return fragment_ctx->rebuild(_thread_pool.get());
+    case PRerunFragmentParams::submit:
+        return fragment_ctx->submit();
+    case PRerunFragmentParams::close:
+        return fragment_ctx->wait_close(true);
+    default:
+        return Status::InvalidArgument(
+                "rerun_fragment: unknown stage {} (query-id: {}, fragment-id: {})",
+                static_cast<int>(stage), print_id(query_id), fragment);
+    }
+}
+
+// Asks the query context of `query_id` to reset the global runtime filters listed in
+// `filter_ids`. Returns NotFound when the query context no longer exists.
+Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
+                                    const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    auto q_ctx = get_query_ctx(query_id);
+    if (!q_ctx) {
+        // The message names this entry point so the failure can be traced to the right RPC.
+        return Status::NotFound(
+                "reset_global_rf: Query context (query-id: {}) not found, maybe finished",
+                print_id(query_id));
+    }
+    SCOPED_ATTACH_TASK(q_ctx.get());
+    return q_ctx->reset_global_rf(filter_ids);
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 826bca81901..080e081fca1 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -184,6 +184,17 @@ public:
 
     std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
 
+    // Forwards serialized blocks and the eos flag to the recursive-CTE scan identified by
+    // (instance_id, node_id) inside query `query_id`.
+    Status transmit_rec_cte_block(const TUniqueId& query_id, const TUniqueId& 
instance_id,
+                                  int node_id,
+                                  const 
google::protobuf::RepeatedPtrField<PBlock>& pblocks,
+                                  bool eos);
+
+    // Runs one stage (wait / release / rebuild / submit / close) of the rerun sequence on
+    // fragment `fragment` of query `query_id`.
+    Status rerun_fragment(const TUniqueId& query_id, int fragment,
+                          PRerunFragmentParams_Opcode stage);
+
+    // Delegates to the query context of `query_id` to reset the listed global runtime filters.
+    Status reset_global_rf(const TUniqueId& query_id,
+                           const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     struct BrpcItem {
         TNetworkAddress network_address;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2b142f9a9f0..fb1222d019f 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -34,6 +34,7 @@
 #include "common/status.h"
 #include "olap/olap_common.h"
 #include "pipeline/dependency.h"
+#include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -460,6 +461,10 @@ QueryContext::_collect_realtime_query_profile() {
                 continue;
             }
 
+            if (fragment_ctx->need_notify_close()) {
+                continue;
+            }
+
             auto profile = fragment_ctx->collect_realtime_profile();
 
             if (profile.empty()) {
@@ -497,4 +502,46 @@ TReportExecStatusParams 
QueryContext::get_realtime_exec_status() {
     return exec_status;
 }
 
+// Delivers `pblocks` to the RecCTEScan registered under (instance_id, node_id) and, when
+// `eos` is set, marks that scan as ready. The registry lock is held for the whole call.
+// Returns InternalError if no such scan is registered, or the first add_block() failure.
+Status QueryContext::send_block_to_cte_scan(
+        const TUniqueId& instance_id, int node_id,
+        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
+    std::unique_lock<std::mutex> guard(_cte_scan_lock);
+
+    const auto scan_it = _cte_scan.find(std::make_pair(instance_id, node_id));
+    if (scan_it == _cte_scan.end()) {
+        return Status::InternalError("RecCTEScan not found for instance {}, node {}",
+                                     print_id(instance_id), node_id);
+    }
+
+    auto* scan = scan_it->second;
+    for (int idx = 0; idx < pblocks.size(); ++idx) {
+        RETURN_IF_ERROR(scan->add_block(pblocks.Get(idx)));
+    }
+    if (eos) {
+        scan->set_ready();
+    }
+    return Status::OK();
+}
+
+// Records `scan` as the receiver for blocks addressed to (instance_id, node_id).
+// Debug builds assert that the key is not registered yet.
+void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id,
+                                    pipeline::RecCTEScanLocalState* scan) {
+    std::unique_lock<std::mutex> guard(_cte_scan_lock);
+    const std::pair<TUniqueId, int> scan_key(instance_id, node_id);
+    DCHECK(!_cte_scan.contains(scan_key))
+            << "Duplicate registe cte scan for instance " << print_id(instance_id) << ", node "
+            << node_id;
+    _cte_scan.emplace(scan_key, scan);
+}
+
+// Removes the scan registered under (instance_id, node_id).
+// Debug builds assert that an entry was actually removed; the assertion text describes that
+// condition (a missing registration) rather than a duplicate one.
+void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) {
+    std::lock_guard<std::mutex> l(_cte_scan_lock);
+    // erase() reports how many entries were removed, so a single lookup both removes the
+    // entry and tells us whether it existed.
+    const auto erased = _cte_scan.erase(std::make_pair(instance_id, node_id));
+    DCHECK(erased == 1) << "Deregiste cte scan that is not registered for instance "
+                        << print_id(instance_id) << ", node " << node_id;
+}
+
+// Resets the given global runtime filters through the merge controller handler.
+// A query context without a handler has nothing to reset and reports OK.
+Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    if (!_merge_controller_handler) {
+        return Status::OK();
+    }
+    return _merge_controller_handler->reset_global_rf(this, filter_ids);
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7fd9fc29e35..c504dcc66aa 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -32,6 +32,7 @@
 #include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/object_pool.h"
+#include "common/status.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/runtime_predicate.h"
@@ -48,6 +49,7 @@ namespace pipeline {
 class PipelineFragmentContext;
 class PipelineTask;
 class Dependency;
+class RecCTEScanLocalState;
 } // namespace pipeline
 
 struct ReportStatusRequest {
@@ -161,11 +163,6 @@ public:
         return _query_options.runtime_filter_wait_time_ms;
     }
 
-    bool runtime_filter_wait_infinitely() const {
-        return _query_options.__isset.runtime_filter_wait_infinitely &&
-               _query_options.runtime_filter_wait_infinitely;
-    }
-
     int be_exec_version() const {
         if (!_query_options.__isset.be_exec_version) {
             return 0;
@@ -296,6 +293,23 @@ public:
     void set_first_error_msg(std::string error_msg);
     std::string get_first_error_msg();
 
+    // Delivers `pblocks` to the scan registered under (instance_id, node_id); when `eos` is
+    // true the scan is additionally marked ready.
+    Status send_block_to_cte_scan(const TUniqueId& instance_id, int node_id,
+                                  const 
google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks,
+                                  bool eos);
+    // Registers `scan` as the receiver for (instance_id, node_id). The pointer is stored as-is.
+    void registe_cte_scan(const TUniqueId& instance_id, int node_id,
+                          pipeline::RecCTEScanLocalState* scan);
+    // Removes the receiver previously registered for (instance_id, node_id).
+    void deregiste_cte_scan(const TUniqueId& instance_id, int node_id);
+
+    // Returns the keys of _fragment_id_to_pipeline_ctx, in that container's iteration order.
+    // NOTE(review): the container is read here without taking a lock - confirm callers cannot
+    // race with writers.
+    std::vector<int> get_fragment_ids() {
+        std::vector<int> ids;
+        for (auto entry = _fragment_id_to_pipeline_ctx.begin();
+             entry != _fragment_id_to_pipeline_ctx.end(); ++entry) {
+            ids.push_back(entry->first);
+        }
+        return ids;
+    }
+
+    // Resets the listed global runtime filters via the merge controller handler, if one exists.
+    Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     friend class QueryTaskController;
 
@@ -374,6 +388,10 @@ private:
     std::string _load_error_url;
     std::string _first_error_msg;
 
+    // instance id + node id -> cte scan
+    std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> 
_cte_scan;
+    std::mutex _cte_scan_lock;
+
 public:
     // when fragment of pipeline is closed, it will register its profile to 
this map by using add_fragment_profile
     void add_fragment_profile(
diff --git a/be/src/runtime/runtime_predicate.h 
b/be/src/runtime/runtime_predicate.h
index adf90e9095a..19ba6ab702f 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -55,6 +55,7 @@ public:
 
     void set_detected_source() {
         std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        // Put the order-by extreme back to a TYPE_NULL Field before flagging the source as
+        // detected, so a value left from an earlier use is not carried over.
+        _orderby_extrem = Field(PrimitiveType::TYPE_NULL);
         _detected_source = true;
     }
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 7f82f9e7796..6dffaa67eef 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -482,7 +482,7 @@ Status RuntimeState::register_consumer_runtime_filter(
         std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
     bool need_merge = desc.has_remote_targets || need_local_merge;
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
-    return mgr->register_consumer_filter(_query_ctx, desc, node_id, 
consumer_filter);
+    return mgr->register_consumer_filter(this, desc, node_id, consumer_filter);
 }
 
 bool RuntimeState::is_nereids() const {
@@ -494,12 +494,22 @@ std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::pipeline_id_to_profil
     return _pipeline_id_to_profile;
 }
 
+// Prepares this state for another run: when a local runtime filter manager exists, every
+// filter id known to the local or the global manager is removed from both managers; the
+// cached per-pipeline profiles are then cleared under the profile lock.
+void RuntimeState::reset_to_rerun() {
+    if (local_runtime_filter_mgr()) {
+        auto ids_to_drop = local_runtime_filter_mgr()->get_filter_ids();
+        auto global_ids = global_runtime_filter_mgr()->get_filter_ids();
+        ids_to_drop.merge(global_ids);
+
+        local_runtime_filter_mgr()->remove_filters(ids_to_drop);
+        global_runtime_filter_mgr()->remove_filters(ids_to_drop);
+    }
+
+    std::unique_lock profile_guard(_pipeline_profile_lock);
+    _pipeline_id_to_profile.clear();
+}
+
 std::vector<std::shared_ptr<RuntimeProfile>> 
RuntimeState::build_pipeline_profile(
         std::size_t pipeline_size) {
     std::unique_lock lc(_pipeline_profile_lock);
     if (!_pipeline_id_to_profile.empty()) {
-        throw Exception(ErrorCode::INTERNAL_ERROR,
-                        "build_pipeline_profile can only be called once.");
+        return _pipeline_id_to_profile;
     }
     _pipeline_id_to_profile.resize(pipeline_size);
     {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e7c66436981..3edf5317383 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -511,7 +511,7 @@ public:
         _runtime_filter_mgr = runtime_filter_mgr;
     }
 
-    QueryContext* get_query_ctx() { return _query_ctx; }
+    QueryContext* get_query_ctx() const { return _query_ctx; }
 
     [[nodiscard]] bool low_memory_mode() const;
 
@@ -528,6 +528,12 @@ public:
         return _query_options.__isset.enable_profile && 
_query_options.enable_profile;
     }
 
+    // Query option `cte_max_recursion_depth`, or 0 when the option was not set.
+    int cte_max_recursion_depth() const {
+        if (!_query_options.__isset.cte_max_recursion_depth) {
+            return 0;
+        }
+        return _query_options.cte_max_recursion_depth;
+    }
+
     int rpc_verbose_profile_max_instance_count() const {
         return _query_options.__isset.rpc_verbose_profile_max_instance_count
                        ? _query_options.rpc_verbose_profile_max_instance_count
@@ -710,6 +716,17 @@ public:
                                       _query_options.hnsw_bounded_queue, 
_query_options.ivf_nprobe);
     }
 
+    // Removes this state's runtime filters from the filter managers and clears the cached
+    // pipeline profiles so the fragment can be run again.
+    void reset_to_rerun();
+
+    // Forces the `runtime_filter_wait_infinitely` option to true in this state's copy of the
+    // query options.
+    void set_force_make_rf_wait_infinite() {
+        _query_options.__set_runtime_filter_wait_infinitely(true);
+    }
+
+    // True only when the `runtime_filter_wait_infinitely` option is both set and enabled.
+    bool runtime_filter_wait_infinitely() const {
+        if (!_query_options.__isset.runtime_filter_wait_infinitely) {
+            return false;
+        }
+        return _query_options.runtime_filter_wait_infinitely;
+    }
+
 private:
     Status create_error_log_file();
 
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h 
b/be/src/runtime_filter/runtime_filter_consumer.h
index e0e42e509d4..e9ef68f6d28 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -41,11 +41,11 @@ public:
         APPLIED, // The consumer will switch to this state after the 
expression is acquired
     };
 
-    static Status create(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc, int node_id,
+    static Status create(const RuntimeState* state, const TRuntimeFilterDesc* 
desc, int node_id,
                          std::shared_ptr<RuntimeFilterConsumer>* res) {
         *res = std::shared_ptr<RuntimeFilterConsumer>(
-                new RuntimeFilterConsumer(query_ctx, desc, node_id));
-        RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&query_ctx->query_options()));
+                new RuntimeFilterConsumer(state, desc, node_id));
+        RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&state->query_options()));
         return Status::OK();
     }
 
@@ -86,17 +86,16 @@ public:
     }
 
 private:
-    RuntimeFilterConsumer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
-                          int node_id)
+    RuntimeFilterConsumer(const RuntimeState* state, const TRuntimeFilterDesc* 
desc, int node_id)
             : RuntimeFilter(desc),
               _probe_expr(desc->planId_to_target_expr.find(node_id)->second),
               _registration_time(MonotonicMillis()),
               _rf_state(State::NOT_READY) {
         // If bitmap filter is not applied, it will cause the query result to 
be incorrect
-        bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() ||
+        bool wait_infinitely = state->runtime_filter_wait_infinitely() ||
                                _runtime_filter_type == 
RuntimeFilterType::BITMAP_FILTER;
-        _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 
1000
-                                           : 
query_ctx->runtime_filter_wait_time_ms();
+        _rf_wait_time_ms = wait_infinitely ? 
state->get_query_ctx()->execution_timeout() * 1000
+                                           : 
state->get_query_ctx()->runtime_filter_wait_time_ms();
         DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_merger.h 
b/be/src/runtime_filter/runtime_filter_merger.h
index 00bac845473..f1080fae0a4 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -78,6 +78,8 @@ public:
         _expected_producer_num = num;
     }
 
+    // Returns the producer count last stored by set_expected_producer_num().
+    int get_expected_producer_num() const { return _expected_producer_num; }
+
     bool add_rf_size(uint64_t size) {
         _received_rf_size_num++;
         if (_expected_producer_num < _received_rf_size_num) {
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 48645ffdb8e..7923e39fd5c 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -62,13 +62,13 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consum
 }
 
 Status RuntimeFilterMgr::register_consumer_filter(
-        const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, int 
node_id,
+        const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id,
         std::shared_ptr<RuntimeFilterConsumer>* consumer) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
     std::lock_guard<std::mutex> l(_lock);
-    RETURN_IF_ERROR(RuntimeFilterConsumer::create(query_ctx, &desc, node_id, 
consumer));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, 
consumer));
     _consumer_map[key].push_back(*consumer);
     return Status::OK();
 }
@@ -457,6 +457,25 @@ void 
RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu
     _filter_map.clear();
 }
 
+// Re-creates the merger of every filter in `filter_ids` so the filter can be produced again.
+// The expected producer count is carried over to the new merger; the arrival bookkeeping
+// (arrive_id, source_addrs, done) is cleared.
+//
+// Returns InvalidArgument for a filter id that has no initialized merge context. The lookup
+// uses find() so that an unknown id no longer default-constructs an entry whose null merger
+// would then be dereferenced.
+Status RuntimeFilterMergeControllerEntity::reset_global_rf(
+        QueryContext* query_ctx, const google::protobuf::RepeatedField<int32_t>& filter_ids) {
+    for (const auto& filter_id : filter_ids) {
+        GlobalMergeContext* cnt_val = nullptr;
+        {
+            // Read-only lookup: nothing is inserted, so a shared lock is enough.
+            std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+            auto iter = _filter_map.find(filter_id);
+            if (iter != _filter_map.end()) {
+                cnt_val = &iter->second;
+            }
+        }
+        if (cnt_val == nullptr || !cnt_val->merger) {
+            return Status::InvalidArgument("reset_global_rf: unknown filter id {}", filter_id);
+        }
+
+        int producer_size = cnt_val->merger->get_expected_producer_num();
+        RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &cnt_val->runtime_filter_desc,
+                                                    &cnt_val->merger));
+        cnt_val->merger->set_expected_producer_num(producer_size);
+        cnt_val->arrive_id.clear();
+        cnt_val->source_addrs.clear();
+        cnt_val->done = false;
+    }
+    return Status::OK();
+}
+
 std::string RuntimeFilterMergeControllerEntity::debug_string() {
     std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
     std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 160babf278d..2b5792877a2 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -83,7 +83,7 @@ public:
 
     // get/set consumer
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
get_consume_filters(int filter_id);
-    Status register_consumer_filter(const QueryContext* query_ctx, const 
TRuntimeFilterDesc& desc,
+    Status register_consumer_filter(const RuntimeState* state, const 
TRuntimeFilterDesc& desc,
                                     int node_id,
                                     std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter);
 
@@ -104,6 +104,27 @@ public:
 
     std::string debug_string();
 
+    // Returns the union of all producer filter ids and all consumer filter ids currently
+    // tracked by this manager, collected under _lock.
+    std::set<int32_t> get_filter_ids() {
+        std::set<int32_t> collected;
+        std::lock_guard<std::mutex> guard(_lock);
+        collected.insert(_producer_id_set.begin(), _producer_id_set.end());
+        for (auto it = _consumer_map.begin(); it != _consumer_map.end(); ++it) {
+            collected.insert(it->first);
+        }
+        return collected;
+    }
+
+    // Erases each id in `filter_ids` from the consumer map, the local merge map and the
+    // producer id set while holding _lock. Ids that are not present are ignored.
+    void remove_filters(const std::set<int32_t>& filter_ids) {
+        std::lock_guard<std::mutex> guard(_lock);
+        for (auto it = filter_ids.begin(); it != filter_ids.end(); ++it) {
+            const int32_t filter_id = *it;
+            _consumer_map.erase(filter_id);
+            _local_merge_map.erase(filter_id);
+            _producer_id_set.erase(filter_id);
+        }
+    }
+
 private:
     /**
      * `_is_global = true` means this runtime filter manager menages 
query-level runtime filters.
@@ -152,6 +173,9 @@ public:
 
     void release_undone_filters(QueryContext* query_ctx);
 
+    // Re-creates the merger of each listed filter (keeping its expected producer count) and
+    // clears the per-filter arrival state.
+    Status reset_global_rf(QueryContext* query_ctx,
+                           const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
+
 private:
     Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,
                            const TRuntimeFilterDesc* runtime_filter_desc,
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 3b413ba92a3..b84c4f32ce5 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1616,6 +1616,57 @@ void 
PInternalService::fold_constant_expr(google::protobuf::RpcController* contr
     }
 }
 
+// RPC entry point: queues the request on the light work pool, where it is forwarded to
+// FragmentMgr::transmit_rec_cte_block and the resulting status is written to the response.
+// If the pool rejects the task, offer_failed() answers the call instead.
+void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller,
+                                              const PTransmitRecCTEBlockParams* request,
+                                              PTransmitRecCTEBlockResult* response,
+                                              google::protobuf::Closure* done) {
+    auto work = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        const auto query_id = UniqueId(request->query_id()).to_thrift();
+        const auto instance_id = UniqueId(request->fragment_instance_id()).to_thrift();
+        auto status = _exec_env->fragment_mgr()->transmit_rec_cte_block(
+                query_id, instance_id, request->node_id(), request->blocks(), request->eos());
+        status.to_protobuf(response->mutable_status());
+    };
+    if (!_light_work_pool.try_offer(work)) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
+// RPC entry point: queues the request on the light work pool, where it is forwarded to
+// FragmentMgr::rerun_fragment and the resulting status is written to the response.
+// If the pool rejects the task, offer_failed() answers the call instead.
+void PInternalService::rerun_fragment(google::protobuf::RpcController* controller,
+                                      const PRerunFragmentParams* request,
+                                      PRerunFragmentResult* response,
+                                      google::protobuf::Closure* done) {
+    auto work = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        const auto query_id = UniqueId(request->query_id()).to_thrift();
+        auto status = _exec_env->fragment_mgr()->rerun_fragment(query_id, request->fragment_id(),
+                                                                request->stage());
+        status.to_protobuf(response->mutable_status());
+    };
+    if (!_light_work_pool.try_offer(work)) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
+// RPC entry point: queues the request on the light work pool, where it is forwarded to
+// FragmentMgr::reset_global_rf and the resulting status is written to the response.
+// If the pool rejects the task, offer_failed() answers the call instead.
+void PInternalService::reset_global_rf(google::protobuf::RpcController* controller,
+                                       const PResetGlobalRfParams* request,
+                                       PResetGlobalRfResult* response,
+                                       google::protobuf::Closure* done) {
+    auto work = [this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        const auto query_id = UniqueId(request->query_id()).to_thrift();
+        auto status = _exec_env->fragment_mgr()->reset_global_rf(query_id, request->filter_ids());
+        status.to_protobuf(response->mutable_status());
+    };
+    if (!_light_work_pool.try_offer(work)) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
 void PInternalService::transmit_block(google::protobuf::RpcController* 
controller,
                                       const PTransmitDataParams* request,
                                       PTransmitDataResult* response,
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index d73501bfc80..781a7def178 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -140,6 +140,16 @@ public:
                         const ::doris::PPublishFilterRequestV2* request,
                         ::doris::PPublishFilterResponse* response,
                         ::google::protobuf::Closure* done) override;
+    // Receives serialized recursive-CTE blocks for one scan node and forwards them to the
+    // fragment manager.
+    void transmit_rec_cte_block(google::protobuf::RpcController* controller,
+                                const PTransmitRecCTEBlockParams* request,
+                                PTransmitRecCTEBlockResult* response,
+                                google::protobuf::Closure* done) override;
+    // Runs the requested rerun stage on one fragment of a query via the fragment manager.
+    void rerun_fragment(google::protobuf::RpcController* controller,
+                        const PRerunFragmentParams* request, 
PRerunFragmentResult* response,
+                        google::protobuf::Closure* done) override;
+    // Resets the listed global runtime filters of a query via the fragment manager.
+    void reset_global_rf(google::protobuf::RpcController* controller,
+                         const PResetGlobalRfParams* request, 
PResetGlobalRfResult* response,
+                         google::protobuf::Closure* done) override;
     void transmit_block(::google::protobuf::RpcController* controller,
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 77ee21368c0..e0e352645c3 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -31,7 +31,7 @@ public:
     void test_signal_aquire(TRuntimeFilterDesc desc) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+                RuntimeFilterConsumer::create(_runtime_states[0].get(), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -59,7 +59,7 @@ TEST_F(RuntimeFilterConsumerTest, basic) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
@@ -116,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -141,7 +141,7 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) {
     const_cast<TQueryOptions&>(_query_ctx->_query_options)
             .__set_runtime_filter_wait_infinitely(true);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
@@ -152,7 +152,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -174,7 +174,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
 
     {
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     desc.__set_src_expr(
@@ -195,13 +195,13 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
                     .build());
 
     {
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     {
         desc.__set_has_local_targets(false);
         desc.__set_has_remote_targets(true);
-        auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer);
+        auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), 
&desc, 0, &consumer);
         ASSERT_FALSE(st.ok());
         desc.__set_has_local_targets(true);
         desc.__set_has_remote_targets(false);
@@ -209,7 +209,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
 
     
desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr());
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+            RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, 
&consumer));
 }
 
 TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
@@ -217,7 +217,7 @@ TEST_F(RuntimeFilterConsumerTest, 
aquire_signal_at_same_time) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, 
&consumer));
+                RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index d8222e201d9..02b1349c1a6 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -65,7 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
         
EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
         std::shared_ptr<RuntimeFilterConsumer> consumer_filter;
         EXPECT_TRUE(global_runtime_filter_mgr
-                            ->register_consumer_filter(ctx.get(), desc, 0, 
&consumer_filter)
+                            ->register_consumer_filter(&state, desc, 0, 
&consumer_filter)
                             .ok());
         
EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
     }
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index afd62513b83..76f821f95a2 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -60,6 +60,45 @@ message PTransmitDataResult {
     optional int64 receive_time = 2;
 };
 
+message PTransmitRecCTEBlockParams { // request of PBackendService.transmit_rec_cte_block
+    repeated PBlock blocks = 1;
+    optional bool eos = 2; // NOTE(review): presumably "end of stream" from this sender - confirm
+    optional PUniqueId query_id = 3;
+    optional PUniqueId fragment_instance_id = 4; // NOTE(review): presumably the receiving instance (cf. TRecCTETarget) - confirm
+    optional int32 node_id = 5; // NOTE(review): presumably the receiving plan node (cf. TRecCTETarget.node_id) - confirm
+};
+
+message PTransmitRecCTEBlockResult { // response of PBackendService.transmit_rec_cte_block
+    optional PStatus status = 1;
+};
+
+
+message PRerunFragmentParams { // request of PBackendService.rerun_fragment
+    enum Opcode { // value type of the `stage` field below
+    wait = 1;    // wait until the fragment has finished executing
+    release = 2; // release the resources held by the current round
+    rebuild = 3; // rebuild the pipeline tasks for the next round
+    submit = 4;  // submit the tasks for execution
+    close = 5;   // close the fragment
+    }
+    optional PUniqueId query_id = 1;
+    optional int32 fragment_id = 2;
+    optional Opcode stage = 3; // which of the Opcode steps above this request asks for
+};
+
+message PRerunFragmentResult { // response of PBackendService.rerun_fragment
+    optional PStatus status = 1;
+};
+
+message PResetGlobalRfParams { // request of PBackendService.reset_global_rf
+    optional PUniqueId query_id = 1;
+    repeated int32 filter_ids = 2; // NOTE(review): presumably ids of the runtime filters to reset - confirm
+};
+
+message PResetGlobalRfResult { // response of PBackendService.reset_global_rf
+    optional PStatus status = 1;
+};
+
 message PTabletWithPartition {
     required int64 partition_id = 1;
     required int64 tablet_id = 2;
@@ -1144,6 +1183,9 @@ service PBackendService {
     rpc sync_filter_size(PSyncFilterSizeRequest) returns 
(PSyncFilterSizeResponse);
     rpc apply_filterv2(PPublishFilterRequestV2) returns 
(PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
+    rpc rerun_fragment(PRerunFragmentParams) returns (PRerunFragmentResult);
+    rpc reset_global_rf(PResetGlobalRfParams) returns (PResetGlobalRfResult);
+    rpc transmit_rec_cte_block(PTransmitRecCTEBlockParams) returns 
(PTransmitRecCTEBlockResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns 
(PCheckRPCChannelResponse);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 8aa5d2bd1a5..8204376d961 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -418,9 +418,8 @@ struct TQueryOptions {
   179: optional bool enable_parquet_filter_by_bloom_filter = true;
   180: optional i32 max_file_scanners_concurrency = 0;
   181: optional i32 min_file_scanners_concurrency = 0;
-
-
   182: optional i32 ivf_nprobe = 1;
+  183: optional i32 cte_max_recursion_depth;
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
@@ -668,6 +667,7 @@ struct TPipelineFragmentParams {
   // Used by 2.1
   44: optional list<i32> topn_filter_source_node_ids
   45: optional map<string, TAIResource> ai_resources
+  46: optional bool need_notify_close
 
   // For cloud
   1000: optional bool is_mow_table;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 36564210b70..5bf6aa1489a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -60,7 +60,9 @@ enum TPlanNodeType {
   TEST_EXTERNAL_SCAN_NODE = 31,
   PARTITION_SORT_NODE = 32,
   GROUP_COMMIT_SCAN_NODE = 33,
-  MATERIALIZATION_NODE = 34
+  MATERIALIZATION_NODE = 34,
+  REC_CTE_NODE = 35,
+  REC_CTE_SCAN_NODE = 36
 }
 
 struct TKeyRange {
@@ -708,6 +710,29 @@ struct TFileScanNode {
     2: optional string table_name
 }
 
+struct TRecCTETarget { // element type of TRecCTENode.targets
+    1: optional Types.TNetworkAddress addr
+    2: optional Types.TUniqueId fragment_instance_id
+    3: optional i32 node_id
+}
+
+struct TRecCTEResetInfo { // element type of TRecCTENode.fragments_to_reset
+    1: optional Types.TNetworkAddress addr
+    2: optional i32 fragment_id
+}
+
+struct TRecCTENode { // payload of TPlanNode.rec_cte_node (field 39)
+    1: optional bool is_union_all // NOTE(review): presumably UNION ALL (true) vs. de-duplicating UNION (false) - confirm
+    2: optional list<TRecCTETarget> targets
+    3: optional list<TRecCTEResetInfo> fragments_to_reset
+    4: optional list<list<Exprs.TExpr>> result_expr_lists
+    5: optional list<i32> rec_side_runtime_filter_ids
+    6: optional bool is_used_by_other_rec_cte
+}
+
+struct TRecCTEScanNode { // payload of TPlanNode.rec_cte_scan_node (field 52); no fields are defined
+}
+
 struct TEsScanNode {
     1: required Types.TTupleId tuple_id
     2: optional map<string,string> properties
@@ -1443,6 +1468,7 @@ struct TPlanNode {
   36: optional list<TRuntimeFilterDesc> runtime_filters
   37: optional TGroupCommitScanNode group_commit_scan_node
   38: optional TMaterializationNode materialization_node
+  39: optional TRecCTENode rec_cte_node
 
   // Use in vec exec engine
   40: optional Exprs.TExpr vconjunct
@@ -1465,6 +1491,8 @@ struct TPlanNode {
 
   50: optional list<list<Exprs.TExpr>> distribute_expr_lists
   51: optional bool is_serial_operator
+  52: optional TRecCTEScanNode rec_cte_scan_node
+
   // projections is final projections, which means projecting into results and 
materializing them into the output block.
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to