This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new eba5955 [Optimize] Optimize the execution model of compaction to
limit memory consumption (#4670)
eba5955 is described below
commit eba595583e923c873295a02599ec47e0bafc68b2
Author: weizuo93 <[email protected]>
AuthorDate: Sun Oct 11 11:39:25 2020 +0800
[Optimize] Optimize the execution model of compaction to limit memory
consumption (#4670)
Currently, there are M threads to do base compaction and N threads to do
cumulative compaction for each disk.
Too many compaction tasks may run out of memory, so the max concurrency of
running compaction tasks is limited by a semaphore.
However, a concurrency limit cannot defend against running threads that
consume too much memory. In addition, reducing concurrency to avoid OOM
means some compaction tasks cannot be executed in time, which may lead to
heavier compaction later.
Therefore, limiting concurrency alone is not enough.
The strategy proposed in #3624 may be effective in solving the OOM problem.
A CompactionPermitLimiter is used to limit compaction, following a
single-producer/multi-consumer model.
The producer tries to generate compaction tasks and acquires `permits` for
each task. A compaction task that holds `permits` is executed in a thread
pool, and each finished task releases its `permits`.
`permits` must be acquired before a compaction task can execute. When the
sum of `permits` held by executing compaction tasks reaches a threshold,
subsequent compaction tasks are not allowed to start until some `permits`
are released. The tablet compaction score is used as the `permits` of a
compaction task.
To some extent, memory consumption can be limited by setting an
appropriate `permits` threshold.
---
be/src/common/config.h | 26 ++--
be/src/olap/CMakeLists.txt | 1 +
be/src/olap/base_compaction.cpp | 3 +-
be/src/olap/compaction.cpp | 24 ++--
be/src/olap/compaction.h | 9 +-
be/src/olap/compaction_permit_limiter.cpp | 50 +++++++
be/src/olap/compaction_permit_limiter.h | 49 +++++++
be/src/olap/cumulative_compaction.cpp | 3 +-
be/src/olap/data_dir.cpp | 12 ++
be/src/olap/data_dir.h | 6 +
be/src/olap/olap_server.cpp | 211 +++++++++++++++++-------------
be/src/olap/storage_engine.cpp | 19 +--
be/src/olap/storage_engine.h | 40 ++++--
be/src/olap/tablet.cpp | 9 ++
be/src/olap/tablet.h | 1 +
be/src/olap/tablet_manager.cpp | 14 +-
be/src/olap/tablet_manager.h | 4 +-
17 files changed, 321 insertions(+), 160 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6d245d9..04fcc4f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -264,12 +264,12 @@ namespace config {
// be policy
// whether disable automatic compaction task
CONF_mBool(disable_auto_compaction, "false");
+ // check the configuration of auto compaction in seconds when auto
compaction disabled
+ CONF_mInt32(check_auto_compaction_interval_seconds, "5");
// CONF_Int64(base_compaction_start_hour, "20");
// CONF_Int64(base_compaction_end_hour, "7");
- CONF_mInt32(base_compaction_check_interval_seconds, "60");
CONF_mInt64(base_compaction_num_cumulative_deltas, "5");
- CONF_Int32(base_compaction_num_threads_per_disk, "1");
CONF_mDouble(base_cumulative_delta_ratio, "0.3");
CONF_mInt64(base_compaction_interval_seconds_since_last_operation,
"86400");
CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
@@ -296,10 +296,8 @@ namespace config {
CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
// cumulative compaction policy: max delta file's size unit:B
- CONF_mInt32(cumulative_compaction_check_interval_seconds, "10");
CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
- CONF_Int32(cumulative_compaction_num_threads_per_disk, "1");
CONF_mInt64(cumulative_compaction_budgeted_bytes, "104857600");
// CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100");
// cumulative compaction skips recently published deltas in order to
prevent
@@ -310,13 +308,19 @@ namespace config {
// if compaction of a tablet failed, this tablet should not be chosen to
// compaction until this interval passes.
CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
- // Too many compaction tasks may run out of memory.
- // This config is to limit the max concurrency of running compaction tasks.
- // -1 means no limit, and the max concurrency will be:
- // C = (cumulative_compaction_num_threads_per_disk +
base_compaction_num_threads_per_disk) * dir_num
- // set it to larger than C will be set to equal to C.
- // This config can be set to 0, which means to forbid any compaction, for
some special cases.
- CONF_Int32(max_compaction_concurrency, "-1");
+
+ // This config can be set to limit thread number in compaction thread pool.
+ CONF_mInt32(min_compaction_threads, "10");
+ CONF_mInt32(max_compaction_threads, "10");
+
+ // The upper limit of "permits" held by all compaction tasks. This config
can be set to limit memory consumption for compaction.
+ CONF_mInt64(total_permits_for_compaction_score, "10000");
+
+ // Compaction task number per disk.
+ CONF_mInt32(compaction_task_num_per_disk, "2");
+
+ // How many rounds of cumulative compaction for each round of base
compaction when compaction tasks generation.
+ CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round,
"9");
// Threshold to logging compaction trace, in seconds.
CONF_mInt32(base_compaction_trace_threshold, "10");
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 13c11a0..3768b5e 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -34,6 +34,7 @@ add_library(Olap STATIC
bloom_filter_writer.cpp
byte_buffer.cpp
compaction.cpp
+ compaction_permit_limiter.cpp
comparison_predicate.cpp
compress.cpp
cumulative_compaction.cpp
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index bb2404c..a38c244 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -45,7 +45,8 @@ OLAPStatus BaseCompaction::compact() {
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
// 2. do base compaction, merge rowsets
- RETURN_NOT_OK(do_compaction());
+ int64_t permits = _tablet->calc_base_compaction_score();
+ RETURN_NOT_OK(do_compaction(permits));
TRACE("compaction finished");
// 3. set state to success
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 67b56f6..aa0f0b4 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -25,8 +25,6 @@ using std::vector;
namespace doris {
-Semaphore Compaction::_concurrency_sem;
-
Compaction::Compaction(TabletSharedPtr tablet, const std::string& label, const
std::shared_ptr<MemTracker>& parent_tracker)
: _mem_tracker(MemTracker::CreateTracker(-1, label, parent_tracker)),
_readers_tracker(MemTracker::CreateTracker(-1, "readers tracker",
_mem_tracker)),
@@ -37,20 +35,17 @@ Compaction::Compaction(TabletSharedPtr tablet, const
std::string& label, const s
Compaction::~Compaction() {}
-OLAPStatus Compaction::init(int concurreny) {
- _concurrency_sem.set_count(concurreny);
- return OLAP_SUCCESS;
-}
-
-OLAPStatus Compaction::do_compaction() {
- _concurrency_sem.wait();
- TRACE("got concurrency lock and start to do compaction");
- OLAPStatus st = do_compaction_impl();
- _concurrency_sem.signal();
+OLAPStatus Compaction::do_compaction(int64_t permits) {
+ TRACE("start to do compaction");
+ _tablet->data_dir()->disks_compaction_score_increment(permits);
+ _tablet->data_dir()->disks_compaction_num_increment(1);
+ OLAPStatus st = do_compaction_impl(permits);
+ _tablet->data_dir()->disks_compaction_score_increment(-permits);
+ _tablet->data_dir()->disks_compaction_num_increment(-1);
return st;
}
-OLAPStatus Compaction::do_compaction_impl() {
+OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
OlapStopWatch watch;
// 1. prepare input and output parameters
@@ -68,7 +63,8 @@ OLAPStatus Compaction::do_compaction_impl() {
_tablet->compute_version_hash_from_rowsets(_input_rowsets,
&_output_version_hash);
LOG(INFO) << "start " << compaction_name() << ". tablet=" <<
_tablet->full_name()
- << ", output version is=" << _output_version.first << "-" <<
_output_version.second;
+ << ", output version is=" << _output_version.first << "-" <<
_output_version.second
+ << ", score: " << permits;
RETURN_NOT_OK(construct_output_rowset_writer());
RETURN_NOT_OK(construct_input_rowset_readers());
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 6c4b438..f43bc6f 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -49,15 +49,13 @@ public:
virtual OLAPStatus compact() = 0;
- static OLAPStatus init(int concurreny);
-
protected:
virtual OLAPStatus pick_rowsets_to_compact() = 0;
virtual std::string compaction_name() const = 0;
virtual ReaderType compaction_type() const = 0;
- OLAPStatus do_compaction();
- OLAPStatus do_compaction_impl();
+ OLAPStatus do_compaction(int64_t permits);
+ OLAPStatus do_compaction_impl(int64_t permits);
void modify_rowsets();
OLAPStatus gc_unused_rowsets();
@@ -68,9 +66,6 @@ protected:
OLAPStatus check_version_continuity(const std::vector<RowsetSharedPtr>&
rowsets);
OLAPStatus check_correctness(const Merger::Statistics& stats);
- // semaphore used to limit the concurrency of running compaction tasks
- static Semaphore _concurrency_sem;
-
private:
// get num rows from segment group meta of input rowsets.
// return -1 if these are not alpha rowsets.
diff --git a/be/src/olap/compaction_permit_limiter.cpp
b/be/src/olap/compaction_permit_limiter.cpp
new file mode 100644
index 0000000..b0a08e6
--- /dev/null
+++ b/be/src/olap/compaction_permit_limiter.cpp
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/compaction_permit_limiter.h"
+
+namespace doris {
+
+CompactionPermitLimiter::CompactionPermitLimiter() : _used_permits(0) {}
+
+bool CompactionPermitLimiter::request(int64_t permits) {
+ if (permits > config::total_permits_for_compaction_score) {
+ // when tablet's compaction score is larger than
"config::total_permits_for_compaction_score",
+ // it's necessary to do compaction for this tablet because this tablet
will not get "permits"
+ // anyway. otherwise, compaction task for this tablet will not be
executed forever.
+ std::unique_lock<std::mutex> lock(_permits_mutex);
+ _permits_cv.wait(lock, [=] {
+ return _used_permits == 0 ||
+ _used_permits + permits <=
config::total_permits_for_compaction_score;
+ });
+ } else {
+ if (_used_permits + permits >
config::total_permits_for_compaction_score) {
+ std::unique_lock<std::mutex> lock(_permits_mutex);
+ _permits_cv.wait(lock, [=] {
+ return _used_permits + permits <=
config::total_permits_for_compaction_score;
+ });
+ }
+ }
+ _used_permits += permits;
+ return true;
+}
+
+void CompactionPermitLimiter::release(int64_t permits) {
+ _used_permits -= permits;
+ _permits_cv.notify_one();
+}
+} // namespace doris
diff --git a/be/src/olap/compaction_permit_limiter.h
b/be/src/olap/compaction_permit_limiter.h
new file mode 100644
index 0000000..b8a216d
--- /dev/null
+++ b/be/src/olap/compaction_permit_limiter.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+#include "common/config.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+/*
+ This class is used to control compaction permission. To some extent, it
can be used to control memory consumption.
+ "permits" should be applied before a compaction task can execute. When the
sum of "permites" held by executing
+ compaction tasks reaches a threshold, subsequent compaction task will be
no longer allowed, until some "permits"
+ are released by some finished compaction tasks. "compaction score" for
tablet is used as "permits" here.
+*/
+class CompactionPermitLimiter {
+public:
+ CompactionPermitLimiter();
+ virtual ~CompactionPermitLimiter() {}
+
+ bool request(int64_t permits);
+
+ void release(int64_t permits);
+
+private:
+ // sum of "permits" held by executing compaction tasks currently
+ AtomicInt64 _used_permits;
+ std::mutex _permits_mutex;
+ std::condition_variable _permits_cv;
+};
+} // namespace doris
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 6611e51..c2c8755 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -53,7 +53,8 @@ OLAPStatus CumulativeCompaction::compact() {
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
// 3. do cumulative compaction, merge rowsets
- RETURN_NOT_OK(do_compaction());
+ int64_t permits = _tablet->calc_cumulative_compaction_score();
+ RETURN_NOT_OK(do_compaction(permits));
TRACE("compaction finished");
// 4. set state to success
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index f5b5fbb..c20849b 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -59,6 +59,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_total_capacity,
MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_avail_capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_data_used_capacity,
MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_state, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_compaction_score, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_compaction_num, MetricUnit::NOUNIT);
static const char* const kMtabPath = "/etc/mtab";
static const char* const kTestFilePath = "/.testfile";
@@ -83,6 +85,8 @@ DataDir::DataDir(const std::string& path, int64_t
capacity_bytes,
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity,
disks_data_used_capacity);
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state);
+ INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_score);
+ INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_compaction_num);
}
DataDir::~DataDir() {
@@ -996,4 +1000,12 @@ bool DataDir::reach_capacity_limit(int64_t
incoming_data_size) {
}
return false;
}
+
+void DataDir::disks_compaction_score_increment(int64_t delta) {
+ disks_compaction_score->increment(delta);
+}
+
+void DataDir::disks_compaction_num_increment(int64_t delta) {
+ disks_compaction_num->increment(delta);
+}
} // namespace doris
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 0fc8b4a..898f644 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -128,6 +128,10 @@ public:
std::set<TabletInfo> tablet_set() { return _tablet_set; }
+ void disks_compaction_score_increment(int64_t delta);
+
+ void disks_compaction_num_increment(int64_t delta);
+
private:
std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
Status _init_cluster_id();
@@ -201,6 +205,8 @@ private:
IntGauge* disks_avail_capacity;
IntGauge* disks_data_used_capacity;
IntGauge* disks_state;
+ IntGauge* disks_compaction_score;
+ IntGauge* disks_compaction_num;
};
} // namespace doris
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 3f860f4..57ed54d 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -27,12 +27,12 @@
#include <gperftools/profiler.h>
#include <boost/algorithm/string.hpp>
+#include "agent/cgroups_mgr.h"
#include "common/status.h"
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
-#include "agent/cgroups_mgr.h"
#include "util/time.h"
using std::string;
@@ -63,52 +63,28 @@ Status StorageEngine::start_bg_threads() {
&_disk_stat_monitor_thread));
LOG(INFO) << "disk stat monitor thread started";
-
+
// convert store map to vector
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
}
- int32_t data_dir_num = data_dirs.size();
// check cumulative compaction config
_check_cumulative_compaction_config();
- // base and cumulative compaction threads
- int32_t base_compaction_num_threads_per_disk = std::max<int32_t>(1,
config::base_compaction_num_threads_per_disk);
- int32_t cumulative_compaction_num_threads_per_disk = std::max<int32_t>(1,
config::cumulative_compaction_num_threads_per_disk);
- int32_t base_compaction_num_threads = base_compaction_num_threads_per_disk
* data_dir_num;
- int32_t cumulative_compaction_num_threads =
cumulative_compaction_num_threads_per_disk * data_dir_num;
- // calc the max concurrency of compaction tasks
- int32_t max_compaction_concurrency = config::max_compaction_concurrency;
- if (max_compaction_concurrency < 0
- || max_compaction_concurrency > base_compaction_num_threads +
cumulative_compaction_num_threads + 1) {
- // reserve 1 thread for manual execution
- max_compaction_concurrency = base_compaction_num_threads +
cumulative_compaction_num_threads + 1;
- }
- Compaction::init(max_compaction_concurrency);
-
- _base_compaction_threads.reserve(base_compaction_num_threads);
- for (uint32_t i = 0; i < base_compaction_num_threads; ++i) {
- scoped_refptr<Thread> base_compaction_thread;
- RETURN_IF_ERROR(
- Thread::create("StorageEngine", "base_compaction_thread",
- [this, i, data_dir_num, data_dirs]() {
this->_base_compaction_thread_callback(data_dirs[i % data_dir_num]); },
- &base_compaction_thread));
- _base_compaction_threads.emplace_back(base_compaction_thread);
- }
- LOG(INFO) << "base compaction threads started. number: " <<
base_compaction_num_threads;
+ int32_t max_thread_num = config::max_compaction_threads;
+ int32_t min_thread_num = config::min_compaction_threads;
+ ThreadPoolBuilder("CompactionTaskThreadPool")
+ .set_min_threads(min_thread_num)
+ .set_max_threads(max_thread_num)
+ .build(&_compaction_thread_pool);
- _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
- for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
- scoped_refptr<Thread> cumulative_compaction_thread;
- RETURN_IF_ERROR(
- Thread::create("StorageEngine", "cumulative_compaction_thread",
- [this, i, data_dir_num, data_dirs]() {
this->_cumulative_compaction_thread_callback(data_dirs[i % data_dir_num]); },
- &cumulative_compaction_thread));
-
_cumulative_compaction_threads.emplace_back(cumulative_compaction_thread);
- }
- LOG(INFO) << "cumulative compaction threads started. number: " <<
cumulative_compaction_num_threads;
+ // compaction tasks producer thread
+ RETURN_IF_ERROR(Thread::create("StorageEngine",
"compaction_tasks_producer_thread",
+ [this]() {
this->_compaction_tasks_producer_callback(); },
+ &_compaction_tasks_producer_thread));
+ LOG(INFO) << "compaction tasks producer thread started";
// tablet checkpoint thread
for (auto data_dir : data_dirs) {
@@ -169,33 +145,6 @@ void StorageEngine::_fd_cache_clean_callback() {
}
}
-void StorageEngine::_base_compaction_thread_callback(DataDir* data_dir) {
-#ifdef GOOGLE_PROFILER
- ProfilerRegisterThread();
-#endif
-
- int32_t interval = config::base_compaction_check_interval_seconds;
- do {
- if (!config::disable_auto_compaction) {
- // must be here, because this thread is start on start and
- // cgroup is not initialized at this time
- // add tid to cgroup
- CgroupsMgr::apply_system_cgroup();
- if (!data_dir->reach_capacity_limit(0)) {
- _perform_base_compaction(data_dir);
- }
- }
-
- interval = config::base_compaction_check_interval_seconds;
- if (interval <= 0) {
- OLAP_LOG_WARNING("base compaction check interval config is
illegal: [%d], "
- "force set to 1", interval);
- interval = 1;
- }
-
- } while
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
-}
-
void StorageEngine::_garbage_sweeper_thread_callback() {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
@@ -286,33 +235,6 @@ void StorageEngine::_check_cumulative_compaction_config() {
}
}
-void StorageEngine::_cumulative_compaction_thread_callback(DataDir* data_dir) {
-#ifdef GOOGLE_PROFILER
- ProfilerRegisterThread();
-#endif
- LOG(INFO) << "try to start cumulative compaction process!";
-
- int32_t interval = config::cumulative_compaction_check_interval_seconds;
- do {
- if (!config::disable_auto_compaction) {
- // must be here, because this thread is start on start and
- // cgroup is not initialized at this time
- // add tid to cgroup
- CgroupsMgr::apply_system_cgroup();
- if (!data_dir->reach_capacity_limit(0)) {
- _perform_cumulative_compaction(data_dir);
- }
- }
-
- interval = config::cumulative_compaction_check_interval_seconds;
- if (interval <= 0) {
- LOG(WARNING) << "cumulative compaction check interval config is
illegal:" << interval
- << "will be forced set to one";
- interval = 1;
- }
- } while
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
-}
-
void StorageEngine::_unused_rowset_monitor_thread_callback() {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
@@ -340,7 +262,7 @@ void StorageEngine::_path_gc_thread_callback(DataDir*
data_dir) {
do {
LOG(INFO) << "try to perform path gc by tablet!";
data_dir->perform_path_gc_by_tablet();
-
+
LOG(INFO) << "try to perform path gc by rowsetid!";
data_dir->perform_path_gc_by_rowsetid();
@@ -391,4 +313,107 @@ void StorageEngine::_tablet_checkpoint_callback(DataDir*
data_dir) {
} while
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
}
-} // namespace doris
+void StorageEngine::_compaction_tasks_producer_callback() {
+#ifdef GOOGLE_PROFILER
+ ProfilerRegisterThread();
+#endif
+ LOG(INFO) << "try to start compaction producer process!";
+
+ std::vector<TTabletId> tablet_submitted;
+ std::vector<DataDir*> data_dirs;
+ for (auto& tmp_store : _store_map) {
+ data_dirs.push_back(tmp_store.second);
+ _tablet_submitted_compaction[tmp_store.second] = tablet_submitted;
+ }
+
+ int round = 0;
+ CompactionType compaction_type;
+ while (true) {
+ if (!config::disable_auto_compaction) {
+ if (round <
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+ compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+ round++;
+ } else {
+ compaction_type = CompactionType::BASE_COMPACTION;
+ round = 0;
+ }
+ vector<TabletSharedPtr> tablets_compaction =
+ _compaction_tasks_generator(compaction_type, data_dirs);
+ if (tablets_compaction.size() == 0) {
+ _wakeup_producer_flag = 0;
+ std::unique_lock<std::mutex>
lock(_compaction_producer_sleep_mutex);
+ // It is necessary to wake up the thread on timeout to prevent
deadlock
+ // in case of no running compaction task.
+ _compaction_producer_sleep_cv.wait_for(lock,
std::chrono::milliseconds(2000),
+ [=] { return
_wakeup_producer_flag == 1; });
+ continue;
+ }
+ for (const auto& tablet : tablets_compaction) {
+ int64_t permits =
tablet->calc_compaction_score(compaction_type);
+ if (_permit_limiter.request(permits)) {
+ if (compaction_type ==
CompactionType::CUMULATIVE_COMPACTION) {
+ _compaction_thread_pool->submit_func([=]() {
+ CgroupsMgr::apply_system_cgroup();
+ _perform_cumulative_compaction(tablet);
+ _permit_limiter.release(permits);
+ std::unique_lock<std::mutex>
lock(_tablet_submitted_compaction_mutex);
+ vector<TTabletId>::iterator it_tablet =
+
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
+
_tablet_submitted_compaction[tablet->data_dir()].end(),
+ tablet->tablet_id());
+ if (it_tablet !=
+
_tablet_submitted_compaction[tablet->data_dir()].end()) {
+
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
+ _wakeup_producer_flag = 1;
+ _compaction_producer_sleep_cv.notify_one();
+ }
+ });
+ } else {
+ _compaction_thread_pool->submit_func([=]() {
+ CgroupsMgr::apply_system_cgroup();
+ _perform_base_compaction(tablet);
+ _permit_limiter.release(permits);
+ std::unique_lock<std::mutex>
lock(_tablet_submitted_compaction_mutex);
+ vector<TTabletId>::iterator it_tablet =
+
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
+
_tablet_submitted_compaction[tablet->data_dir()].end(),
+ tablet->tablet_id());
+ if (it_tablet !=
+
_tablet_submitted_compaction[tablet->data_dir()].end()) {
+
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
+ _wakeup_producer_flag = 1;
+ _compaction_producer_sleep_cv.notify_one();
+ }
+ });
+ }
+ std::unique_lock<std::mutex>
lock(_tablet_submitted_compaction_mutex);
+
_tablet_submitted_compaction[tablet->data_dir()].emplace_back(
+ tablet->tablet_id());
+ }
+ }
+ } else {
+ sleep(config::check_auto_compaction_interval_seconds);
+ }
+ }
+}
+
+vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
+ CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
+ vector<TabletSharedPtr> tablets_compaction;
+ std::random_shuffle(data_dirs.begin(), data_dirs.end());
+ for (auto data_dir : data_dirs) {
+ std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
+ if (_tablet_submitted_compaction[data_dir].size() >=
config::compaction_task_num_per_disk) {
+ continue;
+ }
+ if (!data_dir->reach_capacity_limit(0)) {
+ TabletSharedPtr tablet =
_tablet_manager->find_best_tablet_to_compaction(
+ compaction_type, data_dir,
_tablet_submitted_compaction[data_dir]);
+ if (tablet != nullptr) {
+ tablets_compaction.emplace_back(tablet);
+ }
+ }
+ }
+ return tablets_compaction;
+}
+} // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 689fa8a..163cae4 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -140,6 +140,7 @@ StorageEngine::~StorageEngine() {
DEREGISTER_HOOK_METRIC(unused_rowsets_count);
DEREGISTER_HOOK_METRIC(compaction_mem_current_consumption);
_clear();
+ _compaction_thread_pool->shutdown();
}
void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
@@ -578,7 +579,7 @@ void StorageEngine::_start_clean_fd_cache() {
VLOG(10) << "end clean file descritpor cache";
}
-void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) {
+void StorageEngine::_perform_cumulative_compaction(TabletSharedPtr
best_tablet) {
scoped_refptr<Trace> trace(new Trace);
MonotonicStopWatch watch;
watch.start();
@@ -589,12 +590,6 @@ void
StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) {
});
ADOPT_TRACE(trace.get());
TRACE("start to perform cumulative compaction");
- TabletSharedPtr best_tablet =
_tablet_manager->find_best_tablet_to_compaction(
- CompactionType::CUMULATIVE_COMPACTION, data_dir);
- if (best_tablet == nullptr) {
- return;
- }
- TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);
DorisMetrics::instance()->cumulative_compaction_request_total->increment(1);
@@ -603,8 +598,8 @@ void StorageEngine::_perform_cumulative_compaction(DataDir*
data_dir) {
OLAPStatus res = cumulative_compaction.compact();
if (res != OLAP_SUCCESS) {
- best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
+ best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to do cumulative compaction. res=" << res
<< ", table=" << best_tablet->full_name();
@@ -614,7 +609,7 @@ void StorageEngine::_perform_cumulative_compaction(DataDir*
data_dir) {
best_tablet->set_last_cumu_compaction_failure_time(0);
}
-void StorageEngine::_perform_base_compaction(DataDir* data_dir) {
+void StorageEngine::_perform_base_compaction(TabletSharedPtr best_tablet) {
scoped_refptr<Trace> trace(new Trace);
MonotonicStopWatch watch;
watch.start();
@@ -625,12 +620,6 @@ void StorageEngine::_perform_base_compaction(DataDir*
data_dir) {
});
ADOPT_TRACE(trace.get());
TRACE("start to perform base compaction");
- TabletSharedPtr best_tablet =
_tablet_manager->find_best_tablet_to_compaction(
- CompactionType::BASE_COMPACTION, data_dir);
- if (best_tablet == nullptr) {
- return;
- }
- TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);
DorisMetrics::instance()->base_compaction_request_total->increment(1);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 0b3aacf..f570723 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -18,18 +18,18 @@
#ifndef DORIS_BE_SRC_OLAP_STORAGE_ENGINE_H
#define DORIS_BE_SRC_OLAP_STORAGE_ENGINE_H
+#include <condition_variable>
#include <ctime>
#include <list>
#include <map>
#include <mutex>
-#include <condition_variable>
#include <set>
#include <string>
-#include <vector>
#include <thread>
+#include <vector>
-#include <rapidjson/document.h>
#include <pthread.h>
+#include <rapidjson/document.h>
#include "agent/status.h"
#include "common/status.h"
@@ -37,20 +37,22 @@
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/MasterService_types.h"
#include "gutil/ref_counted.h"
+#include "olap/compaction_permit_limiter.h"
+#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
-#include "olap/tablet.h"
#include "olap/olap_meta.h"
#include "olap/options.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_sync_service.h"
-#include "olap/txn_manager.h"
#include "olap/task/engine_task.h"
-#include "olap/rowset/rowset_id_generator.h"
-#include "olap/fs/fs_util.h"
+#include "olap/txn_manager.h"
#include "runtime/heartbeat_flags.h"
#include "util/countdown_latch.h"
#include "util/thread.h"
+#include "util/threadpool.h"
namespace doris {
@@ -211,12 +213,8 @@ private:
// unused rowset monitor thread
void _unused_rowset_monitor_thread_callback();
- // base compaction thread process function
- void _base_compaction_thread_callback(DataDir* data_dir);
// check cumulative compaction config
void _check_cumulative_compaction_config();
- // cumulative process function
- void _cumulative_compaction_thread_callback(DataDir* data_dir);
// garbage sweep thread process function. clear snapshot and trash folder
void _garbage_sweeper_thread_callback();
@@ -238,8 +236,8 @@ private:
void _parse_default_rowset_type();
void _start_clean_fd_cache();
- void _perform_cumulative_compaction(DataDir* data_dir);
- void _perform_base_compaction(DataDir* data_dir);
+ void _perform_cumulative_compaction(TabletSharedPtr best_tablet);
+ void _perform_base_compaction(TabletSharedPtr best_tablet);
// 清理trash和snapshot文件,返回清理后的磁盘使用量
OLAPStatus _start_trash_sweep(double *usage);
// 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位,
@@ -248,6 +246,9 @@ private:
// 重新加载数据。
void _start_disk_stat_monitor();
+ void _compaction_tasks_producer_callback();
+ vector<TabletSharedPtr> _compaction_tasks_generator(CompactionType compaction_type, std::vector<DataDir*> data_dirs);
+
private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) :
@@ -313,6 +314,7 @@ private:
std::vector<scoped_refptr<Thread>> _base_compaction_threads;
// threads to check cumulative
std::vector<scoped_refptr<Thread>> _cumulative_compaction_threads;
+ scoped_refptr<Thread> _compaction_tasks_producer_thread;
scoped_refptr<Thread> _fd_cache_clean_thread;
// threads to clean all file descriptor not actively in use
std::vector<scoped_refptr<Thread>> _path_gc_threads;
@@ -342,6 +344,18 @@ private:
HeartbeatFlags* _heartbeat_flags;
+ std::unique_ptr<ThreadPool> _compaction_thread_pool;
+
+ CompactionPermitLimiter _permit_limiter;
+
+ std::mutex _tablet_submitted_compaction_mutex;
+ std::map<DataDir*, vector<TTabletId>> _tablet_submitted_compaction;
+
+ AtomicInt32 _wakeup_producer_flag;
+
+ std::mutex _compaction_producer_sleep_mutex;
+ std::condition_variable _compaction_producer_sleep_cv;
+
DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index df64de6..b8a07d8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -748,6 +748,15 @@ bool Tablet::can_do_compaction() {
return true;
}
+const uint32_t Tablet::calc_compaction_score(CompactionType compaction_type) const {
+ if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
+ return calc_cumulative_compaction_score();
+ } else {
+ DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
+ return calc_base_compaction_score();
+ }
+}
+
const uint32_t Tablet::calc_cumulative_compaction_score() const {
uint32_t score = 0;
_cumulative_compaction_policy->calc_cumulative_compaction_score(
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c32daad..ddf2248 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -163,6 +163,7 @@ public:
// operation for compaction
bool can_do_compaction();
+ const uint32_t calc_compaction_score(CompactionType compaction_type) const;
const uint32_t calc_cumulative_compaction_score() const;
const uint32_t calc_base_compaction_score() const;
static void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 6260f21..bf7d098 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -679,8 +679,9 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
result->__set_tablets_stats(_tablet_stat_cache);
}
-TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType compaction_type,
- DataDir* data_dir) {
+TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
+ CompactionType compaction_type, DataDir* data_dir,
+ vector<TTabletId> &tablet_submitted_compaction) {
int64_t now_ms = UnixMillis();
const string& compaction_type_str = compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
uint32_t highest_score = 0;
@@ -690,6 +691,12 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com
tablet_map_t& tablet_map = _tablet_map_array[i];
for (tablet_map_t::value_type& table_ins : tablet_map){
for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) {
+ vector<TTabletId>::iterator it_tablet =
+ find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(),
+ tablet_ptr->tablet_id());
+ if (it_tablet != tablet_submitted_compaction.end()) {
+ continue;
+ }
AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
if (cur_alter_task != nullptr
&& cur_alter_task->alter_state() != ALTER_FINISHED
@@ -738,7 +745,6 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com
}
}
-
uint32_t table_score = 0;
{
ReadLock rdlock(tablet_ptr->get_header_lock_ptr());
@@ -757,7 +763,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com
}
if (best_tablet != nullptr) {
- LOG(INFO) << "Found the best tablet for compaction. "
+ VLOG(1) << "Found the best tablet for compaction. "
<< "compaction_type=" << compaction_type_str
<< ", tablet_id=" << best_tablet->tablet_id()
<< ", highest_score=" << highest_score;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index ea4de8c..dda2f9e 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -69,7 +69,9 @@ public:
OLAPStatus drop_tablets_on_error_root_path(const std::vector<TabletInfo>& tablet_info_vec);
- TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type, DataDir* data_dir);
+ TabletSharedPtr find_best_tablet_to_compaction(
+ CompactionType compaction_type, DataDir* data_dir,
+ vector<TTabletId> &tablet_submitted_compaction);
TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted = false, std::string* err = nullptr);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]