This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b79f5d77f1d [improve](move-memtable) improve logging messages (#27443)
b79f5d77f1d is described below
commit b79f5d77f1d4b08da2cba7c65e75e34e8a2b77b1
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Nov 23 11:46:29 2023 +0800
[improve](move-memtable) improve logging messages (#27443)
---
be/src/io/fs/stream_sink_file_writer.cpp | 12 ++++++------
be/src/runtime/load_stream.cpp | 11 +++++------
be/src/service/internal_service.cpp | 4 ++--
be/src/vec/sink/delta_writer_v2_pool.cpp | 9 +++++----
be/src/vec/sink/load_stream_stub.cpp | 26 +++++++++++++++++++-------
be/src/vec/sink/load_stream_stub.h | 2 ++
be/src/vec/sink/load_stream_stub_pool.cpp | 6 +++---
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +++--
8 files changed, 45 insertions(+), 30 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp
b/be/src/io/fs/stream_sink_file_writer.cpp
index 67269335136..9b2125c9446 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -21,6 +21,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"
+#include "util/uid_util.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
@@ -45,9 +46,9 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
}
_bytes_appended += bytes_req;
- VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
- << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
- << ", segment_id: " << _segment_id << ", data_length: " <<
bytes_req;
+ VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ",
index_id: " << _index_id
+ << ", tablet_id: " << _tablet_id << ", segment_id: " <<
_segment_id
+ << ", data_length: " << bytes_req;
std::span<const Slice> slices {data, data_cnt};
for (auto& stream : _streams) {
@@ -58,9 +59,8 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
}
Status StreamSinkFileWriter::finalize() {
- VLOG_DEBUG << "writer finalize, load_id: " <<
UniqueId(_load_id).to_string()
- << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
- << ", segment_id: " << _segment_id;
+ VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ",
index_id: " << _index_id
+ << ", tablet_id: " << _tablet_id << ", segment_id: " <<
_segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
for (auto& stream : _streams) {
RETURN_IF_ERROR(
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 25e79ccab58..b313307c70b 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -306,8 +306,8 @@ Status LoadStream::close(int64_t src_id, const
std::vector<PTabletID>& tablets_t
}
}
LOG(INFO) << "close load " << *this
- << ", failed_tablet_num=" <<
failed_tablet_ids->size()
- << ", success_tablet_num=" <<
success_tablet_ids->size();
+ << ", success_tablet_num=" <<
success_tablet_ids->size()
+ << ", failed_tablet_num=" <<
failed_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
@@ -324,7 +324,7 @@ Status LoadStream::close(int64_t src_id, const
std::vector<PTabletID>& tablets_t
void LoadStream::_report_result(StreamId stream, const Status& st,
const std::vector<int64_t>& success_tablet_ids,
const std::vector<int64_t>& failed_tablet_ids)
{
- LOG(INFO) << "report result, success tablet num " <<
success_tablet_ids.size()
+ LOG(INFO) << "report result " << *this << ", success tablet num " <<
success_tablet_ids.size()
<< ", failed tablet num " << failed_tablet_ids.size();
butil::IOBuf buf;
PWriteStreamSinkResponse response;
@@ -456,9 +456,8 @@ void LoadStream::_dispatch(StreamId id, const
PStreamHeader& hdr, butil::IOBuf*
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " <<
hdr.src_id()
<< " with tablet " << hdr.tablet_id();
if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
- Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid load
id {}, expected {}",
-
UniqueId(hdr.load_id()).to_string(),
-
UniqueId(_load_id).to_string());
+ Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
+ "invalid load id {}, expected {}", print_id(hdr.load_id()),
print_id(_load_id));
_report_failure(id, st, hdr);
return;
}
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index ce249536e76..751b246b9c6 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -359,8 +359,8 @@ void
PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamOptions stream_options;
- LOG(INFO) << "open load stream, load_id = " << request->load_id()
- << ", src_id = " << request->src_id();
+ LOG(INFO) << "open load stream, load_id=" << request->load_id()
+ << ", src_id=" << request->src_id();
for (const auto& req : request->tablets()) {
TabletManager* tablet_mgr =
StorageEngine::instance()->tablet_manager();
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index dc9a2765a55..df9c0fc1c8c 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -41,13 +41,13 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
int num_use = --_use_cnt;
if (num_use > 0) {
- LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " ,
use_cnt = " << num_use;
+ LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " ,
use_cnt=" << num_use;
return Status::OK();
}
- LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
if (_pool != nullptr) {
_pool->erase(_load_id);
}
+ LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
Status status = Status::OK();
_map.for_each([&status](auto& entry) {
if (status.ok()) {
@@ -57,6 +57,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
if (!status.ok()) {
return status;
}
+ LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
_map.for_each([&status, profile](auto& entry) {
if (status.ok()) {
status = entry.second->close_wait(profile);
@@ -67,7 +68,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
void DeltaWriterV2Map::cancel(Status status) {
int num_use = --_use_cnt;
- LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = "
<< num_use;
+ LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" <<
num_use;
if (num_use == 0 && _pool != nullptr) {
_pool->erase(_load_id);
}
@@ -95,7 +96,7 @@ std::shared_ptr<DeltaWriterV2Map>
DeltaWriterV2Pool::get_or_create(PUniqueId loa
void DeltaWriterV2Pool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
- LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
+ LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id;
_pool.erase(load_id);
}
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 8319def542c..793098a3e9d 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -23,6 +23,7 @@
#include "util/brpc_client_cache.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
+#include "util/uid_util.h"
namespace doris {
@@ -37,12 +38,15 @@ int
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
Status st = Status::create(response.status());
std::stringstream ss;
- ss << "received response from backend " << _dst_id;
+ ss << "on_received_messages, load_id=" << _load_id << ", backend_id="
<< _dst_id;
if (response.success_tablet_ids_size() > 0) {
ss << ", success tablet ids:";
for (auto tablet_id : response.success_tablet_ids()) {
ss << " " << tablet_id;
}
+ if (response.success_tablet_ids_size() == 0) {
+ ss << " none";
+ }
std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
for (auto tablet_id : response.success_tablet_ids()) {
_success_tablets.push_back(tablet_id);
@@ -53,6 +57,9 @@ int
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
for (auto tablet_id : response.failed_tablet_ids()) {
ss << " " << tablet_id;
}
+ if (response.failed_tablet_ids_size() == 0) {
+ ss << " none";
+ }
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
for (auto tablet_id : response.failed_tablet_ids()) {
_failed_tablets.push_back(tablet_id);
@@ -86,6 +93,7 @@ int
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
}
void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
+ LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
std::lock_guard<bthread::Mutex> lock(_mutex);
_is_closed.store(true);
_close_cv.notify_all();
@@ -123,6 +131,7 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
_dst_id = node_info.id;
_handler.set_dst_id(_dst_id);
+ _handler.set_load_id(_load_id);
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = 20 << 20; // 20MB
@@ -160,8 +169,8 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
return Status::InternalError("Failed to connect to backend {}: {}",
_dst_id,
cntl.ErrorText());
}
- LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id
<< " (" << host_port
- << ")";
+ LOG(INFO) << "open load stream " << _stream_id << " load_id=" <<
print_id(_load_id)
+ << " for backend " << _dst_id << " (" << host_port << ")";
_is_init.store(true);
return Status::OK();
}
@@ -227,12 +236,15 @@ Status LoadStreamStub::get_schema(const
std::vector<PTabletID>& tablets) {
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
std::ostringstream oss;
- oss << "fetching tablet schema from stream " << _stream_id << ", load id:
" << _load_id
- << ", tablet id:";
+ oss << "fetching tablet schema from stream " << _stream_id
+ << ", load id: " << print_id(_load_id) << ", tablet id:";
for (const auto& tablet : tablets) {
*header.add_tablets() = tablet;
oss << " " << tablet.tablet_id();
}
+ if (tablets.size() == 0) {
+ oss << " none";
+ }
LOG(INFO) << oss.str();
return _encode_and_send(header);
}
@@ -318,12 +330,12 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf&
buf) {
const timespec time = butil::seconds_from_now(60);
int wait_ret = brpc::StreamWait(_stream_id, &time);
if (wait_ret != 0) {
- return Status::InternalError("StreamWait failed, err = ",
wait_ret);
+ return Status::InternalError("StreamWait failed, err=",
wait_ret);
}
break;
}
default:
- return Status::InternalError("StreamWrite failed, err = {}", ret);
+ return Status::InternalError("StreamWrite failed, err={}", ret);
}
}
}
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 8a2c9dfce59..bbed172f8ac 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -121,8 +121,10 @@ private:
}
void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
+ void set_load_id(PUniqueId load_id) { _load_id = UniqueId(load_id); }
private:
+ UniqueId _load_id; // for logging
int64_t _dst_id = -1; // for logging
std::atomic<bool> _is_closed;
bthread::Mutex _mutex;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_stub_pool.cpp
index ec7e53211fb..240e44ef380 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -30,11 +30,11 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id,
int num_use, LoadStre
void LoadStreams::release() {
int num_use = --_use_cnt;
if (num_use == 0) {
- LOG(INFO) << "releasing streams for load_id = " << _load_id << ",
dst_id = " << _dst_id;
+ LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id="
<< _dst_id;
_pool->erase(_load_id, _dst_id);
} else {
- LOG(INFO) << "no releasing streams for load_id = " << _load_id << ",
dst_id = " << _dst_id
- << ", use_cnt = " << num_use;
+ LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" <<
_dst_id
+ << ", use_cnt=" << num_use;
}
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 19ed6caf291..8bc65a4cba4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -187,11 +187,12 @@ Status VTabletWriterV2::_init(RuntimeState* state,
RuntimeProfile* profile) {
_stream_per_node = state->load_stream_per_node();
_total_streams = state->total_load_streams();
_num_local_sink = state->num_local_sink();
+ LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
+ << ", num senders: " << _num_senders << ", stream per node: " <<
_stream_per_node
+ << ", total_streams " << _total_streams << ", num_local_sink: "
<< _num_local_sink;
DCHECK(_stream_per_node > 0) << "load stream per node should be greator
than 0";
DCHECK(_total_streams > 0) << "total load streams should be greator than
0";
DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
- LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " <<
_stream_per_node
- << ", total_streams " << _total_streams << ", num_local_sink: "
<< _num_local_sink;
_is_high_priority =
(state->execution_timeout() <=
config::load_task_high_priority_threshold_second);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]