This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5acd1279a9c [fix](2.1) Fix correctness in branch-2.1 (#39901)
5acd1279a9c is described below
commit 5acd1279a9cda5e1e820b7418562be94b6ca08f2
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 26 14:12:59 2024 +0800
[fix](2.1) Fix correctness in branch-2.1 (#39901)
## Proposed changes
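Issue Number: close #xxx
<!--Describe your changes.-->
In `ShuffleExchanger::_split_rows`, record each block's allocated bytes with
`add_mem_usage` before enqueueing it. If `_enqueue_data_and_set_ready` returns
false, undo the accounting with `sub_mem_usage` and call `unref` on the block
wrapper.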
---
.../pipeline_x/local_exchange/local_exchanger.cpp | 22 ++++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 194b81c7bee..1353e832e24 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -183,8 +183,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
uint32_t start = local_state._partition_rows_histogram[i];
uint32_t size = local_state._partition_rows_histogram[i + 1] -
start;
if (size > 0) {
- _enqueue_data_and_set_ready(i % _num_sources, local_state,
- {new_block_wrapper, {row_idx,
start, size}});
+ local_state._shared_state->add_mem_usage(
+ i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(), false);
+ if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
+ {new_block_wrapper, {row_idx,
start, size}})) {
+ local_state._shared_state->sub_mem_usage(
+ i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(),
+ false);
+ new_block_wrapper->unref(local_state._shared_state);
+ }
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -204,8 +211,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
uint32_t start = local_state._partition_rows_histogram[it.first];
uint32_t size = local_state._partition_rows_histogram[it.first +
1] - start;
if (size > 0) {
- _enqueue_data_and_set_ready(it.second, local_state,
- {new_block_wrapper, {row_idx,
start, size}});
+ local_state._shared_state->add_mem_usage(
+ it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
+
+ if (!_enqueue_data_and_set_ready(it.second, local_state,
+ {new_block_wrapper, {row_idx,
start, size}})) {
+ local_state._shared_state->sub_mem_usage(
+ it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
+ new_block_wrapper->unref(local_state._shared_state);
+ }
} else {
new_block_wrapper->unref(local_state._shared_state);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]