This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_rec2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit a0931099bfaa88139aee7a12d5072241128e3f51 Author: BiteTheDDDDt <[email protected]> AuthorDate: Thu Sep 18 21:02:33 2025 +0800 add rec cte thrift struct update update thrift add base update update thrift update _send_data_to_targets add fragment reset interface update reset format update send block clean fix add result_expr_lists to TRecCTENode update format add TRecCTEScanNode update format update update uniq update Revert "update" This reverts commit f464ed4fac5089db27bc479d73646f542c26bddc. update update update fix update update update update update update update update update format update tmp delete case Revert "tmp delete case" This reverts commit 20329c58a04ebf9cdce366026b6403f563b1092d. update fix update update update fix fix update update clean fix update --- be/src/pipeline/exec/exchange_source_operator.cpp | 5 + be/src/pipeline/exec/exchange_source_operator.h | 19 + be/src/pipeline/exec/operator.cpp | 10 + be/src/pipeline/exec/operator.h | 2 + .../pipeline/exec/rec_cte_anchor_sink_operator.cpp | 56 ++ .../pipeline/exec/rec_cte_anchor_sink_operator.h | 114 +++ be/src/pipeline/exec/rec_cte_scan_operator.h | 87 ++ be/src/pipeline/exec/rec_cte_sink_operator.cpp | 55 ++ be/src/pipeline/exec/rec_cte_sink_operator.h | 101 +++ be/src/pipeline/exec/rec_cte_source_operator.cpp | 86 ++ be/src/pipeline/exec/rec_cte_source_operator.h | 228 +++++ be/src/pipeline/exec/union_sink_operator.h | 42 +- be/src/pipeline/pipeline.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 236 +++-- be/src/pipeline/pipeline_fragment_context.h | 18 +- be/src/pipeline/pipeline_task.cpp | 25 +- be/src/pipeline/rec_cte_shared_state.h | 175 ++++ be/src/runtime/fragment_mgr.cpp | 60 ++ be/src/runtime/fragment_mgr.h | 11 + be/src/runtime/query_context.cpp | 47 + be/src/runtime/query_context.h | 28 +- be/src/runtime/runtime_predicate.h | 1 + be/src/runtime/runtime_state.cpp | 16 +- be/src/runtime/runtime_state.h | 19 +- be/src/runtime_filter/runtime_filter_consumer.h | 15 +- 
be/src/runtime_filter/runtime_filter_merger.h | 2 + be/src/runtime_filter/runtime_filter_mgr.cpp | 22 +- be/src/runtime_filter/runtime_filter_mgr.h | 26 +- be/src/service/internal_service.cpp | 51 ++ be/src/service/internal_service.h | 10 + .../runtime_filter_consumer_test.cpp | 20 +- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 9 +- gensrc/proto/internal_service.proto | 42 + gensrc/thrift/PaloInternalService.thrift | 1 + .../data/rec_cte_p0/rec_cte/rec_cte.out | 953 +++++++++++++++++++++ .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.out | 29 + .../rec_cte_from_duckdb_doc.out | 30 + .../rec_cte_from_mysql_doc.out | 42 + .../suites/rec_cte_p0/rec_cte/rec_cte.groovy | 271 ++++++ .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy | 144 ++++ .../rec_cte_from_duckdb_doc.groovy | 185 ++++ .../rec_cte_from_mysql_doc.groovy | 140 +++ 43 files changed, 3316 insertions(+), 121 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index b31b193aff2..8762b40fa40 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) { _is_closed = true; return OperatorX<ExchangeLocalState>::close(state); } + +Status ExchangeSourceOperatorX::reset(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state.reset(state); +} } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 3008217e130..03f2a288cdf 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -48,6 +48,23 @@ public: Status close(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; + Status reset(RuntimeState* state) { + if 
(stream_recvr) { + stream_recvr->close(); + } + create_stream_recvr(state); + + is_ready = false; + num_rows_skipped = 0; + + const auto& queues = stream_recvr->sender_queues(); + for (size_t i = 0; i < queues.size(); i++) { + deps[i]->block(); + queues[i]->set_dependency(deps[i]); + } + return Status::OK(); + } + std::vector<Dependency*> dependencies() const override { std::vector<Dependency*> dep_vec; std::for_each(deps.begin(), deps.end(), @@ -83,6 +100,8 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; + Status reset(RuntimeState* state); + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; std::string debug_string(int indentation_level = 0) const override; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 0eea8c7e157..ceaa9148b23 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -62,6 +62,10 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" +#include "pipeline/exec/rec_cte_scan_operator.h" +#include "pipeline/exec/rec_cte_sink_operator.h" +#include "pipeline/exec/rec_cte_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) DECLARE_OPERATOR(CacheSinkLocalState) DECLARE_OPERATOR(DictSinkLocalState) +DECLARE_OPERATOR(RecCTESinkLocalState) +DECLARE_OPERATOR(RecCTEAnchorSinkLocalState) #undef DECLARE_OPERATOR @@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState) DECLARE_OPERATOR(LocalExchangeSourceLocalState) 
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) DECLARE_OPERATOR(CacheSourceLocalState) +DECLARE_OPERATOR(RecCTESourceLocalState) +DECLARE_OPERATOR(RecCTEScanLocalState) #ifdef BE_TEST DECLARE_OPERATOR(MockLocalState) @@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>; template class PipelineXSinkLocalState<LocalExchangeSharedState>; template class PipelineXSinkLocalState<BasicSharedState>; template class PipelineXSinkLocalState<DataQueueSharedState>; +template class PipelineXSinkLocalState<RecCTESharedState>; template class PipelineXLocalState<HashJoinSharedState>; template class PipelineXLocalState<PartitionedHashJoinSharedState>; @@ -888,6 +897,7 @@ template class PipelineXLocalState<PartitionSortNodeSharedState>; template class PipelineXLocalState<SetSharedState>; template class PipelineXLocalState<LocalExchangeSharedState>; template class PipelineXLocalState<BasicSharedState>; +template class PipelineXLocalState<RecCTESharedState>; template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index d5630fd4aa6..7492ee75ae9 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -610,6 +610,8 @@ public: // For agg/sort/join sink. 
virtual Status init(const TPlanNode& tnode, RuntimeState* state); + virtual bool need_rerun(RuntimeState* state) const { return false; } + Status init(const TDataSink& tsink) override; [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle, diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp new file mode 100644 index 00000000000..f1552ed8e5e --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +} + +Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[0]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + _name = "REC_CTE_ANCHOR_SINK_OPERATOR"; + return Status::OK(); +} + +Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h new file mode 100644 index 00000000000..340949a2f79 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class RecCTEAnchorSinkOperatorX; +class RecCTEAnchorSinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState); + RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status open(RuntimeState* state) override; + + bool is_blockable() const override { return true; } + +private: + friend class RecCTEAnchorSinkOperatorX; + using Base = PipelineXSinkLocalState<RecCTESharedState>; + using Parent = RecCTEAnchorSinkOperatorX; + + vectorized::VExprContextSPtrs _child_expr; +}; + +class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final) + : public DataSinkOperatorX<RecCTEAnchorSinkLocalState> { +public: + using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>; + + friend class RecCTEAnchorSinkLocalState; + RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id, dest_id), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + + 
~RecCTEAnchorSinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { + auto& local_state = get_local_state(state); + + if (_need_notify_rec_side_ready) { + RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0)); + _need_notify_rec_side_ready = false; + } + + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + if (input_block->rows() != 0) { + vectorized::Block block; + RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true)); + RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block))); + } + + if (eos) { + local_state._shared_state->anchor_dep->set_ready(); + } + return Status::OK(); + } + + std::shared_ptr<BasicSharedState> create_shared_state() const override { + std::shared_ptr<BasicSharedState> ss = std::make_shared<RecCTESharedState>(); + ss->id = operator_id(); + for (const auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; + } + +private: + const RowDescriptor _row_descriptor; + vectorized::VExprContextSPtrs _child_expr; + + bool _need_notify_rec_side_ready = true; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h b/be/src/pipeline/exec/rec_cte_scan_operator.h new file mode 100644 index 00000000000..661d97927d1 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_scan_operator.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "pipeline/exec/operator.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; +} // namespace doris + +namespace doris::pipeline { + +class RecCTEScanOperatorX; +class RecCTEScanLocalState final : public PipelineXLocalState<> { +public: + ENABLE_FACTORY_CREATOR(RecCTEScanLocalState); + + RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState<>(state, parent) { + _scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_DEPENDENCY"); + state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(), parent->node_id(), + this); + } + ~RecCTEScanLocalState() override { + state()->get_query_ctx()->deregiste_cte_scan(state()->fragment_instance_id(), + parent()->node_id()); + } + + Status add_block(const PBlock& pblock) { + vectorized::Block block; + RETURN_IF_ERROR(block.deserialize(pblock)); + _blocks.emplace_back(std::move(block)); + return Status::OK(); + } + + void set_ready() { _scan_dependency->set_ready(); } + + std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; } + +private: + friend class RecCTEScanOperatorX; + std::vector<vectorized::Block> _blocks; 
+ DependencySPtr _scan_dependency = nullptr; +}; + +class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> { +public: + RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) {} + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + + if (local_state._blocks.empty()) { + *eos = true; + return Status::OK(); + } + *block = std::move(local_state._blocks.back()); + RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns())); + local_state._blocks.pop_back(); + return Status::OK(); + } + + bool is_source() const override { return true; } +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_sink_operator.cpp new file mode 100644 index 00000000000..1508b478bca --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_sink_operator.cpp @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/rec_cte_sink_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status RecCTESinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +} + +Status RecCTESinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[1]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status RecCTESinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h b/be/src/pipeline/exec/rec_cte_sink_operator.h new file mode 100644 index 00000000000..2321ba16ea3 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_sink_operator.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class RecCTESinkOperatorX; +class RecCTESinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESinkLocalState); + RecCTESinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status open(RuntimeState* state) override; + +private: + friend class RecCTESinkOperatorX; + using Base = PipelineXSinkLocalState<RecCTESharedState>; + using Parent = RecCTESinkOperatorX; + + vectorized::VExprContextSPtrs _child_expr; +}; + +class RecCTESinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<RecCTESinkLocalState> { +public: + using Base = DataSinkOperatorX<RecCTESinkLocalState>; + + friend class RecCTESinkLocalState; + RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id, dest_id), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + + ~RecCTESinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + bool need_rerun(RuntimeState* state) const override { + return get_local_state(state)._shared_state->ready_to_return == false; + } + 
+ std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { + auto& local_state = get_local_state(state); + + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + if (input_block->rows() != 0) { + vectorized::Block block; + RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true)); + RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block))); + } + + if (eos) { + local_state._shared_state->source_dep->set_ready(); + } + return Status::OK(); + } + +private: + const RowDescriptor _row_descriptor; + vectorized::VExprContextSPtrs _child_expr; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_source_operator.cpp b/be/src/pipeline/exec/rec_cte_source_operator.cpp new file mode 100644 index 00000000000..9a02bb947b8 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_source_operator.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/rec_cte_source_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +RecCTESourceLocalState::RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + +Status RecCTESourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + return Status::OK(); +} + +Status RecCTESourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + _shared_state->targets = _parent->cast<RecCTESourceOperatorX>()._targets; + _shared_state->max_recursion_depth = + _parent->cast<RecCTESourceOperatorX>()._max_recursion_depth; + _shared_state->source_dep = _dependency; + _anchor_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_ANCHOR_DEPENDENCY"); + _shared_state->anchor_dep = _anchor_dependency.get(); + + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + if (!_parent->cast<RecCTESourceOperatorX>()._is_union_all) { + _shared_state->agg_data = std::make_unique<DistinctDataVariants>(); + RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(_shared_state->agg_data.get(), + get_data_types(_child_expr), false)); + } + + _shared_state->hash_table_compute_timer = + ADD_TIMER(Base::custom_profile(), "HashTableComputeTime"); + 
_shared_state->hash_table_emplace_timer = + ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime"); + _shared_state->hash_table_input_counter = + ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT); + return Status::OK(); +} + +Status RecCTESourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + + _max_recursion_depth = state->cte_max_recursion_depth(); + + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[1]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status RecCTESourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h b/be/src/pipeline/exec/rec_cte_source_operator.h new file mode 100644 index 00000000000..e36358161d2 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_source_operator.h @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <gen_cpp/internal_service.pb.h> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "util/brpc_client_cache.h" +#include "util/uid_util.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized + +namespace pipeline { +class DataQueue; + +class RecCTESourceOperatorX; +class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESourceLocalState); + using Base = PipelineXLocalState<RecCTESharedState>; + using Parent = RecCTESourceOperatorX; + RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent); + + Status open(RuntimeState* state) override; + Status init(RuntimeState* state, LocalStateInfo& info) override; + + bool is_blockable() const override { return true; } + + std::vector<Dependency*> dependencies() const override { + return std::vector<Dependency*> {_dependency, _anchor_dependency.get()}; + } + +private: + friend class RecCTESourceOperatorX; + friend class OperatorX<RecCTESourceLocalState>; + + vectorized::VExprContextSPtrs _child_expr; + + std::shared_ptr<Dependency> _anchor_dependency = nullptr; +}; + +class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> { +public: + using Base = OperatorX<RecCTESourceLocalState>; + RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + 
const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _is_union_all(tnode.rec_cte_node.is_union_all), + _targets(tnode.rec_cte_node.targets), + _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset), + _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids), + _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) { + DCHECK(tnode.__isset.rec_cte_node); + } + ~RecCTESourceOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + Status terminate(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::terminate(state); + } + + Status close(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::close(state); + } + + Status set_child(OperatorPtr child) override { + Base::_child = child; + return Status::OK(); + } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + auto& ctx = local_state._shared_state; + ctx->update_ready_to_return(); + + if (!ctx->ready_to_return) { + if (ctx->current_round + 1 > _max_recursion_depth) { + return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth); + } + + ctx->source_dep->block(); + // ctx->blocks.size() may be changed after _recursive_process + int current_blocks_size = int(ctx->blocks.size()); + RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset)); + ctx->current_round++; + ctx->last_round_offset = current_blocks_size; + } else { + if (ctx->blocks.empty()) { + *eos = true; + } else { + block->swap(ctx->blocks.back()); + RETURN_IF_ERROR( + local_state.filter_block(local_state.conjuncts(), block, block->columns())); + 
ctx->blocks.pop_back(); + } + } + return Status::OK(); + } + + bool is_source() const override { return true; } + +private: + Status _send_close(RuntimeState* state) { + if (!_aready_send_close && !_is_used_by_other_rec_cte) { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close)); + _aready_send_close = true; + + auto* round_counter = ADD_COUNTER(get_local_state(state).Base::custom_profile(), + "RecursiveRound", TUnit::UNIT); + round_counter->set(int64_t(get_local_state(state)._shared_state->current_round)); + } + return Status::OK(); + } + + Status _recursive_process(RuntimeState* state, size_t last_round_offset) const { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::wait)); + RETURN_IF_ERROR(_send_reset_global_rf(state)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::release)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::rebuild)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::submit)); + RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets( + state, last_round_offset)); + return Status::OK(); + } + + Status _send_reset_global_rf(RuntimeState* state) const { + TNetworkAddress addr; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr)); + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr); + PResetGlobalRfParams request; + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + for (auto filter_id : _global_rf_ids) { + request.add_filter_ids(filter_id); + } + + PResetGlobalRfResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + return Status::create(result.status()); + } + + Status _send_rerun_fragments(RuntimeState* state, 
PRerunFragmentParams_Opcode stage) const { + for (auto fragment : _fragments_to_reset) { + if (state->fragment_id() == fragment.fragment_id) { + return Status::InternalError("Fragment {} contains a recursive CTE node", + fragment.fragment_id); + } + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client( + fragment.addr); + + PRerunFragmentParams request; + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + request.set_fragment_id(fragment.fragment_id); + request.set_stage(stage); + + PRerunFragmentResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + if (controller.Failed()) { + return Status::InternalError(controller.ErrorText()); + } + + RETURN_IF_ERROR(Status::create(result.status())); + } + return Status::OK(); + } + + friend class RecCTESourceLocalState; + + vectorized::VExprContextSPtrs _child_expr; + + const bool _is_union_all = false; + std::vector<TRecCTETarget> _targets; + std::vector<TRecCTEResetInfo> _fragments_to_reset; + std::vector<int> _global_rf_ids; + + int _max_recursion_depth = 0; + + bool _aready_send_close = false; + + bool _is_used_by_other_rec_cte = false; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 06b95cdf92d..764f8099427 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -29,6 +29,24 @@ namespace doris { #include "common/compile_check_begin.h" class RuntimeState; +inline Status materialize_block(const vectorized::VExprContextSPtrs& exprs, + vectorized::Block* src_block, vectorized::Block* res_block, + bool need_clone) { + 
vectorized::ColumnsWithTypeAndName columns; + auto rows = src_block->rows(); + for (const auto& expr : exprs) { + int result_column_id = -1; + RETURN_IF_ERROR(expr->execute(src_block, &result_column_id)); + const auto& src_col_with_type = src_block->get_by_position(result_column_id); + vectorized::ColumnPtr cloned_col = need_clone + ? src_col_with_type.column->clone_resized(rows) + : src_col_with_type.column; + columns.emplace_back(cloned_col, src_col_with_type.type, src_col_with_type.name); + } + *res_block = {columns}; + return Status::OK(); +} + namespace pipeline { class DataQueue; @@ -153,27 +171,17 @@ private: vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, row_descriptor()); vectorized::Block res; - RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res)); + auto& local_state = get_local_state(state); + { + SCOPED_TIMER(local_state._expr_timer); + RETURN_IF_ERROR( + materialize_block(local_state._child_expr, input_block, &res, false)); + } + local_state._child_row_idx += res.rows(); RETURN_IF_ERROR(mblock.merge(res)); } return Status::OK(); } - - Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx, - vectorized::Block* res_block) { - auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state._expr_timer); - const auto& child_exprs = local_state._child_expr; - vectorized::ColumnsWithTypeAndName colunms; - for (size_t i = 0; i < child_exprs.size(); ++i) { - int result_column_id = -1; - RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); - colunms.emplace_back(src_block->get_by_position(result_column_id)); - } - local_state._child_row_idx += src_block->rows(); - *res_block = {colunms}; - return Status::OK(); - } }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 1d0ddcf178e..2a20a5cd73d 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -133,7 +133,7 @@ public: 
fmt::format_to(debug_string_buffer, "\n{}", _operators[i]->debug_string(i)); } fmt::format_to(debug_string_buffer, "\n{}", - _sink->debug_string(cast_set<int>(_operators.size()))); + _sink ? _sink->debug_string(cast_set<int>(_operators.size())) : "null"); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b5c2f7084dc..ba7937e9522 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -82,6 +82,10 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" +#include "pipeline/exec/rec_cte_scan_operator.h" +#include "pipeline/exec/rec_cte_sink_operator.h" +#include "pipeline/exec/rec_cte_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -134,7 +138,9 @@ PipelineFragmentContext::PipelineFragmentContext( _is_report_on_cancel(true), _report_status_cb(std::move(report_status_cb)), _params(request), - _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0) { + _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0), + _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close + : false) { _fragment_watcher.start(); } @@ -142,23 +148,8 @@ PipelineFragmentContext::~PipelineFragmentContext() { LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id); - // The memory released by the query end is recorded in the query mem tracker. 
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); - auto st = _query_ctx->exec_status(); - for (size_t i = 0; i < _tasks.size(); i++) { - if (!_tasks[i].empty()) { - _call_back(_tasks[i].front().first->runtime_state(), &st); - } - } - _tasks.clear(); - _dag.clear(); - _pip_id_to_pipeline.clear(); - _pipelines.clear(); - _sink.reset(); - _root_op.reset(); + _release_resource(); _runtime_state.reset(); - _runtime_filter_mgr_map.clear(); - _op_id_to_shared_state.clear(); _query_ctx.reset(); } @@ -253,6 +244,54 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { return pipeline; } +Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) { + { + SCOPED_TIMER(_build_pipelines_timer); + // 2. Build pipelines with operators in this fragment. + auto root_pipeline = add_pipeline(); + RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl, + &_root_op, root_pipeline)); + + // 3. Create sink operator + if (!_params.fragment.__isset.output_sink) { + return Status::InternalError("No output sink in this fragment!"); + } + RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink, + _params.fragment.output_exprs, _params, + root_pipeline->output_row_desc(), _runtime_state.get(), + *_desc_tbl, root_pipeline->id())); + RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink)); + RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); + + for (PipelinePtr& pipeline : _pipelines) { + DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); + RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); + } + } + // 4. Build local exchanger + if (_runtime_state->enable_local_shuffle()) { + SCOPED_TIMER(_plan_local_exchanger_timer); + RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, + _params.bucket_seq_to_instance_idx, + _params.shuffle_idx_to_instance_idx)); + } + + // 5. 
Initialize global states in pipelines. + for (PipelinePtr& pipeline : _pipelines) { + SCOPED_TIMER(_prepare_all_pipelines_timer); + pipeline->children().clear(); + RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); + } + + { + SCOPED_TIMER(_build_tasks_timer); + // 6. Build pipeline tasks and initialize local state. + RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool)); + } + + return Status::OK(); +} + Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { if (_prepared) { return Status::InternalError("Already prepared"); @@ -277,7 +316,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { auto* fragment_context = this; - if (_params.query_options.__isset.is_report_success) { + if (!_need_notify_close && _params.query_options.__isset.is_report_success) { fragment_context->set_is_report_success(_params.query_options.is_report_success); } @@ -322,49 +361,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { } } - { - SCOPED_TIMER(_build_pipelines_timer); - // 2. Build pipelines with operators in this fragment. - auto root_pipeline = add_pipeline(); - RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl, - &_root_op, root_pipeline)); - - // 3. Create sink operator - if (!_params.fragment.__isset.output_sink) { - return Status::InternalError("No output sink in this fragment!"); - } - RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink, - _params.fragment.output_exprs, _params, - root_pipeline->output_row_desc(), _runtime_state.get(), - *_desc_tbl, root_pipeline->id())); - RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink)); - RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); - - for (PipelinePtr& pipeline : _pipelines) { - DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); - RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); - } - } - // 4. 
Build local exchanger - if (_runtime_state->enable_local_shuffle()) { - SCOPED_TIMER(_plan_local_exchanger_timer); - RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, - _params.bucket_seq_to_instance_idx, - _params.shuffle_idx_to_instance_idx)); - } - - // 5. Initialize global states in pipelines. - for (PipelinePtr& pipeline : _pipelines) { - SCOPED_TIMER(_prepare_all_pipelines_timer); - pipeline->children().clear(); - RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); - } - - { - SCOPED_TIMER(_build_tasks_timer); - // 6. Build pipeline tasks and initialize local state. - RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool)); - } + RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool)); _init_next_report_time(); @@ -374,6 +371,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { _total_tasks = 0; + _closed_tasks = 0; const auto target_size = _params.local_params.size(); _tasks.resize(target_size); _runtime_filter_mgr_map.resize(target_size); @@ -422,6 +420,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { task_runtime_state->set_task_execution_context(shared_from_this()); task_runtime_state->set_be_number(local_params.backend_num); + if (_need_notify_close) { + // rec cte require child rf to wait infinitely to make sure all rpc done + task_runtime_state->set_force_make_rf_wait_infinite(); + } if (_params.__isset.backend_id) { task_runtime_state->set_backend_id(_params.backend_id); @@ -1645,6 +1647,42 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); break; } + case TPlanNodeType::REC_CTE_NODE: { + op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if 
(!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + + PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id()); + + DataSinkOperatorPtr anchor_sink; + anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(), + op->operator_id(), tnode, descs); + RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink)); + RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get())); + _pipeline_parent_map.push(op->node_id(), anchor_side_pipe); + + PipelinePtr rec_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(rec_side_pipe->id()); + + DataSinkOperatorPtr rec_sink; + rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(), + tnode, descs); + RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink)); + RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get())); + _pipeline_parent_map.push(op->node_id(), rec_side_pipe); + + break; + } + case TPlanNodeType::REC_CTE_SCAN_NODE: { + op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + break; + } default: return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); @@ -1750,7 +1788,10 @@ void PipelineFragmentContext::_close_fragment_instance() { if (_is_fragment_instance_closed) { return; } - Defer defer_op {[&]() { _is_fragment_instance_closed = true; }}; + Defer defer_op {[&]() { + _is_fragment_instance_closed = true; + _close_cv.notify_all(); + }}; _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); static_cast<void>(send_report(true)); // Print profile content in info log is a tempoeray solution for stream load and external_connector. 
@@ -1785,8 +1826,10 @@ void PipelineFragmentContext::_close_fragment_instance() { collect_realtime_load_channel_profile()); } - // all submitted tasks done - _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + if (!_need_notify_close) { + // all submitted tasks done + _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + } } void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) { @@ -1923,8 +1966,10 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const } std::string PipelineFragmentContext::debug_string() { + std::lock_guard<std::mutex> l(_task_mutex); fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n"); + fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\nneed_notify_close: {}\n", + _need_notify_close); for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { @@ -1998,5 +2043,66 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const { _runtime_state->profile_level()); return load_channel_profile; } + +Status PipelineFragmentContext::wait_close(bool close) { + if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) { + return Status::InternalError("stream load do not support reset"); + } + if (!_need_notify_close) { + return Status::InternalError("_need_notify_close is false, do not support reset"); + } + + { + std::unique_lock<std::mutex> lock(_task_mutex); + _close_cv.wait(lock, [this] { return _is_fragment_instance_closed.load(); }); + } + + if (close) { + _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + } + return Status::OK(); +} + +Status PipelineFragmentContext::set_to_rerun() { + { + std::lock_guard<std::mutex> l(_task_mutex); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); + for 
(size_t i = 0; i < _tasks.size(); i++) { + if (!_tasks[i].empty()) { + _tasks[i].front().first->runtime_state()->reset_to_rerun(); + } + } + } + _release_resource(); + _runtime_state->reset_to_rerun(); + return Status::OK(); +} + +Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) { + _submitted = false; + _is_fragment_instance_closed = false; + return _build_and_prepare_full_pipeline(thread_pool); +} + +void PipelineFragmentContext::_release_resource() { + std::lock_guard<std::mutex> l(_task_mutex); + // The memory released by the query end is recorded in the query mem tracker. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); + auto st = _query_ctx->exec_status(); + for (size_t i = 0; i < _tasks.size(); i++) { + if (!_tasks[i].empty()) { + _call_back(_tasks[i].front().first->runtime_state(), &st); + } + } + _tasks.clear(); + _dag.clear(); + _pip_id_to_pipeline.clear(); + _pipelines.clear(); + _sink.reset(); + _root_op.reset(); + _runtime_filter_mgr_map.clear(); + _op_id_to_shared_state.clear(); +} + #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 81b3f57b01f..40bb80f72ec 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -115,6 +115,9 @@ public: [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const; void clear_finished_tasks() { + if (_need_notify_close) { + return; + } for (size_t j = 0; j < _tasks.size(); j++) { for (size_t i = 0; i < _tasks[j].size(); i++) { _tasks[j][i].first->stop_if_finished(); @@ -125,7 +128,17 @@ public: std::string get_load_error_url(); std::string get_first_error_msg(); + Status wait_close(bool close); + Status rebuild(ThreadPool* thread_pool); + Status set_to_rerun(); + + bool need_notify_close() const { return _need_notify_close; } + private: + void _release_resource(); + + Status 
_build_and_prepare_full_pipeline(ThreadPool* thread_pool); + Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe); Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes, @@ -182,6 +195,7 @@ private: Pipelines _pipelines; PipelineId _next_pipeline_id = 0; std::mutex _task_mutex; + std::condition_variable _close_cv; int _closed_tasks = 0; // After prepared, `_total_tasks` is equal to the size of `_tasks`. // When submit fail, `_total_tasks` is equal to the number of tasks submitted. @@ -203,7 +217,7 @@ private: RuntimeProfile::Counter* _build_tasks_timer = nullptr; std::function<void(RuntimeState*, Status*)> _call_back; - bool _is_fragment_instance_closed = false; + std::atomic_bool _is_fragment_instance_closed = false; // If this is set to false, and '_is_report_success' is false as well, // This executor will not report status to FE on being cancelled. @@ -323,6 +337,8 @@ private: TPipelineFragmentParams _params; int32_t _parallel_instances = 0; + + bool _need_notify_close = false; }; } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 930e885fc3b..edc56332058 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -30,6 +30,7 @@ #include "common/logging.h" #include "common/status.h" #include "pipeline/dependency.h" +#include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/pipeline.h" @@ -553,10 +554,6 @@ Status PipelineTask::execute(bool* done) { } } - if (_eos) { - RETURN_IF_ERROR(close(Status::OK(), false)); - } - DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", { auto required_pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( @@ -595,6 +592,22 @@ Status PipelineTask::execute(bool* done) { RETURN_IF_ERROR(block->check_type_and_column()); 
status = _sink->sink(_state, block, _eos); + if (_eos) { + if (_sink->need_rerun(_state)) { + if (auto* source = dynamic_cast<ExchangeSourceOperatorX*>(_root); + source != nullptr) { + RETURN_IF_ERROR(source->reset(_state)); + _eos = false; + } else { + return Status::InternalError( + "Only ExchangeSourceOperatorX can be rerun, real is {}", + _root->get_name()); + } + } else { + RETURN_IF_ERROR(close(Status::OK(), false)); + } + } + if (status.is<ErrorCode::END_OF_FILE>()) { set_wake_up_early(); return Status::OK(); @@ -857,7 +870,9 @@ Status PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* d _blocked_dep = nullptr; auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this()); RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE)); - RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder)); + if (auto f = _fragment_context.lock(); f) { + RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder)); + } return Status::OK(); } diff --git a/be/src/pipeline/rec_cte_shared_state.h b/be/src/pipeline/rec_cte_shared_state.h new file mode 100644 index 00000000000..17fb00c82f5 --- /dev/null +++ b/be/src/pipeline/rec_cte_shared_state.h @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "dependency.h" +#include "pipeline/common/distinct_agg_utils.h" +#include "util/brpc_client_cache.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +struct RecCTESharedState : public BasicSharedState { + std::vector<TRecCTETarget> targets; + std::vector<vectorized::Block> blocks; + vectorized::IColumn::Selector distinct_row; + Dependency* source_dep = nullptr; + Dependency* anchor_dep = nullptr; + vectorized::Arena arena; + RuntimeProfile::Counter* hash_table_compute_timer = nullptr; + RuntimeProfile::Counter* hash_table_emplace_timer = nullptr; + RuntimeProfile::Counter* hash_table_input_counter = nullptr; + + std::unique_ptr<DistinctDataVariants> agg_data = nullptr; + + int current_round = 0; + int last_round_offset = 0; + int max_recursion_depth = 0; + bool ready_to_return = false; + + void update_ready_to_return() { + if (last_round_offset == blocks.size()) { + ready_to_return = true; + } + } + + Status emplace_block(RuntimeState* state, vectorized::Block&& block) { + if (agg_data) { + auto num_rows = uint32_t(block.rows()); + vectorized::ColumnRawPtrs raw_columns; + std::vector<vectorized::ColumnPtr> columns = block.get_columns_and_convert(); + for (auto& col : columns) { + raw_columns.push_back(col.get()); + } + + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, + "uninited hash table"); + }, + [&](auto& agg_method) -> void { + SCOPED_TIMER(hash_table_compute_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + + AggState agg_state(raw_columns); + agg_method.init_serialized_keys(raw_columns, num_rows); + distinct_row.clear(); + + size_t row = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, 
arena); + ctor(key); + distinct_row.push_back(row); + }; + auto creator_for_null_key = [&]() { + distinct_row.push_back(row); + }; + + SCOPED_TIMER(hash_table_emplace_timer); + for (; row < num_rows; ++row) { + agg_method.lazy_emplace(agg_state, row, creator, + creator_for_null_key); + } + COUNTER_UPDATE(hash_table_input_counter, num_rows); + }}, + agg_data->method_variant); + + if (distinct_row.size() == block.rows()) { + blocks.emplace_back(std::move(block)); + } else if (!distinct_row.empty()) { + auto distinct_block = vectorized::MutableBlock(block.clone_empty()); + RETURN_IF_ERROR(block.append_to_block_by_selector(&distinct_block, distinct_row)); + blocks.emplace_back(distinct_block.to_block()); + } + } else { + blocks.emplace_back(std::move(block)); + } + return Status::OK(); + } + + PTransmitRecCTEBlockParams build_basic_param(RuntimeState* state, + const TRecCTETarget& target) const { + PTransmitRecCTEBlockParams request; + request.set_node_id(target.node_id); + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + request.mutable_fragment_instance_id()->CopyFrom( + UniqueId(target.fragment_instance_id).to_proto()); + return request; + } + + Status send_data_to_targets(RuntimeState* state, size_t round_offset) const { + int send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size; + int block_number_per_target = + int(blocks.size() - round_offset + targets.size() - 1) / targets.size(); + for (auto target : targets) { + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client( + target.addr); + if (!stub) { + return Status::InternalError(fmt::format("Get rpc stub failed, host={}, port={}", + target.addr.hostname, target.addr.port)); + } + + // send blocks + int step = block_number_per_target; + while (round_offset < blocks.size() && step > 0) { + PTransmitRecCTEBlockParams request = build_basic_param(state, target); + auto current_bytes = 0; + while (round_offset < 
blocks.size() && step > 0 && + current_bytes < send_multi_blocks_byte_size) { + auto* pblock = request.add_blocks(); + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + RETURN_IF_ERROR(blocks[round_offset].serialize( + state->be_exec_version(), pblock, &uncompressed_bytes, + &compressed_bytes, state->fragement_transmission_compression_type())); + round_offset++; + step--; + current_bytes += compressed_bytes; + } + request.set_eos(false); + + PTransmitRecCTEBlockResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + + stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + RETURN_IF_ERROR(Status::create(result.status())); + } + + // send eos + { + PTransmitRecCTEBlockParams request = build_basic_param(state, target); + request.set_eos(true); + + PTransmitRecCTEBlockResult result; + brpc::Controller controller; + stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + RETURN_IF_ERROR(Status::create(result.status())); + } + } + return Status::OK(); + } +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 47fd9e264bf..f531e1ef2b9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1403,6 +1403,66 @@ Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatis print_id(query_id), query_stats); } +Status FragmentMgr::transmit_rec_cte_block( + const TUniqueId& query_id, const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + return q_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos); + } else { + return 
// Executes one stage of the recursive CTE rerun protocol on fragment `fragment` of query
// `query_id`:
//   wait    -> block until the fragment instance is closed
//   release -> reset runtime states and release the resources of the finished round
//   rebuild -> build pipelines and tasks again
//   submit  -> submit the rebuilt tasks
//   close   -> block until closed and remove the context from the fragment manager
// Returns NotFound when the query or fragment context does not exist and InternalError for
// an unknown stage.
Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
                                   PRerunFragmentParams_Opcode stage) {
    auto q_ctx = get_query_ctx(query_id);
    if (!q_ctx) {
        return Status::NotFound(
                "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
                print_id(query_id));
    }
    SCOPED_ATTACH_TASK(q_ctx.get());

    auto fragment_ctx = _pipeline_map.find({query_id, fragment});
    if (!fragment_ctx) {
        return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
                                print_id(query_id), fragment);
    }

    if (stage == PRerunFragmentParams::wait) {
        return fragment_ctx->wait_close(false);
    }
    if (stage == PRerunFragmentParams::release) {
        return fragment_ctx->set_to_rerun();
    }
    if (stage == PRerunFragmentParams::rebuild) {
        return fragment_ctx->rebuild(_thread_pool.get());
    }
    if (stage == PRerunFragmentParams::submit) {
        return fragment_ctx->submit();
    }
    if (stage == PRerunFragmentParams::close) {
        return fragment_ctx->wait_close(true);
    }
    // An opcode this backend does not know must not be reported as success, otherwise the
    // caller would continue with a fragment that was never touched.
    return Status::InternalError(
            "rerun_fragment: unknown stage {} (query-id: {}, fragment-id: {})",
            static_cast<int>(stage), print_id(query_id), fragment);
}

