This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new b3bd16257e [improvement](sink) print load_id when sink fails (#11893)
b3bd16257e is described below
commit b3bd16257e81a3a55c32ca891b99ed6f3dab8409
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri Aug 19 08:48:02 2022 +0800
[improvement](sink) print load_id when sink fails (#11893)
---
be/src/exec/tablet_sink.cpp | 27 +++++++++++++++++----------
be/src/runtime/fragment_mgr.cpp | 7 ++++---
2 files changed, 21 insertions(+), 13 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 1aa0cdf692..be4cdca5e6 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -45,8 +45,7 @@ namespace doris {
namespace stream_load {
NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel,
int64_t node_id)
- : _parent(parent), _index_channel(index_channel), _node_id(node_id) {
-}
+ : _parent(parent), _index_channel(index_channel), _node_id(node_id) {}
NodeChannel::~NodeChannel() noexcept {
if (_open_closure != nullptr) {
@@ -79,6 +78,9 @@ Status NodeChannel::init(RuntimeState* state) {
_node_info = *node;
+ _load_info = "load_id=" + print_id(_parent->_load_id) +
+ ", txn_id=" + std::to_string(_parent->_txn_id);
+
_row_desc.reset(new RowDescriptor(_tuple_desc, false));
_batch_size = state->batch_size();
_cur_batch.reset(new RowBatch(*_row_desc, _batch_size,
_parent->_mem_tracker.get()));
@@ -87,7 +89,7 @@ Status NodeChannel::init(RuntimeState* state) {
_node_info.brpc_port);
if (_stub == nullptr) {
LOG(WARNING) << "Get rpc stub failed, host=" << _node_info.host
- << ", port=" << _node_info.brpc_port;
+ << ", port=" << _node_info.brpc_port << ", " <<
channel_info();
_cancelled = true;
return Status::InternalError("get rpc stub failed");
}
@@ -143,7 +145,7 @@ void NodeChannel::open() {
}
void NodeChannel::_cancel_with_msg(const std::string& msg) {
- LOG(WARNING) << msg;
+ LOG(WARNING) << channel_info() << ", " << msg;
{
std::lock_guard<SpinLock> l(_cancel_msg_lock);
if (_cancel_msg == "") {
@@ -165,8 +167,11 @@ Status NodeChannel::open_wait() {
ss << "failed to open tablet writer, error=" <<
berror(_open_closure->cntl.ErrorCode())
<< ", error_text=" << _open_closure->cntl.ErrorText();
_cancelled = true;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
+
+ LOG(WARNING) << ss.str() << " " << channel_info();
+ return Status::InternalError("failed to open tablet writer, error={},
error_text={}",
+ berror(_open_closure->cntl.ErrorCode()),
+ _open_closure->cntl.ErrorText());
}
Status status(_open_closure->result.status());
if (_open_closure->unref()) {
@@ -443,7 +448,8 @@ void NodeChannel::cancel(const std::string& cancel_msg) {
request.release_id();
}
-int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
+int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
+ std::unique_ptr<ThreadPoolToken>&
thread_pool_token) {
auto st = none_of({_cancelled, _send_finished});
if (!st.ok()) {
return 0;
@@ -902,8 +908,8 @@ Status OlapTableSink::open(RuntimeState* state) {
_send_batch_thread_pool_token =
state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
RETURN_IF_ERROR(Thread::create(
- "OlapTableSink", "send_batch_process", [this, state]() {
this->_send_batch_process(state); },
- &_sender_thread));
+ "OlapTableSink", "send_batch_process",
+ [this, state]() { this->_send_batch_process(state); },
&_sender_thread));
return Status::OK();
}
@@ -1300,7 +1306,8 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) {
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
- index_channel->for_each_node_channel([&running_channels_num, this,
state](const std::shared_ptr<NodeChannel>& ch) {
+ index_channel->for_each_node_channel([&running_channels_num, this,
+ state](const
std::shared_ptr<NodeChannel>& ch) {
running_channels_num +=
ch->try_send_and_fetch_status(state,
this->_send_batch_thread_pool_token);
});
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bad2c50c55..a70d744447 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -300,7 +300,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
FrontendServiceConnection coord(_exec_env->frontend_client_cache(),
_coord_addr, &coord_status);
if (!coord_status.ok()) {
std::stringstream ss;
- ss << "couldn't get a client for " << _coord_addr;
+ ss << "couldn't get a client for " << _coord_addr << ", reason: " <<
coord_status;
+ LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str();
update_status(Status::InternalError(ss.str()));
return;
}
@@ -387,8 +388,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
TReportExecStatusResult res;
Status rpc_status;
- VLOG_ROW << "debug: reportExecStatus params is "
- << apache::thrift::ThriftDebugString(params).c_str();
+ VLOG_DEBUG << "reportExecStatus params is "
+ << apache::thrift::ThriftDebugString(params).c_str();
try {
try {
coord->reportExecStatus(res, params);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]