This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 26289c2 [fix](load)(compaction) Fix NodeChannel coredump bug and
modify some compaction logic (#8072)
26289c2 is described below
commit 26289c28b0adf06e96f3789bb5560e5759005ea6
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Feb 17 10:52:08 2022 +0800
[fix](load)(compaction) Fix NodeChannel coredump bug and modify some
compaction logic (#8072)
1. Fix the problem of BE crash caused by destruct sequence. (close #8058)
2. Add a new BE config `compaction_task_num_per_fast_disk`
This config specifies the max concurrent compaction task num on fast
disks (typically SSD).
So that for high-speed disks, we can execute more compaction tasks at
the same time,
to compact the data as soon as possible.
3. Avoid frequent selection of unqualified tablets to perform compaction.
4. Modify some log level to reduce the log size of BE.
5. Modify some clone logic to handle error correctly.
---
be/src/agent/task_worker_pool.cpp | 8 ++--
be/src/common/config.h | 3 ++
be/src/exec/tablet_sink.cpp | 14 +++----
be/src/exec/tablet_sink.h | 17 +++++++-
be/src/http/action/compaction_action.cpp | 2 +-
be/src/http/http_client.h | 13 +++++--
be/src/olap/base_compaction.cpp | 32 +++++++--------
be/src/olap/compaction_permit_limiter.h | 2 +
be/src/olap/cumulative_compaction.cpp | 8 ++--
be/src/olap/data_dir.cpp | 10 +++--
be/src/olap/olap_define.h | 2 +-
be/src/olap/olap_server.cpp | 28 +++++++++-----
be/src/olap/rowset/beta_rowset.cpp | 10 ++---
be/src/olap/rowset/beta_rowset_writer.cpp | 3 +-
be/src/olap/tablet.cpp | 45 ++++++++++++----------
be/src/olap/tablet.h | 5 ++-
be/src/olap/tablet_meta_manager.cpp | 7 ++--
be/src/olap/task/engine_clone_task.cpp | 2 +-
be/src/olap/task/engine_publish_version_task.cpp | 8 ++--
be/src/olap/txn_manager.cpp | 20 +++++-----
be/src/olap/utils.cpp | 2 +-
be/src/runtime/load_channel.cpp | 2 +-
be/src/runtime/load_channel_mgr.cpp | 2 +-
be/src/runtime/plan_fragment_executor.cpp | 6 +--
docs/en/administrator-guide/config/be_config.md | 20 ++++------
docs/zh-CN/administrator-guide/config/be_config.md | 20 ++++------
.../org/apache/doris/clone/TabletSchedCtx.java | 8 ++--
27 files changed, 164 insertions(+), 135 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index a0eaf57..457f8da 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -252,7 +252,7 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest&
task) {
_worker_thread_condition_variable.notify_one();
}
LOG(INFO) << "success to submit task. type=" << type_str << ",
signature=" << signature
- << ", task_count_in_queue=" << task_count_in_queue;
+ << ", queue size=" << task_count_in_queue;
} else {
LOG(INFO) << "fail to register task. type=" << type_str << ",
signature=" << signature;
}
@@ -280,8 +280,8 @@ void TaskWorkerPool::_remove_task_info(const
TTaskType::type task_type, int64_t
std::string type_str;
EnumToString(TTaskType, task_type, type_str);
- LOG(INFO) << "remove task info. type=" << type_str << ", signature=" <<
signature
- << ", queue_size=" << queue_size;
+ VLOG_NOTICE << "remove task info. type=" << type_str << ", signature=" <<
signature
+ << ", queue_size=" << queue_size;
TRACE("remove task info");
}
@@ -713,7 +713,7 @@ void
TaskWorkerPool::_publish_version_worker_thread_callback() {
}
DorisMetrics::instance()->publish_task_request_total->increment(1);
- LOG(INFO) << "get publish version task, signature:" <<
agent_task_req.signature;
+ VLOG_NOTICE << "get publish version task, signature:" <<
agent_task_req.signature;
Status st;
std::vector<TTabletId> error_tablet_ids;
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c5eed39..0c6cf36 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -289,7 +289,10 @@ CONF_mInt32(generate_compaction_tasks_min_interval_ms,
"10");
// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction
have at least one thread each.
CONF_mInt32(compaction_task_num_per_disk, "2");
+// compaction thread num for fast disk(typically .SSD), must be greater than 2.
+CONF_mInt32(compaction_task_num_per_fast_disk, "4");
CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool {
return config >= 2; });
+CONF_Validator(compaction_task_num_per_fast_disk, [](const int config) -> bool
{ return config >= 2; });
// How many rounds of cumulative compaction for each round of base compaction
when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 590e113..6aad7df 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -346,6 +346,7 @@ Status NodeChannel::mark_close() {
_pending_batches.emplace(std::move(_cur_batch),
_cur_add_batch_request);
_pending_batches_num++;
DCHECK(_pending_batches.back().second.eos());
+ _close_time_ms = UnixMillis();
LOG(INFO) << channel_info()
<< " mark closed, left pending batch size: " <<
_pending_batches.size();
}
@@ -367,14 +368,10 @@ Status NodeChannel::close_wait(RuntimeState* state) {
}
// waiting for finished, it may take a long time, so we couldn't set a
timeout
- MonotonicStopWatch timer;
- timer.start();
while (!_add_batches_finished && !_cancelled) {
SleepFor(MonoDelta::FromMilliseconds(1));
}
- timer.stop();
- VLOG_CRITICAL << name() << " close_wait cost: " << timer.elapsed_time() /
1000000 << " ms"
- << ", " << _load_info;
+ _close_time_ms = UnixMillis() - _close_time_ms;
if (_add_batches_finished) {
{
@@ -979,7 +976,8 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
// TODO need to be improved
LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns
<< ", total queue_push_lock_ns=" << queue_push_lock_ns
- << ", total actual_consume_ns=" << actual_consume_ns;
+ << ", total actual_consume_ns=" << actual_consume_ns
+ << ", load id=" << print_id(_load_id);
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
@@ -1003,11 +1001,11 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
// print log of add batch time of all node, for tracing load
performance easily
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
- << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait
execution time(ms)/num: ";
+ << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait
execution time(ms)/close time(ms)/num: ";
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" <<
(pair.second.add_batch_execution_time_us / 1000)
<< ")(" << (pair.second.add_batch_wait_execution_time_us /
1000) << ")("
- << pair.second.add_batch_num << ")} ";
+ << pair.second.close_wait_time_ms << ")(" <<
pair.second.add_batch_num << ")} ";
}
LOG(INFO) << ss.str();
} else {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index b31f384..916b012 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -65,10 +65,14 @@ struct AddBatchCounter {
int64_t add_batch_wait_execution_time_us = 0;
// number of add_batch call
int64_t add_batch_num = 0;
+ // time passed between marked close and finish close
+ int64_t close_wait_time_ms = 0;
+
AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
add_batch_execution_time_us += rhs.add_batch_execution_time_us;
add_batch_wait_execution_time_us +=
rhs.add_batch_wait_execution_time_us;
add_batch_num += rhs.add_batch_num;
+ close_wait_time_ms += rhs.close_wait_time_ms;
return *this;
}
friend AddBatchCounter operator+(const AddBatchCounter& lhs, const
AddBatchCounter& rhs) {
@@ -186,6 +190,7 @@ public:
int64_t* total_add_batch_exec_time_ns, int64_t*
add_batch_exec_time_ns,
int64_t* total_add_batch_num) {
(*add_batch_counter_map)[_node_id] += _add_batch_counter;
+ (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
*serialize_batch_ns += _serialize_batch_ns;
*mem_exceeded_block_ns += _mem_exceeded_block_ns;
*queue_push_lock_ns += _queue_push_lock_ns;
@@ -274,6 +279,9 @@ private:
// the previous RPC to be fully completed before the next copy.
std::string _tuple_data_buffer;
std::string* _tuple_data_buffer_ptr = nullptr;
+
+ // the timestamp when this node channel be marked closed and finished
closed
+ uint64_t _close_time_ms = 0;
};
class IndexChannel {
@@ -309,12 +317,17 @@ private:
int64_t _index_id;
int32_t _schema_hash;
+ // from backend channel to tablet_id
+ // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
+ // Because the destruct order of objects is opposite to the creation order.
+ // So NodeChannel will be destructured first.
+ // And the destructor function of NodeChannel waits for all RPCs to finish.
+ // This ensures that it is safe to use `_tablets_by_channel` in the
callback function for the end of the RPC.
+ std::unordered_map<int64_t, std::unordered_set<int64_t>>
_tablets_by_channel;
// BeId -> channel
std::unordered_map<int64_t, std::shared_ptr<NodeChannel>> _node_channels;
// from tablet_id to backend channel
std::unordered_map<int64_t, std::vector<std::shared_ptr<NodeChannel>>>
_channels_by_tablet;
- // from backend channel to tablet_id
- std::unordered_map<int64_t, std::unordered_set<int64_t>>
_tablets_by_channel;
// lock to protect _failed_channels and _failed_channels_msgs
mutable SpinLock _fail_lock;
diff --git a/be/src/http/action/compaction_action.cpp
b/be/src/http/action/compaction_action.cpp
index 6c52c91..64f1042 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -238,7 +238,7 @@ OLAPStatus
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet
std::string tracker_label = "CompactionAction:CumulativeCompaction:" +
std::to_string(syscall(__NR_gettid));
CumulativeCompaction cumulative_compaction(tablet, tracker_label,
_compaction_mem_tracker);
OLAPStatus res = cumulative_compaction.compact();
- if (res != OLAP_SUCCESS && res !=
OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
+ if (res != OLAP_SUCCESS && res !=
OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION) {
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to do cumulative compaction. res=" << res
<< ", table=" << tablet->full_name();
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index 5fec678..3f3028e 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -97,10 +97,15 @@ public:
}
// used to get content length
- int64_t get_content_length() const {
- double cl = 0.0f;
- curl_easy_getinfo(_curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &cl);
- return cl;
+ // return -1 as error
+ Status get_content_length(uint64_t* length) const {
+ curl_off_t cl;
+ auto code = curl_easy_getinfo(_curl,
CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &cl);
+ if (!code) {
+ *length = cl;
+ return Status::OK();
+ }
+ return Status::InternalError(fmt::format("failed to get content
length. err code: {}", code));
}
long get_http_status() const {
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index cc7c358..f1722d3 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -99,10 +99,10 @@ OLAPStatus BaseCompaction::pick_rowsets_to_compact() {
// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas
threshold
if (_input_rowsets.size() > config::base_compaction_num_cumulative_deltas)
{
- LOG(INFO) << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
- << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
- << ", base_compaction_num_cumulative_rowsets="
- << config::base_compaction_num_cumulative_deltas;
+ VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
+ << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
+ << ", base_compaction_num_cumulative_rowsets="
+ << config::base_compaction_num_cumulative_deltas;
return OLAP_SUCCESS;
}
@@ -126,11 +126,11 @@ OLAPStatus BaseCompaction::pick_rowsets_to_compact() {
double cumulative_base_ratio = static_cast<double>(cumulative_total_size)
/ base_size;
if (cumulative_base_ratio > base_cumulative_delta_ratio) {
- LOG(INFO) << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
- << ", cumulative_total_size=" << cumulative_total_size
- << ", base_size=" << base_size
- << ", cumulative_base_ratio=" << cumulative_base_ratio
- << ", policy_ratio=" << base_cumulative_delta_ratio;
+ VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
+ << ", cumulative_total_size=" << cumulative_total_size
+ << ", base_size=" << base_size
+ << ", cumulative_base_ratio=" << cumulative_base_ratio
+ << ", policy_ratio=" << base_cumulative_delta_ratio;
return OLAP_SUCCESS;
}
@@ -139,16 +139,16 @@ OLAPStatus BaseCompaction::pick_rowsets_to_compact() {
int64_t interval_threshold =
config::base_compaction_interval_seconds_since_last_operation;
int64_t interval_since_last_base_compaction = time(nullptr) -
base_creation_time;
if (interval_since_last_base_compaction > interval_threshold) {
- LOG(INFO) << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
- << ", interval_since_last_base_compaction=" <<
interval_since_last_base_compaction
- << ", interval_threshold=" << interval_threshold;
+ VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
+ << ", interval_since_last_base_compaction=" <<
interval_since_last_base_compaction
+ << ", interval_threshold=" << interval_threshold;
return OLAP_SUCCESS;
}
- LOG(INFO) << "don't satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
- << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
- << ", cumulative_base_ratio=" << cumulative_base_ratio
- << ", interval_since_last_base_compaction=" <<
interval_since_last_base_compaction;
+ VLOG_NOTICE << "don't satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
+ << ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
+ << ", cumulative_base_ratio=" << cumulative_base_ratio
+ << ", interval_since_last_base_compaction=" <<
interval_since_last_base_compaction;
return OLAP_ERR_BE_NO_SUITABLE_VERSION;
}
diff --git a/be/src/olap/compaction_permit_limiter.h
b/be/src/olap/compaction_permit_limiter.h
index 316eb62..965cb86 100644
--- a/be/src/olap/compaction_permit_limiter.h
+++ b/be/src/olap/compaction_permit_limiter.h
@@ -41,6 +41,8 @@ public:
void release(int64_t permits);
+ int64_t usage() const { return _used_permits; }
+
private:
// sum of "permits" held by executing compaction tasks currently
AtomicInt64 _used_permits;
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index f987acd..67efa07 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -100,7 +100,7 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
config::cumulative_compaction_skip_window_seconds,
&candidate_rowsets);
if (candidate_rowsets.empty()) {
- return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
+ return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION;
}
// candidate_rowsets may not be continuous. Because some rowset may not be
selected
@@ -124,14 +124,14 @@ OLAPStatus
CumulativeCompaction::pick_rowsets_to_compact() {
&_last_delete_version, &compaction_score);
// Cumulative compaction will process with at least 1 rowset.
- // So when there is no rowset being chosen, we should return
OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS:
+ // So when there is no rowset being chosen, we should return
OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION:
if (_input_rowsets.empty()) {
if (_last_delete_version.first != -1) {
// we meet a delete version, should increase the cumulative point
to let base compaction handle the delete version.
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max
version of this tablet, but it doesn't matter.
_tablet->set_cumulative_layer_point(_last_delete_version.first +
1);
- return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
+ return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION;
}
// we did not meet any delete version. which means compaction_score is
not enough to do cumulative compaction.
@@ -175,7 +175,7 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
}
}
- return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
+ return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION;
}
return OLAP_SUCCESS;
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 5bb86df..049f366 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -527,14 +527,16 @@ OLAPStatus DataDir::load() {
// 1. add committed rowset to txn map
// 2. add visible rowset to tablet
// ignore any errors when load tablet or rowset, because fe will repair
them after report
+ int64_t tablet_not_found = 0;
for (auto rowset_meta : dir_rowset_metas) {
TabletSharedPtr tablet =
_tablet_manager->get_tablet(rowset_meta->tablet_id(),
rowset_meta->tablet_schema_hash());
// tablet maybe dropped, but not drop related rowset meta
if (tablet == nullptr) {
- LOG(WARNING) << "could not find tablet id: " <<
rowset_meta->tablet_id()
- << ", schema hash: " <<
rowset_meta->tablet_schema_hash()
- << ", for rowset: " << rowset_meta->rowset_id() << ",
skip this rowset";
+ VLOG_NOTICE << "could not find tablet id: " <<
rowset_meta->tablet_id()
+ << ", schema hash: " <<
rowset_meta->tablet_schema_hash()
+ << ", for rowset: " << rowset_meta->rowset_id() << ",
skip this rowset";
+ ++tablet_not_found;
continue;
}
RowsetSharedPtr rowset;
@@ -584,6 +586,8 @@ OLAPStatus DataDir::load() {
<< " current valid tablet uid: " <<
tablet->tablet_uid();
}
}
+ LOG(INFO) << "finish to load tablets from " << _path_desc.filepath << ",
total rowset meta: "
+ << dir_rowset_metas.size() << ", tablet not found: " <<
tablet_not_found;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 6a3f4d6..19205e6 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -352,7 +352,7 @@ enum OLAPStatus {
// Cumulative Handler
// [-2000, -3000)
- OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS = -2000,
+ OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION = -2000,
OLAP_ERR_CUMULATIVE_REPEAT_INIT = -2001,
OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS = -2002,
OLAP_ERR_CUMULATIVE_FAILED_ACQUIRE_DATA_SOURCE = -2003,
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 7acca70..05a5e69 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -397,7 +397,11 @@ void StorageEngine::_compaction_tasks_producer_callback() {
/// If it is not cleaned up, the reference count of the tablet
will always be greater than 1,
/// thus cannot be collected by the garbage collector.
(TabletManager::start_trash_sweep)
for (const auto& tablet : tablets_compaction) {
- _submit_compaction_task(tablet, compaction_type);
+ Status st = _submit_compaction_task(tablet, compaction_type);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to submit compaction task for
tablet: " << tablet->tablet_id()
+ << ", err: " << st.get_error_msg();
+ }
}
interval = config::generate_compaction_tasks_min_interval_ms;
} else {
@@ -435,13 +439,14 @@ std::vector<TabletSharedPtr>
StorageEngine::_generate_compaction_tasks(
// If so, the last Slot can be assigned to Base compaction,
// otherwise, this Slot needs to be reserved for cumulative compaction.
int count = copied_cumu_map[data_dir].size() +
copied_base_map[data_dir].size();
- if (count >= config::compaction_task_num_per_disk) {
+ int thread_per_disk = data_dir->is_ssd_disk() ?
config::compaction_task_num_per_fast_disk :
config::compaction_task_num_per_disk;
+ if (count >= thread_per_disk) {
// Return if no available slot
need_pick_tablet = false;
if (!check_score) {
continue;
}
- } else if (count >= config::compaction_task_num_per_disk - 1) {
+ } else if (count >= thread_per_disk - 1) {
// Only one slot left, check if it can be assigned to base
compaction task.
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (copied_cumu_map[data_dir].empty()) {
@@ -548,9 +553,9 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Compaction
"compaction task has already been submitted, tablet_id=$0,
compaction_type=$1.",
tablet->tablet_id(), compaction_type));
}
- int64_t permits =
- tablet->prepare_compaction_and_calculate_permits(compaction_type,
tablet);
- if (permits > 0 && _permit_limiter.request(permits)) {
+ int64_t permits = 0;
+ Status st =
tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet,
&permits);
+ if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
auto st = _compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
@@ -573,9 +578,14 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Compaction
// reset compaction
tablet->reset_compaction(compaction_type);
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
- return Status::InternalError(strings::Substitute(
- "failed to prepare compaction task and calculate permits,
tablet_id=$0, compaction_type=$1.",
- tablet->tablet_id(), compaction_type));
+ if (st != OLAP_SUCCESS) {
+ return Status::InternalError(strings::Substitute(
+ "failed to prepare compaction task and calculate
permits, tablet_id=$0, compaction_type=$1, "
+ "permit=$2, current_permit=$3, status=$4",
+ tablet->tablet_id(), compaction_type, permits,
_permit_limiter.usage(), st.get_error_msg()));
+ } else {
+ return Status::OK();
+ }
}
}
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 99ce9e5..302e6ac 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -92,9 +92,9 @@ OLAPStatus BetaRowset::split_range(const RowCursor&
start_key, const RowCursor&
OLAPStatus BetaRowset::remove() {
// TODO should we close and remove all segment reader first?
- LOG(INFO) << "begin to remove files in rowset " << unique_id()
- << ", version:" << start_version() << "-" << end_version()
- << ", tabletid:" << _rowset_meta->tablet_id();
+ VLOG_NOTICE << "begin to remove files in rowset " << unique_id()
+ << ", version:" << start_version() << "-" << end_version()
+ << ", tabletid:" << _rowset_meta->tablet_id();
bool success = true;
for (int i = 0; i < num_segments(); ++i) {
FilePathDesc path_desc = segment_file_path(_rowset_path_desc,
rowset_id(), i);
@@ -102,8 +102,8 @@ OLAPStatus BetaRowset::remove() {
fs::BlockManager* block_mgr =
fs::fs_util::block_manager(path_desc.storage_medium);
if (!block_mgr->delete_block(path_desc).ok()) {
char errmsg[64];
- LOG(WARNING) << "failed to delete file. err=" << strerror_r(errno,
errmsg, 64)
- << ", " << path_desc.debug_string();
+ VLOG_NOTICE << "failed to delete file. err=" << strerror_r(errno,
errmsg, 64)
+ << ", " << path_desc.debug_string();
success = false;
}
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index be87131..4b68b39 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -217,7 +217,8 @@ OLAPStatus
BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::
DCHECK(block_mgr != nullptr);
Status st = block_mgr->create_block(opts, &wblock);
if (!st.ok()) {
- LOG(WARNING) << "failed to create writable block. path=" <<
path_desc.filepath;
+ LOG(WARNING) << "failed to create writable block. path=" <<
path_desc.filepath
+ << ", err: " << st.get_error_msg();
return OLAP_ERR_INIT_FAILED;
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b96290d..f9856d7 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -530,11 +530,11 @@ void Tablet::delete_expired_stale_rowset() {
bool reconstructed = _reconstruct_version_tracker_if_necessary();
- LOG(INFO) << "delete stale rowset _stale_rs_version_map tablet=" <<
full_name()
- << " current_size=" << _stale_rs_version_map.size() << "
old_size=" << old_size
- << " current_meta_size=" <<
_tablet_meta->all_stale_rs_metas().size()
- << " old_meta_size=" << old_meta_size << " sweep endtime " <<
std::fixed
- << expired_stale_sweep_endtime << ", reconstructed=" <<
reconstructed;
+ VLOG_NOTICE << "delete stale rowset _stale_rs_version_map tablet=" <<
full_name()
+ << " current_size=" << _stale_rs_version_map.size() << "
old_size=" << old_size
+ << " current_meta_size=" <<
_tablet_meta->all_stale_rs_metas().size()
+ << " old_meta_size=" << old_meta_size << " sweep endtime " <<
std::fixed
+ << expired_stale_sweep_endtime << ", reconstructed=" <<
reconstructed;
#ifndef BE_TEST
save_meta();
@@ -1165,7 +1165,7 @@ bool Tablet::do_tablet_meta_checkpoint() {
<< ", tablet=" << full_name();
return false;
}
- LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name();
+ VLOG_NOTICE << "start to do tablet meta checkpoint, tablet=" <<
full_name();
save_meta();
// if save meta successfully, then should remove the rowset meta existing
in tablet
// meta from rowset meta store
@@ -1177,9 +1177,8 @@ bool Tablet::do_tablet_meta_checkpoint() {
if (RowsetMetaManager::check_rowset_meta(_data_dir->get_meta(),
tablet_uid(),
rs_meta->rowset_id())) {
RowsetMetaManager::remove(_data_dir->get_meta(), tablet_uid(),
rs_meta->rowset_id());
- LOG(INFO) << "remove rowset id from meta store because it is
already persistent with "
- "tablet meta"
- << ", rowset_id=" << rs_meta->rowset_id();
+ VLOG_NOTICE << "remove rowset id from meta store because it is
already persistent with "
+ << "tablet meta, rowset_id=" << rs_meta->rowset_id();
}
rs_meta->set_remove_from_rowset_meta();
}
@@ -1193,9 +1192,8 @@ bool Tablet::do_tablet_meta_checkpoint() {
if (RowsetMetaManager::check_rowset_meta(_data_dir->get_meta(),
tablet_uid(),
rs_meta->rowset_id())) {
RowsetMetaManager::remove(_data_dir->get_meta(), tablet_uid(),
rs_meta->rowset_id());
- LOG(INFO) << "remove rowset id from meta store because it is
already persistent with "
- "tablet meta"
- << ", rowset_id=" << rs_meta->rowset_id();
+ VLOG_NOTICE << "remove rowset id from meta store because it is
already persistent with "
+ << "tablet meta, rowset_id=" << rs_meta->rowset_id();
}
rs_meta->set_remove_from_rowset_meta();
}
@@ -1305,8 +1303,9 @@ double Tablet::calculate_scan_frequency() {
return scan_frequency;
}
-int64_t Tablet::prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
- TabletSharedPtr
tablet) {
+Status Tablet::prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
+ TabletSharedPtr tablet,
+ int64_t* permits) {
std::vector<RowsetSharedPtr> compaction_rowsets;
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
scoped_refptr<Trace> trace(new Trace);
@@ -1324,7 +1323,12 @@ int64_t
Tablet::prepare_compaction_and_calculate_permits(CompactionType compacti
DorisMetrics::instance()->cumulative_compaction_request_total->increment(1);
OLAPStatus res = _cumulative_compaction->prepare_compact();
if (res != OLAP_SUCCESS) {
- return 0;
+ set_last_cumu_compaction_failure_time(UnixMillis());
+ if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION) {
+
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
+ }
+ *permits = 0;
+ return Status::InternalError(fmt::format("prepare compaction with
err: {}", res));
}
compaction_rowsets = _cumulative_compaction->get_input_rowsets();
} else {
@@ -1347,18 +1351,17 @@ int64_t
Tablet::prepare_compaction_and_calculate_permits(CompactionType compacti
set_last_base_compaction_failure_time(UnixMillis());
if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
- LOG(WARNING) << "failed to pick rowsets for base compaction.
res=" << res
- << ", tablet=" << full_name();
}
- return 0;
+ *permits = 0;
+ return Status::InternalError(fmt::format("prepare compaction with
err: {}", res));
}
compaction_rowsets = _base_compaction->get_input_rowsets();
}
- int64_t permits = 0;
+ *permits = 0;
for (auto rowset : compaction_rowsets) {
- permits += rowset->rowset_meta()->get_compaction_score();
+ *permits += rowset->rowset_meta()->get_compaction_score();
}
- return permits;
+ return Status::OK();
}
void Tablet::execute_compaction(CompactionType compaction_type) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 03a86f8..52f58ae 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -242,8 +242,9 @@ public:
double calculate_scan_frequency();
- int64_t prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
- TabletSharedPtr tablet);
+ Status prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
+ TabletSharedPtr tablet,
+ int64_t* permits);
void execute_compaction(CompactionType compaction_type);
void reset_compaction(CompactionType compaction_type);
diff --git a/be/src/olap/tablet_meta_manager.cpp
b/be/src/olap/tablet_meta_manager.cpp
index 7844d44..d06b57c 100644
--- a/be/src/olap/tablet_meta_manager.cpp
+++ b/be/src/olap/tablet_meta_manager.cpp
@@ -104,8 +104,7 @@ OLAPStatus TabletMetaManager::save(DataDir* store,
TTabletId tablet_id, TSchemaH
VLOG_NOTICE << "save tablet meta to meta store: key = " << key;
OlapMeta* meta = store->get_meta();
- LOG(INFO) << "save tablet meta "
- << ", key:" << key << " meta_size=" << meta_binary.length();
+ VLOG_NOTICE << "save tablet meta, key:" << key << " meta_size=" <<
meta_binary.length();
return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary);
}
@@ -118,9 +117,9 @@ OLAPStatus TabletMetaManager::remove(DataDir* store,
TTabletId tablet_id, TSchem
key_stream << header_prefix << tablet_id << "_" << schema_hash;
std::string key = key_stream.str();
OlapMeta* meta = store->get_meta();
- LOG(INFO) << "start to remove tablet_meta, key:" << key;
+ VLOG_DEBUG << "start to remove tablet_meta, key:" << key;
OLAPStatus res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
- LOG(INFO) << "remove tablet_meta, key:" << key << ", res:" << res;
+ VLOG_NOTICE << "remove tablet_meta, key:" << key << ", res:" << res;
return res;
}
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 71e73a3..31a22ec 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -455,7 +455,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir,
const std::string& re
RETURN_IF_ERROR(client->init(remote_file_url));
client->set_timeout_ms(GET_LENGTH_TIMEOUT * 1000);
RETURN_IF_ERROR(client->head());
- file_size = client->get_content_length();
+ RETURN_IF_ERROR(client->get_content_length(&file_size));
return Status::OK();
};
RETURN_IF_ERROR(
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index fead3f0..004f0ad 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -34,7 +34,7 @@
EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publi
OLAPStatus EnginePublishVersionTask::finish() {
OLAPStatus res = OLAP_SUCCESS;
int64_t transaction_id = _publish_version_req.transaction_id;
- LOG(INFO) << "begin to process publish version. transaction_id=" <<
transaction_id;
+ VLOG_NOTICE << "begin to process publish version. transaction_id=" <<
transaction_id;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -108,9 +108,9 @@ OLAPStatus EnginePublishVersionTask::finish() {
continue;
}
partition_related_tablet_infos.erase(tablet_info);
- LOG(INFO) << "publish version successfully on tablet. tablet=" <<
tablet->full_name()
- << ", transaction_id=" << transaction_id << ", version="
<< version.first
- << ", res=" << publish_status;
+ VLOG_NOTICE << "publish version successfully on tablet. tablet="
<< tablet->full_name()
+ << ", transaction_id=" << transaction_id << ",
version=" << version.first
+ << ", res=" << publish_status;
}
// check if the related tablet remained all have the version
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 1825eda..ce4618a 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -244,11 +244,11 @@ OLAPStatus TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
txn_tablet_map[key][tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
-        LOG(INFO) << "commit transaction to engine successfully."
-                  << " partition_id: " << key.first << ", transaction_id: " << key.second
-                  << ", tablet: " << tablet_info.to_string()
-                  << ", rowsetid: " << rowset_ptr->rowset_id()
-                  << ", version: " << rowset_ptr->version().first;
+        VLOG_NOTICE << "commit transaction to engine successfully."
+                    << " partition_id: " << key.first << ", transaction_id: " << key.second
+                    << ", tablet: " << tablet_info.to_string()
+                    << ", rowsetid: " << rowset_ptr->rowset_id()
+                    << ", version: " << rowset_ptr->version().first;
}
return OLAP_SUCCESS;
}
@@ -300,11 +300,11 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
auto it = txn_tablet_map.find(key);
if (it != txn_tablet_map.end()) {
it->second.erase(tablet_info);
-            LOG(INFO) << "publish txn successfully."
-                      << " partition_id: " << key.first << ", txn_id: " << key.second
-                      << ", tablet: " << tablet_info.to_string()
-                      << ", rowsetid: " << rowset_ptr->rowset_id() << ", version: " << version.first
-                      << "," << version.second;
+            VLOG_NOTICE << "publish txn successfully."
+                        << " partition_id: " << key.first << ", txn_id: " << key.second
+                        << ", tablet: " << tablet_info.to_string()
+                        << ", rowsetid: " << rowset_ptr->rowset_id() << ", version: " << version.first
+                        << "," << version.second;
if (it->second.empty()) {
txn_tablet_map.erase(it);
_clear_txn_partition_map_unlocked(transaction_id,
partition_id);
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index c37138e..16f71b2 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -644,7 +644,7 @@ OLAPStatus move_to_trash(const std::filesystem::path& schema_hash_root,
OLAP_SUCCESS, "access dir failed. [dir=" + source_parent_dir);
if (sub_dirs.empty() && sub_files.empty()) {
- LOG(INFO) << "remove empty dir " << source_parent_dir;
+ VLOG_NOTICE << "remove empty dir " << source_parent_dir;
// no need to exam return status
Env::Default()->delete_dir(source_parent_dir);
}
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index ee33cc3..db523f2 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -37,7 +37,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim
}
LoadChannel::~LoadChannel() {
-    LOG(INFO) << "load channel mem peak usage=" << _mem_tracker->peak_consumption()
+    LOG(INFO) << "load channel removed. mem peak usage=" << _mem_tracker->peak_consumption()
               << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id
               << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip;
}
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index baa0e88..5f0f2bb 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -161,7 +161,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
// 4. handle finish
if (channel->is_finished()) {
-        LOG(INFO) << "removing load channel " << load_id << " because it's finished";
+        VLOG_NOTICE << "removing load channel " << load_id << " because it's finished";
{
std::lock_guard<std::mutex> l(_lock);
_load_channels.erase(load_id);
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 92aefc4..021b05f 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -115,9 +115,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
}
if (bytes_limit > _exec_env->process_mem_tracker()->limit()) {
-        LOG(WARNING) << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
-                     << " exceeds process memory limit of "
-                     << PrettyPrinter::print(_exec_env->process_mem_tracker()->limit(),
+        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
+                    << " exceeds process memory limit of "
+                    << PrettyPrinter::print(_exec_env->process_mem_tracker()->limit(),
TUnit::BYTES)
<< ". Using process memory limit instead";
bytes_limit = _exec_env->process_mem_tracker()->limit();
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index 98d0c9b..38b68b4 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -123,12 +123,6 @@ Default:5
One of the triggering conditions of BaseCompaction: The limit of the number of
Cumulative files to be reached. After reaching this limit, BaseCompaction will
be triggered
-### `base_compaction_num_threads_per_disk`
-
-Defalut:1
-
-The number of threads that execute BaseCompaction tasks per disk
-
### base_compaction_trace_threshold
* Type: int32
@@ -294,9 +288,15 @@ tablet_score = compaction_tablet_scan_frequency_factor * tablet_scan_frequency +
### `compaction_task_num_per_disk`
* Type: int32
-* Description: The number of compaction tasks which execute in parallel for a disk.
+* Description: The number of compaction tasks which execute in parallel for a disk(HDD).
* Default value: 2
+### `compaction_task_num_per_fast_disk`
+
+* Type: int32
+* Description: The number of compaction tasks which execute in parallel for a fast disk(SSD).
+* Default value: 4
+
### `compress_rowbatches`
* Type: bool
@@ -335,12 +335,6 @@ Default:10 (s)
CumulativeCompaction thread polling interval
-### `cumulative_compaction_num_threads_per_disk`
-
-Default:1
-
-The number of CumulativeCompaction threads per disk
-
### `cumulative_compaction_skip_window_seconds`
Default:30(s)
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index 83ce156..8e51bc3 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -119,12 +119,6 @@ BaseCompaction触发条件之一:上一次BaseCompaction距今的间隔
BaseCompaction触发条件之一:Cumulative文件数目要达到的限制,达到这个限制之后会触发BaseCompaction
-### `base_compaction_num_threads_per_disk`
-
-默认值:1
-
-每个磁盘执行BaseCompaction任务的线程数目
-
### `base_compaction_write_mbytes_per_sec`
默认值:5(MB)
@@ -289,9 +283,15 @@ tablet_score = compaction_tablet_scan_frequency_factor * tablet_scan_frequency +
### `compaction_task_num_per_disk`
* 类型:int32
-* 描述:每个磁盘可以并发执行的compaction任务数量。
+* 描述:每个磁盘(HDD)可以并发执行的compaction任务数量。
* 默认值:2
+### `compaction_task_num_per_fast_disk`
+
+* 类型:int32
+* 描述:每个高速磁盘(SSD)可以并发执行的compaction任务数量。
+* 默认值:4
+
### `compress_rowbatches`
* 类型:bool
@@ -331,12 +331,6 @@ BaseCompaction触发条件之一:Singleton文件大小限制,100MB
CumulativeCompaction线程轮询的间隔
-### `cumulative_compaction_num_threads_per_disk`
-
-默认值:1
-
-每个磁盘执行CumulativeCompaction线程数
-
### `cumulative_compaction_skip_window_seconds`
默认值:30 (s)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 7af0af3..e981e9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -89,10 +89,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     private static final long MAX_NOT_BEING_SCHEDULED_INTERVAL_MS = 30 * 60 * 1000L; // 30 min
/*
-     * A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec,
-     * estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND.
+     * A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec,
+     * estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND.
+     * We set a relatively small default value, so that the calculated timeout will be larger
+     * to ensure that the clone task can be completed even in the case of poor network environment.
      */
-    private static final long MIN_CLONE_SPEED_MB_PER_SECOND = 5; // 5MB/sec
+    private static final long MIN_CLONE_SPEED_MB_PER_SECOND = 1; // 1MB/sec
/*
* If a clone task is failed to run more than
RUNNING_FAILED_COUNTER_THRESHOLD, it will be removed
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]