This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to tag kudu-1.12.0-mdh1.0.0-4c2c075-centos-release in repository https://gitbox.apache.org/repos/asf/kudu.git
commit cdb3e186c4ee8fd8e484cd30acb7a757734da420 Author: Yingchun Lai <[email protected]> AuthorDate: Mon Sep 23 21:46:51 2019 +0800 [master] reserve table for a period time after being deleted Change-Id: I2df75bdfb3288e6cfe9a14714db7eabe046ba577 --- src/kudu/client/client-internal.cc | 24 ++- src/kudu/client/client-internal.h | 9 +- src/kudu/client/client-test.cc | 207 +++++++++++++++++++++++++ src/kudu/client/client.cc | 24 ++- src/kudu/client/client.h | 35 ++++- src/kudu/client/master_proxy_rpc.cc | 3 + src/kudu/client/table_alterer-internal.cc | 1 + src/kudu/client/table_alterer-internal.h | 2 + src/kudu/common/common.proto | 3 + src/kudu/common/wire_protocol.cc | 56 +++++-- src/kudu/common/wire_protocol.h | 12 +- src/kudu/integration-tests/alter_table-test.cc | 2 +- src/kudu/master/catalog_manager.cc | 109 +++++++++++-- src/kudu/master/catalog_manager.h | 31 +++- src/kudu/master/master-test.cc | 90 +++++++++++ src/kudu/master/master.cc | 58 +++++++ src/kudu/master/master.h | 13 ++ src/kudu/master/master.proto | 21 +++ src/kudu/master/master_service.cc | 76 ++++++++- src/kudu/master/master_service.h | 6 + src/kudu/server/server_base.cc | 4 +- src/kudu/server/server_base.h | 3 +- src/kudu/tools/kudu-admin-test.cc | 4 +- src/kudu/tools/kudu-tool-test.cc | 53 +++++++ src/kudu/tools/tool_action_table.cc | 37 ++++- 25 files changed, 839 insertions(+), 44 deletions(-) diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc index 9243836..06b303a 100644 --- a/src/kudu/client/client-internal.cc +++ b/src/kudu/client/client-internal.cc @@ -103,6 +103,8 @@ using master::IsCreateTableDoneRequestPB; using master::IsCreateTableDoneResponsePB; using master::ListTabletServersResponsePB; using master::ListTabletServersRequestPB; +using master::RecallDeletedTableRequestPB; +using master::RecallDeletedTableResponsePB; using master::MasterFeatures; using master::MasterServiceProxy; using master::TableIdentifierPB; @@ -346,12 +348,16 @@ Status 
KuduClient::Data::WaitForCreateTableToFinish( Status KuduClient::Data::DeleteTable(KuduClient* client, const string& table_name, const MonoTime& deadline, - bool modify_external_catalogs) { + bool modify_external_catalogs, + bool force_on_trashed_table, + uint32_t reserve_seconds) { DeleteTableRequestPB req; DeleteTableResponsePB resp; req.mutable_table()->set_table_name(table_name); req.set_modify_external_catalogs(modify_external_catalogs); + req.set_force_on_trashed_table(force_on_trashed_table); + req.set_reserve_seconds(reserve_seconds); Synchronizer sync; AsyncLeaderMasterRpc<DeleteTableRequestPB, DeleteTableResponsePB> rpc( deadline, client, BackoffType::EXPONENTIAL, req, &resp, @@ -360,6 +366,22 @@ Status KuduClient::Data::DeleteTable(KuduClient* client, return sync.Wait(); } +Status KuduClient::Data::RecallTable(KuduClient* client, + const std::string& table_name, + const MonoTime& deadline) { + RecallDeletedTableRequestPB req; + RecallDeletedTableResponsePB resp; + + req.mutable_table()->set_table_name(table_name); + Synchronizer sync; + AsyncLeaderMasterRpc<RecallDeletedTableRequestPB, RecallDeletedTableResponsePB> rpc( + deadline, client, BackoffType::EXPONENTIAL, req, &resp, + &MasterServiceProxy::RecallDeletedTableAsync, "RecallDeletedTable", sync.AsStatusCallback(), + {}); + rpc.SendRpc(); + return sync.Wait(); +} + Status KuduClient::Data::AlterTable(KuduClient* client, const AlterTableRequestPB& req, AlterTableResponsePB* resp, diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h index bf5b3ce..774eb5d 100644 --- a/src/kudu/client/client-internal.h +++ b/src/kudu/client/client-internal.h @@ -115,7 +115,14 @@ class KuduClient::Data { static Status DeleteTable(KuduClient* client, const std::string& table_name, const MonoTime& deadline, - bool modify_external_catalogs = true); + bool modify_external_catalogs = true, + bool force_on_trashed_table = false, + uint32_t reserve_seconds = 0); + + + static Status 
RecallTable(KuduClient* client, + const std::string& table_name, + const MonoTime& deadline); static Status AlterTable(KuduClient* client, const master::AlterTableRequestPB& req, diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 6961ff3..5499c9b 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -132,6 +132,7 @@ DECLARE_bool(master_support_connect_to_master_rpc); DECLARE_bool(mock_table_metrics_for_testing); DECLARE_bool(rpc_trace_negotiation); DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan); +DECLARE_int32(check_outdated_table_interval_seconds); DECLARE_int32(flush_threshold_mb); DECLARE_int32(flush_threshold_secs); DECLARE_int32(heartbeat_interval_ms); @@ -215,6 +216,7 @@ class ClientTest : public KuduTest { // Reduce the TS<->Master heartbeat interval FLAGS_heartbeat_interval_ms = 10; FLAGS_scanner_gc_check_interval_us = 50 * 1000; // 50 milliseconds. + FLAGS_check_outdated_table_interval_seconds = 1; SetLocationMappingCmd(); @@ -4250,6 +4252,19 @@ TEST_F(ClientTest, TestBasicAlterOperations) { ASSERT_NE(boost::none, tablet_replica->tablet()->metadata()->extra_config()); ASSERT_FALSE(tablet_replica->tablet()->metadata()->extra_config()->has_history_max_age_sec()); } + // 4. Try to alter internal config. + { + map<string, string> extra_configs; + extra_configs[kTableConfigReserveSeconds] = "60"; + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); + table_alterer->AlterExtraConfig(extra_configs); + Status s = table_alterer->Alter(); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), "forbidden to change internal extra configuration by user"); + ASSERT_EQ(11, tablet_replica->tablet()->metadata()->schema_version()); + ASSERT_NE(boost::none, tablet_replica->tablet()->metadata()->extra_config()); + ASSERT_FALSE(tablet_replica->tablet()->metadata()->extra_config()->has_reserve_seconds()); + } // Test changing a table name. 
{ @@ -4320,6 +4335,198 @@ TEST_F(ClientTest, TestDeleteTable) { NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 10)); } +TEST_F(ClientTest, TestDeleteAndReserveTable) { + // Open the table before deleting it. + ASSERT_OK(client_->OpenTable(kTableName, &client_table_)); + + // Insert a few rows, and scan them back. This is to populate the MetaCache. + NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 10)); + vector<string> rows; + ScanTableToStrings(client_table_.get(), &rows); + ASSERT_EQ(10, rows.size()); + + // Remove the table. + // NOTE that it returns when the operation is completed on the master side + string tablet_id = GetFirstTabletId(client_table_.get()); + ASSERT_OK(client_->DeleteTable(kTableName, false, 60)); + CatalogManager* catalog_manager = cluster_->mini_master()->master()->catalog_manager(); + { + CatalogManager::ScopedLeaderSharedLock l(catalog_manager); + ASSERT_OK(l.first_failed_status()); + bool exists; + ASSERT_OK(catalog_manager->TableNameExists(kTableName, &exists)); + ASSERT_FALSE(exists); + } + + // The existing tablet is still visible. + scoped_refptr<TabletReplica> tablet_replica; + ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet( + tablet_id, &tablet_replica)); + + // Try to open the deleted table. + Status s = client_->OpenTable(kTableName, &client_table_); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_STR_CONTAINS(s.ToString(), "the table does not exist"); + + // Old table has been renamed. + vector<string> tables; + ASSERT_OK(client_->ListTables(&tables)); + ASSERT_EQ(1, tables.size()); + string trashed_table_name = tables[0]; + string origin_table_name; + WallTime mark_delete_time; + ASSERT_TRUE(catalog_manager->GetOriginNameAndDeleteTimeOfTrashedTable(trashed_table_name, + &origin_table_name, + &mark_delete_time)); + ASSERT_EQ(string(kTableName), origin_table_name); + + // Alter trashed table is not allowed. + { + // Not allowed to rename. 
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(trashed_table_name)); + table_alterer->RenameTo(kTableName); + s = table_alterer->Alter(); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), Substitute("trashed table $0 should not be altered", + trashed_table_name)); + } + + { + // Not allowed to add column. + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(trashed_table_name)); + table_alterer->AddColumn("new_column")->Type(KuduColumnSchema::INT32); + s = table_alterer->Alter(); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), Substitute("trashed table $0 should not be altered", + trashed_table_name)); + } + + { + // Not allowed to delete. + s = client_->DeleteTable(trashed_table_name); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), Substitute("trashed table $0 should not be deleted", + trashed_table_name)); + } + + { + // Not allowed to set extra configs. + map<string, string> extra_configs; + extra_configs[kTableMaintenancePriority] = "3"; + unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(trashed_table_name)); + table_alterer->AlterExtraConfig(extra_configs); + s = table_alterer->Alter(); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), Substitute("trashed table $0 should not be altered", + trashed_table_name)); + + // Alter trashed table is allowed on force. + table_alterer->force_on_trashed_table(true); + ASSERT_OK(table_alterer->Alter()); + } + + { + // Write and read are allowed for trashed table. + NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 20, 10)); + ScanTableToStrings(client_table_.get(), &rows); + ASSERT_EQ(30, rows.size()); + } + + // Create a new table with the same name. + NO_FATALS(CreateTable(kTableName, 1, GenerateSplitRows(), {}, &client_table_)); + + // Two tables exist now. 
+ tables.clear(); + ASSERT_OK(client_->ListTables(&tables)); + ASSERT_EQ(2, tables.size()); + std::sort(tables.begin(), tables.end()); + ASSERT_EQ(string(kTableName), tables[0]); + ASSERT_EQ(string(trashed_table_name), tables[1]); + + // Should be able to insert successfully into the new table. + NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 10)); + ScanTableToStrings(client_table_.get(), &rows); + ASSERT_EQ(10, rows.size()); + + // Force to delete the trashed table. + ASSERT_OK(client_->DeleteTable(trashed_table_name, true)); + + // Only one table left. + tables.clear(); + ASSERT_OK(client_->ListTables(&tables)); + ASSERT_EQ(1, tables.size()); + ASSERT_EQ(kTableName, tables[0]); +} + +TEST_F(ClientTest, TestDeleteAndRecallTable) { + // Open the table before deleting it. + ASSERT_OK(client_->OpenTable(kTableName, &client_table_)); + + // Insert a few rows, and scan them back. This is to populate the MetaCache. + NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 10)); + vector<string> rows; + ScanTableToStrings(client_table_.get(), &rows); + ASSERT_EQ(10, rows.size()); + + // Remove the table + ASSERT_OK(client_->DeleteTable(kTableName, false, 60)); + ASSERT_EVENTUALLY([&] () { + Status s = client_->OpenTable(kTableName, &client_table_); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_STR_CONTAINS(s.ToString(), "the table does not exist"); + }); + + // Recall and reopen table. + vector<string> tables; + ASSERT_OK(client_->ListTables(&tables)); + ASSERT_EQ(1, tables.size()); + ASSERT_OK(client_->RecallTable(tables[0])); + ASSERT_OK(client_->OpenTable(kTableName, &client_table_)); + + // Check data from table. + ScanTableToStrings(client_table_.get(), &rows); + ASSERT_EQ(10, rows.size()); +} + +TEST_F(ClientTest, TestDeleteAndRecallAfterReserveTimeTable) { + // Open the table before deleting it. + ASSERT_OK(client_->OpenTable(kTableName, &client_table_)); + + // Insert a few rows, and scan them back. This is to populate the MetaCache. 
+ NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 10)); + vector<string> rows; + ScanTableToStrings(client_table_.get(), &rows); + ASSERT_EQ(10, rows.size()); + + // Remove the table + ASSERT_OK(client_->DeleteTable(kTableName, false, 2)); + ASSERT_EVENTUALLY([&] () { + Status s = client_->OpenTable(kTableName, &client_table_); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_STR_CONTAINS(s.ToString(), "the table does not exist"); + }); + + vector<string> tables; + ASSERT_OK(client_->ListTables(&tables)); + ASSERT_EQ(1, tables.size()); + + // Wait until the table is removed completely. + ASSERT_EVENTUALLY([&] () { + Status s = client_->OpenTable(tables[0], &client_table_); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_STR_CONTAINS(s.ToString(), "the table does not exist"); + }); + + // Try to recall the table. + Status s = client_->RecallTable(tables[0]); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_STR_CONTAINS(s.ToString(), "the table does not exist"); + + tables.clear(); + ASSERT_OK(client_->ListTables(&tables)); + ASSERT_TRUE(tables.empty()); +} + TEST_F(ClientTest, TestGetTableSchema) { KuduSchema schema; diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index d4b607f..6104ca9 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -411,14 +411,24 @@ Status KuduClient::IsCreateTableInProgress(const string& table_name, create_in_progress); } -Status KuduClient::DeleteTable(const string& table_name) { - return DeleteTableInCatalogs(table_name, true); +Status KuduClient::DeleteTable(const string& table_name, + bool force_on_trashed_table, + uint32_t reserve_seconds) { + return DeleteTableInCatalogs(table_name, true, force_on_trashed_table, reserve_seconds); } Status KuduClient::DeleteTableInCatalogs(const string& table_name, - bool modify_external_catalogs) { + bool modify_external_catalogs, + bool force_on_trashed_table, + uint32_t reserve_seconds) { MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout(); - return 
data_->DeleteTable(this, table_name, deadline, modify_external_catalogs); + return data_->DeleteTable(this, table_name, deadline, + modify_external_catalogs, force_on_trashed_table, reserve_seconds); +} + +Status KuduClient::RecallTable(const string& table_name) { + MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout(); + return data_->RecallTable(this, table_name, deadline); } KuduTableAlterer* KuduClient::NewTableAlterer(const string& table_name) { @@ -1369,6 +1379,12 @@ KuduTableAlterer* KuduTableAlterer::modify_external_catalogs( return this; } +KuduTableAlterer* KuduTableAlterer::force_on_trashed_table( + bool force_on_trashed_table) { + data_->force_on_trashed_table_ = force_on_trashed_table; + return this; +} + Status KuduTableAlterer::Alter() { AlterTableRequestPB req; AlterTableResponsePB resp; diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index f8d16d8..2e54947 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -349,8 +349,14 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> { /// /// @param [in] table_name /// Name of the table to drop. + /// @param [in] force_on_trashed_table + /// Whether to force to delete a trashed table. + /// @param [in] reserve_seconds + /// Reserve seconds after being deleted. /// @return Operation status. - Status DeleteTable(const std::string& table_name); + Status DeleteTable(const std::string& table_name, + bool force_on_trashed_table = false, + uint32_t reserve_seconds = 0); /// @cond PRIVATE_API @@ -363,9 +369,23 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> { /// @param [in] modify_external_catalogs /// Whether to apply the deletion to external catalogs, such as the Hive Metastore, /// which the Kudu master has been configured to integrate with. + /// @param [in] force_on_trashed_table + /// Whether to force to delete a trashed table. 
+ /// @param [in] reserve_seconds + /// Reserve seconds after being deleted. /// @return Operation status. Status DeleteTableInCatalogs(const std::string& table_name, - bool modify_external_catalogs) KUDU_NO_EXPORT; + bool modify_external_catalogs, + bool force_on_trashed_table = false, + uint32_t reserve_seconds = 0) KUDU_NO_EXPORT; + + /// Recall a deleted but still reserved table. + /// + /// @param [in] table_name + /// Name of the table to recall. + /// @return Operation status. + Status RecallTable(const std::string& table_name); + /// @endcond /// Create a KuduTableAlterer object. @@ -1464,6 +1484,17 @@ class KUDU_EXPORT KuduTableAlterer { /// @return Raw pointer to this alterer object. KuduTableAlterer* modify_external_catalogs(bool modify_external_catalogs) KUDU_NO_EXPORT; + /// @cond PRIVATE_API + + /// Force to alter a trashed table. + /// + /// Private API. + /// + /// @param [in] force_on_trashed_table + /// Whether to alter on a trashed table. + /// @return Raw pointer to this alterer object. + KuduTableAlterer* force_on_trashed_table(bool force_on_trashed_table) KUDU_NO_EXPORT; + /// @endcond /// @return Status of the ALTER TABLE operation. 
The return value diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc index 815482d..e8ed376 100644 --- a/src/kudu/client/master_proxy_rpc.cc +++ b/src/kudu/client/master_proxy_rpc.cc @@ -74,6 +74,8 @@ using master::ListTabletServersRequestPB; using master::ListTabletServersResponsePB; using master::MasterServiceProxy; using master::MasterErrorPB; +using master::RecallDeletedTableRequestPB; +using master::RecallDeletedTableResponsePB; using master::ReplaceTabletRequestPB; using master::ReplaceTabletResponsePB; using rpc::BackoffType; @@ -296,6 +298,7 @@ template class AsyncLeaderMasterRpc<GetTableStatisticsRequestPB, GetTableStatist template class AsyncLeaderMasterRpc<ListTablesRequestPB, ListTablesResponsePB>; template class AsyncLeaderMasterRpc<ListTabletServersRequestPB, ListTabletServersResponsePB>; template class AsyncLeaderMasterRpc<ListMastersRequestPB, ListMastersResponsePB>; +template class AsyncLeaderMasterRpc<RecallDeletedTableRequestPB, RecallDeletedTableResponsePB>; template class AsyncLeaderMasterRpc<ReplaceTabletRequestPB, ReplaceTabletResponsePB>; } // namespace internal diff --git a/src/kudu/client/table_alterer-internal.cc b/src/kudu/client/table_alterer-internal.cc index 846150a..290d24e 100644 --- a/src/kudu/client/table_alterer-internal.cc +++ b/src/kudu/client/table_alterer-internal.cc @@ -63,6 +63,7 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) { req->Clear(); req->set_modify_external_catalogs(modify_external_catalogs_); + req->set_force_on_trashed_table(force_on_trashed_table_); req->mutable_table()->set_table_name(table_name_); if (rename_to_) { req->set_new_table_name(rename_to_.get()); diff --git a/src/kudu/client/table_alterer-internal.h b/src/kudu/client/table_alterer-internal.h index 2cfa1de..1cb074a 100644 --- a/src/kudu/client/table_alterer-internal.h +++ b/src/kudu/client/table_alterer-internal.h @@ -87,6 +87,8 @@ class KuduTableAlterer::Data { // Metastore. 
The default value is true. bool modify_external_catalogs_ = true; + bool force_on_trashed_table_ = false; + private: DISALLOW_COPY_AND_ASSIGN(Data); }; diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index 559cd28..882e16c 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -446,4 +446,7 @@ message TableExtraConfigPB { // range [-FLAGS_max_priority_range, FLAGS_max_priority_range] when // calculate maintenance priority score. optional int32 maintenance_priority = 2; + + // Reserve seconds after the table has been deleted. + optional uint32 reserve_seconds = 3; } diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc index ef14d8b..821dbec 100644 --- a/src/kudu/common/wire_protocol.cc +++ b/src/kudu/common/wire_protocol.cc @@ -23,6 +23,7 @@ #include <cstdint> #include <cstring> #include <ostream> +#include <set> #include <string> #include <vector> @@ -41,6 +42,7 @@ #include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/gutil/fixedarray.h" +#include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/fastmem.h" #include "kudu/gutil/strings/numbers.h" @@ -66,6 +68,7 @@ using google::protobuf::RepeatedPtrField; using kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using std::map; +using std::set; using std::string; using std::vector; using strings::Substitute; @@ -646,8 +649,6 @@ Status ColumnPredicateFromPB(const Schema& schema, return Status::OK(); } -const char kTableHistoryMaxAgeSec[] = "kudu.table.history_max_age_sec"; -const char kTableMaintenancePriority[] = "kudu.table.maintenance_priority"; Status ExtraConfigPBToMap(const TableExtraConfigPB& pb, map<string, string>* configs) { Map<string, string> tmp; RETURN_NOT_OK(ExtraConfigPBToPBMap(pb, &tmp)); @@ -664,28 +665,61 @@ Status ParseInt32Config(const string& name, const string& value, int32_t* result return Status::OK(); } -Status 
ExtraConfigPBFromPBMap(const Map<string, string>& configs, TableExtraConfigPB* pb) { - TableExtraConfigPB result; - for (const auto& config : configs) { +Status ParseUint32Config(const string& name, const string& value, uint32_t* result) { + CHECK(result); + if (!safe_strtou32(value, result)) { + return Status::InvalidArgument(Substitute("unable to parse $0", name), value); + } + return Status::OK(); +} + +Status UpdateExtraConfigPB(const Map<string, string>& new_extra_configs, + bool external_request, + TableExtraConfigPB* pb) { + static const set<string> kSupportedConfigs({kTableHistoryMaxAgeSec, + kTableMaintenancePriority, + kTableConfigReserveSeconds}); + static const set<string> kInternalConfigs({kTableConfigReserveSeconds}); + for (const auto& config : new_extra_configs) { const string& name = config.first; const string& value = config.second; + if (!ContainsKey(kSupportedConfigs, name)) { + return Status::InvalidArgument( + Substitute("invalid extra configuration property: $0", name)); + } + if (external_request && ContainsKey(kInternalConfigs, name)) { + return Status::InvalidArgument( + Substitute("forbidden to change internal extra configuration by user, property: $0", name)); + } + if (name == kTableHistoryMaxAgeSec) { if (!value.empty()) { int32_t history_max_age_sec; RETURN_NOT_OK(ParseInt32Config(name, value, &history_max_age_sec)); - result.set_history_max_age_sec(history_max_age_sec); + pb->set_history_max_age_sec(history_max_age_sec); + } else { + pb->clear_history_max_age_sec(); } } else if (name == kTableMaintenancePriority) { if (!value.empty()) { int32_t maintenance_priority; RETURN_NOT_OK(ParseInt32Config(name, value, &maintenance_priority)); - result.set_maintenance_priority(maintenance_priority); + pb->set_maintenance_priority(maintenance_priority); + } else { + pb->clear_maintenance_priority(); + } + } else if (name == kTableConfigReserveSeconds) { + if (!value.empty()) { + uint32_t reserve_seconds; + RETURN_NOT_OK(ParseUint32Config(name, 
value, &reserve_seconds)); + pb->set_reserve_seconds(reserve_seconds); + } else { + pb->clear_reserve_seconds(); } } else { - LOG(WARNING) << "Unknown extra configuration property: " << name; + LOG(FATAL) << "unparsed property: " << name; } } - *pb = std::move(result); return Status::OK(); } @@ -697,6 +731,10 @@ Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb, Map<string, string>* c if (pb.has_maintenance_priority()) { result[kTableMaintenancePriority] = std::to_string(pb.maintenance_priority()); } + if (pb.has_reserve_seconds()) { + result[kTableConfigReserveSeconds] = std::to_string(pb.reserve_seconds()); + } + *configs = std::move(result); return Status::OK(); } diff --git a/src/kudu/common/wire_protocol.h b/src/kudu/common/wire_protocol.h index 7ac76fd..d14d6a9 100644 --- a/src/kudu/common/wire_protocol.h +++ b/src/kudu/common/wire_protocol.h @@ -62,6 +62,10 @@ class ServerEntryPB; class ServerRegistrationPB; class TableExtraConfigPB; +static const std::string kTableHistoryMaxAgeSec = "kudu.table.history_max_age_sec"; +static const std::string kTableMaintenancePriority = "kudu.table.maintenance_priority"; +static const std::string kTableConfigReserveSeconds = "kudu.table.reserve_seconds"; + // Convert the given C++ Status object into the equivalent Protobuf. void StatusToPB(const Status& status, AppStatusPB* pb); @@ -147,9 +151,11 @@ Status ColumnPredicateFromPB(const Schema& schema, Status ExtraConfigPBToMap(const TableExtraConfigPB& pb, std::map<std::string, std::string>* configs); -// Convert the table's extra configuration protobuf::map to protobuf. -Status ExtraConfigPBFromPBMap(const google::protobuf::Map<std::string, std::string>& configs, - TableExtraConfigPB* pb); +// Update or insert the table's extra configuration according to protobuf::map. 
+Status UpdateExtraConfigPB( + const google::protobuf::Map<std::string, std::string>& new_extra_configs, + bool external_request, + TableExtraConfigPB* pb); // Parse int32_t type value from 'value', and store in 'result' when succeed. Status ParseInt32Config(const std::string& name, const std::string& value, int32_t* result); diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc index 714e923..99cbd2e 100644 --- a/src/kudu/integration-tests/alter_table-test.cc +++ b/src/kudu/integration-tests/alter_table-test.cc @@ -359,7 +359,7 @@ TEST_F(AlterTableTest, TestAddNotNullableColumnWithoutDefaults) { cluster_->mini_master()->master()->catalog_manager(); master::CatalogManager::ScopedLeaderSharedLock l(catalog); ASSERT_OK(l.first_failed_status()); - Status s = catalog->AlterTableRpc(req, &resp, /*rpc=*/nullptr); + Status s = catalog->AlterTableRpc(req, &resp, /*rpc=*/nullptr, true); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_STR_CONTAINS(s.ToString(), "column `c2`: NOT NULL columns must have a default"); } diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index af1b5f1..6aefb42 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -88,10 +88,11 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/escaping.h" #include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/sysinfo.h" #include "kudu/gutil/utf/utf.h" -#include "kudu/gutil/walltime.h" #include "kudu/hms/hms_catalog.h" #include "kudu/master/authz_provider.h" #include "kudu/master/auto_rebalancer.h" @@ -358,6 +359,7 @@ using kudu::consensus::RaftPeerPB; using kudu::consensus::StartTabletCopyRequestPB; using kudu::consensus::kMinimumTerm; using kudu::hms::HmsClientVerifyKuduSyncConfig; +using kudu::master::TableIdentifierPB; using 
kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using kudu::rpc::RpcContext; @@ -1689,7 +1691,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, // Verify the table's extra configuration properties. TableExtraConfigPB extra_config_pb; - RETURN_NOT_OK(ExtraConfigPBFromPBMap(req.extra_configs(), &extra_config_pb)); + RETURN_NOT_OK(UpdateExtraConfigPB(req.extra_configs(), true, &extra_config_pb)); scoped_refptr<TableInfo> table; { @@ -2175,6 +2177,32 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, return Status::OK(); } +Status CatalogManager::RecallDeletedTableRpc(const RecallDeletedTableRequestPB& req, + RecallDeletedTableResponsePB* resp, + rpc::RpcContext* rpc) { + leader_lock_.AssertAcquiredForReading(); + RETURN_NOT_OK(CheckOnline()); + + string origin_table_name; + WallTime mark_delete_time; + if (!GetOriginNameAndDeleteTimeOfTrashedTable(req.table().table_name(), + &origin_table_name, &mark_delete_time)) { + return SetupError(Status::InvalidArgument("not a trashed table"), + resp, MasterErrorPB::TABLE_NOT_FOUND); + } + + AlterTableRequestPB alter_req; + alter_req.mutable_table()->CopyFrom(req.table()); + // Revert table name + alter_req.set_new_table_name(origin_table_name); + (*alter_req.mutable_new_extra_configs())[kTableMaintenancePriority] = ""; + (*alter_req.mutable_new_extra_configs())[kTableConfigReserveSeconds] = ""; + + AlterTableResponsePB alter_resp; + RETURN_NOT_OK(AlterTableRpc(alter_req, &alter_resp, rpc, false)); + return Status::OK(); +} + Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb, vector<AlterTableRequestPB::Step> steps, Schema* new_schema, @@ -2434,7 +2462,8 @@ Status CatalogManager::ApplyAlterPartitioningSteps( Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req, AlterTableResponsePB* resp, - rpc::RpcContext* rpc) { + rpc::RpcContext* rpc, + bool external_request) { leader_lock_.AssertAcquiredForReading(); 
RETURN_NOT_OK(CheckOnline()); @@ -2514,10 +2543,11 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req, return AlterTable(r, resp, /*hms_notification_log_event_id=*/none, - /*user=*/none); + /*user=*/none, + external_request); } - return AlterTable(req, resp, /*hms_notification_log_event_id=*/ none, user); + return AlterTable(req, resp, /*hms_notification_log_event_id=*/ none, user, external_request); } Status CatalogManager::RenameTableHms(const string& table_id, @@ -2533,7 +2563,8 @@ Status CatalogManager::RenameTableHms(const string& table_id, // Use empty user to skip the authorization validation since the operation // originates from catalog manager. Moreover, this avoids duplicate effort, // because we already perform authorization before making any changes to the HMS. - RETURN_NOT_OK(AlterTable(req, &resp, notification_log_event_id, /*user=*/none)); + RETURN_NOT_OK(AlterTable(req, &resp, notification_log_event_id, /*user=*/none, + /*external_request=*/true)); // Update the cached HMS notification log event ID. DCHECK_GT(notification_log_event_id, hms_notification_log_event_id_); @@ -2545,7 +2576,8 @@ Status CatalogManager::RenameTableHms(const string& table_id, Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResponsePB* resp, optional<int64_t> hms_notification_log_event_id, - optional<const string&> user) { + optional<const string&> user, + bool external_request) { leader_lock_.AssertAcquiredForReading(); RETURN_NOT_OK(CheckOnline()); @@ -2684,14 +2716,12 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, if (!req.new_extra_configs().empty()) { TRACE("Apply alter extra-config"); Map<string, string> new_extra_configs; - RETURN_NOT_OK(ExtraConfigPBToPBMap(l.data().pb.extra_config(), - &new_extra_configs)); - // Merge table's extra configuration properties. 
for (auto config : req.new_extra_configs()) { new_extra_configs[config.first] = config.second; } - RETURN_NOT_OK(ExtraConfigPBFromPBMap(new_extra_configs, - l.mutable_data()->pb.mutable_extra_config())); + RETURN_NOT_OK(UpdateExtraConfigPB(new_extra_configs, + external_request, + l.mutable_data()->pb.mutable_extra_config())); } // Set to true if columns are altered, added or dropped. @@ -5303,6 +5333,60 @@ const char* CatalogManager::StateToString(State state) { __builtin_unreachable(); } +Status CatalogManager::IsOutdatedTable(const std::string& table_name, + bool* is_trashed_table, + bool* is_outdated_table) { + *is_trashed_table = false; + string origin_table_name; + WallTime mark_delete_time; + if (!GetOriginNameAndDeleteTimeOfTrashedTable(table_name, + &origin_table_name, + &mark_delete_time)) { + return Status::OK(); + } + + // TODO(yingchun): Check whether a table is 'trashed' or not by GetTableSchema is a little + // expensive. + GetTableSchemaRequestPB schema_req; + schema_req.mutable_table()->set_table_name(table_name); + GetTableSchemaResponsePB schema_resp; + RETURN_NOT_OK(GetTableSchema(&schema_req, &schema_resp, boost::none, nullptr)); + auto found = FindOrNull(schema_resp.extra_configs(), kTableConfigReserveSeconds); + if (!found) { + return Status::OK(); + } + + uint32_t reserve_seconds = 0; + if (!safe_strtou32(*found, &reserve_seconds)) { + return Status::Corruption(Substitute("Table $0's config $1 is invalid", + table_name, kTableConfigReserveSeconds)); + } + + *is_trashed_table = true; + if (is_outdated_table) { + *is_outdated_table = (WallTime_Now() - mark_delete_time > reserve_seconds); + } + + return Status::OK(); +} + +bool CatalogManager::GetOriginNameAndDeleteTimeOfTrashedTable(const string& table_name, + string* origin_table_name, + WallTime* mark_delete_time) { + vector<string> sections = strings::Split(table_name, ":", strings::AllowEmpty()); + if (sections.size() < 3 || sections[0] != Master::kTrashedTag || sections[2].empty()) { + 
return false; + } + + if (!safe_strtod(sections[1], mark_delete_time) + || *mark_delete_time < 0 || *mark_delete_time >= WallTime_Now()) { + return false; + } + + *origin_table_name = sections[2]; + return true; +} + //////////////////////////////////////////////////////////// // CatalogManager::ScopedLeaderSharedLock //////////////////////////////////////////////////////////// @@ -5409,6 +5493,7 @@ INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB); INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB); INITTED_AND_LEADER_OR_RESPOND(GetTableStatisticsResponsePB); INITTED_AND_LEADER_OR_RESPOND(GetTabletLocationsResponsePB); +INITTED_AND_LEADER_OR_RESPOND(RecallDeletedTableResponsePB); INITTED_AND_LEADER_OR_RESPOND(ReplaceTabletResponsePB); #undef INITTED_OR_RESPOND diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h index d6d5ff6..d927299 100644 --- a/src/kudu/master/catalog_manager.h +++ b/src/kudu/master/catalog_manager.h @@ -41,6 +41,7 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/stringpiece.h" +#include "kudu/gutil/walltime.h" #include "kudu/master/master.pb.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tserver/tablet_replica_lookup.h" @@ -72,6 +73,7 @@ struct ColumnId; // Working around FRIEND_TEST() ugliness. namespace client { +class ClientTest_TestDeleteAndReserveTable_Test; class ServiceUnavailableRetryClientTest_CreateTable_Test; } // namespace client @@ -610,13 +612,22 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { const std::string& table_id, int64_t notification_log_event_id) WARN_UNUSED_RESULT; + // Recall a table in response to a RecallDeletedTableRequestPB RPC. + // + // The RPC context is provided for logging/tracing purposes, + // but this function does not itself respond to the RPC. 
+ Status RecallDeletedTableRpc(const RecallDeletedTableRequestPB& req, + RecallDeletedTableResponsePB* resp, + rpc::RpcContext* rpc) WARN_UNUSED_RESULT; + // Alter the specified table in response to an AlterTableRequest RPC. // // The RPC context is provided for logging/tracing purposes, // but this function does not itself respond to the RPC. Status AlterTableRpc(const AlterTableRequestPB& req, AlterTableResponsePB* resp, - rpc::RpcContext* rpc); + rpc::RpcContext* rpc, + bool external_request); // Rename the specified table in response to an 'ALTER TABLE RENAME' HMS // notification log listener event. @@ -783,8 +794,14 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { // name is returned. static std::string NormalizeTableName(const std::string& table_name); + // Check whether the table is trashed and outdated. + Status IsOutdatedTable(const std::string& table_name, + bool* is_trashed_table, + bool* is_outdated_table = nullptr); + private: // These tests call ElectedAsLeaderCb() directly. + FRIEND_TEST(kudu::client::ClientTest, TestDeleteAndReserveTable); FRIEND_TEST(MasterTest, TestShutdownDuringTableVisit); FRIEND_TEST(MasterTest, TestGetTableLocationsDuringRepeatedTableVisit); FRIEND_TEST(kudu::AuthzTokenTest, TestSingleMasterUnavailable); @@ -795,6 +812,9 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { // This test exclusively acquires the leader_lock_ directly. FRIEND_TEST(kudu::client::ServiceUnavailableRetryClientTest, CreateTable); + // This test calls GetOriginNameAndDeleteTimeOfTrashedTable directly. 
+ FRIEND_TEST(MasterTest, TestGetOriginNameAndDeleteTimeOfTrashedTable); + friend class AutoRebalancerTest; friend class TableLoader; friend class TabletLoader; @@ -824,7 +844,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { Status AlterTable(const AlterTableRequestPB& req, AlterTableResponsePB* resp, boost::optional<int64_t> hms_notification_log_event_id, - boost::optional<const std::string&> user) WARN_UNUSED_RESULT; + boost::optional<const std::string&> user, + bool external_request) WARN_UNUSED_RESULT; // Called by SysCatalog::SysCatalogStateChanged when this node // becomes the leader of a consensus configuration. Executes @@ -1068,6 +1089,12 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { Status WaitForNotificationLogListenerCatchUp(RespClass* resp, rpc::RpcContext* rpc) WARN_UNUSED_RESULT; + // Get origin name and delete time of a trashed table. + // Returns false if failed. + static bool GetOriginNameAndDeleteTimeOfTrashedTable(const std::string& table_name, + std::string* origin_table_name, + WallTime* mark_delete_time); + // TODO(unknown): the maps are a little wasteful of RAM, since the TableInfo/TabletInfo // objects have a copy of the string key. But STL doesn't make it // easy to make a "gettable set". 
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc index fc6c101..15bdedc 100644 --- a/src/kudu/master/master-test.cc +++ b/src/kudu/master/master-test.cc @@ -35,6 +35,7 @@ #include <gflags/gflags_declare.h> #include <glog/logging.h> +#include <google/protobuf/stubs/common.h> #include <gtest/gtest.h> #include <rapidjson/document.h> #include <rapidjson/rapidjson.h> @@ -159,6 +160,7 @@ class MasterTest : public KuduTest { const Schema& schema, const vector<KuduPartialRow>& split_rows, const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds); + Status SetConfig(const string& table_name, const string& key, const string& value); shared_ptr<Messenger> client_messenger_; unique_ptr<MiniMaster> mini_master_; @@ -556,6 +558,16 @@ Status MasterTest::CreateTable(const string& table_name, return Status::OK(); } +Status MasterTest::SetConfig(const string& table_name, const string& key, const string& value) { + AlterTableRequestPB req; + AlterTableResponsePB resp; + RpcController controller; + req.mutable_table()->set_table_name(table_name); + (*req.mutable_new_extra_configs())[key] = value; + RETURN_NOT_OK(master_->catalog_manager()->AlterTableRpc(req, &resp, nullptr, false)); + return Status::OK(); +} + void MasterTest::DoListTables(const ListTablesRequestPB& req, ListTablesResponsePB* resp) { RpcController controller; ASSERT_OK(proxy_->ListTables(req, resp, &controller)); @@ -2037,5 +2049,83 @@ TEST_P(AuthzTokenMasterTest, TestGenerateAuthzTokens) { INSTANTIATE_TEST_CASE_P(SupportsAuthzTokens, AuthzTokenMasterTest, ::testing::Bool()); +const vector<string> kInvalidTableNames + = { "abc", + Substitute("a$0:$1:abc", Master::kTrashedTag, WallTime_Now()), + Substitute("$0a:$1:abc", Master::kTrashedTag, WallTime_Now()), + Substitute("$0:$1", Master::kTrashedTag, WallTime_Now()), + Substitute("$0:a:abc", Master::kTrashedTag), + Substitute("$0:-123:abc", Master::kTrashedTag) + }; + +TEST_F(MasterTest, TestGetOriginNameAndDeleteTimeOfTrashedTable) { + 
string origin_table_name; + WallTime mark_delete_time; + for (const auto& table_name : kInvalidTableNames) { + ASSERT_FALSE(CatalogManager::GetOriginNameAndDeleteTimeOfTrashedTable( + table_name, &origin_table_name, &mark_delete_time)); + } + + ASSERT_TRUE(CatalogManager::GetOriginNameAndDeleteTimeOfTrashedTable( + Substitute("$0:123:abc", Master::kTrashedTag), + &origin_table_name, &mark_delete_time)); + ASSERT_EQ("abc", origin_table_name); + ASSERT_EQ(123, mark_delete_time); +} + +TEST_F(MasterTest, TestIsOutdatedTable) { + const char* kTableName = "testtable"; + const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1); + + bool is_trashed_table = false; + bool is_outdated_table = false; + // Invalid table names. + for (const auto& table_name : kInvalidTableNames) { + ASSERT_OK(master_->catalog_manager() + ->IsOutdatedTable(table_name, &is_trashed_table, &is_outdated_table)); + ASSERT_FALSE(is_trashed_table); + } + + // Create a new table. + const string kTrashedTableName + = Substitute("$0:$1:$2", Master::kTrashedTag, WallTime_Now(), kTableName); + ASSERT_OK(CreateTable(kTrashedTableName, kTableSchema)); + + // Default table is not outdated. 
+ CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager()); + ASSERT_OK(master_->catalog_manager() + ->IsOutdatedTable(kTrashedTableName, &is_trashed_table, &is_outdated_table)); + ASSERT_FALSE(is_trashed_table); + ASSERT_FALSE(is_outdated_table); + + // Set config: kTableConfigReserveSeconds + ASSERT_TRUE(SetConfig(kTrashedTableName, kTableConfigReserveSeconds, "-1").IsInvalidArgument()); + ASSERT_OK(master_->catalog_manager() + ->IsOutdatedTable(kTrashedTableName, &is_trashed_table, &is_outdated_table)); + ASSERT_FALSE(is_trashed_table); + ASSERT_FALSE(is_outdated_table); + + ASSERT_TRUE(SetConfig(kTrashedTableName, kTableConfigReserveSeconds, "a").IsInvalidArgument()); + ASSERT_OK(master_->catalog_manager() + ->IsOutdatedTable(kTrashedTableName, &is_trashed_table, &is_outdated_table)); + ASSERT_FALSE(is_trashed_table); + ASSERT_FALSE(is_outdated_table); + + // In reserve time, table is not outdated. + ASSERT_OK(SetConfig(kTrashedTableName, kTableConfigReserveSeconds, "100")); + ASSERT_OK(master_->catalog_manager() + ->IsOutdatedTable(kTrashedTableName, &is_trashed_table, &is_outdated_table)); + ASSERT_TRUE(is_trashed_table); + ASSERT_FALSE(is_outdated_table); + + // After reserve time, table is outdated. 
+ ASSERT_OK(SetConfig(kTrashedTableName, kTableConfigReserveSeconds, "1")); + SleepFor(MonoDelta::FromSeconds(2)); + ASSERT_OK(master_->catalog_manager() + ->IsOutdatedTable(kTrashedTableName, &is_trashed_table, &is_outdated_table)); + ASSERT_TRUE(is_trashed_table); + ASSERT_TRUE(is_outdated_table); +} + } // namespace master } // namespace kudu diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc index f55cc6a..32d6e8d 100644 --- a/src/kudu/master/master.cc +++ b/src/kudu/master/master.cc @@ -24,6 +24,7 @@ #include <string> #include <vector> +#include <boost/optional/optional.hpp> #include <gflags/gflags.h> #include <glog/logging.h> @@ -53,12 +54,14 @@ #include "kudu/server/webserver.h" #include "kudu/tserver/tablet_copy_service.h" #include "kudu/tserver/tablet_service.h" +#include "kudu/util/countdown_latch.h" #include "kudu/util/flag_tags.h" #include "kudu/util/maintenance_manager.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/status.h" +#include "kudu/util/thread.h" #include "kudu/util/threadpool.h" #include "kudu/util/version_info.h" @@ -84,6 +87,11 @@ DEFINE_int64(authz_token_validity_seconds, 60 * 5, "validity period expires."); TAG_FLAG(authz_token_validity_seconds, experimental); +DEFINE_int32(check_outdated_table_interval_seconds, 60, + "Interval seconds to check whether there is any trashed table is " + "outdated, this kind of table will be deleted and can not be " + "recalled later."); + DEFINE_string(location_mapping_cmd, "", "A Unix command which takes a single argument, the IP address or " "hostname of a tablet server or client, and returns the location " @@ -109,6 +117,8 @@ using strings::Substitute; namespace kudu { namespace master { +const char Master::kTrashedTag[] = "trashed"; + Master::Master(const MasterOptions& opts) : KuduServer("Master", opts, "kudu.master"), state_(kStopped), @@ -190,6 +200,7 @@ Status Master::StartAsync() { // Start initializing 
the catalog manager. RETURN_NOT_OK(init_pool_->Submit([this]() { this->InitCatalogManagerTask(); })); + RETURN_NOT_OK(StartOutdatedReservedTablesDeleterThread()); state_ = kRunning; return Status::OK(); @@ -299,6 +310,53 @@ void Master::CrashMasterOnCFileCorruption(const string& tablet_id) { LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0", tablet_id); } +Status Master::StartOutdatedReservedTablesDeleterThread() { + return Thread::Create("master", "outdated-reserved-tables-deleter", + &Master::OutdatedReservedTablesDeleterThread, + this, &outdated_reserved_tables_deleter_thread_); +} + +void Master::OutdatedReservedTablesDeleterThread() { + // How often to attempt to delete outdated tables. + const MonoDelta kWait = MonoDelta::FromSeconds(FLAGS_check_outdated_table_interval_seconds); + while (!stop_background_threads_latch_.WaitUntil(MonoTime::Now() + kWait)) { + WARN_NOT_OK(DeleteOutdatedReservedTables(), "Unable to delete outdated reserved tables"); + } +} + +Status Master::DeleteOutdatedReservedTables() { + CatalogManager::ScopedLeaderSharedLock l(catalog_manager()); + if (!l.first_failed_status().ok()) { + // Skip checking if this master is not leader. + return Status::OK(); + } + + ListTablesRequestPB list_req; + ListTablesResponsePB list_resp; + RETURN_NOT_OK(catalog_manager_->ListTables(&list_req, &list_resp, boost::none)); + for (const auto& table : list_resp.tables()) { + bool is_trashed_table = false; + bool is_outdated_table = false; + Status s = catalog_manager_->IsOutdatedTable(table.name(), + &is_trashed_table, &is_outdated_table); + if (!s.ok() || !is_trashed_table || !is_outdated_table) { + continue; + } + + // Delete the table. 
+ DeleteTableRequestPB del_req; + del_req.mutable_table()->set_table_id(table.id()); + del_req.mutable_table()->set_table_name(table.name()); + del_req.set_reserve_seconds(0); + DeleteTableResponsePB del_resp; + LOG(INFO) << "Start to delete trashed table " << table.name(); + WARN_NOT_OK(catalog_manager_->DeleteTableRpc(del_req, &del_resp, nullptr), + Substitute("Failed to delete trashed table $0", table.name())); + } + + return Status::OK(); +} + namespace { // TODO(Alex Feinberg) this method should be moved to a separate class (along with diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h index 3f77074..3a42c6f 100644 --- a/src/kudu/master/master.h +++ b/src/kudu/master/master.h @@ -22,9 +22,12 @@ #include <string> #include <vector> +#include <gtest/gtest_prod.h> + #include "kudu/common/wire_protocol.pb.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" #include "kudu/kserver/kserver.h" #include "kudu/master/master_options.h" #include "kudu/util/promise.h" @@ -35,6 +38,7 @@ namespace kudu { class HostPortPB; class MaintenanceManager; class MonoDelta; +class Thread; class ThreadPool; namespace master { @@ -56,6 +60,7 @@ class Master : public kserver::KuduServer { public: static const uint16_t kDefaultPort = 7051; static const uint16_t kDefaultWebPort = 8051; + static const char kTrashedTag[]; explicit Master(const MasterOptions& opts); ~Master(); @@ -123,6 +128,7 @@ class Master : public kserver::KuduServer { private: friend class MasterTest; + FRIEND_TEST(MasterTest, TestIsTableOutdated); void InitCatalogManagerTask(); Status InitCatalogManager(); @@ -136,6 +142,11 @@ class Master : public kserver::KuduServer { // safe in a particular case. void ShutdownImpl(); + // Start thread to delete outdated reserved tables. 
+ Status StartOutdatedReservedTablesDeleterThread(); + void OutdatedReservedTablesDeleterThread(); + Status DeleteOutdatedReservedTables(); + enum MasterState { kStopped, kInitialized, @@ -170,6 +181,8 @@ class Master : public kserver::KuduServer { std::unique_ptr<TSManager> ts_manager_; + scoped_refptr<Thread> outdated_reserved_tables_deleter_thread_; + DISALLOW_COPY_AND_ASSIGN(Master); }; diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index bda76e2..10d45a9 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -520,6 +520,12 @@ message DeleteTableRequestPB { // Whether to apply the deletion to external catalogs, such as the Hive Metastore, // which the Kudu master has been configured to integrate with. optional bool modify_external_catalogs = 2 [default = true]; + + // Reserve seconds after the table has been deleted. + optional uint32 reserve_seconds = 3; + + // Force to delete a trashed table. + optional bool force_on_trashed_table = 4 [default = false]; } message DeleteTableResponsePB { @@ -527,6 +533,15 @@ message DeleteTableResponsePB { optional MasterErrorPB error = 1; } +message RecallDeletedTableRequestPB { + required TableIdentifierPB table = 1; +} + +message RecallDeletedTableResponsePB { + // The error, if an error occurred with this request. + optional MasterErrorPB error = 1; +} + message ListTablesRequestPB { // When used, only returns tables that satisfy a substring match on name_filter. optional string name_filter = 1; @@ -671,6 +686,9 @@ message AlterTableRequestPB { optional bool modify_external_catalogs = 5 [default = true]; map<string, string> new_extra_configs = 6; + + // Force to alter a trashed table. 
+ optional bool force_on_trashed_table = 7 [default = false]; } message AlterTableResponsePB { @@ -991,6 +1009,9 @@ service MasterService { rpc DeleteTable(DeleteTableRequestPB) returns (DeleteTableResponsePB) { option (kudu.rpc.authz_method) = "AuthorizeClient"; } + rpc RecallDeletedTable(RecallDeletedTableRequestPB) returns (RecallDeletedTableResponsePB) { + option (kudu.rpc.authz_method) = "AuthorizeClient"; + } rpc AlterTable(AlterTableRequestPB) returns (AlterTableResponsePB) { option (kudu.rpc.authz_method) = "AuthorizeClient"; diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc index 50ed51d..73cffab 100644 --- a/src/kudu/master/master_service.cc +++ b/src/kudu/master/master_service.cc @@ -27,6 +27,7 @@ #include <boost/optional/optional.hpp> #include <gflags/gflags.h> #include <glog/logging.h> +#include <google/protobuf/stubs/common.h> #include "kudu/common/common.pb.h" #include "kudu/common/wire_protocol.h" @@ -36,6 +37,7 @@ #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" #include "kudu/hms/hms_catalog.h" #include "kudu/master/authz_provider.h" #include "kudu/master/catalog_manager.h" @@ -62,6 +64,7 @@ DECLARE_bool(hive_metastore_sasl_enabled); DECLARE_bool(raft_prepare_replacement_before_eviction); +DECLARE_int32(max_priority_range); DECLARE_string(hive_metastore_uris); DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0, @@ -446,7 +449,65 @@ void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req, return; } - Status s = server_->catalog_manager()->DeleteTableRpc(*req, resp, rpc); + bool is_trashed_table = false; + Status s = server_->catalog_manager() + ->IsOutdatedTable(req->table().table_name(), &is_trashed_table); + if (s.ok() && is_trashed_table && !req->force_on_trashed_table()) { + s = Status::InvalidArgument(Substitute("trashed table $0 should not be deleted", + req->table().table_name())); + } + + if 
(!s.ok()) { + CheckRespErrorOrSetUnknown(s, resp); + rpc->RespondSuccess(); + return; + } + + if ((is_trashed_table && req->force_on_trashed_table()) || req->reserve_seconds() == 0) { + Status s = server_->catalog_manager()->DeleteTableRpc(*req, resp, rpc); + CheckRespErrorOrSetUnknown(s, resp); + rpc->RespondSuccess(); + } else { + DCHECK(!is_trashed_table); + AlterTableRequestPB alter_req; + alter_req.mutable_table()->CopyFrom(req->table()); + alter_req.set_new_table_name(string(Master::kTrashedTag) + ":" + + std::to_string(WallTime_Now()) + ":" + + req->table().table_name()); + alter_req.set_modify_external_catalogs(req->modify_external_catalogs()); + (*alter_req.mutable_new_extra_configs())[kTableMaintenancePriority] + = std::to_string(-FLAGS_max_priority_range); + (*alter_req.mutable_new_extra_configs())[kTableConfigReserveSeconds] + = std::to_string(req->reserve_seconds()); + + AlterTableResponsePB alter_resp; + Status s = server_->catalog_manager()->AlterTableRpc(alter_req, &alter_resp, rpc, false); + CheckRespErrorOrSetUnknown(s, &alter_resp); + resp->set_allocated_error(alter_resp.release_error()); + rpc->RespondSuccess(); + } +} + +void MasterServiceImpl::RecallDeletedTable(const RecallDeletedTableRequestPB* req, + RecallDeletedTableResponsePB* resp, + rpc::RpcContext* rpc) { + CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager()); + if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) { + return; + } + + bool is_trashed_table = false; + Status s = server_->catalog_manager() + ->IsOutdatedTable(req->table().table_name(), &is_trashed_table); + if (s.ok()) { + if (is_trashed_table) { + s = server_->catalog_manager()->RecallDeletedTableRpc(*req, resp, rpc); + } else { + s = Status::InvalidArgument(Substitute("common table $0 should not be recalled", + req->table().table_name())); + } + } + CheckRespErrorOrSetUnknown(s, resp); rpc->RespondSuccess(); } @@ -459,7 +520,18 @@ void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req, 
return; } - Status s = server_->catalog_manager()->AlterTableRpc(*req, resp, rpc); + bool is_trashed_table = false; + Status s = server_->catalog_manager() + ->IsOutdatedTable(req->table().table_name(), &is_trashed_table); + if (s.ok()) { + if (!is_trashed_table || req->force_on_trashed_table()) { + s = server_->catalog_manager()->AlterTableRpc(*req, resp, rpc, true); + } else { + s = Status::InvalidArgument(Substitute("trashed table $0 should not be altered", + req->table().table_name())); + } + } + CheckRespErrorOrSetUnknown(s, resp); rpc->RespondSuccess(); } diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h index 4098ce7..ae4670f 100644 --- a/src/kudu/master/master_service.h +++ b/src/kudu/master/master_service.h @@ -69,6 +69,8 @@ class ListTabletServersResponsePB; class Master; class PingRequestPB; class PingResponsePB; +class RecallDeletedTableRequestPB; +class RecallDeletedTableResponsePB; class ReplaceTabletRequestPB; class ReplaceTabletResponsePB; class ResetAuthzCacheRequestPB; @@ -128,6 +130,10 @@ class MasterServiceImpl : public MasterServiceIf { DeleteTableResponsePB* resp, rpc::RpcContext* rpc) override; + void RecallDeletedTable(const RecallDeletedTableRequestPB* req, + RecallDeletedTableResponsePB* resp, + rpc::RpcContext* rpc) override; + void AlterTable(const AlterTableRequestPB* req, AlterTableResponsePB* resp, rpc::RpcContext* rpc) override; diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc index 610d51d..8d63af2 100644 --- a/src/kudu/server/server_base.cc +++ b/src/kudu/server/server_base.cc @@ -400,12 +400,12 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options, result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>( MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))), is_first_run_(false), + stop_background_threads_latch_(1), dns_resolver_(new DnsResolver( FLAGS_dns_resolver_max_threads_num, FLAGS_dns_resolver_cache_capacity_mb * 1024 * 1024, 
MonoDelta::FromSeconds(FLAGS_dns_resolver_cache_ttl_sec))), - options_(options), - stop_background_threads_latch_(1) { + options_(options) { metric_entity_->NeverRetire( METRIC_merged_entities_count_of_server.InstantiateHidden(metric_entity_, 1)); diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h index 93aaf16..29f1603 100644 --- a/src/kudu/server/server_base.h +++ b/src/kudu/server/server_base.h @@ -203,6 +203,8 @@ class ServerBase { // The ACL of users who may act as part of the Kudu service. security::SimpleAcl service_acl_; + CountDownLatch stop_background_threads_latch_; + private: Status InitAcls(); void GenerateInstanceID(); @@ -240,7 +242,6 @@ class ServerBase { #ifdef TCMALLOC_ENABLED scoped_refptr<Thread> tcmalloc_memory_gc_thread_; #endif - CountDownLatch stop_background_threads_latch_; std::unique_ptr<ScopedGLogMetrics> glog_metrics_; diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc index eae3b14..25cce5d 100644 --- a/src/kudu/tools/kudu-admin-test.cc +++ b/src/kudu/tools/kudu-admin-test.cc @@ -1637,7 +1637,9 @@ TEST_F(AdminCliTest, TestDeleteTable) { "table", "delete", master_address, - kTableId + kTableId, + "-force_on_trashed_table=false", + "-reserve_seconds=0" ); vector<string> tables; diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 4bc44c5..84ebb4e 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -1128,6 +1128,7 @@ TEST_F(ToolTest, TestModeHelp) { "get_extra_configs.*Get the extra configuration properties for a table", "list.*List tables", "locate_row.*Locate which tablet a row belongs to", + "recall.*Recall a deleted but still reserved table", "rename_column.*Rename a column", "rename_table.*Rename a table", "scan.*Scan rows from a table", @@ -3148,6 +3149,58 @@ TEST_F(ToolTest, TestRenameTable) { ASSERT_OK(client->OpenTable(kTableName, &table)); } +TEST_F(ToolTest, TestRecallTable) { + 
NO_FATALS(StartExternalMiniCluster()); + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + string master_addr = cluster_->master()->bound_rpc_addr().ToString(); + + const string& kTableName = "kudu.table"; + + // Create the table. + TestWorkload workload(cluster_.get()); + workload.set_table_name(kTableName); + workload.set_num_replicas(1); + workload.Setup(); + + // Delete the table. + string out; + NO_FATALS(RunActionStdoutNone(Substitute("table delete $0 $1", + master_addr, kTableName))); + shared_ptr<KuduTable> table; + + // Try to open the table. + Status s = client->OpenTable(kTableName, &table); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_STR_CONTAINS(s.ToString(), Substitute("the table does not exist")); + + // List trashed table. + vector<string> kudu_tables; + client->ListTables(&kudu_tables); + ASSERT_EQ(kudu_tables.size(), 1); + + // Create another table. + workload.Setup(); + ASSERT_OK(client->OpenTable(kTableName, &table)); + + // Try to recall the trashed table. + string stderr; + s = RunTool(Substitute("table recall $0 $1", + master_addr, kudu_tables[0]), + nullptr, &stderr, {}, {}); + ASSERT_TRUE(s.IsRuntimeError()); + ASSERT_STR_CONTAINS(s.ToString(), "process exited with non-zero status"); + SCOPED_TRACE(stderr); + ASSERT_STR_CONTAINS(stderr, Substitute("Already present: table $0 already exists", kTableName)); + + // Rename the new table and try to recall the trashed table. 
+ NO_FATALS(RunActionStdoutNone( + Substitute("table rename_table $0 $1 $2", + master_addr, kTableName, kTableName + "new"))); + NO_FATALS(RunActionStdoutNone(Substitute("table recall $0 $1", + master_addr, kudu_tables[0]))); +} + TEST_F(ToolTest, TestRenameColumn) { NO_FATALS(StartExternalMiniCluster()); const string& kTableName = "table"; diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 006cc71..8e98c5d 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include <stdlib.h> + #include <algorithm> #include <cstdint> #include <functional> @@ -114,6 +116,10 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND", DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND", "The type of the upper bound, either inclusive or exclusive. " "Defaults to exclusive. This flag is case-insensitive."); +DEFINE_uint32(reserve_seconds, 604800, + "Reserve seconds after being deleted."); +DEFINE_bool(force_on_trashed_table, false, + "Force to alter a trashed table"); DECLARE_bool(show_values); DECLARE_string(tables); @@ -192,7 +198,10 @@ Status DeleteTable(const RunnerContext& context) { const string& table_name = FindOrDie(context.required_args, kTableNameArg); client::sp::shared_ptr<KuduClient> client; RETURN_NOT_OK(CreateKuduClient(context, &client)); - return client->DeleteTableInCatalogs(table_name, FLAGS_modify_external_catalogs); + return client->DeleteTableInCatalogs(table_name, + FLAGS_modify_external_catalogs, + FLAGS_force_on_trashed_table, + FLAGS_reserve_seconds); } Status DescribeTable(const RunnerContext& context) { @@ -407,6 +416,14 @@ Status LocateRow(const RunnerContext& context) { return Status::OK(); } +Status RecallTable(const RunnerContext& context) { + const string& table_name = FindOrDie(context.required_args, kTableNameArg); + + client::sp::shared_ptr<KuduClient> client; + 
RETURN_NOT_OK(CreateKuduClient(context, &client)); + return client->RecallTable(table_name); +} + Status RenameTable(const RunnerContext& context) { const string& table_name = FindOrDie(context.required_args, kTableNameArg); const string& new_table_name = FindOrDie(context.required_args, kNewTableNameArg); @@ -415,6 +432,7 @@ Status RenameTable(const RunnerContext& context) { RETURN_NOT_OK(CreateKuduClient(context, &client)); unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); return alterer->RenameTo(new_table_name) + ->force_on_trashed_table(FLAGS_force_on_trashed_table) ->modify_external_catalogs(FLAGS_modify_external_catalogs) ->Alter(); } @@ -477,8 +495,9 @@ Status SetExtraConfig(const RunnerContext& context) { client::sp::shared_ptr<KuduClient> client; RETURN_NOT_OK(CreateKuduClient(context, &client)); unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); - alterer->AlterExtraConfig({ { config_name, config_value} }); - return alterer->Alter(); + return alterer->AlterExtraConfig({ { config_name, config_value} }) + ->force_on_trashed_table(FLAGS_force_on_trashed_table) + ->Alter(); } Status GetExtraConfigs(const RunnerContext& context) { @@ -1175,7 +1194,9 @@ unique_ptr<Mode> BuildTableMode() { .Description("Delete a table") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kTableNameArg, "Name of the table to delete" }) + .AddOptionalParameter("force_on_trashed_table") .AddOptionalParameter("modify_external_catalogs") + .AddOptionalParameter("reserve_seconds") .Build(); unique_ptr<Action> describe_table = @@ -1220,12 +1241,20 @@ unique_ptr<Mode> BuildTableMode() { .AddRequiredParameter({ kNewColumnNameArg, "New column name" }) .Build(); + unique_ptr<Action> recall = + ActionBuilder("recall", &RecallTable) + .Description("Recall a deleted but still reserved table") + .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) + .AddRequiredParameter({ 
kTableNameArg, "Name of the table to recall" }) + .Build(); + unique_ptr<Action> rename_table = ActionBuilder("rename_table", &RenameTable) .Description("Rename a table") .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) .AddRequiredParameter({ kTableNameArg, "Name of the table to rename" }) .AddRequiredParameter({ kNewTableNameArg, "New table name" }) + .AddOptionalParameter("force_on_trashed_table") .AddOptionalParameter("modify_external_catalogs") .Build(); @@ -1269,6 +1298,7 @@ unique_ptr<Mode> BuildTableMode() { .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) .AddRequiredParameter({ kConfigNameArg, "Name of the configuration" }) .AddRequiredParameter({ kConfigValueArg, "New value for the configuration" }) + .AddOptionalParameter("force_on_trashed_table") .Build(); unique_ptr<Action> get_extra_configs = @@ -1425,6 +1455,7 @@ unique_ptr<Mode> BuildTableMode() { .AddAction(std::move(list_tables)) .AddAction(std::move(locate_row)) .AddAction(std::move(rename_column)) + .AddAction(std::move(recall)) .AddAction(std::move(rename_table)) .AddAction(std::move(scan_table)) .AddAction(std::move(set_extra_config))
