This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3dde97bff1 (compaction) opt compaction task producer and quick
compaction (#13495) (#14535)
3dde97bff1 is described below
commit 3dde97bff1bb19fd293e2df60d3df443e0a1a776
Author: yixiutt <[email protected]>
AuthorDate: Fri Dec 2 10:07:44 2022 +0800
(compaction) opt compaction task producer and quick compaction (#13495)
(#14535)
1. Remove quick compaction's rowset pick policy; run cumulative (cu) compaction
when quick compaction is triggered.
2. Skip a tablet's compaction task when its compaction score is too small.
Co-authored-by: yixiutt <[email protected]>
---
be/src/agent/task_worker_pool.cpp | 36 ++++-----
be/src/common/config.h | 69 +++++-----------
be/src/olap/base_compaction.cpp | 17 ++--
be/src/olap/compaction.cpp | 55 +------------
be/src/olap/compaction.h | 1 -
be/src/olap/cumulative_compaction.cpp | 10 +--
be/src/olap/cumulative_compaction_policy.cpp | 47 ++++++-----
be/src/olap/cumulative_compaction_policy.h | 27 +++----
be/src/olap/delta_writer.cpp | 18 +++--
be/src/olap/merger.cpp | 4 -
be/src/olap/olap_server.cpp | 78 +++++++------------
be/src/olap/storage_engine.cpp | 5 --
be/src/olap/storage_engine.h | 4 -
be/src/olap/tablet.cpp | 91 +++++++---------------
be/src/olap/tablet.h | 28 +++----
be/src/olap/tablet_manager.cpp | 29 ++++---
be/test/olap/cumulative_compaction_policy_test.cpp | 12 +--
docs/en/docs/admin-manual/config/be-config.md | 52 ++++---------
docs/zh-CN/docs/admin-manual/config/be-config.md | 52 ++++---------
19 files changed, 203 insertions(+), 432 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 3f5ca98021..aa07c6834f 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -726,29 +726,25 @@ void
TaskWorkerPool::_publish_version_worker_thread_callback() {
.error(status);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
- if (config::enable_quick_compaction &&
config::quick_compaction_batch_size > 0) {
- for (int i = 0; i < succ_tablet_ids.size(); i++) {
- TabletSharedPtr tablet =
-
StorageEngine::instance()->tablet_manager()->get_tablet(
- succ_tablet_ids[i]);
- if (tablet != nullptr) {
- tablet->publised_count++;
- if (tablet->publised_count %
config::quick_compaction_batch_size == 0) {
-
StorageEngine::instance()->submit_quick_compaction_task(tablet);
- LOG(INFO) << "trigger quick compaction succ,
tabletid:"
- << succ_tablet_ids[i]
- << ", publised:" <<
tablet->publised_count;
- }
- } else {
- LOG(WARNING) << "trigger quick compaction failed,
tabletid:"
- << succ_tablet_ids[i];
+ for (int i = 0; i < succ_tablet_ids.size(); i++) {
+ TabletSharedPtr tablet =
+
StorageEngine::instance()->tablet_manager()->get_tablet(succ_tablet_ids[i]);
+ if (tablet != nullptr) {
+ tablet->publised_count++;
+ if (tablet->publised_count % 10 == 0) {
+ StorageEngine::instance()->submit_compaction_task(
+ tablet, CompactionType::CUMULATIVE_COMPACTION);
+ LOG(INFO) << "trigger compaction succ, tabletid:" <<
succ_tablet_ids[i]
+ << ", publised:" << tablet->publised_count;
}
+ } else {
+ LOG(WARNING) << "trigger compaction failed, tabletid:" <<
succ_tablet_ids[i];
}
- LOG_INFO("successfully publish version")
- .tag("signature", agent_task_req.signature)
- .tag("transaction_id",
publish_version_req.transaction_id)
- .tag("tablets_num", succ_tablet_ids.size());
}
+ LOG_INFO("successfully publish version")
+ .tag("signature", agent_task_req.signature)
+ .tag("transaction_id", publish_version_req.transaction_id)
+ .tag("tablets_num", succ_tablet_ids.size());
}
status.to_thrift(&finish_task_request.task_status);
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d2687b83a2..9faf33ef00 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -260,53 +260,35 @@ CONF_Int32(vertical_compaction_max_row_source_memory_mb,
"200");
// In ordered data compaction, min segment size for input rowset
CONF_mInt32(ordered_data_compaction_min_segment_size, "10485760");
-// check the configuration of auto compaction in seconds when auto compaction
disabled
-CONF_mInt32(check_auto_compaction_interval_seconds, "5");
+// This config can be set to limit thread number in compaction thread pool.
+CONF_mInt32(max_base_compaction_threads, "4");
+CONF_mInt32(max_cumu_compaction_threads, "10");
-CONF_mInt64(base_compaction_num_cumulative_deltas, "5");
-CONF_mDouble(base_cumulative_delta_ratio, "0.3");
-CONF_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
-CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
CONF_Bool(enable_base_compaction_idle_sched, "true");
-
-// dup key not compaction big files
-CONF_Bool(enable_dup_key_base_compaction_skip_big_file, "true");
+CONF_mInt64(base_compaction_min_rowset_num, "5");
+CONF_mDouble(base_compaction_min_data_ratio, "0.3");
CONF_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
-// In size_based policy, output rowset of cumulative compaction total disk
size exceed this config size,
+// output rowset of cumulative compaction total disk size exceed this config
size,
// this rowset will be given to base compaction, unit is m byte.
-CONF_mInt64(cumulative_size_based_promotion_size_mbytes, "1024");
+CONF_mInt64(compaction_promotion_size_mbytes, "1024");
-// In size_based policy, output rowset of cumulative compaction total disk
size exceed this config ratio of
+// output rowset of cumulative compaction total disk size exceed this config
ratio of
// base rowset's total disk size, this rowset will be given to base
compaction. The value must be between
// 0 and 1.
-CONF_mDouble(cumulative_size_based_promotion_ratio, "0.05");
+CONF_mDouble(compaction_promotion_ratio, "0.05");
-// In size_based policy, the smallest size of rowset promotion. When the
rowset is less than this config, this
+// the smallest size of rowset promotion. When the rowset is less than this
config, this
// rowset will be not given to base compaction. The unit is m byte.
-CONF_mInt64(cumulative_size_based_promotion_min_size_mbytes, "64");
+CONF_mInt64(compaction_promotion_min_size_mbytes, "64");
// The lower bound size to do cumulative compaction. When total disk size of
candidate rowsets is less than
// this size, size_based policy may not do to cumulative compaction. The unit
is m byte.
-CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
+CONF_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
-CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
-CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
-
-// if compaction of a tablet failed, this tablet should not be chosen to
-// compaction until this interval passes.
-CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds
-
-// This config can be set to limit thread number in compaction thread pool.
-CONF_mInt32(max_base_compaction_threads, "4");
-CONF_mInt32(max_cumu_compaction_threads, "10");
-
-// This config can be set to limit thread number in smallcompaction thread
pool.
-CONF_mInt32(quick_compaction_max_threads, "10");
-
-// Thread count to do tablet meta checkpoint, -1 means use the data
directories count.
-CONF_Int32(max_meta_checkpoint_threads, "-1");
+CONF_mInt64(cumulative_compaction_min_deltas, "5");
+CONF_mInt64(cumulative_compaction_max_deltas, "1000");
// This config can be set to limit thread number in segcompaction thread pool.
CONF_mInt32(seg_compaction_max_threads, "10");
@@ -315,7 +297,7 @@ CONF_mInt32(seg_compaction_max_threads, "10");
CONF_mInt64(total_permits_for_compaction_score, "10000");
// sleep interval in ms after generated compaction tasks
-CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10");
+CONF_mInt32(generate_compaction_tasks_interval_ms, "10");
// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction
have at least one thread each.
@@ -329,23 +311,17 @@ CONF_Validator(compaction_task_num_per_fast_disk,
// How many rounds of cumulative compaction for each round of base compaction
when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
-// Merge log will be printed for each "row_step_for_compaction_merge_log" rows
merged during compaction
-CONF_mInt64(row_step_for_compaction_merge_log, "0");
-
// Threshold to logging compaction trace, in seconds.
CONF_mInt32(base_compaction_trace_threshold, "60");
CONF_mInt32(cumulative_compaction_trace_threshold, "10");
CONF_mBool(disable_compaction_trace_log, "true");
+// Thread count to do tablet meta checkpoint, -1 means use the data
directories count.
+CONF_Int32(max_meta_checkpoint_threads, "-1");
+
// Threshold to logging agent task trace, in seconds.
CONF_mInt32(agent_task_trace_threshold_sec, "2");
-// time interval to record tablet scan count in second for the purpose of
calculating tablet scan frequency
-CONF_mInt64(tablet_scan_frequency_time_node_interval_second, "300");
-// coefficient for tablet scan frequency and compaction score when finding a
tablet for compaction
-CONF_mInt32(compaction_tablet_scan_frequency_factor, "0");
-CONF_mInt32(compaction_tablet_compaction_score_factor, "1");
-
// This config can be set to limit thread number in tablet migration thread
pool.
CONF_Int32(min_tablet_migration_threads, "1");
CONF_Int32(max_tablet_migration_threads, "1");
@@ -807,14 +783,7 @@ CONF_mInt32(orc_natural_read_size_mb, "8");
// if it is lower than a specific threshold, the predicate will be disabled.
CONF_mInt32(bloom_filter_predicate_check_row_num, "204800");
-//whether turn on quick compaction feature
-CONF_Bool(enable_quick_compaction, "false");
-// For continuous versions that rows less than quick_compaction_max_rows will
trigger compaction quickly
-CONF_Int32(quick_compaction_max_rows, "1000");
-// min compaction versions
-CONF_Int32(quick_compaction_batch_size, "10");
-// do compaction min rowsets
-CONF_Int32(quick_compaction_min_rowsets, "10");
+CONF_Bool(enable_decimalv3, "false");
// cooldown task configs
CONF_Int32(cooldown_thread_num, "5");
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 0e1d737b15..ba71d2fd5d 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -87,10 +87,9 @@ Status BaseCompaction::execute_compact_impl() {
}
void BaseCompaction::_filter_input_rowset() {
- // if enable dup key skip big file and no delete predicate
+ // if dup_key and no delete predicate
// we skip big files too save resources
- if (!config::enable_dup_key_base_compaction_skip_big_file ||
- _tablet->keys_type() != KeysType::DUP_KEYS ||
_tablet->delete_predicates().size() != 0) {
+ if (_tablet->keys_type() != KeysType::DUP_KEYS ||
_tablet->delete_predicates().size() != 0) {
return;
}
int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes *
1024 * 1024;
@@ -144,11 +143,11 @@ Status BaseCompaction::pick_rowsets_to_compact() {
}
// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas
threshold
- if (_input_rowsets.size() > config::base_compaction_num_cumulative_deltas)
{
+ if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
<< ", num_cumulative_rowsets=" << _input_rowsets.size() - 1
<< ", base_compaction_num_cumulative_rowsets="
- << config::base_compaction_num_cumulative_deltas;
+ << config::base_compaction_min_rowset_num;
return Status::OK();
}
@@ -160,7 +159,7 @@ Status BaseCompaction::pick_rowsets_to_compact() {
cumulative_total_size += (*it)->data_disk_size();
}
- double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio;
+ double min_data_ratio = config::base_compaction_min_data_ratio;
if (base_size == 0) {
// base_size == 0 means this may be a base version [0-1], which has no
data.
// set to 1 to void divide by zero
@@ -168,18 +167,18 @@ Status BaseCompaction::pick_rowsets_to_compact() {
}
double cumulative_base_ratio = static_cast<double>(cumulative_total_size)
/ base_size;
- if (cumulative_base_ratio > base_cumulative_delta_ratio) {
+ if (cumulative_base_ratio > min_data_ratio) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
<< ", cumulative_total_size=" << cumulative_total_size
<< ", base_size=" << base_size
<< ", cumulative_base_ratio=" << cumulative_base_ratio
- << ", policy_ratio=" << base_cumulative_delta_ratio;
+ << ", policy_min_data_ratio=" << min_data_ratio;
return Status::OK();
}
// 3. the interval since last base compaction reaches the threshold
int64_t base_creation_time = _input_rowsets[0]->creation_time();
- int64_t interval_threshold =
config::base_compaction_interval_seconds_since_last_operation;
+ int64_t interval_threshold = 86400;
int64_t interval_since_last_base_compaction = time(nullptr) -
base_creation_time;
if (interval_since_last_base_compaction > interval_threshold) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->full_name()
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 65797a8914..364fb976ed 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -57,57 +57,6 @@ Status Compaction::execute_compact() {
return st;
}
-Status Compaction::quick_rowsets_compact() {
- std::unique_lock<std::mutex>
lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
- if (!lock.owns_lock()) {
- LOG(WARNING) << "The tablet is under cumulative compaction. tablet="
- << _tablet->full_name();
- return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
- }
-
- // Clone task may happen after compaction task is submitted to thread
pool, and rowsets picked
- // for compaction may change. In this case, current compaction task should
not be executed.
- if (_tablet->get_clone_occurred()) {
- _tablet->set_clone_occurred(false);
- return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
- }
-
- _input_rowsets.clear();
- int version_count = _tablet->version_count();
- MonotonicStopWatch watch;
- watch.start();
- int64_t permits = 0;
- _tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits);
- std::vector<Version> missedVersions;
- find_longest_consecutive_version(&_input_rowsets, &missedVersions);
- if (missedVersions.size() != 0) {
- LOG(WARNING) << "quick_rowsets_compaction, find missed version"
- << ",input_size:" << _input_rowsets.size();
- }
- int nums = _input_rowsets.size();
- if (_input_rowsets.size() >= config::quick_compaction_min_rowsets) {
- Status st = check_version_continuity(_input_rowsets);
- if (!st.ok()) {
- LOG(WARNING) << "quick_rowsets_compaction failed, cause version
not continuous";
- return st;
- }
- st = do_compaction(permits);
- if (!st.ok()) {
- gc_output_rowset();
- LOG(WARNING) << "quick_rowsets_compaction failed";
- } else {
- LOG(INFO) << "quick_compaction succ"
- << ", before_versions:" << version_count
- << ", after_versions:" << _tablet->version_count()
- << ", cost:" << (watch.elapsed_time() / 1000 / 1000) <<
"ms"
- << ", merged: " << nums << ", batch:" <<
config::quick_compaction_batch_size
- << ", segments:" << permits << ", tabletid:" <<
_tablet->tablet_id();
- _tablet->set_last_quick_compaction_success_time(UnixMillis());
- }
- }
- return Status::OK();
-}
-
Status Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
uint32_t checksum_before;
@@ -391,6 +340,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}
auto cumu_policy = _tablet->cumulative_compaction_policy();
+ DCHECK(cumu_policy);
LOG(INFO) << "succeed to do " << merge_type << compaction_name()
<< " is_vertical=" << vertical_compaction << ". tablet=" <<
_tablet->full_name()
<< ", output_version=" << _output_version
@@ -399,8 +349,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ". elapsed time=" << watch.get_elapse_second()
- << "s. cumulative_compaction_policy="
- << (cumu_policy == nullptr ? "quick" : cumu_policy->name())
+ << "s. cumulative_compaction_policy=" << cumu_policy->name()
<< ", compact_row_per_second=" << int(_input_row_num /
watch.get_elapse_second());
return Status::OK();
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index bf11629fac..08cdcd34a4 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -48,7 +48,6 @@ public:
// This is only for http CompactionAction
Status compact();
- Status quick_rowsets_compact();
virtual Status prepare_compact() = 0;
Status execute_compact();
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 4736454c09..b2fca016c7 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -117,10 +117,9 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
size_t compaction_score = 0;
int transient_size =
_tablet->cumulative_compaction_policy()->pick_input_rowsets(
- _tablet.get(), candidate_rowsets,
- config::max_cumulative_compaction_num_singleton_deltas,
- config::min_cumulative_compaction_num_singleton_deltas,
&_input_rowsets,
- &_last_delete_version, &compaction_score);
+ _tablet.get(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
+ config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version,
+ &compaction_score);
// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return
Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION):
@@ -143,8 +142,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
int64_t last_cumu = _tablet->last_cumu_compaction_success_time();
int64_t last_base = _tablet->last_base_compaction_success_time();
if (last_cumu != 0 || last_base != 0) {
- int64_t interval_threshold =
-
config::base_compaction_interval_seconds_since_last_operation * 1000;
+ int64_t interval_threshold = 86400 * 1000;
int64_t cumu_interval = now - last_cumu;
int64_t base_interval = now - last_base;
if (cumu_interval > interval_threshold && base_interval >
interval_threshold) {
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index 599fad92be..35d65be797 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -25,16 +25,16 @@
namespace doris {
SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
- int64_t size_based_promotion_size, double size_based_promotion_ratio,
- int64_t size_based_promotion_min_size, int64_t
size_based_compaction_lower_bound_size)
+ int64_t promotion_size, double promotion_ratio, int64_t
promotion_min_size,
+ int64_t compaction_min_size)
: CumulativeCompactionPolicy(),
- _size_based_promotion_size(size_based_promotion_size),
- _size_based_promotion_ratio(size_based_promotion_ratio),
- _size_based_promotion_min_size(size_based_promotion_min_size),
-
_size_based_compaction_lower_bound_size(size_based_compaction_lower_bound_size)
{
- // init _levels by divide 2 between size_based_compaction_lower_bound_size
and 1K
- // cu compaction handle file size less then
size_based_compaction_lower_bound_size
- int64_t i_size = size_based_promotion_size / 2;
+ _promotion_size(promotion_size),
+ _promotion_ratio(promotion_ratio),
+ _promotion_min_size(promotion_min_size),
+ _compaction_min_size(compaction_min_size) {
+ // init _levels by divide 2 between promotion_size and 1K
+ // cu compaction handle file size less then promotion_size
+ int64_t i_size = promotion_size / 2;
while (i_size >= 1024) {
_levels.push_back(i_size);
@@ -122,20 +122,19 @@ void
SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
void
SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr
base_rowset_meta,
int64_t*
promotion_size) {
int64_t base_size = base_rowset_meta->total_disk_size();
- *promotion_size = base_size * _size_based_promotion_ratio;
+ *promotion_size = base_size * _promotion_ratio;
- // promotion_size is between _size_based_promotion_size and
_size_based_promotion_min_size
- if (*promotion_size >= _size_based_promotion_size) {
- *promotion_size = _size_based_promotion_size;
- } else if (*promotion_size <= _size_based_promotion_min_size) {
- *promotion_size = _size_based_promotion_min_size;
+ // promotion_size is between _promotion_size and _promotion_min_size
+ if (*promotion_size >= _promotion_size) {
+ *promotion_size = _promotion_size;
+ } else if (*promotion_size <= _promotion_min_size) {
+ *promotion_size = _promotion_min_size;
}
- _refresh_tablet_size_based_promotion_size(*promotion_size);
+ _refresh_tablet_promotion_size(*promotion_size);
}
-void
SizeBasedCumulativeCompactionPolicy::_refresh_tablet_size_based_promotion_size(
- int64_t promotion_size) {
- _tablet_size_based_promotion_size = promotion_size;
+void
SizeBasedCumulativeCompactionPolicy::_refresh_tablet_promotion_size(int64_t
promotion_size) {
+ _tablet_promotion_size = promotion_size;
}
void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
@@ -152,7 +151,7 @@ void
SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
// if rowsets have no delete version, check output_rowset total disk
size
// satisfies promotion size.
size_t total_size = output_rowset->rowset_meta()->total_disk_size();
- if (total_size >= _tablet_size_based_promotion_size) {
+ if (total_size >= _tablet_promotion_size) {
tablet->set_cumulative_layer_point(output_rowset->end_version() +
1);
}
}
@@ -240,7 +239,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
size_t* compaction_score) {
- size_t promotion_size = _tablet_size_based_promotion_size;
+ size_t promotion_size = _tablet_promotion_size;
int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;
@@ -319,12 +318,10 @@ int
SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
// if we have a sufficient number of segments, we should process the
compaction.
// otherwise, we check number of segments and total_size whether can do
compaction.
- if (total_size < _size_based_compaction_lower_bound_size &&
- *compaction_score < min_compaction_score) {
+ if (total_size < _compaction_min_size && *compaction_score <
min_compaction_score) {
input_rowsets->clear();
*compaction_score = 0;
- } else if (total_size >= _size_based_compaction_lower_bound_size &&
- input_rowsets->size() == 1) {
+ } else if (total_size >= _compaction_min_size && input_rowsets->size() ==
1) {
auto rs_meta = input_rowsets->front()->rowset_meta();
// if there is only one rowset and not overlapping,
// we do not need to do compaction
diff --git a/be/src/olap/cumulative_compaction_policy.h
b/be/src/olap/cumulative_compaction_policy.h
index 16e9b3b19b..15454c1892 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -117,13 +117,10 @@ public:
/// it needs tablet pointer to access tablet method.
/// param tablet, the shared pointer of tablet
SizeBasedCumulativeCompactionPolicy(
- int64_t size_based_promotion_size =
- config::cumulative_size_based_promotion_size_mbytes * 1024
* 1024,
- double size_based_promotion_ratio =
config::cumulative_size_based_promotion_ratio,
- int64_t size_based_promotion_min_size =
- config::cumulative_size_based_promotion_min_size_mbytes *
1024 * 1024,
- int64_t size_based_compaction_lower_bound_size =
- config::cumulative_size_based_compaction_lower_size_mbytes
* 1024 * 1024);
+ int64_t promotion_size = config::compaction_promotion_size_mbytes
* 1024 * 1024,
+ double promotion_ratio = config::compaction_promotion_ratio,
+ int64_t promotion_min_size =
config::compaction_promotion_min_size_mbytes * 1024 * 1024,
+ int64_t compaction_min_size = config::compaction_min_size_mbytes *
1024 * 1024);
/// Destructor function of SizeBasedCumulativeCompactionPolicy.
~SizeBasedCumulativeCompactionPolicy() {}
@@ -166,24 +163,24 @@ private:
void _calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta, int64_t*
promotion_size);
/// calculate the disk size belong to which level, the level is divide by
power of 2
- /// between cumulative_size_based_promotion_min_size_mbytes
- /// and cumulative_size_based_promotion_size_mbytes
+ /// between compaction_promotion_min_size_mbytes
+ /// and compaction_promotion_size_mbytes
int _level_size(const int64_t size);
/// when policy calculate cumulative_compaction_score, update promotion
size at the same time
- void _refresh_tablet_size_based_promotion_size(int64_t promotion_size);
+ void _refresh_tablet_promotion_size(int64_t promotion_size);
private:
/// cumulative compaction promotion size, unit is byte.
- int64_t _size_based_promotion_size;
+ int64_t _promotion_size;
/// cumulative compaction promotion ratio of base rowset total disk size.
- double _size_based_promotion_ratio;
+ double _promotion_ratio;
/// cumulative compaction promotion min size, unit is byte.
- int64_t _size_based_promotion_min_size;
+ int64_t _promotion_min_size;
/// lower bound size to do compaction compaction.
- int64_t _size_based_compaction_lower_bound_size;
+ int64_t _compaction_min_size;
/// record tablet promotion size, it is updated each time when calculate
cumulative_compaction_score
- int64_t _tablet_size_based_promotion_size;
+ int64_t _tablet_promotion_size;
/// levels division of disk size, same level rowsets can do compaction
std::vector<int64_t> _levels;
};
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index d330e430fe..d75e0b63d4 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -111,15 +111,17 @@ Status DeltaWriter::init() {
}
// check tablet version number
- if (_tablet->version_count() > config::max_tablet_version_num) {
- //trigger quick compaction
- if (config::enable_quick_compaction) {
- StorageEngine::instance()->submit_quick_compaction_task(_tablet);
+ if (_tablet->version_count() > config::max_tablet_version_num - 100) {
+ //trigger compaction
+ StorageEngine::instance()->submit_compaction_task(_tablet,
+
CompactionType::CUMULATIVE_COMPACTION);
+ if (_tablet->version_count() > config::max_tablet_version_num) {
+ LOG(WARNING) << "failed to init delta writer. version count: "
+ << _tablet->version_count()
+ << ", exceed limit: " <<
config::max_tablet_version_num
+ << ". tablet: " << _tablet->full_name();
+ return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
}
- LOG(WARNING) << "failed to init delta writer. version count: " <<
_tablet->version_count()
- << ", exceed limit: " << config::max_tablet_version_num
- << ". tablet: " << _tablet->full_name();
- return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
}
{
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 46bb15c96d..9217a88316 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -83,10 +83,6 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
dst_rowset_writer->add_row(row_cursor),
"failed to write row when merging rowsets of tablet " +
tablet->full_name());
output_rows++;
- LOG_IF(INFO, config::row_step_for_compaction_merge_log != 0 &&
- output_rows %
config::row_step_for_compaction_merge_log == 0)
- << "Merge rowsets stay alive. "
- << "tablet=" << tablet->full_name() << ", merged rows=" <<
output_rows;
// the memory allocate by mem pool has been copied,
// so we should release memory immediately
mem_pool->clear();
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 6e39b0f093..a266d643b3 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -81,10 +81,6 @@ Status StorageEngine::start_bg_threads() {
.set_min_threads(config::max_cumu_compaction_threads)
.set_max_threads(config::max_cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool);
- ThreadPoolBuilder("SmallCompactionTaskThreadPool")
- .set_min_threads(config::quick_compaction_max_threads)
- .set_max_threads(config::quick_compaction_max_threads)
- .build(&_quick_compaction_thread_pool);
if (config::enable_segcompaction && config::enable_storage_vectorization) {
ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::seg_compaction_max_threads)
@@ -251,22 +247,19 @@ void StorageEngine::_disk_stat_monitor_thread_callback() {
}
void StorageEngine::check_cumulative_compaction_config() {
- int64_t size_based_promotion_size =
config::cumulative_size_based_promotion_size_mbytes;
- int64_t size_based_promotion_min_size =
config::cumulative_size_based_promotion_min_size_mbytes;
- int64_t size_based_compaction_lower_bound_size =
- config::cumulative_size_based_compaction_lower_size_mbytes;
+ int64_t promotion_size = config::compaction_promotion_size_mbytes;
+ int64_t promotion_min_size = config::compaction_promotion_min_size_mbytes;
+ int64_t compaction_min_size = config::compaction_min_size_mbytes;
// check size_based_promotion_size must be greater than
size_based_promotion_min_size and 2 * size_based_compaction_lower_bound_size
- int64_t should_min_size_based_promotion_size =
- std::max(size_based_promotion_min_size, 2 *
size_based_compaction_lower_bound_size);
-
- if (size_based_promotion_size < should_min_size_based_promotion_size) {
- size_based_promotion_size = should_min_size_based_promotion_size;
- LOG(WARNING) << "the config size_based_promotion_size is adjusted to "
- "size_based_promotion_min_size or 2 * "
- "size_based_compaction_lower_bound_size "
- << should_min_size_based_promotion_size
- << ", because size_based_promotion_size is small";
+ int64_t should_min_promotion_size = std::max(promotion_min_size, 2 *
compaction_min_size);
+
+ if (promotion_size < should_min_promotion_size) {
+ promotion_size = should_min_promotion_size;
+ LOG(WARNING) << "the config promotion_size is adjusted to "
+ "promotion_min_size or 2 * "
+ "compaction_min_size "
+ << should_min_promotion_size << ", because
size_based_promotion_size is small";
}
}
@@ -417,7 +410,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t last_base_score_update_time = 0;
static const int64_t check_score_interval_ms = 5000; // 5 secs
- int64_t interval = config::generate_compaction_tasks_min_interval_ms;
+ int64_t interval = config::generate_compaction_tasks_interval_ms;
do {
if (!config::disable_auto_compaction) {
_adjust_compaction_thread_num();
@@ -477,9 +470,9 @@ void StorageEngine::_compaction_tasks_producer_callback() {
<< tablet->tablet_id() << ", err: " <<
st.get_error_msg();
}
}
- interval = config::generate_compaction_tasks_min_interval_ms;
+ interval = config::generate_compaction_tasks_interval_ms;
} else {
- interval = config::check_auto_compaction_interval_seconds * 1000;
+ interval = 5000; // 5s to check disable_auto_compaction
}
} while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
@@ -541,18 +534,18 @@ std::vector<TabletSharedPtr>
StorageEngine::_generate_compaction_tasks(
? copied_cumu_map[data_dir]
: copied_base_map[data_dir],
&disk_max_score, _cumulative_compaction_policy);
- if (tablet != nullptr &&
-
!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
- if (need_pick_tablet) {
- tablets_compaction.emplace_back(tablet);
+ if (tablet != nullptr) {
+ if
(!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
+ if (need_pick_tablet) {
+ tablets_compaction.emplace_back(tablet);
+ }
+ max_compaction_score = std::max(max_compaction_score,
disk_max_score);
+ } else {
+ LOG_EVERY_N(INFO, 500)
+ << "Tablet " << tablet->full_name()
+ << " will be ignored by automatic compaction tasks
since it's "
+ << "set to disabled automatic compaction.";
}
- max_compaction_score = std::max(max_compaction_score,
disk_max_score);
- } else if (tablet != nullptr &&
-
tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
- LOG_EVERY_N(INFO, 500)
- << "Tablet " << tablet->full_name()
- << " will be ignored by automatic compaction tasks
since it's "
- << "set to disabled automatic compaction.";
}
}
}
@@ -671,29 +664,10 @@ Status
StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
if (tablet->get_cumulative_compaction_policy() == nullptr) {
tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy);
}
+ tablet->set_skip_compaction(false);
return _submit_compaction_task(tablet, compaction_type);
}
-Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) {
- CumulativeCompaction compact(tablet);
- compact.quick_rowsets_compact();
- _pop_tablet_from_submitted_compaction(tablet,
CompactionType::CUMULATIVE_COMPACTION);
- return Status::OK();
-}
-
-Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
- bool already_exist =
- _push_tablet_into_submitted_compaction(tablet,
CompactionType::CUMULATIVE_COMPACTION);
- if (already_exist) {
- return Status::AlreadyExist(
- "compaction task has already been submitted, tablet_id={},
compaction_type={}.",
- tablet->tablet_id(), CompactionType::CUMULATIVE_COMPACTION);
- }
- _quick_compaction_thread_pool->submit_func(
- std::bind<void>(&StorageEngine::_handle_quick_compaction, this,
tablet));
- return Status::OK();
-}
-
Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr
segments) {
writer->compact_segments(segments);
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index e08b3d562e..05a6b24a1e 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -135,14 +135,9 @@ StorageEngine::~StorageEngine() {
_cumu_compaction_thread_pool->shutdown();
}
- if (_quick_compaction_thread_pool) {
- _quick_compaction_thread_pool->shutdown();
- }
-
if (_seg_compaction_thread_pool) {
_seg_compaction_thread_pool->shutdown();
}
-
if (_tablet_meta_checkpoint_thread_pool) {
_tablet_meta_checkpoint_thread_pool->shutdown();
}
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6de80ead25..3e9c5d5857 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -186,7 +186,6 @@ public:
void check_cumulative_compaction_config();
Status submit_compaction_task(TabletSharedPtr tablet, CompactionType
compaction_type);
- Status submit_quick_compaction_task(TabletSharedPtr tablet);
Status submit_seg_compaction_task(BetaRowsetWriter* writer,
SegCompactionCandidatesSharedPtr
segments);
@@ -267,8 +266,6 @@ private:
Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType
compaction_type);
- Status _handle_quick_compaction(TabletSharedPtr);
-
void _adjust_compaction_thread_num();
void _cooldown_tasks_producer_callback();
@@ -366,7 +363,6 @@ private:
HeartbeatFlags* _heartbeat_flags;
- std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1c8e1a0905..d741203f1c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -92,8 +92,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir*
data_dir,
_newly_created_rowset_num(0),
_last_checkpoint_time(0),
_cumulative_compaction_type(cumulative_compaction_type),
- _last_record_scan_count(0),
- _last_record_scan_count_timestamp(time(nullptr)),
_is_clone_occurred(false),
_last_missed_version(-1),
_last_missed_time_s(0) {
@@ -939,56 +937,6 @@ void Tablet::calculate_cumulative_point() {
set_cumulative_layer_point(ret_cumulative_point);
}
-//find rowsets that rows less then "config::quick_compaction_max_rows"
-Status Tablet::pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>*
input_rowsets,
- int64_t* permits) {
- int max_rows = config::quick_compaction_max_rows;
- if (!config::enable_quick_compaction || max_rows <= 0) {
- return Status::OK();
- }
- if (!init_succeeded()) {
- return
Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS);
- }
- int max_series_num = 1000;
-
- std::vector<std::vector<RowsetSharedPtr>>
quick_compaction_rowsets(max_series_num);
- int idx = 0;
- std::shared_lock rdlock(_meta_lock);
- std::vector<RowsetSharedPtr> sortedRowset;
- for (auto& rs : _rs_version_map) {
- sortedRowset.push_back(rs.second);
- }
- std::sort(sortedRowset.begin(), sortedRowset.end(), Rowset::comparator);
- if (tablet_state() == TABLET_RUNNING) {
- for (int i = 0; i < sortedRowset.size(); i++) {
- bool is_delete =
version_for_delete_predicate(sortedRowset[i]->version());
- if (!is_delete && sortedRowset[i]->start_version() > 0 &&
- sortedRowset[i]->start_version() > cumulative_layer_point()) {
- if (sortedRowset[i]->num_rows() < max_rows) {
- quick_compaction_rowsets[idx].push_back(sortedRowset[i]);
- } else {
- idx++;
- if (idx > max_series_num) {
- break;
- }
- }
- }
- }
- if (quick_compaction_rowsets.size() == 0) return Status::OK();
- std::vector<RowsetSharedPtr> result = quick_compaction_rowsets[0];
- for (int i = 0; i < quick_compaction_rowsets.size(); i++) {
- if (quick_compaction_rowsets[i].size() > result.size()) {
- result = quick_compaction_rowsets[i];
- }
- }
- for (int i = 0; i < result.size(); i++) {
- *permits += result[i]->num_segments();
- input_rowsets->push_back(result[i]);
- }
- }
- return Status::OK();
-}
-
Status Tablet::split_range(const OlapTuple& start_key_strings, const
OlapTuple& end_key_strings,
uint64_t request_block_row_count,
std::vector<OlapTuple>* ranges) {
DCHECK(ranges != nullptr);
@@ -1482,18 +1430,6 @@ void
Tablet::generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_m
new_tablet_meta->init_from_pb(tablet_meta_pb);
}
-double Tablet::calculate_scan_frequency() {
- time_t now = time(nullptr);
- int64_t current_count = query_scan_count->value();
- double interval = difftime(now, _last_record_scan_count_timestamp);
- double scan_frequency = (current_count - _last_record_scan_count) * 60 /
interval;
- if (interval >= config::tablet_scan_frequency_time_node_interval_second) {
- _last_record_scan_count = current_count;
- _last_record_scan_count_timestamp = now;
- }
- return scan_frequency;
-}
-
Status Tablet::prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
TabletSharedPtr
tablet, int64_t* permits) {
std::vector<RowsetSharedPtr> compaction_rowsets;
@@ -2238,4 +2174,31 @@ bool Tablet::check_all_rowset_segment() {
return true;
}
+void Tablet::set_skip_compaction(bool skip, CompactionType compaction_type,
int64_t start) { // sets or clears the per-type "skip compaction" flags; `start` is stored as the time the flag was set
+ if (!skip) { // clearing ignores compaction_type and start, and resets both flags
+ _skip_cumu_compaction = false;
+ _skip_base_compaction = false;
+ return;
+ }
+ if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
+ _skip_cumu_compaction = true;
+ _skip_cumu_compaction_ts = start; // later compared against `now` in should_skip_compaction()
+ } else {
+ DCHECK(compaction_type == CompactionType::BASE_COMPACTION); // only cumulative and base are expected here
+ _skip_base_compaction = true;
+ _skip_base_compaction_ts = start;
+ }
+}
+
+bool Tablet::should_skip_compaction(CompactionType compaction_type, int64_t
now) { // true while the skip flag for compaction_type is set and `now` is still inside the skip window
+ if (compaction_type == CompactionType::CUMULATIVE_COMPACTION &&
_skip_cumu_compaction &&
+ now < _skip_cumu_compaction_ts + 120) { // 120 is in the same unit as `now`; the callers in this patch pass UnixSeconds()
+ return true;
+ } else if (compaction_type == CompactionType::BASE_COMPACTION &&
_skip_base_compaction &&
+ now < _skip_base_compaction_ts + 120) { // same window as the cumulative case
+ return true;
+ }
+ return false; // flag not set or window elapsed; the flag itself is not cleared here
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 4a66e8824c..0990de435b 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -80,8 +80,6 @@ public:
// Used in clone task, to update local meta when finishing a clone job
Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>&
rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
- Status pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>*
input_rowsets,
- int64_t* permits);
const int64_t cumulative_layer_point() const;
void set_cumulative_layer_point(int64_t new_point);
@@ -210,10 +208,6 @@ public:
_last_cumu_compaction_success_millis = millis;
}
- void set_last_quick_compaction_success_time(int64_t millis) {
- _last_quick_compaction_success_time_millis = millis;
- }
-
int64_t last_base_compaction_success_time() { return
_last_base_compaction_success_millis; }
void set_last_base_compaction_success_time(int64_t millis) {
_last_base_compaction_success_millis = millis;
@@ -263,8 +257,6 @@ public:
// return a json string to show the compaction status of this tablet
void get_compaction_status(std::string* json_result);
- double calculate_scan_frequency();
-
Status prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
TabletSharedPtr tablet,
int64_t* permits);
void execute_compaction(CompactionType compaction_type);
@@ -365,6 +357,11 @@ public:
void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
+ void set_skip_compaction(bool skip,
+ CompactionType compaction_type =
CompactionType::CUMULATIVE_COMPACTION,
+ int64_t start = -1);
+ bool should_skip_compaction(CompactionType compaction_type, int64_t now);
+
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
@@ -446,7 +443,6 @@ private:
std::atomic<int64_t> _last_cumu_compaction_success_millis;
// timestamp of last base compaction success
std::atomic<int64_t> _last_base_compaction_success_millis;
- std::atomic<int64_t> _last_quick_compaction_success_time_millis;
std::atomic<int64_t> _cumulative_point;
std::atomic<int32_t> _newly_created_rowset_num;
std::atomic<int64_t> _last_checkpoint_time;
@@ -455,14 +451,6 @@ private:
std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
std::string _cumulative_compaction_type;
- // the value of metric 'query_scan_count' and timestamp will be recorded
when every time
- // 'config::tablet_scan_frequency_time_node_interval_second' passed to
calculate tablet
- // scan frequency.
- // the value of metric 'query_scan_count' for the last record.
- int64_t _last_record_scan_count;
- // the timestamp of the last record.
- time_t _last_record_scan_count_timestamp;
-
std::shared_ptr<CumulativeCompaction> _cumulative_compaction;
std::shared_ptr<BaseCompaction> _base_compaction;
// whether clone task occurred during the tablet is in thread pool queue
to wait for compaction
@@ -477,6 +465,12 @@ private:
// Max schema_version schema from Rowset or FE
TabletSchemaSPtr _max_version_schema;
+ bool _skip_cumu_compaction = false;
+ int64_t _skip_cumu_compaction_ts;
+
+ bool _skip_base_compaction = false;
+ int64_t _skip_base_compaction_ts;
+
DISALLOW_COPY_AND_ASSIGN(Tablet);
public:
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index c877a8d0ec..ad16257544 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -641,14 +641,16 @@ TabletSharedPtr
TabletManager::find_best_tablet_to_compaction(
int64_t now_ms = UnixMillis();
const string& compaction_type_str =
compaction_type == CompactionType::BASE_COMPACTION ? "base" :
"cumulative";
- double highest_score = 0.0;
+ uint32_t highest_score = 0;
uint32_t compaction_score = 0;
- double tablet_scan_frequency = 0.0;
TabletSharedPtr best_tablet;
for (const auto& tablets_shard : _tablets_shards) {
std::shared_lock rdlock(tablets_shard.lock);
for (const auto& tablet_map : tablets_shard.tablet_map) {
const TabletSharedPtr& tablet_ptr = tablet_map.second;
+ if (tablet_ptr->should_skip_compaction(compaction_type,
UnixSeconds())) {
+ continue;
+ }
if (!tablet_ptr->can_do_compaction(data_dir->path_hash(),
compaction_type)) {
continue;
}
@@ -662,7 +664,11 @@ TabletSharedPtr
TabletManager::find_best_tablet_to_compaction(
if (compaction_type == CompactionType::BASE_COMPACTION) {
last_failure_ms =
tablet_ptr->last_base_compaction_failure_time();
}
- if (now_ms - last_failure_ms <=
config::min_compaction_failure_interval_sec * 1000) {
+ if (now_ms - last_failure_ms <= 5000) {
+ VLOG_DEBUG << "Too often to check compaction, skip it. "
+ << "compaction_type=" << compaction_type_str
+ << ", last_failure_time_ms=" << last_failure_ms
+ << ", tablet_id=" << tablet_ptr->tablet_id();
continue;
}
@@ -684,19 +690,13 @@ TabletSharedPtr
TabletManager::find_best_tablet_to_compaction(
uint32_t current_compaction_score =
tablet_ptr->calc_compaction_score(
compaction_type, cumulative_compaction_policy);
-
- double scan_frequency = 0.0;
- if (config::compaction_tablet_scan_frequency_factor != 0) {
- scan_frequency = tablet_ptr->calculate_scan_frequency();
+ if (current_compaction_score < 5) {
+ LOG(INFO) << "tablet set skip compaction, tablet_id: " <<
tablet_ptr->tablet_id();
+ tablet_ptr->set_skip_compaction(true, compaction_type,
UnixSeconds());
}
-
- double tablet_score =
- config::compaction_tablet_scan_frequency_factor *
scan_frequency +
- config::compaction_tablet_compaction_score_factor *
current_compaction_score;
- if (tablet_score > highest_score) {
- highest_score = tablet_score;
+ if (current_compaction_score > highest_score) {
+ highest_score = current_compaction_score;
compaction_score = current_compaction_score;
- tablet_scan_frequency = scan_frequency;
best_tablet = tablet_ptr;
}
}
@@ -707,7 +707,6 @@ TabletSharedPtr
TabletManager::find_best_tablet_to_compaction(
<< "compaction_type=" << compaction_type_str
<< ", tablet_id=" << best_tablet->tablet_id() << ",
path=" << data_dir->path()
<< ", compaction_score=" << compaction_score
- << ", tablet_scan_frequency=" << tablet_scan_frequency
<< ", highest_score=" << highest_score;
*score = compaction_score;
}
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp
b/be/test/olap/cumulative_compaction_policy_test.cpp
index e0425c36b8..3f46c508d1 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -31,10 +31,10 @@ class TestSizeBasedCumulativeCompactionPolicy : public
testing::Test {
public:
TestSizeBasedCumulativeCompactionPolicy() {}
void SetUp() {
- config::cumulative_size_based_promotion_size_mbytes = 1024;
- config::cumulative_size_based_promotion_ratio = 0.05;
- config::cumulative_size_based_promotion_min_size_mbytes = 64;
- config::cumulative_size_based_compaction_lower_size_mbytes = 64;
+ config::compaction_promotion_size_mbytes = 1024;
+ config::compaction_promotion_ratio = 0.05;
+ config::compaction_promotion_min_size_mbytes = 64;
+ config::compaction_min_size_mbytes = 64;
_tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10),
@@ -653,7 +653,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
_calc_promotion_size_big) {
dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
_tablet->_cumulative_compaction_policy.get());
- EXPECT_EQ(1073741824, policy->_tablet_size_based_promotion_size);
+ EXPECT_EQ(1073741824, policy->_tablet_promotion_size);
}
TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) {
@@ -671,7 +671,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
_calc_promotion_size_small) {
SizeBasedCumulativeCompactionPolicy* policy =
dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
_tablet->_cumulative_compaction_policy.get());
- EXPECT_EQ(67108864, policy->_tablet_size_based_promotion_size);
+ EXPECT_EQ(67108864, policy->_tablet_promotion_size);
}
TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index 6e5eda0d1a..83596c9c4d 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -105,7 +105,7 @@ Default: 3
The number of threads making schema changes
-### `generate_compaction_tasks_min_interval_ms`
+### `generate_compaction_tasks_interval_ms`
Default: 10 (ms)
@@ -117,13 +117,7 @@ Default: true
Whether to enable vectorized compaction
-### `base_compaction_interval_seconds_since_last_operation`
-
-Default: 86400
-
-One of the triggering conditions of BaseCompaction: the interval since the
last BaseCompaction
-
-### `base_compaction_num_cumulative_deltas`
+### `base_compaction_min_rowset_num`
Default: 5
@@ -160,7 +154,7 @@ Default: 5(MB)
Maximum disk write speed per second of BaseCompaction task
-### `base_cumulative_delta_ratio`
+### `base_compaction_min_data_ratio`
Default: 0.3 (30%)
@@ -224,12 +218,6 @@ Clean up pages that may be saved by the buffer pool
The maximum amount of memory available in the BE buffer pool. The buffer pool
is a new memory management structure of BE, which manages the memory by the
buffer page and enables spill data to disk. The memory for all concurrent
queries will be allocated from the buffer pool. The current buffer pool only
works on **AggregationNode** and **ExchangeNode**.
-### `check_auto_compaction_interval_seconds`
-
-* Type: int32
-* Description: Check the configuration of auto compaction in seconds when auto
compaction disabled.
-* Default value: 5
-
### `check_consistency_worker_count`
Default: 1
@@ -351,34 +339,34 @@ Similar to `base_compaction_trace_threshold`.
If set to true, the `cumulative_compaction_trace_threshold` and
`base_compaction_trace_threshold` won't work and log is disabled.
-### `cumulative_size_based_promotion_size_mbytes`
+### `compaction_promotion_size_mbytes`
* Type: int64
-* Description: Under the size_based policy, the total disk size of the output
rowset of cumulative compaction exceeds this configuration size, and the rowset
will be used for base compaction. The unit is m bytes.
+* Description: When the total disk size of the output rowset of a cumulative
compaction exceeds this configuration size, the rowset will be used for
base compaction. The unit is MB.
* Default value: 1024
In general, if the configuration is less than 2G, in order to prevent the
cumulative compression time from being too long, resulting in the version
backlog.
-### `cumulative_size_based_promotion_ratio`
+### `compaction_promotion_ratio`
* Type: double
-* Description: Under the size_based policy, when the total disk size of the
cumulative compaction output rowset exceeds the configuration ratio of the base
version rowset, the rowset will be used for base compaction.
+* Description: When the total disk size of the cumulative compaction output
rowset exceeds the configuration ratio of the base version rowset, the rowset
will be used for base compaction.
* Default value: 0.05
Generally, it is recommended that the configuration should not be higher than
0.1 and lower than 0.02.
-### `cumulative_size_based_promotion_min_size_mbytes`
+### `compaction_promotion_min_size_mbytes`
* Type: int64
-* Description: Under the size_based strategy, if the total disk size of the
output rowset of the cumulative compaction is lower than this configuration
size, the rowset will not undergo base compaction and is still in the
cumulative compaction process. The unit is m bytes.
+* Description: If the total disk size of the output rowset of the cumulative
compaction is lower than this configuration size, the rowset will not undergo
base compaction and is still in the cumulative compaction process. The unit is
m bytes.
* Default value: 64
Generally, the configuration is within 512m. If the configuration is too
large, the size of the early base version is too small, and base compaction has
not been performed.
-### `cumulative_size_based_compaction_lower_size_mbytes`
+### `compaction_min_size_mbytes`
* Type: int64
-* Description: Under the size_based strategy, when the cumulative compaction
is merged, the selected rowsets to be merged have a larger disk size than this
configuration, then they are divided and merged according to the level policy.
When it is smaller than this configuration, merge directly. The unit is m bytes.
+* Description: During cumulative compaction, if the total disk size of the
rowsets selected for merging is larger than this configuration, they are
divided and merged according to the level policy; if it is smaller than this
configuration, they are merged directly. The unit is MB.
* Default value: 64
Generally, the configuration is within 128m. Over configuration will cause
more cumulative compaction write amplification.
@@ -752,13 +740,13 @@ Default: 3
The maximum number of consumers in a data consumer group, used for routine load
-### `min_cumulative_compaction_num_singleton_deltas`
+### `cumulative_compaction_min_deltas`
Default: 5
Cumulative compaction strategy: the minimum number of incremental files
-### `max_cumulative_compaction_num_singleton_deltas`
+### `cumulative_compaction_max_deltas`
Default: 1000
@@ -880,13 +868,6 @@ Default: 1024
Minimum read buffer size (in bytes)
-### `min_compaction_failure_interval_sec`
-
-* Type: int32
-* Description: During the cumulative compaction process, when the selected
tablet fails to be merged successfully, it will wait for a period of time
before it may be selected again. The waiting period is the value of this
configuration.
-* Default value: 600
-* Unit: seconds
-
### `min_compaction_threads`
* Type: int32
@@ -1085,13 +1066,6 @@ Default: true
Check row nums for BE/CE and schema change. true is open, false is closed
-### `row_step_for_compaction_merge_log`
-
-* Type: int64
-* Description: Merge log will be printed for each
"row_step_for_compaction_merge_log" rows merged during compaction. If the value
is set to 0, merge log will not be printed.
-* Default value: 0
-* Dynamically modify: true
-
### `scan_context_gc_interval_min`
Default: 5
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 40277d6a8e..ef51e44e9d 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -101,7 +101,7 @@ BE 的配置项有两种方式进行配置:
进行schema change的线程数
-### `generate_compaction_tasks_min_interval_ms`
+### `generate_compaction_tasks_interval_ms`
默认值:10 (ms)
@@ -113,13 +113,7 @@ BE 的配置项有两种方式进行配置:
是否开启向量化compaction
-### `base_compaction_interval_seconds_since_last_operation`
-
-默认值:86400
-
-BaseCompaction触发条件之一:上一次BaseCompaction距今的间隔
-
-### `base_compaction_num_cumulative_deltas`
+### `base_compaction_min_rowset_num`
默认值:5
@@ -131,7 +125,7 @@ BaseCompaction触发条件之一:Cumulative文件数目要达到的限制,
BaseCompaction任务每秒写磁盘最大速度
-### `base_cumulative_delta_ratio`
+### `base_compaction_min_data_ratio`
默认值:0.3 (30%)
@@ -217,12 +211,6 @@ Metrics:
{"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
BE缓存池最大的内存可用量,buffer pool是BE新的内存管理结构,通过buffer
page来进行内存管理,并能够实现数据的落盘。并发的所有查询的内存申请都会通过buffer pool来申请。当前buffer
pool仅作用在**AggregationNode**与**ExchangeNode**。
-### `check_auto_compaction_interval_seconds`
-
-* 类型:int32
-* 描述:当自动执行compaction的功能关闭时,检查自动compaction开关是否被开启的时间间隔。
-* 默认值:5
-
### `check_consistency_worker_count`
默认值:1
@@ -347,34 +335,34 @@ BaseCompaction触发条件之一:Singleton文件大小限制,100MB
如果设置为true,`cumulative_compaction_trace_threshold` 和
`base_compaction_trace_threshold` 将不起作用。并且trace日志将关闭。
-### `cumulative_size_based_promotion_size_mbytes`
+### `compaction_promotion_size_mbytes`
* 类型:int64
-* 描述:在size_based策略下,cumulative compaction的输出rowset总磁盘大小超过了此配置大小,该rowset将用于base
compaction。单位是m字节。
+* 描述:cumulative compaction的输出rowset总磁盘大小超过了此配置大小,该rowset将用于base
compaction。单位是m字节。
* 默认值:1024
一般情况下,配置在2G以内,为了防止cumulative compaction时间过长,导致版本积压。
-### `cumulative_size_based_promotion_ratio`
+### `compaction_promotion_ratio`
* 类型:double
-* 描述:在size_based策略下,cumulative
compaction的输出rowset总磁盘大小超过base版本rowset的配置比例时,该rowset将用于base compaction。
+* 描述:cumulative compaction的输出rowset总磁盘大小超过base版本rowset的配置比例时,该rowset将用于base
compaction。
* 默认值:0.05
一般情况下,建议配置不要高于0.1,低于0.02。
-### `cumulative_size_based_promotion_min_size_mbytes`
+### `compaction_promotion_min_size_mbytes`
* 类型:int64
-* 描述:在size_based策略下,cumulative compaction的输出rowset总磁盘大小低于此配置大小,该rowset将不进行base
compaction,仍然处于cumulative compaction流程中。单位是m字节。
+* 描述:Cumulative compaction的输出rowset总磁盘大小低于此配置大小,该rowset将不进行base
compaction,仍然处于cumulative compaction流程中。单位是m字节。
* 默认值:64
一般情况下,配置在512m以内,配置过大会导致base版本早期的大小过小,一直不进行base compaction。
-### `cumulative_size_based_compaction_lower_size_mbytes`
+### `compaction_min_size_mbytes`
* 类型:int64
-* 描述:在size_based策略下,cumulative
compaction进行合并时,选出的要进行合并的rowset的总磁盘大小大于此配置时,才按级别策略划分合并。小于这个配置时,直接执行合并。单位是m字节。
+* 描述:cumulative
compaction进行合并时,选出的要进行合并的rowset的总磁盘大小大于此配置时,才按级别策略划分合并。小于这个配置时,直接执行合并。单位是m字节。
* 默认值:64
一般情况下,配置在128m以内,配置过大会导致cumulative compaction写放大较多。
@@ -753,13 +741,13 @@ soft limit是指站单节点导入内存上限的比例。例如所有导入任
一个数据消费者组中的最大消费者数量,用于routine load
-### `min_cumulative_compaction_num_singleton_deltas`
+### `cumulative_compaction_min_deltas`
默认值:5
cumulative compaction策略:最小增量文件的数量
-### `max_cumulative_compaction_num_singleton_deltas`
+### `cumulative_compaction_max_deltas`
默认值:1000
@@ -881,13 +869,6 @@ txn 管理器中每个 txn_partition_map 的最大 txns 数,这是一种自我
最小读取缓冲区大小(以字节为单位)
-### `min_compaction_failure_interval_sec`
-
-* 类型:int32
-* 描述:在 cumulative compaction 过程中,当选中的 tablet
没能成功的进行版本合并,则会等待一段时间后才会再次有可能被选中。等待的这段时间就是这个配置的值。
-* 默认值:5
-* 单位:秒
-
### `min_compaction_threads`
* 类型:int32
@@ -1086,13 +1067,6 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren
检查 BE/CE 和schema更改的行号。 true 是打开的,false 是关闭的。
-### `row_step_for_compaction_merge_log`
-
-* 类型:int64
-*
描述:Compaction执行过程中,每次合并row_step_for_compaction_merge_log行数据会打印一条LOG。如果该参数被设置为0,表示merge过程中不需要打印LOG。
-* 默认值: 0
-* 可动态修改:是
-
### `scan_context_gc_interval_min`
默认值:5
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]