This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 3eb745f25 KUDU-3516 No need to traverse all delta stores while
computing score.
3eb745f25 is described below
commit 3eb745f25dfa1975ab1aeba8d769697504f7fa54
Author: 宋家成 <[email protected]>
AuthorDate: Mon Oct 9 16:29:31 2023 +0800
KUDU-3516 No need to traverse all delta stores while computing score.
If we have many tables with many columns and each of them gets many
update requests, the maintenance scheduler thread might be stuck in
calculating the perf improvement scores of major delta compactions.
This is because we check all the updated columns while one single
updated column is actually enough.
This patch adds a new method to determine whether a delta store needs to be
compacted. The scheduler will return as soon as it finds an updated
column. Please see KUDU-3516 for details.
Change-Id: I4b2c9525d8a12183130ee2cb7b5b00d1d2f60bfa
Reviewed-on: http://gerrit.cloudera.org:8080/20547
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/delta_stats.cc | 7 +++++++
src/kudu/tablet/delta_stats.h | 3 +++
src/kudu/tablet/delta_tracker.cc | 44 ++++++++++++++++++++++++++++++++--------
src/kudu/tablet/delta_tracker.h | 3 +++
src/kudu/tablet/diskrowset.cc | 5 ++---
5 files changed, 50 insertions(+), 12 deletions(-)
diff --git a/src/kudu/tablet/delta_stats.cc b/src/kudu/tablet/delta_stats.cc
index bab7a1efa..e8a54828a 100644
--- a/src/kudu/tablet/delta_stats.cc
+++ b/src/kudu/tablet/delta_stats.cc
@@ -16,6 +16,7 @@
// under the License.
#include "kudu/tablet/delta_stats.h"
+#include <algorithm>
#include <cstdint>
#include <ostream>
#include <utility>
@@ -148,5 +149,11 @@ void
DeltaStats::AddColumnIdsWithUpdates(std::set<ColumnId>* col_ids) const {
}
}
+bool DeltaStats::ColumnUpdated() const {
+ return std::any_of(update_counts_by_col_id_.begin(),
+ update_counts_by_col_id_.end(),
+ [&](const auto& e) { return e.second > 0; });
+}
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/delta_stats.h b/src/kudu/tablet/delta_stats.h
index 8929ad7c7..23699f3d8 100644
--- a/src/kudu/tablet/delta_stats.h
+++ b/src/kudu/tablet/delta_stats.h
@@ -100,6 +100,9 @@ class DeltaStats {
// set 'col_ids'.
void AddColumnIdsWithUpdates(std::set<ColumnId>* col_ids) const;
+ // Return true if there is a column that has at least one update.
+ bool ColumnUpdated() const;
+
private:
std::unordered_map<ColumnId, int64_t> update_counts_by_col_id_;
uint64_t delete_count_;
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index d336867fd..2e994b00a 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -937,7 +937,7 @@ size_t DeltaTracker::CountRedoDeltaStores() const {
uint64_t DeltaTracker::UndoDeltaOnDiskSize() const {
shared_lock<rw_spinlock> lock(component_lock_);
uint64_t size = 0;
- for (const shared_ptr<DeltaStore>& ds : undo_delta_stores_) {
+ for (const auto& ds : undo_delta_stores_) {
size += ds->EstimateSize();
}
return size;
@@ -946,7 +946,7 @@ uint64_t DeltaTracker::UndoDeltaOnDiskSize() const {
uint64_t DeltaTracker::RedoDeltaOnDiskSize() const {
shared_lock<rw_spinlock> lock(component_lock_);
uint64_t size = 0;
- for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
+ for (const auto& ds : redo_delta_stores_) {
size += ds->EstimateSize();
}
return size;
@@ -956,20 +956,19 @@ void
DeltaTracker::GetColumnIdsToCompact(std::vector<ColumnId>* col_ids) const {
shared_lock<rw_spinlock> lock(component_lock_);
set<ColumnId> column_ids_to_compact;
- uint32_t all_delete_op_delta_store_cnt = 0;
- for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
+ uint64_t all_delete_op_delta_store_cnt = 0;
+ for (const auto& ds : redo_delta_stores_) {
// We won't force open files just to read their stats.
if (!ds->has_delta_stats()) {
continue;
}
-
- ds->delta_stats().AddColumnIdsWithUpdates(&column_ids_to_compact);
+ const auto& delta_stats = ds->delta_stats();
+ delta_stats.AddColumnIdsWithUpdates(&column_ids_to_compact);
// Count the number of REDO delta stores that contain only DELETE
// operations where more than a single DELETE operation is present.
- if ((ds->delta_stats().reinsert_count() == 0) &&
- (ds->delta_stats().UpdateCount() == 0) &&
- (ds->delta_stats().delete_count() > 1)) {
+ if ((delta_stats.reinsert_count() == 0) && (delta_stats.UpdateCount() ==
0) &&
+ (delta_stats.delete_count() > 1)) {
all_delete_op_delta_store_cnt++;
}
}
@@ -989,6 +988,33 @@ void
DeltaTracker::GetColumnIdsToCompact(std::vector<ColumnId>* col_ids) const {
col_ids->assign(column_ids_to_compact.begin(), column_ids_to_compact.end());
}
+bool DeltaTracker::DeltaStoreNeedToBeCompacted() const {
+ uint64_t all_delete_op_delta_store_cnt = 0;
+ {
+ shared_lock<rw_spinlock> lock(component_lock_);
+
+ for (const auto& ds: redo_delta_stores_) {
+ if (!ds->has_delta_stats()) {
+ continue;
+ }
+ const auto& delta_stats = ds->delta_stats();
+ if (delta_stats.ColumnUpdated()) {
+ return true;
+ }
+ if ((delta_stats.reinsert_count() == 0) && (delta_stats.UpdateCount() ==
0) &&
+ (delta_stats.delete_count() > 1)) {
+ all_delete_op_delta_store_cnt++;
+ }
+ }
+ }
+ // If there is no updated column, try to find a cold delta file
+ // with delete only ops to be compacted.
+ const auto max_delta_age =
MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec);
+ MonoTime last_update_time = last_update_time_.load();
+ return all_delete_op_delta_store_cnt >
FLAGS_all_delete_op_delta_file_cnt_for_compaction
+ && MonoTime::Now() - last_update_time > max_delta_age;
+}
+
Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
shared_lock<rw_spinlock> lock(component_lock_);
if (stores == UNDOS_AND_REDOS || stores == UNDOS_ONLY) {
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index d0b454aa7..643e33d09 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -270,6 +270,9 @@ class DeltaTracker {
// Retrieves the list of column indexes to compact.
void GetColumnIdsToCompact(std::vector<ColumnId>* col_ids) const;
+ // Check if there is at least one delta file that needs to be compacted.
+ bool DeltaStoreNeedToBeCompacted() const;
+
Mutex* compact_flush_lock() {
return &compact_flush_lock_;
}
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 38a7f0f79..07e27ff00 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -859,9 +859,8 @@ double
DiskRowSet::DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType
}
if (type == RowSet::MAJOR_DELTA_COMPACTION) {
- vector<ColumnId> col_ids_for_compact;
- delta_tracker_->GetColumnIdsToCompact(&col_ids_for_compact);
- if (!col_ids_for_compact.empty()) {
+ // No need to get all the delta files which need to be compacted.
+ if (delta_tracker_->DeltaStoreNeedToBeCompacted()) {
DiskRowSetSpace drss;
GetDiskRowSetSpaceUsage(&drss);
double ratio = static_cast<double>(drss.redo_deltas_size) /
drss.base_data_size;