This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_rec2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ef0ff57babbaa0d510e9f636fe596ed8d5494dcf Author: BiteTheDDDDt <[email protected]> AuthorDate: Thu Sep 18 21:02:33 2025 +0800 recursive cte be part --- be/src/pipeline/exec/exchange_source_operator.cpp | 5 + be/src/pipeline/exec/exchange_source_operator.h | 19 + be/src/pipeline/exec/operator.cpp | 10 + be/src/pipeline/exec/operator.h | 2 + .../pipeline/exec/rec_cte_anchor_sink_operator.cpp | 56 ++ .../pipeline/exec/rec_cte_anchor_sink_operator.h | 114 +++ be/src/pipeline/exec/rec_cte_scan_operator.h | 87 ++ be/src/pipeline/exec/rec_cte_sink_operator.cpp | 55 ++ be/src/pipeline/exec/rec_cte_sink_operator.h | 101 +++ be/src/pipeline/exec/rec_cte_source_operator.cpp | 86 ++ be/src/pipeline/exec/rec_cte_source_operator.h | 228 +++++ be/src/pipeline/exec/union_sink_operator.h | 42 +- be/src/pipeline/pipeline.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 236 +++-- be/src/pipeline/pipeline_fragment_context.h | 18 +- be/src/pipeline/pipeline_task.cpp | 25 +- be/src/pipeline/rec_cte_shared_state.h | 175 ++++ be/src/runtime/fragment_mgr.cpp | 60 ++ be/src/runtime/fragment_mgr.h | 11 + be/src/runtime/query_context.cpp | 47 + be/src/runtime/query_context.h | 28 +- be/src/runtime/runtime_predicate.h | 1 + be/src/runtime/runtime_state.cpp | 16 +- be/src/runtime/runtime_state.h | 19 +- be/src/runtime_filter/runtime_filter_consumer.h | 15 +- be/src/runtime_filter/runtime_filter_merger.h | 2 + be/src/runtime_filter/runtime_filter_mgr.cpp | 22 +- be/src/runtime_filter/runtime_filter_mgr.h | 26 +- be/src/service/internal_service.cpp | 51 ++ be/src/service/internal_service.h | 10 + .../runtime_filter_consumer_test.cpp | 20 +- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 9 +- gensrc/proto/internal_service.proto | 42 + gensrc/thrift/PaloInternalService.thrift | 1 + .../data/rec_cte_p0/rec_cte/rec_cte.out | 953 +++++++++++++++++++++ .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.out | 29 + .../rec_cte_from_duckdb_doc.out | 30 + .../rec_cte_from_mysql_doc.out | 42 + .../suites/rec_cte_p0/rec_cte/rec_cte.groovy | 271 ++++++ .../rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy | 144 ++++ .../rec_cte_from_duckdb_doc.groovy | 185 ++++ .../rec_cte_from_mysql_doc.groovy | 140 +++ 43 files changed, 3316 insertions(+), 121 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index b31b193aff2..8762b40fa40 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -224,4 +224,9 @@ Status ExchangeSourceOperatorX::close(RuntimeState* state) { _is_closed = true; return OperatorX<ExchangeLocalState>::close(state); } + +Status ExchangeSourceOperatorX::reset(RuntimeState* state) { + auto& local_state = get_local_state(state); + return local_state.reset(state); +} } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 3008217e130..03f2a288cdf 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -48,6 +48,23 @@ public: Status close(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; + Status reset(RuntimeState* state) { + if (stream_recvr) { + stream_recvr->close(); + } + create_stream_recvr(state); + + is_ready = false; + num_rows_skipped = 0; + + const auto& queues = stream_recvr->sender_queues(); + for (size_t i = 0; i < queues.size(); i++) { + deps[i]->block(); + queues[i]->set_dependency(deps[i]); + } + return Status::OK(); + } + std::vector<Dependency*> dependencies() const override { std::vector<Dependency*> dep_vec; std::for_each(deps.begin(), deps.end(), @@ -83,6 +100,8 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; + Status reset(RuntimeState* state); + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; std::string debug_string(int indentation_level = 0) const override; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 0eea8c7e157..ceaa9148b23 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -62,6 +62,10 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" +#include "pipeline/exec/rec_cte_scan_operator.h" +#include "pipeline/exec/rec_cte_sink_operator.h" +#include "pipeline/exec/rec_cte_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -803,6 +807,8 @@ DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) DECLARE_OPERATOR(CacheSinkLocalState) DECLARE_OPERATOR(DictSinkLocalState) +DECLARE_OPERATOR(RecCTESinkLocalState) +DECLARE_OPERATOR(RecCTEAnchorSinkLocalState) #undef DECLARE_OPERATOR @@ -836,6 +842,8 @@ DECLARE_OPERATOR(MetaScanLocalState) DECLARE_OPERATOR(LocalExchangeSourceLocalState) DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) DECLARE_OPERATOR(CacheSourceLocalState) +DECLARE_OPERATOR(RecCTESourceLocalState) +DECLARE_OPERATOR(RecCTEScanLocalState) #ifdef BE_TEST DECLARE_OPERATOR(MockLocalState) @@ -871,6 +879,7 @@ template class PipelineXSinkLocalState<SetSharedState>; template class PipelineXSinkLocalState<LocalExchangeSharedState>; template class PipelineXSinkLocalState<BasicSharedState>; template class PipelineXSinkLocalState<DataQueueSharedState>; +template class PipelineXSinkLocalState<RecCTESharedState>; template class PipelineXLocalState<HashJoinSharedState>; template class PipelineXLocalState<PartitionedHashJoinSharedState>; @@ -888,6 +897,7 @@ template class PipelineXLocalState<PartitionSortNodeSharedState>; template class PipelineXLocalState<SetSharedState>; template class PipelineXLocalState<LocalExchangeSharedState>; template class PipelineXLocalState<BasicSharedState>; +template class PipelineXLocalState<RecCTESharedState>; template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index d5630fd4aa6..7492ee75ae9 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -610,6 +610,8 @@ public: // For agg/sort/join sink. virtual Status init(const TPlanNode& tnode, RuntimeState* state); + virtual bool need_rerun(RuntimeState* state) const { return false; } + Status init(const TDataSink& tsink) override; [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle, diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp new file mode 100644 index 00000000000..f1552ed8e5e --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.cpp @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status RecCTEAnchorSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +} + +Status RecCTEAnchorSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[0]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + _name = "REC_CTE_ANCHOR_SINK_OPERATOR"; + return Status::OK(); +} + +Status RecCTEAnchorSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h new file mode 100644 index 00000000000..340949a2f79 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_anchor_sink_operator.h @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class RecCTEAnchorSinkOperatorX; +class RecCTEAnchorSinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTEAnchorSinkLocalState); + RecCTEAnchorSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status open(RuntimeState* state) override; + + bool is_blockable() const override { return true; } + +private: + friend class RecCTEAnchorSinkOperatorX; + using Base = PipelineXSinkLocalState<RecCTESharedState>; + using Parent = RecCTEAnchorSinkOperatorX; + + vectorized::VExprContextSPtrs _child_expr; +}; + +class RecCTEAnchorSinkOperatorX MOCK_REMOVE(final) + : public DataSinkOperatorX<RecCTEAnchorSinkLocalState> { +public: + using Base = DataSinkOperatorX<RecCTEAnchorSinkLocalState>; + + friend class RecCTEAnchorSinkLocalState; + RecCTEAnchorSinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id, dest_id), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + + ~RecCTEAnchorSinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { + auto& local_state = get_local_state(state); + + if (_need_notify_rec_side_ready) { + RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(state, 0)); + _need_notify_rec_side_ready = false; + } + + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + if (input_block->rows() != 0) { + vectorized::Block block; + RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true)); + RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block))); + } + + if (eos) { + local_state._shared_state->anchor_dep->set_ready(); + } + return Status::OK(); + } + + std::shared_ptr<BasicSharedState> create_shared_state() const override { + std::shared_ptr<BasicSharedState> ss = std::make_shared<RecCTESharedState>(); + ss->id = operator_id(); + for (const auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; + } + +private: + const RowDescriptor _row_descriptor; + vectorized::VExprContextSPtrs _child_expr; + + bool _need_notify_rec_side_ready = true; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h b/be/src/pipeline/exec/rec_cte_scan_operator.h new file mode 100644 index 00000000000..661d97927d1 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_scan_operator.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "pipeline/exec/operator.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; +} // namespace doris + +namespace doris::pipeline { + +class RecCTEScanOperatorX; +class RecCTEScanLocalState final : public PipelineXLocalState<> { +public: + ENABLE_FACTORY_CREATOR(RecCTEScanLocalState); + + RecCTEScanLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState<>(state, parent) { + _scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_DEPENDENCY"); + state->get_query_ctx()->registe_cte_scan(state->fragment_instance_id(), parent->node_id(), + this); + } + ~RecCTEScanLocalState() override { + state()->get_query_ctx()->deregiste_cte_scan(state()->fragment_instance_id(), + parent()->node_id()); + } + + Status add_block(const PBlock& pblock) { + vectorized::Block block; + RETURN_IF_ERROR(block.deserialize(pblock)); + _blocks.emplace_back(std::move(block)); + return Status::OK(); + } + + void set_ready() { _scan_dependency->set_ready(); } + + std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; } + +private: + friend class RecCTEScanOperatorX; + std::vector<vectorized::Block> _blocks; + DependencySPtr _scan_dependency = nullptr; +}; + +class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> { +public: + RecCTEScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs) {} + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + + if (local_state._blocks.empty()) { + *eos = true; + return Status::OK(); + } + *block = std::move(local_state._blocks.back()); + RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns())); + local_state._blocks.pop_back(); + return Status::OK(); + } + + bool is_source() const override { return true; } +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.cpp b/be/src/pipeline/exec/rec_cte_sink_operator.cpp new file mode 100644 index 00000000000..1508b478bca --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_sink_operator.cpp @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/rec_cte_sink_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status RecCTESinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + return Status::OK(); +} + +Status RecCTESinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[1]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status RecCTESinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_sink_operator.h b/be/src/pipeline/exec/rec_cte_sink_operator.h new file mode 100644 index 00000000000..2321ba16ea3 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_sink_operator.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class RecCTESinkOperatorX; +class RecCTESinkLocalState final : public PipelineXSinkLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESinkLocalState); + RecCTESinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status open(RuntimeState* state) override; + +private: + friend class RecCTESinkOperatorX; + using Base = PipelineXSinkLocalState<RecCTESharedState>; + using Parent = RecCTESinkOperatorX; + + vectorized::VExprContextSPtrs _child_expr; +}; + +class RecCTESinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<RecCTESinkLocalState> { +public: + using Base = DataSinkOperatorX<RecCTESinkLocalState>; + + friend class RecCTESinkLocalState; + RecCTESinkOperatorX(int sink_id, int dest_id, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(sink_id, tnode.node_id, dest_id), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + + ~RecCTESinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + bool need_rerun(RuntimeState* state) const override { + return get_local_state(state)._shared_state->ready_to_return == false; + } + + std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override { + auto& local_state = get_local_state(state); + + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); + if (input_block->rows() != 0) { + vectorized::Block block; + RETURN_IF_ERROR(materialize_block(local_state._child_expr, input_block, &block, true)); + RETURN_IF_ERROR(local_state._shared_state->emplace_block(state, std::move(block))); + } + + if (eos) { + local_state._shared_state->source_dep->set_ready(); + } + return Status::OK(); + } + +private: + const RowDescriptor _row_descriptor; + vectorized::VExprContextSPtrs _child_expr; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/rec_cte_source_operator.cpp b/be/src/pipeline/exec/rec_cte_source_operator.cpp new file mode 100644 index 00000000000..9a02bb947b8 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_source_operator.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/rec_cte_source_operator.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +RecCTESourceLocalState::RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + +Status RecCTESourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + return Status::OK(); +} + +Status RecCTESourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + _shared_state->targets = _parent->cast<RecCTESourceOperatorX>()._targets; + _shared_state->max_recursion_depth = + _parent->cast<RecCTESourceOperatorX>()._max_recursion_depth; + _shared_state->source_dep = _dependency; + _anchor_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_ANCHOR_DEPENDENCY"); + _shared_state->anchor_dep = _anchor_dependency.get(); + + auto& p = _parent->cast<Parent>(); + _child_expr.resize(p._child_expr.size()); + for (size_t i = 0; i < p._child_expr.size(); i++) { + RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); + } + if (!_parent->cast<RecCTESourceOperatorX>()._is_union_all) { + _shared_state->agg_data = std::make_unique<DistinctDataVariants>(); + RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(_shared_state->agg_data.get(), + get_data_types(_child_expr), false)); + } + + _shared_state->hash_table_compute_timer = + ADD_TIMER(Base::custom_profile(), "HashTableComputeTime"); + _shared_state->hash_table_emplace_timer = + ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime"); + _shared_state->hash_table_input_counter = + ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT); + return Status::OK(); +} + +Status RecCTESourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.rec_cte_node); + + _max_recursion_depth = state->cte_max_recursion_depth(); + + { + const auto& texprs = tnode.rec_cte_node.result_expr_lists[1]; + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _child_expr = ctxs; + } + return Status::OK(); +} + +Status RecCTESourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h b/be/src/pipeline/exec/rec_cte_source_operator.h new file mode 100644 index 00000000000..e36358161d2 --- /dev/null +++ b/be/src/pipeline/exec/rec_cte_source_operator.h @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <gen_cpp/internal_service.pb.h> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/union_sink_operator.h" +#include "pipeline/rec_cte_shared_state.h" +#include "util/brpc_client_cache.h" +#include "util/uid_util.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized + +namespace pipeline { +class DataQueue; + +class RecCTESourceOperatorX; +class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedState> { +public: + ENABLE_FACTORY_CREATOR(RecCTESourceLocalState); + using Base = PipelineXLocalState<RecCTESharedState>; + using Parent = RecCTESourceOperatorX; + RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent); + + Status open(RuntimeState* state) override; + Status init(RuntimeState* state, LocalStateInfo& info) override; + + bool is_blockable() const override { return true; } + + std::vector<Dependency*> dependencies() const override { + return std::vector<Dependency*> {_dependency, _anchor_dependency.get()}; + } + +private: + friend class RecCTESourceOperatorX; + friend class OperatorX<RecCTESourceLocalState>; + + vectorized::VExprContextSPtrs _child_expr; + + std::shared_ptr<Dependency> _anchor_dependency = nullptr; +}; + +class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> { +public: + using Base = OperatorX<RecCTESourceLocalState>; + RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), + _is_union_all(tnode.rec_cte_node.is_union_all), + _targets(tnode.rec_cte_node.targets), + _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset), + _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids), + _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) { + DCHECK(tnode.__isset.rec_cte_node); + } + ~RecCTESourceOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + + Status terminate(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::terminate(state); + } + + Status close(RuntimeState* state) override { + RETURN_IF_ERROR(_send_close(state)); + return Base::close(state); + } + + Status set_child(OperatorPtr child) override { + Base::_child = child; + return Status::OK(); + } + + bool is_serial_operator() const override { return true; } + + DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { + return {ExchangeType::NOOP}; + } + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + auto& local_state = get_local_state(state); + auto& ctx = local_state._shared_state; + ctx->update_ready_to_return(); + + if (!ctx->ready_to_return) { + if (ctx->current_round + 1 > _max_recursion_depth) { + return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth); + } + + ctx->source_dep->block(); + // ctx->blocks.size() may be changed after _recursive_process + int current_blocks_size = int(ctx->blocks.size()); + RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset)); + ctx->current_round++; + ctx->last_round_offset = current_blocks_size; + } else { + if (ctx->blocks.empty()) { + *eos = true; + } else { + block->swap(ctx->blocks.back()); + RETURN_IF_ERROR( + local_state.filter_block(local_state.conjuncts(), block, block->columns())); + ctx->blocks.pop_back(); + } + } + return Status::OK(); + } + + bool is_source() const override { return true; } + +private: + Status _send_close(RuntimeState* state) { + if (!_aready_send_close && !_is_used_by_other_rec_cte) { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::close)); + _aready_send_close = true; + + auto* round_counter = ADD_COUNTER(get_local_state(state).Base::custom_profile(), + "RecursiveRound", TUnit::UNIT); + round_counter->set(int64_t(get_local_state(state)._shared_state->current_round)); + } + return Status::OK(); + } + + Status _recursive_process(RuntimeState* state, size_t last_round_offset) const { + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::wait)); + RETURN_IF_ERROR(_send_reset_global_rf(state)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::release)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::rebuild)); + RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::submit)); + RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets( + state, last_round_offset)); + return Status::OK(); + } + + Status _send_reset_global_rf(RuntimeState* state) const { + TNetworkAddress addr; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr)); + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr); + PResetGlobalRfParams request; + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + for (auto filter_id : _global_rf_ids) { + request.add_filter_ids(filter_id); + } + + PResetGlobalRfResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + return Status::create(result.status()); + } + + Status _send_rerun_fragments(RuntimeState* state, PRerunFragmentParams_Opcode stage) const { + for (auto fragment : _fragments_to_reset) { + if (state->fragment_id() == fragment.fragment_id) { + return Status::InternalError("Fragment {} contains a recursive CTE node", + fragment.fragment_id); + } + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client( + fragment.addr); + + PRerunFragmentParams request; + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + request.set_fragment_id(fragment.fragment_id); + request.set_stage(stage); + + PRerunFragmentResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + if (controller.Failed()) { + return Status::InternalError(controller.ErrorText()); + } + + RETURN_IF_ERROR(Status::create(result.status())); + } + return Status::OK(); + } + + friend class RecCTESourceLocalState; + + vectorized::VExprContextSPtrs _child_expr; + + const bool _is_union_all = false; + std::vector<TRecCTETarget> _targets; + std::vector<TRecCTEResetInfo> _fragments_to_reset; + std::vector<int> _global_rf_ids; + + int _max_recursion_depth = 0; + + bool _aready_send_close = false; + + bool _is_used_by_other_rec_cte = false; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 06b95cdf92d..764f8099427 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -29,6 +29,24 @@ namespace doris { #include "common/compile_check_begin.h" class RuntimeState; +inline Status materialize_block(const vectorized::VExprContextSPtrs& exprs, + vectorized::Block* src_block, vectorized::Block* res_block, + bool need_clone) { + vectorized::ColumnsWithTypeAndName columns; + auto rows = src_block->rows(); + for (const auto& expr : exprs) { + int result_column_id = -1; + RETURN_IF_ERROR(expr->execute(src_block, &result_column_id)); + const auto& src_col_with_type = src_block->get_by_position(result_column_id); + vectorized::ColumnPtr cloned_col = need_clone + ? src_col_with_type.column->clone_resized(rows) + : src_col_with_type.column; + columns.emplace_back(cloned_col, src_col_with_type.type, src_col_with_type.name); + } + *res_block = {columns}; + return Status::OK(); +} + namespace pipeline { class DataQueue; @@ -153,27 +171,17 @@ private: vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, row_descriptor()); vectorized::Block res; - RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res)); + auto& local_state = get_local_state(state); + { + SCOPED_TIMER(local_state._expr_timer); + RETURN_IF_ERROR( + materialize_block(local_state._child_expr, input_block, &res, false)); + } + local_state._child_row_idx += res.rows(); RETURN_IF_ERROR(mblock.merge(res)); } return Status::OK(); } - - Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx, - vectorized::Block* res_block) { - auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state._expr_timer); - const auto& child_exprs = local_state._child_expr; - vectorized::ColumnsWithTypeAndName colunms; - for (size_t i = 0; i < child_exprs.size(); ++i) { - int result_column_id = -1; - RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); - colunms.emplace_back(src_block->get_by_position(result_column_id)); - } - local_state._child_row_idx += src_block->rows(); - *res_block = {colunms}; - return Status::OK(); - } }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 1d0ddcf178e..2a20a5cd73d 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -133,7 +133,7 @@ public: fmt::format_to(debug_string_buffer, "\n{}", _operators[i]->debug_string(i)); } fmt::format_to(debug_string_buffer, "\n{}", - _sink->debug_string(cast_set<int>(_operators.size()))); + _sink ? _sink->debug_string(cast_set<int>(_operators.size())) : "null"); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b5c2f7084dc..ba7937e9522 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -82,6 +82,10 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/rec_cte_anchor_sink_operator.h" +#include "pipeline/exec/rec_cte_scan_operator.h" +#include "pipeline/exec/rec_cte_sink_operator.h" +#include "pipeline/exec/rec_cte_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -134,7 +138,9 @@ PipelineFragmentContext::PipelineFragmentContext( _is_report_on_cancel(true), _report_status_cb(std::move(report_status_cb)), _params(request), - _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0) { + _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0), + _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close + : false) { _fragment_watcher.start(); } @@ -142,23 +148,8 @@ PipelineFragmentContext::~PipelineFragmentContext() { LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id); - // The memory released by the query end is recorded in the query mem tracker. - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); - auto st = _query_ctx->exec_status(); - for (size_t i = 0; i < _tasks.size(); i++) { - if (!_tasks[i].empty()) { - _call_back(_tasks[i].front().first->runtime_state(), &st); - } - } - _tasks.clear(); - _dag.clear(); - _pip_id_to_pipeline.clear(); - _pipelines.clear(); - _sink.reset(); - _root_op.reset(); + _release_resource(); _runtime_state.reset(); - _runtime_filter_mgr_map.clear(); - _op_id_to_shared_state.clear(); _query_ctx.reset(); } @@ -253,6 +244,54 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { return pipeline; } +Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) { + { + SCOPED_TIMER(_build_pipelines_timer); + // 2. Build pipelines with operators in this fragment. + auto root_pipeline = add_pipeline(); + RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl, + &_root_op, root_pipeline)); + + // 3. Create sink operator + if (!_params.fragment.__isset.output_sink) { + return Status::InternalError("No output sink in this fragment!"); + } + RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink, + _params.fragment.output_exprs, _params, + root_pipeline->output_row_desc(), _runtime_state.get(), + *_desc_tbl, root_pipeline->id())); + RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink)); + RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); + + for (PipelinePtr& pipeline : _pipelines) { + DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); + RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); + } + } + // 4. Build local exchanger + if (_runtime_state->enable_local_shuffle()) { + SCOPED_TIMER(_plan_local_exchanger_timer); + RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, + _params.bucket_seq_to_instance_idx, + _params.shuffle_idx_to_instance_idx)); + } + + // 5. Initialize global states in pipelines. + for (PipelinePtr& pipeline : _pipelines) { + SCOPED_TIMER(_prepare_all_pipelines_timer); + pipeline->children().clear(); + RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); + } + + { + SCOPED_TIMER(_build_tasks_timer); + // 6. Build pipeline tasks and initialize local state. + RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool)); + } + + return Status::OK(); +} + Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { if (_prepared) { return Status::InternalError("Already prepared"); @@ -277,7 +316,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { auto* fragment_context = this; - if (_params.query_options.__isset.is_report_success) { + if (!_need_notify_close && _params.query_options.__isset.is_report_success) { fragment_context->set_is_report_success(_params.query_options.is_report_success); } @@ -322,49 +361,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { } } - { - SCOPED_TIMER(_build_pipelines_timer); - // 2. Build pipelines with operators in this fragment. - auto root_pipeline = add_pipeline(); - RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl, - &_root_op, root_pipeline)); - - // 3. Create sink operator - if (!_params.fragment.__isset.output_sink) { - return Status::InternalError("No output sink in this fragment!"); - } - RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink, - _params.fragment.output_exprs, _params, - root_pipeline->output_row_desc(), _runtime_state.get(), - *_desc_tbl, root_pipeline->id())); - RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink)); - RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); - - for (PipelinePtr& pipeline : _pipelines) { - DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); - RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); - } - } - // 4. Build local exchanger - if (_runtime_state->enable_local_shuffle()) { - SCOPED_TIMER(_plan_local_exchanger_timer); - RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, - _params.bucket_seq_to_instance_idx, - _params.shuffle_idx_to_instance_idx)); - } - - // 5. Initialize global states in pipelines. - for (PipelinePtr& pipeline : _pipelines) { - SCOPED_TIMER(_prepare_all_pipelines_timer); - pipeline->children().clear(); - RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); - } - - { - SCOPED_TIMER(_build_tasks_timer); - // 6. Build pipeline tasks and initialize local state. - RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool)); - } + RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool)); _init_next_report_time(); @@ -374,6 +371,7 @@ Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { _total_tasks = 0; + _closed_tasks = 0; const auto target_size = _params.local_params.size(); _tasks.resize(target_size); _runtime_filter_mgr_map.resize(target_size); @@ -422,6 +420,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { task_runtime_state->set_task_execution_context(shared_from_this()); task_runtime_state->set_be_number(local_params.backend_num); + if (_need_notify_close) { + // rec cte require child rf to wait infinitely to make sure all rpc done + task_runtime_state->set_force_make_rf_wait_infinite(); + } if (_params.__isset.backend_id) { task_runtime_state->set_backend_id(_params.backend_id); @@ -1645,6 +1647,42 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); break; } + case TPlanNodeType::REC_CTE_NODE: { + op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + _dag.insert({downstream_pipeline_id, {}}); + } + + PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id()); + + DataSinkOperatorPtr anchor_sink; + anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(), + op->operator_id(), tnode, descs); + RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink)); + RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get())); + _pipeline_parent_map.push(op->node_id(), anchor_side_pipe); + + PipelinePtr rec_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(rec_side_pipe->id()); + + DataSinkOperatorPtr rec_sink; + rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(), + tnode, descs); + RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink)); + RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get())); + _pipeline_parent_map.push(op->node_id(), rec_side_pipe); + + break; + } + case TPlanNodeType::REC_CTE_SCAN_NODE: { + op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + break; + } default: return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); @@ -1750,7 +1788,10 @@ void PipelineFragmentContext::_close_fragment_instance() { if (_is_fragment_instance_closed) { return; } - Defer defer_op {[&]() { _is_fragment_instance_closed = true; }}; + Defer defer_op {[&]() { + _is_fragment_instance_closed = true; + _close_cv.notify_all(); + }}; _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); static_cast<void>(send_report(true)); // Print profile content in info log is a tempoeray solution for stream load and external_connector. @@ -1785,8 +1826,10 @@ void PipelineFragmentContext::_close_fragment_instance() { collect_realtime_load_channel_profile()); } - // all submitted tasks done - _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + if (!_need_notify_close) { + // all submitted tasks done + _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + } } void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) { @@ -1923,8 +1966,10 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const } std::string PipelineFragmentContext::debug_string() { + std::lock_guard<std::mutex> l(_task_mutex); fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n"); + fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\nneed_notify_close: {}\n", + _need_notify_close); for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { @@ -1998,5 +2043,66 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const { _runtime_state->profile_level()); return load_channel_profile; } + +Status PipelineFragmentContext::wait_close(bool close) { + if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) { + return Status::InternalError("stream load do not support reset"); + } + if (!_need_notify_close) { + return Status::InternalError("_need_notify_close is false, do not support reset"); + } + + { + std::unique_lock<std::mutex> lock(_task_mutex); + _close_cv.wait(lock, [this] { return _is_fragment_instance_closed.load(); }); + } + + if (close) { + _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); + } + return Status::OK(); +} + +Status PipelineFragmentContext::set_to_rerun() { + { + std::lock_guard<std::mutex> l(_task_mutex); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); + for (size_t i = 0; i < _tasks.size(); i++) { + if (!_tasks[i].empty()) { + _tasks[i].front().first->runtime_state()->reset_to_rerun(); + } + } + } + _release_resource(); + _runtime_state->reset_to_rerun(); + return Status::OK(); +} + +Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) { + _submitted = false; + _is_fragment_instance_closed = false; + return _build_and_prepare_full_pipeline(thread_pool); +} + +void PipelineFragmentContext::_release_resource() { + std::lock_guard<std::mutex> l(_task_mutex); + // The memory released by the query end is recorded in the query mem tracker. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); + auto st = _query_ctx->exec_status(); + for (size_t i = 0; i < _tasks.size(); i++) { + if (!_tasks[i].empty()) { + _call_back(_tasks[i].front().first->runtime_state(), &st); + } + } + _tasks.clear(); + _dag.clear(); + _pip_id_to_pipeline.clear(); + _pipelines.clear(); + _sink.reset(); + _root_op.reset(); + _runtime_filter_mgr_map.clear(); + _op_id_to_shared_state.clear(); +} + #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 81b3f57b01f..40bb80f72ec 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -115,6 +115,9 @@ public: [[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const; void clear_finished_tasks() { + if (_need_notify_close) { + return; + } for (size_t j = 0; j < _tasks.size(); j++) { for (size_t i = 0; i < _tasks[j].size(); i++) { _tasks[j][i].first->stop_if_finished(); @@ -125,7 +128,17 @@ public: std::string get_load_error_url(); std::string get_first_error_msg(); + Status wait_close(bool close); + Status rebuild(ThreadPool* thread_pool); + Status set_to_rerun(); + + bool need_notify_close() const { return _need_notify_close; } + private: + void _release_resource(); + + Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool); + Status _build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe); Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes, @@ -182,6 +195,7 @@ private: Pipelines _pipelines; PipelineId _next_pipeline_id = 0; std::mutex _task_mutex; + std::condition_variable _close_cv; int _closed_tasks = 0; // After prepared, `_total_tasks` is equal to the size of `_tasks`. // When submit fail, `_total_tasks` is equal to the number of tasks submitted. @@ -203,7 +217,7 @@ private: RuntimeProfile::Counter* _build_tasks_timer = nullptr; std::function<void(RuntimeState*, Status*)> _call_back; - bool _is_fragment_instance_closed = false; + std::atomic_bool _is_fragment_instance_closed = false; // If this is set to false, and '_is_report_success' is false as well, // This executor will not report status to FE on being cancelled. @@ -323,6 +337,8 @@ private: TPipelineFragmentParams _params; int32_t _parallel_instances = 0; + + bool _need_notify_close = false; }; } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 930e885fc3b..edc56332058 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -30,6 +30,7 @@ #include "common/logging.h" #include "common/status.h" #include "pipeline/dependency.h" +#include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/pipeline.h" @@ -553,10 +554,6 @@ Status PipelineTask::execute(bool* done) { } } - if (_eos) { - RETURN_IF_ERROR(close(Status::OK(), false)); - } - DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", { auto required_pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( @@ -595,6 +592,22 @@ Status PipelineTask::execute(bool* done) { RETURN_IF_ERROR(block->check_type_and_column()); status = _sink->sink(_state, block, _eos); + if (_eos) { + if (_sink->need_rerun(_state)) { + if (auto* source = dynamic_cast<ExchangeSourceOperatorX*>(_root); + source != nullptr) { + RETURN_IF_ERROR(source->reset(_state)); + _eos = false; + } else { + return Status::InternalError( + "Only ExchangeSourceOperatorX can be rerun, real is {}", + _root->get_name()); + } + } else { + RETURN_IF_ERROR(close(Status::OK(), false)); + } + } + if (status.is<ErrorCode::END_OF_FILE>()) { set_wake_up_early(); return Status::OK(); @@ -857,7 +870,9 @@ Status PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* d _blocked_dep = nullptr; auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this()); RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE)); - RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder)); + if (auto f = _fragment_context.lock(); f) { + RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder)); + } return Status::OK(); } diff --git a/be/src/pipeline/rec_cte_shared_state.h b/be/src/pipeline/rec_cte_shared_state.h new file mode 100644 index 00000000000..17fb00c82f5 --- /dev/null +++ b/be/src/pipeline/rec_cte_shared_state.h @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "dependency.h" +#include "pipeline/common/distinct_agg_utils.h" +#include "util/brpc_client_cache.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +struct RecCTESharedState : public BasicSharedState { + std::vector<TRecCTETarget> targets; + std::vector<vectorized::Block> blocks; + vectorized::IColumn::Selector distinct_row; + Dependency* source_dep = nullptr; + Dependency* anchor_dep = nullptr; + vectorized::Arena arena; + RuntimeProfile::Counter* hash_table_compute_timer = nullptr; + RuntimeProfile::Counter* hash_table_emplace_timer = nullptr; + RuntimeProfile::Counter* hash_table_input_counter = nullptr; + + std::unique_ptr<DistinctDataVariants> agg_data = nullptr; + + int current_round = 0; + int last_round_offset = 0; + int max_recursion_depth = 0; + bool ready_to_return = false; + + void update_ready_to_return() { + if (last_round_offset == blocks.size()) { + ready_to_return = true; + } + } + + Status emplace_block(RuntimeState* state, vectorized::Block&& block) { + if (agg_data) { + auto num_rows = uint32_t(block.rows()); + vectorized::ColumnRawPtrs raw_columns; + std::vector<vectorized::ColumnPtr> columns = block.get_columns_and_convert(); + for (auto& col : columns) { + raw_columns.push_back(col.get()); + } + + std::visit(vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, + "uninited hash table"); + }, + [&](auto& agg_method) -> void { + SCOPED_TIMER(hash_table_compute_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + + AggState agg_state(raw_columns); + agg_method.init_serialized_keys(raw_columns, num_rows); + distinct_row.clear(); + + size_t row = 0; + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, arena); + ctor(key); + distinct_row.push_back(row); + }; + auto creator_for_null_key = [&]() { + distinct_row.push_back(row); + }; + + SCOPED_TIMER(hash_table_emplace_timer); + for (; row < num_rows; ++row) { + agg_method.lazy_emplace(agg_state, row, creator, + creator_for_null_key); + } + COUNTER_UPDATE(hash_table_input_counter, num_rows); + }}, + agg_data->method_variant); + + if (distinct_row.size() == block.rows()) { + blocks.emplace_back(std::move(block)); + } else if (!distinct_row.empty()) { + auto distinct_block = vectorized::MutableBlock(block.clone_empty()); + RETURN_IF_ERROR(block.append_to_block_by_selector(&distinct_block, distinct_row)); + blocks.emplace_back(distinct_block.to_block()); + } + } else { + blocks.emplace_back(std::move(block)); + } + return Status::OK(); + } + + PTransmitRecCTEBlockParams build_basic_param(RuntimeState* state, + const TRecCTETarget& target) const { + PTransmitRecCTEBlockParams request; + request.set_node_id(target.node_id); + request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto()); + request.mutable_fragment_instance_id()->CopyFrom( + UniqueId(target.fragment_instance_id).to_proto()); + return request; + } + + Status send_data_to_targets(RuntimeState* state, size_t round_offset) const { + int send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size; + int block_number_per_target = + int(blocks.size() - round_offset + targets.size() - 1) / targets.size(); + for (auto target : targets) { + auto stub = + state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client( + target.addr); + if (!stub) { + return Status::InternalError(fmt::format("Get rpc stub failed, host={}, port={}", + target.addr.hostname, target.addr.port)); + } + + // send blocks + int step = block_number_per_target; + while (round_offset < blocks.size() && step > 0) { + PTransmitRecCTEBlockParams request = build_basic_param(state, target); + auto current_bytes = 0; + while (round_offset < blocks.size() && step > 0 && + current_bytes < send_multi_blocks_byte_size) { + auto* pblock = request.add_blocks(); + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + RETURN_IF_ERROR(blocks[round_offset].serialize( + state->be_exec_version(), pblock, &uncompressed_bytes, + &compressed_bytes, state->fragement_transmission_compression_type())); + round_offset++; + step--; + current_bytes += compressed_bytes; + } + request.set_eos(false); + + PTransmitRecCTEBlockResult result; + brpc::Controller controller; + controller.set_timeout_ms( + get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout())); + + stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + RETURN_IF_ERROR(Status::create(result.status())); + } + + // send eos + { + PTransmitRecCTEBlockParams request = build_basic_param(state, target); + request.set_eos(true); + + PTransmitRecCTEBlockResult result; + brpc::Controller controller; + stub->transmit_rec_cte_block(&controller, &request, &result, brpc::DoNothing()); + brpc::Join(controller.call_id()); + RETURN_IF_ERROR(Status::create(result.status())); + } + } + return Status::OK(); + } +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 47fd9e264bf..f531e1ef2b9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1403,6 +1403,66 @@ Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatis print_id(query_id), query_stats); } +Status FragmentMgr::transmit_rec_cte_block( + const TUniqueId& query_id, const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + return q_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos); + } else { + return Status::EndOfFile( + "Transmit rec cte block failed: Query context (query-id: {}) not found, maybe " + "finished", + print_id(query_id)); + } +} + +Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment, + PRerunFragmentParams_Opcode stage) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + auto fragment_ctx = _pipeline_map.find({query_id, fragment}); + if (!fragment_ctx) { + return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found", + print_id(query_id), fragment); + } + + if (stage == PRerunFragmentParams::wait) { + return fragment_ctx->wait_close(false); + } + if (stage == PRerunFragmentParams::release) { + return fragment_ctx->set_to_rerun(); + } + if (stage == PRerunFragmentParams::rebuild) { + return fragment_ctx->rebuild(_thread_pool.get()); + } + if (stage == PRerunFragmentParams::submit) { + return fragment_ctx->submit(); + } + if (stage == PRerunFragmentParams::close) { + return fragment_ctx->wait_close(true); + } + } else { + return Status::NotFound( + "reset_fragment: Query context (query-id: {}) not found, maybe finished", + print_id(query_id)); + } + return Status::OK(); +} + +Status FragmentMgr::reset_global_rf(const TUniqueId& query_id, + const google::protobuf::RepeatedField<int32_t>& filter_ids) { + if (auto q_ctx = get_query_ctx(query_id)) { + SCOPED_ATTACH_TASK(q_ctx.get()); + return q_ctx->reset_global_rf(filter_ids); + } else { + return Status::NotFound( + "reset_fragment: Query context (query-id: {}) not found, maybe finished", + print_id(query_id)); + } + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index b0c1a3ad592..58725a49a63 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -184,6 +184,17 @@ public: std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id); + Status transmit_rec_cte_block(const TUniqueId& query_id, const TUniqueId& instance_id, + int node_id, + const google::protobuf::RepeatedPtrField<PBlock>& pblocks, + bool eos); + + Status rerun_fragment(const TUniqueId& query_id, int fragment, + PRerunFragmentParams_Opcode stage); + + Status reset_global_rf(const TUniqueId& query_id, + const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: struct BrpcItem { TNetworkAddress network_address; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 5332b886d58..b3097dde69f 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -34,6 +34,7 @@ #include "common/status.h" #include "olap/olap_common.h" #include "pipeline/dependency.h" +#include "pipeline/exec/rec_cte_scan_operator.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -460,6 +461,10 @@ QueryContext::_collect_realtime_query_profile() { continue; } + if (fragment_ctx->need_notify_close()) { + continue; + } + auto profile = fragment_ctx->collect_realtime_profile(); if (profile.empty()) { @@ -497,4 +502,46 @@ TReportExecStatusParams QueryContext::get_realtime_exec_status() { return exec_status; } +Status QueryContext::send_block_to_cte_scan( + const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) { + std::unique_lock<std::mutex> l(_cte_scan_lock); + auto it = _cte_scan.find(std::make_pair(instance_id, node_id)); + if (it == _cte_scan.end()) { + return Status::InternalError("RecCTEScan not found for instance {}, node {}", + print_id(instance_id), node_id); + } + for (const auto& pblock : pblocks) { + RETURN_IF_ERROR(it->second->add_block(pblock)); + } + if (eos) { + it->second->set_ready(); + } + return Status::OK(); +} + +void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id, + pipeline::RecCTEScanLocalState* scan) { + std::unique_lock<std::mutex> l(_cte_scan_lock); + auto key = std::make_pair(instance_id, node_id); + DCHECK(!_cte_scan.contains(key)) << "Duplicate registe cte scan for instance " + << print_id(instance_id) << ", node " << node_id; + _cte_scan.emplace(key, scan); +} + +void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) { + std::lock_guard<std::mutex> l(_cte_scan_lock); + auto key = std::make_pair(instance_id, node_id); + DCHECK(_cte_scan.contains(key)) << "Duplicate deregiste cte scan for instance " + << print_id(instance_id) << ", node " << node_id; + _cte_scan.erase(key); +} + +Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) { + if (_merge_controller_handler) { + return _merge_controller_handler->reset_global_rf(this, filter_ids); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 32458cdb00e..874e889e368 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -32,6 +32,7 @@ #include "common/config.h" #include "common/factory_creator.h" #include "common/object_pool.h" +#include "common/status.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/runtime_predicate.h" @@ -48,6 +49,7 @@ namespace pipeline { class PipelineFragmentContext; class PipelineTask; class Dependency; +class RecCTEScanLocalState; } // namespace pipeline struct ReportStatusRequest { @@ -161,11 +163,6 @@ public: return _query_options.runtime_filter_wait_time_ms; } - bool runtime_filter_wait_infinitely() const { - return _query_options.__isset.runtime_filter_wait_infinitely && - _query_options.runtime_filter_wait_infinitely; - } - int be_exec_version() const { if (!_query_options.__isset.be_exec_version) { return 0; @@ -299,6 +296,23 @@ public: void set_first_error_msg(std::string error_msg); std::string get_first_error_msg(); + Status send_block_to_cte_scan(const TUniqueId& instance_id, int node_id, + const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, + bool eos); + void registe_cte_scan(const TUniqueId& instance_id, int node_id, + pipeline::RecCTEScanLocalState* scan); + void deregiste_cte_scan(const TUniqueId& instance_id, int node_id); + + std::vector<int> get_fragment_ids() { + std::vector<int> fragment_ids; + for (const auto& it : _fragment_id_to_pipeline_ctx) { + fragment_ids.push_back(it.first); + } + return fragment_ids; + } + + Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: friend class QueryTaskController; @@ -377,6 +391,10 @@ private: std::string _load_error_url; std::string _first_error_msg; + // instance id + node id -> cte scan + std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> _cte_scan; + std::mutex _cte_scan_lock; + public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile void add_fragment_profile( diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index 51c79e1b426..b3e19d089fc 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -55,6 +55,7 @@ public: void set_detected_source() { std::unique_lock<std::shared_mutex> wlock(_rwlock); + _orderby_extrem = Field(PrimitiveType::TYPE_NULL); _detected_source = true; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 33db0f60e7c..faeda888cfb 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -503,7 +503,7 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) { bool need_merge = desc.has_remote_targets || need_local_merge; RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : local_runtime_filter_mgr(); - return mgr->register_consumer_filter(_query_ctx, desc, node_id, consumer_filter); + return mgr->register_consumer_filter(this, desc, node_id, consumer_filter); } bool RuntimeState::is_nereids() const { @@ -515,12 +515,22 @@ std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::pipeline_id_to_profil return _pipeline_id_to_profile; } +void RuntimeState::reset_to_rerun() { + if (local_runtime_filter_mgr()) { + auto filter_ids = local_runtime_filter_mgr()->get_filter_ids(); + filter_ids.merge(global_runtime_filter_mgr()->get_filter_ids()); + local_runtime_filter_mgr()->remove_filters(filter_ids); + global_runtime_filter_mgr()->remove_filters(filter_ids); + } + std::unique_lock lc(_pipeline_profile_lock); + _pipeline_id_to_profile.clear(); +} + std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::build_pipeline_profile( std::size_t pipeline_size) { std::unique_lock lc(_pipeline_profile_lock); if (!_pipeline_id_to_profile.empty()) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "build_pipeline_profile can only be called once."); + return _pipeline_id_to_profile; } _pipeline_id_to_profile.resize(pipeline_size); { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 3d89f4aa0d4..907f2c50a61 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -503,7 +503,7 @@ public: _runtime_filter_mgr = runtime_filter_mgr; } - QueryContext* get_query_ctx() { return _query_ctx; } + QueryContext* get_query_ctx() const { return _query_ctx; } [[nodiscard]] bool low_memory_mode() const; @@ -520,6 +520,12 @@ public: return _query_options.__isset.enable_profile && _query_options.enable_profile; } + int cte_max_recursion_depth() const { + return _query_options.__isset.cte_max_recursion_depth + ? _query_options.cte_max_recursion_depth + : 0; + } + int rpc_verbose_profile_max_instance_count() const { return _query_options.__isset.rpc_verbose_profile_max_instance_count ? _query_options.rpc_verbose_profile_max_instance_count @@ -702,6 +708,17 @@ public: _query_options.hnsw_bounded_queue); } + void reset_to_rerun(); + + void set_force_make_rf_wait_infinite() { + _query_options.__set_runtime_filter_wait_infinitely(true); + } + + bool runtime_filter_wait_infinitely() const { + return _query_options.__isset.runtime_filter_wait_infinitely && + _query_options.runtime_filter_wait_infinitely; + } + private: Status create_error_log_file(); diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h index e0e42e509d4..e9ef68f6d28 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.h +++ b/be/src/runtime_filter/runtime_filter_consumer.h @@ -41,11 +41,11 @@ public: APPLIED, // The consumer will switch to this state after the expression is acquired }; - static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, int node_id, + static Status create(const RuntimeState* state, const TRuntimeFilterDesc* desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* res) { *res = std::shared_ptr<RuntimeFilterConsumer>( - new RuntimeFilterConsumer(query_ctx, desc, node_id)); - RETURN_IF_ERROR((*res)->_init_with_desc(desc, &query_ctx->query_options())); + new RuntimeFilterConsumer(state, desc, node_id)); + RETURN_IF_ERROR((*res)->_init_with_desc(desc, &state->query_options())); return Status::OK(); } @@ -86,17 +86,16 @@ public: } private: - RuntimeFilterConsumer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - int node_id) + RuntimeFilterConsumer(const RuntimeState* state, const TRuntimeFilterDesc* desc, int node_id) : RuntimeFilter(desc), _probe_expr(desc->planId_to_target_expr.find(node_id)->second), _registration_time(MonotonicMillis()), _rf_state(State::NOT_READY) { // If bitmap filter is not applied, it will cause the query result to be incorrect - bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() || + bool wait_infinitely = state->runtime_filter_wait_infinitely() || _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER; - _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 1000 - : query_ctx->runtime_filter_wait_time_ms(); + _rf_wait_time_ms = wait_infinitely ? state->get_query_ctx()->execution_timeout() * 1000 + : state->get_query_ctx()->runtime_filter_wait_time_ms(); DorisMetrics::instance()->runtime_filter_consumer_num->increment(1); } diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index d373bb8be05..42e26b3d9c3 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -78,6 +78,8 @@ public: _expected_producer_num = num; } + int get_expected_producer_num() const { return _expected_producer_num; } + bool add_rf_size(uint64_t size) { _received_rf_size_num++; if (_expected_producer_num < _received_rf_size_num) { diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index ba9e9f5bc9d..dcddc29f06c 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -62,13 +62,13 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> RuntimeFilterMgr::get_consum } Status RuntimeFilterMgr::register_consumer_filter( - const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, int node_id, + const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard<std::mutex> l(_lock); - RETURN_IF_ERROR(RuntimeFilterConsumer::create(query_ctx, &desc, node_id, consumer)); + RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, consumer)); _consumer_map[key].push_back(*consumer); return Status::OK(); } @@ -446,6 +446,24 @@ void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu _filter_map.clear(); } +Status RuntimeFilterMergeControllerEntity::reset_global_rf( + QueryContext* query_ctx, const google::protobuf::RepeatedField<int32_t>& filter_ids) { + for (const auto& filter_id : filter_ids) { + GlobalMergeContext* cnt_val; + { + std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); + cnt_val = &_filter_map[filter_id]; // may inplace construct default object + } + int producer_size = cnt_val->merger->get_expected_producer_num(); + RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &cnt_val->runtime_filter_desc, + &cnt_val->merger)); + cnt_val->merger->set_expected_producer_num(producer_size); + cnt_val->arrive_id.clear(); + cnt_val->source_addrs.clear(); + } + return Status::OK(); +} + std::string RuntimeFilterMergeControllerEntity::debug_string() { std::string result = "RuntimeFilterMergeControllerEntity Info:\n"; std::shared_lock<std::shared_mutex> guard(_filter_map_mutex); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 6941e5e5631..a079f3b0ad9 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -83,7 +83,7 @@ public: // get/set consumer std::vector<std::shared_ptr<RuntimeFilterConsumer>> get_consume_filters(int filter_id); - Status register_consumer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, + Status register_consumer_filter(const RuntimeState* state, const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter); @@ -104,6 +104,27 @@ public: std::string debug_string(); + std::set<int32_t> get_filter_ids() { + std::set<int32_t> ids; + std::lock_guard<std::mutex> l(_lock); + for (const auto& id : _producer_id_set) { + ids.insert(id); + } + for (const auto& kv : _consumer_map) { + ids.insert(kv.first); + } + return ids; + } + + void remove_filters(const std::set<int32_t>& filter_ids) { + std::lock_guard<std::mutex> l(_lock); + for (const auto& id : filter_ids) { + _consumer_map.erase(id); + _local_merge_map.erase(id); + _producer_id_set.erase(id); + } + } + private: /** * `_is_global = true` means this runtime filter manager menages query-level runtime filters. @@ -152,6 +173,9 @@ public: void release_undone_filters(QueryContext* query_ctx); + Status reset_global_rf(QueryContext* query_ctx, + const google::protobuf::RepeatedField<int32_t>& filter_ids); + private: Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx, const TRuntimeFilterDesc* runtime_filter_desc, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index daf45179f79..f9f7d2e4460 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1597,6 +1597,57 @@ void PInternalService::fold_constant_expr(google::protobuf::RpcController* contr } } +void PInternalService::transmit_rec_cte_block(google::protobuf::RpcController* controller, + const PTransmitRecCTEBlockParams* request, + PTransmitRecCTEBlockResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->transmit_rec_cte_block( + UniqueId(request->query_id()).to_thrift(), + UniqueId(request->fragment_instance_id()).to_thrift(), request->node_id(), + request->blocks(), request->eos()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::rerun_fragment(google::protobuf::RpcController* controller, + const PRerunFragmentParams* request, + PRerunFragmentResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = + _exec_env->fragment_mgr()->rerun_fragment(UniqueId(request->query_id()).to_thrift(), + request->fragment_id(), request->stage()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + +void PInternalService::reset_global_rf(google::protobuf::RpcController* controller, + const PResetGlobalRfParams* request, + PResetGlobalRfResult* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->reset_global_rf( + UniqueId(request->query_id()).to_thrift(), request->filter_ids()); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +} + void PInternalService::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index d73501bfc80..781a7def178 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -140,6 +140,16 @@ public: const ::doris::PPublishFilterRequestV2* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) override; + void transmit_rec_cte_block(google::protobuf::RpcController* controller, + const PTransmitRecCTEBlockParams* request, + PTransmitRecCTEBlockResult* response, + google::protobuf::Closure* done) override; + void rerun_fragment(google::protobuf::RpcController* controller, + const PRerunFragmentParams* request, PRerunFragmentResult* response, + google::protobuf::Closure* done) override; + void reset_global_rf(google::protobuf::RpcController* controller, + const PResetGlobalRfParams* request, PResetGlobalRfResult* response, + google::protobuf::Closure* done) override; void transmit_block(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 77ee21368c0..e0e352645c3 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -31,7 +31,7 @@ public: void test_signal_aquire(TRuntimeFilterDesc desc) { std::shared_ptr<RuntimeFilterConsumer> consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[0].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -59,7 +59,7 @@ TEST_F(RuntimeFilterConsumerTest, basic) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( @@ -116,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -141,7 +141,7 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) { const_cast<TQueryOptions&>(_query_ctx->_query_options) .__set_runtime_filter_wait_infinitely(true); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterConsumer> registed_consumer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( @@ -152,7 +152,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( @@ -174,7 +174,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { std::shared_ptr<RuntimeFilterConsumer> consumer; { - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } desc.__set_src_expr( @@ -195,13 +195,13 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { .build()); { - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); } { desc.__set_has_local_targets(false); desc.__set_has_remote_targets(true); - auto st = RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer); + auto st = RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer); ASSERT_FALSE(st.ok()); desc.__set_has_local_targets(true); desc.__set_has_remote_targets(false); @@ -209,7 +209,7 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr()); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); } TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { @@ -217,7 +217,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { std::shared_ptr<RuntimeFilterConsumer> consumer; auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterConsumer::create(_query_ctx.get(), &desc, 0, &consumer)); + RuntimeFilterConsumer::create(_runtime_states[1].get(), &desc, 0, &consumer)); std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index d8222e201d9..02b1349c1a6 100644 --- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -65,7 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); std::shared_ptr<RuntimeFilterConsumer> consumer_filter; EXPECT_TRUE(global_runtime_filter_mgr - ->register_consumer_filter(ctx.get(), desc, 0, &consumer_filter) + ->register_consumer_filter(&state, desc, 0, &consumer_filter) .ok()); EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7582a7b6b6d..868263be5dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -786,6 +786,8 @@ public class SessionVariable implements Serializable, Writable { public static final String READ_HIVE_JSON_IN_ONE_COLUMN = "read_hive_json_in_one_column"; + public static final String CTE_MAX_RECURSION_DEPTH = "cte_max_recursion_depth"; + /** * Inserting overwrite for auto partition table allows creating partition for * datas which cannot find partition to overwrite. @@ -989,6 +991,11 @@ public class SessionVariable implements Serializable, Writable { }) public int minScanSchedulerConcurrency = 0; + @VariableMgr.VarAttr(name = CTE_MAX_RECURSION_DEPTH, needForward = true, description = { + "CTE递归的最大深度,默认值100", + "The maximum depth of CTE recursion. Default is 100" }) + public int cteMaxRecursionDepth = 100; + // By default, the number of Limit items after OrderBy is changed from 65535 items // before v1.2.0 (not included), to return all items by default @VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT, affectQueryResult = true) @@ -4783,7 +4790,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold); tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead); - + tResult.setCteMaxRecursionDepth(cteMaxRecursionDepth); tResult.setEnableParallelScan(enableParallelScan); tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount); tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1ddfbcf2502..5b9c3579590 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -60,6 +60,45 @@ message PTransmitDataResult { optional int64 receive_time = 2; }; +message PTransmitRecCTEBlockParams { + repeated PBlock blocks = 1; + optional bool eos = 2; + optional PUniqueId query_id = 3; + optional PUniqueId fragment_instance_id = 4; + optional int32 node_id = 5; +}; + +message PTransmitRecCTEBlockResult { + optional PStatus status = 1; +}; + + +message PRerunFragmentParams { + enum Opcode { + wait = 1; // wait fragment execute done + release = 2; // release current round's resource + rebuild = 3; // rebuild next round pipeline tasks + submit = 4; // submit tasks to execute + close = 5; // close fragment + } + optional PUniqueId query_id = 1; + optional int32 fragment_id = 2; + optional Opcode stage = 3; +}; + +message PRerunFragmentResult { + optional PStatus status = 1; +}; + +message PResetGlobalRfParams { + optional PUniqueId query_id = 1; + repeated int32 filter_ids = 2; +}; + +message PResetGlobalRfResult { + optional PStatus status = 1; +}; + message PTabletWithPartition { required int64 partition_id = 1; required int64 tablet_id = 2; @@ -1107,6 +1146,9 @@ service PBackendService { rpc sync_filter_size(PSyncFilterSizeRequest) returns (PSyncFilterSizeResponse); rpc apply_filterv2(PPublishFilterRequestV2) returns (PPublishFilterResponse); rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult); + rpc rerun_fragment(PRerunFragmentParams) returns (PRerunFragmentResult); + rpc reset_global_rf(PResetGlobalRfParams) returns (PResetGlobalRfResult); + rpc transmit_rec_cte_block(PTransmitRecCTEBlockParams) returns (PTransmitRecCTEBlockResult); rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult); rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index cc018384f24..c4169768584 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -410,6 +410,7 @@ struct TQueryOptions { 175: optional bool enable_fuzzy_blockable_task = false; 176: optional list<i32> shuffled_agg_ids; + 177: optional i32 cte_max_recursion_depth; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. diff --git a/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out new file mode 100644 index 00000000000..ba843a71ee5 --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte/rec_cte.out @@ -0,0 +1,953 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0.5403023058681398 +0.6542897904977791 +0.7013687736227565 +0.7221024250267077 +0.7314040424225098 +0.7356047404363474 +0.7375068905132428 +0.7383692041223232 +0.7387603198742113 +0.7389377567153445 +0.7390182624274122 +0.7390547907469174 +0.7390713652989449 +0.7390788859949921 +0.7390822985224024 +0.7390838469650002 +0.7390845495752126 +0.7390848683867142 +0.7390850130484203 +0.739085078689123 +0.7390851084737987 +0.7390851219886894 +0.7390851281211138 +0.7390851309037207 +0.7390851321663374 +0.7390851327392538 +0.7390851329992164 +0.7390851331171753 +0.7390851331706995 +0.7390851331949863 +0.7390851332060064 +0.7390851332110069 +0.7390851332132758 +0.7390851332143055 +0.7390851332147726 +0.7390851332149846 +0.7390851332150807 +0.7390851332151244 +0.7390851332151441 +0.7390851332151531 +0.7390851332151572 +0.7390851332151591 +0.7390851332151599 +0.7390851332151603 +0.7390851332151605 +0.7390851332151606 +0.7390851332151607 +0.7390851332151608 +0.7390851332151609 +0.7390851332151611 +0.7390851332151617 +0.7390851332151629 +0.7390851332151657 +0.7390851332151718 +0.7390851332151851 +0.7390851332152145 +0.7390851332152792 +0.739085133215422 +0.7390851332157367 +0.7390851332164302 +0.7390851332179587 +0.7390851332213271 +0.7390851332287504 +0.7390851332451103 +0.7390851332811648 +0.7390851333606233 +0.7390851335357372 +0.7390851339216605 +0.7390851347721744 +0.7390851366465718 +0.7390851407774467 +0.7390851498812394 +0.7390851699445544 +0.7390852141609171 +0.7390853116067619 +0.7390855263619245 +0.7390859996481299 +0.7390870426953322 +0.7390893414033927 +0.7390944073790913 +0.7391055719265363 +0.7391301765296711 +0.7391843997714936 +0.7393038923969059 +0.7395672022122561 +0.7401473355678757 +0.7414250866101092 +0.7442373549005569 +0.7504177617637605 +0.7639596829006542 +0.7934803587425656 +0.8575532158463934 +1 + +-- !sql -- +55 + +-- !sql -- +1 3 +1 5 +1 8 +2 4 +2 5 +2 10 +2 19 +3 1 +3 5 +3 8 +3 10 +3 24 +5 3 +5 4 +5 8 +5 15 +6 3 +6 4 +6 7 +7 4 +8 1 +9 4 + +-- !sql -- +1 3 +1 5 +2 4 +2 5 +2 10 +3 1 +3 5 +3 8 +3 10 +5 3 +5 4 +5 8 +5 10 +6 3 +6 4 +6 5 +7 4 +8 1 +8 8 +9 4 +11 1 +12 3 +29 4 + +-- !sql -- +1 3 +1 5 +2 4 +2 5 +2 10 +3 1 +3 5 +3 8 +3 10 +5 3 +5 4 +5 8 +6 3 +6 4 +7 4 +7 10 +8 1 +8 10 +9 4 +9 5 +9 10 +10 5 +10 8 +10 10 +11 5 +11 8 +11 10 +12 5 +12 8 +12 10 +13 1 +13 5 +13 8 +13 10 +14 1 +14 5 +14 8 +14 10 +15 1 +15 3 +15 5 +15 8 +15 10 +16 1 +16 3 +16 5 +16 8 +16 10 +17 1 +17 3 +17 5 +17 8 +17 10 +18 1 +18 3 +18 5 +18 8 +18 10 +19 1 +19 3 +19 5 +19 8 +19 10 +20 1 +20 3 +20 5 +20 8 +20 10 +21 1 +21 3 +21 5 +21 8 +21 10 +22 1 +22 3 +22 5 +22 8 +22 10 +23 1 +23 3 +23 5 +23 8 +23 10 +24 1 +24 3 +24 5 +24 8 +24 10 +25 1 +25 3 +25 5 +25 8 +25 10 +26 1 +26 3 +26 5 +26 8 +26 10 +27 1 +27 3 +27 5 +27 8 +27 10 +28 1 +28 3 +28 5 +28 8 +28 10 +29 1 +29 3 +29 5 +29 8 +29 10 +30 1 +30 3 +30 5 +30 8 +30 10 +31 1 +31 3 +31 5 +31 8 +31 10 +32 1 +32 3 +32 5 +32 8 +32 10 +33 1 +33 3 +33 5 +33 8 +33 10 +34 1 +34 3 +34 4 +34 5 +34 8 +34 10 +35 1 +35 3 +35 4 +35 5 +35 8 +35 10 +36 1 +36 3 +36 4 +36 5 +36 8 +36 10 +37 1 +37 3 +37 4 +37 5 +37 8 +37 10 +38 1 +38 3 +38 4 +38 5 +38 8 +38 10 +39 1 +39 3 +39 4 +39 5 +39 8 +39 10 +40 1 +40 3 +40 4 +40 5 +40 8 +40 10 +41 1 +41 3 +41 4 +41 5 +41 8 +41 10 +42 1 +42 3 +42 4 +42 5 +42 8 +42 10 +43 1 +43 3 +43 4 +43 5 +43 8 +43 10 +44 1 +44 3 +44 4 +44 5 +44 8 +44 10 +45 1 +45 3 +45 4 +45 5 +45 8 +45 10 +46 1 +46 3 +46 4 +46 5 +46 8 +46 10 +47 1 +47 3 +47 4 +47 5 +47 8 +47 10 +48 1 +48 3 +48 4 +48 5 +48 8 +48 10 +49 1 +49 3 +49 4 +49 5 +49 8 +49 10 +50 1 +50 3 +50 4 +50 5 +50 8 +50 10 +51 1 +51 3 +51 4 +51 5 +51 8 +51 10 +52 1 +52 3 +52 4 +52 5 +52 8 +52 10 +53 1 +53 3 +53 4 +53 5 +53 8 +53 10 +54 1 +54 3 +54 4 +54 5 +54 8 +54 10 +55 1 +55 3 +55 4 +55 5 +55 8 +55 10 +56 1 +56 3 +56 4 +56 5 +56 8 +56 10 +57 1 +57 3 +57 4 +57 5 +57 8 +57 10 +58 1 +58 3 +58 4 +58 5 +58 8 +58 10 +59 1 +59 3 +59 4 +59 5 +59 8 +59 10 +60 1 +60 3 +60 4 +60 5 +60 8 +60 10 +61 1 +61 3 +61 4 +61 5 +61 8 +61 10 +62 1 +62 3 +62 4 +62 5 +62 8 +62 10 +63 1 +63 3 +63 4 +63 5 +63 8 +63 10 +64 1 +64 3 +64 4 +64 5 +64 8 +64 10 +65 1 +65 3 +65 4 +65 5 +65 8 +65 10 +66 1 +66 3 +66 4 +66 5 +66 8 +66 10 +67 1 +67 3 +67 4 +67 5 +67 8 +67 10 +68 1 +68 3 +68 4 +68 5 +68 8 +68 10 +69 1 +69 3 +69 4 +69 5 +69 8 +69 10 +70 1 +70 3 +70 4 +70 5 +70 8 +70 10 +71 1 +71 3 +71 4 +71 5 +71 8 +71 10 +72 1 +72 3 +72 4 +72 5 +72 8 +72 10 +73 1 +73 3 +73 4 +73 5 +73 8 +73 10 +74 1 +74 3 +74 4 +74 5 +74 8 +74 10 +75 1 +75 3 +75 4 +75 5 +75 8 +75 10 +76 1 +76 3 +76 4 +76 5 +76 8 +76 10 +77 1 +77 3 +77 4 +77 5 +77 8 +77 10 +78 1 +78 3 +78 4 +78 5 +78 8 +78 10 +79 1 +79 3 +79 4 +79 5 +79 8 +79 10 +80 1 +80 3 +80 4 +80 5 +80 8 +80 10 +81 1 +81 3 +81 4 +81 5 +81 8 +81 10 +82 1 +82 3 +82 4 +82 5 +82 8 +82 10 +83 1 +83 3 +83 4 +83 5 +83 8 +83 10 +84 1 +84 3 +84 4 +84 5 +84 8 +84 10 +85 1 +85 3 +85 4 +85 5 +85 8 +85 10 +86 1 +86 3 +86 4 +86 5 +86 8 +86 10 +87 1 +87 3 +87 4 +87 5 +87 8 +87 10 +88 1 +88 3 +88 4 +88 5 +88 8 +88 10 +89 1 +89 3 +89 4 +89 5 +89 8 +89 10 +90 1 +90 3 +90 4 +90 5 +90 8 +90 10 +91 1 +91 3 +91 4 +91 5 +91 8 +91 10 +92 1 +92 3 +92 4 +92 5 +92 8 +92 10 +93 1 +93 3 +93 4 +93 5 +93 8 +93 10 +94 1 +94 3 +94 4 +94 5 +94 8 +94 10 +95 1 +95 3 +95 4 +95 5 +95 8 +95 10 +96 1 +96 3 +96 4 +96 5 +96 8 +96 10 +97 1 +97 3 +97 4 +97 5 +97 8 +97 10 +98 1 +98 3 +98 4 +98 5 +98 8 +98 10 +99 1 +99 3 +99 4 +99 5 +99 8 +99 10 +100 1 +100 3 +100 4 +100 5 +100 8 +100 10 + +-- !sql -- +1 3 +1 5 +1 8 +2 \N +2 4 +2 5 +2 9 +2 10 +2 15 +2 19 +2 28 +2 34 +2 43 +2 71 +2 77 +2 105 +2 176 +2 182 +2 253 +2 429 +2 435 +2 611 +2 1040 +2 1046 +2 1475 +2 2515 +2 2521 +2 3561 +2 6076 +2 6082 +2 8597 +2 14673 +2 14679 +2 20755 +2 35428 +2 35434 +2 50107 +2 85535 +2 85541 +2 120969 +2 206504 +2 206510 +2 292045 +2 498549 +2 498555 +2 705059 +2 1203608 +2 1203614 +2 1702163 +2 2905771 +2 2905777 +2 4109385 +2 7015156 +2 7015162 +2 9920933 +2 16936089 +2 16936095 +2 23951251 +2 40887340 +2 40887346 +2 57823435 +2 98710775 +2 98710781 +2 139598121 +2 238308896 +2 238308902 +2 337019677 +2 575328573 +2 575328579 +2 813637475 +2 1388966048 +2 1388966054 +2 1964294627 +3 \N +3 1 +3 5 +3 6 +3 8 +3 10 +3 14 +3 18 +3 20 +3 23 +3 41 +3 43 +3 55 +3 63 +3 96 +3 118 +3 139 +3 181 +3 235 +3 320 +3 353 +3 501 +3 588 +3 854 +3 908 +3 1355 +3 1496 +3 2263 +3 2350 +3 3618 +3 3846 +3 5968 +3 6109 +3 9586 +3 9955 +3 15695 +3 15923 +3 25281 +3 25878 +3 41204 +3 41573 +3 66485 +3 67451 +3 108058 +3 108655 +3 174543 +3 176106 +3 283198 +3 284164 +3 457741 +3 460270 +3 741905 +3 743468 +3 1199646 +3 1203738 +3 1943114 +3 1945643 +3 3142760 +3 3149381 +3 5088403 +3 5092495 +3 8231163 +3 8241876 +3 13323658 +3 13330279 +3 21554821 +3 21572155 +3 34885100 +3 34895813 +3 56439921 +3 56467968 +3 91335734 +3 91353068 +3 147775655 +3 147821036 +3 239128723 +3 239156770 +3 386904378 +3 386977806 +3 626061148 +3 626106529 +3 1012965526 +3 1013084335 +3 1639072055 +3 1639145483 +5 \N +5 3 +5 4 +5 7 +5 8 +5 12 +5 15 +5 22 +5 27 +5 34 +5 56 +5 61 +5 83 +5 139 +5 144 +5 200 +5 339 +5 344 +5 483 +5 822 +5 827 +5 1166 +5 1988 +5 1993 +5 2815 +5 4803 +5 4808 +5 6796 +5 11599 +5 11604 +5 16407 +5 28006 +5 28011 +5 39610 +5 67616 +5 67621 +5 95627 +5 163243 +5 163248 +5 230864 +5 394107 +5 394112 +5 557355 +5 951462 +5 951467 +5 1345574 +5 2297036 +5 2297041 +5 3248503 +5 5545539 +5 5545544 +5 7842580 +5 13388119 +5 13388124 +5 18933663 +5 32321782 +5 32321787 +5 45709906 +5 78031688 +5 78031693 +5 110353475 +5 188385163 +5 188385168 +5 266416856 +5 454802019 +5 454802024 +5 643187187 +5 1097989206 +5 1097989211 +5 1552791230 +6 3 +6 4 +6 7 +7 4 +8 1 +9 4 + +-- !sql -- +1 2 + +-- !sql -- +1 2 +3 4 + +-- !sql -- +1 2 +3 4 +11 22 +33 44 + +-- !sql -- +1 2 +3 4 +11 22 + +-- !sql -- +1 22 +3 22 +11 22 + +-- !sql -- +1 2 +2 3 + +-- !sql -- +1 2 +3 4 +11 22 + diff --git a/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out new file mode 100644 index 00000000000..3a055b01acc --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q1 -- +5050 + +-- !q2 -- +0 \N ROOT +1 0 Child_1 +2 0 Child_2 +3 1 Child_1_1 + +-- !q3 -- +0 \N ROOT [0] +1 0 Child_1 [0, 1] +3 1 Child_1_1 [0, 1, 3] +2 0 Child_2 [0, 2] + +-- !q4 -- +0 \N ROOT [0] 0 +1 0 Child_1 [0, 1] 1 +2 0 Child_2 [0, 2] 1 +3 1 Child_1_1 [0, 1, 3] 2 + +-- !q5 -- +1 2 1 -> 2 +1 3 1 -> 3 +1 4 1 -> 4 +2 3 2 -> 3 +4 5 4 -> 5 + diff --git a/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out new file mode 100644 index 00000000000..958b6848838 --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.out @@ -0,0 +1,30 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q1 -- +0 0 1 +1 1 1 +2 1 2 +3 2 3 +4 3 5 +5 5 8 +6 8 13 +7 13 21 +8 21 34 +9 34 55 + +-- !q2 -- +["Oasis", "Rock", "Music", "Art"] + +-- !q3 -- +1 3 [1, 3] +1 5 [1, 5] +1 5 [1, 3, 5] +1 8 [1, 3, 8] +1 10 [1, 3, 10] +1 3 [1, 5, 3] +1 4 [1, 5, 4] +1 8 [1, 5, 8] +1 4 [1, 3, 5, 4] +1 8 [1, 3, 5, 8] +1 8 [1, 5, 3, 8] +1 10 [1, 5, 3, 10] + diff --git a/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out new file mode 100644 index 00000000000..3bc4cf27785 --- /dev/null +++ b/regression-test/data/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.out @@ -0,0 +1,42 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q1 -- +1 abc +2 abcabc +3 abcabcabcabc + +-- !q2 -- +1 1 -1 +2 -2 2 +3 4 -4 +4 -8 8 +5 16 -16 + +-- !q4 -- +2017-01-03 +2017-01-04 +2017-01-05 +2017-01-06 +2017-01-07 +2017-01-08 +2017-01-09 +2017-01-10 + +-- !q5 -- +2017-01-03 300 +2017-01-04 0 +2017-01-05 0 +2017-01-06 50 +2017-01-07 0 +2017-01-08 180 +2017-01-09 0 +2017-01-10 5 + +-- !q6 -- +333 Yasmina 333 +198 John 333,198 +29 Pedro 333,198,29 +4610 Sarah 333,198,29,4610 +72 Pierre 333,198,29,72 +692 Tarek 333,692 +123 Adil 333,692,123 + diff --git a/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy new file mode 100644 index 00000000000..d456f6ba998 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte/rec_cte.groovy @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite ("rec_cte") { + qt_sql """ + WITH RECURSIVE test_table AS ( + SELECT + cast(1.0 as double) AS number + UNION + SELECT + cos(number) + FROM + test_table + ) + SELECT + number + FROM + test_table order by number; + """ + + qt_sql """ + WITH RECURSIVE test_table AS ( + SELECT cast(10 as int) AS number + UNION ALL + SELECT cast(number - 1 as int) FROM test_table WHERE number > 0 + ) + SELECT sum(number) FROM test_table; + """ + + + sql "DROP TABLE IF EXISTS edge;" + sql """ + CREATE TABLE edge + ( + node1id int, + node2id int + ) DUPLICATE KEY (node1id) + DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """ + INSERT INTO edge VALUES + (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1), + (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8), + (6, 3), (6, 4), (7, 4), (8, 1), (9, 4); + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + k1, + cast(sum(k2) as int) + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + cast(sum(k1) as int), + k2 + FROM t1 GROUP BY k2 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + test { + sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + cast(sum(k1 + 1) as int), + k2 + FROM t1 GROUP BY k2 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + exception "ABORTED" + } + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + cast(sum(k1 + 1) as int), + k2 + FROM t1 WHERE k1 < 100 GROUP BY k2 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + node1id AS k1, + node2id AS k2 + FROM edge + UNION + SELECT + k1, + cast(sum(k2) OVER (PARTITION BY k1 ORDER BY k1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as int) + FROM t1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + test { + sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION ALL + SELECT + 1,2 + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + exception "ABORTED" + } + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 1,2 + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ) + SELECT * FROM t1 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT + 33,44 + FROM t2 GROUP BY k1 + ) + SELECT * FROM t1 UNION select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT t2.k1, t2.k2 FROM t1,t2 + ) + SELECT * FROM t1 UNION select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT t1.k1, t2.k2 FROM t1,t2 + ) + select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 2,3 + UNION + SELECT least(t1.k1,t2.k1), least(t1.k2,t2.k2) FROM t1,t2 + ) + select * from t2 ORDER BY 1,2; + """ + + qt_sql """ + WITH RECURSIVE t1(k1, k2) AS ( + SELECT + 1,2 + UNION + SELECT + 3,4 + FROM t1 GROUP BY k1 + ), + t2(k1, k2) AS ( + SELECT + 11,22 + UNION + SELECT t1.k1, t1.k2 FROM t1 + ) + SELECT * FROM t1 UNION select * from t2 ORDER BY 1,2; + """ +} diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy new file mode 100644 index 00000000000..a01ab03db34 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte_from_ck_doc/rec_cte_from_ck_doc.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +// https://clickhouse.com/docs/sql-reference/statements/select/with +suite ("rec_cte_from_ck_doc") { + qt_q1 """ + WITH RECURSIVE test_table AS ( + SELECT cast(1 as int) AS number + UNION ALL + SELECT cast(number + 1 as int) FROM test_table WHERE number < 100 + ) + SELECT sum(number) FROM test_table; + """ + + sql "DROP TABLE IF EXISTS tree;" + sql """ + CREATE TABLE tree + ( + id int, + parent_id int, + data varchar(100) + ) DUPLICATE KEY (id) + DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO tree VALUES (0, NULL, 'ROOT'), (1, 0, 'Child_1'), (2, 0, 'Child_2'), (3, 1, 'Child_1_1');""" + + qt_q2 """ + WITH RECURSIVE search_tree AS ( + SELECT id, parent_id, data + FROM tree t + WHERE t.id = 0 + UNION ALL + SELECT t.id, t.parent_id, t.data + FROM tree t, search_tree st + WHERE t.parent_id = st.id + ) + SELECT * FROM search_tree order BY id; + """ + + qt_q3 """ + WITH RECURSIVE search_tree AS ( + SELECT id, parent_id, data, array(t.id) AS path + FROM tree t + WHERE t.id = 0 + UNION ALL + SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id)) + FROM tree t, search_tree st + WHERE t.parent_id = st.id + ) + SELECT * FROM search_tree ORDER BY path; + """ + + qt_q4 """ + WITH RECURSIVE search_tree AS ( + SELECT id, parent_id, data, array(t.id) AS path, cast(0 as int) AS depth + FROM tree t + WHERE t.id = 0 + UNION ALL + SELECT t.id, t.parent_id, t.data, array_concat(path, array(t.id)), cast(depth + 1 as int) + FROM tree t, search_tree st + WHERE t.parent_id = st.id + ) + SELECT * FROM search_tree ORDER BY depth, id; + """ + + sql "DROP TABLE IF EXISTS graph;" + sql """ + CREATE TABLE graph + ( + c_from int, + c_to int, + label varchar(100) + ) DUPLICATE KEY (c_from) DISTRIBUTED BY HASH(c_from) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO graph VALUES (1, 2, '1 -> 2'), (1, 3, '1 -> 3'), (2, 3, '2 -> 3'), (1, 4, '1 -> 4'), (4, 5, '4 -> 5');""" + + qt_q5 """ + WITH RECURSIVE search_graph AS ( + SELECT c_from, c_to, label FROM graph g + UNION ALL + SELECT g.c_from, g.c_to, g.label + FROM graph g, search_graph sg + WHERE g.c_from = sg.c_to + ) + SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to; + """ + + sql "INSERT INTO graph VALUES (5, 1, '5 -> 1');" + test { + sql """ + WITH RECURSIVE search_graph AS ( + SELECT c_from, c_to, label FROM graph g + UNION ALL + SELECT g.c_from, g.c_to, g.label + FROM graph g, search_graph sg + WHERE g.c_from = sg.c_to + ) + SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to; + """ + exception "ABORTED" + } + + // test global rf + sql "set enable_runtime_filter_prune = false;" + test { + sql """ + WITH RECURSIVE search_graph AS ( + SELECT c_from, c_to, label FROM graph g + UNION ALL + SELECT g.c_from, g.c_to, g.label + FROM graph g join [shuffle] search_graph sg + on g.c_from = sg.c_to + ) + SELECT DISTINCT * FROM search_graph ORDER BY c_from, c_to; + """ + exception "ABORTED" + } + + // do not support use limit to stop recursion now + //qt_q6 """ + //WITH RECURSIVE test_table AS ( + // SELECT cast(1 as int) AS number + //UNION ALL + // SELECT cast(number + 1 as int) FROM test_table + //) + //SELECT sum(number) FROM test_table LIMIT 100; + //""" +} diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy new file mode 100644 index 00000000000..62d70558163 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte_from_duckdb_doc/rec_cte_from_duckdb_doc.groovy @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +// https://duckdb.org/docs/stable/sql/query_syntax/with#recursive-ctes +suite ("rec_cte_from_duckdb_doc") { + qt_q1 """ + WITH RECURSIVE FibonacciNumbers ( + RecursionDepth, + FibonacciNumber, + NextNumber + ) AS ( + -- Base case + SELECT + cast(0 as int) AS RecursionDepth, + cast(0 as int) AS FibonacciNumber, + cast(1 as int) AS NextNumber + UNION + ALL -- Recursive step + SELECT + cast((fib.RecursionDepth + 1) as int) AS RecursionDepth, + fib.NextNumber AS FibonacciNumber, + cast((fib.FibonacciNumber + fib.NextNumber) as int) AS NextNumber + FROM + FibonacciNumbers fib + WHERE + cast((fib.RecursionDepth + 1) as int) < 10 + ) + SELECT + * + FROM + FibonacciNumbers fn ORDER BY fn.RecursionDepth; + """ + + sql "DROP TABLE IF EXISTS tag;" + sql """ + CREATE TABLE tag + ( + id int, + name varchar(100), + subclassof int + ) DUPLICATE KEY (id) + DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO tag VALUES + (1, 'U2', 5), + (2, 'Blur', 5), + (3, 'Oasis', 5), + (4, '2Pac', 6), + (5, 'Rock', 7), + (6, 'Rap', 7), + (7, 'Music', 9), + (8, 'Movies', 9), + (9, 'Art', NULL);""" + + qt_q2 """ + WITH RECURSIVE tag_hierarchy(id, source, path) AS ( + SELECT id, name, array(name) AS path + FROM tag + WHERE subclassof IS NULL + UNION ALL + SELECT tag.id, tag.name, array_concat(array(tag.name), tag_hierarchy.path) + FROM tag, tag_hierarchy + WHERE tag.subclassof = tag_hierarchy.id + ) + SELECT path + FROM tag_hierarchy + WHERE source = 'Oasis'; + """ + + sql "DROP TABLE IF EXISTS edge;" + sql """ + CREATE TABLE edge + ( + node1id int, + node2id int + ) DUPLICATE KEY (node1id) + DISTRIBUTED BY HASH(node1id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """ + INSERT INTO edge VALUES + (1, 3), (1, 5), (2, 4), (2, 5), (2, 10), (3, 1), + (3, 5), (3, 8), (3, 10), (5, 3), (5, 4), (5, 8), + (6, 3), (6, 4), (7, 4), (8, 1), (9, 4); + """ + + qt_q3 """ + WITH RECURSIVE paths(startNode, endNode, path) AS ( + SELECT -- Define the path as the first edge of the traversal + node1id AS startNode, + node2id AS endNode, + array_concat(array(node1id), array(node2id)) AS path + FROM edge + WHERE node1id = 1 + UNION ALL + SELECT -- Concatenate new edge to the path + paths.startNode AS startNode, + node2id AS endNode, + array_concat(path, array(node2id)) AS path + FROM paths + JOIN edge ON paths.endNode = node1id + -- Prevent adding a repeated node to the path. + -- This ensures that no cycles occur. + WHERE array_contains(paths.path, node2id) = false + ) + SELECT startNode, endNode, path + FROM paths + ORDER BY array_size(path), path; + """ + + // do not support subquery containing recursive cte + //qt_q4 """ + //WITH RECURSIVE paths(startNode, endNode, path) AS ( + // SELECT -- Define the path as the first edge of the traversal + // node1id AS startNode, + // node2id AS endNode, + // array_concat(array(node1id), array(node2id)) AS path + // FROM edge + // WHERE startNode = 1 + // UNION ALL + // SELECT -- Concatenate new edge to the path + // paths.startNode AS startNode, + // node2id AS endNode, + // array_concat(path, array(node2id)) AS path + // FROM paths + // JOIN edge ON paths.endNode = node1id + // -- Prevent adding a node that was visited previously by any path. + // -- This ensures that (1) no cycles occur and (2) only nodes that + // -- were not visited by previous (shorter) paths are added to a path. + // WHERE NOT EXISTS ( + // SELECT 1 FROM paths previous_paths + // WHERE array_contains(previous_paths.path, node2id) + // ) + // ) + //SELECT startNode, endNode, path + //FROM paths + //ORDER BY array_size(path), path; + //""" + + //qt_q5 """ + //WITH RECURSIVE paths(startNode, endNode, path, endReached) AS ( + //SELECT -- Define the path as the first edge of the traversal + // node1id AS startNode, + // node2id AS endNode, + // array_concat(array(node1id), array(node2id)) AS path, + // (node2id = 8) AS endReached + // FROM edge + // WHERE startNode = 1 + //UNION ALL + //SELECT -- Concatenate new edge to the path + // paths.startNode AS startNode, + // node2id AS endNode, + // array_concat(path, array(node2id)) AS path, + // max(CASE WHEN node2id = 8 THEN 1 ELSE 0 END) + // OVER (ROWS BETWEEN UNBOUNDED PRECEDING + // AND UNBOUNDED FOLLOWING) AS endReached + // FROM paths + // JOIN edge ON paths.endNode = node1id + // WHERE NOT EXISTS ( + // FROM paths previous_paths + // WHERE array_contains(previous_paths.path, node2id) + // ) + // AND paths.endReached = 0 + //) + //SELECT startNode, endNode, path + //FROM paths + //WHERE endNode = 8 + //ORDER BY array_size(path), path; + //""" +} diff --git a/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy new file mode 100644 index 00000000000..cee07dc6612 --- /dev/null +++ b/regression-test/suites/rec_cte_p0/rec_cte_from_mysql_doc/rec_cte_from_mysql_doc.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +// https://dev.mysql.com/doc/refman/8.4/en/with.html#common-table-expressions-recursive +suite ("rec_cte_from_mysql_doc") { + qt_q1 """ + WITH RECURSIVE cte AS + ( + SELECT cast(1 as int) AS n, cast('abc' as varchar(65533)) AS str + UNION ALL + SELECT cast(n + 1 as int), cast(CONCAT(str, str) as varchar(65533)) FROM cte WHERE n < 3 + ) + SELECT * FROM cte order by n; + """ + + qt_q2 """ + WITH RECURSIVE cte AS + ( + SELECT cast(1 as int) AS n, cast(1 as int) AS p, cast(-1 as int) AS q + UNION ALL + SELECT cast(n + 1 as int), cast(q * 2 as int), cast(p * 2 as int) FROM cte WHERE n < 5 + ) + SELECT * FROM cte order by n; + """ + + test { + sql """ + WITH RECURSIVE cte (n) AS + ( + SELECT cast(1 as int) + UNION ALL + SELECT cast(n + 1 as int) FROM cte + ) + SELECT n FROM cte order by n; + """ + exception "ABORTED" + } + + // do not support use limit to stop recursion now + //qt_q3 """ + //WITH RECURSIVE cte (n) AS + //( + //SELECT cast(1 as int) + //UNION ALL + //SELECT cast(n + 1 as int) FROM cte LIMIT 10000 + //) + //SELECT n FROM cte order by n; + //""" + + sql "DROP TABLE IF EXISTS sales;" + sql """ + CREATE TABLE sales + ( + c_date date, + c_price double + ) DUPLICATE KEY (c_date) + DISTRIBUTED BY HASH(c_date) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """insert into sales values + ('2017-01-03', 100.0), + ('2017-01-03', 200.0), + ('2017-01-06', 50.0), + ('2017-01-08', 10.0), + ('2017-01-08', 20.0), + ('2017-01-08', 150.0), + ('2017-01-10', 5.0);""" + + qt_q4 """ + WITH RECURSIVE dates (c_date) AS + ( + SELECT MIN(c_date) FROM sales + UNION ALL + SELECT c_date + INTERVAL 1 DAY FROM dates + WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales) + ) + SELECT * FROM dates order by 1; + """ + + qt_q5 """ + WITH RECURSIVE dates (c_date) AS + ( + SELECT MIN(c_date) FROM sales + UNION ALL + SELECT c_date + INTERVAL 1 DAY FROM dates + WHERE c_date + INTERVAL 1 DAY <= (SELECT MAX(c_date) FROM sales) + ) + SELECT dates.c_date, COALESCE(SUM(c_price), 0) AS sum_price + FROM dates LEFT JOIN sales ON dates.c_date = sales.c_date + GROUP BY dates.c_date + ORDER BY dates.c_date; + """ + + sql "DROP TABLE IF EXISTS employees;" + sql """ + CREATE TABLE employees ( + id INT NOT NULL, + name VARCHAR(100) NOT NULL, + manager_id INT NULL + ) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1'); + """ + sql """INSERT INTO employees VALUES + (333, "Yasmina", NULL), + (198, "John", 333), + (692, "Tarek", 333), + (29, "Pedro", 198), + (4610, "Sarah", 29), + (72, "Pierre", 29), + (123, "Adil", 692); + """ + + qt_q6 """ + WITH RECURSIVE employee_paths (id, name, path) AS + ( + SELECT id, name, CAST(id AS varchar(65533)) + FROM employees + WHERE manager_id IS NULL + UNION ALL + SELECT e.id, e.name, cast(CONCAT(ep.path, ',', e.id) as varchar(65533)) + FROM employee_paths AS ep JOIN employees AS e + ON ep.id = e.manager_id + ) + SELECT * FROM employee_paths ORDER BY path; + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
