This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fefbde8927508d696a0d70febf3dece51b938462
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Apr 10 22:30:32 2024 +0800

    [log](move-memtable) improve logs in vtablet_writer_v2 and load_stream (#33103)
---
 be/src/runtime/load_stream.cpp               | 26 ++++++++++++++++----------
 be/src/vec/sink/load_stream_stub.cpp         |  3 +++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  9 ++++++++-
 3 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 037d4764f53..266a4b97183 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -28,6 +28,7 @@
 #include <runtime/exec_env.h>
 
 #include <memory>
+#include <sstream>
 
 #include "common/signal_handler.h"
 #include "exec/tablet_info.h"
@@ -124,8 +125,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
             for (size_t index = origin_size; index <= segid; index++) {
                 mapping->at(index) = _next_segid;
                 _next_segid++;
-                LOG(INFO) << "src_id=" << src_id << ", segid=" << index << " 
to "
-                          << " segid=" << _next_segid - 1;
+                VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " 
to "
+                           << " segid=" << _next_segid - 1 << ", " << *this;
             }
         }
     }
@@ -383,7 +384,7 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
     }
     _close_load_cnt++;
     LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
-              << _total_streams - _close_load_cnt << " senders";
+              << _total_streams - _close_load_cnt << " senders, " << *this;
 
     _tablets_to_commit.insert(_tablets_to_commit.end(), 
tablets_to_commit.begin(),
                               tablets_to_commit.end());
@@ -432,14 +433,14 @@ void LoadStream::_report_result(StreamId stream, const Status& status,
         if (st.ok()) {
             response.set_load_stream_profile(buf, len);
         } else {
-            LOG(WARNING) << "load channel TRuntimeProfileTree serialize 
failed, errmsg=" << st;
+            LOG(WARNING) << "TRuntimeProfileTree serialize failed, errmsg=" << 
st << ", " << *this;
         }
     }
 
     buf.append(response.SerializeAsString());
     auto wst = _write_stream(stream, buf);
     if (!wst.ok()) {
-        LOG(WARNING) << *this << " report result failed with " << wst;
+        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
     }
 }
 
@@ -464,7 +465,7 @@ void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
     buf.append(response.SerializeAsString());
     auto wst = _write_stream(stream, buf);
     if (!wst.ok()) {
-        LOG(WARNING) << *this << " report result failed with " << wst;
+        LOG(WARNING) << " report result failed with " << wst << ", " << *this;
     }
 }
 
@@ -592,26 +593,31 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
         _report_schema(id, hdr);
     } break;
     default:
-        LOG(WARNING) << "unexpected stream message " << hdr.opcode();
+        LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " 
<< *this;
         DCHECK(false);
     }
 }
 
 void LoadStream::on_idle_timeout(StreamId id) {
-    LOG(WARNING) << "closing load stream on idle timeout, load_id=" << 
print_id(_load_id);
+    LOG(WARNING) << "closing load stream on idle timeout, " << *this;
     brpc::StreamClose(id);
 }
 
 void LoadStream::on_closed(StreamId id) {
+    // `this` may be freed by other threads after increasing `_close_rpc_cnt`,
+    // format string first to prevent use-after-free
+    std::stringstream ss;
+    ss << *this;
     auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
-    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << 
remaining_streams;
+    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << 
remaining_streams << ", "
+              << ss.str();
     if (remaining_streams == 0) {
         _load_stream_mgr->clear_load(_load_id);
     }
 }
 
 inline std::ostream& operator<<(std::ostream& ostr, const LoadStream& 
load_stream) {
-    ostr << "load_id=" << UniqueId(load_stream._load_id) << ", txn_id=" << 
load_stream._txn_id;
+    ostr << "load_id=" << print_id(load_stream._load_id) << ", txn_id=" << 
load_stream._txn_id;
     return ostr;
 }
 
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 414e253c155..6d661a4c88e 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -305,6 +305,9 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
     while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
         //the query maybe cancel, so need check after wait 1s
         timeout_sec = timeout_sec - 1;
+        LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << 
timeout_sec
+                  << ", is_closed=" << _is_closed.load()
+                  << ", is_cancelled=" << 
state->get_query_ctx()->is_cancelled();
         int ret = _close_cv.wait_for(lock, 1000000);
         if (ret != 0 && timeout_sec <= 0) {
             return Status::InternalError(
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index af1caefa43d..f05400fc6e4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -246,6 +246,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
 
 Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
     RETURN_IF_ERROR(_init(state, profile));
+    LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id) << 
", txn_id=" << _txn_id
+              << ", sink_id=" << _sender_id;
     _timeout_watch.start();
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_open_timer);
@@ -462,7 +464,8 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
 
 Status VTabletWriterV2::_cancel(Status status) {
     LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
-              << ", txn_id=" << _txn_id << ", due to error: " << status;
+              << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id
+              << ", due to error: " << status;
     if (_delta_writer_for_tablet) {
         _delta_writer_for_tablet->cancel(status);
         _delta_writer_for_tablet.reset();
@@ -503,6 +506,8 @@ Status VTabletWriterV2::close(Status exec_status) {
     if (_is_closed) {
         return _close_status;
     }
+    LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << 
", txn_id=" << _txn_id
+              << ", sink_id=" << _sender_id << ", status=" << 
exec_status.to_string();
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
 
@@ -625,6 +630,8 @@ Status VTabletWriterV2::close(Status exec_status) {
 }
 
 void VTabletWriterV2::_calc_tablets_to_commit() {
+    LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << 
", txn_id=" << _txn_id
+              << ", sink_id=" << _sender_id;
     for (const auto& [dst_id, tablets] : _tablets_for_node) {
         std::vector<PTabletID> tablets_to_commit;
         std::vector<int64_t> partition_ids;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to