This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 88c2c590b5 [improvement](load) print more load log (#11925)
88c2c590b5 is described below
commit 88c2c590b56e730f7bd499f83882b71febc0e5dd
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri Aug 19 17:47:53 2022 +0800
[improvement](load) print more load log (#11925)
---
be/src/exec/tablet_sink.cpp | 42 +++++++++++++++++++++---------------------
be/src/exec/tablet_sink.h | 1 +
be/src/olap/delta_writer.cpp | 6 +++---
be/src/olap/memtable.cpp | 6 ++----
4 files changed, 27 insertions(+), 28 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index be4cdca5e6..560189c03b 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -169,9 +169,7 @@ Status NodeChannel::open_wait() {
_cancelled = true;
LOG(WARNING) << ss.str() << " " << channel_info();
- return Status::InternalError("failed to open tablet writer, error={},
error_text={}",
- berror(_open_closure->cntl.ErrorCode()),
- _open_closure->cntl.ErrorText());
+ return Status::InternalError(ss.str());
}
Status status(_open_closure->result.status());
if (_open_closure->unref()) {
@@ -260,15 +258,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t
tablet_id) {
}
}
- // We use OlapTableSink mem_tracker which has the same ancestor of _plan
node,
- // so in the ideal case, mem limit is a matter for _plan node.
- // But there is still some unfinished things, we do mem limit here
temporarily.
- // _cancelled may be set by rpc callback, and it's possible that
_cancelled might be set in any of the steps below.
- // It's fine to do a fake add_row() and return OK, because we will check
_cancelled in next add_row() or mark_close().
- while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes)
{
- SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
+ _sleep_if_memory_exceed();
auto row_no = _cur_batch->add_row();
if (row_no == RowBatch::INVALID_ROW_INDEX) {
@@ -309,15 +299,8 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
}
}
- // We use OlapTableSink mem_tracker which has the same ancestor of _plan
node,
- // so in the ideal case, mem limit is a matter for _plan node.
- // But there is still some unfinished things, we do mem limit here
temporarily.
- // _cancelled may be set by rpc callback, and it's possible that
_cancelled might be set in any of the steps below.
- // It's fine to do a fake add_row() and return OK, because we will check
_cancelled in next add_row() or mark_close().
- while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes)
{
- SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
+ _sleep_if_memory_exceed();
+
constexpr size_t BATCH_SIZE_FOR_SEND = 2 * 1024 * 1024; //2M
auto row_no = _cur_batch->add_row();
if (row_no == RowBatch::INVALID_ROW_INDEX ||
@@ -346,6 +329,23 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
return Status::OK();
}
+void NodeChannel::_sleep_if_memory_exceed() {
+ size_t begin_us = _mem_exceeded_block_ns / 1000;
+ while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes)
{
+ SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ if (_mem_exceeded_block_ns / 1000 - begin_us > 5000000) {
+ begin_us = _mem_exceeded_block_ns / 1000;
+ LOG(INFO) << "sink sleeps too long, pending_batches_bytes = " <<
_pending_batches_bytes
+ << ", max_pending_batches_bytes = " <<
_max_pending_batches_bytes
+ << ", is_packet_in_flight = " <<
_add_batch_closure->is_packet_in_flight()
+ << ", next_packet_seq = " << _next_packet_seq
+ << ", cur_batch_rows = " << _cur_batch->num_rows()
+ << ", " << channel_info();
+ }
+ }
+}
+
void NodeChannel::mark_close() {
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index d3e083363d..34abf496c0 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -233,6 +233,7 @@ public:
private:
void _cancel_with_msg(const std::string& msg);
+ void _sleep_if_memory_exceed();
private:
OlapTableSink* _parent = nullptr;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index e85cf77f5c..b33b5b215b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -314,9 +314,9 @@ OLAPStatus DeltaWriter::close_wait() {
_delta_written_success = true;
const FlushStatistic& stat = _flush_token->get_stats();
- VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id()
- << ", load id: " << print_id(_req.load_id)
- << ", stats: " << stat;
+ LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id()
+ << ", load id: " << print_id(_req.load_id)
+ << ", stats: " << stat;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index a7d6728ebe..671bb02d52 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -134,8 +134,6 @@ void MemTable::_aggregate_two_row_with_sequence(const
ContiguousRow& src_row,
}
OLAPStatus MemTable::flush() {
- VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
- << ", memsize: " << memory_usage() << ", rows: " << _rows;
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
@@ -157,8 +155,8 @@ OLAPStatus MemTable::flush() {
}
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns /
1000);
- VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
- << ", flushsize: " << _flush_size;
+ LOG(INFO) << "flush memtable, tablet: " << _tablet_id << ", memsize: " <<
memory_usage()
+ << ", rows: " << _rows << ", flushsize: " << _flush_size << ",
duration_us: " << duration_ns / 1000;
return OLAP_SUCCESS;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]