HappenLee commented on code in PR #57358: URL: https://github.com/apache/doris/pull/57358#discussion_r2609923154
########## be/src/pipeline/exec/rec_cte_source_operator.h: ########## @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <gen_cpp/internal_service.pb.h> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "util/brpc_client_cache.h" +#include "util/uid_util.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized + +namespace pipeline { +class DataQueue; + +class RecCTESourceOperatorX; +class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESourceLocalState); + using Base = PipelineXLocalState<RecCTESharedState>; + using Parent = RecCTESourceOperatorX; + RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent); + + Status open(RuntimeState* state) override; + Status init(RuntimeState* state, LocalStateInfo& info) override; + + bool is_blockable() const override { return true; } + + std::vector<Dependency*> dependencies() const override { + return std::vector<Dependency*> 
{_dependency, _anchor_dependency.get()}; + } + +private: + friend class RecCTESourceOperatorX; + friend class OperatorX<RecCTESourceLocalState>; + + vectorized::VExprContextSPtrs _child_expr; + + std::shared_ptr<Dependency> _anchor_dependency = nullptr; +}; + +class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> { +public: + using Base = OperatorX<RecCTESourceLocalState>; + RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _is_union_all(tnode.rec_cte_node.is_union_all), + _targets(tnode.rec_cte_node.targets), + _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset), + _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids), + _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) { + DCHECK(tnode.__isset.rec_cte_node); + } + ~RecCTESourceOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + Status terminate(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::terminate(state); + } + + Status close(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::close(state); + } + + Status set_child(OperatorPtr child) override { + Base::_child = child; + return Status::OK(); + } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + auto& ctx = local_state._shared_state; + ctx->update_ready_to_return(); + + if (!ctx->ready_to_return) { + if (ctx->current_round + 1 > _max_recursion_depth) { + return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth); + } + + 
ctx->source_dep->block(); + // ctx->blocks.size() may be changed after _recursive_process + int current_blocks_size = int(ctx->blocks.size()); + RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset)); + ctx->current_round++; + ctx->last_round_offset = current_blocks_size; + } else { + if (ctx->blocks.empty()) { + *eos = true; + } else { + block->swap(ctx->blocks.back()); + RETURN_IF_ERROR( + local_state.filter_block(local_state.conjuncts(), block, block->columns())); + ctx->blocks.pop_back(); + } + } + return Status::OK(); + } + + bool is_source() const override { return true; } + +private: + Status _send_close(RuntimeState* state) { + if (!_aready_send_close && !_is_used_by_other_rec_cte) { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close)); + _aready_send_close = true; Review Comment: The word should be `already`: `_aready_send_close` is misspelled and should be `_already_send_close`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
