This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1738a56dc9d [UT](exchanger) Add UT case to test local merge exchanger (#47787)
1738a56dc9d is described below
commit 1738a56dc9d316431b63b8d7ab0134c722128533
Author: Gabriel <[email protected]>
AuthorDate: Wed Feb 12 10:15:32 2025 +0800
[UT](exchanger) Add UT case to test local merge exchanger (#47787)
---
be/src/pipeline/exec/operator.h | 6 +
.../local_exchange/local_exchange_sink_operator.h | 9 +
.../local_exchange_source_operator.h | 3 +
be/src/pipeline/local_exchange/local_exchanger.cpp | 1 -
be/src/pipeline/local_exchange/local_exchanger.h | 5 +-
be/src/vec/core/sort_cursor.h | 21 +-
be/test/pipeline/local_exchanger_test.cpp | 284 +++++++++++++++++++++
7 files changed, 313 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e0f11049415..241fd42b5c7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -435,6 +435,9 @@ public:
DataSinkOperatorXBase(const int operator_id, const int node_id,
std::vector<int>& sources)
: OperatorBase(), _operator_id(operator_id), _node_id(node_id),
_dests_id(sources) {}
+#ifdef BE_TEST
+ DataSinkOperatorXBase() : _operator_id(-1), _node_id(0), _dests_id({-1})
{};
+#endif
~DataSinkOperatorXBase() override = default;
@@ -537,6 +540,9 @@ public:
DataSinkOperatorX(const int id, const int node_id, std::vector<int>
sources)
: DataSinkOperatorXBase(id, node_id, sources) {}
+#ifdef BE_TEST
+ DataSinkOperatorX() = default;
+#endif
~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 7ef053af6f7..1620a68d33e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -92,6 +92,15 @@ public:
_texprs(texprs),
_partitioned_exprs_num(texprs.size()),
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
+#ifdef BE_TEST
+ LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs,
+ const std::map<int, int>&
bucket_seq_to_instance_idx)
+ : Base(),
+ _num_partitions(0),
+ _texprs(texprs),
+ _partitioned_exprs_num(texprs.size()),
+ _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
+#endif
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 3c706d50182..e0d1715afb1 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -65,6 +65,9 @@ class LocalExchangeSourceOperatorX final : public
OperatorX<LocalExchangeSourceL
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id,
id) {}
+#ifdef BE_TEST
+ LocalExchangeSourceOperatorX() = default;
+#endif
Status init(ExchangeType type) override {
_op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type)
+ ")";
_exchange_type = type;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 0fb4db625a5..1fc47b2f55d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -19,7 +19,6 @@
#include "common/cast_set.h"
#include "common/status.h"
-#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "vec/runtime/partitioner.h"
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 7f87289e413..25f090e4208 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -87,7 +87,10 @@ public:
~BlockWrapper() {
if (_shared_state != nullptr) {
DCHECK_GT(_allocated_bytes, 0);
- _shared_state->sub_total_mem_usage(_allocated_bytes,
_channel_ids.front());
+ // `_channel_ids` may be empty if the exchanger is a shuffle exchanger, and the channel id is
+ // not used by `sub_total_mem_usage` in that case, so we just pass -1 here.
+ _shared_state->sub_total_mem_usage(
+ _allocated_bytes, _channel_ids.empty() ? -1 :
_channel_ids.front());
if (_shared_state->exchanger->_free_block_limit == 0 ||
_shared_state->exchanger->_free_blocks.size_approx() <
_shared_state->exchanger->_free_block_limit *
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index a37b6feb21e..ee4ce22f42d 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -205,23 +205,16 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
return false;
}
block->clear();
- Status status;
do {
- status = _block_supplier(block.get(), &_is_eof);
- } while (block->empty() && !_is_eof && status.ok());
- // If status not ok, upper callers could not detect whether it is eof
or error.
- // So that fatal here, and should throw exception in the future.
- if (status.ok() && !block->empty()) {
- if (!_ordering_expr.empty()) {
- for (int i = 0; status.ok() && i < desc.size(); ++i) {
- // TODO yiguolei: throw exception if status not ok in the
future
- status = _ordering_expr[i]->execute(block.get(),
&desc[i].column_number);
- }
+ THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
+ } while (block->empty() && !_is_eof);
+ if (!block->empty()) {
+ DCHECK_EQ(_ordering_expr.size(), desc.size());
+ for (int i = 0; i < desc.size(); ++i) {
+ THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(),
&desc[i].column_number));
}
MergeSortCursorImpl::reset();
- return status.ok();
- } else if (!status.ok()) {
- throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
status.msg());
+ return true;
}
return false;
}
diff --git a/be/test/pipeline/local_exchanger_test.cpp
b/be/test/pipeline/local_exchanger_test.cpp
index c9a10eb32b5..b0e91f0018f 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -1098,4 +1098,288 @@ TEST_F(LocalExchangerTest,
AdaptivePassthroughExchanger) {
}
}
+TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) {
+ int num_sink = 1;
+ int num_sources = 4;
+ int num_partitions = 4;
+ int free_block_limit = 0;
+ std::map<int, int> shuffle_idx_to_instance_idx;
+ for (int i = 0; i < num_partitions; i++) {
+ shuffle_idx_to_instance_idx[i] = i;
+ }
+ // The wrong map is missing the (0 -> 0) mapping.
+ std::map<int, int> wrong_shuffle_idx_to_instance_idx;
+ for (int i = 1; i < num_partitions; i++) {
+ wrong_shuffle_idx_to_instance_idx[i] = i;
+ }
+
+ std::vector<std::pair<std::vector<uint32_t>, int>> hash_vals_and_value;
+ std::vector<std::unique_ptr<LocalExchangeSinkLocalState>>
_sink_local_states;
+ std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+ _sink_local_states.resize(num_sink);
+ _local_states.resize(num_sources);
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto shared_state =
LocalExchangeSharedState::create_shared(num_partitions);
+ shared_state->exchanger = ShuffleExchanger::create_unique(num_sink,
num_sources, num_partitions,
+
free_block_limit);
+ auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+ sink_dep->set_shared_state(shared_state.get());
+ shared_state->sink_deps.push_back(sink_dep);
+ shared_state->create_dependencies(0);
+
+ auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
+ auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(TTypeNodeBuilder()
+
.set_type(TTypeNodeType::SCALAR)
+
.set_scalar_type(TPrimitiveType::INT)
+ .build())
+ .build(),
+ 0)
+ .set_slot_ref(TSlotRefBuilder(0, 0).build())
+ .build();
+ std::vector<TExpr> texprs;
+ texprs.push_back(TExpr {});
+ for (size_t i = 0; i < num_sink; i++) {
+ auto compute_hash_value_timer =
+ ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+ auto distribute_timer = ADD_TIMER(profile, "distribute_timer" +
std::to_string(i));
+ _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr,
nullptr));
+ _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+ _sink_local_states[i]->_compute_hash_value_timer =
compute_hash_value_timer;
+ _sink_local_states[i]->_distribute_timer = distribute_timer;
+ _sink_local_states[i]->_partitioner.reset(
+ new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+ num_partitions));
+ auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+ slot->_column_id = 0;
+
((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)_sink_local_states[i]
+ ->_partitioner.get())
+ ->_partition_expr_ctxs.push_back(
+
std::make_shared<doris::vectorized::VExprContext>(slot));
+ _sink_local_states[i]->_channel_id = i;
+ _sink_local_states[i]->_shared_state = shared_state.get();
+ _sink_local_states[i]->_dependency = sink_dep.get();
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ auto get_block_failed_counter =
+ ADD_TIMER(profile, "_get_block_failed_counter" +
std::to_string(i));
+ auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" +
std::to_string(i));
+ _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr,
nullptr));
+ _local_states[i]->_exchanger = shared_state->exchanger.get();
+ _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+ _local_states[i]->_copy_data_timer = copy_data_timer;
+ _local_states[i]->_channel_id = i;
+ _local_states[i]->_shared_state = shared_state.get();
+ _local_states[i]->_dependency =
shared_state->get_dep_by_channel_id(i).front().get();
+ _local_states[i]->_memory_used_counter =
profile->AddHighWaterMarkCounter(
+ "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+ shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+ }
+ const auto num_blocks = 1;
+ {
+ for (size_t i = 0; i < num_partitions; i++) {
+ hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+ for (size_t j = 0; j < num_blocks; j++) {
+ vectorized::Block in_block;
+ vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
+ auto int_col0 = vectorized::ColumnInt32::create();
+ int_col0->insert_many_vals(hash_vals_and_value.back().second,
10);
+
+ auto pre_size = hash_vals_and_value.back().first.size();
+ hash_vals_and_value.back().first.resize(pre_size + 10);
+ std::fill(hash_vals_and_value.back().first.begin() + pre_size,
+ hash_vals_and_value.back().first.end(), 0);
+
int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() +
pre_size,
+ PrimitiveType::TYPE_INT,
+
cast_set<uint32_t>(int_col0->size()), 0, nullptr);
+ in_block.insert({std::move(int_col0), int_type,
"test_int_col0"});
+ }
+ }
+ }
+ {
+ // Enqueue 1 block with 10 rows for each data queue.
+ for (size_t i = 0; i < num_partitions; i++) {
+ hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+ for (size_t j = 0; j < num_blocks; j++) {
+ vectorized::Block in_block;
+ vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
+ auto int_col0 = vectorized::ColumnInt32::create();
+ int_col0->insert_many_vals(hash_vals_and_value[i].second, 10);
+ in_block.insert({std::move(int_col0), int_type,
"test_int_col0"});
+ bool in_eos = false;
+ EXPECT_EQ(exchanger->sink(
+ _runtime_state.get(), &in_block, in_eos,
+
{_sink_local_states[0]->_compute_hash_value_timer,
+ _sink_local_states[0]->_distribute_timer,
nullptr},
+ {&_sink_local_states[0]->_channel_id,
+ _sink_local_states[0]->_partitioner.get(),
+ _sink_local_states[0].get(),
&shuffle_idx_to_instance_idx}),
+ Status::OK());
+ }
+ }
+ }
+ {
+ for (size_t i = 0; i < num_sources; i++) {
+ EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 1);
+ }
+ }
+
+ {
+ LocalExchangeSinkOperatorX op(texprs,
wrong_shuffle_idx_to_instance_idx);
+ _sink_local_states[0]->_parent = &op;
+ EXPECT_EQ(hash_vals_and_value[0].first.front() % num_partitions, 0);
+ vectorized::Block in_block;
+ vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
+ auto int_col0 = vectorized::ColumnInt32::create();
+ int_col0->insert_many_vals(hash_vals_and_value[0].second, 10);
+ in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+ bool in_eos = false;
+ EXPECT_TRUE(
+ exchanger
+ ->sink(_runtime_state.get(), &in_block, in_eos,
+
{_sink_local_states[0]->_compute_hash_value_timer,
+ _sink_local_states[0]->_distribute_timer,
nullptr},
+ {&_sink_local_states[0]->_channel_id,
+ _sink_local_states[0]->_partitioner.get(),
+ _sink_local_states[0].get(),
&wrong_shuffle_idx_to_instance_idx})
+ .is<ErrorCode::INTERNAL_ERROR>());
+ }
+}
+
+TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
+ int num_sink = 4;
+ int num_sources = 4;
+ int num_partitions = 4;
+ int free_block_limit = 0;
+ const auto expect_block_bytes = 128;
+ const auto num_blocks = 2;
+ std::vector<bool> is_asc_order;
+ std::vector<bool> nulls_first;
+ vectorized::VExprContextSPtrs ordering_expr_ctxs;
+ auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(TTypeNodeBuilder()
+
.set_type(TTypeNodeType::SCALAR)
+
.set_scalar_type(TPrimitiveType::INT)
+ .build())
+ .build(),
+ 0)
+ .set_slot_ref(TSlotRefBuilder(0, 0).build())
+ .build();
+ auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+ slot->_column_id = 0;
+ slot->_prepare_finished = true;
+
ordering_expr_ctxs.push_back(std::make_shared<doris::vectorized::VExprContext>(slot));
+ is_asc_order.push_back(true);
+ nulls_first.push_back(true);
+ ordering_expr_ctxs[0]->_prepared = true;
+ ordering_expr_ctxs[0]->_opened = true;
+
+ std::vector<std::unique_ptr<LocalExchangeSinkLocalState>>
_sink_local_states;
+ std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+ _sink_local_states.resize(num_sink);
+ _local_states.resize(num_sources);
+ config::local_exchange_buffer_mem_limit = expect_block_bytes *
num_partitions;
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto shared_state =
LocalMergeExchangeSharedState::create_shared(num_partitions);
+ shared_state->exchanger = LocalMergeSortExchanger::create_unique(
+ LocalMergeSortExchanger::MergeInfo {is_asc_order, nulls_first, -1,
0,
+ ordering_expr_ctxs},
+ num_sink, num_partitions, free_block_limit);
+ auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+ sink_dep->set_shared_state(shared_state.get());
+ shared_state->sink_deps.push_back(sink_dep);
+ shared_state->create_dependencies(0);
+ auto& sink_deps = shared_state->sink_deps;
+ EXPECT_EQ(sink_deps.size(), num_sink);
+
+ auto* exchanger = (LocalMergeSortExchanger*)shared_state->exchanger.get();
+ for (size_t i = 0; i < num_sink; i++) {
+ auto compute_hash_value_timer =
+ ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+ auto distribute_timer = ADD_TIMER(profile, "distribute_timer" +
std::to_string(i));
+ _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr,
nullptr));
+ _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+ _sink_local_states[i]->_compute_hash_value_timer =
compute_hash_value_timer;
+ _sink_local_states[i]->_distribute_timer = distribute_timer;
+ _sink_local_states[i]->_channel_id = i;
+ _sink_local_states[i]->_shared_state = shared_state.get();
+ _sink_local_states[i]->_dependency = sink_deps[i].get();
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ auto get_block_failed_counter =
+ ADD_TIMER(profile, "_get_block_failed_counter" +
std::to_string(i));
+ auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" +
std::to_string(i));
+ _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr,
nullptr));
+ _local_states[i]->_runtime_profile =
+ std::make_unique<RuntimeProfile>("source_profile " +
std::to_string(i));
+ _local_states[i]->_exchanger = shared_state->exchanger.get();
+ _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+ _local_states[i]->_copy_data_timer = copy_data_timer;
+ _local_states[i]->_channel_id = i;
+ _local_states[i]->_shared_state = shared_state.get();
+ _local_states[i]->_dependency =
shared_state->get_dep_by_channel_id(i).front().get();
+ _local_states[i]->_memory_used_counter =
profile->AddHighWaterMarkCounter(
+ "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+ shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+ }
+ {
+ // Enqueue 2 blocks with 10 rows for each data queue.
+ for (size_t i = 0; i < num_partitions; i++) {
+ for (size_t j = 0; j < num_blocks; j++) {
+ vectorized::Block in_block;
+ vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
+ auto int_col0 = vectorized::ColumnInt32::create();
+ int_col0->insert_many_vals(i, 10);
+
+ in_block.insert({std::move(int_col0), int_type,
"test_int_col0"});
+ EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+ bool in_eos = j == num_blocks - 1;
+ EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block,
in_eos,
+
{_sink_local_states[i]->_compute_hash_value_timer,
+
_sink_local_states[i]->_distribute_timer, nullptr},
+ {&_sink_local_states[i]->_channel_id,
+
_sink_local_states[i]->_partitioner.get(),
+ _sink_local_states[i].get(),
nullptr}),
+ Status::OK());
+ EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), j == 0);
+ EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+ }
+ }
+ }
+ for (size_t i = 0; i < num_sink; i++) {
+ shared_state->sub_running_sink_operators();
+ }
+
+ {
+ for (size_t i = 0; i < num_sources; i++) {
+ EXPECT_EQ(shared_state->mem_counters[i]->value(),
shared_state->_queues_mem_usage[i]);
+ EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+ }
+ // Dequeue from the data queue and accumulate rows if the row count is smaller than batch_size.
+ EXPECT_EQ(exchanger->_merger, nullptr);
+ for (size_t i = 0; i < num_sources; i++) {
+ for (size_t j = 0; j < num_sink * num_blocks + 1; j++) {
+ bool eos = false;
+ vectorized::Block block;
+ EXPECT_EQ(
+ exchanger->get_block(_runtime_state.get(), &block,
&eos,
+ {nullptr, nullptr,
_local_states[i]->_copy_data_timer},
+
{cast_set<int>(_local_states[i]->_channel_id),
+ _local_states[i].get()}),
+ Status::OK());
+ EXPECT_TRUE(exchanger->_merger != nullptr);
+ EXPECT_EQ(block.rows(), i > 0 || j == num_sink * num_blocks ?
0 : 10);
+ EXPECT_EQ(eos, i > 0 || j == num_sink * num_blocks);
+ EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+ }
+ }
+ }
+
+ for (size_t i = 0; i < num_sink; i++) {
+ EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true);
+ }
+}
+
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]