This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1aa9ac4fe44 Prevent making snapshot on remote rowset in single replica compaction (#28716)
1aa9ac4fe44 is described below
commit 1aa9ac4fe44ee23db95a757d2d2ecf368d5774b9
Author: plat1ko <[email protected]>
AuthorDate: Wed Dec 27 23:43:43 2023 +0800
Prevent making snapshot on remote rowset in single replica compaction (#28716)
---
be/src/common/status.h | 2 -
be/src/olap/base_compaction.cpp | 8 -
be/src/olap/base_compaction.h | 2 -
be/src/olap/compaction.h | 7 +-
be/src/olap/cumulative_compaction.cpp | 8 -
be/src/olap/cumulative_compaction.h | 2 -
be/src/olap/full_compaction.cpp | 10 -
be/src/olap/full_compaction.h | 2 -
be/src/olap/olap_server.cpp | 61 +++--
be/src/olap/single_replica_compaction.cpp | 8 -
be/src/olap/snapshot_manager.cpp | 4 +-
be/src/olap/storage_engine.cpp | 22 --
be/src/olap/storage_engine.h | 12 -
be/src/olap/tablet.cpp | 289 ++++++++-------------
be/src/olap/tablet.h | 37 +--
be/src/olap/task/engine_clone_task.cpp | 1 -
be/src/olap/task/engine_storage_migration_task.cpp | 2 -
17 files changed, 162 insertions(+), 315 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 14aec46cc22..c0d92152877 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -160,7 +160,6 @@ namespace ErrorCode {
E(BE_INVALID_NEED_MERGED_VERSIONS, -810, true); \
E(BE_ERROR_DELETE_ACTION, -811, true); \
E(BE_SEGMENTS_OVERLAPPING, -812, true); \
- E(BE_CLONE_OCCURRED, -813, true); \
E(PUSH_INIT_ERROR, -900, true); \
E(PUSH_VERSION_INCORRECT, -902, true); \
E(PUSH_SCHEMA_MISMATCH, -903, true); \
@@ -228,7 +227,6 @@ namespace ErrorCode {
E(CUMULATIVE_INVALID_NEED_MERGED_VERSIONS, -2004, true); \
E(CUMULATIVE_ERROR_DELETE_ACTION, -2005, true); \
E(CUMULATIVE_MISS_VERSION, -2006, true); \
- E(CUMULATIVE_CLONE_OCCURRED, -2007, true); \
E(FULL_NO_SUITABLE_VERSION, -2008, false); \
E(FULL_MISS_VERSION, -2009, true); \
E(META_INVALID_ARGUMENT, -3000, true); \
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 6ae006709a7..474909cbf45 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -56,7 +56,6 @@ Status BaseCompaction::prepare_compact() {
// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());
COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());
- _tablet->set_clone_occurred(false);
return Status::OK();
}
@@ -73,13 +72,6 @@ Status BaseCompaction::execute_compact_impl() {
"another base compaction is running. tablet={}",
_tablet->tablet_id());
}
- // Clone task may happen after compaction task is submitted to thread
pool, and rowsets picked
- // for compaction may change. In this case, current compaction task should
not be executed.
- if (_tablet->get_clone_occurred()) {
- _tablet->set_clone_occurred(false);
- return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred
failed");
- }
-
SCOPED_ATTACH_TASK(_mem_tracker);
// 2. do base compaction, merge rowsets
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index 73aca0d5e1f..e86cb3330a5 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -43,8 +43,6 @@ public:
Status prepare_compact() override;
Status execute_compact_impl() override;
- std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
protected:
Status pick_rowsets_to_compact() override;
std::string compaction_name() const override { return "base compaction"; }
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 2aff05ced83..a8e3b1a5c28 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -58,6 +58,8 @@ public:
virtual Status prepare_compact() = 0;
Status execute_compact();
virtual Status execute_compact_impl() = 0;
+
+ const std::vector<RowsetSharedPtr>& input_rowsets() { return
_input_rowsets; }
#ifdef BE_TEST
void set_input_rowset(const std::vector<RowsetSharedPtr>& rowsets);
RowsetSharedPtr output_rowset();
@@ -65,10 +67,11 @@ public:
RuntimeProfile* runtime_profile() const { return _profile.get(); }
+ virtual ReaderType compaction_type() const = 0;
+ virtual std::string compaction_name() const = 0;
+
protected:
virtual Status pick_rowsets_to_compact() = 0;
- virtual std::string compaction_name() const = 0;
- virtual ReaderType compaction_type() const = 0;
Status do_compaction(int64_t permits);
Status do_compaction_impl(int64_t permits);
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 1c65df768d0..1f54c1f3285 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -61,7 +61,6 @@ Status CumulativeCompaction::prepare_compact() {
// 2. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());
COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());
- _tablet->set_clone_occurred(false);
return Status::OK();
}
@@ -73,13 +72,6 @@ Status CumulativeCompaction::execute_compact_impl() {
"The tablet is under cumulative compaction. tablet={}",
_tablet->tablet_id());
}
- // Clone task may happen after compaction task is submitted to thread
pool, and rowsets picked
- // for compaction may change. In this case, current compaction task should
not be executed.
- if (_tablet->get_clone_occurred()) {
- _tablet->set_clone_occurred(false);
- return Status::Error<CUMULATIVE_CLONE_OCCURRED,
false>("get_clone_occurred failed");
- }
-
SCOPED_ATTACH_TASK(_mem_tracker);
// 3. do cumulative compaction, merge rowsets
diff --git a/be/src/olap/cumulative_compaction.h
b/be/src/olap/cumulative_compaction.h
index d74542a2ffe..7ea7fb383f1 100644
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -39,8 +39,6 @@ public:
Status prepare_compact() override;
Status execute_compact_impl() override;
- std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
protected:
Status pick_rowsets_to_compact() override;
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 01bd5e3dc61..927b4a33198 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -51,29 +51,19 @@ Status FullCompaction::prepare_compact() {
return Status::Error<INVALID_ARGUMENT, false>("Full compaction init
failed");
}
- std::unique_lock full_lock(_tablet->get_full_compaction_lock());
std::unique_lock base_lock(_tablet->get_base_compaction_lock());
std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());
- _tablet->set_clone_occurred(false);
return Status::OK();
}
Status FullCompaction::execute_compact_impl() {
- std::unique_lock full_lock(_tablet->get_full_compaction_lock());
std::unique_lock base_lock(_tablet->get_base_compaction_lock());
std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
- // Clone task may happen after compaction task is submitted to thread
pool, and rowsets picked
- // for compaction may change. In this case, current compaction task should
not be executed.
- if (_tablet->get_clone_occurred()) {
- _tablet->set_clone_occurred(false);
- return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred
failed");
- }
-
SCOPED_ATTACH_TASK(_mem_tracker);
// 2. do full compaction, merge rowsets
diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h
index bce9ac745b6..631d901e846 100644
--- a/be/src/olap/full_compaction.h
+++ b/be/src/olap/full_compaction.h
@@ -39,8 +39,6 @@ public:
Status execute_compact_impl() override;
Status modify_rowsets(const Merger::Statistics* stats = nullptr) override;
- std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
protected:
Status pick_rowsets_to_compact() override;
std::string compaction_name() const override { return "full compaction"; }
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index f7caa7d8d02..7268d7959f9 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -57,6 +57,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/segcompaction.h"
#include "olap/schema_change.h"
+#include "olap/single_replica_compaction.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
@@ -593,11 +594,6 @@ void StorageEngine::_compaction_tasks_producer_callback() {
continue;
}
- /// Regardless of whether the tablet is submitted for compaction
or not,
- /// we need to call 'reset_compaction' to clean up the
base_compaction or cumulative_compaction objects
- /// in the tablet, because these two objects store the tablet's
own shared_ptr.
- /// If it is not cleaned up, the reference count of the tablet
will always be greater than 1,
- /// thus cannot be collected by the garbage collector.
(TabletManager::start_trash_sweep)
for (const auto& tablet : tablets_compaction) {
if (compaction_type == CompactionType::BASE_COMPACTION) {
tablet->set_last_base_compaction_schedule_time(UnixMillis());
@@ -717,33 +713,39 @@ Status
StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tab
return Status::AlreadyExist<false>(
"compaction task has already been submitted, tablet_id={}",
tablet->tablet_id());
}
- Status st = tablet->prepare_single_replica_compaction(tablet,
compaction_type);
+
+ auto compaction = std::make_shared<SingleReplicaCompaction>(tablet,
compaction_type);
+ auto st = compaction->prepare_compact();
+
auto clean_single_replica_compaction = [tablet, this]() {
- tablet->reset_single_replica_compaction();
_pop_tablet_from_submitted_compaction(tablet,
CompactionType::CUMULATIVE_COMPACTION);
_pop_tablet_from_submitted_compaction(tablet,
CompactionType::BASE_COMPACTION);
};
- if (st.ok()) {
- auto submit_st = _single_replica_compaction_thread_pool->submit_func(
- [tablet, compaction_type, clean_single_replica_compaction]() {
- tablet->execute_single_replica_compaction(compaction_type);
- clean_single_replica_compaction();
- });
- if (!submit_st.ok()) {
- clean_single_replica_compaction();
- return Status::InternalError(
- "failed to submit single replica compaction task to thread
pool, "
- "tablet_id={} ",
- tablet->tablet_id());
+ if (!st.ok()) {
+ clean_single_replica_compaction();
+ if (!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
+ LOG(WARNING) << "failed to prepare single replica compaction,
tablet_id="
+ << tablet->tablet_id() << " : " << st;
+ return st;
}
- return Status::OK();
- } else {
+ return Status::OK(); // No suitable version, regard as OK
+ }
+
+ auto submit_st = _single_replica_compaction_thread_pool->submit_func(
+ [tablet, compaction = std::move(compaction),
+ clean_single_replica_compaction]() mutable {
+ tablet->execute_single_replica_compaction(*compaction);
+ clean_single_replica_compaction();
+ });
+ if (!submit_st.ok()) {
clean_single_replica_compaction();
return Status::InternalError(
- "failed to prepare single replica compaction task tablet_id={}
",
+ "failed to submit single replica compaction task to thread
pool, "
+ "tablet_id={}",
tablet->tablet_id());
}
+ return Status::OK();
}
void StorageEngine::get_tablet_rowset_versions(const
PGetTabletVersionsRequest* request,
@@ -917,8 +919,10 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
"compaction task has already been submitted, tablet_id={},
compaction_type={}.",
tablet->tablet_id(), compaction_type);
}
+ std::shared_ptr<Compaction> compaction;
int64_t permits = 0;
- Status st =
tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet,
&permits);
+ Status st =
Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
+ compaction,
permits);
bool is_low_priority_task = [&]() {
// Can add more strategies to determine whether a task is a low
priority task in the future
if (!config::enable_compaction_priority_scheduling) {
@@ -938,27 +942,24 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
(compaction_type == CompactionType::CUMULATIVE_COMPACTION)
? _cumu_compaction_thread_pool
: _base_compaction_thread_pool;
- auto st = thread_pool->submit_func([tablet, compaction_type, permits,
is_low_priority_task,
+ auto st = thread_pool->submit_func([tablet, compaction =
std::move(compaction),
+ compaction_type, permits,
is_low_priority_task,
this]() {
if (is_low_priority_task &&
!_increase_low_priority_task_nums(tablet->data_dir())) {
VLOG_DEBUG << "skip low priority compaction task for tablet: "
<< tablet->tablet_id();
// Todo: push task back
} else {
- tablet->execute_compaction(compaction_type);
+ tablet->execute_compaction(*compaction);
if (is_low_priority_task) {
_decrease_low_priority_task_nums(tablet->data_dir());
}
}
_permit_limiter.release(permits);
- // reset compaction
- tablet->reset_compaction(compaction_type);
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
});
if (!st.ok()) {
_permit_limiter.release(permits);
- // reset compaction
- tablet->reset_compaction(compaction_type);
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
return Status::InternalError(
"failed to submit compaction task to thread pool, "
@@ -967,8 +968,6 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
}
return Status::OK();
} else {
- // reset compaction
- tablet->reset_compaction(compaction_type);
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
if (!st.ok()) {
return Status::InternalError(
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index a5e060147d6..2a8edb60471 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -70,7 +70,6 @@ Status SingleReplicaCompaction::prepare_compact() {
// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());
- _tablet->set_clone_occurred(false);
if (_input_rowsets.size() == 1) {
return
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1");
}
@@ -105,13 +104,6 @@ Status SingleReplicaCompaction::execute_compact_impl() {
"another base compaction is running. tablet={}",
_tablet->tablet_id());
}
- // Clone task may happen after compaction task is submitted to thread
pool, and rowsets picked
- // for compaction may change. In this case, current compaction task should
not be executed.
- if (_tablet->get_clone_occurred()) {
- _tablet->set_clone_occurred(false);
- return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred
failed");
- }
-
SCOPED_ATTACH_TASK(_mem_tracker);
// 2. do single replica compaction
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index fd865712c5f..b5090f0e280 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -426,10 +426,10 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
<< ref_tablet->tablet_id();
Version version(request.start_version, request.end_version);
const RowsetSharedPtr rowset =
ref_tablet->get_rowset_by_version(version, false);
- if (rowset != nullptr) {
+ if (rowset && rowset->is_local()) {
consistent_rowsets.push_back(rowset);
} else {
- LOG(WARNING) << "failed to find version when do compaction
snapshot. "
+ LOG(WARNING) << "failed to find local version when do
compaction snapshot. "
<< " tablet=" << request.tablet_id
<< " schema_hash=" << request.schema_hash
<< " version=" << version;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 4e0fb2eddf1..cf4687caf3e 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1256,28 +1256,6 @@ PendingRowsetGuard
StorageEngine::add_pending_rowset(const RowsetWriterContext&
return _pending_remote_rowsets.add(ctx.rowset_id);
}
-void StorageEngine::create_cumulative_compaction(
- TabletSharedPtr best_tablet, std::shared_ptr<CumulativeCompaction>&
cumulative_compaction) {
- cumulative_compaction.reset(new CumulativeCompaction(best_tablet));
-}
-
-void StorageEngine::create_base_compaction(TabletSharedPtr best_tablet,
- std::shared_ptr<BaseCompaction>&
base_compaction) {
- base_compaction.reset(new BaseCompaction(best_tablet));
-}
-
-void StorageEngine::create_full_compaction(TabletSharedPtr best_tablet,
- std::shared_ptr<FullCompaction>&
full_compaction) {
- full_compaction.reset(new FullCompaction(best_tablet));
-}
-
-void StorageEngine::create_single_replica_compaction(
- TabletSharedPtr best_tablet,
- std::shared_ptr<SingleReplicaCompaction>& single_replica_compaction,
- CompactionType compaction_type) {
- single_replica_compaction.reset(new SingleReplicaCompaction(best_tablet,
compaction_type));
-}
-
bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo*
replica,
std::string* token) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6b028b01734..77d3efeaaf8 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -180,18 +180,6 @@ public:
void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
PGetTabletVersionsResponse* response);
- void create_cumulative_compaction(TabletSharedPtr best_tablet,
- std::shared_ptr<CumulativeCompaction>&
cumulative_compaction);
- void create_base_compaction(TabletSharedPtr best_tablet,
- std::shared_ptr<BaseCompaction>&
base_compaction);
-
- void create_full_compaction(TabletSharedPtr best_tablet,
- std::shared_ptr<FullCompaction>&
full_compaction);
-
- void create_single_replica_compaction(
- TabletSharedPtr best_tablet,
- std::shared_ptr<SingleReplicaCompaction>&
single_replica_compaction,
- CompactionType compaction_type);
bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica,
std::string* token);
bool should_fetch_from_peer(int64_t tablet_id);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5f915ad1de1..8cfce2d35ec 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -69,6 +69,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
+#include "io/io_common.h"
#include "olap/base_compaction.h"
#include "olap/base_tablet.h"
#include "olap/binlog.h"
@@ -143,21 +144,39 @@ using std::string;
using std::vector;
using io::FileSystemSPtr;
-static bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk",
"tablet_lookup_rowkey");
-static bvar::LatencyRecorder
g_tablet_commit_phase_update_delete_bitmap_latency(
+namespace {
+
+bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk",
"tablet_lookup_rowkey");
+bvar::LatencyRecorder g_tablet_commit_phase_update_delete_bitmap_latency(
"doris_pk", "commit_phase_update_delete_bitmap");
-static bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk",
-
"update_delete_bitmap");
-static bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk",
"lookup_not_found");
-static bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
+bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk",
"update_delete_bitmap");
+bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found");
+bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
"doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60);
-const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 1s;
-
bvar::Adder<uint64_t> exceed_version_limit_counter;
bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
&exceed_version_limit_counter, 60);
+void set_last_failure_time(Tablet* tablet, const Compaction& compaction,
int64_t ms) {
+ switch (compaction.compaction_type()) {
+ case ReaderType::READER_CUMULATIVE_COMPACTION:
+ tablet->set_last_cumu_compaction_failure_time(ms);
+ return;
+ case ReaderType::READER_BASE_COMPACTION:
+ tablet->set_last_base_compaction_failure_time(ms);
+ return;
+ case ReaderType::READER_FULL_COMPACTION:
+ tablet->set_last_full_compaction_failure_time(ms);
+ return;
+ default:
+ LOG(FATAL) << "invalid compaction type " <<
compaction.compaction_name()
+ << " tablet_id: " << tablet->tablet_id();
+ }
+};
+
+} // namespace
+
struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors(size_t executor_nums = 5);
@@ -251,7 +270,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir*
data_dir,
_newly_created_rowset_num(0),
_last_checkpoint_time(0),
_cumulative_compaction_type(cumulative_compaction_type),
- _is_clone_occurred(false),
_is_tablet_path_exists(true),
_last_missed_version(-1),
_last_missed_time_s(0) {
@@ -475,6 +493,22 @@ Status
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
return Status::OK();
}
+ if (check_delete) {
+ for (auto&& rs : to_delete) {
+ if (auto it = _rs_version_map.find(rs->version()); it ==
_rs_version_map.end()) {
+ return Status::Error<DELETE_VERSION_ERROR>(
+ "try to delete not exist version {} from {}",
rs->version().to_string(),
+ tablet_id());
+ } else if (rs->rowset_id() != it->second->rowset_id()) {
+ return Status::Error<DELETE_VERSION_ERROR>(
+ "try to delete version {} from {}, but rowset id
changed, delete rowset id "
+ "is {}, exists rowsetid is {}",
+ rs->version().to_string(), tablet_id(),
rs->rowset_id().to_string(),
+ it->second->rowset_id().to_string());
+ }
+ }
+ }
+
bool same_version = true;
std::sort(to_add.begin(), to_add.end(), Rowset::comparator);
std::sort(to_delete.begin(), to_delete.end(), Rowset::comparator);
@@ -489,23 +523,6 @@ Status
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
same_version = false;
}
- if (check_delete) {
- for (auto& rs : to_delete) {
- auto find_rs = _rs_version_map.find(rs->version());
- if (find_rs == _rs_version_map.end()) {
- return Status::Error<DELETE_VERSION_ERROR>(
- "try to delete not exist version {} from {}",
rs->version().to_string(),
- tablet_id());
- } else if (find_rs->second->rowset_id() != rs->rowset_id()) {
- return Status::Error<DELETE_VERSION_ERROR>(
- "try to delete version {} from {}, but rowset id
changed, delete rowset id "
- "is {}, exists rowsetid is {}",
- rs->version().to_string(), tablet_id(),
rs->rowset_id().to_string(),
- find_rs->second->rowset_id().to_string());
- }
- }
- }
-
std::vector<RowsetMetaSharedPtr> rs_metas_to_delete;
for (auto& rs : to_delete) {
rs_metas_to_delete.push_back(rs->rowset_meta());
@@ -1755,138 +1772,100 @@ void
Tablet::generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_m
}
Status Tablet::prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
- TabletSharedPtr
tablet, int64_t* permits) {
- std::vector<RowsetSharedPtr> compaction_rowsets;
+ const TabletSharedPtr&
tablet,
+
std::shared_ptr<Compaction>& compaction,
+ int64_t& permits) {
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
MonotonicStopWatch watch;
watch.start();
- SCOPED_CLEANUP({
- if (!config::disable_compaction_trace_log &&
- watch.elapsed_time() / 1e9 >
config::cumulative_compaction_trace_threshold) {
- std::stringstream ss;
- _cumulative_compaction->runtime_profile()->pretty_print(&ss);
- LOG(WARNING) << "prepare cumulative compaction cost " <<
watch.elapsed_time() / 1e9
- << std::endl
- << ss.str();
- }
- });
- StorageEngine::instance()->create_cumulative_compaction(tablet,
_cumulative_compaction);
+ compaction = std::make_shared<CumulativeCompaction>(tablet);
DorisMetrics::instance()->cumulative_compaction_request_total->increment(1);
- Status res = _cumulative_compaction->prepare_compact();
+ Status res = compaction->prepare_compact();
+ if (!config::disable_compaction_trace_log &&
+ watch.elapsed_time() / 1e9 >
config::cumulative_compaction_trace_threshold) {
+ std::stringstream ss;
+ compaction->runtime_profile()->pretty_print(&ss);
+ LOG(WARNING) << "prepare cumulative compaction cost " <<
watch.elapsed_time() / 1e9
+ << std::endl
+ << ss.str();
+ }
+
if (!res.ok()) {
- set_last_cumu_compaction_failure_time(UnixMillis());
- *permits = 0;
+ tablet->set_last_cumu_compaction_failure_time(UnixMillis());
+ permits = 0;
if (!res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
- return Status::InternalError("prepare cumulative compaction
with err: {}",
- res.to_string());
+ return Status::InternalError("prepare cumulative compaction
with err: {}", res);
}
// return OK if OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION, so that
we don't need to
// print too much useless logs.
// And because we set permits to 0, so even if we return OK here,
nothing will be done.
return Status::OK();
}
- compaction_rowsets = _cumulative_compaction->get_input_rowsets();
} else if (compaction_type == CompactionType::BASE_COMPACTION) {
- DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
MonotonicStopWatch watch;
watch.start();
- SCOPED_CLEANUP({
- if (!config::disable_compaction_trace_log &&
- watch.elapsed_time() / 1e9 >
config::base_compaction_trace_threshold) {
- std::stringstream ss;
- _base_compaction->runtime_profile()->pretty_print(&ss);
- LOG(WARNING) << "prepare base compaction cost " <<
watch.elapsed_time() / 1e9
- << std::endl
- << ss.str();
- }
- });
- StorageEngine::instance()->create_base_compaction(tablet,
_base_compaction);
+ compaction = std::make_shared<BaseCompaction>(tablet);
DorisMetrics::instance()->base_compaction_request_total->increment(1);
- Status res = _base_compaction->prepare_compact();
- set_last_base_compaction_status(res.to_string());
+ Status res = compaction->prepare_compact();
+ if (!config::disable_compaction_trace_log &&
+ watch.elapsed_time() / 1e9 >
config::base_compaction_trace_threshold) {
+ std::stringstream ss;
+ compaction->runtime_profile()->pretty_print(&ss);
+ LOG(WARNING) << "prepare base compaction cost " <<
watch.elapsed_time() / 1e9
+ << std::endl
+ << ss.str();
+ }
+
+ tablet->set_last_base_compaction_status(res.to_string());
if (!res.ok()) {
- set_last_base_compaction_failure_time(UnixMillis());
- *permits = 0;
+ tablet->set_last_base_compaction_failure_time(UnixMillis());
+ permits = 0;
if (!res.is<BE_NO_SUITABLE_VERSION>()) {
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
- return Status::InternalError("prepare base compaction with
err: {}",
- res.to_string());
+ return Status::InternalError("prepare base compaction with
err: {}", res);
}
// return OK if OLAP_ERR_BE_NO_SUITABLE_VERSION, so that we don't
need to
// print too much useless logs.
// And because we set permits to 0, so even if we return OK here,
nothing will be done.
return Status::OK();
}
- compaction_rowsets = _base_compaction->get_input_rowsets();
} else {
DCHECK_EQ(compaction_type, CompactionType::FULL_COMPACTION);
- MonotonicStopWatch watch;
- watch.start();
- StorageEngine::instance()->create_full_compaction(tablet,
_full_compaction);
- Status res = _full_compaction->prepare_compact();
+
+ compaction = std::make_shared<FullCompaction>(tablet);
+ Status res = compaction->prepare_compact();
if (!res.ok()) {
- set_last_full_compaction_failure_time(UnixMillis());
- *permits = 0;
+ tablet->set_last_full_compaction_failure_time(UnixMillis());
+ permits = 0;
if (!res.is<FULL_NO_SUITABLE_VERSION>()) {
- return Status::InternalError("prepare full compaction with
err: {}",
- res.to_string());
+ return Status::InternalError("prepare full compaction with
err: {}", res);
}
// return OK if OLAP_ERR_BE_NO_SUITABLE_VERSION, so that we don't
need to
// print too much useless logs.
// And because we set permits to 0, so even if we return OK here,
nothing will be done.
return Status::OK();
}
- compaction_rowsets = _full_compaction->get_input_rowsets();
- }
- *permits = 0;
- for (auto rowset : compaction_rowsets) {
- *permits += rowset->rowset_meta()->get_compaction_score();
}
- return Status::OK();
-}
-Status Tablet::prepare_single_replica_compaction(TabletSharedPtr tablet,
- CompactionType
compaction_type) {
- StorageEngine::instance()->create_single_replica_compaction(tablet,
_single_replica_compaction,
-
compaction_type);
- Status res = _single_replica_compaction->prepare_compact();
- if (!res.ok()) {
- if (!res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
- return Status::InternalError("prepare single replica compaction
with err: {}",
- res.to_string());
- }
+ permits = 0;
+ for (auto&& rowset : compaction->input_rowsets()) {
+ permits += rowset->rowset_meta()->get_compaction_score();
}
return Status::OK();
}
-void Tablet::execute_single_replica_compaction(CompactionType compaction_type)
{
- Status res = _single_replica_compaction->execute_compact();
+void Tablet::execute_single_replica_compaction(SingleReplicaCompaction&
compaction) {
+ Status res = compaction.execute_compact();
if (!res.ok()) {
- if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
- set_last_cumu_compaction_failure_time(UnixMillis());
- } else if (compaction_type == CompactionType::BASE_COMPACTION) {
- set_last_base_compaction_failure_time(UnixMillis());
- } else if (compaction_type == CompactionType::FULL_COMPACTION) {
- set_last_full_compaction_failure_time(UnixMillis());
- }
+ set_last_failure_time(this, compaction, UnixMillis());
LOG(WARNING) << "failed to do single replica compaction. res=" << res
<< ", tablet=" << tablet_id();
return;
}
- if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
- set_last_cumu_compaction_failure_time(0);
- } else if (compaction_type == CompactionType::BASE_COMPACTION) {
- set_last_base_compaction_failure_time(0);
- } else if (compaction_type == CompactionType::FULL_COMPACTION) {
- set_last_full_compaction_failure_time(0);
- }
-}
-
-void Tablet::reset_single_replica_compaction() {
- _single_replica_compaction.reset();
+ set_last_failure_time(this, compaction, 0);
}
std::vector<Version> Tablet::get_all_versions() {
@@ -1903,78 +1882,38 @@ std::vector<Version> Tablet::get_all_versions() {
return local_versions;
}
-void Tablet::execute_compaction(CompactionType compaction_type) {
+void Tablet::execute_compaction(Compaction& compaction) {
signal::tablet_id = tablet_id();
- if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
- MonotonicStopWatch watch;
- watch.start();
- SCOPED_CLEANUP({
- if (!config::disable_compaction_trace_log &&
- watch.elapsed_time() / 1e9 >
config::cumulative_compaction_trace_threshold) {
- std::stringstream ss;
- _cumulative_compaction->runtime_profile()->pretty_print(&ss);
- LOG(WARNING) << "execute cumulative compaction cost " <<
watch.elapsed_time() / 1e9
- << std::endl
- << ss.str();
- }
- });
- Status res = _cumulative_compaction->execute_compact();
- if (!res.ok()) {
- set_last_cumu_compaction_failure_time(UnixMillis());
-
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
- LOG(WARNING) << "failed to do cumulative compaction. res=" << res
- << ", tablet=" << tablet_id();
- return;
- }
- set_last_cumu_compaction_failure_time(0);
- } else if (compaction_type == CompactionType::BASE_COMPACTION) {
- DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
- MonotonicStopWatch watch;
- watch.start();
- SCOPED_CLEANUP({
- if (!config::disable_compaction_trace_log &&
- watch.elapsed_time() / 1e9 >
config::base_compaction_trace_threshold) {
- std::stringstream ss;
- _base_compaction->runtime_profile()->pretty_print(&ss);
- LOG(WARNING) << "execute base compaction cost " <<
watch.elapsed_time() / 1e9
- << std::endl
- << ss.str();
- }
- });
+ MonotonicStopWatch watch;
+ watch.start();
- Status res = _base_compaction->execute_compact();
- set_last_base_compaction_status(res.to_string());
- if (!res.ok()) {
- set_last_base_compaction_failure_time(UnixMillis());
-
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
- LOG(WARNING) << "failed to do base compaction. res=" << res
- << ", tablet=" << tablet_id();
- return;
- }
- set_last_base_compaction_failure_time(0);
+ Status res = compaction.execute_compact();
+
+ if (!res.ok()) [[unlikely]] {
+ set_last_failure_time(this, compaction, UnixMillis());
+ LOG(WARNING) << "failed to do " << compaction.compaction_name()
+ << ", tablet=" << tablet_id() << " : " << res;
} else {
- DCHECK_EQ(compaction_type, CompactionType::FULL_COMPACTION);
- MonotonicStopWatch watch;
- watch.start();
- Status res = _full_compaction->execute_compact();
- if (!res.ok()) {
- set_last_full_compaction_failure_time(UnixMillis());
- LOG(WARNING) << "failed to do full compaction. res=" << res
- << ", tablet=" << tablet_id();
- return;
- }
- set_last_full_compaction_failure_time(0);
+ set_last_failure_time(this, compaction, 0);
}
-}
-void Tablet::reset_compaction(CompactionType compaction_type) {
- if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
- _cumulative_compaction.reset();
- } else if (compaction_type == CompactionType::BASE_COMPACTION) {
- _base_compaction.reset();
- } else {
- _full_compaction.reset();
+ if (!config::disable_compaction_trace_log) {
+ auto need_trace = [&compaction, &watch] {
+ return compaction.compaction_type() ==
ReaderType::READER_CUMULATIVE_COMPACTION
+ ? watch.elapsed_time() / 1e9 >
+
config::cumulative_compaction_trace_threshold
+ : compaction.compaction_type() ==
ReaderType::READER_BASE_COMPACTION
+ ? watch.elapsed_time() / 1e9 >
config::base_compaction_trace_threshold
+ : false;
+ };
+ if (need_trace()) {
+ std::stringstream ss;
+ compaction.runtime_profile()->pretty_print(&ss);
+ LOG(WARNING) << "execute " << compaction.compaction_name() << "
cost "
+ << watch.elapsed_time() / 1e9 << std::endl
+ << ss.str();
+ }
}
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 4174a0ec26d..614288921ed 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -59,9 +59,7 @@ namespace doris {
class Tablet;
class CumulativeCompactionPolicy;
-class CumulativeCompaction;
-class BaseCompaction;
-class FullCompaction;
+class Compaction;
class SingleReplicaCompaction;
class RowsetWriter;
struct RowsetWriterContext;
@@ -85,7 +83,7 @@ enum SortType : int;
enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE,
STORAGE_TYPE_REMOTE_AND_LOCAL };
-extern const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD;
+static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD =
std::chrono::seconds(1);
class Tablet final : public BaseTablet {
public:
@@ -146,6 +144,8 @@ public:
// operation in rowsets
Status add_rowset(RowsetSharedPtr rowset);
Status create_initial_rowset(const int64_t version);
+
+ // MUST hold EXCLUSIVE `_meta_lock`.
Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete, bool
check_delete = false);
@@ -194,7 +194,6 @@ public:
std::mutex& get_push_lock() { return _ingest_lock; }
std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
std::mutex& get_cumulative_compaction_lock() { return
_cumulative_compaction_lock; }
- std::mutex& get_full_compaction_lock() { return _full_compaction_lock; }
std::shared_mutex& get_migration_lock() { return _migration_lock; }
@@ -296,18 +295,13 @@ public:
// return a json string to show the compaction status of this tablet
void get_compaction_status(std::string* json_result);
- Status prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
- TabletSharedPtr tablet,
int64_t* permits);
-
- Status prepare_single_replica_compaction(TabletSharedPtr tablet,
- CompactionType compaction_type);
- void execute_compaction(CompactionType compaction_type);
- void reset_compaction(CompactionType compaction_type);
- void execute_single_replica_compaction(CompactionType compaction_type);
- void reset_single_replica_compaction();
+ static Status prepare_compaction_and_calculate_permits(CompactionType
compaction_type,
+ const
TabletSharedPtr& tablet,
+
std::shared_ptr<Compaction>& compaction,
+ int64_t& permits);
- void set_clone_occurred(bool clone_occurred) { _is_clone_occurred =
clone_occurred; }
- bool get_clone_occurred() { return _is_clone_occurred; }
+ void execute_compaction(Compaction& compaction);
+ void execute_single_replica_compaction(SingleReplicaCompaction&
compaction);
void set_cumulative_compaction_policy(
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy) {
@@ -319,7 +313,7 @@ public:
}
void set_last_base_compaction_status(std::string status) {
- _last_base_compaction_status = status;
+ _last_base_compaction_status = std::move(status);
}
std::string get_last_base_compaction_status() { return
_last_base_compaction_status; }
@@ -622,7 +616,6 @@ private:
std::mutex _ingest_lock;
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
- std::mutex _full_compaction_lock;
std::mutex _schema_change_lock;
std::shared_mutex _migration_lock;
std::mutex _build_inverted_index_lock;
@@ -666,14 +659,6 @@ private:
std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
std::string_view _cumulative_compaction_type;
- std::shared_ptr<CumulativeCompaction> _cumulative_compaction;
- std::shared_ptr<BaseCompaction> _base_compaction;
- std::shared_ptr<FullCompaction> _full_compaction;
- std::shared_ptr<SingleReplicaCompaction> _single_replica_compaction;
-
- // whether clone task occurred during the tablet is in thread pool queue
to wait for compaction
- std::atomic<bool> _is_clone_occurred;
-
// use a seperate thread to check all tablets paths existance
std::atomic<bool> _is_tablet_path_exists;
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 068e8723354..e27db6188bd 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -702,7 +702,6 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const
std::string& clone_d
std::lock_guard
cumulative_compaction_lock(tablet->get_cumulative_compaction_lock());
std::lock_guard cold_compaction_lock(tablet->get_cold_compaction_lock());
std::lock_guard
build_inverted_index_lock(tablet->get_build_inverted_index_lock());
- tablet->set_clone_occurred(true);
std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 8fba16c67df..9e8fc39abde 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -201,12 +201,10 @@ Status EngineStorageMigrationTask::_migrate() {
// compaction will be prohibited for the mow table when migration.
Moreover, it is useless
// to perform a compaction operation on the migration data, as the
migration still migrates
// the data of rowsets before the compaction operation.
- std::unique_lock full_compaction_lock(_tablet->get_full_compaction_lock(),
std::defer_lock);
std::unique_lock base_compaction_lock(_tablet->get_base_compaction_lock(),
std::defer_lock);
std::unique_lock
cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
std::defer_lock);
if (_tablet->enable_unique_key_merge_on_write()) {
- full_compaction_lock.lock();
base_compaction_lock.lock();
cumu_compaction_lock.lock();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]