This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ad920e69f KUDU-3367 [compaction] add supplement to gc algorithm
ad920e69f is described below
commit ad920e69fcd67ceefa25ea81a38a10a27d9e3afc
Author: kedeng <[email protected]>
AuthorDate: Sat May 7 14:13:42 2022 +0800
KUDU-3367 [compaction] add supplement to gc algorithm
If a REDO delta is full of DELETE operations, i.e. there is not a
single UPDATE operation in the delta, the current compaction
algorithm doesn't run GC on it. The accumulation of such REDO
deltas negatively affects the performance of scan operations.
This patch is a supplement to KUDU-1625: it makes it possible to
release storage space for old tablet metadata that does not support
the live row count function. See KUDU-3367 for details.
Change-Id: I8b26737dffecc17688b42188da959b2ba16351ed
Reviewed-on: http://gerrit.cloudera.org:8080/18503
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/delta_tracker.cc | 54 ++++++++++++++++++++++++++++++----
src/kudu/tablet/delta_tracker.h | 11 +++++--
src/kudu/tablet/diskrowset-test-base.h | 24 +++++++++++++++
src/kudu/tablet/diskrowset-test.cc | 54 ++++++++++++++++++++++++++++++++++
src/kudu/tablet/diskrowset.cc | 9 +++---
5 files changed, 140 insertions(+), 12 deletions(-)
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 71310ff35..6f88878f8 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -29,6 +29,7 @@
#include <boost/iterator/iterator_facade.hpp>
#include <boost/iterator/reverse_iterator.hpp>
#include <boost/range/adaptor/reversed.hpp>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/cfile/cfile_util.h"
@@ -58,6 +59,18 @@
#include "kudu/util/status.h"
#include "kudu/util/trace.h"
+DECLARE_int32(tablet_history_max_age_sec);
+
+DEFINE_uint64(all_delete_op_delta_file_cnt_for_compaction, 1,
+ "The minimum number of REDO delta files containing only ancient
DELETE "
+ "operations to schedule a major delta compaction on them. A
DELETE "
+ "operation is considered ancient if it was applied more than "
+ "--tablet_history_max_age_sec seconds ago. "
+ "If you want to control the number of REDO delta files
containing only "
+ "ancient DELETE operations, you can turn up this parameter
value. "
+ "Otherwise, it is recommended to keep the default value to
release "
+ "storage space efficiently.");
+
namespace kudu {
class RowChangeList;
@@ -101,7 +114,8 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata>
rowset_metadata,
mem_trackers_(std::move(mem_trackers)),
next_dms_id_(rowset_metadata_->last_durable_redo_dms_id() + 1),
dms_exists_(false),
- deleted_row_count_(0) {}
+ deleted_row_count_(0),
+ last_update_time_(MonoTime::Now()) {}
Status DeltaTracker::OpenDeltaReaders(vector<DeltaBlockIdAndStats> blocks,
const IOContext* io_context,
@@ -363,6 +377,8 @@ void DeltaTracker::AtomicUpdateStores(const
SharedDeltaStoreVector& stores_to_re
VLOG_WITH_PREFIX(1) << "New " << DeltaType_Name(type) << " stores: "
<< JoinDeltaStoreStrings(*stores_to_update);
+
+ last_update_time_.store(MonoTime::Now());
}
Status DeltaTracker::Compact(const IOContext* io_context) {
@@ -622,6 +638,9 @@ Status DeltaTracker::DoCompactStores(const IOContext*
io_context,
&dfw));
RETURN_NOT_OK(dfw.Finish());
*output_stats = dfw.release_delta_stats();
+
+ last_update_time_.store(MonoTime::Now());
+
return Status::OK();
}
@@ -714,6 +733,7 @@ Status DeltaTracker::Update(Timestamp timestamp,
MemStoreTargetPB* target = result->add_mutated_stores();
target->set_rs_id(rowset_metadata_->id());
target->set_dms_id(dms_->id());
+ last_update_time_.store(MonoTime::Now());
}
return s;
}
@@ -865,6 +885,8 @@ Status DeltaTracker::Flush(const IOContext* io_context,
MetadataFlushType flush_
redo_delta_stores_[idx] = dfr;
}
+ last_update_time_.store(MonoTime::Now());
+
return Status::OK();
}
@@ -921,19 +943,41 @@ uint64_t DeltaTracker::RedoDeltaOnDiskSize() const {
return size;
}
-void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids)
const {
+void DeltaTracker::GetColumnIdsToCompact(std::vector<ColumnId>* col_ids) const
{
shared_lock<rw_spinlock> lock(component_lock_);
- set<ColumnId> column_ids_with_updates;
+ set<ColumnId> column_ids_to_compact;
+ uint32_t all_delete_op_delta_store_cnt = 0;
for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
// We won't force open files just to read their stats.
if (!ds->has_delta_stats()) {
continue;
}
- ds->delta_stats().AddColumnIdsWithUpdates(&column_ids_with_updates);
+ ds->delta_stats().AddColumnIdsWithUpdates(&column_ids_to_compact);
+
+ // Count the number of REDO delta stores that contain only DELETE
+ // operations where more than a single DELETE operation is present.
+ if ((ds->delta_stats().reinsert_count() == 0) &&
+ (ds->delta_stats().UpdateCount() == 0) &&
+ (ds->delta_stats().delete_count() > 1)) {
+ all_delete_op_delta_store_cnt++;
+ }
}
- col_ids->assign(column_ids_with_updates.begin(),
column_ids_with_updates.end());
+
+ // Get the cold delta files with delete only ops to be compacted.
+ //
+ // See KUDU-3367 for more details.
+ const auto max_delta_age =
MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec);
+ MonoTime last_update_time = last_update_time_.load();
+ if (all_delete_op_delta_store_cnt >
FLAGS_all_delete_op_delta_file_cnt_for_compaction
+ && MonoTime::Now() - last_update_time > max_delta_age) {
+ const auto& schema_column_ids =
+ rowset_metadata_->tablet_metadata()->schema()->column_ids();
+ column_ids_to_compact.insert(schema_column_ids.begin(),
+ schema_column_ids.end());
+ }
+ col_ids->assign(column_ids_to_compact.begin(), column_ids_to_compact.end());
}
Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index bb3b8277a..cf5831537 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -266,8 +266,8 @@ class DeltaTracker {
// Return the size on-disk of REDO deltas, in bytes.
uint64_t RedoDeltaOnDiskSize() const;
- // Retrieves the list of column indexes that currently have updates.
- void GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const;
+ // Retrieves the list of column indexes to compact.
+ void GetColumnIdsToCompact(std::vector<ColumnId>* col_ids) const;
Mutex* compact_flush_lock() {
return &compact_flush_lock_;
@@ -396,6 +396,13 @@ class DeltaTracker {
// and reset.
int64_t deleted_row_count_;
+ // As a supplement to KUDU-1625, we need to release storage space for old
+ // tablet metadata that does not support the live row count function.
+ // We record the latest update time of the delta file, so as to trigger
+ // gc scheduler to release storage space after a long time of no update
+ // operation.
+ std::atomic<MonoTime> last_update_time_;
+
DISALLOW_COPY_AND_ASSIGN(DeltaTracker);
};
diff --git a/src/kudu/tablet/diskrowset-test-base.h
b/src/kudu/tablet/diskrowset-test-base.h
index 30949bfd8..23e424bbb 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -152,6 +152,30 @@ class TestRowSet : public KuduRowSetTest {
}
}
+ // Delete rows in the specified range from the given rowset.
+ void DeleteExistingRows(DiskRowSet* rs, int start_idx, int end_idx,
+ std::unordered_set<uint32_t>* deleted) {
+ ASSERT_LE(start_idx, end_idx);
+ ASSERT_LE(start_idx, n_rows_);
+ ASSERT_LE(end_idx, n_rows_);
+ faststring delete_buf;
+ RowChangeListEncoder update(&delete_buf);
+ for (int i = start_idx; i < end_idx; i++) {
+ update.Reset();
+ update.SetToDelete();
+ OperationResultPB result;
+ ASSERT_OK(MutateRow(rs,
+ i,
+ RowChangeList(delete_buf),
+ &result));
+ ASSERT_EQ(1, result.mutated_stores_size());
+ ASSERT_EQ(rs->metadata()->id(), result.mutated_stores(0).rs_id());
+ if (deleted != nullptr) {
+ deleted->insert(i);
+ }
+ }
+ }
+
// Delete the row with the given identifier.
Status DeleteRow(DiskRowSet *rs, uint32_t row_idx, OperationResultPB*
result) {
faststring update_buf;
diff --git a/src/kudu/tablet/diskrowset-test.cc
b/src/kudu/tablet/diskrowset-test.cc
index 37529d363..575e28b61 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -64,6 +64,7 @@
#include "kudu/util/bloom_filter.h"
#include "kudu/util/faststring.h"
#include "kudu/util/memory/arena.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
@@ -82,6 +83,8 @@ DECLARE_bool(cfile_lazy_open);
DECLARE_bool(crash_on_eio);
DECLARE_int32(cfile_default_block_size);
DECLARE_double(env_inject_eio);
+DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_bool(rowset_metadata_store_keys);
DECLARE_double(tablet_delta_store_major_compact_min_ratio);
DECLARE_int32(tablet_delta_store_minor_compact_max);
@@ -641,6 +644,57 @@ TEST_F(TestRowSet, TestCompactStores) {
ASSERT_TRUE(is_sorted(results.begin(), results.end()));
}
+TEST_F(TestRowSet, TestGCScheduleRedoFilesWithFullOfDeleteOP) {
+ // Make major delta compaction runnable even with tiny amount of data
accumulated
+ // across rowset's deltas.
+ FLAGS_tablet_delta_store_major_compact_min_ratio = 0.0001;
+ // Write the min/max keys to the rowset metadata. In this way, we can
simplify the
+ // test scenario by focusing on the REDO delta store files.
+ FLAGS_rowset_metadata_store_keys = true;
+
+ WriteTestRowSet();
+ {
+ shared_ptr<DiskRowSet> rs;
+ ASSERT_OK(OpenTestRowSet(&rs));
+
+ // Generate base data.
+ ASSERT_OK(rs->FlushDeltas(nullptr));
+ }
+
+ // Reopen the rowset.
+ {
+ shared_ptr<DiskRowSet> rs;
+ ASSERT_OK(OpenTestRowSet(&rs));
+ const DeltaTracker& dt = rs->delta_tracker();
+ ASSERT_EQ(0, dt.CountUndoDeltaStores());
+ ASSERT_EQ(0, dt.CountRedoDeltaStores());
+
+ int loop_cnt = 10;
+ int loop_per_idx = FLAGS_roundtrip_num_rows / loop_cnt;
+ // Generate delta files with DELETE operations.
+ for (int i = 0; i < loop_cnt; i++) {
+ NO_FATALS(DeleteExistingRows(rs.get(), loop_per_idx * i, loop_per_idx *
(i + 1), nullptr));
+ ASSERT_OK(rs->FlushDeltas(nullptr));
+ ASSERT_EQ(0, dt.CountUndoDeltaStores());
+ ASSERT_EQ(i + 1, dt.CountRedoDeltaStores());
+ }
+
+ ASSERT_EQ(0,
rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
+
+ // Modify the threshold so we can trigger a new round of gc scheduling.
+ FLAGS_tablet_history_max_age_sec = 1;
+ SleepFor(MonoDelta::FromSeconds(2));
+
+ // Major delta compaction of the DRS should obtain a non-zero score.
+ NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
+ RowSet::MAJOR_DELTA_COMPACTION)));
+
+ ASSERT_OK(rs->MajorCompactDeltaStores(nullptr, HistoryGcOpts::Disabled()));
+ ASSERT_EQ(0, dt.CountUndoDeltaStores());
+ ASSERT_EQ(1, dt.CountRedoDeltaStores());
+ }
+}
+
TEST_F(TestRowSet, TestGCAncientStores) {
// Disable lazy open so that major delta compactions don't require manual
REDO initialization.
FLAGS_cfile_lazy_open = false;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 59831a135..f4756ce7f 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -559,7 +559,7 @@ Status DiskRowSet::MinorCompactDeltaStores(const IOContext*
io_context) {
Status DiskRowSet::MajorCompactDeltaStores(const IOContext* io_context,
HistoryGcOpts history_gc_opts) {
vector<ColumnId> col_ids;
- delta_tracker_->GetColumnIdsWithUpdates(&col_ids);
+ delta_tracker_->GetColumnIdsToCompact(&col_ids);
if (col_ids.empty()) {
VLOG_WITH_PREFIX(2) << "There are no column ids with updates";
@@ -846,10 +846,9 @@ double
DiskRowSet::DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType
}
if (type == RowSet::MAJOR_DELTA_COMPACTION) {
- vector<ColumnId> col_ids_with_updates;
- delta_tracker_->GetColumnIdsWithUpdates(&col_ids_with_updates);
- // If we have files but no updates, we don't want to major compact.
- if (!col_ids_with_updates.empty()) {
+ vector<ColumnId> col_ids_for_compact;
+ delta_tracker_->GetColumnIdsToCompact(&col_ids_for_compact);
+ if (!col_ids_for_compact.empty()) {
DiskRowSetSpace drss;
GetDiskRowSetSpaceUsage(&drss);
double ratio = static_cast<double>(drss.redo_deltas_size) /
drss.base_data_size;