This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 414eb2a8ebd [fix](cooldown) Fix bug in follow cooldowned data (#32801)
414eb2a8ebd is described below
commit 414eb2a8ebdff514af5e5d7335ebccfd756ec90f
Author: plat1ko <[email protected]>
AuthorDate: Tue Mar 26 12:04:55 2024 +0800
[fix](cooldown) Fix bug in follow cooldowned data (#32801)
---
be/src/olap/tablet.cpp | 44 +++++++++++++++++++++++++++++++++++++-------
1 file changed, 37 insertions(+), 7 deletions(-)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index f9b12cc671d..9b745a92479 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -565,7 +565,9 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
// delete rowset in "to_delete" directly
for (auto& rs : to_delete) {
LOG(INFO) << "add unused rowset " << rs->rowset_id() << " because of same version";
- StorageEngine::instance()->add_unused_rowset(rs);
+ if (rs->is_local()) {
+ StorageEngine::instance()->add_unused_rowset(rs);
+ }
}
}
return Status::OK();
@@ -604,7 +606,9 @@ void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool
} else {
for (auto& rs : to_delete) {
_timestamped_version_tracker.delete_version(rs->version());
- StorageEngine::instance()->add_unused_rowset(rs);
+ if (rs->is_local()) {
+ StorageEngine::instance()->add_unused_rowset(rs);
+ }
}
}
}
@@ -830,7 +834,9 @@ void Tablet::delete_expired_stale_rowset() {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
// delete rowset
- StorageEngine::instance()->add_unused_rowset(it->second);
+ if (it->second->is_local()) {
+ StorageEngine::instance()->add_unused_rowset(it->second);
+ }
_stale_rs_version_map.erase(it);
VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
@@ -2311,6 +2317,12 @@ Status Tablet::_follow_cooldowned_data() {
std::vector<RowsetSharedPtr> overlap_rowsets;
bool version_aligned = false;
+ // Holding these to delete rowsets' shared ptr until save meta can avoid trash sweeping thread
+ // deleting these rowsets' files before rowset meta has been removed from disk, which may cause
+ // data loss when BE reboot before save meta to disk.
+ std::vector<RowsetSharedPtr> to_delete;
+ std::vector<RowsetSharedPtr> to_add;
+
{
std::lock_guard wlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
@@ -2336,17 +2348,18 @@ Status Tablet::_follow_cooldowned_data() {
}
}
std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator);
+
+ // Find different rowset in `overlap_rowsets` and `cooldown_meta_pb.rs_metas`
auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
auto rs_it = overlap_rowsets.begin();
for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end();
++rs_pb_it, ++rs_it) {
- // skip cooldowned rowset with same version in BE
- if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) {
+ if (rs_pb_it->rowset_id_v2() != (*rs_it)->rowset_id().to_string()) {
break;
}
}
- std::vector<RowsetSharedPtr> to_delete(rs_it, overlap_rowsets.end());
- std::vector<RowsetSharedPtr> to_add;
+
+ to_delete.assign(rs_it, overlap_rowsets.end());
to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
auto rs_meta = std::make_shared<RowsetMeta>();
@@ -2361,12 +2374,29 @@ Status Tablet::_follow_cooldowned_data() {
// TODO(plat1ko): process primary key
_tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
}
+
{
std::lock_guard rlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
save_meta();
}
+ if (!to_add.empty()) {
+ LOG(INFO) << "modify rowsets when follow cooldowned data, tablet_id=" << tablet_id()
+ << [&]() {
+ std::stringstream ss;
+ ss << " delete rowsets:\n";
+ for (auto&& rs : to_delete) {
+ ss << rs->version() << ' ' << rs->rowset_id() << '\n';
+ }
+ ss << "add rowsets:\n";
+ for (auto&& rs : to_add) {
+ ss << rs->version() << ' ' << rs->rowset_id() << '\n';
+ }
+ return ss.str();
+ }();
+ }
+
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]