This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_rec4 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 21beaee3c1c46450cb69eeb56acbefc243ff80fd Author: BiteTheDDDDt <[email protected]> AuthorDate: Wed Dec 10 20:19:14 2025 +0800 support rec cte (be part / proto part) --- be/src/pipeline/exec/exchange_source_operator.cpp | 5 + be/src/pipeline/exec/exchange_source_operator.h | 19 ++ be/src/pipeline/exec/operator.cpp | 10 + be/src/pipeline/exec/operator.h | 2 + .../pipeline/exec/rec_cte_anchor_sink_operator.cpp | 56 +++++ .../pipeline/exec/rec_cte_anchor_sink_operator.h | 114 ++++++++++ be/src/pipeline/exec/rec_cte_scan_operator.h | 89 ++++++++ be/src/pipeline/exec/rec_cte_sink_operator.cpp | 55 +++++ be/src/pipeline/exec/rec_cte_sink_operator.h | 101 +++++++++ be/src/pipeline/exec/rec_cte_source_operator.cpp | 86 ++++++++ be/src/pipeline/exec/rec_cte_source_operator.h | 228 +++++++++++++++++++ be/src/pipeline/exec/union_sink_operator.h | 42 ++-- be/src/pipeline/pipeline.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 242 +++++++++++++++------ be/src/pipeline/pipeline_fragment_context.h | 18 +- be/src/pipeline/pipeline_task.cpp | 25 ++- be/src/pipeline/rec_cte_shared_state.h | 177 +++++++++++++++ be/src/runtime/fragment_mgr.cpp | 60 +++++ be/src/runtime/fragment_mgr.h | 11 + be/src/runtime/query_context.cpp | 47 ++++ be/src/runtime/query_context.h | 28 ++- be/src/runtime/runtime_predicate.h | 1 + be/src/runtime/runtime_state.cpp | 16 +- be/src/runtime/runtime_state.h | 19 +- be/src/runtime_filter/runtime_filter_consumer.h | 15 +- be/src/runtime_filter/runtime_filter_merger.h | 2 + be/src/runtime_filter/runtime_filter_mgr.cpp | 23 +- be/src/runtime_filter/runtime_filter_mgr.h | 26 ++- be/src/service/internal_service.cpp | 51 +++++ be/src/service/internal_service.h | 10 + .../runtime_filter_consumer_test.cpp | 20 +- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 2 +- gensrc/proto/internal_service.proto | 42 ++++ gensrc/thrift/PaloInternalService.thrift | 4 +- gensrc/thrift/PlanNodes.thrift | 30 ++- 35 files changed, 1554 insertions(+), 124 
deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 76c2c4ab093..b6ccc910a64 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) { _is_closed = true; return OperatorX<ExchangeLocalState>::close(state); } + +Status ExchangeSourceOperatorX::reset(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state.reset(state); +} } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 3008217e130..03f2a288cdf 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -48,6 +48,23 @@ public: Status close(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; + Status reset(RuntimeState* state) { + if (stream_recvr) { + stream_recvr->close(); + } + create_stream_recvr(state); + + is_ready = false; + num_rows_skipped = 0; + + const auto& queues = stream_recvr->sender_queues(); + for (size_t i = 0; i < queues.size(); i++) { + deps[i]->block(); + queues[i]->set_dependency(deps[i]); + } + return Status::OK(); + } + std::vector<Dependency*> dependencies() const override { std::vector<Dependency*> dep_vec; std::for_each(deps.begin(), deps.end(), @@ -83,6 +100,8 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; + Status reset(RuntimeState* state); + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; std::string debug_string(int indentation_level = 0) const override; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 37ad694199c..4879a06915a 100644 --- a/be/src/pipeline/exec/operator.cpp +++ 
b/be/src/pipeline/exec/operator.cpp @@ -62,6 +62,10 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" +#include "pipeline/exec/rec_cte_scan_operator.h" +#include "pipeline/exec/rec_cte_sink_operator.h" +#include "pipeline/exec/rec_cte_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) DECLARE_OPERATOR(CacheSinkLocalState) DECLARE_OPERATOR(DictSinkLocalState) +DECLARE_OPERATOR(RecCTESinkLocalState) +DECLARE_OPERATOR(RecCTEAnchorSinkLocalState) #undef DECLARE_OPERATOR @@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState) DECLARE_OPERATOR(LocalExchangeSourceLocalState) DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) DECLARE_OPERATOR(CacheSourceLocalState) +DECLARE_OPERATOR(RecCTESourceLocalState) +DECLARE_OPERATOR(RecCTEScanLocalState) #ifdef BE_TEST DECLARE_OPERATOR(MockLocalState) @@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>; template class PipelineXSinkLocalState<LocalExchangeSharedState>; template class PipelineXSinkLocalState<BasicSharedState>; template class PipelineXSinkLocalState<DataQueueSharedState>; +template class PipelineXSinkLocalState<RecCTESharedState>; template class PipelineXLocalState<HashJoinSharedState>; template class PipelineXLocalState<PartitionedHashJoinSharedState>; @@ -888,6 +897,7 @@ template class PipelineXLocalState<PartitionSortNodeSharedState>; template class PipelineXLocalState<SetSharedState>; template class PipelineXLocalState<LocalExchangeSharedState>; template class PipelineXLocalState<BasicSharedState>; +template class PipelineXLocalState<RecCTESharedState>; 
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index b6328876008..543c303ce80 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -613,6 +613,8 @@ public: // For agg/sort/join sink. virtual Status init(const TPlanNode& tnode, RuntimeState* state); + virtual bool need_rerun(RuntimeState* state) const { return false; } + Status init(const TDataSink& tsink) override; [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle, diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp new file mode 100644 index 00000000000..f1552ed8e5e --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +} + +Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[0]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + _name = "REC_CTE_ANCHOR_SINK_OPERATOR"; + return Status::OK(); +} + +Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h new file mode 100644 index 00000000000..340949a2f79 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class RecCTEAnchorSinkOperatorX; +class RecCTEAnchorSinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState); + RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status open(RuntimeState* state) override; + + bool is_blockable() const override { return true; } + +private: + friend class RecCTEAnchorSinkOperatorX; + using Base = PipelineXSinkLocalState<RecCTESharedState>; + using Parent = RecCTEAnchorSinkOperatorX; + + vectorized::VExprContextSPtrs _child_expr; +}; + +class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final) + : public DataSinkOperatorX<RecCTEAnchorSinkLocalState> { +public: + using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>; + + friend class RecCTEAnchorSinkLocalState; + RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id, dest_id), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + + 
~RecCTEAnchorSinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { + auto& local_state = get_local_state(state); + + if (_need_notify_rec_side_ready) { + RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0)); + _need_notify_rec_side_ready = false; + } + + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + if (input_block->rows() != 0) { + vectorized::Block block; + RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true)); + RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block))); + } + + if (eos) { + local_state._shared_state->anchor_dep->set_ready(); + } + return Status::OK(); + } + + std::shared_ptr<BasicSharedState> create_shared_state() const override { + std::shared_ptr<BasicSharedState> ss = std::make_shared<RecCTESharedState>(); + ss->id = operator_id(); + for (const auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; + } + +private: + const RowDescriptor _row_descriptor; + vectorized::VExprContextSPtrs _child_expr; + + bool _need_notify_rec_side_ready = true; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h b/be/src/pipeline/exec/rec_cte_scan_operator.h new file mode 100644 index 00000000000..5b03766c163 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_scan_operator.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "pipeline/exec/operator.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; +} // namespace doris + +namespace doris::pipeline { + +class RecCTEScanOperatorX; +class RecCTEScanLocalState final : public PipelineXLocalState<> { +public: + ENABLE_FACTORY_CREATOR(RecCTEScanLocalState); + + RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState<>(state, parent) { + _scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_DEPENDENCY"); + state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(), parent->node_id(), + this); + } + ~RecCTEScanLocalState() override { + state()->get_query_ctx()->deregiste_cte_scan(state()->fragment_instance_id(), + parent()->node_id()); + } + + Status add_block(const PBlock& pblock) { + vectorized::Block block; + size_t uncompressed_bytes; + int64_t decompress_time; + RETURN_IF_ERROR(block.deserialize(pblock, &uncompressed_bytes, &decompress_time)); + _blocks.emplace_back(std::move(block)); + return Status::OK(); + } + + void set_ready() { _scan_dependency->set_ready(); } + + std::vector<Dependency*> dependencies() const override { return 
{_scan_dependency.get()}; } + +private: + friend class RecCTEScanOperatorX; + std::vector<vectorized::Block> _blocks; + DependencySPtr _scan_dependency = nullptr; +}; + +class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> { +public: + RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) {} + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + + if (local_state._blocks.empty()) { + *eos = true; + return Status::OK(); + } + *block = std::move(local_state._blocks.back()); + RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns())); + local_state._blocks.pop_back(); + return Status::OK(); + } + + bool is_source() const override { return true; } +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_sink_operator.cpp new file mode 100644 index 00000000000..1508b478bca --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_sink_operator.cpp @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/rec_cte_sink_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status RecCTESinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +} + +Status RecCTESinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[1]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status RecCTESinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h b/be/src/pipeline/exec/rec_cte_sink_operator.h new file mode 100644 index 00000000000..2321ba16ea3 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_sink_operator.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class RecCTESinkOperatorX; +class RecCTESinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESinkLocalState); + RecCTESinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status open(RuntimeState* state) override; + +private: + friend class RecCTESinkOperatorX; + using Base = PipelineXSinkLocalState<RecCTESharedState>; + using Parent = RecCTESinkOperatorX; + + vectorized::VExprContextSPtrs _child_expr; +}; + +class RecCTESinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<RecCTESinkLocalState> { +public: + using Base = DataSinkOperatorX<RecCTESinkLocalState>; + + friend class RecCTESinkLocalState; + RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id, dest_id), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + + ~RecCTESinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status 
prepare(RuntimeState* state) override; + + bool need_rerun(RuntimeState* state) const override { + return get_local_state(state)._shared_state->ready_to_return == false; + } + + std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { + auto& local_state = get_local_state(state); + + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + if (input_block->rows() != 0) { + vectorized::Block block; + RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true)); + RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block))); + } + + if (eos) { + local_state._shared_state->source_dep->set_ready(); + } + return Status::OK(); + } + +private: + const RowDescriptor _row_descriptor; + vectorized::VExprContextSPtrs _child_expr; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_source_operator.cpp b/be/src/pipeline/exec/rec_cte_source_operator.cpp new file mode 100644 index 00000000000..9a02bb947b8 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_source_operator.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/rec_cte_source_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +RecCTESourceLocalState::RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + +Status RecCTESourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + return Status::OK(); +} + +Status RecCTESourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + _shared_state->targets = _parent->cast<RecCTESourceOperatorX>()._targets; + _shared_state->max_recursion_depth = + _parent->cast<RecCTESourceOperatorX>()._max_recursion_depth; + _shared_state->source_dep = _dependency; + _anchor_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_ANCHOR_DEPENDENCY"); + _shared_state->anchor_dep = _anchor_dependency.get(); + + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + if (!_parent->cast<RecCTESourceOperatorX>()._is_union_all) { + _shared_state->agg_data = std::make_unique<DistinctDataVariants>(); + RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(_shared_state->agg_data.get(), + get_data_types(_child_expr), false)); + } + + _shared_state->hash_table_compute_timer = + ADD_TIMER(Base::custom_profile(), "HashTableComputeTime"); + 
_shared_state->hash_table_emplace_timer = + ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime"); + _shared_state->hash_table_input_counter = + ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT); + return Status::OK(); +} + +Status RecCTESourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + + _max_recursion_depth = state->cte_max_recursion_depth(); + + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[1]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status RecCTESourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h b/be/src/pipeline/exec/rec_cte_source_operator.h new file mode 100644 index 00000000000..e36358161d2 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_source_operator.h @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <gen_cpp/internal_service.pb.h> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "util/brpc_client_cache.h" +#include "util/uid_util.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized + +namespace pipeline { +class DataQueue; + +class RecCTESourceOperatorX; +class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESourceLocalState); + using Base = PipelineXLocalState<RecCTESharedState>; + using Parent = RecCTESourceOperatorX; + RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent); + + Status open(RuntimeState* state) override; + Status init(RuntimeState* state, LocalStateInfo& info) override; + + bool is_blockable() const override { return true; } + + std::vector<Dependency*> dependencies() const override { + return std::vector<Dependency*> {_dependency, _anchor_dependency.get()}; + } + +private: + friend class RecCTESourceOperatorX; + friend class OperatorX<RecCTESourceLocalState>; + + vectorized::VExprContextSPtrs _child_expr; + + std::shared_ptr<Dependency> _anchor_dependency = nullptr; +}; + +class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> { +public: + using Base = OperatorX<RecCTESourceLocalState>; + RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + 
const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _is_union_all(tnode.rec_cte_node.is_union_all), + _targets(tnode.rec_cte_node.targets), + _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset), + _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids), + _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) { + DCHECK(tnode.__isset.rec_cte_node); + } + ~RecCTESourceOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + Status terminate(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::terminate(state); + } + + Status close(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::close(state); + } + + Status set_child(OperatorPtr child) override { + Base::_child = child; + return Status::OK(); + } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + auto& ctx = local_state._shared_state; + ctx->update_ready_to_return(); + + if (!ctx->ready_to_return) { + if (ctx->current_round + 1 > _max_recursion_depth) { + return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth); + } + + ctx->source_dep->block(); + // ctx->blocks.size() may be changed after _recursive_process + int current_blocks_size = int(ctx->blocks.size()); + RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset)); + ctx->current_round++; + ctx->last_round_offset = current_blocks_size; + } else { + if (ctx->blocks.empty()) { + *eos = true; + } else { + block->swap(ctx->blocks.back()); + RETURN_IF_ERROR( + local_state.filter_block(local_state.conjuncts(), block, block->columns())); + 
ctx->blocks.pop_back(); + } + } + return Status::OK(); + } + + bool is_source() const override { return true; } + +private: + Status _send_close(RuntimeState* state) { + if (!_aready_send_close && !_is_used_by_other_rec_cte) { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close)); + _aready_send_close = true; + + auto* round_counter = ADD_COUNTER(get_local_state(state).Base::custom_profile(), + "RecursiveRound", TUnit::UNIT); + round_counter->set(int64_t(get_local_state(state)._shared_state->current_round)); + } + return Status::OK(); + } + + Status _recursive_process(RuntimeState* state, size_t last_round_offset) const { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::wait)); + RETURN_IF_ERROR(_send_reset_global_rf(state)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::release)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::rebuild)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::submit)); + RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets( + state, last_round_offset)); + return Status::OK(); + } + + Status _send_reset_global_rf(RuntimeState* state) const { + TNetworkAddress addr; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr)); + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr); + PResetGlobalRfParams request; + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + for (auto filter_id : _global_rf_ids) { + request.add_filter_ids(filter_id); + } + + PResetGlobalRfResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + return Status::create(result.status()); + } + + Status _send_rerun_fragments(RuntimeState* state, 
PRerunFragmentParams_Opcode stage) const { + for (auto fragment : _fragments_to_reset) { + if (state->fragment_id() == fragment.fragment_id) { + return Status::InternalError("Fragment {} contains a recursive CTE node", + fragment.fragment_id); + } + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client( + fragment.addr); + + PRerunFragmentParams request; + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + request.set_fragment_id(fragment.fragment_id); + request.set_stage(stage); + + PRerunFragmentResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + if (controller.Failed()) { + return Status::InternalError(controller.ErrorText()); + } + + RETURN_IF_ERROR(Status::create(result.status())); + } + return Status::OK(); + } + + friend class RecCTESourceLocalState; + + vectorized::VExprContextSPtrs _child_expr; + + const bool _is_union_all = false; + std::vector<TRecCTETarget> _targets; + std::vector<TRecCTEResetInfo> _fragments_to_reset; + std::vector<int> _global_rf_ids; + + int _max_recursion_depth = 0; + + bool _aready_send_close = false; + + bool _is_used_by_other_rec_cte = false; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 06b95cdf92d..764f8099427 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -29,6 +29,24 @@ namespace doris { #include "common/compile_check_begin.h" class RuntimeState; +inline Status materialize_block(const vectorized::VExprContextSPtrs& exprs, + vectorized::Block* src_block, vectorized::Block* res_block, + bool need_clone) { + 
vectorized::ColumnsWithTypeAndName columns; + auto rows = src_block->rows(); + for (const auto& expr : exprs) { + int result_column_id = -1; + RETURN_IF_ERROR(expr->execute(src_block, &result_column_id)); + const auto& src_col_with_type = src_block->get_by_position(result_column_id); + vectorized::ColumnPtr cloned_col = need_clone + ? src_col_with_type.column->clone_resized(rows) + : src_col_with_type.column; + columns.emplace_back(cloned_col, src_col_with_type.type, src_col_with_type.name); + } + *res_block = {columns}; + return Status::OK(); +} + namespace pipeline { class DataQueue; @@ -153,27 +171,17 @@ private: vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, row_descriptor()); vectorized::Block res; - RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res)); + auto& local_state = get_local_state(state); + { + SCOPED_TIMER(local_state._expr_timer); + RETURN_IF_ERROR( + materialize_block(local_state._child_expr, input_block, &res, false)); + } + local_state._child_row_idx += res.rows(); RETURN_IF_ERROR(mblock.merge(res)); } return Status::OK(); } - - Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx, - vectorized::Block* res_block) { - auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state._expr_timer); - const auto& child_exprs = local_state._child_expr; - vectorized::ColumnsWithTypeAndName colunms; - for (size_t i = 0; i < child_exprs.size(); ++i) { - int result_column_id = -1; - RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); - colunms.emplace_back(src_block->get_by_position(result_column_id)); - } - local_state._child_row_idx += src_block->rows(); - *res_block = {colunms}; - return Status::OK(); - } }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 1d0ddcf178e..2a20a5cd73d 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -133,7 +133,7 @@ public: 
fmt::format_to(debug_string_buffer, "\n{}", _operators[i]->debug_string(i)); } fmt::format_to(debug_string_buffer, "\n{}", - _sink->debug_string(cast_set<int>(_operators.size()))); + _sink ? _sink->debug_string(cast_set<int>(_operators.size())) : "null"); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b5c2f7084dc..312bfce2be4 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -82,6 +82,10 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" +#include "pipeline/exec/rec_cte_scan_operator.h" +#include "pipeline/exec/rec_cte_sink_operator.h" +#include "pipeline/exec/rec_cte_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -134,7 +138,9 @@ PipelineFragmentContext::PipelineFragmentContext( _is_report_on_cancel(true), _report_status_cb(std::move(report_status_cb)), _params(request), - _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0) { + _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0), + _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close + : false) { _fragment_watcher.start(); } @@ -142,24 +148,13 @@ PipelineFragmentContext::~PipelineFragmentContext() { LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id); - // The memory released by the query end is recorded in the query mem tracker. 
// Builds the complete pipeline structure of this fragment and prepares it for
// execution. The body was extracted from prepare() so that rebuild() can run
// the same sequence again; the step numbers in the comments below continue
// the numbering that starts in prepare().
//
// Returns the first non-OK status produced by any step; in particular an
// InternalError when the fragment has no output sink.
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
    {
        SCOPED_TIMER(_build_pipelines_timer);
        // 2. Build pipelines with operators in this fragment.
        auto root_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                         &_root_op, root_pipeline));

        // 3. Create sink operator
        if (!_params.fragment.__isset.output_sink) {
            return Status::InternalError("No output sink in this fragment!");
        }
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
                                          _params.fragment.output_exprs, _params,
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
                                          *_desc_tbl, root_pipeline->id()));
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

        // Every pipeline must own a sink by now; wire each sink to the last
        // operator of its pipeline.
        for (PipelinePtr& pipeline : _pipelines) {
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
        }
    }
    // 4. Build local exchanger
    if (_runtime_state->enable_local_shuffle()) {
        SCOPED_TIMER(_plan_local_exchanger_timer);
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
                                             _params.bucket_seq_to_instance_idx,
                                             _params.shuffle_idx_to_instance_idx));
    }

    // 5. Initialize global states in pipelines.
    for (PipelinePtr& pipeline : _pipelines) {
        SCOPED_TIMER(_prepare_all_pipelines_timer);
        pipeline->children().clear();
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
    }

    {
        SCOPED_TIMER(_build_tasks_timer);
        // 6. Build pipeline tasks and initialize local state.
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
    }

    return Status::OK();
}
Create sink operator - if (!_params.fragment.__isset.output_sink) { - return Status::InternalError("No output sink in this fragment!"); - } - RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink, - _params.fragment.output_exprs, _params, - root_pipeline->output_row_desc(), _runtime_state.get(), - *_desc_tbl, root_pipeline->id())); - RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink)); - RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); - - for (PipelinePtr& pipeline : _pipelines) { - DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); - RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); - } - } - // 4. Build local exchanger - if (_runtime_state->enable_local_shuffle()) { - SCOPED_TIMER(_plan_local_exchanger_timer); - RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, - _params.bucket_seq_to_instance_idx, - _params.shuffle_idx_to_instance_idx)); - } - - // 5. Initialize global states in pipelines. - for (PipelinePtr& pipeline : _pipelines) { - SCOPED_TIMER(_prepare_all_pipelines_timer); - pipeline->children().clear(); - RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); - } - - { - SCOPED_TIMER(_build_tasks_timer); - // 6. Build pipeline tasks and initialize local state. 
- RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool)); - } + RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool)); _init_next_report_time(); @@ -374,6 +375,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { _total_tasks = 0; + _closed_tasks = 0; const auto target_size = _params.local_params.size(); _tasks.resize(target_size); _runtime_filter_mgr_map.resize(target_size); @@ -422,6 +424,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { task_runtime_state->set_task_execution_context(shared_from_this()); task_runtime_state->set_be_number(local_params.backend_num); + if (_need_notify_close) { + // rec cte require child rf to wait infinitely to make sure all rpc done + task_runtime_state->set_force_make_rf_wait_infinite(); + } if (_params.__isset.backend_id) { task_runtime_state->set_backend_id(_params.backend_id); @@ -1645,6 +1651,42 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); break; } + case TPlanNodeType::REC_CTE_NODE: { + op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + + PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id()); + + DataSinkOperatorPtr anchor_sink; + anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(), + op->operator_id(), tnode, descs); + RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink)); + RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get())); + _pipeline_parent_map.push(op->node_id(), anchor_side_pipe); + + PipelinePtr 
rec_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(rec_side_pipe->id()); + + DataSinkOperatorPtr rec_sink; + rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(), + tnode, descs); + RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink)); + RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get())); + _pipeline_parent_map.push(op->node_id(), rec_side_pipe); + + break; + } + case TPlanNodeType::REC_CTE_SCAN_NODE: { + op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + break; + } default: return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); @@ -1750,7 +1792,10 @@ void PipelineFragmentContext::_close_fragment_instance() { if (_is_fragment_instance_closed) { return; } - Defer defer_op {[&]() { _is_fragment_instance_closed = true; }}; + Defer defer_op {[&]() { + _is_fragment_instance_closed = true; + _close_cv.notify_all(); + }}; _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); static_cast<void>(send_report(true)); // Print profile content in info log is a tempoeray solution for stream load and external_connector. 
@@ -1785,8 +1830,10 @@ void PipelineFragmentContext::_close_fragment_instance() { collect_realtime_load_channel_profile()); } - // all submitted tasks done - _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + if (!_need_notify_close) { + // all submitted tasks done + _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + } } void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) { @@ -1923,8 +1970,10 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const } std::string PipelineFragmentContext::debug_string() { + std::lock_guard<std::mutex> l(_task_mutex); fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n"); + fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\nneed_notify_close: {}\n", + _need_notify_close); for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { @@ -1998,5 +2047,66 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const { _runtime_state->profile_level()); return load_channel_profile; } + +Status PipelineFragmentContext::wait_close(bool close) { + if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) { + return Status::InternalError("stream load do not support reset"); + } + if (!_need_notify_close) { + return Status::InternalError("_need_notify_close is false, do not support reset"); + } + + { + std::unique_lock<std::mutex> lock(_task_mutex); + _close_cv.wait(lock, [this] { return _is_fragment_instance_closed.load(); }); + } + + if (close) { + _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + } + return Status::OK(); +} + +Status PipelineFragmentContext::set_to_rerun() { + { + std::lock_guard<std::mutex> l(_task_mutex); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); + for 
(size_t i = 0; i < _tasks.size(); i++) { + if (!_tasks[i].empty()) { + _tasks[i].front().first->runtime_state()->reset_to_rerun(); + } + } + } + _release_resource(); + _runtime_state->reset_to_rerun(); + return Status::OK(); +} + +Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) { + _submitted = false; + _is_fragment_instance_closed = false; + return _build_and_prepare_full_pipeline(thread_pool); +} + +void PipelineFragmentContext::_release_resource() { + std::lock_guard<std::mutex> l(_task_mutex); + // The memory released by the query end is recorded in the query mem tracker. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); + auto st = _query_ctx->exec_status(); + for (auto& _task : _tasks) { + if (!_task.empty()) { + _call_back(_task.front().first->runtime_state(), &st); + } + } + _tasks.clear(); + _dag.clear(); + _pip_id_to_pipeline.clear(); + _pipelines.clear(); + _sink.reset(); + _root_op.reset(); + _runtime_filter_mgr_map.clear(); + _op_id_to_shared_state.clear(); +} + #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 81b3f57b01f..40bb80f72ec 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -115,6 +115,9 @@ public: [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const; void clear_finished_tasks() { + if (_need_notify_close) { + return; + } for (size_t j = 0; j < _tasks.size(); j++) { for (size_t i = 0; i < _tasks[j].size(); i++) { _tasks[j][i].first->stop_if_finished(); @@ -125,7 +128,17 @@ public: std::string get_load_error_url(); std::string get_first_error_msg(); + Status wait_close(bool close); + Status rebuild(ThreadPool* thread_pool); + Status set_to_rerun(); + + bool need_notify_close() const { return _need_notify_close; } + private: + void _release_resource(); + + Status 
_build_and_prepare_full_pipeline(ThreadPool* thread_pool); + Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe); Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes, @@ -182,6 +195,7 @@ private: Pipelines _pipelines; PipelineId _next_pipeline_id = 0; std::mutex _task_mutex; + std::condition_variable _close_cv; int _closed_tasks = 0; // After prepared, `_total_tasks` is equal to the size of `_tasks`. // When submit fail, `_total_tasks` is equal to the number of tasks submitted. @@ -203,7 +217,7 @@ private: RuntimeProfile::Counter* _build_tasks_timer = nullptr; std::function<void(RuntimeState*, Status*)> _call_back; - bool _is_fragment_instance_closed = false; + std::atomic_bool _is_fragment_instance_closed = false; // If this is set to false, and '_is_report_success' is false as well, // This executor will not report status to FE on being cancelled. @@ -323,6 +337,8 @@ private: TPipelineFragmentParams _params; int32_t _parallel_instances = 0; + + bool _need_notify_close = false; }; } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 930e885fc3b..edc56332058 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -30,6 +30,7 @@ #include "common/logging.h" #include "common/status.h" #include "pipeline/dependency.h" +#include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/pipeline.h" @@ -553,10 +554,6 @@ Status PipelineTask::execute(bool* done) { } } - if (_eos) { - RETURN_IF_ERROR(close(Status::OK(), false)); - } - DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", { auto required_pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( @@ -595,6 +592,22 @@ Status PipelineTask::execute(bool* done) { RETURN_IF_ERROR(block->check_type_and_column()); 
status = _sink->sink(_state, block, _eos); + if (_eos) { + if (_sink->need_rerun(_state)) { + if (auto* source = dynamic_cast<ExchangeSourceOperatorX*>(_root); + source != nullptr) { + RETURN_IF_ERROR(source->reset(_state)); + _eos = false; + } else { + return Status::InternalError( + "Only ExchangeSourceOperatorX can be rerun, real is {}", + _root->get_name()); + } + } else { + RETURN_IF_ERROR(close(Status::OK(), false)); + } + } + if (status.is<ErrorCode::END_OF_FILE>()) { set_wake_up_early(); return Status::OK(); @@ -857,7 +870,9 @@ Status PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* d _blocked_dep = nullptr; auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this()); RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE)); - RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder)); + if (auto f = _fragment_context.lock(); f) { + RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder)); + } return Status::OK(); } diff --git a/be/src/pipeline/rec_cte_shared_state.h b/be/src/pipeline/rec_cte_shared_state.h new file mode 100644 index 00000000000..effd50395a5 --- /dev/null +++ b/be/src/pipeline/rec_cte_shared_state.h @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "dependency.h" +#include "pipeline/common/distinct_agg_utils.h" +#include "util/brpc_client_cache.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +struct RecCTESharedState : public BasicSharedState { + std::vector<TRecCTETarget> targets; + std::vector<vectorized::Block> blocks; + vectorized::IColumn::Selector distinct_row; + Dependency* source_dep = nullptr; + Dependency* anchor_dep = nullptr; + vectorized::Arena arena; + RuntimeProfile::Counter* hash_table_compute_timer = nullptr; + RuntimeProfile::Counter* hash_table_emplace_timer = nullptr; + RuntimeProfile::Counter* hash_table_input_counter = nullptr; + + std::unique_ptr<DistinctDataVariants> agg_data = nullptr; + + int current_round = 0; + int last_round_offset = 0; + int max_recursion_depth = 0; + bool ready_to_return = false; + + void update_ready_to_return() { + if (last_round_offset == blocks.size()) { + ready_to_return = true; + } + } + + Status emplace_block(RuntimeState* state, vectorized::Block&& block) { + if (agg_data) { + auto num_rows = uint32_t(block.rows()); + vectorized::ColumnRawPtrs raw_columns; + std::vector<vectorized::ColumnPtr> columns = block.get_columns_and_convert(); + for (auto& col : columns) { + raw_columns.push_back(col.get()); + } + + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, + "uninited hash table"); + }, + [&](auto& agg_method) -> void { + SCOPED_TIMER(hash_table_compute_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + + AggState agg_state(raw_columns); + agg_method.init_serialized_keys(raw_columns, num_rows); + distinct_row.clear(); + + size_t row = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, 
arena); + ctor(key); + distinct_row.push_back(row); + }; + auto creator_for_null_key = [&]() { + distinct_row.push_back(row); + }; + + SCOPED_TIMER(hash_table_emplace_timer); + for (; row < num_rows; ++row) { + agg_method.lazy_emplace(agg_state, row, creator, + creator_for_null_key); + } + COUNTER_UPDATE(hash_table_input_counter, num_rows); + }}, + agg_data->method_variant); + + if (distinct_row.size() == block.rows()) { + blocks.emplace_back(std::move(block)); + } else if (!distinct_row.empty()) { + auto distinct_block = vectorized::MutableBlock(block.clone_empty()); + RETURN_IF_ERROR(block.append_to_block_by_selector(&distinct_block, distinct_row)); + blocks.emplace_back(distinct_block.to_block()); + } + } else { + blocks.emplace_back(std::move(block)); + } + return Status::OK(); + } + + PTransmitRecCTEBlockParams build_basic_param(RuntimeState* state, + const TRecCTETarget& target) const { + PTransmitRecCTEBlockParams request; + request.set_node_id(target.node_id); + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + request.mutable_fragment_instance_id()->CopyFrom( + UniqueId(target.fragment_instance_id).to_proto()); + return request; + } + + Status send_data_to_targets(RuntimeState* state, size_t round_offset) const { + int send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size; + int block_number_per_target = + int(blocks.size() - round_offset + targets.size() - 1) / targets.size(); + for (auto target : targets) { + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client( + target.addr); + if (!stub) { + return Status::InternalError(fmt::format("Get rpc stub failed, host={}, port={}", + target.addr.hostname, target.addr.port)); + } + + // send blocks + int step = block_number_per_target; + while (round_offset < blocks.size() && step > 0) { + PTransmitRecCTEBlockParams request = build_basic_param(state, target); + auto current_bytes = 0; + while (round_offset < 
blocks.size() && step > 0 && + current_bytes < send_multi_blocks_byte_size) { + auto* pblock = request.add_blocks(); + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + int64_t compress_time; + RETURN_IF_ERROR(blocks[round_offset].serialize( + state->be_exec_version(), pblock, &uncompressed_bytes, + &compressed_bytes, &compress_time, + state->fragement_transmission_compression_type())); + round_offset++; + step--; + current_bytes += compressed_bytes; + } + request.set_eos(false); + + PTransmitRecCTEBlockResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + + stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + RETURN_IF_ERROR(Status::create(result.status())); + } + + // send eos + { + PTransmitRecCTEBlockParams request = build_basic_param(state, target); + request.set_eos(true); + + PTransmitRecCTEBlockResult result; + brpc::Controller controller; + stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + RETURN_IF_ERROR(Status::create(result.status())); + } + } + return Status::OK(); + } +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index fed6c3931fe..ad858d8deb9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1411,6 +1411,66 @@ Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatis print_id(query_id), query_stats); } +Status FragmentMgr::transmit_rec_cte_block( + const TUniqueId& query_id, const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + return q_ctx->send_block_to_cte_scan(instance_id, node_id, 
pblocks, eos); + } else { + return Status::EndOfFile( + "Transmit rec cte block failed: Query context (query-id: {}) not found, maybe " + "finished", + print_id(query_id)); + } +} + +Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment, + PRerunFragmentParams_Opcode stage) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + auto fragment_ctx = _pipeline_map.find({query_id, fragment}); + if (!fragment_ctx) { + return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found", + print_id(query_id), fragment); + } + + if (stage == PRerunFragmentParams::wait) { + return fragment_ctx->wait_close(false); + } + if (stage == PRerunFragmentParams::release) { + return fragment_ctx->set_to_rerun(); + } + if (stage == PRerunFragmentParams::rebuild) { + return fragment_ctx->rebuild(_thread_pool.get()); + } + if (stage == PRerunFragmentParams::submit) { + return fragment_ctx->submit(); + } + if (stage == PRerunFragmentParams::close) { + return fragment_ctx->wait_close(true); + } + } else { + return Status::NotFound( + "reset_fragment: Query context (query-id: {}) not found, maybe finished", + print_id(query_id)); + } + return Status::OK(); +} + +Status FragmentMgr::reset_global_rf(const TUniqueId& query_id, + const google::protobuf::RepeatedField<int32_t>& filter_ids) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + return q_ctx->reset_global_rf(filter_ids); + } else { + return Status::NotFound( + "reset_fragment: Query context (query-id: {}) not found, maybe finished", + print_id(query_id)); + } + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 826bca81901..080e081fca1 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -184,6 +184,17 @@ public: std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id); + Status 
transmit_rec_cte_block(const TUniqueId& query_id, const TUniqueId& instance_id, + int node_id, + const google::protobuf::RepeatedPtrField<PBlock>& pblocks, + bool eos); + + Status rerun_fragment(const TUniqueId& query_id, int fragment, + PRerunFragmentParams_Opcode stage); + + Status reset_global_rf(const TUniqueId& query_id, + const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: struct BrpcItem { TNetworkAddress network_address; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2b142f9a9f0..fb1222d019f 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -34,6 +34,7 @@ #include "common/status.h" #include "olap/olap_common.h" #include "pipeline/dependency.h" +#include "pipeline/exec/rec_cte_scan_operator.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -460,6 +461,10 @@ QueryContext::_collect_realtime_query_profile() { continue; } + if (fragment_ctx->need_notify_close()) { + continue; + } + auto profile = fragment_ctx->collect_realtime_profile(); if (profile.empty()) { @@ -497,4 +502,46 @@ TReportExecStatusParams QueryContext::get_realtime_exec_status() { return exec_status; } +Status QueryContext::send_block_to_cte_scan( + const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) { + std::unique_lock<std::mutex> l(_cte_scan_lock); + auto it = _cte_scan.find(std::make_pair(instance_id, node_id)); + if (it == _cte_scan.end()) { + return Status::InternalError("RecCTEScan not found for instance {}, node {}", + print_id(instance_id), node_id); + } + for (const auto& pblock : pblocks) { + RETURN_IF_ERROR(it->second->add_block(pblock)); + } + if (eos) { + it->second->set_ready(); + } + return Status::OK(); +} + +void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id, + pipeline::RecCTEScanLocalState* scan) { + 
std::unique_lock<std::mutex> l(_cte_scan_lock); + auto key = std::make_pair(instance_id, node_id); + DCHECK(!_cte_scan.contains(key)) << "Duplicate registe cte scan for instance " + << print_id(instance_id) << ", node " << node_id; + _cte_scan.emplace(key, scan); +} + +void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) { + std::lock_guard<std::mutex> l(_cte_scan_lock); + auto key = std::make_pair(instance_id, node_id); + DCHECK(_cte_scan.contains(key)) << "Duplicate deregiste cte scan for instance " + << print_id(instance_id) << ", node " << node_id; + _cte_scan.erase(key); +} + +Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) { + if (_merge_controller_handler) { + return _merge_controller_handler->reset_global_rf(this, filter_ids); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 7fd9fc29e35..c504dcc66aa 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -32,6 +32,7 @@ #include "common/config.h" #include "common/factory_creator.h" #include "common/object_pool.h" +#include "common/status.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/runtime_predicate.h" @@ -48,6 +49,7 @@ namespace pipeline { class PipelineFragmentContext; class PipelineTask; class Dependency; +class RecCTEScanLocalState; } // namespace pipeline struct ReportStatusRequest { @@ -161,11 +163,6 @@ public: return _query_options.runtime_filter_wait_time_ms; } - bool runtime_filter_wait_infinitely() const { - return _query_options.__isset.runtime_filter_wait_infinitely && - _query_options.runtime_filter_wait_infinitely; - } - int be_exec_version() const { if (!_query_options.__isset.be_exec_version) { return 0; @@ -296,6 +293,23 @@ public: void set_first_error_msg(std::string error_msg); std::string get_first_error_msg(); + Status 
send_block_to_cte_scan(const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, + bool eos); + void registe_cte_scan(const TUniqueId& instance_id, int node_id, + pipeline::RecCTEScanLocalState* scan); + void deregiste_cte_scan(const TUniqueId& instance_id, int node_id); + + std::vector<int> get_fragment_ids() { + std::vector<int> fragment_ids; + for (const auto& it : _fragment_id_to_pipeline_ctx) { + fragment_ids.push_back(it.first); + } + return fragment_ids; + } + + Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: friend class QueryTaskController; @@ -374,6 +388,10 @@ private: std::string _load_error_url; std::string _first_error_msg; + // instance id + node id -> cte scan + std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> _cte_scan; + std::mutex _cte_scan_lock; + public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile void add_fragment_profile( diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index adf90e9095a..19ba6ab702f 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -55,6 +55,7 @@ public: void set_detected_source() { std::unique_lock<std::shared_mutex> wlock(_rwlock); + _orderby_extrem = Field(PrimitiveType::TYPE_NULL); _detected_source = true; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 7f82f9e7796..6dffaa67eef 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -482,7 +482,7 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) { bool need_merge = desc.has_remote_targets || need_local_merge; RuntimeFilterMgr* mgr = need_merge ? 
global_runtime_filter_mgr() : local_runtime_filter_mgr(); - return mgr->register_consumer_filter(_query_ctx, desc, node_id, consumer_filter); + return mgr->register_consumer_filter(this, desc, node_id, consumer_filter); } bool RuntimeState::is_nereids() const { @@ -494,12 +494,22 @@ std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::pipeline_id_to_profil return _pipeline_id_to_profile; } +void RuntimeState::reset_to_rerun() { + if (local_runtime_filter_mgr()) { + auto filter_ids = local_runtime_filter_mgr()->get_filter_ids(); + filter_ids.merge(global_runtime_filter_mgr()->get_filter_ids()); + local_runtime_filter_mgr()->remove_filters(filter_ids); + global_runtime_filter_mgr()->remove_filters(filter_ids); + } + std::unique_lock lc(_pipeline_profile_lock); + _pipeline_id_to_profile.clear(); +} + std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::build_pipeline_profile( std::size_t pipeline_size) { std::unique_lock lc(_pipeline_profile_lock); if (!_pipeline_id_to_profile.empty()) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "build_pipeline_profile can only be called once."); + return _pipeline_id_to_profile; } _pipeline_id_to_profile.resize(pipeline_size); { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e7c66436981..3edf5317383 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -511,7 +511,7 @@ public: _runtime_filter_mgr = runtime_filter_mgr; } - QueryContext* get_query_ctx() { return _query_ctx; } + QueryContext* get_query_ctx() const { return _query_ctx; } [[nodiscard]] bool low_memory_mode() const; @@ -528,6 +528,12 @@ public: return _query_options.__isset.enable_profile && _query_options.enable_profile; } + int cte_max_recursion_depth() const { + return _query_options.__isset.cte_max_recursion_depth + ? 
_query_options.cte_max_recursion_depth + : 0; + } + int rpc_verbose_profile_max_instance_count() const { return _query_options.__isset.rpc_verbose_profile_max_instance_count ? _query_options.rpc_verbose_profile_max_instance_count @@ -710,6 +716,17 @@ public: _query_options.hnsw_bounded_queue, _query_options.ivf_nprobe); } + void reset_to_rerun(); + + void set_force_make_rf_wait_infinite() { + _query_options.__set_runtime_filter_wait_infinitely(true); + } + + bool runtime_filter_wait_infinitely() const { + return _query_options.__isset.runtime_filter_wait_infinitely && + _query_options.runtime_filter_wait_infinitely; + } + private: Status create_error_log_file(); diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h index e0e42e509d4..e9ef68f6d28 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.h +++ b/be/src/runtime_filter/runtime_filter_consumer.h @@ -41,11 +41,11 @@ public: APPLIED, // The consumer will switch to this state after the expression is acquired }; - static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, int node_id, + static Status create(const RuntimeState* state, const TRuntimeFilterDesc* desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* res) { *res = std::shared_ptr<RuntimeFilterConsumer>( - new RuntimeFilterConsumer(query_ctx, desc, node_id)); - RETURN_IF_ERROR((*res)->_init_with_desc(desc, &query_ctx->query_options())); + new RuntimeFilterConsumer(state, desc, node_id)); + RETURN_IF_ERROR((*res)->_init_with_desc(desc, &state->query_options())); return Status::OK(); } @@ -86,17 +86,16 @@ public: } private: - RuntimeFilterConsumer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - int node_id) + RuntimeFilterConsumer(const RuntimeState* state, const TRuntimeFilterDesc* desc, int node_id) : RuntimeFilter(desc), _probe_expr(desc->planId_to_target_expr.find(node_id)->second), _registration_time(MonotonicMillis()), 
_rf_state(State::NOT_READY) { // If bitmap filter is not applied, it will cause the query result to be incorrect - bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() || + bool wait_infinitely = state->runtime_filter_wait_infinitely() || _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER; - _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 1000 - : query_ctx->runtime_filter_wait_time_ms(); + _rf_wait_time_ms = wait_infinitely ? state->get_query_ctx()->execution_timeout() * 1000 + : state->get_query_ctx()->runtime_filter_wait_time_ms(); DorisMetrics::instance()->runtime_filter_consumer_num->increment(1); } diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index 00bac845473..f1080fae0a4 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -78,6 +78,8 @@ public: _expected_producer_num = num; } + int get_expected_producer_num() const { return _expected_producer_num; } + bool add_rf_size(uint64_t size) { _received_rf_size_num++; if (_expected_producer_num < _received_rf_size_num) { diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index 48645ffdb8e..7923e39fd5c 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -62,13 +62,13 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> RuntimeFilterMgr::get_consum } Status RuntimeFilterMgr::register_consumer_filter( - const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, int node_id, + const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard<std::mutex> l(_lock); - RETURN_IF_ERROR(RuntimeFilterConsumer::create(query_ctx, &desc, node_id, consumer)); + 
RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, consumer)); _consumer_map[key].push_back(*consumer); return Status::OK(); } @@ -457,6 +457,25 @@ void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu _filter_map.clear(); } +Status RuntimeFilterMergeControllerEntity::reset_global_rf( + QueryContext* query_ctx, const google::protobuf::RepeatedField<int32_t>& filter_ids) { + for (const auto& filter_id : filter_ids) { + GlobalMergeContext* cnt_val; + { + std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); + cnt_val = &_filter_map[filter_id]; // may inplace construct default object + } + int producer_size = cnt_val->merger->get_expected_producer_num(); + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &cnt_val->runtime_filter_desc, + &cnt_val->merger)); + cnt_val->merger->set_expected_producer_num(producer_size); + cnt_val->arrive_id.clear(); + cnt_val->source_addrs.clear(); + cnt_val->done = false; + } + return Status::OK(); +} + std::string RuntimeFilterMergeControllerEntity::debug_string() { std::string result = "RuntimeFilterMergeControllerEntity Info:\n"; std::shared_lock<std::shared_mutex> guard(_filter_map_mutex); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 160babf278d..2b5792877a2 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -83,7 +83,7 @@ public: // get/set consumer std::vector<std::shared_ptr<RuntimeFilterConsumer>> get_consume_filters(int filter_id); - Status register_consumer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, + Status register_consumer_filter(const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter); @@ -104,6 +104,27 @@ public: std::string debug_string(); + std::set<int32_t> get_filter_ids() { + std::set<int32_t> ids; + std::lock_guard<std::mutex> l(_lock); + for 
(const auto& id : _producer_id_set) { + ids.insert(id); + } + for (const auto& kv : _consumer_map) { + ids.insert(kv.first); + } + return ids; + } + + void remove_filters(const std::set<int32_t>& filter_ids) { + std::lock_guard<std::mutex> l(_lock); + for (const auto& id : filter_ids) { + _consumer_map.erase(id); + _local_merge_map.erase(id); + _producer_id_set.erase(id); + } + } + private: /** * `_is_global = true` means this runtime filter manager menages query-level runtime filters. @@ -152,6 +173,9 @@ public: void release_undone_filters(QueryContext* query_ctx); + Status reset_global_rf(QueryContext* query_ctx, + const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx, const TRuntimeFilterDesc* runtime_filter_desc, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 3b413ba92a3..b84c4f32ce5 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1616,6 +1616,57 @@ void PInternalService::fold_constant_expr(google::protobuf::RpcController* contr } } +void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller, + const PTransmitRecCTEBlockParams* request, + PTransmitRecCTEBlockResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->transmit_rec_cte_block( + UniqueId(request->query_id()).to_thrift(), + UniqueId(request->fragment_instance_id()).to_thrift(), request->node_id(), + request->blocks(), request->eos()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::rerun_fragment(google::protobuf::RpcController* controller, + const PRerunFragmentParams* request, + PRerunFragmentResult* response, + 
google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = + _exec_env->fragment_mgr()->rerun_fragment(UniqueId(request->query_id()).to_thrift(), + request->fragment_id(), request->stage()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::reset_global_rf(google::protobuf::RpcController* controller, + const PResetGlobalRfParams* request, + PResetGlobalRfResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->reset_global_rf( + UniqueId(request->query_id()).to_thrift(), request->filter_ids()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + void PInternalService::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index d73501bfc80..781a7def178 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -140,6 +140,16 @@ public: const ::doris::PPublishFilterRequestV2* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) override; + void transmit_rec_cte_block(google::protobuf::RpcController* controller, + const PTransmitRecCTEBlockParams* request, + PTransmitRecCTEBlockResult* response, + google::protobuf::Closure* done) override; + void rerun_fragment(google::protobuf::RpcController* controller, + const PRerunFragmentParams* request, PRerunFragmentResult* response, + google::protobuf::Closure* done) override; + void reset_global_rf(google::protobuf::RpcController* controller, + const 
PResetGlobalRfParams* request, PResetGlobalRfResult* response, + google::protobuf::Closure* done) override; void transmit_block(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 77ee21368c0..e0e352645c3 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -31,7 +31,7 @@ public: void test_signal_aquire(TRuntimeFilterDesc desc) { std::shared_ptr<RuntimeFilterConsumer> consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[0].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -59,7 +59,7 @@ TEST_F(RuntimeFilterConsumerTest, basic) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( @@ -116,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -141,7 +141,7 @@ TEST_F(RuntimeFilterConsumerTest, 
wait_infinity) { const_cast<TQueryOptions&>(_query_ctx->_query_options) .__set_runtime_filter_wait_infinitely(true); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( @@ -152,7 +152,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -174,7 +174,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { std::shared_ptr<RuntimeFilterConsumer> consumer; { - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } desc.__set_src_expr( @@ -195,13 +195,13 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { .build()); { - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } { desc.__set_has_local_targets(false); desc.__set_has_remote_targets(true); - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); desc.__set_has_local_targets(true); desc.__set_has_remote_targets(false); @@ -209,7 +209,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { 
desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr()); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); } TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { @@ -217,7 +217,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index d8222e201d9..02b1349c1a6 100644 --- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -65,7 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); std::shared_ptr<RuntimeFilterConsumer> consumer_filter; EXPECT_TRUE(global_runtime_filter_mgr - ->register_consumer_filter(ctx.get(), desc, 0, &consumer_filter) + ->register_consumer_filter(&state, desc, 0, &consumer_filter) .ok()); EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index afd62513b83..76f821f95a2 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -60,6 +60,45 @@ message PTransmitDataResult { optional int64 receive_time = 2; }; +message PTransmitRecCTEBlockParams { + repeated PBlock blocks = 1; + optional bool eos = 2; + optional PUniqueId query_id = 3; + optional PUniqueId 
fragment_instance_id = 4; + optional int32 node_id = 5; +}; + +message PTransmitRecCTEBlockResult { + optional PStatus status = 1; +}; + + +message PRerunFragmentParams { + enum Opcode { + wait = 1; // wait fragment execute done + release = 2; // release current round's resource + rebuild = 3; // rebuild next round pipeline tasks + submit = 4; // submit tasks to execute + close = 5; // close fragment + } + optional PUniqueId query_id = 1; + optional int32 fragment_id = 2; + optional Opcode stage = 3; +}; + +message PRerunFragmentResult { + optional PStatus status = 1; +}; + +message PResetGlobalRfParams { + optional PUniqueId query_id = 1; + repeated int32 filter_ids = 2; +}; + +message PResetGlobalRfResult { + optional PStatus status = 1; +}; + message PTabletWithPartition { required int64 partition_id = 1; required int64 tablet_id = 2; @@ -1144,6 +1183,9 @@ service PBackendService { rpc sync_filter_size(PSyncFilterSizeRequest) returns (PSyncFilterSizeResponse); rpc apply_filterv2(PPublishFilterRequestV2) returns (PPublishFilterResponse); rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult); + rpc rerun_fragment(PRerunFragmentParams) returns (PRerunFragmentResult); + rpc reset_global_rf(PResetGlobalRfParams) returns (PResetGlobalRfResult); + rpc transmit_rec_cte_block(PTransmitRecCTEBlockParams) returns (PTransmitRecCTEBlockResult); rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult); rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 8aa5d2bd1a5..8204376d961 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -418,9 +418,8 @@ struct TQueryOptions { 179: optional bool enable_parquet_filter_by_bloom_filter = true; 180: optional i32 max_file_scanners_concurrency = 0; 181: 
optional i32 min_file_scanners_concurrency = 0; - - 182: optional i32 ivf_nprobe = 1; + 183: optional i32 cte_max_recursion_depth; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. @@ -668,6 +667,7 @@ struct TPipelineFragmentParams { // Used by 2.1 44: optional list<i32> topn_filter_source_node_ids 45: optional map<string, TAIResource> ai_resources + 46: optional bool need_notify_close // For cloud 1000: optional bool is_mow_table; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 36564210b70..5bf6aa1489a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -60,7 +60,9 @@ enum TPlanNodeType { TEST_EXTERNAL_SCAN_NODE = 31, PARTITION_SORT_NODE = 32, GROUP_COMMIT_SCAN_NODE = 33, - MATERIALIZATION_NODE = 34 + MATERIALIZATION_NODE = 34, + REC_CTE_NODE = 35, + REC_CTE_SCAN_NODE = 36 } struct TKeyRange { @@ -708,6 +710,29 @@ struct TFileScanNode { 2: optional string table_name } +struct TRecCTETarget { + 1: optional Types.TNetworkAddress addr + 2: optional Types.TUniqueId fragment_instance_id + 3: optional i32 node_id +} + +struct TRecCTEResetInfo { + 1: optional Types.TNetworkAddress addr + 2: optional i32 fragment_id +} + +struct TRecCTENode { + 1: optional bool is_union_all + 2: optional list<TRecCTETarget> targets + 3: optional list<TRecCTEResetInfo> fragments_to_reset + 4: optional list<list<Exprs.TExpr>> result_expr_lists + 5: optional list<i32> rec_side_runtime_filter_ids + 6: optional bool is_used_by_other_rec_cte +} + +struct TRecCTEScanNode { +} + struct TEsScanNode { 1: required Types.TTupleId tuple_id 2: optional map<string,string> properties @@ -1443,6 +1468,7 @@ struct TPlanNode { 36: optional list<TRuntimeFilterDesc> runtime_filters 37: optional TGroupCommitScanNode group_commit_scan_node 38: optional TMaterializationNode materialization_node + 39: optional TRecCTENode rec_cte_node // Use in vec 
exec engine 40: optional Exprs.TExpr vconjunct @@ -1465,6 +1491,8 @@ struct TPlanNode { 50: optional list<list<Exprs.TExpr>> distribute_expr_lists 51: optional bool is_serial_operator + 52: optional TRecCTEScanNode rec_cte_scan_node + // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list<Exprs.TExpr> projections 102: optional Types.TTupleId output_tuple_id --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
