This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new d366476697 [chore](BE) add more log for better tracing for be write
(#14425)
d366476697 is described below
commit d366476697c626470b9a129e5747bbbb173b07e0
Author: AlexYue <[email protected]>
AuthorDate: Thu Nov 24 22:51:53 2022 +0800
[chore](BE) add more log for better tracing for be write (#14425)
Recently, while tracing a bug that happened in version 1.1.4, I found
some places where we can add more logs for better tracing.
---
be/src/exec/tablet_sink.cpp | 19 ++++++++++++-----
be/src/olap/delta_writer.cpp | 6 +++++-
be/src/olap/memtable_flush_executor.cpp | 6 +++++-
be/src/runtime/tablets_channel.cpp | 36 +++++++++++++++++++++------------
4 files changed, 47 insertions(+), 20 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f7c4268704..8db57b45ff 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -200,6 +200,7 @@ Status NodeChannel::open_wait() {
} else if (is_last_rpc) {
// if this is last rpc, will must set _add_batches_finished.
otherwise, node channel's close_wait
// will be blocked.
+ VLOG_PROGRESS << "node channel " << channel_info() << "
add_batches_finished";
_add_batches_finished = true;
}
_add_batch_counter.add_batch_rpc_time_us +=
_add_batch_closure->watch.elapsed_time() / 1000;
@@ -232,6 +233,9 @@ Status NodeChannel::open_wait() {
commit_info.backendId = _node_id;
_tablet_commit_infos.emplace_back(std::move(commit_info));
}
+ VLOG_PROGRESS << "node channel " << channel_info()
+ << " add_batches_finished and handled "
+ << result.tablet_errors().size() << " tablets
errors";
_add_batches_finished = true;
}
} else {
@@ -348,8 +352,7 @@ void NodeChannel::_sleep_if_memory_exceed() {
<< ", max_pending_batches_bytes = " <<
_max_pending_batches_bytes
<< ", is_packet_in_flight = " <<
_add_batch_closure->is_packet_in_flight()
<< ", next_packet_seq = " << _next_packet_seq
- << ", cur_batch_rows = " << _cur_batch->num_rows()
- << ", " << channel_info();
+ << ", cur_batch_rows = " << _cur_batch->num_rows() << ",
" << channel_info();
}
}
}
@@ -370,7 +373,8 @@ void NodeChannel::mark_close() {
DCHECK(_pending_batches.back().second.eos());
_close_time_ms = UnixMillis();
LOG(INFO) << channel_info()
- << " mark closed, left pending batch size: " <<
_pending_batches.size();
+ << " mark closed, left pending batch size: " <<
_pending_batches.size()
+ << " left pending batch size: " << _pending_batches_bytes;
}
_eos_is_produced = true;
@@ -672,6 +676,8 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host,
const std::string& err,
int64_t tablet_id) {
+ VLOG_PROGRESS << "mark node_id:" << node_id << " tablet_id: " << tablet_id
+ << " as failed, err: " << err;
const auto& it = _tablets_by_channel.find(node_id);
if (it == _tablets_by_channel.end()) {
return;
@@ -883,6 +889,9 @@ Status OlapTableSink::prepare(RuntimeState* state) {
}
}
auto channel = std::make_shared<IndexChannel>(this, index->index_id);
+ if (UNLIKELY(tablets.empty())) {
+ LOG(WARNING) << "load job:" << state->load_job_id() << " index: "
<< index->index_id << " would open 0 tablet";
+ }
RETURN_IF_ERROR(channel->init(state, tablets));
_channels.emplace_back(channel);
}
@@ -1109,8 +1118,8 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" <<
(pair.second.add_batch_execution_time_us / 1000)
<< ")(" << (pair.second.add_batch_wait_execution_time_us /
1000) << ")("
- << (pair.second.add_batch_rpc_time_us / 1000) << ")(" <<
pair.second.close_wait_time_ms
- << ")(" << pair.second.add_batch_num << ")} ";
+ << (pair.second.add_batch_rpc_time_us / 1000) << ")("
+ << pair.second.close_wait_time_ms << ")(" <<
pair.second.add_batch_num << ")} ";
}
LOG(INFO) << ss.str();
} else {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 7dff080a9d..fb2f69a235 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -295,7 +295,11 @@ OLAPStatus DeltaWriter::close_wait() {
}
// return error if previous flush failed
- RETURN_NOT_OK(_flush_token->wait());
+ auto st = _flush_token->wait();
+ if (OLAP_UNLIKELY(st != OLAP_SUCCESS)) {
+ LOG(WARNING) << "previous flush failed tablet " <<
_tablet->tablet_id();
+ return st;
+ }
// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index b63074d282..0a9ad6242e 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -42,7 +42,11 @@ std::ostream& operator<<(std::ostream& os, const
FlushStatistic& stat) {
OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
RETURN_NOT_OK(_flush_status.load());
int64_t submit_task_time = MonotonicNanos();
- _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this,
memtable, submit_task_time));
+ auto st =
_flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this,
memtable, submit_task_time));
+ if (UNLIKELY(!st.ok())) {
+ VLOG_CRITICAL << "submit func err: " << st.get_error_msg();
+ return OLAP_ERR_OTHER_ERROR;
+ }
return OLAP_SUCCESS;
}
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 3785eedd10..66a5a3fdc2 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -35,7 +35,10 @@ std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
const std::shared_ptr<MemTracker>& mem_tracker,
bool is_high_priority)
- : _key(key), _state(kInitialized), _closed_senders(64),
_is_high_priority(is_high_priority) {
+ : _key(key),
+ _state(kInitialized),
+ _closed_senders(64),
+ _is_high_priority(is_high_priority) {
_mem_tracker = MemTracker::CreateTracker(-1, "TabletsChannel",
mem_tracker);
static std::once_flag once_flag;
std::call_once(once_flag, [] {
@@ -78,26 +81,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest&
request) {
}
Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
- PTabletWriterAddBatchResult* response) {
+ PTabletWriterAddBatchResult* response) {
DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
int64_t cur_seq;
{
std::lock_guard<std::mutex> l(_lock);
if (_state != kOpened) {
return _state == kFinished
- ? _close_status
- : Status::InternalError(strings::Substitute("TabletsChannel $0
state: $1",
- _key.to_string(), _state));
+ ? _close_status
+ : Status::InternalError(strings::Substitute(
+ "TabletsChannel $0 state: $1",
_key.to_string(), _state));
}
cur_seq = _next_seqs[request.sender_id()];
// check packet
if (request.packet_seq() < cur_seq) {
LOG(INFO) << "packet has already recept before, expect_seq=" <<
cur_seq
- << ", recept_seq=" << request.packet_seq();
+ << ", recept_seq=" << request.packet_seq();
return Status::OK();
} else if (request.packet_seq() > cur_seq) {
LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
- << ", recept_seq=" << request.packet_seq();
+ << ", recept_seq=" << request.packet_seq();
return Status::InternalError("lost data packet");
}
}
@@ -108,22 +111,24 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBatchRequest& request,
int64_t tablet_id = request.tablet_ids(i);
if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
// skip broken tablets
+ VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
continue;
}
auto it = tablet_to_rowidxs.find(tablet_id);
if (it == tablet_to_rowidxs.end()) {
- tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i
});
+ tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>
{i});
} else {
it->second.emplace_back(i);
}
}
- google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
+ google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
+ response->mutable_tablet_errors();
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
auto tablet_writer_it =
_tablet_writers.find(tablet_to_rowidxs_it.first);
if (tablet_writer_it == _tablet_writers.end()) {
- return Status::InternalError(
- strings::Substitute("unknown tablet to append data,
tablet=$0", tablet_to_rowidxs_it.first));
+ return Status::InternalError(strings::Substitute(
+ "unknown tablet to append data, tablet=$0",
tablet_to_rowidxs_it.first));
}
OLAPStatus st = tablet_writer_it->second->write(&row_batch,
tablet_to_rowidxs_it.second);
@@ -192,6 +197,8 @@ Status TabletsChannel::close(int sender_id, int64_t
backend_id, bool* finished,
// just skip this tablet(writer) and continue to close
others
continue;
}
+ VLOG_PROGRESS << "cancel tablet writer successfully,
tablet_id=" << it.first
+ << ", transaction_id=" << _txn_id;
}
}
@@ -221,6 +228,8 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(writer->tablet_id());
tablet_error->set_msg("close wait failed: " +
boost::lexical_cast<string>(st));
+ VLOG_PROGRESS << "close wait failed tablet " << writer->tablet_id() <<
" transaction_id "
+ << _txn_id << "err msg " << st;
}
}
@@ -255,7 +264,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
int64_t mem_to_flushed = mem_limit / 3;
int counter = 0;
- int64_t sum = 0;
+ int64_t sum = 0;
for (auto writer : writers) {
if (writer->memtable_consumption() <= 0) {
break;
@@ -278,7 +287,8 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
}
OLAPStatus st = writers[i]->wait_flush();
if (st != OLAP_SUCCESS) {
- return Status::InternalError(fmt::format("failed to reduce mem
consumption by flushing memtable. err: {}", st));
+ return Status::InternalError(fmt::format(
+ "failed to reduce mem consumption by flushing memtable.
err: {}", st));
}
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]