This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 30b41528955 [pipelineX](fix) Fix input data distribution for distinct streaming agg (#29980)
30b41528955 is described below
commit 30b41528955035102ab4a6b88597287700c54a66
Author: Gabriel <[email protected]>
AuthorDate: Mon Jan 15 21:46:31 2024 +0800
[pipelineX](fix) Fix input data distribution for distinct streaming agg (#29980)
---
.../distinct_streaming_aggregation_sink_operator.h | 5 +++++
.../local_exchange/local_exchange_sink_operator.h | 24 ++++++++++++---------
.../pipeline_x/local_exchange/local_exchanger.cpp | 25 +++++++++++-----------
be/src/pipeline/pipeline_x/operator.h | 3 ++-
.../pipeline_x/pipeline_x_fragment_context.cpp | 5 ++---
.../pipeline_x/pipeline_x_fragment_context.h | 1 +
6 files changed, 37 insertions(+), 26 deletions(-)
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index 6607516d6cb..900aa786634 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -112,6 +112,11 @@ public:
SourceState source_state) override;
DataDistribution required_data_distribution() const override {
+ if (_needs_finalize) {
+ return _is_colocate
+ ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+ : DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
+ }
return
DataSinkOperatorX<DistinctStreamingAggSinkLocalState>::required_data_distribution();
}
};
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 7275e545205..7ed7d7d0e90 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -86,13 +86,11 @@ public:
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
const std::vector<TExpr>& texprs,
- const std::map<int, int>&
bucket_seq_to_instance_idx,
- const std::map<int, int>&
shuffle_idx_to_instance_idx)
+ const std::map<int, int>&
bucket_seq_to_instance_idx)
: Base(sink_id, dest_id, dest_id),
_num_partitions(num_partitions),
_texprs(texprs),
- _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
- _shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}
+ _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
@@ -102,8 +100,8 @@ public:
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
}
- Status init(ExchangeType type, const int num_buckets,
- const bool is_shuffled_hash_join) override {
+ Status init(ExchangeType type, const int num_buckets, const bool
is_shuffled_hash_join,
+ const std::map<int, int>& shuffle_idx_to_instance_idx)
override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" +
get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
@@ -111,10 +109,16 @@ public:
// should use a HASH_SHUFFLE local exchanger to shuffle data
again. To be mentioned,
// we should use map shuffle idx to instance idx because all
instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx
directly.
- if (!is_shuffled_hash_join) {
- _shuffle_idx_to_instance_idx.clear();
+ if (is_shuffled_hash_join) {
+ std::for_each(shuffle_idx_to_instance_idx.begin(),
+ shuffle_idx_to_instance_idx.end(), [&](const
auto& item) {
+ DCHECK(item.first != -1);
+
_shuffle_idx_to_instance_idx.push_back({item.first, item.second});
+ });
+ } else {
+ _shuffle_idx_to_instance_idx.resize(_num_partitions);
for (int i = 0; i < _num_partitions; i++) {
- _shuffle_idx_to_instance_idx.insert({i, i});
+ _shuffle_idx_to_instance_idx[i] = {i, i};
}
}
_partitioner.reset(
@@ -156,7 +160,7 @@ private:
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
- std::map<int, int> _shuffle_idx_to_instance_idx;
+ std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 900e31e6631..1f680fdd1d9 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -116,26 +116,26 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
return Status::OK();
}
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
- new_block_wrapper->ref(_num_partitions);
if (get_type() == ExchangeType::HASH_SHUFFLE) {
- auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
- ._shuffle_idx_to_instance_idx;
- for (size_t i = 0; i < _num_partitions; i++) {
- DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " <<
_num_partitions
- << " map.size(): " << map.size();
- DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " "
<< _num_partitions;
- size_t start = local_state._partition_rows_histogram[i];
- size_t size = local_state._partition_rows_histogram[i + 1] - start;
+ const auto& map =
local_state._parent->cast<LocalExchangeSinkOperatorX>()
+ ._shuffle_idx_to_instance_idx;
+ new_block_wrapper->ref(map.size());
+ for (const auto& it : map) {
+ DCHECK(it.second >= 0 && it.second < _num_partitions)
+ << it.first << " : " << it.second << " " <<
_num_partitions;
+ size_t start = local_state._partition_rows_histogram[it.first];
+ size_t size = local_state._partition_rows_histogram[it.first + 1]
- start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
- map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
- data_queue[map[i]].enqueue({new_block_wrapper, {row_idx,
start, size}});
- local_state._shared_state->set_ready_to_read(map[i]);
+ it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
+ data_queue[it.second].enqueue({new_block_wrapper, {row_idx,
start, size}});
+ local_state._shared_state->set_ready_to_read(it.second);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
}
} else if (_num_senders != _num_sources ||
_ignore_source_data_distribution) {
+ new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
@@ -149,6 +149,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
}
}
} else {
+ new_block_wrapper->ref(_num_partitions);
auto map =
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 6792ce35f36..d46dc859b0c 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -470,7 +470,8 @@ public:
Status init(const TDataSink& tsink) override;
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
- const bool is_shuffled_hash_join) {
+ const bool is_shuffled_hash_join,
+ const std::map<int, int>&
shuffle_idx_to_instance_idx) {
return Status::InternalError("init() is only implemented in local
exchange!");
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e2f1d9742b4..a44db667450 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -733,11 +733,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
:
cur_pipe->sink_x()->is_shuffled_hash_join();
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id, is_shuffled_hash_join ?
_total_instances : _num_instances,
- data_distribution.partition_exprs, bucket_seq_to_instance_idx,
- shuffle_idx_to_instance_idx));
+ data_distribution.partition_exprs, bucket_seq_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type,
num_buckets,
- is_shuffled_hash_join));
+ is_shuffled_hash_join,
shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 92178d359d9..439b0072d72 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -244,6 +244,7 @@ private:
std::vector<std::unique_ptr<RuntimeFilterParamsContext>>
_runtime_filter_states;
+ // Total instance num running on all BEs
int _total_instances = -1;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]