This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 19ba6bec38 [Improvement](pipeline) support send eos on local exchange
and remove some unused code (#22086)
19ba6bec38 is described below
commit 19ba6bec384599377d04eb83f4ef07ed8bfbac56
Author: Pxl <[email protected]>
AuthorDate: Mon Jul 24 09:25:32 2023 +0800
[Improvement](pipeline) support send eos on local exchange and remove some
unused code (#22086)
support send eos on local exchange and remove some unused code
---
be/src/pipeline/exec/exchange_source_operator.cpp | 1 -
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/service/brpc_service.cpp | 2 +-
be/src/service/internal_service.cpp | 17 ++-----
be/src/service/internal_service.h | 2 +-
be/src/util/proto_util.h | 56 -----------------------
be/src/vec/exec/vexchange_node.h | 1 -
be/src/vec/sink/vdata_stream_sender.cpp | 16 +++++--
be/src/vec/sink/vdata_stream_sender.h | 12 ++---
be/test/vec/exec/vtablet_sink_test.cpp | 2 -
be/test/vec/runtime/vdata_stream_test.cpp | 2 -
11 files changed, 24 insertions(+), 89 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 1dd20f929c..84e4288dce 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -32,7 +32,6 @@ bool ExchangeSourceOperator::can_read() {
}
bool ExchangeSourceOperator::is_pending_finish() const {
- // TODO HappenLee
return false;
}
} // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5bde9ad19a..bce4db795d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -138,7 +138,7 @@ public:
}
// Update status of this fragment execute
- Status update_status(Status status) {
+ Status update_status(const Status& status) {
std::lock_guard<std::mutex> l(_status_lock);
if (!status.ok() && _exec_status.ok()) {
_exec_status = status;
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 80138b99ee..2406d08b1e 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -47,7 +47,7 @@ BRpcService::BRpcService(ExecEnv* exec_env) :
_exec_env(exec_env), _server(new b
brpc::FLAGS_socket_max_unwritten_bytes =
config::brpc_socket_max_unwritten_bytes;
}
-BRpcService::~BRpcService() {}
+BRpcService::~BRpcService() = default;
Status BRpcService::start(int port, int num_threads) {
// Add service
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index c38416a593..924e829539 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -328,12 +328,7 @@ void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
PTabletWriterAddBlockResult* response,
google::protobuf::Closure*
done) {
bool ret = _heavy_work_pool.try_offer([this, controller, request,
response, done]() {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
-
- _tablet_writer_add_block(controller, request, response, new_done);
+ _tablet_writer_add_block(controller, request, response, done);
});
if (!ret) {
LOG(WARNING) << "fail to offer request to the work pool";
@@ -1024,13 +1019,9 @@ void
PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
google::protobuf::Closure* done) {
int64_t receive_time = GetCurrentTimeNanos();
response->set_receive_time(receive_time);
- bool ret = _heavy_work_pool.try_offer([this, controller, request,
response, done]() {
- // TODO(zxy) delete in 1.2 version
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
-
- _transmit_block(controller, request, response, new_done, Status::OK());
+ PriorityThreadPool& pool = request->has_block() ? _heavy_work_pool :
_light_work_pool;
+ bool ret = pool.try_offer([this, controller, request, response, done]() {
+ _transmit_block(controller, request, response, done, Status::OK());
});
if (!ret) {
LOG(WARNING) << "fail to offer request to the work pool";
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 0d568e2b84..823f29504b 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -41,7 +41,7 @@ class PHandShakeResponse;
class PInternalServiceImpl : public PBackendService {
public:
PInternalServiceImpl(ExecEnv* exec_env);
- virtual ~PInternalServiceImpl();
+ ~PInternalServiceImpl() override;
void transmit_data(::google::protobuf::RpcController* controller,
const ::doris::PTransmitDataParams* request,
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 2741092d80..3c583b867c 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -86,62 +86,6 @@ Status transmit_block_http(RuntimeState* state, Closure*
closure, PTransmitDataP
return Status::OK();
}
-// TODO(zxy) delete in v1.3 version
-// Transfer RowBatch in ProtoBuf Request to Controller Attachment.
-// This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
-// and it is expected that performance can be improved.
-template <typename Params, typename Closure>
-void request_row_batch_transfer_attachment(Params* brpc_request, const
std::string& tuple_data,
- Closure* closure) {
- auto row_batch = brpc_request->mutable_row_batch();
- row_batch->set_tuple_data("");
- brpc_request->set_transfer_by_attachment(true);
- butil::IOBuf attachment;
- attachment.append(tuple_data);
- closure->cntl.request_attachment().swap(attachment);
-}
-
-// TODO(zxy) delete in v1.3 version
-// Transfer Block in ProtoBuf Request to Controller Attachment.
-// This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
-// and it is expected that performance can be improved.
-template <typename Params, typename Closure>
-void request_block_transfer_attachment(Params* brpc_request, const
std::string& column_values,
- Closure* closure) {
- auto block = brpc_request->mutable_block();
- block->set_column_values("");
- brpc_request->set_transfer_by_attachment(true);
- butil::IOBuf attachment;
- attachment.append(column_values);
- closure->cntl.request_attachment().swap(attachment);
-}
-
-// TODO(zxy) delete in v1.3 version
-// Controller Attachment transferred to RowBatch in ProtoBuf Request.
-template <typename Params>
-void attachment_transfer_request_row_batch(const Params* brpc_request,
brpc::Controller* cntl) {
- Params* req = const_cast<Params*>(brpc_request);
- if (req->has_row_batch() && req->transfer_by_attachment()) {
- auto rb = req->mutable_row_batch();
- const butil::IOBuf& io_buf = cntl->request_attachment();
- CHECK(io_buf.size() > 0) << io_buf.size() << ", row num: " <<
req->row_batch().num_rows();
- io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
- }
-}
-
-// TODO(zxy) delete in v1.3 version
-// Controller Attachment transferred to Block in ProtoBuf Request.
-template <typename Params>
-void attachment_transfer_request_block(const Params* brpc_request,
brpc::Controller* cntl) {
- Params* req = const_cast<Params*>(brpc_request);
- if (req->has_block() && req->transfer_by_attachment()) {
- auto block = req->mutable_block();
- const butil::IOBuf& io_buf = cntl->request_attachment();
- CHECK(io_buf.size() > 0) << io_buf.size();
- io_buf.copy_to(block->mutable_column_values(), io_buf.size(), 0);
- }
-}
-
template <typename Params, typename Closure>
Status request_embed_attachment(Params* brpc_request, const std::string& data,
Closure* closure) {
butil::IOBuf attachment;
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 58b61dc9af..c4f083dda4 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -58,7 +58,6 @@ public:
Status collect_query_statistics(QueryStatistics* statistics) override;
Status close(RuntimeState* state) override;
- // Status collect_query_statistics(QueryStatistics* statistics) override;
void set_num_senders(int num_senders) { _num_senders = num_senders; }
private:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index d8c623353d..97154c3e3f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -134,7 +134,7 @@ Status Channel::send_local_block(bool eos) {
return Status::OK();
} else {
_mutable_block.reset();
- return receiver_status_;
+ return _receiver_status;
}
}
@@ -147,7 +147,7 @@ Status Channel::send_local_block(Block* block) {
_local_recvr->add_block(block, _parent->_sender_id, false);
return Status::OK();
} else {
- return receiver_status_;
+ return _receiver_status;
}
}
@@ -256,8 +256,8 @@ Status Channel::close_internal() {
VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id
<< " #rows= " << ((_mutable_block == nullptr) ? 0 :
_mutable_block->rows())
- << " receiver status: " << receiver_status_;
- if (receiver_status_.is<ErrorCode::END_OF_FILE>()) {
+ << " receiver status: " << _receiver_status;
+ if (is_receiver_eof()) {
_mutable_block.reset();
return Status::OK();
}
@@ -266,7 +266,13 @@ Status Channel::close_internal() {
status = send_current_block(true);
} else {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
- status = send_block((PBlock*)nullptr, true);
+ if (is_local()) {
+ if (_recvr_is_valid()) {
+ _local_recvr->remove_sender(_parent->_sender_id, _be_number);
+ }
+ } else {
+ status = send_block((PBlock*)nullptr, true);
+ }
}
// Don't wait for the last packet to finish, left it to close_wait.
if (status.is<ErrorCode::END_OF_FILE>()) {
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 5998732406..679557a92a 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -293,16 +293,16 @@ public:
_local_recvr->sender_queue_empty(_parent->_sender_id);
}
- bool is_receiver_eof() const { return
receiver_status_.is<ErrorCode::END_OF_FILE>(); }
+ bool is_receiver_eof() const { return
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
- void set_receiver_eof(Status st) { receiver_status_ = st; }
+ void set_receiver_eof(Status st) { _receiver_status = st; }
protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {
return true;
}
- receiver_status_ = Status::EndOfFile("local data stream receiver
closed");
+ _receiver_status = Status::EndOfFile("local data stream receiver
closed");
return false;
}
@@ -314,7 +314,7 @@ protected:
auto cntl = &_closure->cntl;
auto call_id = _closure->cntl.call_id();
brpc::Join(call_id);
- receiver_status_ = Status::create(_closure->result.status());
+ _receiver_status = Status::create(_closure->result.status());
if (cntl->Failed()) {
std::string err = fmt::format(
"failed to send brpc batch, error={}, error_text={},
client: {}, "
@@ -324,7 +324,7 @@ protected:
LOG(WARNING) << err;
return Status::RpcError(err);
}
- return receiver_status_;
+ return _receiver_status;
}
// Serialize _batch into _thrift_batch and send via send_batch().
@@ -357,7 +357,7 @@ protected:
PTransmitDataParams _brpc_request;
std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
- Status receiver_status_;
+ Status _receiver_status;
int32_t _brpc_timeout_ms = 500;
// whether the dest can be treated as query statistics transfer chain.
bool _is_transfer_chain;
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp
b/be/test/vec/exec/vtablet_sink_test.cpp
index 15ee6b79a2..5e60181c8f 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -316,8 +316,6 @@ public:
k_add_batch_status.to_protobuf(response->mutable_status());
if (request->has_block() && _row_desc != nullptr) {
- brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
-
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
vectorized::Block block(request->block());
for (size_t row_num = 0; row_num < block.rows(); ++row_num) {
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp
b/be/test/vec/runtime/vdata_stream_test.cpp
index 4c42bde016..954bf8f194 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -74,8 +74,6 @@ public:
const ::doris::PTransmitDataParams* request,
::doris::PTransmitDataResult* response,
::google::protobuf::Closure* done) {
// stream_mgr->transmit_block(request, &done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
- attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
// The response is accessed when done->Run is called in
transmit_block(),
// give response a default value to avoid null pointers in high
concurrency.
Status st;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]