HappenLee commented on code in PR #57358:
URL: https://github.com/apache/doris/pull/57358#discussion_r2609934065


##########
be/src/pipeline/exec/rec_cte_source_operator.h:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <gen_cpp/internal_service.pb.h>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/union_sink_operator.h"
+#include "pipeline/rec_cte_shared_state.h"
+#include "util/brpc_client_cache.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+namespace pipeline {
+class DataQueue;
+
+class RecCTESourceOperatorX;
+class RecCTESourceLocalState final : public 
PipelineXLocalState<RecCTESharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(RecCTESourceLocalState);
+    using Base = PipelineXLocalState<RecCTESharedState>;
+    using Parent = RecCTESourceOperatorX;
+    RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent);
+
+    Status open(RuntimeState* state) override;
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    bool is_blockable() const override { return true; }
+
+    std::vector<Dependency*> dependencies() const override {
+        return std::vector<Dependency*> {_dependency, 
_anchor_dependency.get()};
+    }
+
+private:
+    friend class RecCTESourceOperatorX;
+    friend class OperatorX<RecCTESourceLocalState>;
+
+    vectorized::VExprContextSPtrs _child_expr;
+
+    std::shared_ptr<Dependency> _anchor_dependency = nullptr;
+};
+
+class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
+public:
+    using Base = OperatorX<RecCTESourceLocalState>;
+    RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
+                          const DescriptorTbl& descs)
+            : Base(pool, tnode, operator_id, descs),
+              _is_union_all(tnode.rec_cte_node.is_union_all),
+              _targets(tnode.rec_cte_node.targets),
+              _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset),
+              _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids),
+              
_is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) {
+        DCHECK(tnode.__isset.rec_cte_node);
+    }
+    ~RecCTESourceOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+
+    Status terminate(RuntimeState* state) override {
+        RETURN_IF_ERROR(_send_close(state));
+        return Base::terminate(state);
+    }
+
+    Status close(RuntimeState* state) override {
+        RETURN_IF_ERROR(_send_close(state));
+        return Base::close(state);
+    }
+
+    Status set_child(OperatorPtr child) override {
+        Base::_child = child;
+        return Status::OK();
+    }
+
+    bool is_serial_operator() const override { return true; }
+
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
+        return {ExchangeType::NOOP};
+    }
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override {
+        auto& local_state = get_local_state(state);
+        auto& ctx = local_state._shared_state;
+        ctx->update_ready_to_return();
+
+        if (!ctx->ready_to_return) {
+            if (ctx->current_round + 1 > _max_recursion_depth) {
+                return Status::Aborted("reach cte_max_recursion_depth {}", 
_max_recursion_depth);
+            }
+
+            ctx->source_dep->block();
+            // ctx->blocks.size() may be changed after _recursive_process
+            int current_blocks_size = int(ctx->blocks.size());
+            RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset));
+            ctx->current_round++;
+            ctx->last_round_offset = current_blocks_size;
+        } else {
+            if (ctx->blocks.empty()) {
+                *eos = true;
+            } else {
+                block->swap(ctx->blocks.back());
+                RETURN_IF_ERROR(
+                        local_state.filter_block(local_state.conjuncts(), 
block, block->columns()));
+                ctx->blocks.pop_back();
+            }
+        }
+        return Status::OK();
+    }
+
+    bool is_source() const override { return true; }
+
+private:
+    Status _send_close(RuntimeState* state) {
+        if (!_aready_send_close && !_is_used_by_other_rec_cte) {
+            RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::close));
+            _aready_send_close = true;
+
+            auto* round_counter = 
ADD_COUNTER(get_local_state(state).Base::custom_profile(),
+                                              "RecursiveRound", TUnit::UNIT);
+            
round_counter->set(int64_t(get_local_state(state)._shared_state->current_round));
+        }
+        return Status::OK();
+    }
+
+    Status _recursive_process(RuntimeState* state, size_t last_round_offset) 
const {
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::wait));
+        RETURN_IF_ERROR(_send_reset_global_rf(state));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::release));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::rebuild));
+        RETURN_IF_ERROR(_send_rerun_fragments(state, 
PRerunFragmentParams::submit));
+        
RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(
+                state, last_round_offset));
+        return Status::OK();
+    }
+
+    Status _send_reset_global_rf(RuntimeState* state) const {
+        TNetworkAddress addr;
+        
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr));
+        auto stub =
+                
state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr);
+        PResetGlobalRfParams request;
+        
request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
+        for (auto filter_id : _global_rf_ids) {
+            request.add_filter_ids(filter_id);
+        }
+
+        PResetGlobalRfResult result;
+        brpc::Controller controller;
+        controller.set_timeout_ms(
+                
get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
+        stub->reset_global_rf(&controller, &request, &result, 
brpc::DoNothing());
+        brpc::Join(controller.call_id());
+        return Status::create(result.status());
+    }
+
+    Status _send_rerun_fragments(RuntimeState* state, 
PRerunFragmentParams_Opcode stage) const {
+        for (auto fragment : _fragments_to_reset) {
+            if (state->fragment_id() == fragment.fragment_id) {

Review Comment:
   the check seems only need do once time in init or open



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to