This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 2de290d KUDU-3344: catalog manager clean up metadata for deleted tables/tablets
2de290d is described below
commit 2de290d1b342696cd22c0a800aee5b1bf684a0fa
Author: Zhang Yifan <[email protected]>
AuthorDate: Fri Jan 7 21:02:47 2022 +0800
KUDU-3344: catalog manager clean up metadata for deleted tables/tablets
Currently, Kudu masters retain metadata for deleted tables and tablets forever,
and the leader master loads all of it into memory when it starts. If a cluster
has a lot of tables and tablets, the memory usage of the leader master will be
large and it will take a long time for the leader master to start. Considering
that users often drop tables and partitions, such useless metadata should be
cleaned up by background tasks.
But it's hard to decide when the metadata should be cleaned up, because the
deletion of tablet replicas is asynchronous. If the metadata is deleted before
the tablet data is deleted, the now-unknown tablet reported by a tablet server
will not be processed by the catalog manager, and it must be deleted manually.
So this patch adds a new flag
'metadata_for_deleted_table_and_tablet_reserved_secs'; its default value
(60 * 60 seconds, i.e. one hour) is the same as that of
'unresponsive_ts_rpc_timeout_ms'. We can roughly assume that after this amount
of time the tablet data has actually been deleted, so it's safe to delete its
metadata entry from the sys.catalog table and the in-memory map. The cleanup
itself is disabled by default and is controlled by the experimental flag
'enable_metadata_cleanup_for_deleted_tables_and_tablets'.
Change-Id: Idefa2ee2f5108ba913fe0057a4061c3c28351547
Reviewed-on: http://gerrit.cloudera.org:8080/18152
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/client/client.cc | 1 +
src/kudu/integration-tests/master-stress-test.cc | 26 +++-
.../integration-tests/ts_tablet_manager-itest.cc | 2 +-
src/kudu/master/auto_rebalancer.cc | 2 +-
src/kudu/master/catalog_manager.cc | 163 +++++++++++++++++++--
src/kudu/master/catalog_manager.h | 23 ++-
src/kudu/master/master-test.cc | 68 ++++++++-
src/kudu/master/master.proto | 6 +
8 files changed, 267 insertions(+), 24 deletions(-)
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 4b22424..2e37713 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -562,6 +562,7 @@ Status
KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers)
}
Status KuduClient::ListTables(vector<string>* tables, const string& filter) {
+ tables->clear();
vector<Data::TableInfo> tables_info;
RETURN_NOT_OK(data_->ListTablesWithInfo(this, &tables_info, filter));
for (auto& info : tables_info) {
diff --git a/src/kudu/integration-tests/master-stress-test.cc
b/src/kudu/integration-tests/master-stress-test.cc
index 58bf519..1889ff4 100644
--- a/src/kudu/integration-tests/master-stress-test.cc
+++ b/src/kudu/integration-tests/master-stress-test.cc
@@ -22,6 +22,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -116,7 +117,7 @@ static const MonoDelta kTransientStateBackoff =
MonoDelta::FromMilliseconds(50);
// Parameterized based on HmsMode.
class MasterStressTest : public ExternalMiniClusterITestBase,
- public ::testing::WithParamInterface<HmsMode> {
+ public
::testing::WithParamInterface<std::tuple<HmsMode, bool>> {
public:
MasterStressTest()
: done_(1),
@@ -146,10 +147,19 @@ class MasterStressTest : public
ExternalMiniClusterITestBase,
opts.start_process_timeout = MonoDelta::FromSeconds(60);
opts.rpc_negotiation_timeout = MonoDelta::FromSeconds(30);
- opts.hms_mode = GetParam();
+ opts.hms_mode = std::get<0>(GetParam());
// Tune down the notification log poll period in order to speed up catalog
convergence.
opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");
+ if (std::get<1>(GetParam())) {
+ // Set shorter intervals to trigger frequent cleanup tasks.
+ opts.extra_master_flags.emplace_back(
+ "--enable_metadata_cleanup_for_deleted_tables_and_tablets=true");
+
opts.extra_master_flags.emplace_back("--catalog_manager_bg_task_wait_ms=10");
+ opts.extra_master_flags.emplace_back(
+ "--metadata_for_deleted_table_and_tablet_reserved_secs=0");
+ }
+
// Set max missed heartbeats periods to 1.0 (down from 3.0).
opts.extra_master_flags.emplace_back("--leader_failure_max_missed_heartbeat_periods=1.0");
@@ -205,7 +215,7 @@ class MasterStressTest : public
ExternalMiniClusterITestBase,
new MasterServiceProxy(cluster_->messenger(), addr, addr.host()));
ASSERT_OK(CreateTabletServerMap(m_proxy, cluster_->messenger(), &ts_map_));
- if (GetParam() == HmsMode::ENABLE_METASTORE_INTEGRATION) {
+ if (std::get<0>(GetParam()) == HmsMode::ENABLE_METASTORE_INTEGRATION) {
thrift::ClientOptions hms_opts;
hms_opts.service_principal = "hive";
hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
@@ -525,10 +535,12 @@ class MasterStressTest : public
ExternalMiniClusterITestBase,
std::unordered_map<string, itest::TServerDetails*> ts_map_;
};
-// Run the test with the HMS integration enabled and disabled.
-INSTANTIATE_TEST_SUITE_P(HmsConfigurations, MasterStressTest,
::testing::ValuesIn(
- vector<HmsMode> { HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION }
-));
+INSTANTIATE_TEST_SUITE_P(
+ CatalogManagerConfigurations,
+ MasterStressTest,
+ ::testing::Combine(/*hms_mode*/ ::testing::ValuesIn(vector<HmsMode>{
+ HmsMode::NONE,
HmsMode::ENABLE_METASTORE_INTEGRATION}),
+ /*enable_metadata_cleanup_for_deleted_table(t)s*/
::testing::Bool()));
TEST_P(MasterStressTest, Test) {
OverrideFlagForSlowTests("num_create_table_threads", "10");
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 2081cb8..57e6e6b 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -761,7 +761,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
ASSERT_OK(l.first_failed_status());
// Get the TableInfo.
vector<scoped_refptr<TableInfo>> table_infos;
- ASSERT_OK(catalog->GetAllTables(&table_infos));
+ catalog->GetAllTables(&table_infos);
ASSERT_EQ(1, table_infos.size());
// Run the check function.
NO_FATALS(check_function(table_infos[0].get(), live_row_count));
diff --git a/src/kudu/master/auto_rebalancer.cc
b/src/kudu/master/auto_rebalancer.cc
index 55b9f5c..789d75f 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -488,7 +488,7 @@ Status AutoRebalancerTask::BuildClusterRawInfo(
{
CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
RETURN_NOT_OK(leader_lock.first_failed_status());
- RETURN_NOT_OK(catalog_manager_->GetAllTables(&table_infos));
+ catalog_manager_->GetAllTables(&table_infos);
}
table_summaries.reserve(table_infos.size());
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 0a2a830..3f5333f 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -378,6 +378,25 @@ DEFINE_double(table_write_limit_ratio, 0.95,
"Set the ratio of how much write limit can be reached");
TAG_FLAG(table_write_limit_ratio, experimental);
+DEFINE_bool(enable_metadata_cleanup_for_deleted_tables_and_tablets, false,
+ "Whether to clean up metadata for deleted tables and tablets from
master's "
+ "in-memory map and the 'sys.catalog' table.");
+TAG_FLAG(enable_metadata_cleanup_for_deleted_tables_and_tablets, experimental);
+TAG_FLAG(enable_metadata_cleanup_for_deleted_tables_and_tablets, runtime);
+
+DEFINE_int32(metadata_for_deleted_table_and_tablet_reserved_secs, 60 * 60,
+ "After this amount of time, the metadata of a deleted
table/tablet will be "
+ "cleaned up if
--enable_metadata_cleanup_for_deleted_tables_and_tablets=true.");
+TAG_FLAG(metadata_for_deleted_table_and_tablet_reserved_secs, experimental);
+TAG_FLAG(metadata_for_deleted_table_and_tablet_reserved_secs, runtime);
+
+DEFINE_bool(enable_chunked_tablet_writes, true,
+ "Whether to split tablet actions into chunks when persisting them
in sys.catalog "
+ "table. If disabled, any update of the sys.catalog table will be
rejected if exceeds "
+ "--rpc_max_message_size.");
+TAG_FLAG(enable_chunked_tablet_writes, experimental);
+TAG_FLAG(enable_chunked_tablet_writes, runtime);
+
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int64(tsk_rotation_seconds);
DECLARE_string(ranger_config_path);
@@ -778,6 +797,26 @@ void CatalogManagerBgTasks::Run() {
}
}
+ if (FLAGS_enable_metadata_cleanup_for_deleted_tables_and_tablets) {
+ vector<scoped_refptr<TableInfo>> deleted_tables;
+ vector<scoped_refptr<TabletInfo>> deleted_tablets;
+ catalog_manager_->ExtractDeletedTablesAndTablets(&deleted_tables,
&deleted_tablets);
+ Status s = Status::OK();
+ // Clean up metadata for deleted tablets first and then clean up
metadata for deleted
+ // tables. This is the reverse of the order in which we load them.
So for any remaining
+ // tablet, the metadata of the table to which it belongs must exist.
+ const time_t now = time(nullptr);
+ if (!deleted_tablets.empty()) {
+ s = catalog_manager_->ProcessDeletedTablets(deleted_tablets, now);
+ }
+ if (s.ok() && !deleted_tables.empty()) {
+ s = catalog_manager_->ProcessDeletedTables(deleted_tables, now);
+ }
+ if (!s.ok()) {
+ LOG(ERROR) << "Error processing tables/tablets deletions: " <<
s.ToString();
+ }
+ }
+
// If this is the leader master, check if it's time to generate
// and store a new TSK (Token Signing Key).
Status s = catalog_manager_->TryGenerateNewTskUnlocked();
@@ -2393,8 +2432,10 @@ Status CatalogManager::DeleteTable(const
DeleteTableRequestPB& req,
}
TRACE("Modifying in-memory table state");
- string deletion_msg = "Table deleted at " + LocalTimeAsString();
+ const time_t timestamp = time(nullptr);
+ string deletion_msg = "Table deleted at " + TimestampAsString(timestamp);
l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
+ l.mutable_data()->pb.set_delete_timestamp(timestamp);
// 2. Look up the tablets, lock them, and mark them as deleted.
{
@@ -2408,6 +2449,7 @@ Status CatalogManager::DeleteTable(const
DeleteTableRequestPB& req,
for (const auto& t : tablets) {
t->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
+
t->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
}
// 3. Update sys-catalog with the removed table and tablet state.
@@ -3207,7 +3249,8 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
LocalTimeAsString()));
}
- const string deletion_msg = "Partition dropped at " + LocalTimeAsString();
+ const time_t timestamp = time(nullptr);
+ const string deletion_msg = "Partition dropped at " +
TimestampAsString(timestamp);
TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);
@@ -3229,6 +3272,7 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
for (auto& tablet : tablets_to_drop) {
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
+
tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
}
actions.tablets_to_update = tablets_to_drop;
@@ -3472,7 +3516,9 @@ Status CatalogManager::ListTables(const
ListTablesRequestPB* req,
}
}
InsertOrUpdate(&table_info_by_name, table_name, table_info);
- EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
+ if (user) {
+ EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
+ }
}
MAYBE_INJECT_FIXED_LATENCY(FLAGS_catalog_manager_inject_latency_list_authz_ms);
@@ -3626,14 +3672,20 @@ Status CatalogManager::GetTableInfo(const string&
table_id, scoped_refptr<TableI
return Status::OK();
}
-Status CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
+void CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
leader_lock_.AssertAcquiredForReading();
tables->clear();
shared_lock<LockType> l(lock_);
AppendValuesFromMap(table_ids_map_, tables);
+}
- return Status::OK();
+void CatalogManager::GetAllTabletsForTests(vector<scoped_refptr<TabletInfo>>*
tablets) {
+ leader_lock_.AssertAcquiredForReading();
+
+ tablets->clear();
+ shared_lock<LockType> l(lock_);
+ AppendValuesFromMap(tablet_map_, tablets);
}
Status CatalogManager::TableNameExists(const string& table_name, bool* exists)
{
@@ -4646,9 +4698,9 @@ Status CatalogManager::ProcessTabletReport(
// It'd be unsafe to ask the tserver to delete this tablet without
first
// replicating something to our followers (i.e. to guarantee that we're
// the leader). For example, if we were a rogue master, we might be
- // deleting a tablet created by a new master accidentally. But masters
- // retain metadata for deleted tablets forever, so a tablet can only be
- // truly unknown in the event of a serious misconfiguration, such as a
+ // deleting a tablet created by a new master accidentally. Though
masters
+ // don't always retain metadata for deleted tablets forever, a tablet
+ // may be unknown in the event of a serious misconfiguration, such as a
// tserver heartbeating to the wrong cluster. Therefore, it should be
// reasonable to ignore it and wait for an operator fix the situation.
LOG(WARNING) << "Ignoring report from unknown tablet " << tablet_id;
@@ -5140,6 +5192,27 @@ void CatalogManager::ExtractTabletsToProcess(
}
}
+void CatalogManager::ExtractDeletedTablesAndTablets(
+ vector<scoped_refptr<TableInfo>>* deleted_tables,
+ vector<scoped_refptr<TabletInfo>>* deleted_tablets) {
+ shared_lock<LockType> l(lock_);
+ for (const auto& table_entry : table_ids_map_) {
+ scoped_refptr<TableInfo> table = table_entry.second;
+ TableMetadataLock table_lock(table.get(), LockMode::READ);
+ if (table_lock.data().is_deleted()) {
+ deleted_tables->emplace_back(table);
+ }
+ }
+ for (const auto& tablet_entry : tablet_map_) {
+ scoped_refptr<TabletInfo> tablet = tablet_entry.second;
+ TableMetadataLock table_lock(tablet->table().get(), LockMode::READ);
+ TabletMetadataLock tablet_lock(tablet.get(), LockMode::READ);
+ if (tablet_lock.data().is_deleted() || table_lock.data().is_deleted()) {
+ deleted_tablets->emplace_back(tablet);
+ }
+ }
+}
+
// Check if it's time to roll TokenSigner's key. There's a bit of subtlety
here:
// we shouldn't start exporting a key until it is properly persisted.
// So, the protocol is:
@@ -5256,10 +5329,11 @@ void CatalogManager::HandleAssignCreatingTablet(const
scoped_refptr<TabletInfo>&
tablet->ToString(), replacement->id());
// Mark old tablet as replaced.
+ const time_t timestamp = time(nullptr);
tablet->mutable_metadata()->mutable_dirty()->set_state(
- SysTabletsEntryPB::REPLACED,
- Substitute("Replaced by $0 at $1",
- replacement->id(), LocalTimeAsString()));
+ SysTabletsEntryPB::REPLACED,
+ Substitute("Replaced by $0 at $1", replacement->id(),
TimestampAsString(timestamp)));
+
tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
// Mark new tablet as being created.
replacement->mutable_metadata()->mutable_dirty()->set_state(
@@ -5503,6 +5577,72 @@ void CatalogManager::SendCreateTabletRequest(const
scoped_refptr<TabletInfo>& ta
}
}
+Status CatalogManager::ProcessDeletedTablets(const
vector<scoped_refptr<TabletInfo>>& tablets,
+ time_t current_timestamp) {
+ TabletMetadataGroupLock tablets_lock(LockMode::RELEASED);
+ tablets_lock.AddMutableInfos(tablets);
+ tablets_lock.Lock(LockMode::WRITE);
+
+ vector<scoped_refptr<TabletInfo>> tablets_to_clean_up;
+ for (const auto& tablet : tablets) {
+ if (current_timestamp - tablet->metadata().state().pb.delete_timestamp() >
+ FLAGS_metadata_for_deleted_table_and_tablet_reserved_secs) {
+ tablets_to_clean_up.emplace_back(tablet);
+ }
+ }
+ // Persist the changes to the sys.catalog table.
+ SysCatalogTable::Actions actions;
+ actions.tablets_to_delete = tablets_to_clean_up;
+ const auto write_mode = FLAGS_enable_chunked_tablet_writes ?
SysCatalogTable::WriteMode::CHUNKED
+ :
SysCatalogTable::WriteMode::ATOMIC;
+ Status s = sys_catalog_->Write(std::move(actions), write_mode);
+ if (PREDICT_FALSE(!s.ok())) {
+ s = s.CloneAndPrepend("an error occurred while writing to the
sys-catalog");
+ LOG(WARNING) << s.ToString();
+ return s;
+ }
+ // Remove expired tablets from the global map.
+ {
+ std::lock_guard<LockType> l(lock_);
+ for (const auto& t : tablets_to_clean_up) {
+ DCHECK(ContainsKey(tablet_map_, t->id()));
+ tablet_map_.erase(t->id());
+ VLOG(1) << "Cleaned up deleted tablet: " << t->id();
+ }
+ }
+ tablets_lock.Unlock();
+ return Status::OK();
+}
+
+Status CatalogManager::ProcessDeletedTables(const
vector<scoped_refptr<TableInfo>>& tables,
+ time_t current_timestamp) {
+ TableMetadataGroupLock tables_lock(LockMode::RELEASED);
+ tables_lock.AddMutableInfos(tables);
+ tables_lock.Lock(LockMode::WRITE);
+
+ for (const auto& table : tables) {
+ if (current_timestamp - table->metadata().state().pb.delete_timestamp() >
+ FLAGS_metadata_for_deleted_table_and_tablet_reserved_secs) {
+ SysCatalogTable::Actions action;
+ action.table_to_delete = table;
+ Status s = sys_catalog_->Write(std::move(action));
+ if (PREDICT_FALSE(!s.ok())) {
+ s = s.CloneAndPrepend("an error occurred while writing to the
sys-catalog");
+ LOG(WARNING) << s.ToString();
+ return s;
+ }
+
+ std::lock_guard<LockType> l(lock_);
+ DCHECK(ContainsKey(table_ids_map_, table->id()));
+ table_ids_map_.erase(table->id());
+ VLOG(1) << "Cleaned up deleted table: " << table->ToString();
+ }
+ }
+
+ tables_lock.Unlock();
+ return Status::OK();
+}
+
Status CatalogManager::BuildLocationsForTablet(
const scoped_refptr<TabletInfo>& tablet,
ReplicaTypeFilter filter,
@@ -5663,6 +5803,7 @@ Status CatalogManager::ReplaceTablet(const string&
tablet_id, ReplaceTabletRespo
const string replace_msg = Substitute("replaced by tablet $0",
new_tablet->id());
old_tablet->mutable_metadata()->mutable_dirty()->set_state(SysTabletsEntryPB::DELETED,
replace_msg);
+
old_tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(time(nullptr));
// Persist the changes to the syscatalog table.
{
diff --git a/src/kudu/master/catalog_manager.h
b/src/kudu/master/catalog_manager.h
index 5e14bc2..0379ec1 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -17,6 +17,8 @@
#pragma once
+#include <sys/types.h>
+
#include <atomic>
#include <cstdint>
#include <functional>
@@ -767,7 +769,7 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// the catalog manager is not yet running. Caller must hold leader_lock_.
//
// NOTE: This should only be used by tests or web-ui
- Status GetAllTables(std::vector<scoped_refptr<TableInfo>>* tables);
+ void GetAllTables(std::vector<scoped_refptr<TableInfo>>* tables);
// Check if a table exists by name, setting 'exist' appropriately. May fail
// if the catalog manager is not yet running. Caller must hold leader_lock_.
@@ -848,6 +850,9 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// This test exclusively acquires the leader_lock_ directly.
FRIEND_TEST(kudu::client::ServiceUnavailableRetryClientTest, CreateTable);
+ // This test calls GetAllTabletsForTests() directly.
+ FRIEND_TEST(MasterTest, TestDeletedTablesAndTabletsCleanup);
+
friend class AutoRebalancerTest;
friend class TableLoader;
friend class TabletLoader;
@@ -855,6 +860,8 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
typedef std::unordered_map<std::string, scoped_refptr<TableInfo>>
TableInfoMap;
typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>>
TabletInfoMap;
+ void GetAllTabletsForTests(std::vector<scoped_refptr<TabletInfo>>* tablets);
+
// Check whether the table's write limit is reached,
// if true, the write permission should be disabled.
static bool IsTableWriteDisabled(const scoped_refptr<TableInfo>& table,
@@ -1037,6 +1044,10 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// Extract the set of tablets that must be processed because not running yet.
void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>*
tablets_to_process);
+ // Extract the set of tables and tablets that have been deleted or replaced.
+ void ExtractDeletedTablesAndTablets(std::vector<scoped_refptr<TableInfo>>*
deleted_tables,
+ std::vector<scoped_refptr<TabletInfo>>*
deleted_tablets);
+
// Check if it's time to generate a new Token Signing Key for TokenSigner.
// If so, generate one and persist it into the system table. After that,
// push it into the TokenSigner's key queue.
@@ -1128,6 +1139,16 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
void ResetTableLocationsCache();
+ // Task that takes care of deleted tables; it is called in a background thread.
+ // Clean up the metadata of tables if the time since they were deleted has
passed
+ // 'FLAGS_metadata_for_deleted_table_and_tablet_reserved_secs'.
+ Status ProcessDeletedTables(const std::vector<scoped_refptr<TableInfo>>&
tables,
+ time_t current_timestamp);
+
+ // Task that takes care of deleted tablets, similar to
'ProcessDeletedTables'.
+ Status ProcessDeletedTablets(const std::vector<scoped_refptr<TabletInfo>>&
tablets,
+ time_t current_timestamp);
+
std::string GenerateId() { return oid_generator_.Next(); }
// Conventional "T xxx P yyy: " prefix for logging.
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 537686b..cf971c8 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -120,13 +120,16 @@ using std::vector;
using strings::Substitute;
DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
+DECLARE_bool(enable_metadata_cleanup_for_deleted_tables_and_tablets);
DECLARE_bool(enable_per_range_hash_schemas);
DECLARE_bool(master_client_location_assignment_enabled);
DECLARE_bool(master_support_authz_tokens);
DECLARE_bool(mock_table_metrics_for_testing);
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_double(sys_catalog_fail_during_write);
+DECLARE_int32(catalog_manager_bg_task_wait_ms);
DECLARE_int32(default_num_replicas);
+DECLARE_int32(metadata_for_deleted_table_and_tablet_reserved_secs);
DECLARE_int32(diagnostics_log_stack_traces_interval_ms);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(flush_threshold_secs);
@@ -2397,7 +2400,7 @@ TEST_F(MasterTest, TestDuplicateRequest) {
vector<scoped_refptr<TableInfo>> tables;
{
CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
- ASSERT_OK(master_->catalog_manager()->GetAllTables(&tables));
+ master_->catalog_manager()->GetAllTables(&tables);
ASSERT_EQ(1, tables.size());
}
@@ -2472,7 +2475,7 @@ TEST_F(MasterTest, TestHideLiveRowCountInTableMetrics) {
vector<scoped_refptr<TableInfo>> tables;
{
CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
- ASSERT_OK(master_->catalog_manager()->GetAllTables(&tables));
+ master_->catalog_manager()->GetAllTables(&tables);
ASSERT_EQ(1, tables.size());
}
vector<scoped_refptr<TabletInfo>> tablets;
@@ -2539,7 +2542,7 @@ TEST_F(MasterTest, TestTableStatisticsWithOldVersion) {
vector<scoped_refptr<TableInfo>> tables;
{
CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
- ASSERT_OK(master_->catalog_manager()->GetAllTables(&tables));
+ master_->catalog_manager()->GetAllTables(&tables);
ASSERT_EQ(1, tables.size());
}
const auto& table = tables[0];
@@ -2565,6 +2568,65 @@ TEST_F(MasterTest, TestTableStatisticsWithOldVersion) {
}
}
+TEST_F(MasterTest, TestDeletedTablesAndTabletsCleanup) {
+ FLAGS_enable_metadata_cleanup_for_deleted_tables_and_tablets = true;
+ FLAGS_metadata_for_deleted_table_and_tablet_reserved_secs = 1;
+ FLAGS_catalog_manager_bg_task_wait_ms = 10;
+
+ constexpr const char* const kTableName = "testtable";
+ const Schema kTableSchema({ColumnSchema("key", INT32)}, 1);
+ ASSERT_OK(CreateTable(kTableName, kTableSchema));
+
+ vector<scoped_refptr<TableInfo>> tables;
+ vector<scoped_refptr<TabletInfo>> tablets;
+ {
+ CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+ master_->catalog_manager()->GetAllTables(&tables);
+ ASSERT_EQ(1, tables.size());
+ master_->catalog_manager()->GetAllTabletsForTests(&tablets);
+ ASSERT_EQ(3, tablets.size());
+ }
+
+ // Replace a tablet.
+ {
+ const string& tablet_id = tablets[0]->id();
+ ReplaceTabletRequestPB req;
+ ReplaceTabletResponsePB resp;
+ req.set_tablet_id(tablet_id);
+ RpcController controller;
+ ASSERT_OK(proxy_->ReplaceTablet(req, &resp, &controller));
+ SCOPED_TRACE(SecureDebugString(resp));
+ ASSERT_FALSE(resp.has_error());
+ }
+
+ // The replaced tablet should be cleaned up.
+ ASSERT_EVENTUALLY([&] {
+ CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+ master_->catalog_manager()->GetAllTabletsForTests(&tablets);
+ ASSERT_EQ(3, tablets.size());
+ });
+
+ // Delete the table.
+ {
+ DeleteTableRequestPB req;
+ DeleteTableResponsePB resp;
+ RpcController controller;
+ req.mutable_table()->set_table_name(kTableName);
+ ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+ SCOPED_TRACE(SecureDebugString(resp));
+ ASSERT_FALSE(resp.has_error());
+ }
+
+ // The deleted table and tablets should be cleaned up.
+ ASSERT_EVENTUALLY([&] {
+ CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+ master_->catalog_manager()->GetAllTables(&tables);
+ ASSERT_EQ(0, tables.size());
+ master_->catalog_manager()->GetAllTabletsForTests(&tablets);
+ ASSERT_EQ(0, tablets.size());
+ });
+}
+
class AuthzTokenMasterTest : public MasterTest,
public ::testing::WithParamInterface<bool> {};
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 2967f24..bfccafe 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -151,6 +151,9 @@ message SysTabletsEntryPB {
// The dimension label for the tablet. Used for dimension-specific
// placement of the tablet's replicas.
optional string dimension_label = 8;
+
+ // The delete time of the tablet, in seconds since the epoch.
+ optional int64 delete_timestamp = 9;
}
// The on-disk entry in the sys.catalog table ("metadata" column) for
@@ -215,6 +218,9 @@ message SysTablesEntryPB {
// The comment on the table.
optional string comment = 17;
+
+ // The delete time of the table, in seconds since the epoch.
+ optional int64 delete_timestamp = 18;
}
// The on-disk entry in the sys.catalog table ("metadata" column) to represent