This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cc5fa509ad [fix](cooldown) Fix bug in concurrent
`update_cooldown_conf` and operations that update cooldowned data (#17086)
cc5fa509ad is described below
commit cc5fa509ad0059bc337dba84858e907782baa9d0
Author: plat1ko <[email protected]>
AuthorDate: Fri Mar 3 14:36:58 2023 +0800
[fix](cooldown) Fix bug in concurrent `update_cooldown_conf` and operations
that update cooldowned data (#17086)
---
be/src/agent/task_worker_pool.cpp | 1 +
be/src/olap/cold_data_compaction.cpp | 17 +--
be/src/olap/olap_define.h | 11 ++
be/src/olap/olap_server.cpp | 2 +-
be/src/olap/rowset/beta_rowset.cpp | 10 --
be/src/olap/rowset/beta_rowset.h | 4 -
be/src/olap/tablet.cpp | 155 ++++++++++++---------
be/src/olap/tablet.h | 44 ++++--
.../java/org/apache/doris/catalog/Replica.java | 9 ++
.../apache/doris/catalog/TabletInvertedIndex.java | 5 +-
.../org/apache/doris/master/ReportHandler.java | 38 +++--
.../apache/doris/service/FrontendServiceImpl.java | 12 +-
gensrc/thrift/MasterService.thrift | 2 +-
13 files changed, 192 insertions(+), 118 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 3ed931ddaf..4056e0d176 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1848,6 +1848,7 @@ void
TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
}
tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
cooldown_conf.cooldown_replica_id);
+ // TODO(AlexYue): if `update_cooldown_conf` success, async call
`write_cooldown_meta`
}
}
}
diff --git a/be/src/olap/cold_data_compaction.cpp
b/be/src/olap/cold_data_compaction.cpp
index 65b2330dc7..9fbeef8245 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -45,6 +45,10 @@ Status ColdDataCompaction::execute_compact_impl() {
#endif
SCOPED_ATTACH_TASK(_mem_tracker);
int64_t permits = get_compaction_permits();
+ std::shared_lock cooldown_conf_rlock(_tablet->get_cooldown_conf_lock());
+ if (_tablet->cooldown_conf_unlocked().first != _tablet->replica_id()) {
+ return Status::Aborted("this replica is not cooldown replica");
+ }
RETURN_IF_ERROR(do_compaction(permits));
_state = CompactionState::SUCCESS;
return Status::OK();
@@ -62,16 +66,6 @@ Status ColdDataCompaction::pick_rowsets_to_compact() {
Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
UniqueId cooldown_meta_id = UniqueId::gen_uid();
-
- // write remote tablet meta
- std::shared_ptr<io::RemoteFileSystem> fs;
- RETURN_IF_ERROR(get_remote_file_system(_tablet->storage_policy_id(), &fs));
- std::vector<RowsetMetaSharedPtr> to_deletes;
- for (auto& rs : _input_rowsets) {
- to_deletes.emplace_back(rs->rowset_meta());
- }
- RETURN_IF_ERROR(_tablet->write_cooldown_meta(fs, cooldown_meta_id,
-
_output_rowset->rowset_meta(), to_deletes));
{
std::lock_guard wlock(_tablet->get_header_lock());
// Merged cooldowned rowsets MUST NOT be managed by version graph,
they will be reclaimed by `remove_unused_remote_files`.
@@ -85,6 +79,9 @@ Status ColdDataCompaction::modify_rowsets(const
Merger::Statistics* stats) {
std::shared_lock rlock(_tablet->get_header_lock());
_tablet->save_meta();
}
+ // write remote tablet meta
+ // TODO(AlexYue): async call `write_cooldown_meta`
+ RETURN_IF_ERROR(_tablet->write_cooldown_meta());
return Status::OK();
}
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 367d0d8ba7..fbc4f24557 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -98,6 +98,17 @@ static const std::string PENDING_DELTA_PREFIX =
"pending_delta";
static const std::string INCREMENTAL_DELTA_PREFIX = "incremental_delta";
static const std::string CLONE_PREFIX = "clone";
+// define paths
+static inline std::string remote_tablet_path(int64_t tablet_id) {
+ // data/{tablet_id}
+ return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
+}
+static inline std::string remote_tablet_meta_path(int64_t tablet_id, int64_t
replica_id,
+ int64_t cooldown_term) {
+ // data/{tablet_id}/{replica_id}.{cooldown_term}.meta
+ return fmt::format("{}/{}.{}.meta", remote_tablet_path(tablet_id),
replica_id, cooldown_term);
+}
+
static const std::string TABLET_UID = "tablet_uid";
static const std::string STORAGE_NAME = "storage_name";
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 36e67bc1f9..e621e893f8 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -797,7 +797,7 @@ void
StorageEngine::_cold_data_compaction_producer_callback() {
tablet_to_follow.reserve(n + 1);
for (auto& t : tablets) {
- if (t->replica_id() == t->cooldown_replica_id()) {
+ if (t->replica_id() == t->cooldown_conf_unlocked().first) {
auto score = t->calc_cold_data_compaction_score();
if (score < 4) {
continue;
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 26d4b0b7ce..4f459d8365 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -69,16 +69,6 @@ std::string BetaRowset::segment_file_path(const std::string&
rowset_dir, const R
return fmt::format("{}/{}_{}.dat", rowset_dir, rowset_id.to_string(),
segment_id);
}
-std::string BetaRowset::remote_tablet_path(int64_t tablet_id) {
- // data/{tablet_id}
- return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
-}
-
-std::string BetaRowset::remote_tablet_meta_path(int64_t tablet_id, int64_t
replica_id) {
- // data/{tablet_id}/{replica_id}.meta
- return fmt::format("{}/{}.meta", remote_tablet_path(tablet_id),
replica_id);
-}
-
std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId&
rowset_id,
int segment_id) {
// data/{tablet_id}/{rowset_id}_{seg_num}.dat
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index c2a86f13c8..5401c60860 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -61,10 +61,6 @@ public:
static std::string remote_segment_path(int64_t tablet_id, const
std::string& rowset_id,
int segment_id);
- static std::string remote_tablet_path(int64_t tablet_id);
-
- static std::string remote_tablet_meta_path(int64_t tablet_id, int64_t
replica_id);
-
Status remove() override;
Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 62d8ceae60..0563e3dd76 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1409,13 +1409,14 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
- if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id()
> 0) {
- tablet_info->__set_cooldown_replica_id(_cooldown_replica_id);
+ if (_tablet_meta->cooldown_meta_id().initialized()) { // has cooldowned
data
tablet_info->__set_cooldown_term(_cooldown_term);
- }
- if (_tablet_meta->cooldown_meta_id().initialized()) {
tablet_info->__set_cooldown_meta_id(_tablet_meta->cooldown_meta_id().to_thrift());
}
+ if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id()
> 0) {
+ // tablet may not have cooldowned data, but the storage policy is set
+ tablet_info->__set_cooldown_term(_cooldown_term);
+ }
}
// should use this method to get a copy of current tablet meta
@@ -1633,7 +1634,7 @@ void
Tablet::_init_context_common_fields(RowsetWriterContext& context) {
context.rowset_type = StorageEngine::instance()->default_rowset_type();
}
if (context.fs != nullptr && context.fs->type() !=
io::FileSystemType::LOCAL) {
- context.rowset_dir = BetaRowset::remote_tablet_path(tablet_id());
+ context.rowset_dir = remote_tablet_path(tablet_id());
} else {
context.rowset_dir = tablet_path();
}
@@ -1648,37 +1649,38 @@ Status Tablet::create_rowset(const RowsetMetaSharedPtr&
rowset_meta, RowsetShare
Status Tablet::cooldown() {
std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
- LOG(WARNING) << "Failed to own schema_change_lock. tablet=" <<
tablet_id();
- return Status::Error<TRY_LOCK_FAILED>();
+ return Status::Error<TRY_LOCK_FAILED>("try schema_change_lock failed");
}
// Check executing serially with compaction task.
std::unique_lock base_compaction_lock(_base_compaction_lock,
std::try_to_lock);
if (!base_compaction_lock.owns_lock()) {
- LOG(WARNING) << "Failed to own base_compaction_lock. tablet=" <<
tablet_id();
- return Status::Error<TRY_LOCK_FAILED>();
+ return Status::Error<TRY_LOCK_FAILED>("try base_compaction_lock
failed");
}
std::unique_lock cumu_compaction_lock(_cumulative_compaction_lock,
std::try_to_lock);
if (!cumu_compaction_lock.owns_lock()) {
- LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" <<
tablet_id();
- return Status::Error<TRY_LOCK_FAILED>();
+ return Status::Error<TRY_LOCK_FAILED>("try cumu_compaction_lock
failed");
}
- int64_t cooldown_replica_id = _cooldown_replica_id;
- if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
+ std::shared_lock cooldown_conf_rlock(_cooldown_conf_lock);
+ if (_cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
return Status::InternalError("invalid cooldown_replica_id");
}
- std::shared_ptr<io::RemoteFileSystem> fs;
- RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
-
- if (cooldown_replica_id == replica_id()) {
- RETURN_IF_ERROR(_cooldown_data(fs));
+ if (_cooldown_replica_id == replica_id()) {
+ // this replica is cooldown replica
+ RETURN_IF_ERROR(_cooldown_data());
} else {
- RETURN_IF_ERROR(_follow_cooldowned_data(fs, cooldown_replica_id));
+ // try to follow cooldowned data from cooldown replica
+ RETURN_IF_ERROR(_follow_cooldowned_data());
}
return Status::OK();
}
-Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>&
dest_fs) {
+// hold SHARED `cooldown_conf_lock`
+Status Tablet::_cooldown_data() {
+ DCHECK(_cooldown_replica_id == replica_id());
+
+ std::shared_ptr<io::RemoteFileSystem> dest_fs;
+ RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &dest_fs));
auto old_rowset = pick_cooldown_rowset();
if (!old_rowset) {
return Status::InternalError("cannot pick cooldown rowset in tablet
{}", tablet_id());
@@ -1711,13 +1713,6 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
UniqueId cooldown_meta_id = UniqueId::gen_uid();
-
- // upload cooldowned rowset meta to remote fs
- st = write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {});
- if (!st.ok()) {
- return st;
- }
-
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
@@ -1735,13 +1730,17 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
std::unique_lock meta_rlock(_meta_lock);
save_meta();
}
+ // upload cooldowned rowset meta to remote fs
+ // TODO(AlexYue): async call `write_cooldown_meta`
+ RETURN_IF_ERROR(write_cooldown_meta());
return Status::OK();
}
+// hold SHARED `cooldown_conf_lock`
Status Tablet::_read_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>& fs,
- int64_t cooldown_replica_id, TabletMetaPB*
tablet_meta_pb) {
+ TabletMetaPB* tablet_meta_pb) {
std::string remote_meta_path =
- BetaRowset::remote_tablet_meta_path(tablet_id(),
cooldown_replica_id);
+ remote_tablet_meta_path(tablet_id(), _cooldown_replica_id,
_cooldown_term);
IOContext io_ctx;
io::FileReaderSPtr tablet_meta_reader;
RETURN_IF_ERROR(fs->open_file(remote_meta_path, &tablet_meta_reader,
&io_ctx));
@@ -1756,46 +1755,50 @@ Status Tablet::_read_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>&
return Status::OK();
}
-Status Tablet::write_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>& fs,
- UniqueId cooldown_meta_id,
- const RowsetMetaSharedPtr& new_rs_meta,
- const std::vector<RowsetMetaSharedPtr>&
to_deletes) {
- std::unordered_set<Version, HashOfVersion> to_delete_set;
- for (auto& rs_meta : to_deletes) {
- to_delete_set.emplace(rs_meta->version());
+// `rs_metas` MUST already be sorted by `RowsetMeta::comparator`
+Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>&
rs_metas) {
+ if (rs_metas.size() < 2) {
+ return Status::OK();
+ }
+ auto prev = rs_metas.begin();
+ for (auto it = rs_metas.begin() + 1; it != rs_metas.end(); ++it) {
+ if ((*prev)->end_version() + 1 != (*it)->start_version()) {
+ return Status::InternalError("versions are not continuity: prev={}
cur={}",
+ (*prev)->version().to_string(),
+ (*it)->version().to_string());
+ }
+ prev = it;
+ }
+ return Status::OK();
+}
+
+Status Tablet::write_cooldown_meta() {
+ auto [cooldown_replica_id, cooldown_term] = cooldown_conf();
+ if (cooldown_replica_id != replica_id()) {
+ return Status::Aborted("this replica is not cooldown replica");
}
+ std::shared_ptr<io::RemoteFileSystem> fs;
+ RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
+
std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
+ UniqueId cooldown_meta_id;
{
std::shared_lock meta_rlock(_meta_lock);
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
if (!rs_meta->is_local()) {
- if (to_delete_set.find(rs_meta->version()) !=
to_delete_set.end()) {
- continue;
- }
- cooldowned_rs_metas.emplace_back(rs_meta);
+ cooldowned_rs_metas.push_back(rs_meta);
}
}
+ cooldown_meta_id = _tablet_meta->cooldown_meta_id();
}
- cooldowned_rs_metas.emplace_back(new_rs_meta);
- std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(),
RowsetMeta::comparator);
-
- // check_version_continuity
- if (!cooldowned_rs_metas.empty()) {
- RowsetMetaSharedPtr prev_rowset_meta = cooldowned_rs_metas.front();
- for (size_t i = 1; i < cooldowned_rs_metas.size(); ++i) {
- RowsetMetaSharedPtr rowset_meta = cooldowned_rs_metas[i];
- if (rowset_meta->start_version() !=
prev_rowset_meta->end_version() + 1) {
- LOG(WARNING) << "There are missed versions among rowsets. "
- << "prev_rowset_meta version=" <<
prev_rowset_meta->start_version()
- << "-" << prev_rowset_meta->end_version()
- << ", rowset_meta version=" <<
rowset_meta->start_version() << "-"
- << rowset_meta->end_version();
- return Status::Error<CUMULATIVE_MISS_VERSION>();
- }
- prev_rowset_meta = rowset_meta;
- }
+ if (cooldowned_rs_metas.empty()) {
+ LOG(INFO) << "no cooldown meta to write, tablet_id=" << tablet_id();
+ return Status::OK();
}
+ std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(),
RowsetMeta::comparator);
+ DCHECK(cooldowned_rs_metas.front()->start_version() == 0);
+ RETURN_IF_ERROR(check_version_continuity(cooldowned_rs_metas));
TabletMetaPB tablet_meta_pb;
auto rs_metas = tablet_meta_pb.mutable_rs_metas();
@@ -1807,7 +1810,7 @@ Status Tablet::write_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>&
tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
std::string remote_meta_path =
- BetaRowset::remote_tablet_meta_path(tablet_id(),
_tablet_meta->replica_id());
+ remote_tablet_meta_path(tablet_id(), cooldown_replica_id,
cooldown_term);
io::FileWriterPtr tablet_meta_writer;
RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
auto val = tablet_meta_pb.SerializeAsString();
@@ -1815,11 +1818,15 @@ Status Tablet::write_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>&
return tablet_meta_writer->close();
}
-Status Tablet::_follow_cooldowned_data(const
std::shared_ptr<io::RemoteFileSystem>& fs,
- int64_t cooldown_replica_id) {
+// hold SHARED `cooldown_conf_lock`
+Status Tablet::_follow_cooldowned_data() {
+ DCHECK(_cooldown_replica_id != replica_id());
LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
- << " cooldown_replica_id=" << cooldown_replica_id
+ << " cooldown_replica_id=" << _cooldown_replica_id
<< " local replica=" << replica_id();
+
+ std::shared_ptr<io::RemoteFileSystem> fs;
+ RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
// MUST executing serially with cold data compaction, because compaction
input rowsets may be deleted by this function
std::unique_lock cold_compaction_lock(_cold_compaction_lock,
std::try_to_lock);
if (!cold_compaction_lock.owns_lock()) {
@@ -1827,7 +1834,7 @@ Status Tablet::_follow_cooldowned_data(const
std::shared_ptr<io::RemoteFileSyste
}
TabletMetaPB cooldown_meta_pb;
- RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id,
&cooldown_meta_pb));
+ RETURN_IF_ERROR(_read_cooldown_meta(fs, &cooldown_meta_pb));
DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
if (_tablet_meta->cooldown_meta_id() ==
cooldown_meta_pb.cooldown_meta_id()) {
// cooldowned rowsets are same, no need to follow
@@ -2047,11 +2054,18 @@ void Tablet::remove_unused_remote_files() {
DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
- Status st;
+ std::shared_ptr<io::RemoteFileSystem> fs;
+ auto st = get_remote_file_system(t->storage_policy_id(), &fs);
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ return;
+ }
+
std::vector<io::Path> files;
// FIXME(plat1ko): What if user reset resource in storage policy to
another resource?
// Maybe we should also list files in previously uploaded resources.
- st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ st = dest_fs->list(remote_tablet_path(t->tablet_id()), &files);
if (!st.ok()) {
LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
<< t->tablet_id() << " : " << st;
@@ -2075,8 +2089,13 @@ void Tablet::remove_unused_remote_files() {
}
cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
}
- // {replica_id}.meta
- std::string remote_meta_path = std::to_string(t->replica_id()) +
".meta";
+ auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf();
+ if (cooldown_replica_id != t->replica_id()) {
+ return;
+ }
+ // {cooldown_replica_id}.{cooldown_term}.meta
+ std::string remote_meta_path =
+ fmt::format("{}.{}.meta", cooldown_replica_id, cooldown_term);
// filter out the paths that should be reserved
// clang-format off
files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path&
path) {
@@ -2111,7 +2130,7 @@ void Tablet::remove_unused_remote_files() {
buffer.insert({t->tablet_id(), {std::move(dest_fs),
std::move(files)}});
auto& info = req.confirm_list.emplace_back();
info.__set_tablet_id(t->tablet_id());
- info.__set_cooldown_replica_id(t->replica_id());
+ info.__set_cooldown_replica_id(cooldown_replica_id);
info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
};
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a4114d2ca5..c14f27e748 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -17,6 +17,7 @@
#pragma once
+#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
@@ -308,8 +309,6 @@ public:
////////////////////////////////////////////////////////////////////////////
// begin cooldown functions
////////////////////////////////////////////////////////////////////////////
- int64_t cooldown_replica_id() const { return _cooldown_replica_id; }
-
// Cooldown to remote fs.
Status cooldown();
@@ -317,7 +316,22 @@ public:
bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
- void update_cooldown_conf(int64_t cooldown_term, int64_t
cooldown_replica_id) {
+ std::pair<int64_t, int64_t> cooldown_conf() const {
+ std::shared_lock rlock(_cooldown_conf_lock);
+ return {_cooldown_replica_id, _cooldown_term};
+ }
+
+ std::pair<int64_t, int64_t> cooldown_conf_unlocked() const {
+ return {_cooldown_replica_id, _cooldown_term};
+ }
+
+ // return true if update success
+ bool update_cooldown_conf(int64_t cooldown_term, int64_t
cooldown_replica_id) {
+ std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock);
+ if (!wlock.owns_lock()) {
+ LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" <<
tablet_id();
+ return false;
+ }
if (cooldown_term > _cooldown_term) {
LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
<< " cooldown_replica_id: " << _cooldown_replica_id << "
-> "
@@ -325,7 +339,9 @@ public:
<< cooldown_term;
_cooldown_replica_id = cooldown_replica_id;
_cooldown_term = cooldown_term;
+ return true;
}
+ return false;
}
Status remove_all_remote_rowsets();
@@ -344,6 +360,10 @@ public:
uint32_t calc_cold_data_compaction_score() const;
std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; }
+
+ std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
+
+ Status write_cooldown_meta();
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
@@ -412,10 +432,6 @@ public:
return config::max_tablet_io_errors > 0 && _io_error_times >=
config::max_tablet_io_errors;
}
- Status write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
- UniqueId cooldown_meta_id, const
RowsetMetaSharedPtr& new_rs_meta,
- const std::vector<RowsetMetaSharedPtr>&
to_deletes);
-
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
@@ -455,11 +471,10 @@ private:
////////////////////////////////////////////////////////////////////////////
// begin cooldown functions
////////////////////////////////////////////////////////////////////////////
- Status _cooldown_data(const std::shared_ptr<io::RemoteFileSystem>&
dest_fs);
- Status _follow_cooldowned_data(const
std::shared_ptr<io::RemoteFileSystem>& fs,
- int64_t cooldown_replica_id);
+ Status _cooldown_data();
+ Status _follow_cooldowned_data();
Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
- int64_t cooldown_replica_id, TabletMetaPB*
tablet_meta_pb);
+ TabletMetaPB* tablet_meta_pb);
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
@@ -539,6 +554,13 @@ private:
// cooldown related
int64_t _cooldown_replica_id = -1;
int64_t _cooldown_term = -1;
+ // `_cooldown_conf_lock` is used to serialize update cooldown conf and all
operations that:
+ // 1. read cooldown conf
+ // 2. upload rowsets to remote storage
+ // 3. update cooldown meta id
+ mutable std::shared_mutex _cooldown_conf_lock;
+ // `_cold_compaction_lock` is used to serialize cold data compaction and
all operations that
+ // may delete compaction input rowsets.
std::mutex _cold_compaction_lock;
DISALLOW_COPY_AND_ASSIGN(Tablet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index fb8b834787..0f694348ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -111,6 +111,7 @@ public class Replica implements Writable {
private boolean bad = false;
private TUniqueId cooldownMetaId;
+ private long cooldownTerm = -1;
/*
* If set to true, with means this replica need to be repaired. explicitly.
@@ -246,6 +247,14 @@ public class Replica implements Writable {
this.cooldownMetaId = cooldownMetaId;
}
+ public long getCooldownTerm() {
+ return cooldownTerm;
+ }
+
+ public void setCooldownTerm(long cooldownTerm) {
+ this.cooldownTerm = cooldownTerm;
+ }
+
public boolean needFurtherRepair() {
if (needFurtherRepair && System.currentTimeMillis() -
this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 726e86544a..2068b93755 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -190,10 +190,11 @@ public class TabletInvertedIndex {
}
}
- if (Config.enable_storage_policy &&
backendTabletInfo.isSetCooldownReplicaId()) {
+ if (Config.enable_storage_policy &&
backendTabletInfo.isSetCooldownTerm()) {
handleCooldownConf(tabletMeta,
backendTabletInfo, cooldownConfToPush,
cooldownConfToUpdate);
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
+
replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
}
long partitionId = tabletMeta.getPartitionId();
@@ -395,7 +396,7 @@ public class TabletInvertedIndex {
return;
}
- if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) {
+ if (beTabletInfo.getCooldownTerm() < cooldownConf.second) {
CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id,
cooldownConf.first, cooldownConf.second);
synchronized (cooldownConfToPush) {
cooldownConfToPush.add(conf);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 1f4a8715e9..ec22328c55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -1147,18 +1147,40 @@ public class ReportHandler extends Daemon {
if (backendTabletInfo.isSetCooldownMetaId()) {
// replica has cooldowned data
do {
- if (backendTabletInfo.getReplicaId() ==
tablet.getCooldownConf().first) {
+ Pair<Long, Long> cooldownConf =
tablet.getCooldownConf();
+ if (backendTabletInfo.getCooldownTerm() >
cooldownConf.second) {
+ // should not be here
+ LOG.warn("report cooldownTerm({}) > cooldownTerm
in TabletMeta({}), tabletId={}",
+ backendTabletInfo.getCooldownTerm(),
cooldownConf.second, tabletId);
+ return false;
+ }
+ if (backendTabletInfo.getReplicaId() ==
cooldownConf.first) {
// this replica is true cooldown replica, so
replica's cooldowned data must not be deleted
break;
}
- if (backendTabletInfo.getReplicaId() !=
backendTabletInfo.getCooldownReplicaId()
- &&
Env.getCurrentInvertedIndex().getReplicas(tabletId).stream()
- .anyMatch(r ->
backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
- // this replica can not cooldown data, and shares
same cooldowned data with others replica,
- // so replica's cooldowned data must not be deleted
- break;
+ List<Replica> replicas =
Env.getCurrentInvertedIndex().getReplicas(tabletId);
+ if (backendTabletInfo.getCooldownTerm() <= 0) {
+ if (replicas.stream().anyMatch(
+ r ->
backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
+ // this backend is just restarted, and shares
same cooldowned data with others replica,
+ // so replica's cooldowned data must not be
deleted
+ break;
+ }
+ }
+ long minCooldownTerm = Long.MAX_VALUE;
+ for (Replica r : replicas) {
+ minCooldownTerm = Math.min(r.getCooldownTerm(),
minCooldownTerm);
+ }
+ if (backendTabletInfo.getCooldownTerm() >=
minCooldownTerm) {
+ if (replicas.stream().anyMatch(
+ r ->
backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
+ // this replica shares same cooldowned data
with others replica, and won't follow data
+ // of lower cooldown term, so replica's
cooldowned data must not be deleted
+ break;
+ }
}
- LOG.warn("replica's cooldowned data may have been
deleted");
+ LOG.warn("replica's cooldowned data may have been
deleted. tabletId={}, replicaId={}", tabletId,
+ replicaId);
return false;
} while (false);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 52341ebf45..773bae3f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherException;
import org.apache.doris.common.ThriftServerContext;
@@ -220,9 +221,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return;
}
// check cooldownReplicaId
- long cooldownReplicaId = tablet.getCooldownConf().first;
- if (cooldownReplicaId != info.cooldown_replica_id) {
- LOG.info("cooldown replica id not match({} vs {}), tablet={}",
cooldownReplicaId,
+ Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
+ if (cooldownConf.first != info.cooldown_replica_id) {
+ LOG.info("cooldown replica id not match({} vs {}), tablet={}",
cooldownConf.first,
info.cooldown_replica_id, info.tablet_id);
return;
}
@@ -239,6 +240,11 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.info("replica is not alive, tablet={}, replica={}",
info.tablet_id, replica.getId());
return;
}
+ if (replica.getCooldownTerm() != cooldownConf.second) {
+ LOG.info("replica's cooldown term not match({} vs {}),
tablet={}", cooldownConf.second,
+ replica.getCooldownTerm(), info.tablet_id);
+ return;
+ }
if
(!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
LOG.info("cooldown meta id are not same, tablet={}",
info.tablet_id);
return;
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index 3c1d4ced03..99ca74a22b 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -42,7 +42,7 @@ struct TTabletInfo {
15: optional Types.TReplicaId replica_id
// data size on remote storage
16: optional Types.TSize remote_data_size
- 17: optional Types.TReplicaId cooldown_replica_id
+ // 17: optional Types.TReplicaId cooldown_replica_id
// 18: optional bool is_cooldown
19: optional i64 cooldown_term
20: optional Types.TUniqueId cooldown_meta_id
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org