github-actions[bot] commented on code in PR #42341:
URL: https://github.com/apache/doris/pull/42341#discussion_r1812483465
##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -22,9 +22,9 @@
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <parallel_hashmap/phmap.h>
+#include <stdint.h>
Review Comment:
warning: inclusion of deprecated C++ header 'stdint.h'; consider using
'cstdint' instead [modernize-deprecated-headers]
```suggestion
#include <cstdint>
```
##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -107,28 +108,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit::TIME_NS, timer_name, 1));
}
+
+ _sink_buffer = p._sink_buffer;
+ _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
+ _sink_buffer->inc_running_sink(this);
_wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile,
"WaitForBroadcastBuffer", timer_name);
return Status::OK();
}
-void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
- std::lock_guard<std::mutex> lock(_finished_channels_mutex);
-
- if (_finished_channels.contains(channel_id)) {
- LOG(WARNING) << "query: " << print_id(_state->query_id())
- << ", on_channel_finished on already finished channel: "
<< channel_id;
- return;
- } else {
- _finished_channels.emplace(channel_id);
- if (_working_channels_count.fetch_sub(1) == 1) {
- set_reach_limit();
- if (_finish_dependency) {
- _finish_dependency->set_ready();
- }
- }
- }
-}
-
Status ExchangeSinkLocalState::open(RuntimeState* state) {
Review Comment:
warning: function 'open' has cognitive complexity of 64 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status ExchangeSinkLocalState::open(RuntimeState* state) {
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/exchange_sink_operator.cpp:120:** nesting level
increased to 1
```cpp
SCOPED_TIMER(_open_timer);
^
```
**be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER'
```cpp
#define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch>
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:121:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
RETURN_IF_ERROR(Base::open(state));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:121:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(Base::open(state));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:124:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (_part_type == TPartitionType::UNPARTITIONED || _part_type ==
TPartitionType::RANDOM ||
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:144:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (!only_local_exchange) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:147:** +1, nesting level
increased to 1
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:151:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() ==
1) &&
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:157:** +1, nesting level
increased to 1
```cpp
} else if (local_size > 0) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:172:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (_part_type == TPartitionType::HASH_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:176:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->init(p._texprs));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:176:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->init(p._texprs));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:177:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:177:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:180:** +1, nesting level
increased to 1
```cpp
} else if (_part_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:184:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->init(p._texprs));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:184:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->init(p._texprs));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:185:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:185:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:188:** +1, nesting level
increased to 1
```cpp
} else if (_part_type ==
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:194:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:194:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:196:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_vpartition->init());
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:196:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_vpartition->init());
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:226:** +1, nesting level
increased to 1
```cpp
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:249:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->init(p._texprs));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:249:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->init(p._texprs));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:250:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:250:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:255:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (_part_type == TPartitionType::HASH_PARTITIONED ||
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:258:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_partitioner->open(state));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:258:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_partitioner->open(state));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:259:** +1, nesting level
increased to 1
```cpp
} else if (_part_type ==
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:260:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:260:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
</details>
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -119,106 +131,98 @@
Status init(RuntimeState* state);
Status open(RuntimeState* state);
- Status send_local_block(Block* block, bool eos, bool can_be_moved);
- // Flush buffered rows and close channel. This function don't wait the response
- // of close operation, client should call close_wait() to finish channel's close.
- // We split one close operation into two phases in order to make multiple channels
- // can run parallel.
- Status close(RuntimeState* state);
+ Status send_local_block(Status exec_status, bool eos = false);
+
+ Status send_local_block(Block* block, bool can_be_moved);
+
+ // Get close wait's response, to finish channel close operation.
+ Status close_wait(RuntimeState* state);
+
+ int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+ PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
std::string get_fragment_instance_id_str() {
- UniqueId uid(_fragment_instance_id);
+ UniqueId uid(_dest_fragment_instance_id);
return uid.to_string();
}
bool is_local() const { return _is_local; }
+ virtual void ch_roll_pb_block();
+
bool is_receiver_eof() const { return
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { _receiver_status = st; }
- int64_t mem_usage() const;
-
- // Asynchronously sends a block
- // Returns the status of the most recently finished transmit_data
- // rpc (or OK if there wasn't one that hasn't been reported yet).
- // if batch is nullptr, send the eof packet
- Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos =
false);
- Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false);
-
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
- if (_fragment_instance_id.lo == -1) {
- return Status::OK();
- }
-
- bool serialized = false;
- if (_pblock == nullptr) {
- _pblock = std::make_unique<PBlock>();
- }
- RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized, eos,
- &rows));
- if (serialized) {
- RETURN_IF_ERROR(_send_current_block(eos));
- }
-
- return Status::OK();
- }
-
- void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
- _buffer = buffer;
- _buffer->register_sink(_fragment_instance_id);
- }
-
- std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(
- InstanceLoId id, bool eos) {
- if (!_send_callback) {
- _send_callback =
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
- } else {
- _send_callback->cntl_->Reset();
+protected:
+ bool _recvr_is_valid() {
+ if (_local_recvr && !_local_recvr->is_closed()) {
+ return true;
}
- _send_callback->init(id, eos);
- return _send_callback;
+ _receiver_status = Status::EndOfFile(
+ "local data stream receiver closed"); // local data stream receiver closed
+ return false;
}
- std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
-
-protected:
- Status _send_local_block(bool eos);
- Status _send_current_block(bool eos);
-
- Status _recvr_status() const {
- if (_local_recvr && !_local_recvr->is_closed()) {
+ Status _wait_last_brpc() {
+ SCOPED_TIMER(_parent->brpc_wait_timer());
+ if (_send_remote_block_callback == nullptr) {
return Status::OK();
}
- return Status::EndOfFile(
- "local data stream receiver closed"); // local data stream
receiver closed
+ _send_remote_block_callback->join();
+ if (_send_remote_block_callback->cntl_->Failed()) {
+ std::string err = fmt::format(
+ "failed to send brpc batch, error={}, error_text={}, client: {}, "
+ "latency = {}",
+ berror(_send_remote_block_callback->cntl_->ErrorCode()),
+ _send_remote_block_callback->cntl_->ErrorText(),
+ BackendOptions::get_localhost(),
+ _send_remote_block_callback->cntl_->latency_us());
+ LOG(WARNING) << err;
+ return Status::RpcError(err);
+ }
+ _receiver_status =
Status::create(_send_remote_block_callback->response_->status());
+ return _receiver_status;
}
- pipeline::ExchangeSinkLocalState* _parent = nullptr;
+ Parent* _parent = nullptr;
- const TUniqueId _fragment_instance_id;
+ const RowDescriptor& _row_desc;
+ const TUniqueId _dest_fragment_instance_id;
PlanNodeId _dest_node_id;
- bool _closed {false};
- bool _need_close {false};
+
+ // the number of RowBatch.data bytes sent successfully
+ int64_t _num_data_bytes_sent {};
+ int64_t _packet_seq {};
+
+ bool _need_close;
Review Comment:
warning: use default member initializer for '_need_close'
[modernize-use-default-member-init]
be/src/vec/sink/vdata_stream_sender.h:114:
```diff
- _need_close(false),
+ ,
```
```suggestion
bool _need_close{false};
```
##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -383,7 +365,7 @@
Status st) {
channel->set_receiver_eof(st);
// Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
- static_cast<void>(channel->close(state));
+ static_cast<void>(channel->close(state, Status::OK()));
}
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
block, bool eos) {
Review Comment:
warning: function 'sink' has cognitive complexity of 232 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
block, bool eos) {
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/exchange_sink_operator.cpp:382:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (all_receiver_eof) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:390:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (_part_type == TPartitionType::UNPARTITIONED ||
local_state.channels.size() == 1) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:394:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (local_state.only_local_exchange) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:395:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
if (!block->empty()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:410:** +1, nesting level
increased to 2
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:414:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:414:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:417:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
if (serialized) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:419:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
if (!cur_block.empty()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:420:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:420:** +6, including
nesting penalty of 5, nesting level increased to 6
```cpp
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:423:** +1, nesting level
increased to 4
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:448:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
if (moved) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:450:** +1, nesting level
increased to 4
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:458:** +1, nesting level
increased to 1
```cpp
} else if (_part_type == TPartitionType::RANDOM) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:462:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (!current_channel->is_receiver_eof()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:464:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
if (current_channel->is_local()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +1, nesting level
increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
} else { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +6, including
nesting penalty of 5, nesting level increased to 6
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +7, including
nesting penalty of 6, nesting level increased to 7
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:467:** +1, nesting level
increased to 3
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:468:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:468:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +1, nesting level
increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
} else { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +6, including
nesting penalty of 5, nesting level increased to 6
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +7, including
nesting penalty of 6, nesting level increased to 7
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:478:** +1, nesting level
increased to 1
```cpp
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:483:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:483:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:490:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(channel_add_rows(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:490:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(channel_add_rows(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:502:** +1, nesting level
increased to 1
```cpp
} else if (_part_type ==
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:508:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:508:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:515:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (input_rows > 0) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:520:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:520:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:526:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
for (int idx = 0; idx < row_ids.size(); ++idx) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:534:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (eos) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:536:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:536:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:540:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(channel_add_rows_with_idx(state,
local_state.channels, num_channels,
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:540:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(channel_add_rows_with_idx(state,
local_state.channels, num_channels,
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:550:** +1, nesting level
increased to 1
```cpp
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:557:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:557:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state,
block));
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:561:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(channel_add_rows_with_idx(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:561:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(channel_add_rows_with_idx(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:572:** +1, nesting level
increased to 1
```cpp
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:577:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (!current_channel->is_receiver_eof()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:579:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
if (current_channel->is_local()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +1, nesting level
increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
} else { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +6, including
nesting penalty of 5, nesting level increased to 6
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +7, including
nesting penalty of 6, nesting level increased to 7
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:582:** +1, nesting level
increased to 3
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:583:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:583:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +4, including
nesting penalty of 3, nesting level increased to 4
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +5, including
nesting penalty of 4, nesting level increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +1, nesting level
increased to 5
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
} else { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +6, including
nesting penalty of 5, nesting level increased to 6
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +7, including
nesting penalty of 6, nesting level increased to 7
```cpp
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
```
**be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro
'HANDLE_CHANNEL_STATUS'
```cpp
RETURN_IF_ERROR(status); \
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:593:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (_writer_count < local_state.channels.size()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:594:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
if (_data_processed >=
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:601:** +1, nesting level
increased to 1
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:608:** +1, including
nesting penalty of 0, nesting level increased to 1
```cpp
if (eos) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:610:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
for (int i = 0; i < local_state.channels.size(); ++i) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:612:** +3, including
nesting penalty of 2, nesting level increased to 3
```cpp
if (!st.ok() && final_st.ok()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:612:** +1
```cpp
if (!st.ok() && final_st.ok()) {
^
```
**be/src/pipeline/exec/exchange_sink_operator.cpp:616:** +2, including
nesting penalty of 1, nesting level increased to 2
```cpp
if (local_state._sink_buffer) {
^
```
</details>
##########
be/src/pipeline/exec/exchange_sink_operator.h:
##########
@@ -103,30 +106,30 @@ class ExchangeSinkLocalState final : public
PipelineXSinkLocalState<> {
void set_reach_limit() { _reach_limit = true; };
[[nodiscard]] int sender_id() const { return _sender_id; }
-
+ [[nodiscard]] int be_number() const { return _state->be_number(); }
std::string name_suffix() override;
segment_v2::CompressionTypePB compression_type() const;
std::string debug_string(int indentation_level) const override;
static Status empty_callback_function(void* sender,
TCreatePartitionResult* result) {
return Status::OK();
}
Status _send_new_partition_batch();
- std::vector<std::shared_ptr<vectorized::Channel>> channels;
- int current_channel_idx {0}; // index of current channel to send to if
_random == true
- bool only_local_exchange {false};
-
- void on_channel_finished(InstanceLoId channel_id);
+ std::vector<vectorized::PipChannel*> channels;
+ std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
+ int current_channel_idx; // index of current channel to send to if _random
== true
Review Comment:
warning: use default member initializer for 'current_channel_idx'
[modernize-use-default-member-init]
be/src/pipeline/exec/exchange_sink_operator.h:56:
```diff
- current_channel_idx(0),
+ ,
```
```suggestion
int current_channel_idx{0}; // index of current channel to send to if _random == true
```
##########
be/src/pipeline/exec/exchange_sink_operator.h:
##########
@@ -103,30 +106,30 @@
void set_reach_limit() { _reach_limit = true; };
[[nodiscard]] int sender_id() const { return _sender_id; }
-
+ [[nodiscard]] int be_number() const { return _state->be_number(); }
std::string name_suffix() override;
segment_v2::CompressionTypePB compression_type() const;
std::string debug_string(int indentation_level) const override;
static Status empty_callback_function(void* sender,
TCreatePartitionResult* result) {
return Status::OK();
}
Status _send_new_partition_batch();
- std::vector<std::shared_ptr<vectorized::Channel>> channels;
- int current_channel_idx {0}; // index of current channel to send to if
_random == true
- bool only_local_exchange {false};
-
- void on_channel_finished(InstanceLoId channel_id);
+ std::vector<vectorized::PipChannel*> channels;
+ std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
+ int current_channel_idx; // index of current channel to send to if _random
== true
+ bool only_local_exchange;
Review Comment:
warning: use default member initializer for 'only_local_exchange'
[modernize-use-default-member-init]
be/src/pipeline/exec/exchange_sink_operator.h:57:
```diff
- only_local_exchange(false),
+ ,
```
```suggestion
bool only_local_exchange{false};
```
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -209,30 +222,40 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
Review Comment:
warning: function '_send_rpc' has cognitive complexity of 76 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/exchange_sink_buffer.cpp:228:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
if (_is_finishing) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:235:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
while (!q.empty()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:250:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (request.block && !request.block->column_metas().empty()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:250:** +1
```cpp
if (request.block && !request.block->column_metas().empty()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:253:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (!request.exec_status.ok()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:260:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (config::exchange_sink_ignore_eovercrowded) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:263:** nesting level
increased to 2
```cpp
send_callback->addFailedHandler([&, weak_task_ctx =
weak_task_exec_ctx()](
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:266:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (task_lock == nullptr) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:274:** nesting level
increased to 2
```cpp
send_callback->addSuccessHandler([&, weak_task_ctx =
weak_task_exec_ctx()](
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:279:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (task_lock == nullptr) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:287:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (s.is<ErrorCode::END_OF_FILE>()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:289:** +1, nesting level
increased to 3
```cpp
} else if (!s.ok()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:292:** +1, nesting level
increased to 3
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:294:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
if (!s) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:305:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (enable_http_send_block(*brpc_request)) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:306:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:306:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:309:** +1, nesting level
increased to 2
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:314:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (request.block) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:319:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (_total_queue_size <= _queue_capacity) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:324:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (_keep_order) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:329:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
while (!broadcast_q.empty()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:344:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (request.block_holder->get_block() &&
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:344:** +1
```cpp
if (request.block_holder->get_block() &&
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:352:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (config::exchange_sink_ignore_eovercrowded) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:355:** nesting level
increased to 2
```cpp
send_callback->addFailedHandler([&, weak_task_ctx =
weak_task_exec_ctx()](
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:358:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (task_lock == nullptr) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:366:** nesting level
increased to 2
```cpp
send_callback->addSuccessHandler([&, weak_task_ctx =
weak_task_exec_ctx()](
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:371:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (task_lock == nullptr) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:379:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
if (s.is<ErrorCode::END_OF_FILE>()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:381:** +1, nesting level
increased to 3
```cpp
} else if (!s.ok()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:384:** +1, nesting level
increased to 3
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:386:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
if (!s) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:397:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (enable_http_send_block(*brpc_request)) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:398:** +3, including nesting
penalty of 2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
^
```
**be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:398:** +4, including nesting
penalty of 3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
^
```
**be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:401:** +1, nesting level
increased to 2
```cpp
} else {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:406:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (request.block_holder->get_block()) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:410:** +2, including nesting
penalty of 1, nesting level increased to 2
```cpp
if (_keep_order) {
^
```
**be/src/pipeline/exec/exchange_sink_buffer.cpp:414:** +1, including nesting
penalty of 0, nesting level increased to 1
```cpp
if (is_empty) {
^
```
</details>
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -119,106 +131,98 @@ class Channel {
Status init(RuntimeState* state);
Status open(RuntimeState* state);
- Status send_local_block(Block* block, bool eos, bool can_be_moved);
- // Flush buffered rows and close channel. This function don't wait the
response
- // of close operation, client should call close_wait() to finish channel's
close.
- // We split one close operation into two phases in order to make multiple
channels
- // can run parallel.
- Status close(RuntimeState* state);
+ Status send_local_block(Status exec_status, bool eos = false);
+
+ Status send_local_block(Block* block, bool can_be_moved);
+
+ // Get close wait's response, to finish channel close operation.
+ Status close_wait(RuntimeState* state);
+
+ int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+ PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
std::string get_fragment_instance_id_str() {
- UniqueId uid(_fragment_instance_id);
+ UniqueId uid(_dest_fragment_instance_id);
return uid.to_string();
}
bool is_local() const { return _is_local; }
+ virtual void ch_roll_pb_block();
+
bool is_receiver_eof() const { return
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { _receiver_status = st; }
- int64_t mem_usage() const;
-
- // Asynchronously sends a block
- // Returns the status of the most recently finished transmit_data
- // rpc (or OK if there wasn't one that hasn't been reported yet).
- // if batch is nullptr, send the eof packet
- Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos =
false);
- Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false);
-
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
- if (_fragment_instance_id.lo == -1) {
- return Status::OK();
- }
-
- bool serialized = false;
- if (_pblock == nullptr) {
- _pblock = std::make_unique<PBlock>();
- }
- RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized, eos,
- &rows));
- if (serialized) {
- RETURN_IF_ERROR(_send_current_block(eos));
- }
-
- return Status::OK();
- }
-
- void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
- _buffer = buffer;
- _buffer->register_sink(_fragment_instance_id);
- }
-
- std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(
- InstanceLoId id, bool eos) {
- if (!_send_callback) {
- _send_callback =
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
- } else {
- _send_callback->cntl_->Reset();
+protected:
+ bool _recvr_is_valid() {
+ if (_local_recvr && !_local_recvr->is_closed()) {
+ return true;
Review Comment:
warning: redundant boolean literal in conditional return statement
[readability-simplify-boolean-expr]
```cpp
return true;
^
```
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -119,106 +131,98 @@
Status init(RuntimeState* state);
Status open(RuntimeState* state);
- Status send_local_block(Block* block, bool eos, bool can_be_moved);
- // Flush buffered rows and close channel. This function don't wait the
response
- // of close operation, client should call close_wait() to finish channel's
close.
- // We split one close operation into two phases in order to make multiple
channels
- // can run parallel.
- Status close(RuntimeState* state);
+ Status send_local_block(Status exec_status, bool eos = false);
+
+ Status send_local_block(Block* block, bool can_be_moved);
+
+ // Get close wait's response, to finish channel close operation.
+ Status close_wait(RuntimeState* state);
+
+ int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+ PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
std::string get_fragment_instance_id_str() {
- UniqueId uid(_fragment_instance_id);
+ UniqueId uid(_dest_fragment_instance_id);
return uid.to_string();
}
bool is_local() const { return _is_local; }
+ virtual void ch_roll_pb_block();
+
bool is_receiver_eof() const { return
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { _receiver_status = st; }
- int64_t mem_usage() const;
-
- // Asynchronously sends a block
- // Returns the status of the most recently finished transmit_data
- // rpc (or OK if there wasn't one that hasn't been reported yet).
- // if batch is nullptr, send the eof packet
- Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos =
false);
- Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false);
-
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
- if (_fragment_instance_id.lo == -1) {
- return Status::OK();
- }
-
- bool serialized = false;
- if (_pblock == nullptr) {
- _pblock = std::make_unique<PBlock>();
- }
- RETURN_IF_ERROR(_serializer.next_serialized_block(block,
_pblock.get(), 1, &serialized, eos,
- &rows));
- if (serialized) {
- RETURN_IF_ERROR(_send_current_block(eos));
- }
-
- return Status::OK();
- }
-
- void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
- _buffer = buffer;
- _buffer->register_sink(_fragment_instance_id);
- }
-
- std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(
- InstanceLoId id, bool eos) {
- if (!_send_callback) {
- _send_callback =
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
- } else {
- _send_callback->cntl_->Reset();
+protected:
+ bool _recvr_is_valid() {
+ if (_local_recvr && !_local_recvr->is_closed()) {
+ return true;
}
- _send_callback->init(id, eos);
- return _send_callback;
+ _receiver_status = Status::EndOfFile(
+ "local data stream receiver closed"); // local data stream
receiver closed
+ return false;
}
- std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
-
-protected:
- Status _send_local_block(bool eos);
- Status _send_current_block(bool eos);
-
- Status _recvr_status() const {
- if (_local_recvr && !_local_recvr->is_closed()) {
+ Status _wait_last_brpc() {
+ SCOPED_TIMER(_parent->brpc_wait_timer());
+ if (_send_remote_block_callback == nullptr) {
return Status::OK();
}
- return Status::EndOfFile(
- "local data stream receiver closed"); // local data stream
receiver closed
+ _send_remote_block_callback->join();
+ if (_send_remote_block_callback->cntl_->Failed()) {
+ std::string err = fmt::format(
+ "failed to send brpc batch, error={}, error_text={},
client: {}, "
+ "latency = {}",
+ berror(_send_remote_block_callback->cntl_->ErrorCode()),
+ _send_remote_block_callback->cntl_->ErrorText(),
+ BackendOptions::get_localhost(),
+ _send_remote_block_callback->cntl_->latency_us());
+ LOG(WARNING) << err;
+ return Status::RpcError(err);
+ }
+ _receiver_status =
Status::create(_send_remote_block_callback->response_->status());
+ return _receiver_status;
}
- pipeline::ExchangeSinkLocalState* _parent = nullptr;
+ Parent* _parent = nullptr;
- const TUniqueId _fragment_instance_id;
+ const RowDescriptor& _row_desc;
+ const TUniqueId _dest_fragment_instance_id;
PlanNodeId _dest_node_id;
- bool _closed {false};
- bool _need_close {false};
+
+ // the number of RowBatch.data bytes sent successfully
+ int64_t _num_data_bytes_sent {};
+ int64_t _packet_seq {};
+
+ bool _need_close;
+ bool _closed;
Review Comment:
warning: use default member initializer for '_closed'
[modernize-use-default-member-init]
be/src/vec/sink/vdata_stream_sender.h:115:
```diff
- _closed(false),
+ ,
```
```suggestion
bool _closed{false};
```
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -230,5 +234,88 @@
} \
} while (0)
+class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
+public:
+ PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor&
row_desc,
+ const TNetworkAddress& brpc_dest, const TUniqueId&
dest_fragment_instance_id,
+ PlanNodeId dest_node_id)
+ : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc,
brpc_dest,
+
dest_fragment_instance_id, dest_node_id) {
+ ch_roll_pb_block();
+ }
+
+ ~PipChannel() override { delete
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
Review Comment:
warning: use '= default' to define a trivial destructor
[modernize-use-equals-default]
```cpp
~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]