github-actions[bot] commented on code in PR #30914:
URL: https://github.com/apache/doris/pull/30914#discussion_r1499154258
##########
be/src/vec/sink/vdata_stream_sender.cpp:
##########
@@ -503,6 +541,25 @@ void VDataStreamSender::_handle_eof_channel(RuntimeState*
state, ChannelPtrType
static_cast<void>(channel->close(state, Status::OK()));
}
+Status VDataStreamSender::_send_new_partition_batch() {
+ if (_row_distribution.need_deal_batching()) { // maybe try_close more than
1 time
+ RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+ Block tmp_block = _row_distribution._batching_block->to_block(); //
Borrow out, for lval ref
+
+ // these order is only.
+ // 1. clear batching stats(and flag goes true) so that we won't make
a new batching process in dealing batched block.
+ // 2. deal batched block
+ // 3. now reuse the column of lval block. cuz write doesn't real
adjust it. it generate a new block from that.
+ _row_distribution.clear_batching_stats();
+ RETURN_IF_ERROR(this->send(_state, &tmp_block, false));
+ // Recovery back
+
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
+ _row_distribution._batching_block->clear_column_data();
+ _row_distribution._deal_batched = false;
+ }
+ return Status::OK();
+}
+
Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
Review Comment:
warning: function 'send' has cognitive complexity of 184 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
^
```
<details>
<summary>Additional context</summary>
**be/src/vec/sink/vdata_stream_sender.cpp:574:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
if (all_receiver_eof) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:578:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() ==
1) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:582:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (_only_local_exchange) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:583:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (!block->empty()) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:592:** +1, nesting level increased
to 2
```cpp
} else if (_enable_pipeline_exec) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:594:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:594:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:598:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_serializer.next_serialized_block(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:598:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_serializer.next_serialized_block(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:600:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (serialized) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:602:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
if (!cur_block.empty()) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:603:** +5, including nesting
penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(_serializer.serialize_block(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:603:** +6, including nesting
penalty of 5, nesting level increased to 6
```cpp
RETURN_IF_ERROR(_serializer.serialize_block(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:605:** +1, nesting level increased
to 4
```cpp
} else {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:624:** +1, nesting level increased
to 2
```cpp
} else {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:627:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_serializer.next_serialized_block(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:627:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_serializer.next_serialized_block(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:629:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (serialized) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:631:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
if (!cur_block.empty()) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:632:** +5, including nesting
penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(_serializer.serialize_block(&cur_block,
_cur_pb_block,
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:632:** +6, including nesting
penalty of 5, nesting level increased to 6
```cpp
RETURN_IF_ERROR(_serializer.serialize_block(&cur_block,
_cur_pb_block,
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:652:** +1, nesting level increased
to 1
```cpp
} else if (_part_type == TPartitionType::RANDOM) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:655:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (!current_channel->is_receiver_eof()) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:657:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (current_channel->is_local()) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:659:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:419:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:659:** +5, including nesting
penalty of 4, nesting level increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:420:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:659:** +1, nesting level increased
to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:422:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
} else { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:659:** +6, including nesting
penalty of 5, nesting level increased to 6
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:423:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:659:** +7, including nesting
penalty of 6, nesting level increased to 7
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:423:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:660:** +1, nesting level increased
to 3
```cpp
} else {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:662:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:662:** +5, including nesting
penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:666:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:419:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:666:** +5, including nesting
penalty of 4, nesting level increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:420:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:666:** +1, nesting level increased
to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:422:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
} else { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:666:** +6, including nesting
penalty of 5, nesting level increased to 6
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:423:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:666:** +7, including nesting
penalty of 6, nesting level increased to 7
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:423:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:671:** +1, nesting level increased
to 1
```cpp
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:676:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block,
_mem_tracker.get()));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:676:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block,
_mem_tracker.get()));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:678:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (_part_type == TPartitionType::HASH_PARTITIONED) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:679:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(channel_add_rows(state, _channels,
_partition_count,
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:679:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(channel_add_rows(state, _channels,
_partition_count,
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:682:** +1, nesting level increased
to 2
```cpp
} else {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:683:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs,
_partition_count,
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:683:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs,
_partition_count,
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:687:** +1, nesting level increased
to 1
```cpp
} else if (_part_type ==
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:689:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_send_new_partition_batch());
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:689:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_send_new_partition_batch());
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:690:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (UNLIKELY(block->rows() == 0)) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:697:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:697:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:706:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
for (int idx = 0; idx < row_ids.size(); ++idx) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:712:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels,
num_channels, channel2rows,
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:712:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels,
num_channels, channel2rows,
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:715:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (eos) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:717:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_send_new_partition_batch());
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:717:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_send_new_partition_batch());
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:719:** +1, nesting level increased
to 1
```cpp
} else {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:728:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
if (eos && _enable_pipeline_exec) {
^
```
**be/src/vec/sink/vdata_stream_sender.cpp:728:** +1
```cpp
if (eos && _enable_pipeline_exec) {
^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]