This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1b3512d9423 [pipelineX](bug) Fix cancel timeout (#27396)
1b3512d9423 is described below
commit 1b3512d9423168f80b4d0f9b13fbe8fc8da87fde
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 22 22:31:34 2023 +0800
[pipelineX](bug) Fix cancel timeout (#27396)
---
be/src/pipeline/exec/aggregation_sink_operator.h | 3 +-
be/src/pipeline/exec/aggregation_source_operator.h | 3 +-
be/src/pipeline/exec/analytic_sink_operator.h | 4 +-
be/src/pipeline/exec/analytic_source_operator.h | 4 +-
be/src/pipeline/exec/es_scan_operator.cpp | 5 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 16 +++---
be/src/pipeline/exec/exchange_sink_operator.h | 22 +++++--
be/src/pipeline/exec/exchange_source_operator.cpp | 5 +-
be/src/pipeline/exec/exchange_source_operator.h | 4 +-
be/src/pipeline/exec/file_scan_operator.cpp | 5 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 8 +--
be/src/pipeline/exec/hashjoin_probe_operator.h | 4 +-
be/src/pipeline/exec/meta_scan_operator.cpp | 2 +-
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 4 +-
.../pipeline/exec/multi_cast_data_stream_source.h | 4 +-
.../exec/nested_loop_join_build_operator.h | 4 +-
.../exec/nested_loop_join_probe_operator.h | 4 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 8 ++-
.../pipeline/exec/partition_sort_sink_operator.cpp | 10 ++--
.../pipeline/exec/partition_sort_sink_operator.h | 4 +-
.../pipeline/exec/partition_sort_source_operator.h | 22 ++++++-
be/src/pipeline/exec/result_sink_operator.cpp | 4 +-
be/src/pipeline/exec/result_sink_operator.h | 4 +-
be/src/pipeline/exec/scan_operator.cpp | 28 ++++-----
be/src/pipeline/exec/scan_operator.h | 10 ++--
be/src/pipeline/exec/set_probe_sink_operator.h | 4 +-
be/src/pipeline/exec/set_sink_operator.h | 3 +-
be/src/pipeline/exec/set_source_operator.h | 3 +-
be/src/pipeline/exec/sort_sink_operator.h | 3 +-
be/src/pipeline/exec/sort_source_operator.h | 3 +-
be/src/pipeline/exec/union_sink_operator.h | 4 +-
be/src/pipeline/exec/union_source_operator.cpp | 4 +-
be/src/pipeline/exec/union_source_operator.h | 5 +-
be/src/pipeline/pipeline_task.h | 10 ++++
be/src/pipeline/pipeline_x/dependency.cpp | 28 ++++++---
be/src/pipeline/pipeline_x/dependency.h | 67 +++++++++++-----------
.../local_exchange/local_exchange_sink_operator.h | 4 +-
.../local_exchange_source_operator.h | 4 +-
be/src/pipeline/pipeline_x/operator.cpp | 51 +++++++---------
be/src/pipeline/pipeline_x/operator.h | 12 ++--
.../pipeline_x/pipeline_x_fragment_context.cpp | 7 ++-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 23 ++++----
be/src/pipeline/pipeline_x/pipeline_x_task.h | 7 +++
be/src/runtime/fragment_mgr.cpp | 10 ++++
be/src/runtime/query_context.cpp | 43 ++++++++++++++
be/src/runtime/query_context.h | 18 +++---
be/src/vec/runtime/vdata_stream_recvr.cpp | 10 ++--
48 files changed, 318 insertions(+), 200 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 3f1ce260365..2d16df2be25 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -48,7 +48,8 @@ public:
class AggSinkDependency final : public Dependency {
public:
using SharedState = AggSharedState;
- AggSinkDependency(int id, int node_id) : Dependency(id, node_id,
"AggSinkDependency", true) {}
+ AggSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AggSinkDependency", true, query_ctx) {}
~AggSinkDependency() override = default;
void set_ready() override {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index 79671fb9c74..9c6d3e0fd0d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -51,7 +51,8 @@ public:
class AggSourceDependency final : public Dependency {
public:
using SharedState = AggSharedState;
- AggSourceDependency(int id, int node_id) : Dependency(id, node_id,
"AggSourceDependency") {}
+ AggSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AggSourceDependency", query_ctx) {}
~AggSourceDependency() override = default;
void block() override {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index a90321f5e31..1e8152a28fa 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -48,8 +48,8 @@ public:
class AnalyticSinkDependency final : public Dependency {
public:
using SharedState = AnalyticSharedState;
- AnalyticSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "AnalyticSinkDependency", true) {}
+ AnalyticSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AnalyticSinkDependency", true,
query_ctx) {}
~AnalyticSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index 0a741181f87..8b76fbfe263 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -49,8 +49,8 @@ public:
class AnalyticSourceDependency final : public Dependency {
public:
using SharedState = AnalyticSharedState;
- AnalyticSourceDependency(int id, int node_id)
- : Dependency(id, node_id, "AnalyticSourceDependency") {}
+ AnalyticSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AnalyticSourceDependency", query_ctx) {}
~AnalyticSourceDependency() override = default;
};
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
index b9112917954..8567db90948 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -55,7 +55,7 @@ Status EsScanLocalState::_init_profile() {
Status EsScanLocalState::_process_conjuncts() {
RETURN_IF_ERROR(Base::_process_conjuncts());
- if (Base::_scan_dependency->eos()) {
+ if (Base::_eos) {
return Status::OK();
}
@@ -66,7 +66,8 @@ Status EsScanLocalState::_process_conjuncts() {
Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
- Base::_scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
return Status::OK();
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 616a1bd76fe..f0e03596cc1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -174,16 +174,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
register_channels(_sink_buffer.get());
- _exchange_sink_dependency =
- AndDependency::create_shared(_parent->operator_id(),
_parent->node_id());
- _queue_dependency =
- ExchangeSinkQueueDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ _exchange_sink_dependency = AndDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
+ _queue_dependency = ExchangeSinkQueueDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() ==
1) &&
!only_local_exchange) {
- _broadcast_dependency =
- BroadcastDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ _broadcast_dependency = BroadcastDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
_broadcast_dependency->set_available_block(config::num_broadcast_buffer);
_broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
@@ -198,8 +198,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
size_t dep_id = 0;
_local_channels_dependency.resize(local_size);
_wait_channel_timer.resize(local_size);
- auto deps_for_channels =
- AndDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ auto deps_for_channels = AndDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
for (auto channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] =
channel->get_local_channel_dependency();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 89b024a6c6e..19a326d5c6e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -67,18 +67,28 @@ private:
class ExchangeSinkQueueDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
- ExchangeSinkQueueDependency(int id, int node_id)
- : Dependency(id, node_id, "ResultQueueDependency", true) {}
+ ExchangeSinkQueueDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "ResultQueueDependency", true,
query_ctx) {}
~ExchangeSinkQueueDependency() override = default;
};
class BroadcastDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(BroadcastDependency);
- BroadcastDependency(int id, int node_id)
- : Dependency(id, node_id, "BroadcastDependency", true),
_available_block(0) {}
+ BroadcastDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "BroadcastDependency", true, query_ctx),
+ _available_block(0) {}
~BroadcastDependency() override = default;
+ std::string debug_string(int indentation_level = 0) override {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "{}{}: id={}, block task = {}, ready={}, _available_block = {}",
+ std::string(indentation_level * 2, ' '), _name, _node_id,
+ _blocked_task.size(), _ready, _available_block.load());
+ return fmt::to_string(debug_string_buffer);
+ }
+
void set_available_block(int available_block) { _available_block =
available_block; }
void return_available_block() {
@@ -128,8 +138,8 @@ private:
class LocalExchangeChannelDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
- LocalExchangeChannelDependency(int id, int node_id)
- : Dependency(id, node_id, "LocalExchangeChannelDependency", true)
{}
+ LocalExchangeChannelDependency(int id, int node_id, QueryContext*
query_ctx)
+ : Dependency(id, node_id, "LocalExchangeChannelDependency", true,
query_ctx) {}
~LocalExchangeChannelDependency() override = default;
// TODO(gabriel): blocked by memory
};
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 00cead5b2ca..3b630cfcfcf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -51,13 +51,14 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(),
p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
- source_dependency = AndDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ source_dependency = AndDependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+ state->get_query_ctx());
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
for (size_t i = 0; i < queues.size(); i++) {
deps[i] =
ExchangeDataDependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- queues[i]);
+
state->get_query_ctx(), queues[i]);
queues[i]->set_dependency(deps[i]);
source_dependency->add_child(deps[i]);
}
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 4a2bae8298d..8e745def1ba 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -53,9 +53,9 @@ public:
struct ExchangeDataDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
- ExchangeDataDependency(int id, int node_id,
+ ExchangeDataDependency(int id, int node_id, QueryContext* query_ctx,
vectorized::VDataStreamRecvr::SenderQueue*
sender_queue)
- : Dependency(id, node_id, "DataDependency") {}
+ : Dependency(id, node_id, "DataDependency", query_ctx) {}
};
class ExchangeSourceOperatorX;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 369ad607c6f..819575211c9 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -32,7 +32,8 @@ namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
- Base::_scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
return Status::OK();
}
@@ -95,7 +96,7 @@ Status FileScanLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
Status FileScanLocalState::_process_conjuncts() {
RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
- if (Base::_scan_dependency->eos()) {
+ if (Base::_eos) {
return Status::OK();
}
// TODO: Push conjuncts down to reader.
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 53b20f53cd8..41b030b4e18 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -48,8 +48,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- _shared_hash_table_dependency =
- SharedHashTableDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ _shared_hash_table_dependency = SharedHashTableDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
if (p._is_broadcast_join &&
state->enable_share_hash_table_for_broadcast_join()) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index e3f10a1feb0..b0618d59927 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -49,16 +49,16 @@ class HashJoinBuildSinkOperatorX;
class SharedHashTableDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
- SharedHashTableDependency(int id, int node_id)
- : Dependency(id, node_id, "SharedHashTableDependency", true) {}
+ SharedHashTableDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "SharedHashTableDependency", true,
query_ctx) {}
~SharedHashTableDependency() override = default;
};
class HashJoinBuildSinkDependency final : public Dependency {
public:
using SharedState = HashJoinSharedState;
- HashJoinBuildSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "HashJoinBuildSinkDependency", true) {}
+ HashJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "HashJoinBuildSinkDependency", true,
query_ctx) {}
~HashJoinBuildSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 64f3cce2d7a..583bba1b006 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -64,8 +64,8 @@ using HashTableCtxVariants = std::variant<
class HashJoinProbeDependency final : public Dependency {
public:
using SharedState = HashJoinSharedState;
- HashJoinProbeDependency(int id, int node_id)
- : Dependency(id, node_id, "HashJoinProbeDependency") {}
+ HashJoinProbeDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "HashJoinProbeDependency", query_ctx) {}
~HashJoinProbeDependency() override = default;
};
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp
b/be/src/pipeline/exec/meta_scan_operator.cpp
index 2de19bb2ced..749fbcf333a 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -22,7 +22,7 @@
namespace doris::pipeline {
Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
- if (Base::_scan_dependency->eos()) {
+ if (Base::_eos) {
return Status::OK();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index c9b3dcb4798..a2ad07e5297 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -44,8 +44,8 @@ public:
class MultiCastSinkDependency final : public Dependency {
public:
using SharedState = MultiCastSharedState;
- MultiCastSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "MultiCastSinkDependency", true) {}
+ MultiCastSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "MultiCastSinkDependency", true,
query_ctx) {}
~MultiCastSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 6c3a4cfbbc0..86034a76ce7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -95,8 +95,8 @@ private:
class MultiCastSourceDependency final : public Dependency {
public:
using SharedState = MultiCastSharedState;
- MultiCastSourceDependency(int id, int node_id)
- : Dependency(id, node_id, "MultiCastSourceDependency") {}
+ MultiCastSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "MultiCastSourceDependency", query_ctx)
{}
~MultiCastSourceDependency() override = default;
};
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index a02fb8ec1a1..0097b75c0ab 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -47,8 +47,8 @@ public:
class NestedLoopJoinBuildSinkDependency final : public Dependency {
public:
using SharedState = NestedLoopJoinSharedState;
- NestedLoopJoinBuildSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "NestedLoopJoinBuildSinkDependency",
true) {}
+ NestedLoopJoinBuildSinkDependency(int id, int node_id, QueryContext*
query_ctx)
+ : Dependency(id, node_id, "NestedLoopJoinBuildSinkDependency",
true, query_ctx) {}
~NestedLoopJoinBuildSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 34f123ba32a..d7a9b54e897 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -54,8 +54,8 @@ public:
class NestedLoopJoinProbeDependency final : public Dependency {
public:
using SharedState = NestedLoopJoinSharedState;
- NestedLoopJoinProbeDependency(int id, int node_id)
- : Dependency(id, node_id, "NestedLoopJoinProbeDependency") {}
+ NestedLoopJoinProbeDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "NestedLoopJoinProbeDependency",
query_ctx) {}
~NestedLoopJoinProbeDependency() override = default;
};
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index acda79bdfb3..059f961501d 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -133,7 +133,7 @@ Status OlapScanLocalState::_init_profile() {
Status OlapScanLocalState::_process_conjuncts() {
SCOPED_TIMER(_process_conjunct_timer);
RETURN_IF_ERROR(ScanLocalState::_process_conjuncts());
- if (ScanLocalState::_scan_dependency->eos()) {
+ if (ScanLocalState::_eos) {
return Status::OK();
}
RETURN_IF_ERROR(_build_key_ranges_and_filters());
@@ -213,7 +213,8 @@ bool OlapScanLocalState::_storage_no_merge() {
Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
- ScanLocalState::_scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
return Status::OK();
}
SCOPED_TIMER(_scanner_init_timer);
@@ -408,7 +409,8 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
iter->second));
}
if (eos) {
- ScanLocalState::_scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
}
for (auto& iter : _colname_to_value_range) {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 168d499e661..d7a2bc4c920 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -18,11 +18,10 @@
#include "partition_sort_sink_operator.h"
#include "common/status.h"
+#include "partition_sort_source_operator.h"
#include "vec/common/hash_table/hash.h"
-namespace doris {
-
-namespace pipeline {
+namespace doris::pipeline {
OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSinkOperator>(this, _node);
@@ -154,7 +153,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
COUNTER_SET(local_state._hash_table_size_counter,
int64_t(local_state._num_partition));
//so all data from child have sink completed
- local_state._dependency->set_eos();
+
((PartitionSortSourceDependency*)local_state._shared_state->source_dep)->set_always_ready();
}
return Status::OK();
@@ -291,5 +290,4 @@ void PartitionSortSinkLocalState::_init_hash_method() {
}
}
-} // namespace pipeline
-} // namespace doris
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index a77df27cacf..dbfb7ad5d0e 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -53,8 +53,8 @@ public:
class PartitionSortSinkDependency final : public Dependency {
public:
using SharedState = PartitionSortNodeSharedState;
- PartitionSortSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "PartitionSortSinkDependency", true) {}
+ PartitionSortSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "PartitionSortSinkDependency", true,
query_ctx) {}
~PartitionSortSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 0ef89c5068a..df22bde6365 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -52,9 +52,27 @@ public:
class PartitionSortSourceDependency final : public Dependency {
public:
using SharedState = PartitionSortNodeSharedState;
- PartitionSortSourceDependency(int id, int node_id)
- : Dependency(id, node_id, "PartitionSortSourceDependency") {}
+ PartitionSortSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "PartitionSortSourceDependency",
query_ctx) {}
~PartitionSortSourceDependency() override = default;
+
+ void block() override {
+ if (_always_ready) {
+ return;
+ }
+ Dependency::block();
+ }
+
+ void set_always_ready() {
+ if (_always_ready) {
+ return;
+ }
+ _always_ready = true;
+ set_ready();
+ }
+
+private:
+ std::atomic<bool> _always_ready {false};
};
class PartitionSortSourceOperatorX;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 7f6d1673e95..8c314b995bb 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -62,8 +62,8 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(),
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
state->execution_timeout()));
- _result_sink_dependency =
- ResultSinkDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ _result_sink_dependency = ResultSinkDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index e7af819e621..eedd2d4c053 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -46,8 +46,8 @@ public:
class ResultSinkDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ResultSinkDependency);
- ResultSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "ResultSinkDependency", true) {}
+ ResultSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "ResultSinkDependency", true, query_ctx)
{}
~ResultSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 15b77af6ff9..93c1eb9f982 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -123,7 +123,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
_scan_dependency =
ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
-
PipelineXLocalState<>::_parent->node_id());
+
PipelineXLocalState<>::_parent->node_id(),
+ state->get_query_ctx());
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(state, info.scan_ranges);
@@ -147,10 +148,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
RETURN_IF_ERROR(_init_profile());
// if you want to add some profile in scan node, even it have not new
VScanner object
// could add here, not in the _init_profile() function
- _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime");
-
_prepare_rf_timer(_runtime_profile.get());
- _alloc_resource_timer = ADD_TIMER(_runtime_profile,
"AllocateResourceTime");
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
@@ -171,10 +169,10 @@ Status ScanLocalState<Derived>::open(RuntimeState* state)
{
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
- auto status = _scan_dependency->eos() ? Status::OK() : _prepare_scanners();
+ auto status = _eos ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
_finish_dependency->block();
- DCHECK(!_scan_dependency->eos() && _num_scanners->value() > 0);
+ DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
@@ -262,7 +260,8 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
std::visit(
[&](auto&& range) {
if (range.is_empty_value_range()) {
- _scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
}
},
it.second.second);
@@ -561,7 +560,8 @@ Status
ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
constant_val =
const_cast<char*>(const_column->get_data_at(0).data);
if (constant_val == nullptr ||
!*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
- _scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
}
} else if (const vectorized::ColumnVector<vectorized::UInt8>*
bool_column =
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@@ -578,7 +578,8 @@ Status
ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
constant_val =
const_cast<char*>(bool_column->get_data_at(0).data);
if (constant_val == nullptr ||
!*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
- _scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
}
} else {
LOG(WARNING) << "Constant predicate in scan node should return
a bool column with "
@@ -775,7 +776,8 @@ Status
ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
auto fn_name = std::string("");
if (!is_fixed_range && state->null_in_set) {
- _scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
}
while (iter->has_next()) {
// column not in (nullptr) is always true
@@ -1168,7 +1170,8 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
if (scanners.empty()) {
- _scan_dependency->set_eos();
+ _eos = true;
+ _scan_dependency->set_ready();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners));
@@ -1350,7 +1353,6 @@ template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
auto& local_state = get_local_state(state);
- SCOPED_TIMER(local_state._get_next_timer);
SCOPED_TIMER(local_state.exec_time_counter());
// in inverted index apply logic, in order to optimize query performance,
// we built some temporary columns into block, these columns only used in
scan node level,
@@ -1376,7 +1378,7 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
}
}
- if (local_state._scan_dependency->eos()) {
+ if (local_state._eos) {
source_state = SourceState::FINISHED;
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 06a6c611294..23adfffc3a6 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -59,8 +59,8 @@ public:
class ScanDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ScanDependency);
- ScanDependency(int id, int node_id)
- : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr)
{}
+ ScanDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "ScanDependency", query_ctx),
_scanner_ctx(nullptr) {}
// TODO(gabriel):
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
@@ -71,7 +71,7 @@ public:
return Dependency::is_blocked_by(task);
}
- bool push_to_blocking_queue() override { return true; }
+ bool push_to_blocking_queue() const override { return true; }
void block() override {
if (_scanner_done) {
@@ -384,9 +384,7 @@ protected:
// "_colname_to_value_range" and in "_not_in_value_ranges"
std::vector<ColumnValueRangeType> _not_in_value_ranges;
- RuntimeProfile::Counter* _get_next_timer = nullptr;
- RuntimeProfile::Counter* _alloc_resource_timer = nullptr;
- RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
+ bool _eos = false;
doris::Mutex _block_lock;
};
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index cd1dbd6267a..89a2ab3bb38 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -70,8 +70,8 @@ private:
class SetProbeSinkDependency final : public Dependency {
public:
using SharedState = SetSharedState;
- SetProbeSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "SetProbeSinkDependency", true) {}
+ SetProbeSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "SetProbeSinkDependency", true,
query_ctx) {}
~SetProbeSinkDependency() override = default;
void set_cur_child_id(int id) {
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index e318c354c88..b8ca789b780 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -63,7 +63,8 @@ private:
class SetSinkDependency final : public Dependency {
public:
using SharedState = SetSharedState;
- SetSinkDependency(int id, int node_id) : Dependency(id, node_id,
"SetSinkDependency", true) {}
+ SetSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "SetSinkDependency", true, query_ctx) {}
~SetSinkDependency() override = default;
void set_cur_child_id(int id) {
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index cc7275444c6..44800f23f41 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -56,7 +56,8 @@ public:
class SetSourceDependency final : public Dependency {
public:
using SharedState = SetSharedState;
- SetSourceDependency(int id, int node_id) : Dependency(id, node_id,
"SetSourceDependency") {}
+ SetSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "SetSourceDependency", query_ctx) {}
~SetSourceDependency() override = default;
};
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index 67305aad691..8730780b54c 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -48,7 +48,8 @@ public:
class SortSinkDependency final : public Dependency {
public:
using SharedState = SortSharedState;
- SortSinkDependency(int id, int node_id) : Dependency(id, node_id,
"SortSinkDependency", true) {}
+ SortSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "SortSinkDependency", true, query_ctx) {}
~SortSinkDependency() override = default;
};
diff --git a/be/src/pipeline/exec/sort_source_operator.h
b/be/src/pipeline/exec/sort_source_operator.h
index 3b615a58be2..efbddfd1050 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -48,7 +48,8 @@ public:
class SortSourceDependency final : public Dependency {
public:
using SharedState = SortSharedState;
- SortSourceDependency(int id, int node_id) : Dependency(id, node_id,
"SortSourceDependency") {}
+ SortSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "SortSourceDependency", query_ctx) {}
~SortSourceDependency() override = default;
};
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index e135566417e..f3031dd1015 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -69,8 +69,8 @@ private:
class UnionSinkDependency final : public Dependency {
public:
using SharedState = UnionSharedState;
- UnionSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "UnionSinkDependency", true) {}
+ UnionSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "UnionSinkDependency", true, query_ctx)
{}
~UnionSinkDependency() override = default;
void block() override {}
};
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index fc1dd124a0e..d824f9db7a0 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -115,8 +115,8 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
DCHECK(deps.size() == 1);
DCHECK(deps.front() == nullptr);
// child_count == 0, so we need to create a UnionDependency
- deps.front() =
-
std::make_shared<UnionSourceDependency>(_parent->operator_id(),
_parent->node_id());
+ deps.front() = std::make_shared<UnionSourceDependency>(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
}
RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 0c846b828f9..8b7060884e9 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -72,7 +72,8 @@ private:
class UnionSourceDependency final : public Dependency {
public:
using SharedState = UnionSharedState;
- UnionSourceDependency(int id, int node_id) : Dependency(id, node_id,
"UnionSourceDependency") {}
+ UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
~UnionSourceDependency() override = default;
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
@@ -85,7 +86,7 @@ public:
}
return this;
}
- bool push_to_blocking_queue() override { return true; }
+ bool push_to_blocking_queue() const override { return true; }
void block() override {}
};
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index a302dc7c34d..0b26c9d215d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -104,6 +104,16 @@ inline const char* get_state_name(PipelineTaskState idx) {
__builtin_unreachable();
}
+inline bool is_final_state(PipelineTaskState idx) {
+ switch (idx) {
+ case PipelineTaskState::FINISHED:
+ case PipelineTaskState::CANCELED:
+ return true;
+ default:
+ return false;
+ }
+}
+
class TaskQueue;
class PriorityTaskQueue;
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index c149da54fde..00a7d012c98 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -28,7 +28,7 @@
namespace doris::pipeline {
-void Dependency::add_block_task(PipelineXTask* task) {
+void Dependency::_add_block_task(PipelineXTask* task) {
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] !=
task)
<< "Duplicate task: " << task->debug_string();
_blocked_task.push_back(task);
@@ -54,16 +54,19 @@ void Dependency::set_ready() {
}
Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
- if (config::enable_fuzzy_mode && !_ready &&
_should_log(_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: "
<< name() << " "
- << _node_id << " block tasks: " << _blocked_task.size()
- << "task: " << (task ?
task->fragment_context()->debug_string() : "");
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready = _ready.load() || _is_cancelled();
+ if (!ready && !push_to_blocking_queue() && task) {
+ _add_block_task(task);
}
+ return ready ? nullptr : this;
+}
+Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
if (!ready && !push_to_blocking_queue() && task) {
- add_block_task(task);
+ _add_block_task(task);
}
return ready ? nullptr : this;
}
@@ -73,9 +76,9 @@ Dependency*
RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
return nullptr;
}
std::unique_lock<std::mutex> lc(_task_lock);
- if (*_blocked_by_rf) {
+ if (*_blocked_by_rf && !_is_cancelled()) {
if (LIKELY(task)) {
- add_block_task(task);
+ _add_block_task(task);
}
return this;
}
@@ -90,6 +93,15 @@ std::string Dependency::debug_string(int indentation_level) {
return fmt::to_string(debug_string_buffer);
}
+std::string RuntimeFilterDependency::debug_string(int indentation_level) {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "{}{}: id={}, block task = {}, ready={}, _filters = {},
_blocked_by_rf = {}",
+ std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
+ _ready, _filters.load(), _blocked_by_rf ?
_blocked_by_rf->load() : false);
+ return fmt::to_string(debug_string_buffer);
+}
+
std::string AndDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 662e789d65b..13ae1cd9d7d 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -61,30 +61,31 @@ struct BasicSharedState {
class Dependency : public std::enable_shared_from_this<Dependency> {
public:
- Dependency(int id, int node_id, std::string name)
+ Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(false),
- _ready(false) {}
- Dependency(int id, int node_id, std::string name, bool ready)
+ _ready(false),
+ _query_ctx(query_ctx) {}
+ Dependency(int id, int node_id, std::string name, bool ready,
QueryContext* query_ctx)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(true),
- _ready(ready) {}
+ _ready(ready),
+ _query_ctx(query_ctx) {}
virtual ~Dependency() = default;
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
- void set_parent(std::weak_ptr<Dependency> parent) { _parent = parent; }
void add_child(std::shared_ptr<Dependency> child) {
_children.push_back(child); }
std::shared_ptr<BasicSharedState> shared_state() { return _shared_state; }
void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
_shared_state = shared_state;
}
virtual std::string debug_string(int indentation_level = 0);
- virtual bool push_to_blocking_queue() { return false; }
+ virtual bool push_to_blocking_queue() const { return false; }
// Start the watcher. We use it to count how long this dependency blocks
// the current pipeline task.
void start_watcher() {
@@ -104,26 +105,9 @@ public:
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->set_ready();
}
- void set_eos() {
- if (_eos) {
- return;
- }
- _eos = true;
- set_ready();
- if (_is_write_dependency && _shared_state->source_dep != nullptr) {
- _shared_state->source_dep->set_eos();
- }
- }
- bool eos() const { return _eos.load(); }
// Notify downstream pipeline tasks this dependency is blocked.
- virtual void block() {
- if (_eos) {
- return;
- }
- _ready = false;
- }
- void add_block_task(PipelineXTask* task);
+ virtual void block() { _ready = false; }
protected:
bool _should_log(uint64_t cur_time) {
@@ -136,14 +120,19 @@ protected:
_last_log_time = cur_time;
return true;
}
+ void _add_block_task(PipelineXTask* task);
+ bool _is_cancelled() const {
+ return push_to_blocking_queue() ? false : _query_ctx->is_cancelled();
+ }
const int _id;
const int _node_id;
const std::string _name;
const bool _is_write_dependency;
+ std::atomic<bool> _ready;
+ const QueryContext* _query_ctx;
std::shared_ptr<BasicSharedState> _shared_state {nullptr};
- std::atomic<bool> _ready;
MonotonicStopWatch _watcher;
std::weak_ptr<Dependency> _parent;
std::list<std::shared_ptr<Dependency>> _children;
@@ -151,7 +140,6 @@ protected:
uint64_t _last_log_time = 0;
std::mutex _task_lock;
std::vector<PipelineXTask*> _blocked_task;
- std::atomic<bool> _eos {false};
};
struct FakeSharedState : public BasicSharedState {};
@@ -159,11 +147,21 @@ struct FakeSharedState : public BasicSharedState {};
struct FakeDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
- FakeDependency(int id, int node_id) : Dependency(id, node_id,
"FakeDependency") {}
+ FakeDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "FakeDependency", query_ctx) {}
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
return nullptr; }
};
+struct FinishDependency final : public Dependency {
+public:
+ using SharedState = FakeSharedState;
+ FinishDependency(int id, int node_id, std::string name, QueryContext*
query_ctx)
+ : Dependency(id, node_id, name, true, query_ctx) {}
+
+ [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override;
+};
+
class RuntimeFilterDependency;
class RuntimeFilterTimer {
public:
@@ -200,15 +198,17 @@ private:
class RuntimeFilterDependency final : public Dependency {
public:
- RuntimeFilterDependency(int id, int node_id, std::string name)
- : Dependency(id, node_id, name) {}
- Dependency* is_blocked_by(PipelineXTask* task);
+ RuntimeFilterDependency(int id, int node_id, std::string name,
QueryContext* query_ctx)
+ : Dependency(id, node_id, name, query_ctx) {}
+ Dependency* is_blocked_by(PipelineXTask* task) override;
void add_filters(IRuntimeFilter* runtime_filter);
void sub_filters();
void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
_blocked_by_rf = blocked_by_rf;
}
+ std::string debug_string(int indentation_level = 0) override;
+
protected:
std::atomic_int _filters;
std::shared_ptr<std::atomic_bool> _blocked_by_rf;
@@ -218,7 +218,8 @@ class AndDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
ENABLE_FACTORY_CREATOR(AndDependency);
- AndDependency(int id, int node_id) : Dependency(id, node_id,
"AndDependency") {}
+ AndDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AndDependency", query_ctx) {}
[[nodiscard]] std::string name() const override {
fmt::memory_buffer debug_string_buffer;
@@ -371,8 +372,8 @@ class AsyncWriterDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
- AsyncWriterDependency(int id, int node_id)
- : Dependency(id, node_id, "AsyncWriterDependency", true) {}
+ AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AsyncWriterDependency", true,
query_ctx) {}
~AsyncWriterDependency() override = default;
};
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index c6c28fbe8d2..b6ce3fbeb9e 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -25,8 +25,8 @@ namespace doris::pipeline {
struct LocalExchangeSinkDependency final : public Dependency {
public:
using SharedState = LocalExchangeSharedState;
- LocalExchangeSinkDependency(int id, int node_id)
- : Dependency(id, node_id, "LocalExchangeSinkDependency", true) {}
+ LocalExchangeSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "LocalExchangeSinkDependency", true,
query_ctx) {}
~LocalExchangeSinkDependency() override = default;
};
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index dfc89a86c9e..ebf18d9a249 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -25,8 +25,8 @@ namespace doris::pipeline {
struct LocalExchangeSourceDependency final : public Dependency {
public:
using SharedState = LocalExchangeSharedState;
- LocalExchangeSourceDependency(int id, int node_id)
- : Dependency(id, node_id, "LocalExchangeSourceDependency") {}
+ LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "LocalExchangeSourceDependency",
query_ctx) {}
~LocalExchangeSourceDependency() override = default;
};
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 4e3fec7abbf..050b198a22e 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -72,18 +72,12 @@
namespace doris::pipeline {
-std::string PipelineXLocalStateBase::debug_string(int indentation_level) const
{
- return _parent->debug_string(indentation_level);
-}
-
template <typename DependencyType>
std::string PipelineXLocalState<DependencyType>::debug_string(int
indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}",
- PipelineXLocalStateBase::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, "{}",
_parent->debug_string(indentation_level));
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
- fmt::format_to(debug_string_buffer, "\nDependency: \n {}",
- _dependency->debug_string(indentation_level + 1));
+ fmt::format_to(debug_string_buffer, " Dependency: {}",
_dependency->debug_string());
}
return fmt::to_string(debug_string_buffer);
}
@@ -91,12 +85,9 @@ std::string
PipelineXLocalState<DependencyType>::debug_string(int indentation_le
template <typename DependencyType>
std::string PipelineXSinkLocalState<DependencyType>::debug_string(int
indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}",
-
PipelineXSinkLocalStateBase::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, "{}",
_parent->debug_string(indentation_level));
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
- fmt::format_to(debug_string_buffer, "\n{}Dependency: \n {}",
- std::string(indentation_level * 2, ' '),
- _dependency->debug_string(indentation_level + 1));
+ fmt::format_to(debug_string_buffer, ", Dependency: {}",
_dependency->debug_string());
}
return fmt::to_string(debug_string_buffer);
}
@@ -245,10 +236,6 @@ std::string DataSinkOperatorXBase::debug_string(int
indentation_level) const {
return fmt::to_string(debug_string_buffer);
}
-std::string PipelineXSinkLocalStateBase::debug_string(int indentation_level)
const {
- return _parent->debug_string(indentation_level);
-}
-
std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int
indentation_level) const {
return
state->get_sink_local_state(operator_id())->debug_string(indentation_level);
}
@@ -294,7 +281,8 @@ template <>
inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
template <typename LocalStateType>
-void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>&
dependency) {
+void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>&
dependency,
+ QueryContext* ctx) {
std::shared_ptr<typename LocalStateType::DependencyType::SharedState> ss =
nullptr;
if constexpr (NeedToCreate<typename
LocalStateType::DependencyType::SharedState>) {
ss.reset(new typename LocalStateType::DependencyType::SharedState());
@@ -302,8 +290,8 @@ void
DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
if constexpr (!std::is_same_v<typename LocalStateType::DependencyType,
FakeDependency>) {
auto& dests = dests_id();
for (auto& dest_id : dests) {
- dependency.push_back(
- std::make_shared<typename
LocalStateType::DependencyType>(dest_id, _node_id));
+ dependency.push_back(std::make_shared<typename
LocalStateType::DependencyType>(
+ dest_id, _node_id, ctx));
dependency.back()->set_shared_state(ss);
}
} else {
@@ -312,8 +300,8 @@ void
DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
}
template <typename LocalStateType>
-DependencySPtr OperatorX<LocalStateType>::get_dependency() {
- return std::make_shared<typename
LocalStateType::DependencyType>(_operator_id, _node_id);
+DependencySPtr OperatorX<LocalStateType>::get_dependency(QueryContext* ctx) {
+ return std::make_shared<typename
LocalStateType::DependencyType>(_operator_id, _node_id, ctx);
}
template <typename LocalStateType>
@@ -328,8 +316,9 @@
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
RuntimeState* state)
: _parent(parent),
_state(state),
- _finish_dependency(new Dependency(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY", true)) {}
+ _finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY",
+ state->get_query_ctx())) {}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state,
OperatorXBase* parent)
: _num_rows_returned(0),
@@ -337,10 +326,12 @@
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
_peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state),
- _finish_dependency(new Dependency(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY", true)) {
+ _finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY",
+ state->get_query_ctx())) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY");
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
+ state->get_query_ctx());
}
template <typename DependencyType>
@@ -421,7 +412,7 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
}
} else {
auto& deps = info.dependencys;
- deps.front() = std::make_shared<FakeDependency>(0, 0);
+ deps.front() = std::make_shared<FakeDependency>(0, 0,
state->get_query_ctx());
_dependency = (DependencyType*)deps.front().get();
}
_rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows",
TUnit::UNIT, 1);
@@ -499,8 +490,8 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState*
state, LocalSinkState
_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
}
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
- _async_writer_dependency =
- AsyncWriterDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+ _async_writer_dependency = AsyncWriterDependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
_writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
_wait_for_dependency_timer =
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 5afa080e6d3..943d8c4670b 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -97,7 +97,7 @@ public:
void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
- [[nodiscard]] virtual std::string debug_string(int indentation_level = 0)
const;
+ [[nodiscard]] virtual std::string debug_string(int indentation_level = 0)
const = 0;
virtual Dependency* dependency() { return nullptr; }
@@ -176,7 +176,7 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
- virtual DependencySPtr get_dependency() = 0;
+ virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
Status prepare(RuntimeState* state) override;
@@ -307,7 +307,7 @@ public:
return state->get_local_state(operator_id())->template
cast<LocalState>();
}
- DependencySPtr get_dependency() override;
+ DependencySPtr get_dependency(QueryContext* ctx) override;
};
template <typename DependencyArg = FakeDependency>
@@ -348,7 +348,7 @@ public:
virtual Status close(RuntimeState* state, Status exec_status) = 0;
virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
- [[nodiscard]] virtual std::string debug_string(int indentation_level)
const;
+ [[nodiscard]] virtual std::string debug_string(int indentation_level)
const = 0;
template <class TARGET>
TARGET& cast() {
@@ -456,7 +456,7 @@ public:
return reinterpret_cast<const TARGET&>(*this);
}
- virtual void get_dependency(std::vector<DependencySPtr>& dependency) = 0;
+ virtual void get_dependency(std::vector<DependencySPtr>& dependency,
QueryContext* ctx) = 0;
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
@@ -551,7 +551,7 @@ public:
~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
- void get_dependency(std::vector<DependencySPtr>& dependency) override;
+ void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext*
ctx) override;
using LocalState = LocalStateType;
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index b087c3bcab8..e47b55e4914 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -131,7 +131,7 @@ void PipelineXFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
.tag("fragment_id", _fragment_id)
.tag("reason", reason)
.tag("error message", msg);
- if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
+ if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) {
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext
cancel instance: "
<<
print_id(runtime_state->fragment_instance_id());)
@@ -149,6 +149,11 @@ void PipelineXFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id,
Status::Aborted(msg));
}
+ for (auto& tasks : _tasks) {
+ for (auto& task : tasks) {
+ task->clear_blocking_state();
+ }
+ }
}
Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams&
request) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 5b3524c69a2..703e4862f7c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -57,9 +57,9 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t
task_id, RuntimeSta
_local_exchange_state(local_exchange_state),
_task_idx(task_idx) {
_pipeline_task_watcher.start();
- _sink->get_dependency(_downstream_dependency);
+ _sink->get_dependency(_downstream_dependency, state->get_query_ctx());
for (auto& op : _operators) {
- _source_dependency.insert({op->operator_id(), op->get_dependency()});
+ _source_dependency.insert({op->operator_id(),
op->get_dependency(state->get_query_ctx())});
}
}
@@ -357,21 +357,24 @@ std::string PipelineXTask::debug_string() {
_opened ? _sink->debug_string(_state, _operators.size())
: _sink->debug_string(_operators.size()));
fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");
- for (size_t i = 0; i < _read_dependencies.size(); i++) {
- fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
- _read_dependencies[i]->debug_string());
+ size_t i = 0;
+ for (; i < _read_dependencies.size(); i++) {
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+ _read_dependencies[i]->debug_string(i + 1));
}
fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
- fmt::format_to(debug_string_buffer, "{}\n",
_write_dependencies->debug_string());
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_write_dependencies->debug_string(1));
+ i++;
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency
Information: \n");
- fmt::format_to(debug_string_buffer, "{}\n",
_filter_dependency->debug_string());
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_filter_dependency->debug_string(1));
+ i++;
fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
- for (size_t i = 0; i < _finish_dependencies.size(); i++) {
- fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
- _finish_dependencies[i]->debug_string());
+ // `i` is only the running display number shared by all sections above;
+ // `j` is the position inside _finish_dependencies and must be the index used.
+ for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+ _finish_dependencies[j]->debug_string(j + 1));
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index e0d0e58e658..46d006d0dac 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -142,6 +142,13 @@ public:
}
_use_blocking_queue = false;
}
+ void clear_blocking_state() {
+ if (!is_final_state(get_state()) && get_state() !=
PipelineTaskState::PENDING_FINISH &&
+ _blocked_dep) {
+ _blocked_dep->set_ready();
+ _blocked_dep = nullptr;
+ }
+ }
private:
Dependency* _write_blocked_dependency() {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 210de93a7ec..c0153652d26 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -560,6 +560,12 @@ void FragmentMgr::remove_pipeline_context(
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
for (const auto& ins_id : ins_ids) {
+ {
+ std::lock_guard<std::mutex> plock(q_context->pipeline_lock);
+ if
(q_context->fragment_id_to_pipeline_ctx.contains(f_context->get_fragment_id()))
{
+
q_context->fragment_id_to_pipeline_ctx.erase(f_context->get_fragment_id());
+ }
+ }
LOG_INFO("Removing query {} instance {}, all done? {}",
print_id(query_id),
print_id(ins_id), all_done);
_pipeline_map.erase(ins_id);
@@ -866,6 +872,10 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
_cv.notify_all();
}
+ {
+ std::lock_guard<std::mutex> lock(query_ctx->pipeline_lock);
+ query_ctx->fragment_id_to_pipeline_ctx.insert({params.fragment_id,
context});
+ }
RETURN_IF_ERROR(context->submit());
return Status::OK();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
new file mode 100644
index 00000000000..823893dbe74
--- /dev/null
+++ b/be/src/runtime/query_context.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/query_context.h"
+
+#include "pipeline/pipeline_fragment_context.h"
+
+namespace doris {
+
+bool QueryContext::cancel(bool v, std::string msg, Status new_status, int
fragment_id) {
+ if (_is_cancelled) {
+ return false;
+ }
+ set_exec_status(new_status);
+ _is_cancelled.store(v);
+
+ set_ready_to_execute(true);
+ {
+ std::lock_guard<std::mutex> plock(pipeline_lock);
+ for (auto& ctx : fragment_id_to_pipeline_ctx) {
+ if (fragment_id == ctx.first) {
+ continue;
+ }
+ ctx.second->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
+ }
+ }
+ return true;
+}
+} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index bb7ad20b90b..49c2deb0d8c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -40,6 +40,11 @@
#include "vec/runtime/shared_scanner_controller.h"
namespace doris {
+
+namespace pipeline {
+class PipelineFragmentContext;
+} // namespace pipeline
+
struct ReportStatusRequest {
bool is_pipeline_x;
const Status status;
@@ -138,16 +143,7 @@ public:
}
[[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
- bool cancel(bool v, std::string msg, Status new_status) {
- if (_is_cancelled) {
- return false;
- }
- set_exec_status(new_status);
- _is_cancelled.store(v);
-
- set_ready_to_execute(true);
- return true;
- }
+ bool cancel(bool v, std::string msg, Status new_status, int fragment_id = -1);
void set_exec_status(Status new_status) {
if (new_status.ok()) {
@@ -267,6 +263,8 @@ public:
std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
std::vector<TUniqueId> fragment_instance_ids;
+ std::map<int, std::shared_ptr<pipeline::PipelineFragmentContext>> fragment_id_to_pipeline_ctx;
+ std::mutex pipeline_lock;
// plan node id -> TFileScanRangeParams
// only for file scan node
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 94bc7a7bdee..d5d460e80b9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -273,7 +273,7 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
<< " #senders=" << _num_remaining_senders;
if (_num_remaining_senders == 0) {
if (_dependency) {
- _dependency->set_eos();
+ _dependency->set_ready();
}
_data_arrival_cv.notify_one();
}
@@ -288,7 +288,7 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
_is_cancelled = true;
_cancel_status = cancel_status;
if (_dependency) {
- _dependency->set_eos();
+ _dependency->set_ready();
}
VLOG_QUERY << "cancelled stream: _fragment_instance_id="
<< print_id(_recvr->fragment_instance_id())
@@ -318,7 +318,7 @@ void VDataStreamRecvr::SenderQueue::close() {
std::lock_guard<std::mutex> l(_lock);
_is_cancelled = true;
if (_dependency) {
- _dependency->set_eos();
+ _dependency->set_ready();
}
for (auto closure_pair : _pending_closures) {
@@ -362,8 +362,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_sender_to_local_channel_dependency.resize(num_queues);
for (size_t i = 0; i < num_queues; i++) {
_sender_to_local_channel_dependency[i] =
- pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id,
- _dest_node_id);
+ pipeline::LocalExchangeChannelDependency::create_shared(
+ _dest_node_id, _dest_node_id, state->get_query_ctx());
}
}
_sender_queues.reserve(num_queues);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]