This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe184e322a9 [code](pipelineX) refine some pipelineX code (#28570)
fe184e322a9 is described below
commit fe184e322a90e680ebbbb951f0ba5a8299407c1a
Author: Mryange <[email protected]>
AuthorDate: Wed Dec 20 11:45:06 2023 +0800
[code](pipelineX) refine some pipelineX code (#28570)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 14 +++--
be/src/pipeline/exec/exchange_sink_operator.h | 16 ++++--
be/src/pipeline/exec/exchange_source_operator.cpp | 14 +++--
be/src/pipeline/exec/exchange_source_operator.h | 7 +--
.../exec/multi_cast_data_stream_source.cpp | 10 ++--
.../pipeline/exec/multi_cast_data_stream_source.h | 3 ++
be/src/pipeline/exec/scan_operator.cpp | 14 ++++-
be/src/pipeline/exec/scan_operator.h | 9 ++++
be/src/pipeline/pipeline_x/operator.cpp | 60 ++++++++++++----------
be/src/pipeline/pipeline_x/operator.h | 23 ++++++---
.../pipeline_x/pipeline_x_fragment_context.cpp | 7 +--
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 31 ++++++-----
be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 +-
be/src/runtime/fragment_mgr.cpp | 1 +
14 files changed, 130 insertions(+), 82 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e70be36b774..0dd1dda415b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -100,7 +100,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc()
const {
}
Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+ RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_sender_id = info.sender_id;
@@ -174,9 +174,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
id, p._dest_node_id, _sender_id, _state->be_number(),
state->get_query_ctx());
register_channels(_sink_buffer.get());
-
- _exchange_sink_dependency = AndDependency::create_shared(
- _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
+ auto* _exchange_sink_dependency = _dependency;
_queue_dependency = ExchangeSinkQueueDependency::create_shared(
_parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
@@ -237,7 +235,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
}
Status ExchangeSinkLocalState::open(RuntimeState* state) {
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+ RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ExchangeSinkOperatorX>();
if (p._part_type == TPartitionType::HASH_PARTITIONED ||
p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
@@ -522,8 +520,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState*
state, Status exec_status)
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}",
- PipelineXSinkLocalState<>::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {},
_busy_channels = {})",
_sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load());
return fmt::to_string(debug_string_buffer);
@@ -536,6 +533,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
COUNTER_UPDATE(_wait_queue_timer,
_queue_dependency->watcher_elapse_time());
+ COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
if (_broadcast_dependency) {
COUNTER_UPDATE(_wait_broadcast_buffer_timer,
_broadcast_dependency->watcher_elapse_time());
}
@@ -545,7 +543,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
}
_sink_buffer->update_profile(profile());
_sink_buffer->close();
- return PipelineXSinkLocalState<>::close(state, exec_status);
+ return Base::close(state, exec_status);
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index bc91e5dc19d..766f43dc80e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -144,20 +144,25 @@ public:
// TODO(gabriel): blocked by memory
};
-class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
+class ExchangeSinkLocalState final : public
PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
+ using Base = PipelineXSinkLocalState<AndDependency>;
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : PipelineXSinkLocalState<>(parent, state),
+ : Base(parent, state),
current_channel_idx(0),
only_local_exchange(false),
- _serializer(this) {}
+ _serializer(this) {
+ _finish_dependency = std::make_shared<FinishDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
+ state->get_query_ctx());
+ }
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
- Dependency* dependency() override { return
_exchange_sink_dependency.get(); }
+ Dependency* finishdependency() override { return _finish_dependency.get();
}
Status serialize_block(vectorized::Block* src, PBlock* dest, int
num_receivers = 1);
void
register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder**
holder);
@@ -231,11 +236,12 @@ private:
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
- std::shared_ptr<AndDependency> _exchange_sink_dependency;
std::shared_ptr<BroadcastDependency> _broadcast_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>>
_local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;
+
+ std::shared_ptr<Dependency> _finish_dependency;
};
class ExchangeSinkOperatorX final : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 41e57fbde79..99c9e8e9fc2 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -41,12 +41,11 @@ bool ExchangeSourceOperator::is_pending_finish() const {
}
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase*
parent)
- : PipelineXLocalState<>(state, parent), num_rows_skipped(0),
is_ready(false) {}
+ : Base(state, parent), num_rows_skipped(0), is_ready(false) {}
std::string ExchangeLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}",
- PipelineXLocalState<>::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Queues: (");
const auto& queues = stream_recvr->sender_queues();
for (size_t i = 0; i < queues.size(); i++) {
@@ -68,15 +67,14 @@ std::string ExchangeSourceOperatorX::debug_string(int
indentation_level) const {
}
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
- RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+ RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(),
p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
- source_dependency = AndDependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- state->get_query_ctx());
+ auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
@@ -101,7 +99,7 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
Status ExchangeLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
+ RETURN_IF_ERROR(Base::open(state));
return Status::OK();
}
@@ -215,7 +213,7 @@ Status ExchangeLocalState::close(RuntimeState* state) {
if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
vsort_exec_exprs.close(state);
}
- return PipelineXLocalState<>::close(state);
+ return Base::close(state);
}
Status ExchangeSourceOperatorX::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 49211321886..a9848be23c7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -71,21 +71,22 @@ private:
};
class ExchangeSourceOperatorX;
-class ExchangeLocalState final : public PipelineXLocalState<> {
+class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);
+
+public:
+ using Base = PipelineXLocalState<AndDependency>;
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
- Dependency* dependency() override { return source_dependency.get(); }
std::string debug_string(int indentation_level) const override;
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
bool is_ready;
- std::shared_ptr<AndDependency> source_dependency;
std::vector<std::shared_ptr<ExchangeDataDependency>> deps;
std::vector<RuntimeProfile::Counter*> metrics;
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 4c73f6ecb28..a4f3ff55a5c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -128,9 +128,13 @@ RuntimeProfile*
MultiCastDataStreamerSourceOperator::get_runtime_profile() const
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState*
state,
OperatorXBase* parent)
: Base(state, parent),
- vectorized::RuntimeFilterConsumer(
- static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
- static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
+
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
+ parent->runtime_filter_descs(),
+
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
+ _filter_dependency = std::make_shared<RuntimeFilterDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
+ state->get_query_ctx());
+};
Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 86034a76ce7..baeca2ca7b1 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -120,8 +120,11 @@ public:
friend class MultiCastDataStreamerSourceOperatorX;
+ RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); }
+
private:
vectorized::VExprContextSPtrs _output_expr_contexts;
+ std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};
class MultiCastDataStreamerSourceOperatorX final
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index d8d3958a779..398765c8cc6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const {
template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase*
parent)
- : ScanLocalStateBase(state, parent) {}
+ : ScanLocalStateBase(state, parent) {
+ _finish_dependency = std::make_shared<FinishDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
+ state->get_query_ctx());
+ _filter_dependency = std::make_shared<RuntimeFilterDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
+ state->get_query_ctx());
+}
template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
@@ -1311,6 +1318,9 @@ Status ScanLocalState<Derived>::_init_profile() {
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile,
"MaxScannerThreadNum", TUnit::UNIT);
+ _wait_for_finish_dependency_timer =
+ ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+
return Status::OK();
}
@@ -1442,7 +1452,7 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this),
state);
}
COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
-
+ COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
return PipelineXLocalState<>::close(state);
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 603f3804aae..78cb399427e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -171,6 +171,8 @@ protected:
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
+
+ RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
};
template <typename LocalStateType>
@@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase {
Dependency* dependency() override { return _scan_dependency.get(); }
+ RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); };
+ Dependency* finishdependency() override { return _finish_dependency.get();
}
+
protected:
template <typename LocalStateType>
friend class ScanOperatorX;
@@ -405,6 +410,10 @@ protected:
std::atomic<bool> _eos = false;
std::mutex _block_lock;
+
+ std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
+
+ std::shared_ptr<Dependency> _finish_dependency;
};
template <typename LocalStateType>
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index a466b8b3b67..55fa103e193 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -308,25 +308,14 @@ Status
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
parent,
RuntimeState* state)
- : _parent(parent),
- _state(state),
- _finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx())) {}
+ : _parent(parent), _state(state) {}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state,
OperatorXBase* parent)
: _num_rows_returned(0),
_rows_returned_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_parent(parent),
- _state(state),
- _finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx())) {
- _filter_dependency = std::make_shared<RuntimeFilterDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
- state->get_query_ctx());
-}
+ _state(state) {}
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state,
LocalStateInfo& info) {
@@ -334,22 +323,30 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
- _wait_for_finish_dependency_timer =
- ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+ constexpr auto is_fake_shared =
+ std::is_same_v<typename DependencyType::SharedState,
FakeSharedState>;
_dependency = (DependencyType*)info.dependency.get();
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
+ _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+ _runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
auto& deps = info.upstream_dependencies;
if constexpr (std::is_same_v<LocalExchangeSourceDependency,
DependencyType>) {
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
- } else {
+ _shared_state =
+ (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ _shared_state->ref();
+
+ _shared_state->source_dep = _dependency;
+ _shared_state->sink_dep = deps.front().get();
+ } else if constexpr (!is_fake_shared) {
_dependency->set_shared_state(deps.front()->shared_state());
+ _shared_state =
+ (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ _shared_state->ref();
+
+ _shared_state->source_dep = _dependency;
+ _shared_state->sink_dep = deps.front().get();
}
- _shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state().get();
- _shared_state->ref();
- _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
- _runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
- _shared_state->source_dep = _dependency;
- _shared_state->sink_dep = deps.front().get();
}
_conjuncts.resize(_parent->_conjuncts.size());
@@ -386,7 +383,6 @@ Status
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
- COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
@@ -405,6 +401,8 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_profile->set_metadata(_parent->node_id());
_profile->set_is_sink(true);
_wait_for_finish_dependency_timer = ADD_TIMER(_profile,
"PendingFinishDependency");
+ constexpr auto is_fake_shared =
+ std::is_same_v<typename DependencyType::SharedState,
FakeSharedState>;
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
auto& deps = info.dependencys;
_dependency = (DependencyType*)deps.front().get();
@@ -412,12 +410,18 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_dependency =
info.le_state_map[_parent->dests_id().front()].second.get();
}
if (_dependency) {
- _shared_state =
- (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ if constexpr (!is_fake_shared) {
+ _shared_state =
+ (typename
DependencyType::SharedState*)_dependency->shared_state().get();
+ }
+
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
}
- _shared_state->ref();
+ if constexpr (!is_fake_shared) {
+ _shared_state->ref();
+ }
+
} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0,
state->get_query_ctx());
@@ -446,7 +450,6 @@ Status
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
}
- COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
@@ -536,6 +539,7 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState*
state, Status exec_s
return Status::OK();
}
COUNTER_SET(_wait_for_dependency_timer,
_async_writer_dependency->watcher_elapse_time());
+ COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
// if the init failed, the _writer may be nullptr. so here need check
if (_writer) {
if (_writer->need_normal_close()) {
@@ -630,6 +634,7 @@ template class
PipelineXSinkLocalState<MultiCastSinkDependency>;
template class PipelineXSinkLocalState<SetSinkDependency>;
template class PipelineXSinkLocalState<SetProbeSinkDependency>;
template class PipelineXSinkLocalState<LocalExchangeSinkDependency>;
+template class PipelineXSinkLocalState<AndDependency>;
template class PipelineXLocalState<HashJoinProbeDependency>;
template class PipelineXLocalState<SortSourceDependency>;
@@ -642,6 +647,7 @@ template class
PipelineXLocalState<MultiCastSourceDependency>;
template class PipelineXLocalState<PartitionSortSourceDependency>;
template class PipelineXLocalState<SetSourceDependency>;
template class PipelineXLocalState<LocalExchangeSourceDependency>;
+template class PipelineXLocalState<AndDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter,
ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter,
JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 783b15ac7eb..d697970311d 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -102,8 +102,10 @@ public:
virtual Dependency* dependency() { return nullptr; }
- Dependency* finishdependency() { return _finish_dependency.get(); }
- RuntimeFilterDependency* filterdependency() { return
_filter_dependency.get(); }
+ // override in Scan
+ virtual Dependency* finishdependency() { return nullptr; }
+ // override in Scan, MultiCastDataStreamSource
+ virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
protected:
friend class OperatorXBase;
@@ -121,7 +123,6 @@ protected:
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
- RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
@@ -135,8 +136,6 @@ protected:
vectorized::VExprContextSPtrs _projections;
bool _closed = false;
vectorized::Block _origin_block;
- std::shared_ptr<Dependency> _finish_dependency;
- std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};
class OperatorXBase : public OperatorBase {
@@ -397,7 +396,8 @@ public:
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
virtual Dependency* dependency() { return nullptr; }
- Dependency* finishdependency() { return _finish_dependency.get(); }
+ // override in ExchangeSink, AsyncWriterSink
+ virtual Dependency* finishdependency() { return nullptr; }
protected:
DataSinkOperatorXBase* _parent = nullptr;
@@ -424,7 +424,6 @@ protected:
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
- std::shared_ptr<Dependency> _finish_dependency;
};
class DataSinkOperatorXBase : public OperatorBase {
@@ -659,7 +658,11 @@ class AsyncWriterSink : public
PipelineXSinkLocalState<FakeDependency> {
public:
using Base = PipelineXSinkLocalState<FakeDependency>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
- : Base(parent, state), _async_writer_dependency(nullptr) {}
+ : Base(parent, state), _async_writer_dependency(nullptr) {
+ _finish_dependency = std::make_shared<FinishDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
+ state->get_query_ctx());
+ }
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
@@ -672,11 +675,15 @@ public:
Status try_close(RuntimeState* state, Status exec_status) override;
+ Dependency* finishdependency() override { return _finish_dependency.get();
}
+
protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;
std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;
+
+ std::shared_ptr<Dependency> _finish_dependency;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 31ba7b0b882..6cb666da41d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t
pip_idx) {
DCHECK(pipeline_id_to_profile[pip_idx]);
-
RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()),
local_params,
- request.fragment.output_sink));
+ RETURN_IF_ERROR(task->prepare(local_params,
request.fragment.output_sink));
return Status::OK();
};
@@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
OperatorXPtr source_op;
source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(source_op->init(exchange_type));
- if (operator_xs.size() > 0) {
+ if (!operator_xs.empty()) {
RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
}
operator_xs.insert(operator_xs.begin(), source_op);
@@ -878,6 +877,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs,
OperatorXPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
int child_idx) {
+ // We construct operators directly from the Thrift plan nodes, which are given in pre-order traversal order.
+ // Therefore, a stack-like structure is needed here.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
switch (tnode.node_type) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 283b7851e4c..be62fcac213 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t
task_id, RuntimeSta
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
- _le_state_map(le_state_map),
+ _le_state_map(std::move(le_state_map)),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
@@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline,
uint32_t task_id, RuntimeSta
pipeline->incr_created_tasks();
}
-Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams& local_params,
- const TDataSink& tsink) {
+Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params,
const TDataSink& tsink) {
DCHECK(_sink);
DCHECK(_cur_state == PipelineTaskState::NOT_READY) <<
get_state_name(_cur_state);
_init_profile();
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
- DCHECK_EQ(state, _state);
{
// set sink local state
@@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
get_downstream_dependency(),
_le_state_map,
tsink};
- RETURN_IF_ERROR(_sink->setup_local_state(state, info));
+ RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
}
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(),
no_scan_ranges);
- auto* parent_profile =
state->get_sink_local_state(_sink->operator_id())->profile();
+ auto* parent_profile =
_state->get_sink_local_state(_sink->operator_id())->profile();
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
LocalStateInfo info {parent_profile, scan_ranges, deps,
_le_state_map, _task_idx,
_source_dependency[op->operator_id()]};
- RETURN_IF_ERROR(op->setup_local_state(state, info));
- parent_profile = state->get_local_state(op->operator_id())->profile();
+ RETURN_IF_ERROR(op->setup_local_state(_state, info));
+ parent_profile = _state->get_local_state(op->operator_id())->profile();
}
_block = doris::vectorized::Block::create_unique();
@@ -120,7 +118,9 @@ Status PipelineXTask::_extract_dependencies() {
DCHECK(dep != nullptr);
_read_dependencies.push_back(dep);
auto* fin_dep = local_state->finishdependency();
- _finish_dependencies.push_back(fin_dep);
+ if (fin_dep) {
+ _finish_dependencies.push_back(fin_dep);
+ }
}
{
auto result =
_state->get_sink_local_state_result(_sink->operator_id());
@@ -132,7 +132,9 @@ Status PipelineXTask::_extract_dependencies() {
DCHECK(dep != nullptr);
_write_dependencies = dep;
auto* fin_dep = local_state->finishdependency();
- _finish_dependencies.push_back(fin_dep);
+ if (fin_dep) {
+ _finish_dependencies.push_back(fin_dep);
+ }
}
{
auto result = _state->get_local_state_result(_source->operator_id());
@@ -193,6 +195,7 @@ Status PipelineXTask::_open() {
for (size_t i = 0; i < 2; i++) {
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
+ DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
@@ -377,9 +380,11 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_write_dependencies->debug_string(1));
i++;
- fmt::format_to(debug_string_buffer, "Runtime Filter Dependency
Information: \n");
- fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_filter_dependency->debug_string(1));
- i++;
+ if (_filter_dependency) {
+ fmt::format_to(debug_string_buffer, "Runtime Filter Dependency
Information: \n");
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_filter_dependency->debug_string(1));
+ i++;
+ }
fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index fc633339c31..f7b996f40a7 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -62,8 +62,7 @@ public:
return Status::InternalError("Should not reach here!");
}
- Status prepare(RuntimeState* state, const TPipelineInstanceParams&
local_params,
- const TDataSink& tsink);
+ Status prepare(const TPipelineInstanceParams& local_params, const
TDataSink& tsink);
Status execute(bool* eos) override;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 90ebbff37a2..c54786e5428 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
DCHECK(req.runtime_state != nullptr);
if (req.query_statistics) {
+ // used to report statistics for 'insert into select'
TQueryStatistics queryStatistics;
DCHECK(req.query_statistics->collect_dml_statistics());
req.query_statistics->to_thrift(&queryStatistics);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]