This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1eef30b493 [pipelineX](dependency) Wake up task by dependencies (#26879)
b1eef30b493 is described below
commit b1eef30b49318caa42110bede0535fc76dc669ac
Author: Gabriel <[email protected]>
AuthorDate: Sat Nov 18 03:20:24 2023 +0800
[pipelineX](dependency) Wake up task by dependencies (#26879)
---------
Co-authored-by: Mryange <[email protected]>
---
be/src/http/action/pipeline_task_action.cpp | 41 +++
be/src/http/action/pipeline_task_action.h | 36 +++
be/src/pipeline/exec/exchange_sink_buffer.cpp | 33 +-
be/src/pipeline/exec/exchange_sink_buffer.h | 8 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 15 +-
be/src/pipeline/exec/exchange_sink_operator.h | 47 ++-
be/src/pipeline/exec/exchange_source_operator.cpp | 5 +-
be/src/pipeline/exec/exchange_source_operator.h | 7 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 3 +-
.../exec/multi_cast_data_stream_source.cpp | 3 +-
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 31 ++
be/src/pipeline/exec/multi_cast_data_streamer.h | 20 +-
.../pipeline/exec/partition_sort_source_operator.h | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 12 +-
be/src/pipeline/exec/result_sink_operator.h | 10 +-
be/src/pipeline/exec/scan_operator.cpp | 19 +-
be/src/pipeline/exec/scan_operator.h | 40 +--
be/src/pipeline/exec/set_probe_sink_operator.cpp | 9 +-
be/src/pipeline/exec/set_sink_operator.cpp | 4 +-
be/src/pipeline/exec/set_source_operator.cpp | 2 +-
be/src/pipeline/exec/union_source_operator.cpp | 3 +-
be/src/pipeline/pipeline_fragment_context.cpp | 3 +-
be/src/pipeline/pipeline_fragment_context.h | 8 +-
be/src/pipeline/pipeline_task.h | 7 +
be/src/pipeline/pipeline_x/dependency.cpp | 230 +++++++++++++-
be/src/pipeline/pipeline_x/dependency.h | 340 +++++++++------------
.../local_exchange_sink_operator.cpp | 5 +-
.../local_exchange_source_operator.cpp | 2 +-
be/src/pipeline/pipeline_x/operator.cpp | 17 +-
be/src/pipeline/pipeline_x/operator.h | 8 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 25 +-
.../pipeline_x/pipeline_x_fragment_context.h | 2 +
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 75 +++--
be/src/pipeline/pipeline_x/pipeline_x_task.h | 101 +++---
be/src/pipeline/task_scheduler.cpp | 56 +++-
be/src/pipeline/task_scheduler.h | 1 -
be/src/runtime/fragment_mgr.cpp | 18 ++
be/src/runtime/fragment_mgr.h | 2 +
be/src/service/http_service.cpp | 6 +
be/src/vec/exec/scan/scanner_context.cpp | 54 ++--
be/src/vec/exec/scan/scanner_context.h | 4 +
be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 5 +-
be/src/vec/sink/writer/async_result_writer.h | 3 +-
45 files changed, 858 insertions(+), 468 deletions(-)
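
The heart of this change: a pipeline task that finds a dependency not ready now
registers itself on that dependency and leaves the scheduler, and the dependency
wakes it again once it becomes ready, replacing the previous busy polling of
read_blocked_by()/write_blocked_by(). A minimal sketch of the pattern (condensed
from the diff below; the scheduler hand-off is only indicated):

    #include <atomic>
    #include <mutex>
    #include <vector>

    class Dependency;
    struct PipelineXTask {
        void try_wake_up(Dependency* dep); // re-submits the task to the scheduler
    };

    class Dependency {
    public:
        // Called by the task. Returns nullptr when ready; otherwise records
        // the task so that set_ready_for_read() can wake it later.
        Dependency* read_blocked_by(PipelineXTask* task) {
            std::unique_lock<std::mutex> lc(_task_lock);
            if (_ready_for_read) {
                return nullptr;
            }
            if (task != nullptr) {
                _blocked_task.push_back(task);
            }
            return this;
        }

        void set_ready_for_read() {
            std::vector<PipelineXTask*> local;
            {
                std::unique_lock<std::mutex> lc(_task_lock);
                if (_ready_for_read) {
                    return;
                }
                _ready_for_read = true;
                local.swap(_blocked_task); // detach waiters under the lock
            }
            for (auto* task : local) {
                task->try_wake_up(this);   // wake them with no locks held
            }
        }

    private:
        std::mutex _task_lock;
        std::atomic<bool> _ready_for_read {false};
        std::vector<PipelineXTask*> _blocked_task;
    };
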
diff --git a/be/src/http/action/pipeline_task_action.cpp b/be/src/http/action/pipeline_task_action.cpp
new file mode 100644
index 00000000000..c3b49b57136
--- /dev/null
+++ b/be/src/http/action/pipeline_task_action.cpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/pipeline_task_action.h"
+
+#include <sstream>
+#include <string>
+
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+void PipelineTaskAction::handle(HttpRequest* req) {
+ req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
+ HttpChannel::send_reply(req, HttpStatus::OK,
+ ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks());
+}
+
+} // end namespace doris
diff --git a/be/src/http/action/pipeline_task_action.h b/be/src/http/action/pipeline_task_action.h
new file mode 100644
index 00000000000..488a1148a53
--- /dev/null
+++ b/be/src/http/action/pipeline_task_action.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "http/http_handler.h"
+
+namespace doris {
+
+class HttpRequest;
+
+// Dump pipeline task state from http API.
+class PipelineTaskAction : public HttpHandler {
+public:
+ PipelineTaskAction() = default;
+
+ ~PipelineTaskAction() override = default;
+
+ void handle(HttpRequest* req) override;
+};
+
+} // end namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 46364528458..68f34ffc820 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -90,6 +90,13 @@ bool ExchangeSinkBuffer<Parent>::can_write() const {
return total_package_size <= max_package_size;
}
+template <typename Parent>
+void ExchangeSinkBuffer<Parent>::_set_ready_to_finish(bool all_done) {
+ if (_finish_dependency && _should_stop && all_done) {
+ _finish_dependency->set_ready_to_finish();
+ }
+}
+
template <typename Parent>
bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
//note(wb) ugly implementation here, because operator couples the scheduling logic
@@ -160,9 +167,6 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
send_now = true;
_rpc_channel_is_idle[ins_id.lo] = false;
_busy_channels++;
- if (_finish_dependency) {
- _finish_dependency->block_finishing();
- }
}
_instance_to_package_queue[ins_id.lo].emplace(std::move(request));
_total_queue_size++;
@@ -196,9 +200,6 @@ Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
send_now = true;
_rpc_channel_is_idle[ins_id.lo] = false;
_busy_channels++;
- if (_finish_dependency) {
- _finish_dependency->block_finishing();
- }
}
_instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
}
@@ -222,10 +223,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (_is_finishing) {
_rpc_channel_is_idle[id] = true;
- _busy_channels--;
- if (_finish_dependency && _busy_channels == 0) {
- _finish_dependency->set_ready_to_finish();
- }
+ _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
return Status::OK();
}
@@ -361,10 +359,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
broadcast_q.pop();
} else {
_rpc_channel_is_idle[id] = true;
- _busy_channels--;
- if (_finish_dependency && _busy_channels == 0) {
- _finish_dependency->set_ready_to_finish();
- }
+ _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
return Status::OK();
@@ -396,11 +391,8 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
if (!_rpc_channel_is_idle[id]) {
- _busy_channels--;
_rpc_channel_is_idle[id] = true;
- if (_finish_dependency && _busy_channels == 0) {
- _finish_dependency->set_ready_to_finish();
- }
+ _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
}
}
@@ -417,11 +409,8 @@ void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
if (!_rpc_channel_is_idle[id]) {
- _busy_channels--;
_rpc_channel_is_idle[id] = true;
- if (_finish_dependency && _busy_channels == 0) {
- _finish_dependency->set_ready_to_finish();
- }
+ _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
}
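
All four decrement sites above now use a single atomic fetch_sub instead of a
decrement followed by a separate read: fetch_sub(1) returns the previous value,
so a return of 1 means this caller took _busy_channels to zero, and exactly one
caller fires the finish signal. A standalone illustration of the race the old
form allowed (hypothetical example):

    #include <atomic>

    std::atomic<int> busy_channels {2};

    // Racy (old form): with two threads finishing concurrently, both may
    // decrement before either reads the counter, and then both observe 0
    // and signal completion twice.
    //     busy_channels--;
    //     if (busy_channels == 0) { on_all_done(); }

    // Safe (new form): fetch_sub returns the value *before* the decrement,
    // so only the thread performing the 1 -> 0 transition signals.
    void finish_one() {
        if (busy_channels.fetch_sub(1) == 1) {
            // on_all_done();  // runs exactly once
        }
    }
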
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 9111f553b27..edcfa80bc28 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -195,7 +195,14 @@ public:
}
void set_query_statistics(QueryStatistics* statistics) { _statistics = statistics; }
+ void set_should_stop() {
+ _should_stop = true;
+ _set_ready_to_finish(_busy_channels == 0);
+ }
+
private:
+ void _set_ready_to_finish(bool all_done);
+
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
// store data in non-broadcast shuffle
@@ -244,6 +251,7 @@ private:
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
QueryStatistics* _statistics = nullptr;
+ std::atomic<bool> _should_stop {false};
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 25418492954..f5518a1553e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -173,13 +173,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
register_channels(_sink_buffer.get());
- _exchange_sink_dependency = AndDependency::create_shared(_parent->operator_id());
- _queue_dependency = ExchangeSinkQueueDependency::create_shared(_parent->operator_id());
+ _exchange_sink_dependency =
+ AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
+ _queue_dependency =
+ ExchangeSinkQueueDependency::create_shared(_parent->operator_id(), _parent->node_id());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
- _broadcast_dependency = BroadcastDependency::create_shared(_parent->operator_id());
+ _broadcast_dependency =
+ BroadcastDependency::create_shared(_parent->operator_id(), _parent->node_id());
_broadcast_dependency->set_available_block(config::num_broadcast_buffer);
_broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
@@ -194,7 +197,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
size_t dep_id = 0;
_local_channels_dependency.resize(local_size);
_wait_channel_timer.resize(local_size);
- auto deps_for_channels = AndDependency::create_shared(_parent->operator_id());
+ auto deps_for_channels =
+ AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
for (auto channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
@@ -225,6 +229,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("Crc32HashPartitioner({})",
_partition_count));
}
+ _finish_dependency->should_finish_after_check();
+
return Status::OK();
}
@@ -506,6 +512,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
final_st = st;
}
}
+ local_state._sink_buffer->set_should_stop();
return final_st;
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 6b9d3b5e4b1..9dc670cd668 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -67,7 +67,8 @@ private:
class ExchangeSinkQueueDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
- ExchangeSinkQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {}
+ ExchangeSinkQueueDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "ResultQueueDependency") {}
~ExchangeSinkQueueDependency() override = default;
void* shared_state() override { return nullptr; }
@@ -76,23 +77,23 @@ public:
class BroadcastDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(BroadcastDependency);
- BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), _available_block(0) {}
+ BroadcastDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "BroadcastDependency"), _available_block(0) {}
~BroadcastDependency() override = default;
- [[nodiscard]] WriteDependency* write_blocked_by() override {
- if (config::enable_fuzzy_mode && _available_block == 0 &&
- _should_log(_write_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
- }
- return _available_block > 0 ? nullptr : this;
- }
-
void set_available_block(int available_block) { _available_block = available_block; }
- void return_available_block() { _available_block++; }
+ void return_available_block() {
+ _available_block++;
+ WriteDependency::set_ready_for_write();
+ }
- void take_available_block() { _available_block--; }
+ void take_available_block() {
+ auto old_value = _available_block.fetch_sub(1);
+ if (old_value == 1) {
+ WriteDependency::block_writing();
+ }
+ }
void* shared_state() override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
@@ -134,25 +135,11 @@ private:
class LocalExchangeChannelDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
- LocalExchangeChannelDependency(int id, std::shared_ptr<bool> mem_available)
- : WriteDependency(id, "LocalExchangeChannelDependency"),
- _mem_available(mem_available) {}
+ LocalExchangeChannelDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
~LocalExchangeChannelDependency() override = default;
-
- WriteDependency* write_blocked_by() override {
- if (config::enable_fuzzy_mode && !_is_runnable() &&
- _should_log(_write_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
- }
- return _is_runnable() ? nullptr : this;
- }
-
void* shared_state() override { return nullptr; }
-
-private:
- bool _is_runnable() const { return _ready_for_write || *_mem_available; }
- std::shared_ptr<bool> _mem_available;
+ // TODO(gabriel): blocked by memory
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
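
BroadcastDependency above no longer overrides write_blocked_by(); the free-buffer
count now drives the inherited _ready_for_write flag directly, so the generic task
wake-up in WriteDependency::set_ready_for_write() applies to broadcast senders as
well. The token accounting, condensed (assumes the set_ready_for_write()/
block_writing() members from the diff above):

    #include <atomic>

    struct BroadcastTokens {
        // One token per reusable broadcast PBlock (config::num_broadcast_buffer).
        std::atomic<int> _available_block {0};

        void return_available_block() {
            _available_block++;
            // set_ready_for_write();  // wakes senders parked on this dependency
        }

        void take_available_block() {
            if (_available_block.fetch_sub(1) == 1) {
                // block_writing();    // last free block taken: senders must wait
            }
        }
    };
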
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3213ed55778..4a29694a37f 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -51,12 +51,13 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
- source_dependency = AndDependency::create_shared(_parent->operator_id());
+ source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
for (size_t i = 0; i < queues.size(); i++) {
- deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), queues[i]);
+ deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), _parent->node_id(),
+ queues[i]);
queues[i]->set_dependency(deps[i]);
source_dependency->add_child(deps[i]);
}
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index c41268f8eac..5d754747bee 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -53,8 +53,9 @@ public:
struct ExchangeDataDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
- ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
- : Dependency(id, "DataDependency"), _always_done(false) {}
+ ExchangeDataDependency(int id, int node_id,
+ vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
+ : Dependency(id, node_id, "DataDependency"), _always_done(false) {}
void* shared_state() override { return nullptr; }
void set_always_done() {
@@ -70,7 +71,7 @@ public:
if (_always_done) {
return;
}
- _ready_for_read = false;
+ Dependency::block_reading();
}
private:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index cb63f64ab42..3d9827a27b4 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -49,7 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_shared_hash_table_dependency =
- SharedHashTableDependency::create_shared(_parent->operator_id());
+ SharedHashTableDependency::create_shared(_parent->operator_id(), _parent->node_id());
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 10056a30e72..a1815ca5118 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -49,7 +49,8 @@ class HashJoinBuildSinkOperatorX;
class SharedHashTableDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
- SharedHashTableDependency(int id) : WriteDependency(id, "SharedHashTableDependency") {}
+ SharedHashTableDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "SharedHashTableDependency") {}
~SharedHashTableDependency() override = default;
void* shared_state() override { return nullptr; }
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 4c6e21c5c6d..26eee2161c9 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -138,7 +138,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
- static_cast<MultiCastDependency*>(_dependency)->set_consumer_id(p._consumer_id);
+ _shared_state->multi_cast_data_streamer.set_dep_by_sender_idx(
+ p._consumer_id, static_cast<MultiCastDependency*>(_dependency));
_output_expr_contexts.resize(p._output_expr_contexts.size());
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 3929c6ced09..8d7a745a042 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -17,6 +17,7 @@
#include "multi_cast_data_streamer.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_state.h"
namespace doris::pipeline {
@@ -46,6 +47,9 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
}
}
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
+ if (pos_to_pull == _multi_cast_blocks.end()) {
+ _block_reading(sender_idx);
+ }
}
void MultiCastDataStreamer::close_sender(int sender_idx) {
@@ -63,6 +67,7 @@ void MultiCastDataStreamer::close_sender(int sender_idx) {
}
}
_closed_sender_count++;
+ _block_reading(sender_idx);
}
Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
@@ -87,10 +92,36 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
+ _set_ready_for_read(i);
}
}
_eos = eos;
return Status::OK();
}
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+ if (_dependencies.empty()) {
+ return;
+ }
+ auto* dep = _dependencies[sender_idx];
+ DCHECK(dep);
+ dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {
+ for (auto* dep : _dependencies) {
+ DCHECK(dep);
+ dep->set_ready_for_read();
+ }
+}
+
+void MultiCastDataStreamer::_block_reading(int sender_idx) {
+ if (_dependencies.empty()) {
+ return;
+ }
+ auto* dep = _dependencies[sender_idx];
+ DCHECK(dep);
+ dep->block_reading();
+}
+
} // namespace doris::pipeline
\ No newline at end of file
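
With this change each consumer of the multi-cast stream owns one
MultiCastDependency, registered through set_dep_by_sender_idx() and blocked until
data arrives; push() readies exactly the senders whose read position had reached
the end, set_eos() readies all of them, and a pull() that drains a sender's
position blocks only that sender. The lifecycle for consumer i, condensed from
the code above:

    streamer.set_dep_by_sender_idx(i, dep);  // registration: start blocked
    // producer: push(state, block, eos) -> _set_ready_for_read(i) for every
    //           sender that was parked at _multi_cast_blocks.end()
    // producer: set_eos()               -> _set_ready_for_read() for all senders
    // consumer: pull(i, block, &eos)    -> _block_reading(i) once sender i
    //           has no more blocks to read
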
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 92c0e24079e..4d03fe53b80 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -21,6 +21,7 @@
namespace doris::pipeline {
+class MultiCastDependency;
struct MultiCastBlock {
MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
@@ -33,11 +34,16 @@ struct MultiCastBlock {
// code
class MultiCastDataStreamer {
public:
- MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count)
+ MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count,
+ bool with_dependencies = false)
: _row_desc(row_desc),
_profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))),
_cast_sender_count(cast_sender_count) {
_sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end());
+ if (with_dependencies) {
+ _dependencies.resize(cast_sender_count, nullptr);
+ }
+
_peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
_process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
};
@@ -65,9 +71,19 @@ public:
void set_eos() {
std::lock_guard l(_mutex);
_eos = true;
+ _set_ready_for_read();
+ }
+
+ void set_dep_by_sender_idx(int sender_idx, MultiCastDependency* dep) {
+ _dependencies[sender_idx] = dep;
+ _block_reading(sender_idx);
}
private:
+ void _set_ready_for_read(int sender_idx);
+ void _set_ready_for_read();
+ void _block_reading(int sender_idx);
+
const RowDescriptor& _row_desc;
RuntimeProfile* _profile;
std::list<MultiCastBlock> _multi_cast_blocks;
@@ -80,5 +96,7 @@ private:
RuntimeProfile::Counter* _process_rows;
RuntimeProfile::Counter* _peak_mem_usage;
+
+ std::vector<MultiCastDependency*> _dependencies;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h
index 40b802c35e6..9720e55efa7 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -66,7 +66,7 @@ private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer;
- int _sort_idx = 0;
+ std::atomic<int> _sort_idx = 0;
};
class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index b65a5ba1b86..e27b0da1d32 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -65,12 +65,16 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
state->execution_timeout()));
- _result_sink_dependency = OrDependency::create_shared(_parent->operator_id());
- _buffer_dependency = ResultBufferDependency::create_shared(_parent->operator_id());
- _cancel_dependency = CancelDependency::create_shared(_parent->operator_id());
+ _result_sink_dependency =
+ OrDependency::create_shared(_parent->operator_id(), _parent->node_id());
+ _buffer_dependency =
+ ResultBufferDependency::create_shared(_parent->operator_id(), _parent->node_id());
+ _cancel_dependency =
+ CancelDependency::create_shared(_parent->operator_id(), _parent->node_id());
_result_sink_dependency->add_child(_cancel_dependency);
_result_sink_dependency->add_child(_buffer_dependency);
- _queue_dependency = ResultQueueDependency::create_shared(_parent->operator_id());
+ _queue_dependency =
+ ResultQueueDependency::create_shared(_parent->operator_id(), _parent->node_id());
_result_sink_dependency->add_child(_queue_dependency);
((PipBufferControlBlock*)_sender.get())
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 311d2c70673..e117f55d25c 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -46,7 +46,8 @@ public:
class ResultBufferDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ResultBufferDependency);
- ResultBufferDependency(int id) : WriteDependency(id, "ResultBufferDependency") {}
+ ResultBufferDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "ResultBufferDependency") {}
~ResultBufferDependency() override = default;
void* shared_state() override { return nullptr; }
@@ -55,7 +56,8 @@ public:
class ResultQueueDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ResultQueueDependency);
- ResultQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {}
+ ResultQueueDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "ResultQueueDependency") {}
~ResultQueueDependency() override = default;
void* shared_state() override { return nullptr; }
@@ -64,7 +66,9 @@ public:
class CancelDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(CancelDependency);
- CancelDependency(int id) : WriteDependency(id, "CancelDependency") { _ready_for_write = false; }
+ CancelDependency(int id, int node_id) : WriteDependency(id, node_id, "CancelDependency") {
+ _ready_for_write = false;
+ }
~CancelDependency() override = default;
void* shared_state() override { return nullptr; }
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 601bf5b8b9f..d2953d3593b 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -122,11 +122,11 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
- _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
+ _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+ PipelineXLocalState<>::_parent->node_id());
- _open_dependency = OpenDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
- _source_dependency->add_child(_open_dependency);
- _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
+ _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+ PipelineXLocalState<>::_parent->node_id());
_source_dependency->add_child(_eos_dependency);
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(state, info.scan_ranges);
@@ -168,7 +168,7 @@ template <typename Derived>
Status ScanLocalState<Derived>::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- if (_open_dependency == nullptr) {
+ if (_opened) {
return Status::OK();
}
RETURN_IF_ERROR(_acquire_runtime_filter());
@@ -177,12 +177,12 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
auto status =
_eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
+ _finish_dependency->should_finish_after_check();
DCHECK(_eos_dependency->read_blocked_by() != nullptr && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
- _source_dependency->remove_first_child();
- _open_dependency = nullptr;
+ _opened = true;
return status;
}
@@ -1181,11 +1181,10 @@ Status ScanLocalState<Derived>::_start_scanners(
_scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
p.limit(), state()->scan_queue_mem_limit(),
p._col_distribute_ids, 1);
- _scanner_done_dependency =
- ScannerDoneDependency::create_shared(p.operator_id(), _scanner_ctx.get());
+ _scanner_done_dependency = ScannerDoneDependency::create_shared(p.operator_id(), p.node_id());
_source_dependency->add_child(_scanner_done_dependency);
_data_ready_dependency =
- DataReadyDependency::create_shared(p.operator_id(), _scanner_ctx.get());
+ DataReadyDependency::create_shared(p.operator_id(), p.node_id(), _scanner_ctx.get());
_source_dependency->add_child(_data_ready_dependency);
_scanner_ctx->set_dependency(_data_ready_dependency, _scanner_done_dependency,
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 68d006006f6..66543dc7ffd 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -56,58 +56,34 @@ public:
Status try_close(RuntimeState* state) override;
};
-struct OpenDependency final : public Dependency {
-public:
- ENABLE_FACTORY_CREATOR(OpenDependency);
- OpenDependency(int id) : Dependency(id, "OpenDependency") {}
- void* shared_state() override { return nullptr; }
- [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
- [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
-};
-
class EosDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(EosDependency);
- EosDependency(int id) : Dependency(id, "EosDependency") {}
+ EosDependency(int id, int node_id) : Dependency(id, node_id, "EosDependency") {}
void* shared_state() override { return nullptr; }
};
class ScannerDoneDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
- ScannerDoneDependency(int id, vectorized::ScannerContext* scanner_ctx)
- : Dependency(id, "ScannerDoneDependency"),
_scanner_ctx(scanner_ctx) {}
+ ScannerDoneDependency(int id, int node_id) : Dependency(id, node_id,
"ScannerDoneDependency") {}
void* shared_state() override { return nullptr; }
- [[nodiscard]] Dependency* read_blocked_by() override {
- return _scanner_ctx->done() ? nullptr : this;
- }
- void set_ready_for_read() override {
- // ScannerContext is set done outside this function now and only stop watcher here.
- _read_dependency_watcher.stop();
- }
-
-private:
- vectorized::ScannerContext* _scanner_ctx;
};
class DataReadyDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(DataReadyDependency);
- DataReadyDependency(int id, vectorized::ScannerContext* scanner_ctx)
- : Dependency(id, "DataReadyDependency"), _scanner_ctx(scanner_ctx)
{}
+ DataReadyDependency(int id, int node_id, vectorized::ScannerContext*
scanner_ctx)
+ : Dependency(id, node_id, "DataReadyDependency"),
_scanner_ctx(scanner_ctx) {}
void* shared_state() override { return nullptr; }
- [[nodiscard]] Dependency* read_blocked_by() override {
+ // TODO(gabriel):
+ [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
if (_scanner_ctx->get_num_running_scanners() == 0 &&
_scanner_ctx->should_be_scheduled()) {
_scanner_ctx->reschedule_scanner_ctx();
}
- if (config::enable_fuzzy_mode && !_ready_for_read &&
- _should_log(_read_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
- }
- return _ready_for_read ? nullptr : this;
+ return Dependency::read_blocked_by(task);
}
private:
@@ -151,7 +127,7 @@ protected:
virtual Status _init_profile() = 0;
- std::shared_ptr<OpenDependency> _open_dependency;
+ std::atomic<bool> _opened {false};
std::shared_ptr<EosDependency> _eos_dependency;
std::shared_ptr<OrDependency> _source_dependency;
std::shared_ptr<ScannerDoneDependency> _scanner_done_dependency;
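
OpenDependency is gone from scan_operator.h: open() previously signalled
completion by removing the first child of _source_dependency, and a plain atomic
flag now guards the one-shot open path instead. A condensed view of the new
guard (status handling and the surrounding calls elided):

    #include <atomic>

    std::atomic<bool> _opened {false};

    Status open(RuntimeState* state) {
        if (_opened) {
            return Status::OK();  // open() already ran; nothing to redo
        }
        // ... acquire runtime filters, prepare and submit scanners ...
        _opened = true;
        return Status::OK();
    }
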
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 36503df2db8..de9b48362c3 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -72,6 +72,7 @@ template class SetProbeSinkOperator<false>;
template <bool is_intersect>
Status SetProbeSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) {
+ DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::_name = "SET_PROBE_SINK_OPERATOR";
const std::vector<std::vector<TExpr>>* result_texpr_lists;
// Create result_expr_ctx_lists_ from thrift exprs.
@@ -109,11 +110,6 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
- if (_cur_child_id > 1) {
- CHECK(local_state._shared_state->probe_finished_children_index[_cur_child_id - 1])
- << fmt::format("child with id: {} should be probed first", _cur_child_id);
- }
-
auto probe_rows = in_block->rows();
if (probe_rows > 0) {
RETURN_IF_ERROR(_extract_probe_column(local_state, *in_block, local_state._probe_columns,
@@ -202,7 +198,6 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
SetProbeSinkLocalState<is_intersect>& local_state) {
auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl;
auto& hash_table_variants = local_state._shared_state->hash_table_variants;
- auto& probe_finished_children_index = local_state._shared_state->probe_finished_children_index;
if (_cur_child_id != (local_state._shared_state->child_quantity - 1)) {
_refresh_hash_table(local_state);
@@ -223,7 +218,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
} else {
local_state._dependency->set_ready_for_read();
}
- probe_finished_children_index[_cur_child_id] = true;
+ local_state._shared_state->set_probe_finished_children(_cur_child_id);
}
template <bool is_intersect>
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index 90cc792d471..52bd8aa3cf7 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -90,7 +90,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
},
*local_state._shared_state->hash_table_variants);
}
- local_state._shared_state->probe_finished_children_index[_cur_child_id] = true;
+ local_state._shared_state->set_probe_finished_children(_cur_child_id);
if (_child_quantity == 1) {
local_state._dependency->set_ready_for_read();
}
@@ -171,7 +171,6 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
}
_shared_state->child_quantity = parent._child_quantity;
- _shared_state->probe_finished_children_index.assign(parent._child_quantity, false);
auto& child_exprs_lists = _shared_state->child_exprs_lists;
DCHECK(child_exprs_lists.size() == 0 || child_exprs_lists.size() == parent._child_quantity);
@@ -192,6 +191,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) {
+ Base::_name = "SET_SINK_OPERATOR";
const std::vector<std::vector<TExpr>>* result_texpr_lists;
// Create result_expr_ctx_lists_ from thrift exprs.
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index d3840285c38..6e5e0140365 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -52,8 +52,8 @@ template class SetSourceOperator<false>;
template <bool is_intersect>
Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateInfo& info) {
std::shared_ptr<typename SetDependency::SharedState> ss = nullptr;
- ss.reset(new typename SetDependency::SharedState());
auto& deps = info.dependencys;
+ ss.reset(new typename SetDependency::SharedState(deps.size()));
for (auto& dep : deps) {
((SetDependency*)dep.get())->set_shared_state(ss);
}
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 8e5179734cb..d1515faa1b7 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -114,7 +114,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
DCHECK(deps.size() == 1);
DCHECK(deps.front() == nullptr);
//child_count == 0 , we need to create a UnionDependency
- deps.front() = std::make_shared<UnionDependency>(_parent->operator_id());
+ deps.front() =
+ std::make_shared<UnionDependency>(_parent->operator_id(), _parent->node_id());
((UnionDependency*)deps.front().get())->set_shared_state(ss);
}
RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9610122bc02..be4fd6c5aea 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -126,7 +126,8 @@ PipelineFragmentContext::PipelineFragmentContext(
_call_back(call_back),
_is_report_on_cancel(true),
_report_status_cb(report_status_cb),
- _group_commit(group_commit) {
+ _group_commit(group_commit),
+ _create_time(MonotonicNanos()) {
if (_query_ctx->get_task_group()) {
_task_group_entity = _query_ctx->get_task_group()->task_entity();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 85834e513f9..aa2f139c507 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -109,8 +109,6 @@ public:
void close_a_pipeline();
- std::string to_http_path(const std::string& file_name);
-
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
@@ -145,6 +143,10 @@ public:
}
void refresh_next_report_time();
+ virtual std::string debug_string() { return ""; }
+
+ uint64_t create_time() const { return _create_time; }
+
protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
Status _build_pipelines(ExecNode*, PipelinePtr);
@@ -222,6 +224,8 @@ private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
std::vector<std::unique_ptr<PipelineTask>> _tasks;
bool _group_commit;
+
+ uint64_t _create_time;
};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 690c4e3419d..947f3418802 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -251,6 +251,11 @@ public:
void set_parent_profile(RuntimeProfile* profile) { _parent_profile = profile; }
+ virtual bool is_pipelineX() const { return false; }
+
+ bool is_running() { return _running.load(); }
+ void set_running(bool running) { _running = running; }
+
protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
@@ -359,5 +364,7 @@ private:
OperatorPtr _source;
OperatorPtr _root;
OperatorPtr _sink;
+
+ std::atomic<bool> _running {false};
};
} // namespace doris::pipeline
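
The new _running flag gives dependencies and the scheduler a way to agree on who
owns a task: a wake-up that arrives while the task is still executing must not
enqueue it a second time. A hypothetical sketch of how a wake-up could consult
the flag (illustrative only; the committed logic lives in pipeline_x_task.cpp
and task_scheduler.cpp of this change):

    #include <atomic>

    struct Task {
        std::atomic<bool> _running {false};

        void try_wake_up(/* Dependency* wake_by */) {
            // exchange() makes the claim atomic, so two dependencies becoming
            // ready at the same time cannot both re-queue the task.
            if (_running.exchange(true)) {
                return;  // already running or already re-queued
            }
            // scheduler->submit(this);
        }
    };
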
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 32bd06f5983..a06fadef03b 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -21,10 +21,177 @@
#include <mutex>
#include "common/logging.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/memory/mem_tracker.h"
namespace doris::pipeline {
+void Dependency::add_block_task(PipelineXTask* task) {
+ // TODO(gabriel): support read dependency
+ if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) {
+ return;
+ }
+ _blocked_task.push_back(task);
+}
+
+void WriteDependency::add_write_block_task(PipelineXTask* task) {
+ DCHECK(_write_blocked_task.empty() ||
+ _write_blocked_task[_write_blocked_task.size() - 1] != task)
+ << "Duplicate task: " << task->debug_string();
+ _write_blocked_task.push_back(task);
+}
+
+void FinishDependency::add_block_task(PipelineXTask* task) {
+ DCHECK(_finish_blocked_task.empty() ||
+ _finish_blocked_task[_finish_blocked_task.size() - 1] != task)
+ << "Duplicate task: " << task->debug_string();
+ _finish_blocked_task.push_back(task);
+}
+
+void RuntimeFilterDependency::add_block_task(PipelineXTask* task) {
+ DCHECK(_filter_blocked_task.empty() ||
+ _filter_blocked_task[_filter_blocked_task.size() - 1] != task)
+ << "Duplicate task: " << task->debug_string();
+ DCHECK(_blocked_by_rf) << "It is not allowed: task: " << task->debug_string()
+ << " \n dependency: " << debug_string()
+ << " \n state: " << get_state_name(task->get_state());
+ _filter_blocked_task.push_back(task);
+}
+
+void Dependency::set_ready_for_read() {
+ if (_ready_for_read) {
+ return;
+ }
+ _read_dependency_watcher.stop();
+ std::vector<PipelineXTask*> local_block_task {};
+ {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ if (_ready_for_read) {
+ return;
+ }
+ _ready_for_read = true;
+ local_block_task.swap(_blocked_task);
+ }
+}
+
+void WriteDependency::set_ready_for_write() {
+ if (_ready_for_write) {
+ return;
+ }
+ _write_dependency_watcher.stop();
+
+ std::vector<PipelineXTask*> local_block_task {};
+ {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ if (_ready_for_write) {
+ return;
+ }
+ _ready_for_write = true;
+ local_block_task.swap(_write_blocked_task);
+ }
+ for (auto* task : local_block_task) {
+ task->try_wake_up(this);
+ }
+}
+
+void FinishDependency::set_ready_to_finish() {
+ if (_ready_to_finish) {
+ return;
+ }
+ _finish_dependency_watcher.stop();
+
+ std::vector<PipelineXTask*> local_block_task {};
+ {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ if (_ready_to_finish) {
+ return;
+ }
+ _ready_to_finish = true;
+ local_block_task.swap(_finish_blocked_task);
+ }
+ for (auto* task : local_block_task) {
+ task->try_wake_up(this);
+ }
+}
+
+Dependency* Dependency::read_blocked_by(PipelineXTask* task) {
+ if (config::enable_fuzzy_mode && !_ready_for_read &&
+ _should_log(_read_dependency_watcher.elapsed_time())) {
+ LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
+ << _node_id << " block tasks: " << _blocked_task.size()
+ << " write block tasks: "
+ << (is_write_dependency()
+ ? ((WriteDependency*)this)->_write_blocked_task.size()
+ : 0)
+ << " write done: "
+ << (is_write_dependency() ? ((WriteDependency*)this)->_ready_for_write.load()
+ : true)
+ << "task: " << (task ? task->fragment_context()->debug_string() : "");
+ }
+
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready_for_read = _ready_for_read.load();
+ if (!ready_for_read && task) {
+ add_block_task(task);
+ }
+ return ready_for_read ? nullptr : this;
+}
+
+RuntimeFilterDependency* RuntimeFilterDependency::filter_blocked_by(PipelineXTask* task) {
+ if (!_blocked_by_rf) {
+ return nullptr;
+ }
+ std::unique_lock<std::mutex> lc(_task_lock);
+ if (*_blocked_by_rf) {
+ if (LIKELY(task)) {
+ add_block_task(task);
+ }
+ return this;
+ }
+ return nullptr;
+}
+
+FinishDependency* FinishDependency::finish_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ if (!_ready_to_finish && task) {
+ add_block_task(task);
+ }
+ return _ready_to_finish ? nullptr : this;
+}
+
+WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ const auto ready_for_write = _ready_for_write.load();
+ if (!ready_for_write && task) {
+ add_write_block_task(task);
+ }
+ return ready_for_write ? nullptr : this;
+}
+
+Dependency* OrDependency::read_blocked_by(PipelineXTask* task) {
+ // TODO(gabriel):
+ for (auto& child : _children) {
+ auto* cur_res = child->read_blocked_by(nullptr);
+ if (cur_res == nullptr) {
+ return nullptr;
+ }
+ }
+ return this;
+}
+
+WriteDependency* OrDependency::write_blocked_by(PipelineXTask* task) {
+ for (auto& child : _children) {
+ CHECK(child->is_write_dependency());
+ auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by(nullptr);
+ if (cur_res == nullptr) {
+ return nullptr;
+ }
+ }
+ return this;
+}
+
template Status HashJoinDependency::extract_join_column<true>(
vectorized::Block&,
COW<vectorized::IColumn>::mutable_ptr<vectorized::ColumnVector<unsigned char>>&,
@@ -39,17 +206,43 @@ template Status HashJoinDependency::extract_join_column<false>(
std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}",
- std::string(indentation_level * 2, ' '), _name, _id,
- read_blocked_by() == nullptr);
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {},
_ready_for_read={}",
+ std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
+ _ready_for_read);
+ return fmt::to_string(debug_string_buffer);
+}
+
+std::string WriteDependency::debug_string(int indentation_level) {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "{}{}: id={}, read block task = {},write block "
+ "task = {}, _ready_for_write = {}, _ready_for_read = {}",
+ std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
+ _write_blocked_task.size(), _ready_for_write, _ready_for_read);
+ return fmt::to_string(debug_string_buffer);
+}
+
+std::string FinishDependency::debug_string(int indentation_level) {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {},
_ready_to_finish = {}",
+ std::string(indentation_level * 2, ' '), _name, _node_id,
+ _finish_blocked_task.size(), _ready_to_finish);
+ return fmt::to_string(debug_string_buffer);
+}
+
+std::string RuntimeFilterDependency::debug_string(int indentation_level) {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(
+ debug_string_buffer, "{}{}: id={}, block task = {}, _blocked_by_rf = {}, _filters = {}",
+ std::string(indentation_level * 2, ' '), _name, _node_id, _filter_blocked_task.size(),
+ _blocked_by_rf ? _blocked_by_rf->load() : false, _filters);
return fmt::to_string(debug_string_buffer);
}
std::string AndDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}, children=[",
- std::string(indentation_level * 2, ' '), _name, _id,
- read_blocked_by() == nullptr);
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
+ std::string(indentation_level * 2, ' '), _name, _node_id);
for (auto& child : _children) {
fmt::format_to(debug_string_buffer, "{}, \n",
child->debug_string(indentation_level = 1));
}
@@ -59,9 +252,8 @@ std::string AndDependency::debug_string(int indentation_level) {
std::string OrDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}, children=[",
- std::string(indentation_level * 2, ' '), _name, _id,
- read_blocked_by() == nullptr);
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
+ std::string(indentation_level * 2, ' '), _name, _node_id);
for (auto& child : _children) {
fmt::format_to(debug_string_buffer, "{}, \n",
child->debug_string(indentation_level = 1));
}
@@ -294,6 +486,12 @@ std::vector<uint16_t> HashJoinDependency::convert_block_to_null(vectorized::Bloc
return results;
}
+void SetSharedState::set_probe_finished_children(int child_id) {
+ if (child_id + 1 < probe_finished_children_dependency.size()) {
+ probe_finished_children_dependency[child_id + 1]->set_ready_for_write();
+ }
+}
+
template <bool BuildSide>
Status HashJoinDependency::extract_join_column(vectorized::Block& block,
vectorized::ColumnUInt8::MutablePtr& null_map,
@@ -433,9 +631,17 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
}
void RuntimeFilterDependency::sub_filters() {
- _filters--;
- if (_filters == 0) {
- *_blocked_by_rf = false;
+ auto value = _filters.fetch_sub(1);
+ if (value == 1) {
+ std::vector<PipelineXTask*> local_block_task {};
+ {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ *_blocked_by_rf = false;
+ local_block_task.swap(_filter_blocked_task);
+ }
+ for (auto* task : local_block_task) {
+ task->try_wake_up(this);
+ }
}
}
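
sub_filters() above shows the wake-up discipline used throughout this file: the
state flip and the detach of the waiter list happen under _task_lock, while
try_wake_up() runs only after the lock is dropped. That ordering matters twice
over: a task concurrently registering itself in filter_blocked_by() either sees
the new state or lands on the list before the swap, so no wake-up is lost; and
waking outside the lock lets try_wake_up() take scheduler locks without risking
a lock-order inversion against the registration path. The distilled shape:

    std::vector<PipelineXTask*> local;
    {
        std::unique_lock<std::mutex> lc(_task_lock);
        *_blocked_by_rf = false;           // 1. publish the state change
        local.swap(_filter_blocked_task);  // 2. detach all parked tasks
    }
    for (auto* task : local) {
        task->try_wake_up(this);           // 3. wake with no locks held
    }
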
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index e9635253b2b..11a8975b306 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -40,19 +40,22 @@
#include "vec/exec/vanalytic_eval_node.h"
#include "vec/exec/vpartition_sort_node.h"
-namespace doris {
-namespace pipeline {
+namespace doris::pipeline {
class Dependency;
+class PipelineXTask;
using DependencySPtr = std::shared_ptr<Dependency>;
-static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 10 * 1000L * 1000L * 1000L;
-static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 5 * 1000L * 1000L * 1000L;
+static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
+static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
+
class Dependency : public std::enable_shared_from_this<Dependency> {
public:
- Dependency(int id, std::string name) : _id(id), _name(name), _ready_for_read(false) {}
+ Dependency(int id, int node_id, std::string name)
+ : _id(id), _node_id(node_id), _name(std::move(name)), _ready_for_read(false) {}
virtual ~Dependency() = default;
+ virtual bool is_or_dep() { return false; }
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
virtual void* shared_state() = 0;
@@ -72,35 +75,19 @@ public:
}
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
- [[nodiscard]] virtual Dependency* read_blocked_by() {
- if (config::enable_fuzzy_mode && !_ready_for_read &&
- _should_log(_read_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
- }
- return _ready_for_read ? nullptr : this;
- }
+ [[nodiscard]] virtual Dependency* read_blocked_by(PipelineXTask* task = nullptr);
// Notify downstream pipeline tasks this dependency is ready.
- virtual void set_ready_for_read() {
- if (_ready_for_read) {
- return;
- }
- _read_dependency_watcher.stop();
- _ready_for_read = true;
- }
+ virtual void set_ready_for_read();
// Notify downstream pipeline tasks this dependency is blocked.
virtual void block_reading() { _ready_for_read = false; }
void set_parent(std::weak_ptr<Dependency> parent) { _parent = parent; }
- void add_child(std::shared_ptr<Dependency> child) {
- _children.push_back(child);
- child->set_parent(weak_from_this());
- }
+ virtual void add_child(std::shared_ptr<Dependency> child) { _children.push_back(child); }
- void remove_first_child() { _children.erase(_children.begin()); }
+ virtual void add_block_task(PipelineXTask* task);
protected:
bool _should_log(uint64_t cur_time) {
@@ -115,6 +102,7 @@ protected:
}
int _id;
+ const int _node_id;
std::string _name;
std::atomic<bool> _ready_for_read;
MonotonicStopWatch _read_dependency_watcher;
@@ -124,15 +112,16 @@ protected:
std::list<std::shared_ptr<Dependency>> _children;
uint64_t _last_log_time = 0;
+ std::mutex _task_lock;
+ std::vector<PipelineXTask*> _blocked_task;
};
class WriteDependency : public Dependency {
public:
- WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {}
+ WriteDependency(int id, int node_id, std::string name) : Dependency(id, node_id, name) {}
~WriteDependency() override = default;
bool is_write_dependency() override { return true; }
-
void start_write_watcher() {
for (auto& child : _children) {
CHECK(child->is_write_dependency());
@@ -145,36 +134,30 @@ public:
return _write_dependency_watcher.elapsed_time();
}
- [[nodiscard]] virtual WriteDependency* write_blocked_by() {
- if (config::enable_fuzzy_mode && !_ready_for_write &&
- _should_log(_write_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
- }
- return _ready_for_write ? nullptr : this;
- }
+ [[nodiscard]] virtual WriteDependency* write_blocked_by(PipelineXTask* task);
- virtual void set_ready_for_write() {
- if (_ready_for_write) {
- return;
- }
- _write_dependency_watcher.stop();
- _ready_for_write = true;
- }
+ virtual void set_ready_for_write();
virtual void block_writing() { _ready_for_write = false; }
+ std::string debug_string(int indentation_level = 0) override;
+ void add_write_block_task(PipelineXTask* task);
+
protected:
- std::atomic<bool> _ready_for_write;
+ friend class Dependency;
+ std::atomic<bool> _ready_for_write {true};
MonotonicStopWatch _write_dependency_watcher;
+
+private:
+ std::vector<PipelineXTask*> _write_blocked_task;
};
class FinishDependency final : public Dependency {
public:
- FinishDependency(int id, int node_id, std::string name)
- : Dependency(id, name), _ready_to_finish(true), _node_id(node_id) {}
+ FinishDependency(int id, int node_id, std::string name) : Dependency(id, node_id, name) {}
~FinishDependency() override = default;
+ void should_finish_after_check() { _ready_to_finish = false; }
void start_finish_watcher() {
for (auto& child : _children) {
((FinishDependency*)child.get())->start_finish_watcher();
@@ -186,31 +169,19 @@ public:
return _finish_dependency_watcher.elapsed_time();
}
- [[nodiscard]] FinishDependency* finish_blocked_by() {
- if (config::enable_fuzzy_mode && !_ready_to_finish &&
- _should_log(_finish_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << _node_id;
- }
- return _ready_to_finish ? nullptr : this;
- }
-
- void set_ready_to_finish() {
- if (_ready_to_finish) {
- return;
- }
- _finish_dependency_watcher.stop();
- _ready_to_finish = true;
- }
+ [[nodiscard]] FinishDependency* finish_blocked_by(PipelineXTask* task);
- void block_finishing() { _ready_to_finish = false; }
+ void set_ready_to_finish();
void* shared_state() override { return nullptr; }
+ std::string debug_string(int indentation_level = 0) override;
-protected:
- std::atomic<bool> _ready_to_finish;
+ void add_block_task(PipelineXTask* task) override;
+
+private:
+ std::atomic<bool> _ready_to_finish {true};
MonotonicStopWatch _finish_dependency_watcher;
- const int _node_id;
+ std::vector<PipelineXTask*> _finish_blocked_task;
};
class RuntimeFilterDependency;
@@ -246,37 +217,34 @@ private:
const int32_t _wait_time_ms;
IRuntimeFilter* _runtime_filter;
};
+
class RuntimeFilterDependency final : public Dependency {
public:
RuntimeFilterDependency(int id, int node_id, std::string name)
- : Dependency(id, name), _node_id(node_id) {}
-
- RuntimeFilterDependency* filter_blocked_by() {
- if (!_blocked_by_rf) {
- return nullptr;
- }
- if (*_blocked_by_rf) {
- return this;
- }
- return nullptr;
- }
+ : Dependency(id, node_id, name) {}
+ RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
void* shared_state() override { return nullptr; }
void add_filters(IRuntimeFilter* runtime_filter);
void sub_filters();
void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
_blocked_by_rf = blocked_by_rf;
}
+ std::string debug_string(int indentation_level = 0) override;
+
+ void add_block_task(PipelineXTask* task) override;
protected:
- const int _node_id;
std::atomic_int _filters;
std::shared_ptr<std::atomic_bool> _blocked_by_rf;
+
+private:
+ std::vector<PipelineXTask*> _filter_blocked_task;
};
class AndDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(AndDependency);
- AndDependency(int id) : WriteDependency(id, "AndDependency") {}
+ AndDependency(int id, int node_id) : WriteDependency(id, node_id, "AndDependency") {}
[[nodiscard]] std::string name() const override {
fmt::memory_buffer debug_string_buffer;
@@ -292,19 +260,19 @@ public:
std::string debug_string(int indentation_level = 0) override;
- [[nodiscard]] Dependency* read_blocked_by() override {
+ [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
for (auto& child : _children) {
- if (auto* dep = child->read_blocked_by()) {
+ if (auto* dep = child->read_blocked_by(task)) {
return dep;
}
}
return nullptr;
}
- [[nodiscard]] WriteDependency* write_blocked_by() override {
+ [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override {
for (auto& child : _children) {
CHECK(child->is_write_dependency());
- if (auto* dep = ((WriteDependency*)child.get())->write_blocked_by()) {
+ if (auto* dep = ((WriteDependency*)child.get())->write_blocked_by(task)) {
return dep;
}
}
@@ -315,7 +283,7 @@ public:
class OrDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(OrDependency);
- OrDependency(int id) : WriteDependency(id, "OrDependency") {}
+ OrDependency(int id, int node_id) : WriteDependency(id, node_id, "OrDependency") {}
[[nodiscard]] std::string name() const override {
fmt::memory_buffer debug_string_buffer;
@@ -331,59 +299,32 @@ public:
std::string debug_string(int indentation_level = 0) override;
- [[nodiscard]] Dependency* read_blocked_by() override {
- Dependency* res = nullptr;
- for (auto& child : _children) {
- auto* cur_res = child->read_blocked_by();
- if (cur_res == nullptr) {
- return nullptr;
- } else {
- res = cur_res;
- }
- }
- return res;
- }
+ bool is_or_dep() override { return true; }
- [[nodiscard]] WriteDependency* write_blocked_by() override {
- WriteDependency* res = nullptr;
- for (auto& child : _children) {
- CHECK(child->is_write_dependency());
- auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by();
- if (cur_res == nullptr) {
- return nullptr;
- } else {
- res = cur_res;
- }
- }
- return res;
+ [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override;
+
+ [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override;
+
+ void add_child(std::shared_ptr<Dependency> child) override {
+ WriteDependency::add_child(child);
+ child->set_parent(weak_from_this());
}
};
struct FakeSharedState {};
struct FakeDependency final : public WriteDependency {
public:
- FakeDependency(int id) : WriteDependency(id, "FakeDependency") {}
+ FakeDependency(int id, int node_id) : WriteDependency(id, node_id, "FakeDependency") {}
using SharedState = FakeSharedState;
void* shared_state() override { return nullptr; }
- [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
- [[nodiscard]] WriteDependency* write_blocked_by() override { return nullptr; }
+ [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { return nullptr; }
+ [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override {
+ return nullptr;
+ }
[[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
[[nodiscard]] int64_t write_watcher_elapse_time() override { return 0; }
};
-class AsyncWriterSinkDependency : public WriteDependency {
-public:
- AsyncWriterSinkDependency(int id) : WriteDependency(id, "AsyncWriterSinkDependency") {}
- using SharedState = FakeSharedState;
- void* shared_state() override { return nullptr; }
- [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
- [[nodiscard]] WriteDependency* write_blocked_by() override { return _call_func(); }
- void set_write_blocked_by(std::function<WriteDependency*()> call_func) {
- _call_func = call_func;
- }
- std::function<WriteDependency*()> _call_func;
-};
-
struct AggSharedState {
public:
AggSharedState() {
@@ -412,7 +353,7 @@ public:
class AggDependency final : public WriteDependency {
public:
using SharedState = AggSharedState;
- AggDependency(int id) : WriteDependency(id, "AggDependency") {
+ AggDependency(int id, int node_id) : WriteDependency(id, node_id, "AggDependency") {
_mem_tracker = std::make_unique<MemTracker>("AggregateOperator:");
}
~AggDependency() override = default;
@@ -421,20 +362,20 @@ public:
if (_is_streaming_agg_state()) {
if (_agg_state.data_queue->_cur_blocks_nums_in_queue[0] == 0 &&
!_agg_state.data_queue->_is_finished[0]) {
- _ready_for_read = false;
+ Dependency::block_reading();
}
} else {
- _ready_for_read = false;
+ Dependency::block_reading();
}
}
void block_writing() override {
if (_is_streaming_agg_state()) {
if (!_agg_state.data_queue->has_enough_space_to_push()) {
- _ready_for_write = false;
+ WriteDependency::block_writing();
}
} else {
- _ready_for_write = false;
+ WriteDependency::block_writing();
}
}
@@ -518,7 +459,7 @@ public:
class SortDependency final : public WriteDependency {
public:
using SharedState = SortSharedState;
- SortDependency(int id) : WriteDependency(id, "SortDependency") {}
+ SortDependency(int id, int node_id) : WriteDependency(id, node_id, "SortDependency") {}
~SortDependency() override = default;
void* shared_state() override { return (void*)&_sort_state; };
@@ -538,15 +479,15 @@ public:
class UnionDependency final : public WriteDependency {
public:
using SharedState = UnionSharedState;
- UnionDependency(int id) : WriteDependency(id, "UnionDependency") {}
+ UnionDependency(int id, int node_id) : WriteDependency(id, node_id, "UnionDependency") {}
~UnionDependency() override = default;
+
void* shared_state() override { return (void*)_union_state.get(); }
void set_shared_state(std::shared_ptr<UnionSharedState> union_state) {
_union_state = union_state;
}
- void set_ready_for_write() override {}
- void set_ready_for_read() override {}
- [[nodiscard]] Dependency* read_blocked_by() override {
+
+ [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
if (_union_state->child_count() == 0) {
return nullptr;
}
@@ -566,27 +507,20 @@ private:
struct MultiCastSharedState {
public:
MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count)
- : multi_cast_data_streamer(row_desc, pool, cast_sender_count) {}
+ : multi_cast_data_streamer(row_desc, pool, cast_sender_count, true) {}
pipeline::MultiCastDataStreamer multi_cast_data_streamer;
};
class MultiCastDependency final : public WriteDependency {
public:
using SharedState = MultiCastSharedState;
- MultiCastDependency(int id) : WriteDependency(id, "MultiCastDependency") {}
+ MultiCastDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "MultiCastDependency") {}
~MultiCastDependency() override = default;
void* shared_state() override { return (void*)_multi_cast_state.get(); };
void set_shared_state(std::shared_ptr<MultiCastSharedState> multi_cast_state) {
_multi_cast_state = multi_cast_state;
}
- WriteDependency* read_blocked_by() override {
- if (_multi_cast_state->multi_cast_data_streamer.can_read(_consumer_id)) {
- return nullptr;
- }
- return this;
- }
- int _consumer_id {};
- void set_consumer_id(int consumer_id) { _consumer_id = consumer_id; }
private:
std::shared_ptr<MultiCastSharedState> _multi_cast_state;
@@ -617,7 +551,7 @@ public:
class AnalyticDependency final : public WriteDependency {
public:
using SharedState = AnalyticSharedState;
- AnalyticDependency(int id) : WriteDependency(id, "AnalyticDependency") {}
+ AnalyticDependency(int id, int node_id) : WriteDependency(id, node_id, "AnalyticDependency") {}
~AnalyticDependency() override = default;
void* shared_state() override { return (void*)&_analytic_state; };
@@ -675,7 +609,7 @@ struct HashJoinSharedState : public JoinSharedState {
class HashJoinDependency final : public WriteDependency {
public:
using SharedState = HashJoinSharedState;
- HashJoinDependency(int id) : WriteDependency(id, "HashJoinDependency") {}
+ HashJoinDependency(int id, int node_id) : WriteDependency(id, node_id, "HashJoinDependency") {}
~HashJoinDependency() override = default;
void* shared_state() override { return (void*)&_join_state; }
@@ -707,7 +641,8 @@ struct NestedLoopJoinSharedState : public JoinSharedState {
class NestedLoopJoinDependency final : public WriteDependency {
public:
using SharedState = NestedLoopJoinSharedState;
- NestedLoopJoinDependency(int id) : WriteDependency(id, "NestedLoopJoinDependency") {}
+ NestedLoopJoinDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "NestedLoopJoinDependency") {}
~NestedLoopJoinDependency() override = default;
void* shared_state() override { return (void*)&_join_state; }
@@ -727,22 +662,28 @@ public:
class PartitionSortDependency final : public WriteDependency {
public:
using SharedState = PartitionSortNodeSharedState;
- PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency"), _eos(false) {}
+ PartitionSortDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "PartitionSortDependency"), _eos(false) {}
~PartitionSortDependency() override = default;
void* shared_state() override { return (void*)&_partition_sort_state; };
- void set_ready_for_write() override {}
- void block_writing() override {}
+ void set_ready_for_write() override {
+ throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
+ }
+ void block_writing() override {
+ throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
+ }
- [[nodiscard]] Dependency* read_blocked_by() override {
- if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
- _should_log(_read_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
+ void block_reading() override {
+ if (_eos) {
+ return;
}
- return _ready_for_read || _eos ? nullptr : this;
+ Dependency::block_reading();
}
- void set_eos() { _eos = true; }
+ void set_eos() {
+ _eos = true;
+ WriteDependency::set_ready_for_read();
+ }
private:
PartitionSortNodeSharedState _partition_sort_state;
@@ -752,12 +693,17 @@ private:
class AsyncWriterDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
- AsyncWriterDependency(int id) : WriteDependency(id, "AsyncWriterDependency") {}
+ AsyncWriterDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "AsyncWriterDependency") {}
~AsyncWriterDependency() override = default;
void* shared_state() override { return nullptr; }
};
+class SetDependency;
+
struct SetSharedState {
+public:
+ SetSharedState(int num_deps) { probe_finished_children_dependency.resize(num_deps, nullptr); }
/// default init
//record memory during running
int64_t mem_used = 0;
@@ -782,14 +728,15 @@ struct SetSharedState {
/// init in build side
int child_quantity;
vectorized::VExprContextSPtrs build_child_exprs;
- std::vector<bool> probe_finished_children_index; // use in probe side
+ std::vector<SetDependency*> probe_finished_children_dependency;
/// init in probe side
std::vector<vectorized::VExprContextSPtrs> probe_child_exprs_lists;
std::atomic<bool> ready_for_read = false;
-public:
+ void set_probe_finished_children(int child_id);
+
/// called in setup_local_state
void hash_table_init() {
if (child_exprs_lists[0].size() == 1 && (!build_not_ignore_null[0])) {
@@ -845,28 +792,24 @@ public:
class SetDependency final : public WriteDependency {
public:
using SharedState = SetSharedState;
- SetDependency(int id) : WriteDependency(id, "SetDependency") {}
+ SetDependency(int id, int node_id) : WriteDependency(id, node_id, "SetDependency") {}
~SetDependency() override = default;
void* shared_state() override { return (void*)_set_state.get(); }
void set_shared_state(std::shared_ptr<SetSharedState> set_state) { _set_state = set_state; }
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
- [[nodiscard]] Dependency* read_blocked_by() override {
+ [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
_should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
+ << id() << " " << _node_id << " block tasks: " << _blocked_task.size();
}
- return _set_state->ready_for_read ? nullptr : this;
- }
-
- [[nodiscard]] WriteDependency* write_blocked_by() override {
- if (is_set_probe) {
- DCHECK((_cur_child_id - 1) < _set_state->probe_finished_children_index.size());
- return _set_state->probe_finished_children_index[_cur_child_id - 1] ? nullptr : this;
+ std::unique_lock<std::mutex> lc(_task_lock);
+ if (!_set_state->ready_for_read && task) {
+ add_block_task(task);
}
- return nullptr;
+ return _set_state->ready_for_read ? nullptr : this;
}
// Notify downstream pipeline tasks this dependency is ready.
@@ -877,15 +820,16 @@ public:
_read_dependency_watcher.stop();
_set_state->ready_for_read = true;
}
+
void set_cur_child_id(int id) {
- _cur_child_id = id;
- is_set_probe = true;
+ _set_state->probe_finished_children_dependency[id] = this;
+ if (id != 0) {
+ block_writing();
+ }
}
private:
std::shared_ptr<SetSharedState> _set_state;
- int _cur_child_id;
- bool is_set_probe {false};
};
using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
@@ -894,14 +838,37 @@ struct LocalExchangeSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+ std::vector<Dependency*> source_dependencies;
std::atomic<int> running_sink_operators = 0;
+ void add_running_sink_operators() { running_sink_operators++; }
+ void sub_running_sink_operators() {
+ auto val = running_sink_operators.fetch_sub(1);
+ if (val == 1) {
+ _set_ready_for_read();
+ }
+ }
+ void _set_ready_for_read() {
+ for (auto* dep : source_dependencies) {
+ DCHECK(dep);
+ dep->set_ready_for_read();
+ }
+ }
+ void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+ source_dependencies[channel_id] = dep;
+ dep->block_reading();
+ }
+ void set_ready_for_read(int channel_id) {
+ auto* dep = source_dependencies[channel_id];
+ DCHECK(dep);
+ dep->set_ready_for_read();
+ }
};
struct LocalExchangeDependency final : public WriteDependency {
public:
using SharedState = LocalExchangeSharedState;
- LocalExchangeDependency(int id)
- : WriteDependency(id, "LocalExchangeDependency"),
+ LocalExchangeDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "LocalExchangeDependency"),
_local_exchange_shared_state(nullptr) {}
~LocalExchangeDependency() override = default;
void* shared_state() override { return _local_exchange_shared_state.get(); }
@@ -910,27 +877,8 @@ public:
_local_exchange_shared_state = state;
}
- void set_channel_id(int channel_id) { _channel_id = channel_id; }
-
- Dependency* read_blocked_by() override {
- if (config::enable_fuzzy_mode && !_should_run() &&
- _should_log(_read_dependency_watcher.elapsed_time())) {
- LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
- << id();
- }
- return _should_run() ? nullptr : this;
- }
-
private:
- bool _should_run() const {
- DCHECK(_local_exchange_shared_state != nullptr);
- return _local_exchange_shared_state->data_queue[_channel_id].size_approx() > 0 ||
- _local_exchange_shared_state->running_sink_operators == 0;
- }
-
std::shared_ptr<LocalExchangeSharedState> _local_exchange_shared_state;
- int _channel_id;
};
-} // namespace pipeline
-} // namespace doris
+} // namespace doris::pipeline
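
The heart of this change is visible in the dependency.h rewrite above: instead of a scheduler polling read_blocked_by() in a loop, a blocked task now registers itself with the dependency, and the dependency wakes every registered task once it becomes ready. A minimal, self-contained sketch of that pattern (std-only; Dependency, Task, and try_wake_up here are illustrative stand-ins, not the real Doris classes):

    #include <atomic>
    #include <iostream>
    #include <mutex>
    #include <vector>

    struct Task;

    struct Dependency {
        std::atomic<bool> ready {false};
        std::mutex task_lock;
        std::vector<Task*> blocked_tasks;

        Dependency* read_blocked_by(Task* task); // nullptr when ready
        void set_ready_for_read();               // wakes registered tasks
    };

    struct Task {
        const char* name;
        void try_wake_up(Dependency*) { std::cout << name << " re-enqueued\n"; }
    };

    Dependency* Dependency::read_blocked_by(Task* task) {
        if (ready.load()) return nullptr;
        std::lock_guard<std::mutex> l(task_lock);
        if (ready.load()) return nullptr; // re-check under the lock
        if (task != nullptr) blocked_tasks.push_back(task);
        return this;
    }

    void Dependency::set_ready_for_read() {
        std::lock_guard<std::mutex> l(task_lock);
        if (ready.exchange(true)) return; // only the first call wakes anyone
        for (auto* t : blocked_tasks) t->try_wake_up(this);
        blocked_tasks.clear();
    }

    int main() {
        Dependency dep;
        Task t {"task-0"};
        if (dep.read_blocked_by(&t) != nullptr) std::cout << "task-0 blocked\n";
        dep.set_ready_for_read(); // prints "task-0 re-enqueued"
    }

Registering and waking both happen under task_lock, which closes the race where a task registers just as the dependency turns ready and its wake-up would otherwise be lost.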
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index eba75c0fc1f..a793a22761f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,7 +27,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
- _shared_state->running_sink_operators++;
+ _shared_state->add_running_sink_operators();
return Status::OK();
}
@@ -60,6 +60,7 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
if (size > 0) {
data_queue[i].enqueue({new_block, {row_idx, start, size}});
}
+ _shared_state->set_ready_for_read(i);
}
return Status::OK();
@@ -83,7 +84,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}
if (source_state == SourceState::FINISHED) {
- local_state._shared_state->running_sink_operators--;
+ local_state._shared_state->sub_running_sink_operators();
}
return Status::OK();
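
A note on the sub_running_sink_operators() call above: fetch_sub returns the previous value, so exactly one sink (the one that sees the counter at 1) performs the wake-up. A hedged, std-only illustration (names are stand-ins for the shared-state members):

    #include <atomic>
    #include <iostream>

    std::atomic<int> running_sink_operators {3};

    void set_ready_for_read() { std::cout << "source dependencies woken\n"; }

    void sub_running_sink_operators() {
        // fetch_sub returns the value before the decrement, so only the
        // call that takes the counter from 1 to 0 performs the wake-up.
        if (running_sink_operators.fetch_sub(1) == 1) {
            set_ready_for_read();
        }
    }

    int main() {
        sub_running_sink_operators();
        sub_running_sink_operators();
        sub_running_sink_operators(); // only this call prints
    }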
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 127e14dba6b..a1bff19cb2b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -27,7 +27,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
_shared_state = (LocalExchangeSharedState*)_dependency->shared_state();
DCHECK(_shared_state != nullptr);
_channel_id = info.task_idx;
- _dependency->set_channel_id(_channel_id);
+ _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
_copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 9e6df06da01..cc21a389651 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -288,7 +288,7 @@ void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d
if constexpr (!std::is_same_v<typename LocalStateType::Dependency, FakeDependency>) {
auto& dests = dests_id();
for (auto& dest_id : dests) {
- dependency.push_back(std::make_shared<DependencyType>(dest_id));
+ dependency.push_back(std::make_shared<DependencyType>(dest_id, _node_id));
}
} else {
dependency.push_back(nullptr);
@@ -341,7 +341,7 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
}
} else {
auto& deps = info.dependencys;
- deps.front() = std::make_shared<FakeDependency>(0);
+ deps.front() = std::make_shared<FakeDependency>(0, 0);
_dependency = (DependencyType*)deps.front().get();
}
@@ -403,7 +403,7 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
}
} else {
auto& deps = info.dependencys;
- deps.front() = std::make_shared<FakeDependency>(0);
+ deps.front() = std::make_shared<FakeDependency>(0, 0);
_dependency = (DependencyType*)deps.front().get();
}
_rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1);
@@ -481,15 +481,14 @@ Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkState
RETURN_IF_ERROR(
_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
- static_cast<AsyncWriterSinkDependency*>(_dependency)->set_write_blocked_by([this]() {
- return this->write_blocked_by();
- });
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
- _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->operator_id());
+ _async_writer_dependency =
+ AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id());
_writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time");
+ _finish_dependency->should_finish_after_check();
return Status::OK();
}
@@ -507,8 +506,8 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Bl
}
template <typename Writer, typename Parent>
-WriteDependency* AsyncWriterSink<Writer, Parent>::write_blocked_by() {
- return _writer->write_blocked_by();
+WriteDependency* AsyncWriterSink<Writer, Parent>::write_blocked_by(PipelineXTask* task) {
+ return _writer->write_blocked_by(task);
}
template <typename Writer, typename Parent>
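
The operator.cpp hunks above also remove the std::function indirection that AsyncWriterSinkDependency used to carry: the sink now asks the writer directly, and the writer asks its own dependency. A compressed sketch of the resulting call chain (illustrative types only, no Doris headers):

    #include <iostream>

    struct WriteDependency {
        bool ready = false;
        WriteDependency* write_blocked_by() { return ready ? nullptr : this; }
    };

    struct AsyncResultWriter {
        WriteDependency dep; // owned by the writer, not wired via callbacks
        WriteDependency* write_blocked_by() { return dep.write_blocked_by(); }
    };

    struct AsyncWriterSink {
        AsyncResultWriter writer;
        WriteDependency* write_blocked_by() { return writer.write_blocked_by(); }
    };

    int main() {
        AsyncWriterSink sink;
        std::cout << (sink.write_blocked_by() != nullptr) << "\n"; // 1: blocked
        sink.writer.dep.ready = true;
        std::cout << (sink.write_blocked_by() == nullptr) << "\n"; // 1: writable
    }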
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 265579bfaab..4e8f030d505 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -625,9 +625,9 @@ public:
};
template <typename Writer, typename Parent>
-class AsyncWriterSink : public PipelineXSinkLocalState<AsyncWriterSinkDependency> {
+class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {
public:
- using Base = PipelineXSinkLocalState<AsyncWriterSinkDependency>;
+ using Base = PipelineXSinkLocalState<FakeDependency>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {}
@@ -637,8 +637,8 @@ public:
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state);
- WriteDependency* write_blocked_by();
-
+ WriteDependency* write_blocked_by(PipelineXTask* task);
+ WriteDependency* dependency() override { return _async_writer_dependency.get(); }
Status close(RuntimeState* state, Status exec_status) override;
Status try_close(RuntimeState* state, Status exec_status) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7113989ee1e..28e1be496f4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -422,8 +422,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto task = std::make_unique<PipelineXTask>(
_pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
_runtime_states[i]->runtime_profile(),
- _op_id_to_le_state.count(
- _pipelines[pip_idx]->operator_xs().front()->operator_id()) > 0
+ _op_id_to_le_state.contains(
+ _pipelines[pip_idx]->operator_xs().front()->operator_id())
? _op_id_to_le_state
[_pipelines[pip_idx]->operator_xs().front()->operator_id()]
: nullptr,
@@ -592,13 +592,14 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new LocalExchangeSinkOperatorX(
- local_exchange_id, _runtime_state->query_parallel_instance_num(), texprs));
+ auto num_instances = _runtime_state->query_parallel_instance_num();
+ sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id, num_instances, texprs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init());
auto shared_state = LocalExchangeSharedState::create_shared();
- shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num());
+ shared_state->data_queue.resize(num_instances);
+ shared_state->source_dependencies.resize(num_instances, nullptr);
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
@@ -1029,4 +1030,18 @@ bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableS
return false;
}
+std::string PipelineXFragmentContext::debug_string() {
+ fmt::memory_buffer debug_string_buffer;
+ for (size_t j = 0; j < _tasks.size(); j++) {
+ fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
+ for (size_t i = 0; i < _tasks[j].size(); i++) {
+ if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) {
+ continue;
+ }
+ fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
_tasks[j][i]->debug_string());
+ }
+ }
+
+ return fmt::to_string(debug_string_buffer);
+}
} // namespace doris::pipeline
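
PipelineXFragmentContext::debug_string() above follows the usual fmt accumulation idiom: append everything into one fmt::memory_buffer and stringify once at the end, skipping finished tasks. A tiny runnable approximation (fmt library; the task states are made-up sample data, not the real task objects):

    #include <fmt/format.h>
    #include <iterator>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::vector<std::string>> tasks = {{"RUNNABLE", "FINISHED"},
                                                       {"BLOCKED_FOR_SINK"}};
        fmt::memory_buffer buf;
        for (size_t j = 0; j < tasks.size(); j++) {
            fmt::format_to(std::back_inserter(buf), "Tasks in instance {}:\n", j);
            for (size_t i = 0; i < tasks[j].size(); i++) {
                if (tasks[j][i] == "FINISHED") continue; // finished tasks are skipped
                fmt::format_to(std::back_inserter(buf), "Task {}: {}\n", i, tasks[j][i]);
            }
        }
        fmt::print("{}", fmt::to_string(buf));
    }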
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 6fa91aedf12..9e7ff42219f 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -115,6 +115,8 @@ public:
[[nodiscard]] int max_operator_id() const { return _operator_id; }
+ std::string debug_string() override;
+
private:
void _close_action() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 140fecbb1d5..7295f38a124 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -91,14 +91,14 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
}
_block = doris::vectorized::Block::create_unique();
- RETURN_IF_ERROR(extract_dependencies());
+ RETURN_IF_ERROR(_extract_dependencies());
// We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
set_state(PipelineTaskState::RUNNABLE);
_prepared = true;
return Status::OK();
}
-Status PipelineXTask::extract_dependencies() {
+Status PipelineXTask::_extract_dependencies() {
for (auto op : _operators) {
auto result = _state->get_local_state_result(op->operator_id());
if (!result) {
@@ -179,7 +179,22 @@ Status PipelineXTask::_open() {
SCOPED_TIMER(_open_timer);
_dry_run = _sink->should_dry_run(_state);
for (auto& o : _operators) {
- RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state));
+ auto* local_state = _state->get_local_state(o->operator_id());
+ for (size_t i = 0; i < 2; i++) {
+ auto st = local_state->open(_state);
+ if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
+ _blocked_dep = _filter_dependency->filter_blocked_by(this);
+ if (_blocked_dep) {
+ set_state(PipelineTaskState::BLOCKED_FOR_RF);
+ set_use_blocking_queue(false);
+ RETURN_IF_ERROR(st);
+ } else if (i == 1) {
+ CHECK(false) << debug_string();
+ }
+ } else {
+ break;
+ }
+ }
}
RETURN_IF_ERROR(_state->get_sink_local_state(_sink->operator_id())->open(_state));
_opened = true;
@@ -204,10 +219,6 @@ Status PipelineXTask::execute(bool* eos) {
SCOPED_RAW_TIMER(&time_spent);
auto st = _open();
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
- set_state(PipelineTaskState::BLOCKED_FOR_RF);
- return Status::OK();
- } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
- set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
RETURN_IF_ERROR(st);
@@ -326,17 +337,10 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(_state->fragment_instance_id()));
- fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
- PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
- {
- std::stringstream profile_ss;
- _fresh_profile_counter();
- _task_profile->pretty_print(&profile_ss, "");
- fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
- }
- fmt::format_to(debug_string_buffer,
- "PipelineTask[this = {}, state = {}]\noperators: ",
(void*)this,
- get_state_name(_cur_state));
+ fmt::format_to(
+ debug_string_buffer,
+ "PipelineTask[this = {}, state = {}, data state = {}, dry run =
{}]\noperators: ",
+ (void*)this, get_state_name(_cur_state), (int)_data_state,
_dry_run);
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(
debug_string_buffer, "\n{}",
@@ -345,7 +349,42 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "\n{}",
_opened ? _sink->debug_string(_state, _operators.size())
: _sink->debug_string(_operators.size()));
+ fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");
+ for (size_t i = 0; i < _read_dependencies.size(); i++) {
+ fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
+ _read_dependencies[i]->debug_string());
+ }
+
+ fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
+ fmt::format_to(debug_string_buffer, "{}\n",
_write_dependencies->debug_string());
+
+ fmt::format_to(debug_string_buffer, "Runtime Filter Dependency
Information: \n");
+ fmt::format_to(debug_string_buffer, "{}\n",
_filter_dependency->debug_string());
+
+ fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
+ for (size_t i = 0; i < _finish_dependencies.size(); i++) {
+ fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '),
+ _finish_dependencies[i]->debug_string());
+ }
return fmt::to_string(debug_string_buffer);
}
+void PipelineXTask::try_wake_up(Dependency* wake_up_dep) {
+ // call by dependency
+ VecDateTimeValue now = VecDateTimeValue::local_time();
+ // TODO(gabriel): task will never be wake up if canceled / timeout
+ if (query_context()->is_cancelled()) {
+ _make_run();
+ return;
+ }
+ if (query_context()->is_timeout(now)) {
+ query_context()->cancel(true, "", Status::Cancelled(""));
+ }
+ _make_run();
+}
+
+void PipelineXTask::_make_run() {
+ static_cast<void>(get_task_queue()->push_back(this));
+}
+
} // namespace doris::pipeline
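
try_wake_up() above encodes a deliberate ordering: a cancelled task is still re-enqueued (so a worker can observe the cancelled state and close it), and a timed-out query is cancelled first and then re-enqueued. A std-only sketch of that control flow (QueryContext and Task here are simplified stand-ins):

    #include <iostream>

    struct QueryContext {
        bool cancelled = false;
        bool timed_out = false;
        bool is_cancelled() const { return cancelled; }
        bool is_timeout() const { return timed_out; }
        void cancel(const char* reason) {
            cancelled = true;
            std::cout << "cancelled: " << reason << "\n";
        }
    };

    struct Task {
        QueryContext* ctx;
        void make_run() { std::cout << "pushed back to task queue\n"; }
        void try_wake_up() {
            if (ctx->is_cancelled()) { // re-enqueue so the scheduler can close it
                make_run();
                return;
            }
            if (ctx->is_timeout()) ctx->cancel("query timeout");
            make_run();
        }
    };

    int main() {
        QueryContext ctx;
        Task task {&ctx};
        task.try_wake_up(); // normal wake-up: re-enqueued
        ctx.timed_out = true;
        task.try_wake_up(); // cancelled for timeout, then re-enqueued
    }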
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 90fdda921f0..bc50f1e89de 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -74,51 +74,21 @@ public:
if (_dry_run) {
return true;
}
- for (auto* op_dep : _read_dependencies) {
- auto* dep = op_dep->read_blocked_by();
- if (dep != nullptr) {
- dep->start_read_watcher();
- push_blocked_task_to_dependency(dep);
- return false;
- }
- }
- return true;
+ return _read_blocked_dependency() == nullptr;
}
bool runtime_filters_are_ready_or_timeout() override {
- auto* dep = _filter_dependency->filter_blocked_by();
- if (dep != nullptr) {
- push_blocked_task_to_dependency(dep);
- return false;
- }
- return true;
+ throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not
reach here!");
+ return false;
}
- bool sink_can_write() override {
- auto* dep = _write_dependencies->write_blocked_by();
- if (dep != nullptr) {
- dep->start_write_watcher();
- push_blocked_task_to_dependency(dep);
- return false;
- }
- return true;
- }
+ bool sink_can_write() override { return _write_blocked_dependency() == nullptr; }
Status finalize() override;
std::string debug_string() override;
- bool is_pending_finish() override {
- for (auto* fin_dep : _finish_dependencies) {
- auto* dep = fin_dep->finish_blocked_by();
- if (dep != nullptr) {
- dep->start_finish_watcher();
- push_blocked_task_to_dependency(dep);
- return true;
- }
- }
- return false;
- }
+ bool is_pending_finish() override { return _finish_blocked_dependency() != nullptr; }
std::vector<DependencySPtr>& get_downstream_dependency() { return _downstream_dependency; }
@@ -147,9 +117,9 @@ public:
return _upstream_dependency[id];
}
- Status extract_dependencies();
+ bool is_pipelineX() const override { return true; }
- void push_blocked_task_to_dependency(Dependency* dep) {}
+ void try_wake_up(Dependency* wake_up_dep);
DataSinkOperatorXPtr sink() const { return _sink; }
@@ -157,7 +127,60 @@ public:
OperatorXs operatorXs() { return _operators; }
+ bool push_blocked_task_to_queue() {
+ /**
+ * Push task into blocking queue if:
+ * 1. `_use_blocking_queue` is true.
+ * 2. Or this task is blocked by FE two phase execution (BLOCKED_FOR_DEPENDENCY).
+ */
+ return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
+ }
+ void set_use_blocking_queue(bool use_blocking_queue) {
+ if (_blocked_dep->is_or_dep()) {
+ _use_blocking_queue = true;
+ return;
+ }
+ _use_blocking_queue = use_blocking_queue;
+ }
+
private:
+ Dependency* _write_blocked_dependency() {
+ _blocked_dep = _write_dependencies->write_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ set_use_blocking_queue(false);
+ static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
+ return _blocked_dep;
+ }
+ return nullptr;
+ }
+
+ Dependency* _finish_blocked_dependency() {
+ for (auto* fin_dep : _finish_dependencies) {
+ _blocked_dep = fin_dep->finish_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ set_use_blocking_queue(false);
+ static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher();
+ return _blocked_dep;
+ }
+ }
+ return nullptr;
+ }
+
+ Dependency* _read_blocked_dependency() {
+ for (auto* op_dep : _read_dependencies) {
+ _blocked_dep = op_dep->read_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ // TODO(gabriel):
+ set_use_blocking_queue(true);
+ _blocked_dep->start_read_watcher();
+ return _blocked_dep;
+ }
+ }
+ return nullptr;
+ }
+
+ Status _extract_dependencies();
+ void _make_run();
void set_close_pipeline_time() override {}
void _init_profile() override;
void _fresh_profile_counter() override;
@@ -180,6 +203,10 @@ private:
std::shared_ptr<LocalExchangeSharedState> _local_exchange_state;
int _task_idx;
bool _dry_run = false;
+
+ Dependency* _blocked_dep {nullptr};
+
+ std::atomic<bool> _use_blocking_queue {true};
};
} // namespace doris::pipeline
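
The push_blocked_task_to_queue() / set_use_blocking_queue() pair above decides which blocked tasks still fall back to the polling BlockedTaskScheduler: tasks blocked on an Or-dependency (where any child may unblock it) and tasks in BLOCKED_FOR_DEPENDENCY keep polling, while everything else waits for an explicit try_wake_up(). An illustrative std-only reduction of that rule (names are stand-ins):

    #include <iostream>

    enum class TaskState { RUNNABLE, BLOCKED_FOR_DEPENDENCY, BLOCKED_FOR_SINK };

    struct BlockedTask {
        bool use_blocking_queue = true;
        bool blocked_dep_is_or_dep = false;
        TaskState state = TaskState::RUNNABLE;

        void set_use_blocking_queue(bool use) {
            // An OrDependency cannot tell which child will become ready,
            // so tasks blocked on one always keep using the polling queue.
            use_blocking_queue = blocked_dep_is_or_dep ? true : use;
        }
        bool push_blocked_task_to_queue() const {
            return use_blocking_queue || state == TaskState::BLOCKED_FOR_DEPENDENCY;
        }
    };

    int main() {
        BlockedTask t;
        t.state = TaskState::BLOCKED_FOR_SINK;
        t.set_use_blocking_queue(false); // sink blocks are event-driven now
        std::cout << t.push_blocked_task_to_queue() << "\n"; // 0: wait for wake-up
        t.blocked_dep_is_or_dep = true;
        t.set_use_blocking_queue(false);
        std::cout << t.push_blocked_task_to_queue() << "\n"; // 1: keep polling
    }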
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index aa3891a5a2c..9ce2711c27a 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -34,6 +34,8 @@
#include "common/logging.h"
#include "common/signal_handler.h"
#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/query_context.h"
@@ -75,6 +77,11 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) {
return Status::InternalError("BlockedTaskScheduler shutdown");
}
std::unique_lock<std::mutex> lock(_task_mutex);
+ if (task->is_pipelineX() && !static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
+ // put this task into current dependency's blocking queue and wait for event notification
+ // instead of using a separate BlockedTaskScheduler.
+ return Status::OK();
+ }
_blocked_tasks.push_back(task);
_task_cond.notify_one();
return Status::OK();
@@ -222,24 +229,30 @@ void TaskScheduler::_do_work(size_t index) {
if (!task) {
continue;
}
+ if (task->is_pipelineX() && task->is_running()) {
+ static_cast<void>(_task_queue->push_back(task, index));
+ continue;
+ }
+ task->set_running(true);
task->set_task_queue(_task_queue.get());
auto* fragment_ctx = task->fragment_context();
signal::query_id_hi = fragment_ctx->get_query_id().hi;
signal::query_id_lo = fragment_ctx->get_query_id().lo;
bool canceled = fragment_ctx->is_canceled();
- auto check_state = task->get_state();
- if (check_state == PipelineTaskState::PENDING_FINISH) {
- DCHECK(!task->is_pending_finish()) << "must not pending close " << task->debug_string();
+ auto state = task->get_state();
+ if (state == PipelineTaskState::PENDING_FINISH) {
+ DCHECK(task->is_pipelineX() || !task->is_pending_finish())
+ << "must not pending close " << task->debug_string();
Status exec_status = fragment_ctx->get_query_context()->exec_status();
_try_close_task(task,
canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
exec_status);
continue;
}
- DCHECK(check_state != PipelineTaskState::FINISHED &&
- check_state != PipelineTaskState::CANCELED)
- << "task already finish";
+
+ DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED)
+ << "task already finish: " << task->debug_string();
if (canceled) {
// may change from pending FINISH,should called cancel
@@ -253,7 +266,13 @@ void TaskScheduler::_do_work(size_t index) {
continue;
}
- DCHECK(check_state == PipelineTaskState::RUNNABLE);
+ if (task->is_pipelineX()) {
+ task->set_state(PipelineTaskState::RUNNABLE);
+ }
+
+ DCHECK(task->is_pipelineX() || task->get_state() == PipelineTaskState::RUNNABLE)
+ << "state:" << get_state_name(task->get_state())
+ << " task: " << task->debug_string();
// task exec
bool eos = false;
auto status = Status::OK();
@@ -313,6 +332,7 @@ void TaskScheduler::_do_work(size_t index) {
}
auto pipeline_state = task->get_state();
+ task->set_running(false);
switch (pipeline_state) {
case PipelineTaskState::BLOCKED_FOR_SOURCE:
case PipelineTaskState::BLOCKED_FOR_SINK:
@@ -324,7 +344,8 @@ void TaskScheduler::_do_work(size_t index) {
static_cast<void>(_task_queue->push_back(task, index));
break;
default:
- DCHECK(false) << "error state after run task, " <<
get_state_name(pipeline_state);
+ DCHECK(false) << "error state after run task, " <<
get_state_name(pipeline_state)
+ << " task: " << task->debug_string();
break;
}
}
@@ -344,21 +365,24 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
cancel();
// Call `close` if `try_close` failed to make sure allocated resources are released
static_cast<void>(task->close(exec_status));
- } else if (!task->is_pending_finish()) {
- status = task->close(exec_status);
- if (!status.ok() && state != PipelineTaskState::CANCELED) {
- cancel();
- }
- }
-
- if (task->is_pending_finish()) {
+ } else if (!task->is_pipelineX() && task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+ task->set_running(false);
return;
+ } else if (task->is_pending_finish()) {
+ task->set_state(PipelineTaskState::PENDING_FINISH);
+ task->set_running(false);
+ return;
+ }
+ status = task->close(exec_status);
+ if (!status.ok() && state != PipelineTaskState::CANCELED) {
+ cancel();
}
task->set_state(state);
task->set_close_pipeline_time();
task->release_dependency();
+ task->set_running(false);
task->fragment_context()->close_a_pipeline();
}
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 9b85ec420ee..070f2b6a5a7 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -65,7 +65,6 @@ private:
static constexpr auto EMPTY_TIMES_TO_YIELD = 64;
-private:
void _schedule();
void _make_task_run(std::list<PipelineTask*>& local_tasks,
std::list<PipelineTask*>::iterator& task_itr,
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e4b703e792f..a7a89bfd421 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -797,6 +797,24 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
return Status::OK();
}
+std::string FragmentMgr::dump_pipeline_tasks() {
+ fmt::memory_buffer debug_string_buffer;
+ auto t = MonotonicNanos();
+ size_t i = 0;
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are
still running!\n",
+ _pipeline_map.size());
+ for (auto& it : _pipeline_map) {
+ fmt::format_to(debug_string_buffer, "No.{} (elapse time = {},
InstanceId = {}) : {}\n",
+ i, t - it.second->create_time(), print_id(it.first),
+ it.second->debug_string());
+ i++;
+ }
+ }
+ return fmt::to_string(debug_string_buffer);
+}
+
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
const FinishCallback& cb) {
VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment
params is "
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 93f010a5ecd..08f95bd3357 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -143,6 +143,8 @@ public:
return _query_ctx_map.size();
}
+ std::string dump_pipeline_tasks();
+
private:
void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock, bool is_pipeline,
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index c4f24cfdf8f..cb8f275574e 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -41,6 +41,7 @@
#include "http/action/meta_action.h"
#include "http/action/metrics_action.h"
#include "http/action/pad_rowset_action.h"
+#include "http/action/pipeline_task_action.h"
#include "http/action/pprof_actions.h"
#include "http/action/reload_tablet_action.h"
#include "http/action/reset_rpc_channel_action.h"
@@ -165,6 +166,11 @@ Status HttpService::start() {
HealthAction* health_action = _pool.add(new HealthAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/health",
health_action);
+ // Register pipeline task action
+ PipelineTaskAction* pipeline_task_action = _pool.add(new PipelineTaskAction());
+ _ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks",
+ pipeline_task_action);
+
// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
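
For reference, the endpoint registered above can be exercised against a live BE with something like `curl http://<be_host>:<webserver_port>/api/running_pipeline_tasks` (host and webserver port are deployment-specific), which is presumably served by the FragmentMgr::dump_pipeline_tasks() dump added earlier in this commit.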
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index d3688507fcd..ef6a9d415de 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -65,6 +65,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
_is_finished = true;
+ _set_scanner_done();
}
if (limit < 0) {
limit = -1;
@@ -135,10 +136,6 @@ Status ScannerContext::init() {
// 4. This ctx will be submitted to the scanner scheduler right after init.
// So set _num_scheduling_ctx to 1 here.
_num_scheduling_ctx = 1;
- if (_finish_dependency) {
- std::lock_guard l(_transfer_lock);
- _finish_dependency->block_finishing();
- }
_num_unfinished_scanners = _scanners.size();
@@ -230,9 +227,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
- if (_finish_dependency) {
- _finish_dependency->block_finishing();
- }
} else {
set_status_on_error(state, false);
}
@@ -304,12 +298,11 @@ Status ScannerContext::validate_block_schema(Block* block) {
}
void ScannerContext::set_should_stop() {
- if (_scanner_done_dependency) {
- _scanner_done_dependency->set_ready_for_read();
- }
std::lock_guard l(_transfer_lock);
_should_stop = true;
+ _set_scanner_done();
_blocks_queue_added_cv.notify_one();
+ set_ready_to_finish();
}
void ScannerContext::inc_num_running_scanners(int32_t inc) {
@@ -320,19 +313,20 @@ void ScannerContext::inc_num_running_scanners(int32_t inc) {
void ScannerContext::dec_num_scheduling_ctx() {
std::lock_guard l(_transfer_lock);
_num_scheduling_ctx--;
- if (_finish_dependency) {
- if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
- _finish_dependency->set_ready_to_finish();
- } else {
- _finish_dependency->block_finishing();
- }
- }
-
+ set_ready_to_finish();
if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
_ctx_finish_cv.notify_one();
}
}
+void ScannerContext::set_ready_to_finish() {
+ // `_should_stop == true` means this task has already ended and wait for pending finish now.
+ if (_finish_dependency && _should_stop && _num_running_scanners == 0 &&
+ _num_scheduling_ctx == 0) {
+ _finish_dependency->set_ready_to_finish();
+ }
+}
+
bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
std::unique_lock l(_transfer_lock, std::defer_lock);
if (need_lock) {
@@ -342,10 +336,8 @@ bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
_process_status = status;
_status_error = true;
_blocks_queue_added_cv.notify_one();
- if (_scanner_done_dependency) {
- _scanner_done_dependency->set_ready_for_read();
- }
_should_stop = true;
+ _set_scanner_done();
return true;
}
return false;
@@ -437,6 +429,12 @@ bool ScannerContext::no_schedule() {
return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
}
+void ScannerContext::_set_scanner_done() {
+ if (_scanner_done_dependency) {
+ _scanner_done_dependency->set_ready_for_read();
+ }
+}
+
std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, sacnners: {}, blocks in queue: {},"
@@ -455,9 +453,6 @@ void ScannerContext::reschedule_scanner_ctx() {
//todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
if (state.ok()) {
_num_scheduling_ctx++;
- if (_finish_dependency) {
- _finish_dependency->block_finishing();
- }
} else {
set_status_on_error(state, false);
}
@@ -474,17 +469,12 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
// We have to decrease _num_running_scanners before schedule, otherwise
// schedule does not woring due to _num_running_scanners.
_num_running_scanners--;
- if (_finish_dependency && _num_running_scanners == 0 && _num_scheduling_ctx == 0) {
- _finish_dependency->set_ready_to_finish();
- }
+ set_ready_to_finish();
if (should_be_scheduled()) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
- if (_finish_dependency) {
- _finish_dependency->block_finishing();
- }
} else {
set_status_on_error(state, false);
}
@@ -499,9 +489,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
(--_num_unfinished_scanners) == 0) {
_dispose_coloate_blocks_not_in_queue();
_is_finished = true;
- if (_scanner_done_dependency) {
- _scanner_done_dependency->set_ready_for_read();
- }
+ _set_scanner_done();
_blocks_queue_added_cv.notify_one();
}
_ctx_finish_cv.notify_one();
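
The scanner_context.cpp changes above replace the scattered block_finishing()/set_ready_to_finish() call sites with one guarded gate. A hedged, std-only reduction of the condition (ScannerCtx is a stand-in, not the real class):

    #include <iostream>

    struct ScannerCtx {
        bool should_stop = false;
        int num_running_scanners = 0;
        int num_scheduling_ctx = 0;

        // The finish dependency may only become ready once the context should
        // stop and nothing is still running or scheduled.
        void set_ready_to_finish() {
            if (should_stop && num_running_scanners == 0 && num_scheduling_ctx == 0) {
                std::cout << "finish dependency ready\n";
            }
        }
    };

    int main() {
        ScannerCtx ctx;
        ctx.should_stop = true;
        ctx.num_running_scanners = 1;
        ctx.set_ready_to_finish(); // no output: a scanner is still running
        ctx.num_running_scanners = 0;
        ctx.set_ready_to_finish(); // prints "finish dependency ready"
    }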
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 244aedf87a3..10b4775ceff 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -120,6 +120,8 @@ public:
void inc_num_running_scanners(int32_t scanner_inc);
+ void set_ready_to_finish();
+
int get_num_running_scanners() const { return _num_running_scanners; }
void dec_num_scheduling_ctx();
@@ -185,6 +187,8 @@ private:
protected:
virtual void _dispose_coloate_blocks_not_in_queue() {}
+ void _set_scanner_done();
+
RuntimeState* _state;
VScanNode* _parent;
pipeline::ScanLocalStateBase* _local_state;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index be291828f0f..891161f0710 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -363,7 +363,7 @@ VDataStreamRecvr::VDataStreamRecvr(
for (size_t i = 0; i < num_queues; i++) {
_sender_to_local_channel_dependency[i] =
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id,
- _mem_available);
+ _dest_node_id);
}
}
_sender_queues.reserve(num_queues);
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 9f2f597494c..4ae7c3d3644 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -41,7 +41,6 @@ void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
pipeline::FinishDependency* finish_dep) {
_dependency = dep;
_finish_dependency = finish_dep;
- _finish_dependency->block_finishing();
}
Status AsyncResultWriter::sink(Block* block, bool eos) {
@@ -181,10 +180,10 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
return b;
}
-pipeline::WriteDependency* AsyncResultWriter::write_blocked_by() {
+pipeline::WriteDependency* AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) {
std::lock_guard l(_m);
DCHECK(_dependency != nullptr);
- return _dependency->write_blocked_by();
+ return _dependency->write_blocked_by(task);
}
} // namespace vectorized
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 780f8b506ef..75cc6529ba7 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -36,6 +36,7 @@ namespace pipeline {
class AsyncWriterDependency;
class WriteDependency;
class FinishDependency;
+class PipelineXTask;
} // namespace pipeline
@@ -79,7 +80,7 @@ public:
return _data_queue_is_available() || _is_finished();
}
- pipeline::WriteDependency* write_blocked_by();
+ pipeline::WriteDependency* write_blocked_by(pipeline::PipelineXTask* task);
[[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }