This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 320ddf4987c [pipelineX](improvement) Support multiple instances execution on single tablet (#28178)
320ddf4987c is described below
commit 320ddf4987c9fdb33f9ee7aced3881009681bd51
Author: Gabriel <[email protected]>
AuthorDate: Sun Dec 10 20:18:41 2023 +0800
[pipelineX](improvement) Support multiple instances execution on single tablet (#28178)
---
be/src/pipeline/exec/scan_operator.cpp | 1 +
be/src/pipeline/pipeline.h | 15 ++-
be/src/pipeline/pipeline_fragment_context.cpp | 4 +-
be/src/pipeline/pipeline_fragment_context.h | 1 +
.../pipeline_x/local_exchange/local_exchanger.h | 10 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 110 ++++++++++++++-------
.../pipeline_x/pipeline_x_fragment_context.h | 3 +-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 1 +
.../main/java/org/apache/doris/qe/Coordinator.java | 16 +--
gensrc/thrift/PaloInternalService.thrift | 1 +
10 files changed, 107 insertions(+), 55 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 96bda3cd0be..e1e17e0ccf6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1431,6 +1431,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
+ COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 68213fe9c27..5d623f899aa 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -50,8 +50,9 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
public:
Pipeline() = delete;
- explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
- : _pipeline_id(pipeline_id), _context(context) {
+ explicit Pipeline(PipelineId pipeline_id, int num_tasks,
+ std::weak_ptr<PipelineFragmentContext> context)
+ : _pipeline_id(pipeline_id), _context(context), _num_tasks(num_tasks) {
_init_profile();
}
@@ -138,6 +139,11 @@ public:
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
+ void incr_created_tasks() { _num_tasks_created++; }
+ bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
+ void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; }
+ int num_tasks() const { return _num_tasks; }
+
private:
void _init_profile();
@@ -199,6 +205,11 @@ private:
// then set `_need_to_local_shuffle` to false which means we should use local shuffle in this fragment
// because data already be partitioned by storage/shuffling.
bool _need_to_local_shuffle = true;
+
+ // How many tasks should be created ?
+ int _num_tasks = 1;
+ // How many tasks are already created?
+ int _num_tasks_created = 0;
};
} // namespace doris::pipeline
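
For illustration only, a minimal sketch (not part of this commit) of how the new task-count bookkeeping on Pipeline is meant to be driven; the member functions match the diff above, while the surrounding loop, `pipelines`, `num_instances` and `create_task` are hypothetical:

    // Each fragment instance walks every pipeline, but a pipeline only yields a
    // task while it still has budget (_num_tasks > _num_tasks_created).
    for (int instance = 0; instance < num_instances; instance++) {
        for (auto& pipeline : pipelines) {
            if (!pipeline->need_to_create_task()) {
                continue; // e.g. a shared-scan pipeline capped at one task
            }
            auto task = create_task(pipeline, instance);
            // The PipelineXTask constructor calls pipeline->incr_created_tasks(),
            // so the remaining budget shrinks as tasks are built.
        }
    }
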
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index b800e167bcc..8bf884692df 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,7 +189,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
PipelinePtr PipelineFragmentContext::add_pipeline() {
// _prepared、_submitted, _canceled should do not add pipeline
PipelineId id = _next_pipeline_id++;
- auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+ auto pipeline = std::make_shared<Pipeline>(id, _num_instances, weak_from_this());
_pipelines.emplace_back(pipeline);
return pipeline;
}
@@ -197,7 +197,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
// _prepared、_submitted, _canceled should do not add pipeline
PipelineId id = _next_pipeline_id++;
- auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+ auto pipeline = std::make_shared<Pipeline>(id, _num_instances, weak_from_this());
if (idx >= 0) {
_pipelines.insert(_pipelines.begin() + idx, pipeline);
} else {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index c8883248d42..a705230d2f4 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -225,6 +225,7 @@ protected:
report_status_callback _report_status_cb;
DescriptorTbl* _desc_tbl = nullptr;
+ int _num_instances = 1;
private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 3993289d5da..8c28469b50d 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -27,8 +27,6 @@ class LocalExchangeSinkLocalState;
class Exchanger {
public:
- Exchanger(int num_partitions)
- : _running_sink_operators(num_partitions), _num_partitions(num_partitions) {}
Exchanger(int running_sink_operators, int num_partitions)
: _running_sink_operators(running_sink_operators), _num_partitions(num_partitions) {}
virtual ~Exchanger() = default;
@@ -56,9 +54,6 @@ class ShuffleExchanger : public Exchanger {
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
- ShuffleExchanger(int num_partitions) : Exchanger(num_partitions) {
- _data_queue.resize(num_partitions);
- }
ShuffleExchanger(int running_sink_operators, int num_partitions)
: Exchanger(running_sink_operators, num_partitions) {
_data_queue.resize(num_partitions);
@@ -90,8 +85,9 @@ class BucketShuffleExchanger : public ShuffleExchanger {
class PassthroughExchanger final : public Exchanger {
public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
- PassthroughExchanger(int num_instances) : Exchanger(num_instances) {
- _data_queue.resize(num_instances);
+ PassthroughExchanger(int running_sink_operators, int num_partitions)
+ : Exchanger(running_sink_operators, num_partitions) {
+ _data_queue.resize(num_partitions);
}
~PassthroughExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
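
As a rough usage sketch (not from the patch): with the single-argument constructors removed, callers now state how many sink operators actually feed the exchanger, which may be smaller than the number of downstream partitions. The values below are hypothetical:

    // One upstream task (e.g. a single-tablet scan) fanning out to four
    // downstream partitions via a passthrough local exchange.
    auto exchanger = PassthroughExchanger::create_unique(/*running_sink_operators=*/1,
                                                         /*num_partitions=*/4);
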
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 294b9fc109f..6a3f38d2c25 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -505,11 +505,14 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
return le_state_map;
};
for (auto& pipeline : _pipelines) {
- auto task = std::make_unique<PipelineXTask>(
- pipeline, _total_tasks++, _runtime_states[i].get(), this,
- _runtime_states[i]->runtime_profile(), get_local_exchange_state(pipeline), i);
- pipeline_id_to_task.insert({pipeline->id(), task.get()});
- _tasks[i].emplace_back(std::move(task));
+ if (pipeline->need_to_create_task()) {
+ auto task = std::make_unique<PipelineXTask>(pipeline, _total_tasks++,
+ _runtime_states[i].get(), this,
+ _runtime_states[i]->runtime_profile(),
+ get_local_exchange_state(pipeline), i);
+ pipeline_id_to_task.insert({pipeline->id(), task.get()});
+ _tasks[i].emplace_back(std::move(task));
+ }
}
/**
@@ -554,18 +557,22 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
};
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
- auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
- DCHECK(task != nullptr);
-
- // if this task has upstream dependency, then record them.
- if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
- auto& deps = _dag[_pipelines[pip_idx]->id()];
- for (auto& dep : deps) {
- task->add_upstream_dependency(
- pipeline_id_to_task[dep]->get_downstream_dependency());
+ if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
+ auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+ DCHECK(task != nullptr);
+
+ // if this task has upstream dependency, then record them.
+ if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
+ auto& deps = _dag[_pipelines[pip_idx]->id()];
+ for (auto& dep : deps) {
+ if (pipeline_id_to_task.contains(dep)) {
+ task->add_upstream_dependency(
+ pipeline_id_to_task[dep]->get_downstream_dependency());
+ }
+ }
}
+ RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
}
- RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
}
{
@@ -653,6 +660,34 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
return Status::OK();
}
+void PipelineXFragmentContext::_inherit_pipeline_properties(ExchangeType exchange_type,
+ PipelinePtr pipe_with_source,
+ PipelinePtr pipe_with_sink) {
+ pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
+ pipe_with_source->set_num_tasks(_num_instances);
+ switch (exchange_type) {
+ case ExchangeType::HASH_SHUFFLE:
+ // If HASH_SHUFFLE local exchanger is planned, data will be always HASH distribution so we
+ // do not need to plan another shuffle local exchange in the rest of current pipeline.
+ pipe_with_sink->set_need_to_local_shuffle(false);
+ pipe_with_source->set_need_to_local_shuffle(false);
+ break;
+ case ExchangeType::BUCKET_HASH_SHUFFLE:
+ // Same as ExchangeType::HASH_SHUFFLE.
+ pipe_with_sink->set_need_to_local_shuffle(false);
+ pipe_with_source->set_need_to_local_shuffle(false);
+ break;
+ case ExchangeType::PASSTHROUGH:
+ // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make
+ // sure remaining operators should use local shuffle to make data distribution right.
+ pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle());
+ pipe_with_source->set_need_to_local_shuffle(true);
+ break;
+ default:
+ __builtin_unreachable();
+ }
+}
+
Status PipelineXFragmentContext::_add_local_exchange(
int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
const std::vector<TExpr>& texprs, ExchangeType exchange_type, bool* do_local_exchange,
@@ -674,59 +709,58 @@ Status PipelineXFragmentContext::_add_local_exchange(
auto local_exchange_id = next_operator_id();
// 1. Create a new pipeline with local exchange sink.
auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
-
DataSinkOperatorXPtr sink;
sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id,
_num_instances, texprs, bucket_seq_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
+ RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets));
+
+ // 2. Inherit properties from current pipeline.
+ _inherit_pipeline_properties(exchange_type, cur_pipe, new_pip);
+ // 3. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();
shared_state->source_dependencies.resize(_num_instances, nullptr);
switch (exchange_type) {
case ExchangeType::HASH_SHUFFLE:
- shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
- // If HASH_SHUFFLE local exchanger is planned, data will be always HASH distribution so we
- // do not need to plan another shuffle local exchange in the rest of current pipeline.
- new_pip->set_need_to_local_shuffle(false);
- cur_pipe->set_need_to_local_shuffle(false);
+ shared_state->exchanger =
+ ShuffleExchanger::create_unique(new_pip->num_tasks(), _num_instances);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger =
- BucketShuffleExchanger::create_unique(_num_instances, num_buckets);
- // Same as ExchangeType::HASH_SHUFFLE.
- new_pip->set_need_to_local_shuffle(false);
- cur_pipe->set_need_to_local_shuffle(false);
+ BucketShuffleExchanger::create_unique(new_pip->num_tasks(), num_buckets);
break;
case ExchangeType::PASSTHROUGH:
- // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make
- // sure remaining operators should use local shuffle to make data distribution right.
- shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
- new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
- cur_pipe->set_need_to_local_shuffle(true);
+ shared_state->exchanger =
+ PassthroughExchanger::create_unique(new_pip->num_tasks(), _num_instances);
break;
default:
return Status::InternalError("Unsupported local exchange type : " +
std::to_string((int)exchange_type));
}
- RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets));
_op_id_to_le_state.insert({local_exchange_id, shared_state});
- // 2. Initialize operators list.
+ // 4. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
+ // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
+
+ // 4.1 Initialize new pipeline's operator list.
std::copy(operator_xs.begin(), operator_xs.begin() + idx,
std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end()));
- // 3. Erase operators in new pipeline.
+ // 4.2 Erase unused operators in previous pipeline.
+ operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
+
+ // 5. Initialize LocalExchangeSource and insert it into this pipeline.
OperatorXPtr source_op;
source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(source_op->init(exchange_type));
- operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
if (operator_xs.size() > 0) {
RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
}
-
operator_xs.insert(operator_xs.begin(), source_op);
RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back()));
+ // 6. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
std::vector<PipelineId> edges_with_source;
for (auto child : cur_pipe->children()) {
@@ -745,6 +779,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
new_children.push_back(new_pip);
edges_with_source.push_back(new_pip->id());
+ // 7. Set DAG for new pipelines.
if (!new_pip->children().empty()) {
std::vector<PipelineId> edges_with_sink;
for (auto child : new_pip->children()) {
@@ -773,6 +808,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::OLAP_SCAN_NODE: {
op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ const bool shared_scan =
+ find_with_default(request.per_node_shared_scans, op->node_id(), false);
+ if (shared_scan) {
+ cur_pipe->set_num_tasks(1);
+ }
break;
}
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
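
To make the interaction between the shared-scan cap and _inherit_pipeline_properties concrete, a hedged example (the numbers are hypothetical): assume _num_instances is 4 and the scan pipeline was capped to one task because the FE marked its scan node in per_node_shared_scans. Splitting [Scan - AggSink] at the local exchange then gives:

    // pipe_with_sink = new_pip, pipe_with_source = cur_pipe (see _add_local_exchange above).
    _inherit_pipeline_properties(ExchangeType::PASSTHROUGH, cur_pipe, new_pip);
    // new_pip  [Scan - LocalExchangeSink]       -> num_tasks() == 1 (inherited from the scan pipeline)
    // cur_pipe [LocalExchangeSource - AggSink]  -> num_tasks() == 4 (_num_instances)
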
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index c8b042bd39d..3719445babd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -129,6 +129,8 @@ private:
PipelinePtr cur_pipe, const std::vector<TExpr>& texprs,
ExchangeType exchange_type, bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx);
+ void _inherit_pipeline_properties(ExchangeType exchange_type, PipelinePtr pipe_with_source,
+ PipelinePtr pipe_with_sink);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const doris::TPipelineFragmentParams& request,
@@ -217,7 +219,6 @@ private:
int _operator_id = 0;
int _sink_operator_id = 0;
- int _num_instances = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
};
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 76e2c21d387..7657d82b7fa 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -62,6 +62,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
for (auto& op : _operators) {
_source_dependency.insert({op->operator_id(), op->get_dependency(state->get_query_ctx())});
}
+ pipeline->incr_created_tasks();
}
Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cd498ccf9bc..5188412bd3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1995,8 +1995,7 @@ public class Coordinator implements CoordInterface {
// 4. Disable shared scan optimization by session variable
if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
|| (node.isPresent() && node.get() instanceof FileScanNode)
- || (node.isPresent() && node.get().shouldDisableSharedScan(context))
- || enablePipelineXEngine) {
+ || (node.isPresent() && node.get().shouldDisableSharedScan(context))) {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
@@ -3599,6 +3598,12 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
+ Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
+ Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
+ if (scanRanges == null) {
+ scanRanges = Maps.newHashMap();
+ perNodeSharedScans = Maps.newHashMap();
+ }
if (!res.containsKey(instanceExecParam.host)) {
TPipelineFragmentParams params = new TPipelineFragmentParams();
@@ -3624,6 +3629,7 @@ public class Coordinator implements CoordInterface {
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
+ params.setPerNodeSharedScans(perNodeSharedScans);
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
@@ -3641,12 +3647,6 @@ public class Coordinator implements CoordInterface {
localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
- Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
- Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
- if (scanRanges == null) {
- scanRanges = Maps.newHashMap();
- perNodeSharedScans = Maps.newHashMap();
- }
localParams.setPerNodeScanRanges(scanRanges);
localParams.setPerNodeSharedScans(perNodeSharedScans);
localParams.setSenderId(i);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index fb702f61133..b6b96eee95a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -691,6 +691,7 @@ struct TPipelineFragmentParams {
33: optional i32 num_local_sink
34: optional i32 num_buckets
35: optional map<i32, i32> bucket_seq_to_instance_idx
+ 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
}
struct TPipelineFragmentParamsList {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]