This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b79f5d77f1d [improve](move-memtable) improve logging messages (#27443)
b79f5d77f1d is described below

commit b79f5d77f1d4b08da2cba7c65e75e34e8a2b77b1
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Nov 23 11:46:29 2023 +0800

    [improve](move-memtable) improve logging messages (#27443)
---
 be/src/io/fs/stream_sink_file_writer.cpp     | 12 ++++++------
 be/src/runtime/load_stream.cpp               | 11 +++++------
 be/src/service/internal_service.cpp          |  4 ++--
 be/src/vec/sink/delta_writer_v2_pool.cpp     |  9 +++++----
 be/src/vec/sink/load_stream_stub.cpp         | 26 +++++++++++++++++++-------
 be/src/vec/sink/load_stream_stub.h           |  2 ++
 be/src/vec/sink/load_stream_stub_pool.cpp    |  6 +++---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  5 +++--
 8 files changed, 45 insertions(+), 30 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
index 67269335136..9b2125c9446 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -21,6 +21,7 @@
 
 #include "olap/olap_common.h"
 #include "olap/rowset/beta_rowset_writer.h"
+#include "util/uid_util.h"
 #include "vec/sink/load_stream_stub.h"
 
 namespace doris {
@@ -45,9 +46,9 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
     }
     _bytes_appended += bytes_req;
 
-    VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
-               << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
-               << ", segment_id: " << _segment_id << ", data_length: " << 
bytes_req;
+    VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
+               << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id
+               << ", data_length: " << bytes_req;
 
     std::span<const Slice> slices {data, data_cnt};
     for (auto& stream : _streams) {
@@ -58,9 +59,8 @@ Status StreamSinkFileWriter::appendv(const Slice* data, 
size_t data_cnt) {
 }
 
 Status StreamSinkFileWriter::finalize() {
-    VLOG_DEBUG << "writer finalize, load_id: " << 
UniqueId(_load_id).to_string()
-               << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
-               << ", segment_id: " << _segment_id;
+    VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", 
index_id: " << _index_id
+               << ", tablet_id: " << _tablet_id << ", segment_id: " << 
_segment_id;
     // TODO(zhengyu): update get_inverted_index_file_size into stat
     for (auto& stream : _streams) {
         RETURN_IF_ERROR(
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 25e79ccab58..b313307c70b 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -306,8 +306,8 @@ Status LoadStream::close(int64_t src_id, const 
std::vector<PTabletID>& tablets_t
                         }
                     }
                     LOG(INFO) << "close load " << *this
-                              << ", failed_tablet_num=" << 
failed_tablet_ids->size()
-                              << ", success_tablet_num=" << 
success_tablet_ids->size();
+                              << ", success_tablet_num=" << 
success_tablet_ids->size()
+                              << ", failed_tablet_num=" << 
failed_tablet_ids->size();
                     std::unique_lock<bthread::Mutex> lock(mutex);
                     cond.notify_one();
                 });
@@ -324,7 +324,7 @@ Status LoadStream::close(int64_t src_id, const 
std::vector<PTabletID>& tablets_t
 void LoadStream::_report_result(StreamId stream, const Status& st,
                                 const std::vector<int64_t>& success_tablet_ids,
                                 const std::vector<int64_t>& failed_tablet_ids) 
{
-    LOG(INFO) << "report result, success tablet num " << 
success_tablet_ids.size()
+    LOG(INFO) << "report result " << *this << ", success tablet num " << 
success_tablet_ids.size()
               << ", failed tablet num " << failed_tablet_ids.size();
     butil::IOBuf buf;
     PWriteStreamSinkResponse response;
@@ -456,9 +456,8 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
     VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << 
hdr.src_id()
                << " with tablet " << hdr.tablet_id();
     if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
-        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid load 
id {}, expected {}",
-                                                               
UniqueId(hdr.load_id()).to_string(),
-                                                               
UniqueId(_load_id).to_string());
+        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
+                "invalid load id {}, expected {}", print_id(hdr.load_id()), 
print_id(_load_id));
         _report_failure(id, st, hdr);
         return;
     }
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index ce249536e76..751b246b9c6 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -359,8 +359,8 @@ void 
PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
         brpc::StreamOptions stream_options;
 
-        LOG(INFO) << "open load stream, load_id = " << request->load_id()
-                  << ", src_id = " << request->src_id();
+        LOG(INFO) << "open load stream, load_id=" << request->load_id()
+                  << ", src_id=" << request->src_id();
 
         for (const auto& req : request->tablets()) {
             TabletManager* tablet_mgr = 
StorageEngine::instance()->tablet_manager();
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp
index dc9a2765a55..df9c0fc1c8c 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -41,13 +41,13 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
 Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
     int num_use = --_use_cnt;
     if (num_use > 0) {
-        LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , 
use_cnt = " << num_use;
+        LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , 
use_cnt=" << num_use;
         return Status::OK();
     }
-    LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
     if (_pool != nullptr) {
         _pool->erase(_load_id);
     }
+    LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
     Status status = Status::OK();
     _map.for_each([&status](auto& entry) {
         if (status.ok()) {
@@ -57,6 +57,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
     if (!status.ok()) {
         return status;
     }
+    LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
     _map.for_each([&status, profile](auto& entry) {
         if (status.ok()) {
             status = entry.second->close_wait(profile);
@@ -67,7 +68,7 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
 
 void DeltaWriterV2Map::cancel(Status status) {
     int num_use = --_use_cnt;
-    LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " 
<< num_use;
+    LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << 
num_use;
     if (num_use == 0 && _pool != nullptr) {
         _pool->erase(_load_id);
     }
@@ -95,7 +96,7 @@ std::shared_ptr<DeltaWriterV2Map> 
DeltaWriterV2Pool::get_or_create(PUniqueId loa
 
 void DeltaWriterV2Pool::erase(UniqueId load_id) {
     std::lock_guard<std::mutex> lock(_mutex);
-    LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
+    LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id;
     _pool.erase(load_id);
 }
 
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 8319def542c..793098a3e9d 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -23,6 +23,7 @@
 #include "util/brpc_client_cache.h"
 #include "util/network_util.h"
 #include "util/thrift_util.h"
+#include "util/uid_util.h"
 
 namespace doris {
 
@@ -37,12 +38,15 @@ int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
         Status st = Status::create(response.status());
 
         std::stringstream ss;
-        ss << "received response from backend " << _dst_id;
+        ss << "on_received_messages, load_id=" << _load_id << ", backend_id=" 
<< _dst_id;
         if (response.success_tablet_ids_size() > 0) {
             ss << ", success tablet ids:";
             for (auto tablet_id : response.success_tablet_ids()) {
                 ss << " " << tablet_id;
             }
+            if (response.success_tablet_ids_size() == 0) {
+                ss << " none";
+            }
             std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
             for (auto tablet_id : response.success_tablet_ids()) {
                 _success_tablets.push_back(tablet_id);
@@ -53,6 +57,9 @@ int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
             for (auto tablet_id : response.failed_tablet_ids()) {
                 ss << " " << tablet_id;
             }
+            if (response.failed_tablet_ids_size() == 0) {
+                ss << " none";
+            }
             std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
             for (auto tablet_id : response.failed_tablet_ids()) {
                 _failed_tablets.push_back(tablet_id);
@@ -86,6 +93,7 @@ int 
LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
 }
 
 void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
+    LOG(INFO) << "on_closed, load_id=" << _load_id << ", stream_id=" << id;
     std::lock_guard<bthread::Mutex> lock(_mutex);
     _is_closed.store(true);
     _close_cv.notify_all();
@@ -123,6 +131,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     }
     _dst_id = node_info.id;
     _handler.set_dst_id(_dst_id);
+    _handler.set_load_id(_load_id);
     std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
     brpc::StreamOptions opt;
     opt.max_buf_size = 20 << 20; // 20MB
@@ -160,8 +169,8 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
         return Status::InternalError("Failed to connect to backend {}: {}", 
_dst_id,
                                      cntl.ErrorText());
     }
-    LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id 
<< " (" << host_port
-              << ")";
+    LOG(INFO) << "open load stream " << _stream_id << " load_id=" << 
print_id(_load_id)
+              << " for backend " << _dst_id << " (" << host_port << ")";
     _is_init.store(true);
     return Status::OK();
 }
@@ -227,12 +236,15 @@ Status LoadStreamStub::get_schema(const 
std::vector<PTabletID>& tablets) {
     header.set_src_id(_src_id);
     header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
     std::ostringstream oss;
-    oss << "fetching tablet schema from stream " << _stream_id << ", load id: 
" << _load_id
-        << ", tablet id:";
+    oss << "fetching tablet schema from stream " << _stream_id
+        << ", load id: " << print_id(_load_id) << ", tablet id:";
     for (const auto& tablet : tablets) {
         *header.add_tablets() = tablet;
         oss << " " << tablet.tablet_id();
     }
+    if (tablets.size() == 0) {
+        oss << " none";
+    }
     LOG(INFO) << oss.str();
     return _encode_and_send(header);
 }
@@ -318,12 +330,12 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& 
buf) {
             const timespec time = butil::seconds_from_now(60);
             int wait_ret = brpc::StreamWait(_stream_id, &time);
             if (wait_ret != 0) {
-                return Status::InternalError("StreamWait failed, err = ", 
wait_ret);
+                return Status::InternalError("StreamWait failed, err=", 
wait_ret);
             }
             break;
         }
         default:
-            return Status::InternalError("StreamWrite failed, err = {}", ret);
+            return Status::InternalError("StreamWrite failed, err={}", ret);
         }
     }
 }
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 8a2c9dfce59..bbed172f8ac 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -121,8 +121,10 @@ private:
         }
 
         void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
+        void set_load_id(PUniqueId load_id) { _load_id = UniqueId(load_id); }
 
     private:
+        UniqueId _load_id;    // for logging
         int64_t _dst_id = -1; // for logging
         std::atomic<bool> _is_closed;
         bthread::Mutex _mutex;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index ec7e53211fb..240e44ef380 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -30,11 +30,11 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, 
int num_use, LoadStre
 void LoadStreams::release() {
     int num_use = --_use_cnt;
     if (num_use == 0) {
-        LOG(INFO) << "releasing streams for load_id = " << _load_id << ", 
dst_id = " << _dst_id;
+        LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" 
<< _dst_id;
         _pool->erase(_load_id, _dst_id);
     } else {
-        LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", 
dst_id = " << _dst_id
-                  << ", use_cnt = " << num_use;
+        LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << 
_dst_id
+                  << ", use_cnt=" << num_use;
     }
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 19ed6caf291..8bc65a4cba4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -187,11 +187,12 @@ Status VTabletWriterV2::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     _stream_per_node = state->load_stream_per_node();
     _total_streams = state->total_load_streams();
     _num_local_sink = state->num_local_sink();
+    LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
+              << ", num senders: " << _num_senders << ", stream per node: " << 
_stream_per_node
+              << ", total_streams " << _total_streams << ", num_local_sink: " 
<< _num_local_sink;
     DCHECK(_stream_per_node > 0) << "load stream per node should be greator 
than 0";
     DCHECK(_total_streams > 0) << "total load streams should be greator than 
0";
     DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
-    LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << 
_stream_per_node
-              << ", total_streams " << _total_streams << ", num_local_sink: " 
<< _num_local_sink;
     _is_high_priority =
             (state->execution_timeout() <= 
config::load_task_high_priority_threshold_second);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to