// Resets the global runtime filters `filter_ids` of query `query_id`.
// Returns NotFound when the query context does not exist.
Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
                                    const google::protobuf::RepeatedField<int32_t>& filter_ids) {
    auto q_ctx = get_query_ctx(query_id);
    if (!q_ctx) {
        return Status::NotFound(
                "reset_global_rf: Query context (query-id: {}) not found, maybe finished",
                print_id(query_id));
    }
    SCOPED_ATTACH_TASK(q_ctx.get());
    return q_ctx->reset_global_rf(filter_ids);
}
TUniqueId& query_id, const TUniqueId& instance_id, + int node_id, + const google::protobuf::RepeatedPtrField<PBlock>& pblocks, + bool eos); + + Status rerun_fragment(const TUniqueId& query_id, int fragment, + PRerunFragmentParams_Opcode stage); + + Status reset_global_rf(const TUniqueId& query_id, + const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: struct BrpcItem { TNetworkAddress network_address; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 5332b886d58..b3097dde69f 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -34,6 +34,7 @@ #include "common/status.h" #include "olap/olap_common.h" #include "pipeline/dependency.h" +#include "pipeline/exec/rec_cte_scan_operator.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -460,6 +461,10 @@ QueryContext::_collect_realtime_query_profile() { continue; } + if (fragment_ctx->need_notify_close()) { + continue; + } + auto profile = fragment_ctx->collect_realtime_profile(); if (profile.empty()) { @@ -497,4 +502,46 @@ TReportExecStatusParams QueryContext::get_realtime_exec_status() { return exec_status; } +Status QueryContext::send_block_to_cte_scan( + const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) { + std::unique_lock<std::mutex> l(_cte_scan_lock); + auto it = _cte_scan.find(std::make_pair(instance_id, node_id)); + if (it == _cte_scan.end()) { + return Status::InternalError("RecCTEScan not found for instance {}, node {}", + print_id(instance_id), node_id); + } + for (const auto& pblock : pblocks) { + RETURN_IF_ERROR(it->second->add_block(pblock)); + } + if (eos) { + it->second->set_ready(); + } + return Status::OK(); +} + +void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id, + pipeline::RecCTEScanLocalState* scan) { + std::unique_lock<std::mutex> l(_cte_scan_lock); 
+ auto key = std::make_pair(instance_id, node_id); + DCHECK(!_cte_scan.contains(key)) << "Duplicate registe cte scan for instance " + << print_id(instance_id) << ", node " << node_id; + _cte_scan.emplace(key, scan); +} + +void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) { + std::lock_guard<std::mutex> l(_cte_scan_lock); + auto key = std::make_pair(instance_id, node_id); + DCHECK(_cte_scan.contains(key)) << "Duplicate deregiste cte scan for instance " + << print_id(instance_id) << ", node " << node_id; + _cte_scan.erase(key); +} + +Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) { + if (_merge_controller_handler) { + return _merge_controller_handler->reset_global_rf(this, filter_ids); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 32458cdb00e..874e889e368 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -32,6 +32,7 @@ #include "common/config.h" #include "common/factory_creator.h" #include "common/object_pool.h" +#include "common/status.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/runtime_predicate.h" @@ -48,6 +49,7 @@ namespace pipeline { class PipelineFragmentContext; class PipelineTask; class Dependency; +class RecCTEScanLocalState; } // namespace pipeline struct ReportStatusRequest { @@ -161,11 +163,6 @@ public: return _query_options.runtime_filter_wait_time_ms; } - bool runtime_filter_wait_infinitely() const { - return _query_options.__isset.runtime_filter_wait_infinitely && - _query_options.runtime_filter_wait_infinitely; - } - int be_exec_version() const { if (!_query_options.__isset.be_exec_version) { return 0; @@ -299,6 +296,23 @@ public: void set_first_error_msg(std::string error_msg); std::string get_first_error_msg(); + Status send_block_to_cte_scan(const TUniqueId& instance_id, int node_id, + const 
google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, + bool eos); + void registe_cte_scan(const TUniqueId& instance_id, int node_id, + pipeline::RecCTEScanLocalState* scan); + void deregiste_cte_scan(const TUniqueId& instance_id, int node_id); + + std::vector<int> get_fragment_ids() { + std::vector<int> fragment_ids; + for (const auto& it : _fragment_id_to_pipeline_ctx) { + fragment_ids.push_back(it.first); + } + return fragment_ids; + } + + Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: friend class QueryTaskController; @@ -377,6 +391,10 @@ private: std::string _load_error_url; std::string _first_error_msg; + // instance id + node id -> cte scan + std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> _cte_scan; + std::mutex _cte_scan_lock; + public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile void add_fragment_profile( diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index 51c79e1b426..b3e19d089fc 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -55,6 +55,7 @@ public: void set_detected_source() { std::unique_lock<std::shared_mutex> wlock(_rwlock); + _orderby_extrem = Field(PrimitiveType::TYPE_NULL); _detected_source = true; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 33db0f60e7c..faeda888cfb 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -503,7 +503,7 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) { bool need_merge = desc.has_remote_targets || need_local_merge; RuntimeFilterMgr* mgr = need_merge ? 
global_runtime_filter_mgr() : local_runtime_filter_mgr(); - return mgr->register_consumer_filter(_query_ctx, desc, node_id, consumer_filter); + return mgr->register_consumer_filter(this, desc, node_id, consumer_filter); } bool RuntimeState::is_nereids() const { @@ -515,12 +515,22 @@ std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::pipeline_id_to_profil return _pipeline_id_to_profile; } +void RuntimeState::reset_to_rerun() { + if (local_runtime_filter_mgr()) { + auto filter_ids = local_runtime_filter_mgr()->get_filter_ids(); + filter_ids.merge(global_runtime_filter_mgr()->get_filter_ids()); + local_runtime_filter_mgr()->remove_filters(filter_ids); + global_runtime_filter_mgr()->remove_filters(filter_ids); + } + std::unique_lock lc(_pipeline_profile_lock); + _pipeline_id_to_profile.clear(); +} + std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::build_pipeline_profile( std::size_t pipeline_size) { std::unique_lock lc(_pipeline_profile_lock); if (!_pipeline_id_to_profile.empty()) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "build_pipeline_profile can only be called once."); + return _pipeline_id_to_profile; } _pipeline_id_to_profile.resize(pipeline_size); { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 3d89f4aa0d4..907f2c50a61 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -503,7 +503,7 @@ public: _runtime_filter_mgr = runtime_filter_mgr; } - QueryContext* get_query_ctx() { return _query_ctx; } + QueryContext* get_query_ctx() const { return _query_ctx; } [[nodiscard]] bool low_memory_mode() const; @@ -520,6 +520,12 @@ public: return _query_options.__isset.enable_profile && _query_options.enable_profile; } + int cte_max_recursion_depth() const { + return _query_options.__isset.cte_max_recursion_depth + ? 
_query_options.cte_max_recursion_depth + : 0; + } + int rpc_verbose_profile_max_instance_count() const { return _query_options.__isset.rpc_verbose_profile_max_instance_count ? _query_options.rpc_verbose_profile_max_instance_count @@ -702,6 +708,17 @@ public: _query_options.hnsw_bounded_queue); } + void reset_to_rerun(); + + void set_force_make_rf_wait_infinite() { + _query_options.__set_runtime_filter_wait_infinitely(true); + } + + bool runtime_filter_wait_infinitely() const { + return _query_options.__isset.runtime_filter_wait_infinitely && + _query_options.runtime_filter_wait_infinitely; + } + private: Status create_error_log_file(); diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h index e0e42e509d4..e9ef68f6d28 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.h +++ b/be/src/runtime_filter/runtime_filter_consumer.h @@ -41,11 +41,11 @@ public: APPLIED, // The consumer will switch to this state after the expression is acquired }; - static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, int node_id, + static Status create(const RuntimeState* state, const TRuntimeFilterDesc* desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* res) { *res = std::shared_ptr<RuntimeFilterConsumer>( - new RuntimeFilterConsumer(query_ctx, desc, node_id)); - RETURN_IF_ERROR((*res)->_init_with_desc(desc, &query_ctx->query_options())); + new RuntimeFilterConsumer(state, desc, node_id)); + RETURN_IF_ERROR((*res)->_init_with_desc(desc, &state->query_options())); return Status::OK(); } @@ -86,17 +86,16 @@ public: } private: - RuntimeFilterConsumer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - int node_id) + RuntimeFilterConsumer(const RuntimeState* state, const TRuntimeFilterDesc* desc, int node_id) : RuntimeFilter(desc), _probe_expr(desc->planId_to_target_expr.find(node_id)->second), _registration_time(MonotonicMillis()), _rf_state(State::NOT_READY) { // If bitmap 
filter is not applied, it will cause the query result to be incorrect - bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() || + bool wait_infinitely = state->runtime_filter_wait_infinitely() || _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER; - _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 1000 - : query_ctx->runtime_filter_wait_time_ms(); + _rf_wait_time_ms = wait_infinitely ? state->get_query_ctx()->execution_timeout() * 1000 + : state->get_query_ctx()->runtime_filter_wait_time_ms(); DorisMetrics::instance()->runtime_filter_consumer_num->increment(1); } diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index d373bb8be05..42e26b3d9c3 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -78,6 +78,8 @@ public: _expected_producer_num = num; } + int get_expected_producer_num() const { return _expected_producer_num; } + bool add_rf_size(uint64_t size) { _received_rf_size_num++; if (_expected_producer_num < _received_rf_size_num) { diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index ba9e9f5bc9d..dcddc29f06c 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -62,13 +62,13 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> RuntimeFilterMgr::get_consum } Status RuntimeFilterMgr::register_consumer_filter( - const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, int node_id, + const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard<std::mutex> l(_lock); - RETURN_IF_ERROR(RuntimeFilterConsumer::create(query_ctx, &desc, node_id, consumer)); + RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, consumer)); 
_consumer_map[key].push_back(*consumer); return Status::OK(); } @@ -446,6 +446,24 @@ void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu _filter_map.clear(); } +// For each id in `filter_ids`: re-creates that filter's merger from its stored descriptor, carries +// over the previous expected producer count, and clears `arrive_id` / `source_addrs`. +Status RuntimeFilterMergeControllerEntity::reset_global_rf( + QueryContext* query_ctx, const google::protobuf::RepeatedField<int32_t>& filter_ids) { + for (const auto& filter_id : filter_ids) { + GlobalMergeContext* cnt_val; + { + std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); + cnt_val = &_filter_map[filter_id]; // may inplace construct default object + } + // A just-default-constructed entry presumably has no merger yet (TODO confirm against + // GlobalMergeContext); fail with a status instead of dereferencing a null merger. + if (!cnt_val->merger) { + return Status::InternalError("reset_global_rf: runtime filter {} has no merger to reset", + filter_id); + } + int producer_size = cnt_val->merger->get_expected_producer_num(); + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &cnt_val->runtime_filter_desc, + &cnt_val->merger)); + cnt_val->merger->set_expected_producer_num(producer_size); + cnt_val->arrive_id.clear(); + cnt_val->source_addrs.clear(); + } + return Status::OK(); +} + std::string RuntimeFilterMergeControllerEntity::debug_string() { std::string result = "RuntimeFilterMergeControllerEntity Info:\n"; std::shared_lock<std::shared_mutex> guard(_filter_map_mutex); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 6941e5e5631..a079f3b0ad9 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -83,7 +83,7 @@ public: // get/set consumer std::vector<std::shared_ptr<RuntimeFilterConsumer>> get_consume_filters(int filter_id); - Status register_consumer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, + Status register_consumer_filter(const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter); @@ -104,6 +104,27 @@ public: std::string debug_string(); + std::set<int32_t> get_filter_ids() { + std::set<int32_t> ids; + std::lock_guard<std::mutex> l(_lock); + for (const auto& id : _producer_id_set) { + ids.insert(id); + } + for (const auto& kv : _consumer_map) { +
ids.insert(kv.first); + } + return ids; + } + + void remove_filters(const std::set<int32_t>& filter_ids) { + std::lock_guard<std::mutex> l(_lock); + for (const auto& id : filter_ids) { + _consumer_map.erase(id); + _local_merge_map.erase(id); + _producer_id_set.erase(id); + } + } + private: /** * `_is_global = true` means this runtime filter manager menages query-level runtime filters. @@ -152,6 +173,9 @@ public: void release_undone_filters(QueryContext* query_ctx); + Status reset_global_rf(QueryContext* query_ctx, + const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx, const TRuntimeFilterDesc* runtime_filter_desc, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index daf45179f79..f9f7d2e4460 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1597,6 +1597,57 @@ void PInternalService::fold_constant_expr(google::protobuf::RpcController* contr } } +void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller, + const PTransmitRecCTEBlockParams* request, + PTransmitRecCTEBlockResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->transmit_rec_cte_block( + UniqueId(request->query_id()).to_thrift(), + UniqueId(request->fragment_instance_id()).to_thrift(), request->node_id(), + request->blocks(), request->eos()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::rerun_fragment(google::protobuf::RpcController* controller, + const PRerunFragmentParams* request, + PRerunFragmentResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + 
brpc::ClosureGuard closure_guard(done); + auto st = + _exec_env->fragment_mgr()->rerun_fragment(UniqueId(request->query_id()).to_thrift(), + request->fragment_id(), request->stage()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::reset_global_rf(google::protobuf::RpcController* controller, + const PResetGlobalRfParams* request, + PResetGlobalRfResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->reset_global_rf( + UniqueId(request->query_id()).to_thrift(), request->filter_ids()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + void PInternalService::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index d73501bfc80..781a7def178 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -140,6 +140,16 @@ public: const ::doris::PPublishFilterRequestV2* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) override; + void transmit_rec_cte_block(google::protobuf::RpcController* controller, + const PTransmitRecCTEBlockParams* request, + PTransmitRecCTEBlockResult* response, + google::protobuf::Closure* done) override; + void rerun_fragment(google::protobuf::RpcController* controller, + const PRerunFragmentParams* request, PRerunFragmentResult* response, + google::protobuf::Closure* done) override; + void reset_global_rf(google::protobuf::RpcController* controller, + const PResetGlobalRfParams* request, PResetGlobalRfResult* response, + google::protobuf::Closure* done) override; void 
transmit_block(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 77ee21368c0..e0e352645c3 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -31,7 +31,7 @@ public: void test_signal_aquire(TRuntimeFilterDesc desc) { std::shared_ptr<RuntimeFilterConsumer> consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[0].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -59,7 +59,7 @@ TEST_F(RuntimeFilterConsumerTest, basic) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( @@ -116,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -141,7 +141,7 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) { const_cast<TQueryOptions&>(_query_ctx->_query_options) .__set_runtime_filter_wait_infinitely(true); 
FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( @@ -152,7 +152,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -174,7 +174,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { std::shared_ptr<RuntimeFilterConsumer> consumer; { - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } desc.__set_src_expr( @@ -195,13 +195,13 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { .build()); { - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } { desc.__set_has_local_targets(false); desc.__set_has_remote_targets(true); - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); desc.__set_has_local_targets(true); desc.__set_has_remote_targets(false); @@ -209,7 +209,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr()); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - 
RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); } TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { @@ -217,7 +217,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index d8222e201d9..02b1349c1a6 100644 --- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -65,7 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); std::shared_ptr<RuntimeFilterConsumer> consumer_filter; EXPECT_TRUE(global_runtime_filter_mgr - ->register_consumer_filter(ctx.get(), desc, 0, &consumer_filter) + ->register_consumer_filter(&state, desc, 0, &consumer_filter) .ok()); EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7582a7b6b6d..868263be5dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -786,6 +786,8 @@ public class SessionVariable implements Serializable, Writable { public static final String READ_HIVE_JSON_IN_ONE_COLUMN = "read_hive_json_in_one_column"; + public static final String CTE_MAX_RECURSION_DEPTH = 
"cte_max_recursion_depth"; + /** * Inserting overwrite for auto partition table allows creating partition for * datas which cannot find partition to overwrite. @@ -989,6 +991,11 @@ public class SessionVariable implements Serializable, Writable { }) public int minScanSchedulerConcurrency = 0; + @VariableMgr.VarAttr(name = CTE_MAX_RECURSION_DEPTH, needForward = true, description = { + "CTE递归的最大深度,默认值100", + "The maximum depth of CTE recursion. Default is 100" }) + public int cteMaxRecursionDepth = 100; + // By default, the number of Limit items after OrderBy is changed from 65535 items // before v1.2.0 (not included), to return all items by default @VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT, affectQueryResult = true) @@ -4783,7 +4790,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold); tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead); - + tResult.setCteMaxRecursionDepth(cteMaxRecursionDepth); tResult.setEnableParallelScan(enableParallelScan); tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount); tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1ddfbcf2502..5b9c3579590 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -60,6 +60,45 @@ message PTransmitDataResult { optional int64 receive_time = 2; }; +message PTransmitRecCTEBlockParams { + repeated PBlock blocks = 1; + optional bool eos = 2; + optional PUniqueId query_id = 3; + optional PUniqueId fragment_instance_id = 4; + optional int32 node_id = 5; +}; + +message PTransmitRecCTEBlockResult { + optional PStatus status = 1; +}; + + +message PRerunFragmentParams { + enum Opcode { + wait = 1; // wait fragment execute done + release = 2; // release current round's resource + rebuild = 3; // rebuild next round pipeline tasks + submit = 4; 
// submit tasks to execute + close = 5; // close fragment + } + optional PUniqueId query_id = 1; + optional int32 fragment_id = 2; + optional Opcode stage = 3; +}; + +message PRerunFragmentResult { + optional PStatus status = 1; +}; + +message PResetGlobalRfParams { + optional PUniqueId query_id = 1; + repeated int32 filter_ids = 2; +}; + +message PResetGlobalRfResult { + optional PStatus status = 1; +}; + message PTabletWithPartition { required int64 partition_id = 1; required int64 tablet_id = 2; @@ -1107,6 +1146,9 @@ service PBackendService { rpc sync_filter_size(PSyncFilterSizeRequest) returns (PSyncFilterSizeResponse); rpc apply_filterv2(PPublishFilterRequestV2) returns (PPublishFilterResponse); rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult); + rpc rerun_fragment(PRerunFragmentParams) returns (PRerunFragmentResult); + rpc reset_global_rf(PResetGlobalRfParams) returns (PResetGlobalRfResult); + rpc transmit_rec_cte_block(PTransmitRecCTEBlockParams) returns (PTransmitRecCTEBlockResult); rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult); rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index cc018384f24..c4169768584 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -410,6 +410,7 @@ struct TQueryOptions { 175: optional bool enable_fuzzy_blockable_task = false; 176: optional list<i32> shuffled_agg_ids; + 177: optional i32 cte_max_recursion_depth; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. 
diff --git a/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out new file mode 100644 index 00000000000..ba843a71ee5 --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out @@ -0,0 +1,953 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0.5403023058681398 +0.6542897904977791 +0.7013687736227565 +0.7221024250267077 +0.7314040424225098 +0.7356047404363474 +0.7375068905132428 +0.7383692041223232 +0.7387603198742113 +0.7389377567153445 +0.7390182624274122 +0.7390547907469174 +0.7390713652989449 +0.7390788859949921 +0.7390822985224024 +0.7390838469650002 +0.7390845495752126 +0.7390848683867142 +0.7390850130484203 +0.739085078689123 +0.7390851084737987 +0.7390851219886894 +0.7390851281211138 +0.7390851309037207 +0.7390851321663374 +0.7390851327392538 +0.7390851329992164 +0.7390851331171753 +0.7390851331706995 +0.7390851331949863 +0.7390851332060064 +0.7390851332110069 +0.7390851332132758 +0.7390851332143055 +0.7390851332147726 +0.7390851332149846 +0.7390851332150807 +0.7390851332151244 +0.7390851332151441 +0.7390851332151531 +0.7390851332151572 +0.7390851332151591 +0.7390851332151599 +0.7390851332151603 +0.7390851332151605 +0.7390851332151606 +0.7390851332151607 +0.7390851332151608 +0.7390851332151609 +0.7390851332151611 +0.7390851332151617 +0.7390851332151629 +0.7390851332151657 +0.7390851332151718 +0.7390851332151851 +0.7390851332152145 +0.7390851332152792 +0.739085133215422 +0.7390851332157367 +0.7390851332164302 +0.7390851332179587 +0.7390851332213271 +0.7390851332287504 +0.7390851332451103 +0.7390851332811648 +0.7390851333606233 +0.7390851335357372 +0.7390851339216605 +0.7390851347721744 +0.7390851366465718 +0.7390851407774467 +0.7390851498812394 +0.7390851699445544 +0.7390852141609171 +0.7390853116067619 +0.7390855263619245 +0.7390859996481299 +0.7390870426953322 +0.7390893414033927 +0.7390944073790913 +0.7391055719265363 
+0.7391301765296711 +0.7391843997714936 +0.7393038923969059 +0.7395672022122561 +0.7401473355678757 +0.7414250866101092 +0.7442373549005569 +0.7504177617637605 +0.7639596829006542 +0.7934803587425656 +0.8575532158463934 +1 + +-- !sql -- +55 + +-- !sql -- +1 3 +1 5 +1 8 +2 4 +2 5 +2 10 +2 19 +3 1 +3 5 +3 8 +3 10 +3 24 +5 3 +5 4 +5 8 +5 15 +6 3 +6 4 +6 7 +7 4 +8 1 +9 4 + +-- !sql -- +1 3 +1 5 +2 4 +2 5 +2 10 +3 1 +3 5 +3 8 +3 10 +5 3 +5 4 +5 8 +5 10 +6 3 +6 4 +6 5 +7 4 +8 1 +8 8 +9 4 +11 1 +12 3 +29 4 + +-- !sql -- +1 3 +1 5 +2 4 +2 5 +2 10 +3 1 +3 5 +3 8 +3 10 +5 3 +5 4 +5 8 +6 3 +6 4 +7 4 +7 10 +8 1 +8 10 +9 4 +9 5 +9 10 +10 5 +10 8 +10 10 +11 5 +11 8 +11 10 +12 5 +12 8 +12 10 +13 1 +13 5 +13 8 +13 10 +14 1 +14 5 +14 8 +14 10 +15 1 +15 3 +15 5 +15 8 +15 10 +16 1 +16 3 +16 5 +16 8 +16 10 +17 1 +17 3 +17 5 +17 8 +17 10 +18 1 +18 3 +18 5 +18 8 +18 10 +19 1 +19 3 +19 5 +19 8 +19 10 +20 1 +20 3 +20 5 +20 8 +20 10 +21 1 +21 3 +21 5 +21 8 +21 10 +22 1 +22 3 +22 5 +22 8 +22 10 +23 1 +23 3 +23 5 +23 8 +23 10 +24 1 +24 3 +24 5 +24 8 +24 10 +25 1 +25 3 +25 5 +25 8 +25 10 +26 1 +26 3 +26 5 +26 8 +26 10 +27 1 +27 3 +27 5 +27 8 +27 10 +28 1 +28 3 +28 5 +28 8 +28 10 +29 1 +29 3 +29 5 +29 8 +29 10 +30 1 +30 3 +30 5 +30 8 +30 10 +31 1 +31 3 +31 5 +31 8 +31 10 +32 1 +32 3 +32 5 +32 8 +32 10 +33 1 +33 3 +33 5 +33 8 +33 10 +34 1 +34 3 +34 4 +34 5 +34 8 +34 10 +35 1 +35 3 +35 4 +35 5 +35 8 +35 10 +36 1 +36 3 +36 4 +36 5 +36 8 +36 10 +37 1 +37 3 +37 4 +37 5 +37 8 +37 10 +38 1 +38 3 +38 4 +38 5 +38 8 +38 10 +39 1 +39 3 +39 4 +39 5 +39 8 +39 10 +40 1 +40 3 +40 4 +40 5 +40 8 +40 10 +41 1 +41 3 +41 4 +41 5 +41 8 +41 10 +42 1 +42 3 +42 4 +42 5 +42 8 +42 10 +43 1 +43 3 +43 4 +43 5 +43 8 +43 10 +44 1 +44 3 +44 4 +44 5 +44 8 +44 10 +45 1 +45 3 +45 4 +45 5 +45 8 +45 10 +46 1 +46 3 +46 4 +46 5 +46 8 +46 10 +47 1 +47 3 +47 4 +47 5 +47 8 +47 10 +48 1 +48 3 +48 4 +48 5 +48 8 +48 10 +49 1 +49 3 +49 4 +49 5 +49 8 +49 10 +50 1 +50 3 +50 4 +50 5 +50 8 +50 10 +51 1 +51 3 +51 4 +51 5 +51 8 +51 10 +52 1 
+52 3 +52 4 +52 5 +52 8 +52 10 +53 1 +53 3 +53 4 +53 5 +53 8 +53 10 +54 1 +54 3 +54 4 +54 5 +54 8 +54 10 +55 1 +55 3 +55 4 +55 5 +55 8 +55 10 +56 1 +56 3 +56 4 +56 5 +56 8 +56 10 +57 1 +57 3 +57 4 +57 5 +57 8 +57 10 +58 1 +58 3 +58 4 +58 5 +58 8 +58 10 +59 1 +59 3 +59 4 +59 5 +59 8 +59 10 +60 1 +60 3 +60 4 +60 5 +60 8 +60 10 +61 1 +61 3 +61 4 +61 5 +61 8 +61 10 +62 1 +62 3 +62 4 +62 5 +62 8 +62 10 +63 1 +63 3 +63 4 +63 5 +63 8 +63 10 +64 1 +64 3 +64 4 +64 5 +64 8 +64 10 +65 1 +65 3 +65 4 +65 5 +65 8 +65 10 +66 1 +66 3 +66 4 +66 5 +66 8 +66 10 +67 1 +67 3 +67 4 +67 5 +67 8 +67 10 +68 1 +68 3 +68 4 +68 5 +68 8 +68 10 +69 1 +69 3 +69 4 +69 5 +69 8 +69 10 +70 1 +70 3 +70 4 +70 5 +70 8 +70 10 +71 1 +71 3 +71 4 +71 5 +71 8 +71 10 +72 1 +72 3 +72 4 +72 5 +72 8 +72 10 +73 1 +73 3 +73 4 +73 5 +73 8 +73 10 +74 1 +74 3 +74 4 +74 5 +74 8 +74 10 +75 1 +75 3 +75 4 +75 5 +75 8 +75 10 +76 1 +76 3 +76 4 +76 5 +76 8 +76 10 +77 1 +77 3 +77 4 +77 5 +77 8 +77 10 +78 1 +78 3 +78 4 +78 5 +78 8 +78 10 +79 1 +79 3 +79 4 +79 5 +79 8 +79 10 +80 1 +80 3 +80 4 +80 5 +80 8 +80 10 +81 1 +81 3 +81 4 +81 5 +81 8 +81 10 +82 1 +82 3 +82 4 +82 5 +82 8 +82 10 +83 1 +83 3 +83 4 +83 5 +83 8 +83 10 +84 1 +84 3 +84 4 +84 5 +84 8 +84 10 +85 1 +85 3 +85 4 +85 5 +85 8 +85 10 +86 1 +86 3 +86 4 +86 5 +86 8 +86 10 +87 1 +87 3 +87 4 +87 5 +87 8 +87 10 +88 1 +88 3 +88 4 +88 5 +88 8 +88 10 +89 1 +89 3 +89 4 +89 5 +89 8 +89 10 +90 1 +90 3 +90 4 +90 5 +90 8 +90 10 +91 1 +91 3 +91 4 +91 5 +91 8 +91 10 +92 1 +92 3 +92 4 +92 5 +92 8 +92 10 +93 1 +93 3 +93 4 +93 5 +93 8 +93 10 +94 1 +94 3 +94 4 +94 5 +94 8 +94 10 +95 1 +95 3 +95 4 +95 5 +95 8 +95 10 +96 1 +96 3 +96 4 +96 5 +96 8 +96 10 +97 1 +97 3 +97 4 +97 5 +97 8 +97 10 +98 1 +98 3 +98 4 +98 5 +98 8 +98 10 +99 1 +99 3 +99 4 +99 5 +99 8 +99 10 +100 1 +100 3 +100 4 +100 5 +100 8 +100 10 + +-- !sql -- +1 3 +1 5 +1 8 +2 \N +2 4 +2 5 +2 9 +2 10 +2 15 +2 19 +2 28 +2 34 +2 43 +2 71 +2 77 +2 105 +2 176 +2 182 +2 253 +2 429 +2 435 +2 611 +2 1040 +2 1046 +2 1475 +2 2515 +2 2521 
+2 3561 +2 6076 +2 6082 +2 8597 +2 14673 +2 14679 +2 20755 +2 35428 +2 35434 +2 50107 +2 85535 +2 85541 +2 120969 +2 206504 +2 206510 +2 292045 +2 498549 +2 498555 +2 705059 +2 1203608 +2 1203614 +2 1702163 +2 2905771 +2 2905777 +2 4109385 +2 7015156 +2 7015162 +2 9920933 +2 16936089 +2 16936095 +2 23951251 +2 40887340 +2 40887346 +2 57823435 +2 98710775 +2 98710781 +2 139598121 +2 238308896 +2 238308902 +2 337019677 +2 575328573 +2 575328579 +2 813637475 +2 1388966048 +2 1388966054 +2 1964294627 +3 \N +3 1 +3 5 +3 6 +3 8 +3 10 +3 14 +3 18 +3 20 +3 23 +3 41 +3 43 +3 55 +3 63 +3 96 +3 118 +3 139 +3 181 +3 235 +3 320 +3 353 +3 501 +3 588 +3 854 +3 908 +3 1355 +3 1496 +3 2263 +3 2350 +3 3618 +3 3846 +3 5968 +3 6109 +3 9586 +3 9955 +3 15695 +3 15923 +3 25281 +3 25878 +3 41204 +3 41573 +3 66485 +3 67451 +3 108058 +3 108655 +3 174543 +3 176106 +3 283198 +3 284164 +3 457741 +3 460270 +3 741905 +3 743468 +3 1199646 +3 1203738 +3 1943114 +3 1945643 +3 3142760 +3 3149381 +3 5088403 +3 5092495 +3 8231163 +3 8241876 +3 13323658 +3 13330279 +3 21554821 +3 21572155 +3 34885100 +3 34895813 +3 56439921 +3 56467968 +3 91335734 +3 91353068 +3 147775655 +3 147821036 +3 239128723 +3 239156770 +3 386904378 +3 386977806 +3 626061148 +3 626106529 +3 1012965526 +3 1013084335 +3 1639072055 +3 1639145483 +5 \N +5 3 +5 4 +5 7 +5 8 +5 12 +5 15 +5 22 +5 27 +5 34 +5 56 +5 61 +5 83 +5 139 +5 144 +5 200 +5 339 +5 344 +5 483 +5 822 +5 827 +5 1166 +5 1988 +5 1993 +5 2815 +5 4803 +5 4808 +5 6796 +5 11599 +5 11604 +5 16407 +5 28006 +5 28011 +5 39610 +5 67616 +5 67621 +5 95627 +5 163243 +5 163248 +5 230864 +5 394107 +5 394112 +5 557355 +5 951462 +5 951467 +5 1345574 +5 2297036 +5 2297041 +5 3248503 +5 5545539 +5 5545544 +5 7842580 +5 13388119 +5 13388124 +5 18933663 +5 32321782 +5 32321787 +5 45709906 +5 78031688 +5 78031693 +5 110353475 +5 188385163 +5 188385168 +5 266416856 +5 454802019 +5 454802024 +5 643187187 +5 1097989206 +5 1097989211 +5 1552791230 +6 3 +6 4 +6 7 +7 4 +8 1 +9 4 + +-- !sql -- +1 
2 + +-- !sql -- +1 2 +3 4 + +-- !sql -- +1 2 +3 4 +11 22 +33 44 + +-- !sql -- +1 2 +3 4 +11 22 + +-- !sql -- +1 22 +3 22 +11 22 + +-- !sql -- +1 2 +2 3 + +-- !sql -- +1 2 +3 4 +11 22 + diff --git a/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out new file mode 100644 index 00000000000..3a055b01acc --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q1 -- +5050 + +-- !q2 -- +0 \N ROOT +1 0 Child_1 +2 0 Child_2 +3 1 Child_1_1 + +-- !q3 -- +0 \N ROOT [0] +1 0 Child_1 [0, 1] +3 1 Child_1_1 [0, 1, 3] +2 0 Child_2 [0, 2] + +-- !q4 -- +0 \N ROOT [0] 0 +1 0 Child_1 [0, 1] 1 +2 0 Child_2 [0, 2] 1 +3 1 Child_1_1 [0, 1, 3] 2 + +-- !q5 -- +1 2 1 -> 2 +1 3 1 -> 3 +1 4 1 -> 4 +2 3 2 -> 3 +4 5 4 -> 5 + diff --git a/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out new file mode 100644 index 00000000000..958b6848838 --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out @@ -0,0 +1,30 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !q1 -- +0 0 1 +1 1 1 +2 1 2 +3 2 3 +4 3 5 +5 5 8 +6 8 13 +7 13 21 +8 21 34 +9 34 55 + +-- !q2 -- +["Oasis", "Rock", "Music", "Art"] + +-- !q3 -- +1 3 [1, 3] +1 5 [1, 5] +1 5 [1, 3, 5] +1 8 [1, 3, 8] +1 10 [1, 3, 10] +1 3 [1, 5, 3] +1 4 [1, 5, 4] +1 8 [1, 5, 8] +1 4 [1, 3, 5, 4] +1 8 [1, 3, 5, 8] +1 8 [1, 5, 3, 8] +1 10 [1, 5, 3, 10] + diff --git a/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out new file mode 100644 index 00000000000..3bc4cf27785 --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out @@ -0,0 +1,42 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q1 -- +1 abc +2 abcabc +3 abcabcabcabc + +-- !q2 -- +1 1 -1 +2 -2 2 +3 4 -4 +4 -8 8 +5 16 -16 + +-- !q4 -- +2017-01-03 +2017-01-04 +2017-01-05 +2017-01-06 +2017-01-07 +2017-01-08 +2017-01-09 +2017-01-10 + +-- !q5 -- +2017-01-03 300 +2017-01-04 0 +2017-01-05 0 +2017-01-06 50 +2017-01-07 0 +2017-01-08 180 +2017-01-09 0 +2017-01-10 5 + +-- !q6 -- +333 Yasmina 333 +198 John 333,198 +29 Pedro 333,198,29 +4610 Sarah 333,198,29,4610 +72 Pierre 333,198,29,72 +692 Tarek 333,692 +123 Adil 333,692,123 + diff --git a/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy new file mode 100644 index 00000000000..d456f6ba998 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite ("rec_cte") { + qt_sql """ + WITH RECURSIVE test_table AS ( + SELECT + cast(1.0 as double) AS number + UNION + SELECT + cos(number) + FROM + test_table + ) + SELECT + number + FROM + test_table order by number; + """ + + qt_sql """ + WITH RECURSIVE test_table AS ( + SELECT cast(10 as int) AS number + UNION ALL + SELECT cast(number - 1 as int) FROM test_table WHERE number > 0 + ) + SELECT sum(number) FROM test_table; + """ + + + sql "DROP TABLE IF EXISTS edge;" + sql """ + CREATE TABLE edge + ( + node1id int, + node2id int + ) DUPLICATE KEY (node1id) + DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """ + INSERT INTO edge VALUES + (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1), + (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8), + (6, 3), (6, 4), (7, 4), (8, 1), (9, 4); + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + k1, + cast(sum(k2) as int) + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + cast(sum(k1) as int), + k2 + FROM t1 GROUP BY k2 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + test { + sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS 
k2 + FROM edge + UNION + SELECT + cast(sum(k1 + 1) as int), + k2 + FROM t1 GROUP BY k2 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + exception "ABORTED" + } + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + cast(sum(k1 + 1) as int), + k2 + FROM t1 WHERE k1 < 100 GROUP BY k2 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + k1, + cast(sum(k2) OVER (PARTITION BY k1 ORDER BY k1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as int) + FROM t1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + test { + sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION ALL + SELECT + 1,2 + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + exception "ABORTED" + } + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 1,2 + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT + 33,44 + FROM t2 GROUP BY k1 + ) + SELECT * FROM t1 UNION select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT t2.k1, t2.k2 FROM t1,t2 + ) + SELECT * FROM t1 UNION select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT t1.k1, t2.k2 FROM t1,t2 + ) + select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 
GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 2,3 + UNION + SELECT least(t1.k1,t2.k1), least(t1.k2,t2.k2) FROM t1,t2 + ) + select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT t1.k1, t1.k2 FROM t1 + ) + SELECT * FROM t1 UNION select * from t2 ORDER BY 1,2; + """ +} diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy new file mode 100644 index 00000000000..a01ab03db34 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +// https://clickhouse.com/docs/sql-reference/statements/select/with +suite ("rec_cte_from_ck_doc") { + qt_q1 """ + WITH RECURSIVE test_table AS ( + SELECT cast(1 as int) AS number + UNION ALL + SELECT cast(number + 1 as int) FROM test_table WHERE number < 100 + ) + SELECT sum(number) FROM test_table; + """ + + sql "DROP TABLE IF EXISTS tree;" + sql """ + CREATE TABLE tree + ( + id int, + parent_id int, + data varchar(100) + ) DUPLICATE KEY (id) + DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO tree VALUES (0, NULL, 'ROOT'), (1, 0, 'Child_1'), (2, 0, 'Child_2'), (3, 1, 'Child_1_1');""" + + qt_q2 """ + WITH RECURSIVE search_tree AS ( + SELECT id, parent_id, data + FROM tree t + WHERE t.id = 0 + UNION ALL + SELECT t.id, t.parent_id, t.data + FROM tree t, search_tree st + WHERE t.parent_id = st.id + ) + SELECT * FROM search_tree order BY id; + """ + + qt_q3 """ + WITH RECURSIVE search_tree AS ( + SELECT id, parent_id, data, array(t.id) AS path + FROM tree t + WHERE t.id = 0 + UNION ALL + SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id)) + FROM tree t, search_tree st + WHERE t.parent_id = st.id + ) + SELECT * FROM search_tree ORDER BY path; + """ + + qt_q4 """ + WITH RECURSIVE search_tree AS ( + SELECT id, parent_id, data, array(t.id) AS path, cast(0 as int) AS depth + FROM tree t + WHERE t.id = 0 + UNION ALL + SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id)), cast(depth + 1 as int) + FROM tree t, search_tree st + WHERE t.parent_id = st.id + ) + SELECT * FROM search_tree ORDER BY depth, id; + """ + + sql "DROP TABLE IF EXISTS graph;" + sql """ + CREATE TABLE graph + ( + c_from int, + c_to int, + label varchar(100) + ) DUPLICATE KEY (c_from) DISTRIBUTED BY HASH(c_from) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO graph VALUES (1, 2, '1 -> 2'), (1, 3, '1 -> 3'), (2, 3, '2 -> 3'), (1, 4, '1 -> 4'), (4, 5, 
'4 -> 5');""" + + qt_q5 """ + WITH RECURSIVE search_graph AS ( + SELECT c_from, c_to, label FROM graph g + UNION ALL + SELECT g.c_from, g.c_to, g.label + FROM graph g, search_graph sg + WHERE g.c_from = sg.c_to + ) + SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to; + """ + + sql "INSERT INTO graph VALUES (5, 1, '5 -> 1');" + test { + sql """ + WITH RECURSIVE search_graph AS ( + SELECT c_from, c_to, label FROM graph g + UNION ALL + SELECT g.c_from, g.c_to, g.label + FROM graph g, search_graph sg + WHERE g.c_from = sg.c_to + ) + SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to; + """ + exception "ABORTED" + } + + // test global rf + sql "set enable_runtime_filter_prune = false;" + test { + sql """ + WITH RECURSIVE search_graph AS ( + SELECT c_from, c_to, label FROM graph g + UNION ALL + SELECT g.c_from, g.c_to, g.label + FROM graph g join [shuffle] search_graph sg + on g.c_from = sg.c_to + ) + SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to; + """ + exception "ABORTED" + } + + // do not support use limit to stop recursion now + //qt_q6 """ + //WITH RECURSIVE test_table AS ( + // SELECT cast(1 as int) AS number + //UNION ALL + // SELECT cast(number + 1 as int) FROM test_table + //) + //SELECT sum(number) FROM test_table LIMIT 100; + //""" +} diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy new file mode 100644 index 00000000000..62d70558163 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +// https://duckdb.org/docs/stable/sql/query_syntax/with#recursive-ctes +suite ("rec_cte_from_duckdb_doc") { + qt_q1 """ + WITH RECURSIVE FibonacciNumbers ( + RecursionDepth, + FibonacciNumber, + NextNumber + ) AS ( + -- Base case + SELECT + cast(0 as int) AS RecursionDepth, + cast(0 as int) AS FibonacciNumber, + cast(1 as int) AS NextNumber + UNION + ALL -- Recursive step + SELECT + cast((fib.RecursionDepth + 1) as int) AS RecursionDepth, + fib.NextNumber AS FibonacciNumber, + cast((fib.FibonacciNumber + fib.NextNumber) as int) AS NextNumber + FROM + FibonacciNumbers fib + WHERE + cast((fib.RecursionDepth + 1) as int) < 10 + ) + SELECT + * + FROM + FibonacciNumbers fn ORDER BY fn.RecursionDepth; + """ + + sql "DROP TABLE IF EXISTS tag;" + sql """ + CREATE TABLE tag + ( + id int, + name varchar(100), + subclassof int + ) DUPLICATE KEY (id) + DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO tag VALUES + (1, 'U2', 5), + (2, 'Blur', 5), + (3, 'Oasis', 5), + (4, '2Pac', 6), + (5, 'Rock', 7), + (6, 'Rap', 7), + (7, 'Music', 9), + (8, 'Movies', 9), + (9, 'Art', NULL);""" + + qt_q2 """ + WITH RECURSIVE tag_hierarchy(id, source, path) AS ( + SELECT id, name, array(name) AS path + FROM tag + WHERE subclassof IS NULL + UNION ALL + SELECT tag.id, tag.name, array_concat(array(tag.name), 
tag_hierarchy.path) + FROM tag, tag_hierarchy + WHERE tag.subclassof = tag_hierarchy.id + ) + SELECT path + FROM tag_hierarchy + WHERE source = 'Oasis'; + """ + + sql "DROP TABLE IF EXISTS edge;" + sql """ + CREATE TABLE edge + ( + node1id int, + node2id int + ) DUPLICATE KEY (node1id) + DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """ + INSERT INTO edge VALUES + (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1), + (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8), + (6, 3), (6, 4), (7, 4), (8, 1), (9, 4); + """ + + qt_q3 """ + WITH RECURSIVE paths(startNode, endNode, path) AS ( + SELECT -- Define the path as the first edge of the traversal + node1id AS startNode, + node2id AS endNode, + array_concat(array(node1id), array(node2id)) AS path + FROM edge + WHERE node1id = 1 + UNION ALL + SELECT -- Concatenate new edge to the path + paths.startNode AS startNode, + node2id AS endNode, + array_concat(path, array(node2id)) AS path + FROM paths + JOIN edge ON paths.endNode = node1id + -- Prevent adding a repeated node to the path. + -- This ensures that no cycles occur. + WHERE array_contains(paths.path, node2id) = false + ) + SELECT startNode, endNode, path + FROM paths + ORDER BY array_size(path), path; + """ + + // do not support subquery containing recursive cte + //qt_q4 """ + //WITH RECURSIVE paths(startNode, endNode, path) AS ( + // SELECT -- Define the path as the first edge of the traversal + // node1id AS startNode, + // node2id AS endNode, + // array_concat(array(node1id), array(node2id)) AS path + // FROM edge + // WHERE startNode = 1 + // UNION ALL + // SELECT -- Concatenate new edge to the path + // paths.startNode AS startNode, + // node2id AS endNode, + // array_concat(path, array(node2id)) AS path + // FROM paths + // JOIN edge ON paths.endNode = node1id + // -- Prevent adding a node that was visited previously by any path. 
+ // -- This ensures that (1) no cycles occur and (2) only nodes that + // -- were not visited by previous (shorter) paths are added to a path. + // WHERE NOT EXISTS ( + // SELECT 1 FROM paths previous_paths + // WHERE array_contains(previous_paths.path, node2id) + // ) + // ) + //SELECT startNode, endNode, path + //FROM paths + //ORDER BY array_size(path), path; + //""" + + //qt_q5 """ + //WITH RECURSIVE paths(startNode, endNode, path, endReached) AS ( + //SELECT -- Define the path as the first edge of the traversal + // node1id AS startNode, + // node2id AS endNode, + // array_concat(array(node1id), array(node2id)) AS path, + // (node2id = 8) AS endReached + // FROM edge + // WHERE startNode = 1 + //UNION ALL + //SELECT -- Concatenate new edge to the path + // paths.startNode AS startNode, + // node2id AS endNode, + // array_concat(path, array(node2id)) AS path, + // max(CASE WHEN node2id = 8 THEN 1 ELSE 0 END) + // OVER (ROWS BETWEEN UNBOUNDED PRECEDING + // AND UNBOUNDED FOLLOWING) AS endReached + // FROM paths + // JOIN edge ON paths.endNode = node1id + // WHERE NOT EXISTS ( + // FROM paths previous_paths + // WHERE array_contains(previous_paths.path, node2id) + // ) + // AND paths.endReached = 0 + //) + //SELECT startNode, endNode, path + //FROM paths + //WHERE endNode = 8 + //ORDER BY array_size(path), path; + //""" +} diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy new file mode 100644 index 00000000000..cee07dc6612 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +// https://dev.mysql.com/doc/refman/8.4/en/with.html#common-table-expressions-recursive +suite ("rec_cte_from_mysql_doc") { + qt_q1 """ + WITH RECURSIVE cte AS + ( + SELECT cast(1 as int) AS n, cast('abc' as varchar(65533)) AS str + UNION ALL + SELECT cast(n + 1 as int), cast(CONCAT(str, str) as varchar(65533)) FROM cte WHERE n < 3 + ) + SELECT * FROM cte order by n; + """ + + qt_q2 """ + WITH RECURSIVE cte AS + ( + SELECT cast(1 as int) AS n, cast(1 as int) AS p, cast(-1 as int) AS q + UNION ALL + SELECT cast(n + 1 as int), cast(q * 2 as int), cast(p * 2 as int) FROM cte WHERE n < 5 + ) + SELECT * FROM cte order by n; + """ + + test { + sql """ + WITH RECURSIVE cte (n) AS + ( + SELECT cast(1 as int) + UNION ALL + SELECT cast(n + 1 as int) FROM cte + ) + SELECT n FROM cte order by n; + """ + exception "ABORTED" + } + + // do not support use limit to stop recursion now + //qt_q3 """ + //WITH RECURSIVE cte (n) AS + //( + //SELECT cast(1 as int) + //UNION ALL + //SELECT cast(n + 1 as int) FROM cte LIMIT 10000 + //) + //SELECT n FROM cte order by n; + //""" + + sql "DROP TABLE IF EXISTS sales;" + sql """ + CREATE TABLE sales + ( + c_date date, + c_price double + ) DUPLICATE KEY (c_date) + DISTRIBUTED BY HASH(c_date) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """insert into sales values + ('2017-01-03', 100.0), + 
('2017-01-03', 200.0), + ('2017-01-06', 50.0), + ('2017-01-08', 10.0), + ('2017-01-08', 20.0), + ('2017-01-08', 150.0), + ('2017-01-10', 5.0);""" + + qt_q4 """ + WITH RECURSIVE dates (c_date) AS + ( + SELECT MIN(c_date) FROM sales + UNION ALL + SELECT c_date + INTERVAL 1 DAY FROM dates + WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales) + ) + SELECT * FROM dates order by 1; + """ + + qt_q5 """ + WITH RECURSIVE dates (c_date) AS + ( + SELECT MIN(c_date) FROM sales + UNION ALL + SELECT c_date + INTERVAL 1 DAY FROM dates + WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales) + ) + SELECT dates.c_date, COALESCE(SUM(c_price), 0) AS sum_price + FROM dates LEFT JOIN sales ON dates.c_date = sales.c_date + GROUP BY dates.c_date + ORDER BY dates.c_date; + """ + + sql "DROP TABLE IF EXISTS employees;" + sql """ + CREATE TABLE employees ( + id INT NOT NULL, + name VARCHAR(100) NOT NULL, + manager_id INT NULL + ) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO employees VALUES + (333, "Yasmina", NULL), + (198, "John", 333), + (692, "Tarek", 333), + (29, "Pedro", 198), + (4610, "Sarah", 29), + (72, "Pierre", 29), + (123, "Adil", 692); + """ + + qt_q6 """ + WITH RECURSIVE employee_paths (id, name, path) AS + ( + SELECT id, name, CAST(id AS varchar(65533)) + FROM employees + WHERE manager_id IS NULL + UNION ALL + SELECT e.id, e.name, cast(CONCAT(ep.path, ',', e.id) as varchar(65533)) + FROM employee_paths AS ep JOIN employees AS e + ON ep.id = e.manager_id + ) + SELECT * FROM employee_paths ORDER BY path; + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
