This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 320ddf4987c [pipelineX](improvement) Support multiple instances execution on single tablet (#28178)
320ddf4987c is described below

commit 320ddf4987c9fdb33f9ee7aced3881009681bd51
Author: Gabriel <[email protected]>
AuthorDate: Sun Dec 10 20:18:41 2023 +0800

    [pipelineX](improvement) Support multiple instances execution on single tablet (#28178)
---
 be/src/pipeline/exec/scan_operator.cpp             |   1 +
 be/src/pipeline/pipeline.h                         |  15 ++-
 be/src/pipeline/pipeline_fragment_context.cpp      |   4 +-
 be/src/pipeline/pipeline_fragment_context.h        |   1 +
 .../pipeline_x/local_exchange/local_exchanger.h    |  10 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 110 ++++++++++++++-------
 .../pipeline_x/pipeline_x_fragment_context.h       |   3 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |   1 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  16 +--
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 10 files changed, 107 insertions(+), 55 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 96bda3cd0be..e1e17e0ccf6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1431,6 +1431,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+    COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
     SCOPED_TIMER(_close_timer);
 
     SCOPED_TIMER(exec_time_counter());
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 68213fe9c27..5d623f899aa 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -50,8 +50,9 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
 
 public:
     Pipeline() = delete;
-    explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
-            : _pipeline_id(pipeline_id), _context(context) {
+    explicit Pipeline(PipelineId pipeline_id, int num_tasks,
+                      std::weak_ptr<PipelineFragmentContext> context)
+            : _pipeline_id(pipeline_id), _context(context), _num_tasks(num_tasks) {
         _init_profile();
     }
 
@@ -138,6 +139,11 @@ public:
     void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
+    void incr_created_tasks() { _num_tasks_created++; }
+    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
+    void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; }
+    int num_tasks() const { return _num_tasks; }
+
 private:
     void _init_profile();
 
@@ -199,6 +205,11 @@ private:
     // then set `_need_to_local_shuffle` to false which means we should use local shuffle in this fragment
     // because data already be partitioned by storage/shuffling.
     bool _need_to_local_shuffle = true;
+
+    // How many tasks should be created?
+    int _num_tasks = 1;
+    // How many tasks are already created?
+    int _num_tasks_created = 0;
 };
 
 } // namespace doris::pipeline
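
For readers skimming the patch, the two counters above are the heart of the change: each pipeline is asked once per fragment instance whether it still needs a task, so a shared-scan pipeline capped at one task is simply skipped on later instances. A minimal, self-contained sketch of that gating logic (toy names, not the actual Doris classes):

    #include <initializer_list>
    #include <memory>
    #include <vector>

    struct Task {};

    struct Pipeline {
        int num_tasks = 1;          // how many tasks should be created
        int num_tasks_created = 0;  // how many tasks already exist
        bool need_to_create_task() const { return num_tasks > num_tasks_created; }
        std::unique_ptr<Task> create_task() {
            ++num_tasks_created;    // mirrors incr_created_tasks()
            return std::make_unique<Task>();
        }
    };

    int main() {
        const int num_instances = 4;
        Pipeline scan_pipe;                  // shared scan: capped at one task
        Pipeline agg_pipe;
        agg_pipe.num_tasks = num_instances;  // downstream: one task per instance

        std::vector<std::unique_ptr<Task>> tasks;
        for (int i = 0; i < num_instances; ++i) {
            for (Pipeline* p : {&scan_pipe, &agg_pipe}) {
                if (p->need_to_create_task()) {
                    tasks.push_back(p->create_task());
                }
            }
        }
        // tasks.size() == 5: one shared scan task plus four agg tasks.
    }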
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index b800e167bcc..8bf884692df 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,7 +189,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
 PipelinePtr PipelineFragmentContext::add_pipeline() {
     // _prepared, _submitted, _canceled should not add a pipeline
     PipelineId id = _next_pipeline_id++;
-    auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+    auto pipeline = std::make_shared<Pipeline>(id, _num_instances, weak_from_this());
     _pipelines.emplace_back(pipeline);
     return pipeline;
 }
@@ -197,7 +197,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
 PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     // _prepared, _submitted, _canceled should not add a pipeline
     PipelineId id = _next_pipeline_id++;
-    auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+    auto pipeline = std::make_shared<Pipeline>(id, _num_instances, weak_from_this());
     if (idx >= 0) {
         _pipelines.insert(_pipelines.begin() + idx, pipeline);
     } else {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index c8883248d42..a705230d2f4 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -225,6 +225,7 @@ protected:
     report_status_callback _report_status_cb;
 
     DescriptorTbl* _desc_tbl = nullptr;
+    int _num_instances = 1;
 
 private:
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 3993289d5da..8c28469b50d 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -27,8 +27,6 @@ class LocalExchangeSinkLocalState;
 
 class Exchanger {
 public:
-    Exchanger(int num_partitions)
-            : _running_sink_operators(num_partitions), _num_partitions(num_partitions) {}
     Exchanger(int running_sink_operators, int num_partitions)
             : _running_sink_operators(running_sink_operators), _num_partitions(num_partitions) {}
     virtual ~Exchanger() = default;
@@ -56,9 +54,6 @@ class ShuffleExchanger : public Exchanger {
 
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
-    ShuffleExchanger(int num_partitions) : Exchanger(num_partitions) {
-        _data_queue.resize(num_partitions);
-    }
     ShuffleExchanger(int running_sink_operators, int num_partitions)
             : Exchanger(running_sink_operators, num_partitions) {
         _data_queue.resize(num_partitions);
@@ -90,8 +85,9 @@ class BucketShuffleExchanger : public ShuffleExchanger {
 class PassthroughExchanger final : public Exchanger {
 public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
-    PassthroughExchanger(int num_instances) : Exchanger(num_instances) {
-        _data_queue.resize(num_instances);
+    PassthroughExchanger(int running_sink_operators, int num_partitions)
+            : Exchanger(running_sink_operators, num_partitions) {
+        _data_queue.resize(num_partitions);
     }
     ~PassthroughExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
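
The single-argument constructors deleted above assumed the sink side and the source side of a local exchange always run with the same parallelism; with one shared-scan task feeding _num_instances consumers that no longer holds, so both counts are now passed explicitly. A rough sketch of the resulting shape (simplified queue types, not the real exchanger):

    #include <cstddef>
    #include <deque>
    #include <vector>

    struct Block {};

    // Sink-side parallelism and partition count are independent parameters.
    class PassthroughExchangerSketch {
    public:
        PassthroughExchangerSketch(int running_sink_operators, int num_partitions)
                : _running_sink_operators(running_sink_operators),
                  _data_queue(num_partitions) {}

        // Even a single sink task can spread blocks across every partition.
        void sink(const Block& block) {
            _data_queue[_next++ % _data_queue.size()].push_back(block);
        }

    private:
        int _running_sink_operators;  // e.g. 1 for a shared scan
        std::size_t _next = 0;
        std::vector<std::deque<Block>> _data_queue;
    };

    // Usage: PassthroughExchangerSketch ex(/*running_sink_operators=*/1,
    //                                      /*num_partitions=*/4);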
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 294b9fc109f..6a3f38d2c25 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -505,11 +505,14 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             return le_state_map;
         };
         for (auto& pipeline : _pipelines) {
-            auto task = std::make_unique<PipelineXTask>(
-                    pipeline, _total_tasks++, _runtime_states[i].get(), this,
-                    _runtime_states[i]->runtime_profile(), get_local_exchange_state(pipeline), i);
-            pipeline_id_to_task.insert({pipeline->id(), task.get()});
-            _tasks[i].emplace_back(std::move(task));
+            if (pipeline->need_to_create_task()) {
+                auto task = std::make_unique<PipelineXTask>(pipeline, _total_tasks++,
+                                                            _runtime_states[i].get(), this,
+                                                            _runtime_states[i]->runtime_profile(),
+                                                            get_local_exchange_state(pipeline), i);
+                pipeline_id_to_task.insert({pipeline->id(), task.get()});
+                _tasks[i].emplace_back(std::move(task));
+            }
         }
 
         /**
@@ -554,18 +557,22 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         };
 
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
-            auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
-            DCHECK(task != nullptr);
-
-            // if this task has upstream dependency, then record them.
-            if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
-                auto& deps = _dag[_pipelines[pip_idx]->id()];
-                for (auto& dep : deps) {
-                    task->add_upstream_dependency(
-                            pipeline_id_to_task[dep]->get_downstream_dependency());
+            if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
+                auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+                DCHECK(task != nullptr);
+
+                // if this task has upstream dependency, then record them.
+                if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
+                    auto& deps = _dag[_pipelines[pip_idx]->id()];
+                    for (auto& dep : deps) {
+                        if (pipeline_id_to_task.contains(dep)) {
+                            task->add_upstream_dependency(
+                                    pipeline_id_to_task[dep]->get_downstream_dependency());
+                        }
+                    }
                 }
+                RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
             }
-            RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
         }
 
         {
@@ -653,6 +660,34 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
     return Status::OK();
 }
 
+void PipelineXFragmentContext::_inherit_pipeline_properties(ExchangeType exchange_type,
+                                                            PipelinePtr pipe_with_source,
+                                                            PipelinePtr pipe_with_sink) {
+    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
+    pipe_with_source->set_num_tasks(_num_instances);
+    switch (exchange_type) {
+    case ExchangeType::HASH_SHUFFLE:
+        // If a HASH_SHUFFLE local exchanger is planned, data will always be HASH-distributed, so
+        // we do not need to plan another shuffle local exchange in the rest of the current pipeline.
+        pipe_with_sink->set_need_to_local_shuffle(false);
+        pipe_with_source->set_need_to_local_shuffle(false);
+        break;
+    case ExchangeType::BUCKET_HASH_SHUFFLE:
+        // Same as ExchangeType::HASH_SHUFFLE.
+        pipe_with_sink->set_need_to_local_shuffle(false);
+        pipe_with_source->set_need_to_local_shuffle(false);
+        break;
+    case ExchangeType::PASSTHROUGH:
+        // If a PASSTHROUGH local exchanger is planned, data will be split randomly, so we must
+        // make sure the remaining operators use local shuffle to keep the data distribution correct.
+        pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle());
+        pipe_with_source->set_need_to_local_shuffle(true);
+        break;
+    default:
+        __builtin_unreachable();
+    }
+}
+
 Status PipelineXFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        const std::vector<TExpr>& texprs, ExchangeType exchange_type, bool* do_local_exchange,
@@ -674,59 +709,58 @@ Status PipelineXFragmentContext::_add_local_exchange(
     auto local_exchange_id = next_operator_id();
     // 1. Create a new pipeline with local exchange sink.
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
-
     DataSinkOperatorXPtr sink;
     sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id,
                                               _num_instances, texprs, bucket_seq_to_instance_idx));
     RETURN_IF_ERROR(new_pip->set_sink(sink));
+    RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets));
+
+    // 2. Inherit properties from current pipeline.
+    _inherit_pipeline_properties(exchange_type, cur_pipe, new_pip);
 
+    // 3. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared();
     shared_state->source_dependencies.resize(_num_instances, nullptr);
     switch (exchange_type) {
     case ExchangeType::HASH_SHUFFLE:
-        shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
-        // If HASH_SHUFFLE local exchanger is planned, data will be always HASH distribution so we
-        // do not need to plan another shuffle local exchange in the rest of current pipeline.
-        new_pip->set_need_to_local_shuffle(false);
-        cur_pipe->set_need_to_local_shuffle(false);
+        shared_state->exchanger =
+                ShuffleExchanger::create_unique(new_pip->num_tasks(), _num_instances);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger =
-                BucketShuffleExchanger::create_unique(_num_instances, num_buckets);
-        // Same as ExchangeType::HASH_SHUFFLE.
-        new_pip->set_need_to_local_shuffle(false);
-        cur_pipe->set_need_to_local_shuffle(false);
+                BucketShuffleExchanger::create_unique(new_pip->num_tasks(), num_buckets);
         break;
     case ExchangeType::PASSTHROUGH:
-        // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make
-        // sure remaining operators should use local shuffle to make data distribution right.
-        shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
-        new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
-        cur_pipe->set_need_to_local_shuffle(true);
+        shared_state->exchanger =
+                PassthroughExchanger::create_unique(new_pip->num_tasks(), _num_instances);
         break;
     default:
         return Status::InternalError("Unsupported local exchange type : " +
                                      std::to_string((int)exchange_type));
     }
-    RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets));
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
 
-    // 2. Initialize operators list.
+    // 4. Set the two pipelines' operator lists. For example, split pipeline [Scan - AggSink] into
+    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
+
+    // 4.1 Initialize new pipeline's operator list.
     std::copy(operator_xs.begin(), operator_xs.begin() + idx,
               std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end()));
 
-    // 3. Erase operators in new pipeline.
+    // 4.2 Erase unused operators in previous pipeline.
+    operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
+
+    // 5. Initialize LocalExchangeSource and insert it into this pipeline.
     OperatorXPtr source_op;
     source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
     RETURN_IF_ERROR(source_op->init(exchange_type));
-    operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
     if (operator_xs.size() > 0) {
         RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
     }
-
     operator_xs.insert(operator_xs.begin(), source_op);
     RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back()));
 
+    // 6. Set children for two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;
     std::vector<PipelineId> edges_with_source;
     for (auto child : cur_pipe->children()) {
@@ -745,6 +779,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
     new_children.push_back(new_pip);
     edges_with_source.push_back(new_pip->id());
 
+    // 7. Set DAG for new pipelines.
     if (!new_pip->children().empty()) {
         std::vector<PipelineId> edges_with_sink;
         for (auto child : new_pip->children()) {
@@ -773,6 +808,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     case TPlanNodeType::OLAP_SCAN_NODE: {
         op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        const bool shared_scan =
+                find_with_default(request.per_node_shared_scans, op->node_id(), false);
+        if (shared_scan) {
+            cur_pipe->set_num_tasks(1);
+        }
         break;
     }
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
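
Putting the pieces of this file together, here is an illustrative walk-through of the shared-scan split (numbers assumed, with _num_instances = 4):

    // Before the split, shared scan caps the whole pipeline:
    //   cur_pipe = [OlapScan -> AggSink], num_tasks = 1
    //
    // _add_local_exchange creates new_pip and moves the scan side into it:
    //   new_pip  = [OlapScan -> LocalExchangeSink]    (pipe_with_sink)
    //   cur_pipe = [LocalExchangeSource -> AggSink]   (pipe_with_source)
    //
    // _inherit_pipeline_properties(PASSTHROUGH, cur_pipe, new_pip) then sets:
    //   new_pip->set_num_tasks(1);                // sink side keeps the cap
    //   cur_pipe->set_num_tasks(_num_instances);  // source side widens to 4
    //
    // and the exchanger is built with both counts:
    //   PassthroughExchanger::create_unique(/*running_sink_operators=*/1,
    //                                       /*num_partitions=*/4);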
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index c8b042bd39d..3719445babd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -129,6 +129,8 @@ private:
                               PipelinePtr cur_pipe, const std::vector<TExpr>& texprs,
                               ExchangeType exchange_type, bool* do_local_exchange, int num_buckets,
                               const std::map<int, int>& bucket_seq_to_instance_idx);
+    void _inherit_pipeline_properties(ExchangeType exchange_type, PipelinePtr pipe_with_source,
+                                      PipelinePtr pipe_with_sink);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
                                           const doris::TPipelineFragmentParams& request,
@@ -217,7 +219,6 @@ private:
 
     int _operator_id = 0;
     int _sink_operator_id = 0;
-    int _num_instances = 0;
     std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
 };
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 76e2c21d387..7657d82b7fa 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -62,6 +62,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
     for (auto& op : _operators) {
         _source_dependency.insert({op->operator_id(), op->get_dependency(state->get_query_ctx())});
     }
+    pipeline->incr_created_tasks();
 }
 
Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cd498ccf9bc..5188412bd3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1995,8 +1995,7 @@ public class Coordinator implements CoordInterface {
                        // 4. Disable shared scan optimization by session variable
                        if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
                                || (node.isPresent() && node.get() instanceof FileScanNode)
-                                || (node.isPresent() && node.get().shouldDisableSharedScan(context))
-                                || enablePipelineXEngine) {
+                                || (node.isPresent() && node.get().shouldDisableSharedScan(context))) {
                             int expectedInstanceNum = 1;
                             if (parallelExecInstanceNum > 1) {
                                // the scan instance num should not be larger than the tablets num
@@ -3599,6 +3598,12 @@ public class Coordinator implements CoordInterface {
             Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
+                Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
+                Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
+                if (scanRanges == null) {
+                    scanRanges = Maps.newHashMap();
+                    perNodeSharedScans = Maps.newHashMap();
+                }
                 if (!res.containsKey(instanceExecParam.host)) {
                    TPipelineFragmentParams params = new TPipelineFragmentParams();
 
@@ -3624,6 +3629,7 @@ public class Coordinator implements CoordInterface {
 
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
+                    params.setPerNodeSharedScans(perNodeSharedScans);
                    res.put(instanceExecParam.host, params);
                    res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
                     instanceIdx.put(instanceExecParam.host, 0);
@@ -3641,12 +3647,6 @@ public class Coordinator implements CoordInterface {
 
                localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
                localParams.setFragmentInstanceId(instanceExecParam.instanceId);
-                Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
-                Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
-                if (scanRanges == null) {
-                    scanRanges = Maps.newHashMap();
-                    perNodeSharedScans = Maps.newHashMap();
-                }
                 localParams.setPerNodeScanRanges(scanRanges);
                 localParams.setPerNodeSharedScans(perNodeSharedScans);
                 localParams.setSenderId(i);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index fb702f61133..b6b96eee95a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -691,6 +691,7 @@ struct TPipelineFragmentParams {
   33: optional i32 num_local_sink
   34: optional i32 num_buckets
   35: optional map<i32, i32> bucket_seq_to_instance_idx
+  36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
 }
 
 struct TPipelineFragmentParamsList {
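
End to end, the new thrift field closes the loop: the FE now fills per_node_shared_scans at the fragment level in Coordinator.java, and the BE caps the matching scan pipeline at a single task (the OLAP_SCAN_NODE hunk above). A hedged, self-contained sketch of the BE-side lookup with the helper spelled out (stub types, names simplified):

    #include <map>

    // Hypothetical stand-in for the pipeline being built.
    struct PipelineStub {
        int num_tasks = 4;
        void set_num_tasks(int n) { num_tasks = n; }
    };

    // Mirrors find_with_default(request.per_node_shared_scans, node_id, false).
    bool shared_scan_for(const std::map<int, bool>& per_node_shared_scans, int node_id) {
        auto it = per_node_shared_scans.find(node_id);
        return it != per_node_shared_scans.end() && it->second;
    }

    int main() {
        std::map<int, bool> per_node_shared_scans{{0, true}};  // field 36, FE-populated
        PipelineStub scan_pipe;
        if (shared_scan_for(per_node_shared_scans, /*node_id=*/0)) {
            scan_pipe.set_num_tasks(1);  // one scan task shared by all instances
        }
    }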


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
