This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 30b41528955 [pipelineX](fix) Fix input data distribution for distinct streaming agg (#29980)
30b41528955 is described below

commit 30b41528955035102ab4a6b88597287700c54a66
Author: Gabriel <[email protected]>
AuthorDate: Mon Jan 15 21:46:31 2024 +0800

    [pipelineX](fix) Fix input data distribution for distinct streaming agg (#29980)
---
 .../distinct_streaming_aggregation_sink_operator.h |  5 +++++
 .../local_exchange/local_exchange_sink_operator.h  | 24 ++++++++++++---------
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 25 +++++++++++-----------
 be/src/pipeline/pipeline_x/operator.h              |  3 ++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  5 ++---
 .../pipeline_x/pipeline_x_fragment_context.h       |  1 +
 6 files changed, 37 insertions(+), 26 deletions(-)

diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index 6607516d6cb..900aa786634 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -112,6 +112,11 @@ public:
                 SourceState source_state) override;
 
     DataDistribution required_data_distribution() const override {
+        if (_needs_finalize) {
+            return _is_colocate
+                           ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                           : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
+        }
         return 
DataSinkOperatorX<DistinctStreamingAggSinkLocalState>::required_data_distribution();
     }
 };
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 7275e545205..7ed7d7d0e90 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -86,13 +86,11 @@ public:
     using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
     LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
                                const std::vector<TExpr>& texprs,
-                               const std::map<int, int>& 
bucket_seq_to_instance_idx,
-                               const std::map<int, int>& 
shuffle_idx_to_instance_idx)
+                               const std::map<int, int>& 
bucket_seq_to_instance_idx)
             : Base(sink_id, dest_id, dest_id),
               _num_partitions(num_partitions),
               _texprs(texprs),
-              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
-              _shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}
+              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
@@ -102,8 +100,8 @@ public:
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init(ExchangeType type, const int num_buckets,
-                const bool is_shuffled_hash_join) override {
+    Status init(ExchangeType type, const int num_buckets, const bool 
is_shuffled_hash_join,
+                const std::map<int, int>& shuffle_idx_to_instance_idx) 
override {
         _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + 
get_exchange_type_name(type) + ")";
         _type = type;
         if (_type == ExchangeType::HASH_SHUFFLE) {
@@ -111,10 +109,16 @@ public:
             // should use a HASH_SHUFFLE local exchanger to shuffle data 
again. To be mentioned,
             // we should use map shuffle idx to instance idx because all 
instances will be
             // distributed to all BEs. Otherwise, we should use shuffle idx 
directly.
-            if (!is_shuffled_hash_join) {
-                _shuffle_idx_to_instance_idx.clear();
+            if (is_shuffled_hash_join) {
+                std::for_each(shuffle_idx_to_instance_idx.begin(),
+                              shuffle_idx_to_instance_idx.end(), [&](const 
auto& item) {
+                                  DCHECK(item.first != -1);
+                                  
_shuffle_idx_to_instance_idx.push_back({item.first, item.second});
+                              });
+            } else {
+                _shuffle_idx_to_instance_idx.resize(_num_partitions);
                 for (int i = 0; i < _num_partitions; i++) {
-                    _shuffle_idx_to_instance_idx.insert({i, i});
+                    _shuffle_idx_to_instance_idx[i] = {i, i};
                 }
             }
             _partitioner.reset(
@@ -156,7 +160,7 @@ private:
     const std::vector<TExpr>& _texprs;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
-    std::map<int, int> _shuffle_idx_to_instance_idx;
+    std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 900e31e6631..1f680fdd1d9 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -116,26 +116,26 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
         return Status::OK();
     }
     
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
-    new_block_wrapper->ref(_num_partitions);
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
-        auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
-                           ._shuffle_idx_to_instance_idx;
-        for (size_t i = 0; i < _num_partitions; i++) {
-            DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " << 
_num_partitions
-                                    << " map.size(): " << map.size();
-            DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " " 
<< _num_partitions;
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+        const auto& map = 
local_state._parent->cast<LocalExchangeSinkOperatorX>()
+                                  ._shuffle_idx_to_instance_idx;
+        new_block_wrapper->ref(map.size());
+        for (const auto& it : map) {
+            DCHECK(it.second >= 0 && it.second < _num_partitions)
+                    << it.first << " : " << it.second << " " << 
_num_partitions;
+            size_t start = local_state._partition_rows_histogram[it.first];
+            size_t size = local_state._partition_rows_histogram[it.first + 1] 
- start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
-                        map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}});
-                local_state._shared_state->set_ready_to_read(map[i]);
+                        it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
+                data_queue[it.second].enqueue({new_block_wrapper, {row_idx, 
start, size}});
+                local_state._shared_state->set_ready_to_read(it.second);
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
         }
     } else if (_num_senders != _num_sources || 
_ignore_source_data_distribution) {
+        new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
             size_t start = local_state._partition_rows_histogram[i];
             size_t size = local_state._partition_rows_histogram[i + 1] - start;
@@ -149,6 +149,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             }
         }
     } else {
+        new_block_wrapper->ref(_num_partitions);
         auto map =
                 
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 6792ce35f36..d46dc859b0c 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -470,7 +470,8 @@ public:
 
     Status init(const TDataSink& tsink) override;
     [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
-                                      const bool is_shuffled_hash_join) {
+                                      const bool is_shuffled_hash_join,
+                                      const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e2f1d9742b4..a44db667450 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -733,11 +733,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
                                                : 
cur_pipe->sink_x()->is_shuffled_hash_join();
     sink.reset(new LocalExchangeSinkOperatorX(
             sink_id, local_exchange_id, is_shuffled_hash_join ? 
_total_instances : _num_instances,
-            data_distribution.partition_exprs, bucket_seq_to_instance_idx,
-            shuffle_idx_to_instance_idx));
+            data_distribution.partition_exprs, bucket_seq_to_instance_idx));
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, 
num_buckets,
-                                            is_shuffled_hash_join));
+                                            is_shuffled_hash_join, 
shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 92178d359d9..439b0072d72 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -244,6 +244,7 @@ private:
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> 
_runtime_filter_states;
 
+    // Total instance num running on all BEs
     int _total_instances = -1;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to