This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new dea016321ec [fix](coordinator) Fix wrong `recvrId` in fragment
contains BHJ (#47728)
dea016321ec is described below
commit dea016321ec92ba63451a6307c134daa47c6df82
Author: Gabriel <[email protected]>
AuthorDate: Tue Feb 11 10:44:48 2025 +0800
[fix](coordinator) Fix wrong `recvrId` in fragment contains BHJ (#47728)
Pick #47727
In Coordinator, a shuffle map consists of `recvrId` in each instance.
For example, if 3 BEs exist in a cluster, for a shuffled hash join, we
get 3 maps for a fragment sent to each BE:
BE0: {0:0, 1:1}
BE1: {2:0, 3:1}
BE2: {6:0, 5:1}
In this example, parallelism is 2. Keys in shuffle map indicate the
global shuffle id and the values indicate the instance id in current BE.
In Coordinator, the `recvrId` is the global shuffle id of each instance
so we may get a wrong result if it is wrong.
This bug is caused by `recvrId` set by a BHJ fragment. If a fragment
contains both BHJ and SHJ, `recvrId` should be set by SHJ and BHJ should
be ignored.
---
be/src/pipeline/local_exchange/local_exchanger.cpp | 14 ++++++++++++++
.../src/main/java/org/apache/doris/qe/Coordinator.java | 2 --
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 3ec4f537e47..09fdf768af7 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -205,6 +205,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
local_state._channel_id);
auto bucket_seq_to_instance_idx =
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
+ int32_t enqueue_rows = 0;
if (get_type() == ExchangeType::HASH_SHUFFLE) {
/**
* If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed
to all instances of
@@ -221,12 +222,25 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
uint32_t start = local_state._partition_rows_histogram[it.first];
uint32_t size = local_state._partition_rows_histogram[it.first +
1] - start;
if (size > 0) {
+ enqueue_rows += size;
_enqueue_data_and_set_ready(it.second, local_state,
{new_block_wrapper, {row_idx,
start, size}});
} else {
new_block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
}
}
+ if (enqueue_rows != rows) [[unlikely]] {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id:
{}, Shuffled Map: ",
+ get_exchange_type_name(get_type()),
local_state.parent()->node_id());
+ for (const auto& it : map) {
+ fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first,
it.second);
+ }
+ return Status::InternalError(
+ "Rows mismatched! Data may be lost. [Expected enqueue
rows={}, Real enqueue "
+ "rows={}, Detail: {}]",
+ rows, enqueue_rows, fmt::to_string(debug_string_buffer));
+ }
} else {
DCHECK(!bucket_seq_to_instance_idx.empty());
new_block_wrapper->ref(_num_partitions);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 774e4efa432..58631524024 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1447,7 +1447,6 @@ public class Coordinator implements CoordInterface {
} else {
destHosts.put(param.host, param);
TPlanFragmentDestination dest = new
TPlanFragmentDestination();
- param.recvrId = params.destinations.size();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
@@ -1593,7 +1592,6 @@ public class Coordinator implements CoordInterface {
destHosts.put(param.host, param);
TPlanFragmentDestination dest = new
TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
- param.recvrId = params.destinations.size();
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]