This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f1b9185830 [feature](cooldown) Implement cold data compaction (#16681)
f1b9185830 is described below
commit f1b91858309a66ea37d3d87190f36646cd081ef3
Author: plat1ko <[email protected]>
AuthorDate: Tue Feb 14 15:21:54 2023 +0800
[feature](cooldown) Implement cold data compaction (#16681)
---
be/src/common/config.h | 2 +
be/src/olap/CMakeLists.txt | 1 +
be/src/olap/base_compaction.cpp | 9 +-
be/src/olap/base_compaction.h | 2 +-
be/src/olap/cold_data_compaction.cpp | 78 ++++++++++
...ulative_compaction.h => cold_data_compaction.h} | 24 +--
be/src/olap/compaction.cpp | 93 +++++++----
be/src/olap/compaction.h | 4 +-
be/src/olap/cumulative_compaction.cpp | 2 +-
be/src/olap/cumulative_compaction.h | 2 +-
be/src/olap/cumulative_compaction_policy.cpp | 34 ++--
be/src/olap/cumulative_compaction_policy.h | 9 +-
be/src/olap/olap_common.h | 1 +
be/src/olap/olap_server.cpp | 112 +++++++++++++
be/src/olap/reader.cpp | 4 +-
be/src/olap/storage_engine.h | 3 +
be/src/olap/tablet.cpp | 173 ++++++++++++++-------
be/src/olap/tablet.h | 11 ++
be/src/olap/tablet_meta.h | 4 +-
be/src/vec/olap/vertical_merge_iterator.cpp | 4 +
20 files changed, 432 insertions(+), 140 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 176b3d665a..abf2bf3574 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -811,6 +811,8 @@ CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
CONF_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h
CONF_mInt32(confirm_unused_remote_files_interval_sec, "60");
+// Number of threads in the pool that runs cold data compaction and cooldown-follow tasks.
+CONF_Int32(cold_data_compaction_thread_num, "2");
+// Seconds the cold data compaction producer waits between scheduling rounds.
+CONF_mInt32(cold_data_compaction_interval_sec, "1800");
CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h
CONF_Int32(concurrency_per_dir, "2");
CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 3ae9d90ba2..c37bf8938f 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -28,6 +28,7 @@ add_library(Olap STATIC
base_tablet.cpp
bloom_filter.hpp
block_column_predicate.cpp
+ cold_data_compaction.cpp
compaction.cpp
compaction_permit_limiter.cpp
cumulative_compaction.cpp
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 36fdb193de..d67f8a2d72 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -23,7 +23,7 @@
namespace doris {
using namespace ErrorCode;
-BaseCompaction::BaseCompaction(TabletSharedPtr tablet)
+BaseCompaction::BaseCompaction(const TabletSharedPtr& tablet)
: Compaction(tablet, "BaseCompaction:" +
std::to_string(tablet->tablet_id())) {}
BaseCompaction::~BaseCompaction() {}
@@ -90,9 +90,14 @@ Status BaseCompaction::execute_compact_impl() {
void BaseCompaction::_filter_input_rowset() {
// if dup_key and no delete predicate
// we skip big files too save resources
- if (_tablet->keys_type() != KeysType::DUP_KEYS ||
_tablet->delete_predicates().size() != 0) {
+ if (_tablet->keys_type() != KeysType::DUP_KEYS) {
return;
}
+ for (auto& rs : _input_rowsets) {
+ if (rs->rowset_meta()->has_delete_predicate()) {
+ return;
+ }
+ }
int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes *
1024 * 1024;
// first find a proper rowset for start
auto rs_iter = _input_rowsets.begin();
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index 96a4a362f4..da53a89d95 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -28,7 +28,7 @@ namespace doris {
class BaseCompaction : public Compaction {
public:
- BaseCompaction(TabletSharedPtr tablet);
+ BaseCompaction(const TabletSharedPtr& tablet);
~BaseCompaction() override;
Status prepare_compact() override;
diff --git a/be/src/olap/cold_data_compaction.cpp
b/be/src/olap/cold_data_compaction.cpp
new file mode 100644
index 0000000000..4b06ee7616
--- /dev/null
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cold_data_compaction.h"
+
+#include "common/compiler_util.h"
+#include "olap/compaction.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// The compaction label is "ColdDataCompaction:<tablet_id>".
+ColdDataCompaction::ColdDataCompaction(const TabletSharedPtr& tablet)
+        : Compaction(tablet, std::string("ColdDataCompaction:") +
+                                     std::to_string(tablet->tablet_id())) {}
+
+ColdDataCompaction::~ColdDataCompaction() = default;
+
+Status ColdDataCompaction::prepare_compact() {
+    // A tablet whose initialization did not succeed cannot be compacted.
+    const bool usable = _tablet->init_succeeded();
+    if (UNLIKELY(!usable)) {
+        return Status::Error<INVALID_ARGUMENT>();
+    }
+    // Otherwise select the cooldowned rowsets to merge.
+    return pick_rowsets_to_compact();
+}
+
+Status ColdDataCompaction::execute_compact_impl() {
+#ifndef __APPLE__
+    // Honour the base-compaction idle-scheduling switch for this background work as well.
+    if (config::enable_base_compaction_idle_sched) {
+        Thread::set_idle_sched();
+    }
+#endif
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    // Merge the picked rowsets; a failure is returned before the state is marked SUCCESS.
+    RETURN_IF_ERROR(do_compaction(get_compaction_permits()));
+    _state = CompactionState::SUCCESS;
+    return Status::OK();
+}
+
+Status ColdDataCompaction::pick_rowsets_to_compact() {
+    // Cold data compaction only merges rowsets that live on remote storage (non-local).
+    _tablet->traverse_rowsets([this](const auto& rs) {
+        if (!rs->is_local()) {
+            _input_rowsets.push_back(rs);
+        }
+    });
+    if (_input_rowsets.empty()) {
+        // Nothing has been cooled down (or the tablet changed since it was scored); do not
+        // run the merge pipeline on an empty input set.
+        return Status::InternalError("no cooldowned rowset to compact, tablet_id={}",
+                                     _tablet->tablet_id());
+    }
+    std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator);
+    // The inputs are replaced by a single output rowset, so they must form a contiguous
+    // version range.
+    return check_version_continuity(_input_rowsets);
+}
+
+// Replaces the merged cooldowned input rowsets with the output rowset in the tablet, then
+// persists the tablet meta.
+Status ColdDataCompaction::modify_rowsets() {
+ {
+ // Header lock held exclusively while the rowset set and cooldown meta id change.
+ std::lock_guard wlock(_tablet->get_header_lock());
+ // Merged cooldowned rowsets MUST NOT be managed by version graph,
they will be reclaimed by `remove_unused_remote_files`.
+ _tablet->delete_rowsets(_input_rowsets, false);
+ _tablet->add_rowsets({_output_rowset});
+ // TODO(plat1ko): process primary key
+ // A fresh cooldown meta id records that the remote rowset layout changed
+ // (presumably so follower replicas pick up the new layout -- confirm).
+ _tablet->tablet_meta()->set_cooldown_meta_id(UniqueId::gen_uid());
+ }
+ {
+ // Persisting the meta only needs the header lock in shared mode.
+ std::shared_lock rlock(_tablet->get_header_lock());
+ _tablet->save_meta();
+ }
+ return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/olap/cumulative_compaction.h
b/be/src/olap/cold_data_compaction.h
similarity index 68%
copy from be/src/olap/cumulative_compaction.h
copy to be/src/olap/cold_data_compaction.h
index f228e91975..75b4d064ca 100644
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cold_data_compaction.h
@@ -17,34 +17,24 @@
#pragma once
-#include <string>
-
#include "olap/compaction.h"
-#include "olap/cumulative_compaction_policy.h"
namespace doris {
-class CumulativeCompaction : public Compaction {
+/// Compaction that merges a tablet's cooldowned (non-local) rowsets and writes the merged
+/// output to the tablet's remote storage resource.
+class ColdDataCompaction final : public Compaction {
public:
- CumulativeCompaction(TabletSharedPtr tablet);
- ~CumulativeCompaction() override;
+ ColdDataCompaction(const TabletSharedPtr& tablet);
+ ~ColdDataCompaction() override;
Status prepare_compact() override;
Status execute_compact_impl() override;
- std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
-protected:
- Status pick_rowsets_to_compact() override;
-
- std::string compaction_name() const override { return "cumulative
compaction"; }
-
- ReaderType compaction_type() const override { return
ReaderType::READER_CUMULATIVE_COMPACTION; }
-
private:
- Version _last_delete_version {-1, -1};
+ std::string compaction_name() const override { return "cold data
compaction"; }
+ ReaderType compaction_type() const override { return
ReaderType::READER_COLD_DATA_COMPACTION; }
- DISALLOW_COPY_AND_ASSIGN(CumulativeCompaction);
+ Status pick_rowsets_to_compact() override;
+ // Replaces the inputs with the output directly instead of moving them to stale versions.
+ Status modify_rowsets() override;
};
} // namespace doris
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f968f219fe..b7fedd2203 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -33,7 +33,7 @@ using std::vector;
namespace doris {
using namespace ErrorCode;
-Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
+Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label)
: _tablet(tablet),
_input_rowsets_size(0),
_input_row_num(0),
@@ -197,6 +197,10 @@ bool Compaction::handle_ordered_data_compaction() {
if (!config::enable_ordered_data_compaction) {
return false;
}
+ if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+ // The remote file system does not support to link files.
+ return false;
+ }
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
return false;
@@ -204,7 +208,7 @@ bool Compaction::handle_ordered_data_compaction() {
// check delete version: if compaction type is base compaction and
// has a delete version, use original compaction
if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
- for (auto rowset : _input_rowsets) {
+ for (auto& rowset : _input_rowsets) {
if (rowset->rowset_meta()->has_delete_predicate()) {
return false;
}
@@ -247,7 +251,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
int64_t now = UnixMillis();
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
_tablet->set_last_cumu_compaction_success_time(now);
- } else {
+ } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
_tablet->set_last_base_compaction_success_time(now);
}
auto cumu_policy = _tablet->cumulative_compaction_policy();
@@ -273,41 +277,52 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// 2. write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no
need to tracker its mem pool
Merger::Statistics stats;
- Status res;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
stats.rowid_conversion = &_rowid_conversion;
}
- if (use_vectorized_compaction) {
- if (vertical_compaction) {
- res = Merger::vertical_merge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
- _input_rs_readers,
_output_rs_writer.get(),
- get_avg_segment_rows(),
&stats);
+ auto build_output_rowset = [&]() {
+ Status res;
+ if (use_vectorized_compaction) {
+ if (vertical_compaction) {
+ res = Merger::vertical_merge_rowsets(_tablet,
compaction_type(), _cur_tablet_schema,
+ _input_rs_readers,
_output_rs_writer.get(),
+ get_avg_segment_rows(),
&stats);
+ } else {
+ res = Merger::vmerge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
+ _input_rs_readers,
_output_rs_writer.get(), &stats);
+ }
} else {
- res = Merger::vmerge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
- _input_rs_readers,
_output_rs_writer.get(), &stats);
+ LOG(FATAL) << "Only support vectorized compaction";
}
- } else {
- LOG(FATAL) << "Only support vectorized compaction";
- }
- if (!res.ok()) {
- LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ".
res=" << res
- << ", tablet=" << _tablet->full_name()
- << ", output_version=" << _output_version;
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to do " << merge_type << compaction_name()
<< ". res=" << res
+ << ", tablet=" << _tablet->full_name()
+ << ", output_version=" << _output_version;
+ return res;
+ }
+ TRACE("merge rowsets finished");
+ TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
+ TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
+
+ _output_rowset = _output_rs_writer->build();
+ if (_output_rowset == nullptr) {
+ LOG(WARNING) << "rowset writer build failed. writer version:"
+ << ", output_version=" << _output_version;
+ return Status::Error<ROWSET_BUILDER_INIT>();
+ }
return res;
- }
- TRACE("merge rowsets finished");
- TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
- TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
+ };
- _output_rowset = _output_rs_writer->build();
- if (_output_rowset == nullptr) {
- LOG(WARNING) << "rowset writer build failed. writer version:"
- << ", output_version=" << _output_version;
- return Status::Error<ROWSET_BUILDER_INIT>();
+ if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+ std::shared_lock slock(_tablet->get_remote_files_lock());
+ RETURN_IF_ERROR(build_output_rowset());
+ } else {
+ RETURN_IF_ERROR(build_output_rowset());
}
+
TRACE_COUNTER_INCREMENT("output_rowset_data_size",
_output_rowset->data_disk_size());
TRACE_COUNTER_INCREMENT("output_row_num", _output_rowset->num_rows());
TRACE_COUNTER_INCREMENT("output_segments_num",
_output_rowset->num_segments());
@@ -326,7 +341,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// TODO(yingchun): do the judge in Tablet class
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
_tablet->set_last_cumu_compaction_success_time(now);
- } else {
+ } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
_tablet->set_last_base_compaction_success_time(now);
}
@@ -364,6 +379,22 @@ Status Compaction::construct_output_rowset_writer(bool
is_vertical) {
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = _cur_tablet_schema;
ctx.newest_write_timestamp = _newest_write_timestamp;
+ if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+ // write output rowset to storage policy resource
+ auto storage_policy = get_storage_policy(_tablet->storage_policy_id());
+ if (storage_policy == nullptr) {
+ return Status::InternalError("could not find storage_policy,
storage_policy_id={}",
+ _tablet->storage_policy_id());
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ if (resource.fs == nullptr) {
+ return Status::InternalError("could not find resource,
resouce_id={}",
+ storage_policy->resource_id);
+ }
+ DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(resource.fs->type() != io::FileSystemType::LOCAL);
+ ctx.fs = std::move(resource.fs);
+ }
if (is_vertical) {
return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer);
}
@@ -404,6 +435,12 @@ Status Compaction::modify_rowsets() {
void Compaction::gc_output_rowset() {
if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) {
+ if (!_output_rowset->is_local()) {
+ // A failed remote (cooldowned) output is recorded on the tablet with its resource id
+ // and segment count instead of being queued as a local unused rowset; presumably the
+ // remote-file cleanup reclaims it -- confirm.
+ _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(),
+
_output_rowset->rowset_meta()->resource_id(),
+
_output_rowset->num_segments());
+ return;
+ }
StorageEngine::instance()->add_unused_rowset(_output_rowset);
}
}
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 8501df9ef4..d136650d32 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -42,7 +42,7 @@ class Merger;
// 4. gc output rowset if failed
class Compaction {
public:
- Compaction(TabletSharedPtr tablet, const std::string& label);
+ Compaction(const TabletSharedPtr& tablet, const std::string& label);
virtual ~Compaction();
// This is only for http CompactionAction
@@ -64,7 +64,7 @@ protected:
Status do_compaction(int64_t permits);
Status do_compaction_impl(int64_t permits);
- Status modify_rowsets();
+ virtual Status modify_rowsets();
void gc_output_rowset();
Status construct_output_rowset_writer(bool is_vertical = false);
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 765790859c..f6d56d142e 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -24,7 +24,7 @@
namespace doris {
using namespace ErrorCode;
-CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet)
+CumulativeCompaction::CumulativeCompaction(const TabletSharedPtr& tablet)
: Compaction(tablet, "CumulativeCompaction:" +
std::to_string(tablet->tablet_id())) {}
CumulativeCompaction::~CumulativeCompaction() {}
diff --git a/be/src/olap/cumulative_compaction.h
b/be/src/olap/cumulative_compaction.h
index f228e91975..fdfc9dbe46 100644
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -26,7 +26,7 @@ namespace doris {
class CumulativeCompaction : public Compaction {
public:
- CumulativeCompaction(TabletSharedPtr tablet);
+ CumulativeCompaction(const TabletSharedPtr& tablet);
~CumulativeCompaction() override;
Status prepare_compact() override;
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index c2735def03..b38271fd1a 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -149,22 +149,21 @@ void
SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
}
}
-void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
- Tablet* tablet, TabletState state, const
std::vector<RowsetMetaSharedPtr>& all_metas,
- int64_t current_cumulative_point, uint32_t* score) {
+uint32_t
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet*
tablet) {
+ uint32_t score = 0;
bool base_rowset_exist = false;
- const int64_t point = current_cumulative_point;
+ const int64_t point = tablet->cumulative_layer_point();
int64_t promotion_size = 0;
std::vector<RowsetMetaSharedPtr> rowset_to_compact;
int64_t total_size = 0;
- // check the base rowset and collect the rowsets of cumulative part
- auto rs_meta_iter = all_metas.begin();
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
- for (; rs_meta_iter != all_metas.end(); rs_meta_iter++) {
- auto rs_meta = *rs_meta_iter;
+ // NOTE: tablet._meta_lock is hold
+ auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+ // check the base rowset and collect the rowsets of cumulative part
+ for (auto& rs_meta : rs_metas) {
if (rs_meta->start_version() < first_version) {
first_version = rs_meta->start_version();
first_meta = rs_meta;
@@ -173,20 +172,19 @@ void
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
if (rs_meta->start_version() == 0) {
base_rowset_exist = true;
}
- if (rs_meta->end_version() < point) {
+ if (rs_meta->end_version() < point || !rs_meta->is_local()) {
// all_rs_metas() is not sorted, so we use _continue_ other than
_break_ here.
continue;
} else {
// collect the rowsets of cumulative part
total_size += rs_meta->total_disk_size();
- *score += rs_meta->get_compaction_score();
+ score += rs_meta->get_compaction_score();
rowset_to_compact.push_back(rs_meta);
}
}
if (first_meta == nullptr) {
- *score = 0;
- return;
+ return 0;
}
// Use "first"(not base) version to calc promotion size
@@ -195,15 +193,14 @@ void
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
// If base version does not exist, but its state is RUNNING.
// It is abnormal, do not select it and set *score = 0
- if (!base_rowset_exist && state == TABLET_RUNNING) {
+ if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
LOG(WARNING) << "tablet state is running but have no base version";
- *score = 0;
- return;
+ return 0;
}
// if total_size is greater than promotion_size, return total score
if (total_size >= promotion_size) {
- return;
+ return score;
}
// sort the rowsets of cumulative part
@@ -219,11 +216,12 @@ void
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
// if current level less then remain level, score contains current
rowset
// and process return; otherwise, score does not contains current
rowset.
if (current_level <= remain_level) {
- return;
+ return score;
}
total_size -= rs_meta->total_disk_size();
- *score -= rs_meta->get_compaction_score();
+ score -= rs_meta->get_compaction_score();
}
+ return score;
}
int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
diff --git a/be/src/olap/cumulative_compaction_policy.h
b/be/src/olap/cumulative_compaction_policy.h
index 623688290d..20a9d64b61 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -54,9 +54,7 @@ public:
/// param all_rowsets, all rowsets in tablet.
/// param current_cumulative_point, current cumulative point value.
/// return score, the result score after calculate.
- virtual void calc_cumulative_compaction_score(
- Tablet* tablet, TabletState state, const
std::vector<RowsetMetaSharedPtr>& all_rowsets,
- int64_t current_cumulative_point, uint32_t* score) = 0;
+ virtual uint32_t calc_cumulative_compaction_score(Tablet* tablet) = 0;
/// Pick input rowsets from candidate rowsets for compaction. This
function is pure virtual function.
/// Its implementation depends on concrete compaction policy.
@@ -142,10 +140,7 @@ public:
/// Num based cumulative compaction policy implements calc cumulative
compaction score function.
/// Its main policy is calculating the accumulative compaction score after
current cumulative_point in tablet.
- void calc_cumulative_compaction_score(Tablet* tablet, TabletState state,
- const
std::vector<RowsetMetaSharedPtr>& all_rowsets,
- int64_t current_cumulative_point,
- uint32_t* score) override;
+ uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
std::string name() override { return CUMULATIVE_SIZE_BASED_POLICY; }
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 35179caf6d..a624e59a6d 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -175,6 +175,7 @@ enum ReaderType {
READER_BASE_COMPACTION = 2,
READER_CUMULATIVE_COMPACTION = 3,
READER_CHECKSUM = 4,
+ READER_COLD_DATA_COMPACTION = 5,
};
constexpr bool field_is_slice_type(const FieldType& field_type) {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 5f61adea46..707b77db5b 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -30,6 +30,7 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "io/cache/file_cache_manager.h"
+#include "olap/cold_data_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -88,6 +89,10 @@ Status StorageEngine::start_bg_threads() {
.set_max_threads(config::seg_compaction_max_threads)
.build(&_seg_compaction_thread_pool);
}
+ ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+ .set_min_threads(config::cold_data_compaction_thread_num)
+ .set_max_threads(config::cold_data_compaction_thread_num)
+ .build(&_cold_data_compaction_thread_pool);
// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
@@ -162,6 +167,12 @@ Status StorageEngine::start_bg_threads() {
&_remove_unused_remote_files_thread));
LOG(INFO) << "remove unused remote files thread started";
+    // The producer needs its own name and handle: storing it into
+    // `_remove_unused_remote_files_thread` would overwrite that thread's handle and leave
+    // `_cold_data_compaction_producer_thread` unset.
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "cold_data_compaction_producer_thread",
+            [this]() { this->_cold_data_compaction_producer_callback(); },
+            &_cold_data_compaction_producer_thread));
+    LOG(INFO) << "cold data compaction producer thread started";
+
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "cache_file_cleaner_tasks_producer_thread",
[this]() { this->_cache_file_cleaner_tasks_producer_callback(); },
@@ -755,6 +766,107 @@ void
StorageEngine::_remove_unused_remote_files_callback() {
}
}
+void StorageEngine::_cold_data_compaction_producer_callback() {
+    // Ids of tablets that have a cold data compaction or cooldown-follow task queued or running
+    // in `_cold_data_compaction_thread_pool`. The state is owned by a shared_ptr that every task
+    // copies (rather than capturing locals by reference), so a task that is still queued or
+    // running when this loop returns never touches destroyed objects.
+    struct SubmittedTablets {
+        std::mutex mtx;
+        std::unordered_set<int64_t> ids;
+    };
+    auto submitted = std::make_shared<SubmittedTablets>();
+
+    // Marks `tablet_id` as submitted *before* handing `task` to the pool, so the tablet is
+    // excluded from later rounds even while its task is still waiting in the queue; the mark
+    // is removed when the task finishes or if the pool rejects it.
+    auto submit_task = [&](int64_t tablet_id, auto task) {
+        {
+            std::lock_guard lock(submitted->mtx);
+            submitted->ids.insert(tablet_id);
+        }
+        auto st = _cold_data_compaction_thread_pool->submit_func(
+                [submitted, tablet_id, task = std::move(task)]() {
+                    task();
+                    std::lock_guard lock(submitted->mtx);
+                    submitted->ids.erase(tablet_id);
+                });
+        if (!st.ok()) {
+            {
+                std::lock_guard lock(submitted->mtx);
+                submitted->ids.erase(tablet_id);
+            }
+            LOG(WARNING) << "failed to submit cold data compaction task. tablet_id=" << tablet_id
+                         << " err=" << st;
+        }
+    };
+
+    // Minimum score a cooldown replica's tablet needs before it is worth compacting.
+    constexpr uint32_t kMinColdDataCompactionScore = 4;
+    auto by_score_desc = [](const auto& a, const auto& b) { return a.second > b.second; };
+
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
+        if (config::disable_auto_compaction) {
+            continue;
+        }
+
+        std::unordered_set<int64_t> copied_tablet_submitted;
+        {
+            std::lock_guard lock(submitted->mtx);
+            copied_tablet_submitted = submitted->ids;
+        }
+        // Free slots in the pool, computed in signed 64-bit arithmetic so that more submitted
+        // tablets than threads yields a non-positive value instead of an unsigned wrap-around.
+        const int64_t n = static_cast<int64_t>(config::cold_data_compaction_thread_num) -
+                          static_cast<int64_t>(copied_tablet_submitted.size());
+        if (n <= 0) {
+            continue;
+        }
+        auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) {
+            return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
+                   t->tablet_state() == TABLET_RUNNING &&
+                   !copied_tablet_submitted.count(t->tablet_id()) &&
+                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+        });
+        // Each list keeps at most `n` candidates, highest score first.
+        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact;
+        tablet_to_compact.reserve(n + 1);
+        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow;
+        tablet_to_follow.reserve(n + 1);
+
+        for (auto& t : tablets) {
+            if (t->replica_id() == t->cooldown_replica_id()) {
+                // This replica is the cooldown replica: compact if there is enough work.
+                auto score = t->calc_cold_data_compaction_score();
+                if (score < kMinColdDataCompactionScore) {
+                    continue;
+                }
+                tablet_to_compact.emplace_back(t, score);
+                std::sort(tablet_to_compact.begin(), tablet_to_compact.end(), by_score_desc);
+                if (static_cast<int64_t>(tablet_to_compact.size()) > n) {
+                    tablet_to_compact.pop_back();
+                }
+                continue;
+            }
+            // else, need to follow
+            {
+                std::lock_guard lock(_running_cooldown_mutex);
+                // The set tracks running cooldown *tablets*, so look it up by tablet id.
+                if (_running_cooldown_tablets.count(t->tablet_id())) {
+                    // already in cooldown queue
+                    continue;
+                }
+            }
+            // TODO(plat1ko): some avoidance strategy if failed to follow
+            auto score = t->calc_cold_data_compaction_score();
+            tablet_to_follow.emplace_back(t, score);
+            std::sort(tablet_to_follow.begin(), tablet_to_follow.end(), by_score_desc);
+            if (static_cast<int64_t>(tablet_to_follow.size()) > n) {
+                tablet_to_follow.pop_back();
+            }
+        }
+
+        for (auto& [tablet, score] : tablet_to_compact) {
+            LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id()
+                      << " score=" << score;
+            // Read the id before `tablet` is moved into the task closure.
+            const int64_t tablet_id = tablet->tablet_id();
+            submit_task(tablet_id, [t = std::move(tablet)]() {
+                auto compaction = std::make_shared<ColdDataCompaction>(t);
+                auto st = compaction->compact();
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to do cold data compaction. tablet_id="
+                                 << t->tablet_id() << " err=" << st;
+                }
+            });
+        }
+
+        for (auto& [tablet, score] : tablet_to_follow) {
+            LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id()
+                      << " score=" << score;
+            const int64_t tablet_id = tablet->tablet_id();
+            submit_task(tablet_id, [t = std::move(tablet)]() {
+                auto st = t->cooldown();
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to cooldown. tablet_id=" << t->tablet_id()
+                                 << " err=" << st;
+                }
+            });
+        }
+    }
+}
+
void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
int64_t interval = config::generate_cache_cleaner_task_interval_sec;
do {
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 8d59085908..e201d49e68 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -308,6 +308,7 @@ Status TabletReader::_init_return_columns(const
ReaderParams& read_params) {
VLOG_NOTICE << "return column is empty, using full column as default.";
} else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION ||
read_params.reader_type == READER_BASE_COMPACTION ||
+ read_params.reader_type == READER_COLD_DATA_COMPACTION ||
read_params.reader_type == READER_ALTER_TABLE) &&
!read_params.return_columns.empty()) {
_return_columns = read_params.return_columns;
@@ -592,11 +593,12 @@ Status TabletReader::_init_delete_condition(const
ReaderParams& read_params) {
if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
return Status::OK();
}
- // Only BASE_COMPACTION need set filter_delete = true
+ // Only BASE_COMPACTION and COLD_DATA_COMPACTION need set filter_delete =
true
// other reader type:
// QUERY will filter the row in query layer to keep right result use where
clause.
// CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
if (read_params.reader_type == READER_BASE_COMPACTION ||
+ read_params.reader_type == READER_COLD_DATA_COMPACTION ||
read_params.reader_type == READER_CHECKSUM) {
_filter_delete = true;
}
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 768e95daf9..3fe49c3246 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -274,6 +274,7 @@ private:
void _cooldown_tasks_producer_callback();
void _remove_unused_remote_files_callback();
+ void _cold_data_compaction_producer_callback();
void _cache_file_cleaner_tasks_producer_callback();
@@ -372,6 +373,7 @@ private:
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
+ std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
@@ -396,6 +398,7 @@ private:
scoped_refptr<Thread> _cooldown_tasks_producer_thread;
scoped_refptr<Thread> _remove_unused_remote_files_thread;
+ scoped_refptr<Thread> _cold_data_compaction_producer_thread;
scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8748c67c86..a931b6c81b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -418,6 +418,38 @@ Status
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
return Status::OK();
}
+void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
+    // Make each rowset visible (version map + version tracker), then record all of their
+    // metas in the tablet meta with a single call. The method takes no lock itself;
+    // NOTE(review): the callers in this patch hold the meta/header lock exclusively.
+    std::vector<RowsetMetaSharedPtr> metas_to_add;
+    metas_to_add.reserve(to_add.size());
+    for (const auto& rowset : to_add) {
+        const auto version = rowset->version();
+        _rs_version_map.emplace(version, rowset);
+        _timestamped_version_tracker.add_version(version);
+        metas_to_add.emplace_back(rowset->rowset_meta());
+    }
+    _tablet_meta->modify_rs_metas(metas_to_add, {});
+}
+
+void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale) {
+    // Remove `to_delete` from the visible version map and the tablet meta. With
+    // `move_to_stale` they are kept as stale versions; otherwise they are dropped from the
+    // version tracker outright. NOTE(review): the third `modify_rs_metas` argument presumably
+    // tells the meta not to keep them as stale -- confirm.
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    rs_metas.reserve(to_delete.size());
+    for (const auto& rs : to_delete) {
+        rs_metas.push_back(rs->rowset_meta());
+        _rs_version_map.erase(rs->version());
+    }
+    _tablet_meta->modify_rs_metas({}, rs_metas, !move_to_stale);
+    if (move_to_stale) {
+        for (const auto& rs : to_delete) {
+            _stale_rs_version_map[rs->version()] = rs;
+        }
+        _timestamped_version_tracker.add_stale_path_version(rs_metas);
+    } else {
+        for (const auto& rs : to_delete) {
+            _timestamped_version_tracker.delete_version(rs->version());
+            // Only local rowsets go through the local unused-rowset GC. Remote (cooldowned)
+            // rowsets are left for `remove_unused_remote_files`, matching how
+            // Compaction::gc_output_rowset handles a remote output rowset.
+            if (rs->is_local()) {
+                StorageEngine::instance()->add_unused_rowset(rs);
+            }
+        }
+    }
+}
+
// snapshot manager may call this api to check if version exists, so that
// the version maybe not exist
const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version,
@@ -854,6 +886,32 @@ uint32_t Tablet::calc_compaction_score(
}
}
+uint32_t Tablet::calc_cold_data_compaction_score() const {
+    // Snapshot the cooldowned (non-local) rowset metas under a shared meta lock and find the
+    // newest end version among those carrying a delete predicate.
+    std::vector<RowsetMetaSharedPtr> remote_metas;
+    int64_t newest_delete_version = 0;
+    {
+        std::shared_lock rlock(_meta_lock);
+        for (const auto& meta : _tablet_meta->all_rs_metas()) {
+            if (meta->is_local()) {
+                continue;
+            }
+            remote_metas.push_back(meta);
+            const bool newer_delete = meta->has_delete_predicate() &&
+                                      meta->end_version() > newest_delete_version;
+            if (newer_delete) {
+                newest_delete_version = meta->end_version();
+            }
+        }
+    }
+    // A remote rowset older than that delete version contributes all of its segments
+    // (presumably because every segment must be rewritten to apply the delete); any other
+    // remote rowset contributes its regular compaction score.
+    uint32_t total = 0;
+    for (const auto& meta : remote_metas) {
+        if (meta->end_version() >= newest_delete_version) {
+            total += meta->get_compaction_score();
+        } else {
+            total += meta->num_segments();
+        }
+    }
+    // Tablets that are not duplicate-key get double weight.
+    return keys_type() == KeysType::DUP_KEYS ? total : total * 2;
+}
+
uint32_t Tablet::_calc_cumulative_compaction_score(
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy) {
#ifndef BE_TEST
@@ -862,10 +920,7 @@ uint32_t Tablet::_calc_cumulative_compaction_score(
_cumulative_compaction_policy = cumulative_compaction_policy;
}
#endif
- uint32_t score = 0;
- _cumulative_compaction_policy->calc_cumulative_compaction_score(
- this, tablet_state(), _tablet_meta->all_rs_metas(),
cumulative_layer_point(), &score);
- return score;
+ return
_cumulative_compaction_policy->calc_cumulative_compaction_score(this);
}
uint32_t Tablet::_calc_base_compaction_score() const {
@@ -876,7 +931,7 @@ uint32_t Tablet::_calc_base_compaction_score() const {
if (rs_meta->start_version() == 0) {
base_rowset_exist = true;
}
- if (rs_meta->start_version() >= point) {
+ if (rs_meta->start_version() >= point || !rs_meta->is_local()) {
// all_rs_metas() is not sorted, so we use _continue_ other than
_break_ here.
continue;
}
@@ -1724,11 +1779,16 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
{
std::unique_lock meta_wlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
- modify_rowsets(to_add, to_delete);
+ delete_rowsets({std::move(old_rowset)}, false);
+ add_rowsets({std::move(new_rowset)});
+ // TODO(plat1ko): process primary key
_tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
- save_meta();
}
}
+ {
+ std::unique_lock meta_rlock(_meta_lock);
+ save_meta();
+ }
return Status::OK();
}
@@ -1802,66 +1862,59 @@ Status
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
std::vector<RowsetSharedPtr> overlap_rowsets;
bool version_aligned = false;
- std::lock_guard wlock(_meta_lock);
- if (tablet_state() != TABLET_RUNNING) {
- return Status::InternalError("tablet not running");
- }
+ {
+ std::lock_guard wlock(_meta_lock);
+ if (tablet_state() != TABLET_RUNNING) {
+ return Status::InternalError("tablet not running");
+ }
- for (auto& [v, rs] : _rs_version_map) {
- if (v.second == cooldowned_version) {
- version_aligned = true;
- break;
+ for (auto& [v, rs] : _rs_version_map) {
+ if (v.second == cooldowned_version) {
+ version_aligned = true;
+ break;
+ }
}
- }
- if (!version_aligned) {
- return Status::InternalError("cooldowned version is not aligned");
- }
- for (auto& [v, rs] : _rs_version_map) {
- if (v.second <= cooldowned_version) {
- overlap_rowsets.push_back(rs);
- } else if (!rs->is_local()) {
- return Status::InternalError("cooldowned version larger than that
to follow");
+ if (!version_aligned) {
+ return Status::InternalError("cooldowned version is not aligned");
}
- }
- std::sort(overlap_rowsets.begin(), overlap_rowsets.end(),
Rowset::comparator);
- auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
- auto rs_it = overlap_rowsets.begin();
- for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it !=
overlap_rowsets.end();
- ++rs_pb_it, ++rs_it) {
- // skip cooldowned rowset with same version in BE
- if ((*rs_it)->is_local() || rs_pb_it->end_version() !=
(*rs_it)->end_version()) {
- break;
+ for (auto& [v, rs] : _rs_version_map) {
+ if (v.second <= cooldowned_version) {
+ overlap_rowsets.push_back(rs);
+ } else if (!rs->is_local()) {
+ return Status::InternalError("cooldowned version larger than
that to follow");
+ }
}
+ std::sort(overlap_rowsets.begin(), overlap_rowsets.end(),
Rowset::comparator);
+ auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
+ auto rs_it = overlap_rowsets.begin();
+ for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it !=
overlap_rowsets.end();
+ ++rs_pb_it, ++rs_it) {
+ // skip cooldowned rowset with same version in BE
+ if ((*rs_it)->is_local() || rs_pb_it->end_version() !=
(*rs_it)->end_version()) {
+ break;
+ }
+ }
+ std::vector<RowsetSharedPtr> to_delete(rs_it, overlap_rowsets.end());
+ std::vector<RowsetSharedPtr> to_add;
+ to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
+ for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->init_from_pb(*rs_pb_it);
+ RowsetSharedPtr rs;
+ RowsetFactory::create_rowset(_schema, _tablet_path,
std::move(rs_meta), &rs);
+ to_add.push_back(std::move(rs));
+ }
+ // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets`
cannot process version graph correctly.
+ delete_rowsets(to_delete, false);
+ add_rowsets(to_add);
+ // TODO(plat1ko): process primary key
+
_tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
}
- // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets`
cannot process version graph correctly.
- std::vector<RowsetMetaSharedPtr> to_delete;
- to_delete.reserve(overlap_rowsets.end() - rs_it);
- for (; rs_it != overlap_rowsets.end(); ++rs_it) {
- _rs_version_map.erase((*rs_it)->version());
- to_delete.push_back((*rs_it)->rowset_meta());
- _timestamped_version_tracker.delete_version((*rs_it)->version());
- StorageEngine::instance()->add_unused_rowset(*rs_it);
- }
-
- std::vector<RowsetMetaSharedPtr> to_add;
- to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
- for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
- auto rs_meta = std::make_shared<RowsetMeta>();
- rs_meta->init_from_pb(*rs_pb_it);
- RowsetSharedPtr rowset;
- RowsetFactory::create_rowset(_schema, _tablet_path,
std::move(rs_meta), &rowset);
- _rs_version_map.emplace(rowset->version(), rowset);
- to_add.push_back(rowset->rowset_meta());
- _timestamped_version_tracker.add_version(rowset->version());
+ {
+ std::lock_guard rlock(_meta_lock);
+ save_meta();
}
- _tablet_meta->modify_rs_metas(to_add, to_delete);
-
- // TODO(plat1ko): process primary key
-
- _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
- save_meta();
-
return Status::OK();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index dc0034f065..f06463dcb3 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -293,9 +293,16 @@ public:
Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr*
rowset);
+ // MUST hold EXCLUSIVE `_meta_lock`
+ void add_rowsets(const std::vector<RowsetSharedPtr>& to_add);
+ // MUST hold EXCLUSIVE `_meta_lock`
+ void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool
move_to_stale);
+
////////////////////////////////////////////////////////////////////////////
// begin cooldown functions
////////////////////////////////////////////////////////////////////////////
+ // Read-only accessor for `_cooldown_replica_id`.
+ // NOTE(review): presumably the id of the replica designated to perform cooldown -- confirm.
+ int64_t cooldown_replica_id() const { return _cooldown_replica_id; }
+
// Cooldown to remote fs.
Status cooldown();
@@ -320,6 +327,10 @@ public:
int64_t num_segments);
static void remove_unused_remote_files();
+
+ // Exposes `_remote_files_lock` to callers outside Tablet.
+ // NOTE(review): document which operations must hold it, and in which mode (shared vs exclusive).
+ std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; }
+
+ uint32_t calc_cold_data_compaction_score() const;
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 8b147216a8..96adc4baad 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -146,7 +146,7 @@ public:
bool in_restore_mode() const;
void set_in_restore_mode(bool in_restore_mode);
- TabletSchemaSPtr tablet_schema() const;
+ const TabletSchemaSPtr& tablet_schema() const;
const TabletSchemaSPtr tablet_schema(Version version) const;
@@ -529,7 +529,7 @@ inline void TabletMeta::set_in_restore_mode(bool
in_restore_mode) {
_in_restore_mode = in_restore_mode;
}
-inline TabletSchemaSPtr TabletMeta::tablet_schema() const {
+// Returns `_schema` by const reference instead of by value, so read-only callers
+// avoid a copy. The reference is valid only while this TabletMeta is alive and
+// `_schema` is not reassigned. Copy it if it must outlive that.
+// NOTE(review): assumes TabletSchemaSPtr is a shared_ptr alias, in which case the
+// avoided copy is an atomic refcount update -- confirm.
+inline const TabletSchemaSPtr& TabletMeta::tablet_schema() const {
return _schema;
}
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp
b/be/src/vec/olap/vertical_merge_iterator.cpp
index f173c6b3da..1f86ca81a2 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -17,6 +17,8 @@
#include "vec/olap/vertical_merge_iterator.h"
+#include "olap/olap_common.h"
+
namespace doris {
using namespace ErrorCode;
@@ -137,6 +139,8 @@ Status RowSourcesBuffer::_create_buffer_file() {
file_path_ss << "_base";
} else if (_reader_type == READER_CUMULATIVE_COMPACTION) {
file_path_ss << "_cumu";
+ } else if (_reader_type == READER_COLD_DATA_COMPACTION) {
+ file_path_ss << "_cold";
} else {
DCHECK(false);
return Status::InternalError("unknown reader type");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]