github-actions[bot] commented on code in PR #28428:
URL: https://github.com/apache/doris/pull/28428#discussion_r1427486787
##########
be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp:
##########
@@ -239,4 +239,200 @@ Status BroadcastExchanger::get_block(RuntimeState* state,
vectorized::Block* blo
return Status::OK();
}
+Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
+ vectorized::Block*
in_block,
+ SourceState
source_state,
+
LocalExchangeSinkLocalState& local_state) {
+ vectorized::Block new_block;
+ if (!_free_blocks.try_dequeue(new_block)) {
+ new_block = {in_block->clone_empty()};
+ }
+ new_block.swap(*in_block);
+ auto channel_id = (local_state._channel_id++) % _num_partitions;
+ local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
+ _passthrough_data_queue[channel_id].enqueue(std::move(new_block));
+ local_state._shared_state->set_ready_to_read(channel_id);
+
+ return Status::OK();
+}
+
+bool AdaptivePassthroughExchanger::_passthrough_get_block(
Review Comment:
warning: method '_passthrough_get_block' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h:179:
```diff
- bool _passthrough_get_block(RuntimeState* state, vectorized::Block* block,
+ static bool _passthrough_get_block(RuntimeState* state, vectorized::Block* block,
```
##########
be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp:
##########
@@ -239,4 +239,200 @@
return Status::OK();
}
+Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
+ vectorized::Block*
in_block,
+ SourceState
source_state,
+
LocalExchangeSinkLocalState& local_state) {
+ vectorized::Block new_block;
+ if (!_free_blocks.try_dequeue(new_block)) {
+ new_block = {in_block->clone_empty()};
+ }
+ new_block.swap(*in_block);
+ auto channel_id = (local_state._channel_id++) % _num_partitions;
+ local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
+ _passthrough_data_queue[channel_id].enqueue(std::move(new_block));
+ local_state._shared_state->set_ready_to_read(channel_id);
+
+ return Status::OK();
+}
+
+bool AdaptivePassthroughExchanger::_passthrough_get_block(
+ RuntimeState* state, vectorized::Block* block, SourceState&
source_state,
+ LocalExchangeSourceLocalState& local_state) {
+ vectorized::Block next_block;
+ if (_running_sink_operators == 0) {
+ if
(_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ block->swap(next_block);
+ _free_blocks.enqueue(std::move(next_block));
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
+ block->allocated_bytes());
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ source_state = SourceState::FINISHED;
+ return false;
+ }
+ } else if
(_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ block->swap(next_block);
+ _free_blocks.enqueue(std::move(next_block));
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ return false;
+ }
+ return true;
+}
+
+Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state,
vectorized::Block* block,
+ SourceState source_state,
+
LocalExchangeSinkLocalState& local_state) {
+ std::vector<uint32_t> channel_ids;
+ const auto num_rows = block->rows();
+ channel_ids.resize(num_rows, 0);
+ if (num_rows <= _num_partitions) {
+ std::iota(channel_ids.begin(), channel_ids.end(), 0);
+ } else {
+ size_t i = 0;
+ for (; i < num_rows - _num_partitions; i += _num_partitions) {
+ std::iota(channel_ids.begin() + i, channel_ids.begin() + i +
_num_partitions, 0);
+ }
+ if (i < num_rows - 1) {
+ std::iota(channel_ids.begin() + i, channel_ids.end(), 0);
+ }
+ }
+ return _split_rows(state, channel_ids.data(), block, source_state,
local_state);
+}
+
+Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
+ const uint32_t* __restrict
channel_ids,
+ vectorized::Block* block,
SourceState source_state,
+ LocalExchangeSinkLocalState&
local_state) {
+ auto& data_queue = _shuffle_data_queue;
+ const auto rows = block->rows();
+ auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
+ {
+ local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
+ for (size_t i = 0; i < rows; ++i) {
+ local_state._partition_rows_histogram[channel_ids[i]]++;
+ }
+ for (int32_t i = 1; i <= _num_partitions; ++i) {
+ local_state._partition_rows_histogram[i] +=
+ local_state._partition_rows_histogram[i - 1];
+ }
+
+ for (int32_t i = rows - 1; i >= 0; --i) {
+ (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] -
1] = i;
+ local_state._partition_rows_histogram[channel_ids[i]]--;
+ }
+ }
+
+ vectorized::Block data_block;
+ std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
+ if (_free_blocks.try_enqueue(data_block)) {
+ new_block_wrapper =
ShuffleBlockWrapper::create_shared(std::move(data_block));
+ } else {
+ new_block_wrapper =
ShuffleBlockWrapper::create_shared(block->clone_empty());
+ }
+
+ new_block_wrapper->data_block.swap(*block);
+ if (new_block_wrapper->data_block.empty()) {
+ return Status::OK();
+ }
+
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
+ new_block_wrapper->ref(_num_partitions);
+
+ for (size_t i = 0; i < _num_partitions; i++) {
+ size_t start = local_state._partition_rows_histogram[i];
+ size_t size = local_state._partition_rows_histogram[i + 1] - start;
+ if (size > 0) {
+ local_state._shared_state->add_mem_usage(
+ i, new_block_wrapper->data_block.allocated_bytes(), false);
+ data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
+ local_state._shared_state->set_ready_to_read(i);
+ } else {
+ new_block_wrapper->unref(local_state._shared_state);
+ }
+ }
+
+ return Status::OK();
+}
+bool AdaptivePassthroughExchanger::_shuffle_get_block(RuntimeState* state,
vectorized::Block* block,
Review Comment:
warning: method '_shuffle_get_block' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h:182:
```diff
- bool _shuffle_get_block(RuntimeState* state, vectorized::Block* block,
+ static bool _shuffle_get_block(RuntimeState* state, vectorized::Block* block,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]