This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 34e53acaea1 [pipelineX](fix) Fix local exchange on pipelineX engine (#27763)
34e53acaea1 is described below
commit 34e53acaea1d7c103e810b3034893fe88ad77678
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 30 11:16:20 2023 +0800
[pipelineX](fix) Fix local exchange on pipelineX engine (#27763)
---
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 9 +++--
be/src/pipeline/exec/join_probe_operator.h | 6 ++--
be/src/pipeline/pipeline.h | 2 +-
be/src/pipeline/pipeline_x/dependency.h | 9 +++--
.../local_exchange_sink_operator.cpp | 10 ++++--
.../local_exchange/local_exchange_sink_operator.h | 10 +++---
.../local_exchange_source_operator.cpp | 38 ++++++++++++++--------
.../local_exchange_source_operator.h | 27 ++++++++++++++-
be/src/pipeline/pipeline_x/operator.h | 10 +-----
.../pipeline_x/pipeline_x_fragment_context.cpp | 35 +++++++++++++-------
.../pipeline_x/pipeline_x_fragment_context.h | 5 +--
be/src/runtime/runtime_state.cpp | 5 +--
12 files changed, 106 insertions(+), 60 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 412c358037d..174d102993d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -577,7 +577,11 @@ Status HashJoinProbeOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
DCHECK(!_build_unique);
DCHECK(_have_other_join_conjunct);
}
+ return Status::OK();
+}
+Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
+
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
// init left/right output slots flags, only column of slot_id in
_hash_output_slot_ids need
// insert to output block of hash join.
// _left_output_slots_flags : column of left table need to output set flag
= true
@@ -596,11 +600,6 @@ Status HashJoinProbeOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
init_output_slots_flags(_child_x->row_desc().tuple_descriptors(),
_left_output_slot_flags);
init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
_right_output_slot_flags);
- return Status::OK();
-}
-
-Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
-
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state,
*_intermediate_row_desc));
// _other_join_conjuncts are evaluated in the context of the rows produced
by this node
for (auto& conjunct : _other_join_conjuncts) {
diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h
index 67537e65cac..12d89c1049e 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -50,15 +50,15 @@ protected:
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
vectorized::Block _join_block;
- vectorized::MutableColumnPtr _tuple_is_null_left_flag_column;
- vectorized::MutableColumnPtr _tuple_is_null_right_flag_column;
+ vectorized::MutableColumnPtr _tuple_is_null_left_flag_column = nullptr;
+ vectorized::MutableColumnPtr _tuple_is_null_right_flag_column = nullptr;
RuntimeProfile::Counter* _probe_timer = nullptr;
RuntimeProfile::Counter* _probe_rows_counter = nullptr;
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
- std::unique_ptr<vectorized::Block> _child_block;
+ std::unique_ptr<vectorized::Block> _child_block = nullptr;
SourceState _child_source_state;
};
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 7dcffb410a2..f4b7928887c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -145,7 +145,7 @@ private:
// Operators for pipelineX. All pipeline tasks share operators from this.
// [SourceOperator -> ... -> SinkOperator]
OperatorXs operatorXs;
- DataSinkOperatorXPtr _sink_x;
+ DataSinkOperatorXPtr _sink_x = nullptr;
std::shared_ptr<ObjectPool> _obj_pool;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 8a55efcdb7f..3fd1489103d 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -584,10 +584,10 @@ public:
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
std::vector<Dependency*> source_dependencies;
std::atomic<int> running_sink_operators = 0;
- void add_running_sink_operators() { running_sink_operators++; }
+ std::mutex le_lock;
void sub_running_sink_operators() {
- auto val = running_sink_operators.fetch_sub(1);
- if (val == 1) {
+ std::unique_lock<std::mutex> lc(le_lock);
+ if (running_sink_operators.fetch_sub(1) == 1) {
_set_ready_for_read();
}
}
@@ -599,11 +599,10 @@ public:
}
void set_dep_by_channel_id(Dependency* dep, int channel_id) {
source_dependencies[channel_id] = dep;
- dep->block();
}
void set_ready_for_read(int channel_id) {
auto* dep = source_dependencies[channel_id];
- DCHECK(dep);
+ DCHECK(dep) << channel_id << " " << (int64_t)this;
dep->set_ready();
}
};
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index a793a22761f..12cc5e042e8 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -25,9 +25,14 @@ Status LocalExchangeSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
SCOPED_TIMER(_open_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
+ _num_rows_in_queue.resize(p._num_partitions);
+ for (size_t i = 0; i < p._num_partitions; i++) {
+ _num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL(
+ profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT,
1);
+ }
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
- _shared_state->add_running_sink_operators();
return Status::OK();
}
@@ -59,8 +64,9 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState*
state,
size_t size = _partition_rows_histogram[i + 1] - start;
if (size > 0) {
data_queue[i].enqueue({new_block, {row_idx, start, size}});
+ _shared_state->set_ready_for_read(i);
+ COUNTER_UPDATE(_num_rows_in_queue[i], size);
}
- _shared_state->set_ready_for_read(i);
}
return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index b6ce3fbeb9e..45d61d4ff6b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -51,8 +51,9 @@ private:
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
- std::unique_ptr<vectorized::PartitionerBase> _partitioner;
- std::vector<size_t> _partition_rows_histogram;
+ std::vector<RuntimeProfile::Counter*> _num_rows_in_queue {};
+ std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
+ std::vector<size_t> _partition_rows_histogram {};
};
// A single 32-bit division on a recent x64 processor has a throughput of one
instruction every six cycles with a latency of 26 cycles.
@@ -69,8 +70,9 @@ struct LocalExchangeChannelIds {
class LocalExchangeSinkOperatorX final : public
DataSinkOperatorX<LocalExchangeSinkLocalState> {
public:
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
- LocalExchangeSinkOperatorX(int sink_id, int num_partitions, const
std::vector<TExpr>& texprs)
- : Base(sink_id, -1), _num_partitions(num_partitions),
_texprs(texprs) {}
+ LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
+ const std::vector<TExpr>& texprs)
+ : Base(sink_id, -1, dest_id), _num_partitions(num_partitions),
_texprs(texprs) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 81c9cc21447..83dac5eb8f4 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -41,30 +41,40 @@ Status
LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
PartitionedBlock partitioned_block;
std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
- if
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
- partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block =
-
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
-
+ auto get_data = [&](vectorized::Block* result_block) {
do {
const auto* offset_start = &((
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
mutable_block->add_rows(partitioned_block.first.get(),
offset_start,
offset_start +
std::get<2>(partitioned_block.second));
- } while
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
- partitioned_block) &&
- mutable_block->rows() < state->batch_size());
- *block = mutable_block->to_block();
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- if (local_state._shared_state->running_sink_operators == 0) {
+ } while (mutable_block->rows() < state->batch_size() &&
+
local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+ partitioned_block));
+ *result_block = mutable_block->to_block();
+ };
+ if (local_state._shared_state->running_sink_operators == 0) {
+ if
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+ partitioned_block)) {
+ SCOPED_TIMER(local_state._copy_data_timer);
+ mutable_block =
+
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+ get_data(block);
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
+ } else if
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+ partitioned_block)) {
+ SCOPED_TIMER(local_state._copy_data_timer);
+ mutable_block =
+
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+ get_data(block);
+ } else {
+ local_state._dependency->block();
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
}
local_state.reached_limit(block, source_state);
-
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index ebf18d9a249..3ccc38854f5 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -28,6 +28,17 @@ public:
LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeSourceDependency",
query_ctx) {}
~LocalExchangeSourceDependency() override = default;
+
+ void block() override {
+ if
(((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0)
{
+ return;
+ }
+ std::unique_lock<std::mutex>
lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
+ if
(((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0)
{
+ return;
+ }
+ Dependency::block();
+ }
};
class LocalExchangeSourceOperatorX;
@@ -52,7 +63,8 @@ private:
class LocalExchangeSourceOperatorX final : public
OperatorX<LocalExchangeSourceLocalState> {
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
- LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1,
id) {}
+ LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase*
parent)
+ : Base(pool, -1, id), _parent(parent) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
_op_name = "LOCAL_EXCHANGE_OPERATOR";
return Status::OK();
@@ -70,8 +82,21 @@ public:
bool is_source() const override { return true; }
+ Status set_child(OperatorXPtr child) override {
+ if (_child_x) {
+ // Set build side child for join probe operator
+ DCHECK(_parent != nullptr);
+ RETURN_IF_ERROR(_parent->set_child(child));
+ } else {
+ _child_x = std::move(child);
+ }
+ return Status::OK();
+ }
+
private:
friend class LocalExchangeSourceLocalState;
+
+ OperatorXBase* _parent = nullptr;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index ed2dbfc3d50..5fa6785435b 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -23,14 +23,6 @@
namespace doris::pipeline {
-#define CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state) \
- auto _sptr = state->get_local_state(operator_id()); \
- auto& local_state = _sptr->template cast<LocalState>();
-
-#define CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state) \
- auto _sptr = state->get_sink_local_state(operator_id()); \
- auto& local_state = _sptr->template cast<LocalState>();
-
// This struct is used only for initializing local state.
struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
@@ -279,7 +271,7 @@ protected:
RowDescriptor _row_descriptor;
- std::unique_ptr<RowDescriptor> _output_row_descriptor;
+ std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
vectorized::VExprContextSPtrs _projections;
/// Resource information sent from the frontend.
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1924dc90d58..d49e290c044 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
if (_prepared) {
return Status::InternalError("Already prepared");
}
+ _num_instances = request.local_params.size();
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
_start_timer = ADD_TIMER(_runtime_profile, "StartTime");
COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
@@ -232,7 +233,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
- //TODO: can we do this in set_sink?
+ DCHECK(pipeline->sink_x() != nullptr) <<
pipeline->operator_xs().size();
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
}
@@ -593,15 +594,17 @@ Status
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
}
Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool,
OperatorXPtr& op,
- PipelinePtr& cur_pipe,
+ PipelinePtr& cur_pipe,
const TPlanNode& tnode,
const std::vector<TExpr>&
texprs) {
- if (!_runtime_state->enable_local_shuffle() ||
- _runtime_state->query_parallel_instance_num() == 1) {
+ if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
return Status::OK();
}
+ auto parent = op;
+ RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get()));
auto local_exchange_id = next_operator_id();
- op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
+ op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id,
parent.get()));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(parent->set_child(op));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -611,14 +614,15 @@ Status
PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- auto num_instances = _runtime_state->query_parallel_instance_num();
- sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id,
num_instances, texprs));
+ sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(),
local_exchange_id,
+ _num_instances, texprs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init());
auto shared_state = LocalExchangeSharedState::create_shared();
- shared_state->data_queue.resize(num_instances);
- shared_state->source_dependencies.resize(num_instances, nullptr);
+ shared_state->data_queue.resize(_num_instances);
+ shared_state->source_dependencies.resize(_num_instances, nullptr);
+ shared_state->running_sink_operators = _num_instances;
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
@@ -717,9 +721,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
if (!tnode.agg_node.need_finalize) {
- RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
- RETURN_IF_ERROR(
- _add_local_exchange(pool, op, cur_pipe,
tnode.agg_node.grouping_exprs));
+ RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
+
tnode.agg_node.grouping_exprs));
}
}
break;
@@ -740,6 +743,14 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
+
+ std::vector<TExpr> probe_exprs;
+ const std::vector<TEqJoinCondition>& eq_join_conjuncts =
+ tnode.hash_join_node.eq_join_conjuncts;
+ for (const auto& eq_join_conjunct : eq_join_conjuncts) {
+ probe_exprs.push_back(eq_join_conjunct.left);
+ }
+ RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
probe_exprs));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index f579265ab63..7f47052296e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,7 +125,7 @@ private:
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams&
request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe,
- const std::vector<TExpr>& texprs);
+ const TPlanNode& tnode, const
std::vector<TExpr>& texprs);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const
doris::TPipelineFragmentParams& request,
@@ -170,7 +170,7 @@ private:
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wshadow-field"
#endif
- DataSinkOperatorXPtr _sink;
+ DataSinkOperatorXPtr _sink = nullptr;
#ifdef __clang__
#pragma clang diagnostic pop
#endif
@@ -210,6 +210,7 @@ private:
int _operator_id = 0;
int _sink_operator_id = 0;
+ int _num_instances = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>>
_op_id_to_le_state;
};
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index c6df2daff0d..43eff466019 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -458,8 +458,9 @@ Result<RuntimeState::LocalState*>
RuntimeState::get_local_state_result(int id) {
void RuntimeState::emplace_sink_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase>
state) {
- DCHECK(id < _op_id_to_sink_local_state.size());
- DCHECK(!_op_id_to_sink_local_state[id]);
+ DCHECK(id < _op_id_to_sink_local_state.size())
+ << " id=" << id << " state: " << state->debug_string(0);
+ DCHECK(!_op_id_to_sink_local_state[id]) << " id=" << id << " state: " <<
state->debug_string(0);
_op_id_to_sink_local_state[id] = std::move(state);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]