This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 323d8ea KUDU-3223: management of per-table limit by kudu CLI
323d8ea is described below
commit 323d8ea7312678b7a910e55ed4f74acaf3813ffc
Author: Hongjiang Zhang <[email protected]>
AuthorDate: Fri May 14 18:26:14 2021 +0800
KUDU-3223: management of per-table limit by kudu CLI
Use kudu CLI to set disk_size_limit and row_count_limit on table level.
If '--enable_table_write_limit' is not set 'true' for kudu-master, kudu
CLI fails to set the limit. The table's disk_size_limit and
row_count_limit are 'N/A' when calling 'kudu table statistics'.
Only when '--enable_table_write_limit' is set 'true', can the kudu CLI
change the table limit. Use 'kudu table statistics' to check the
updated result.
Change-Id: I59bb64cc85d5fca48ae8ec980f5cfc62e4b3a1d3
Reviewed-on: http://gerrit.cloudera.org:8080/17444
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/master/catalog_manager.cc | 30 ++++++-
src/kudu/tools/kudu-tool-test.cc | 157 ++++++++++++++++++++++++++++++++++++
src/kudu/tools/tool_action_table.cc | 57 +++++++++++++
3 files changed, 240 insertions(+), 4 deletions(-)
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 12666d9..447126b 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2894,9 +2894,15 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
!req.new_extra_configs().empty() ||
!alter_schema_steps.empty() ||
!alter_partitioning_steps.empty();
+ if (table_limit_change && !FLAGS_enable_table_write_limit) {
+ return SetupError(Status::NotSupported(
+ "altering table limit is not supported because "
+ "--enable_table_write_limit is not enabled"),
+ resp, MasterErrorPB::UNKNOWN_ERROR);
+ }
if (table_limit_change && other_schema_change) {
return SetupError(Status::ConfigurationError(
- "Alter table limit cannot be combined with other
alterations"),
+ "altering table limit cannot be combined with other
alterations"),
resp, MasterErrorPB::UNKNOWN_ERROR);
}
@@ -2944,13 +2950,17 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
string normalized_table_name = NormalizeTableName(l.data().name());
*resp->mutable_table_id() = table->id();
- // Modify the table limit.
+ // Set the table limit.
if (table_limit_change) {
if (req.has_disk_size_limit()) {
if (req.disk_size_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
l.mutable_data()->pb.clear_table_disk_size_limit();
+ LOG(INFO) << Substitute("Resetting table $0 disk_size_limit to the
default setting",
+ normalized_table_name);
} else if (req.disk_size_limit() >= 0) {
l.mutable_data()->pb.set_table_disk_size_limit(req.disk_size_limit());
+ LOG(INFO) << Substitute("Setting table $0 disk_size_limit to $1",
+ normalized_table_name, req.disk_size_limit());
} else {
return SetupError(Status::InvalidArgument("disk size limit must "
"be greater than or equal to -1"),
@@ -2960,8 +2970,12 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
if (req.has_row_count_limit()) {
if (req.row_count_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
l.mutable_data()->pb.clear_table_row_count_limit();
+ LOG(INFO) << Substitute("Resetting table $0 row_count_limit to the
default setting",
+ normalized_table_name);
} else if (req.row_count_limit() >= 0) {
l.mutable_data()->pb.set_table_row_count_limit(req.row_count_limit());
+ LOG(INFO) << Substitute("Setting table $0 row_count_limit to $1",
+ normalized_table_name, req.row_count_limit());
} else {
return SetupError(Status::InvalidArgument("row count limit must "
"be greater than or equal to -1"),
@@ -3463,8 +3477,16 @@ Status CatalogManager::GetTableStatistics(const
GetTableStatisticsRequestPB* req
}
}
if (FLAGS_enable_table_write_limit) {
- resp->set_disk_size_limit(l.data().pb.table_disk_size_limit());
- resp->set_row_count_limit(l.data().pb.table_row_count_limit());
+ if (l.data().pb.has_table_disk_size_limit()) {
+ resp->set_disk_size_limit(l.data().pb.table_disk_size_limit());
+ } else {
+ resp->set_disk_size_limit(TableInfo::TABLE_WRITE_DEFAULT_LIMIT);
+ }
+ if (l.data().pb.has_table_row_count_limit()) {
+ resp->set_row_count_limit(l.data().pb.table_row_count_limit());
+ } else {
+ resp->set_row_count_limit(TableInfo::TABLE_WRITE_DEFAULT_LIMIT);
+ }
}
return Status::OK();
}
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index b11ac15..f619251 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -164,6 +164,7 @@ using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
+using kudu::client::KuduTableStatistics;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalMiniCluster;
@@ -1195,6 +1196,7 @@ TEST_F(ToolTest, TestModeHelp) {
"rename_table.*Rename a table",
"scan.*Scan rows from a table",
"set_extra_config.*Change a extra configuration value on a table",
+ "set_limit.*Set the write limit for a table",
"statistics.*Get table statistics",
};
NO_FATALS(RunTestHelp(kCmd, kTableModeRegexes));
@@ -1223,6 +1225,13 @@ TEST_F(ToolTest, TestModeHelp) {
}));
}
{
+ const vector<string> kSetLimitModeRegexes = {
+ "disk_size.*Set the disk size limit",
+ "row_count.*Set the row count limit",
+ };
+ NO_FATALS(RunTestHelp("table set_limit", kSetLimitModeRegexes));
+ }
+ {
const string kCmd = "tablet";
const vector<string> kTabletModeRegexes = {
"change_config.*Change.*Raft configuration",
@@ -3365,6 +3374,7 @@ TEST_F(ToolTest, TestMasterList) {
// (6)copy a table
// (7)alter a column
// (8)delete a column
+// (9)set the table limit when kudu-master unsupported and supported it
TEST_F(ToolTest, TestDeleteTable) {
NO_FATALS(StartExternalMiniCluster());
shared_ptr<KuduClient> client;
@@ -3958,6 +3968,153 @@ TEST_F(ToolTest, TestDeleteColumn) {
ASSERT_STR_NOT_CONTAINS(table->schema().ToString(), kColumnName);
}
+TEST_F(ToolTest, TestChangeTableLimitNotSupported) {
+ const string kTableName = "kudu.table.failtochangelimit";
+ // Disable table write limit by default, then set limit will not take effect.
+ NO_FATALS(StartExternalMiniCluster());
+ // Create the table.
+ TestWorkload workload(cluster_.get());
+ workload.set_table_name(kTableName);
+ workload.set_num_replicas(1);
+ workload.Setup();
+
+ string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+ // Report unsupported table limit through kudu CLI.
+ string stdout;
+ NO_FATALS(RunActionStdoutString(
+ Substitute("table statistics $0 $1",
+ master_addr, kTableName),
+ &stdout));
+ ASSERT_STR_CONTAINS(stdout, "on disk size limit: N/A");
+ ASSERT_STR_CONTAINS(stdout, "live row count limit: N/A");
+
+ // Report unsupported table limit through kudu client.
+ shared_ptr<KuduClient> client;
+ ASSERT_OK(KuduClientBuilder()
+ .add_master_server_addr(master_addr)
+ .Build(&client));
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client->OpenTable(kTableName, &table));
+ unique_ptr<KuduTableStatistics> statistics;
+ KuduTableStatistics *table_statistics;
+ ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+ statistics.reset(table_statistics);
+ ASSERT_EQ(-1, statistics->live_row_count_limit());
+ ASSERT_EQ(-1, statistics->on_disk_size_limit());
+
+ // Fail to set table limit if kudu-master does not enable it. Verify the
error message.
+ string stderr;
+ Status s = RunActionStderrString(
+ Substitute("table set_limit disk_size $0 $1 1000000",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "altering table limit is not supported");
+ s = RunActionStderrString(
+ Substitute("table set_limit row_count $0 $1 1000000",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "altering table limit is not supported");
+}
+
+TEST_F(ToolTest, TestChangeTableLimitSupported) {
+ const string kTableName = "kudu.table.changelimit";
+ const int64_t kDiskSizeLimit = 999999;
+ const int64_t kRowCountLimit = 100000;
+
+ ExternalMiniClusterOptions opts;
+ opts.extra_master_flags.emplace_back("--enable_table_write_limit=true");
+ opts.extra_master_flags.emplace_back("--superuser_acl=*");
+
+ NO_FATALS(StartExternalMiniCluster(opts));
+
+ // Create the table.
+ TestWorkload workload(cluster_.get());
+ workload.set_table_name(kTableName);
+ workload.set_num_replicas(1);
+ workload.Setup();
+
+ string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+ shared_ptr<KuduClient> client;
+ ASSERT_OK(KuduClientBuilder()
+ .add_master_server_addr(master_addr)
+ .Build(&client));
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client->OpenTable(kTableName, &table));
+ unique_ptr<KuduTableStatistics> statistics;
+ KuduTableStatistics *table_statistics;
+ ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+ statistics.reset(table_statistics);
+ ASSERT_EQ(-1, statistics->live_row_count_limit());
+ ASSERT_EQ(-1, statistics->on_disk_size_limit());
+ // Check the invalid disk_size_limit.
+ string stderr;
+ Status s = RunActionStderrString(
+ Substitute("table set_limit disk_size $0 $1 abc",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_TRUE(s.IsRuntimeError());
+ ASSERT_STR_CONTAINS(stderr, "Could not parse");
+ stderr.clear();
+ // Check the invalid row_count_limit.
+ s = RunActionStderrString(
+ Substitute("table set_limit row_count $0 $1 abc",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "Could not parse");
+ stderr.clear();
+ // Check the invalid setting parameter: negative value
+ s = RunActionStderrString(
+ Substitute("table set_limit disk_size $0 $1 -1",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "ERROR: unknown command line flag");
+ stderr.clear();
+ s = RunActionStderrString(
+ Substitute("table set_limit row_count $0 $1 -1",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "ERROR: unknown command line flag");
+ stderr.clear();
+ // Chech the invalid setting parameter: larger than INT64_MAX
+ s = RunActionStderrString(
+ Substitute("table set_limit disk_size $0 $1 99999999999999999999999999",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "Could not parse");
+ stderr.clear();
+ s = RunActionStderrString(
+ Substitute("table set_limit row_count $0 $1 99999999999999999999999999",
+ master_addr, kTableName),
+ &stderr);
+ ASSERT_FALSE(s.ok());
+ ASSERT_STR_CONTAINS(stderr, "Could not parse");
+ stderr.clear();
+ // Check the normal limit setting.
+ NO_FATALS(RunActionStdoutNone(Substitute("table set_limit disk_size $0 $1
$2",
+ master_addr, kTableName,
kDiskSizeLimit)));
+ NO_FATALS(RunActionStdoutNone(Substitute("table set_limit row_count $0 $1
$2",
+ master_addr, kTableName,
kRowCountLimit)));
+ ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+ statistics.reset(table_statistics);
+ ASSERT_EQ(kRowCountLimit, statistics->live_row_count_limit());
+ ASSERT_EQ(kDiskSizeLimit, statistics->on_disk_size_limit());
+ // Check unlimited setting.
+ NO_FATALS(RunActionStdoutNone(Substitute("table set_limit disk_size $0 $1
unlimited",
+ master_addr, kTableName)));
+ NO_FATALS(RunActionStdoutNone(Substitute("table set_limit row_count $0 $1
unlimited",
+ master_addr, kTableName)));
+ ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+ statistics.reset(table_statistics);
+ ASSERT_EQ(-1, statistics->live_row_count_limit());
+ ASSERT_EQ(-1, statistics->on_disk_size_limit());
+}
+
Status CreateLegacyHmsTable(HmsClient* client,
const string& hms_database_name,
const string& hms_table_name,
diff --git a/src/kudu/tools/tool_action_table.cc
b/src/kudu/tools/tool_action_table.cc
index 656a342..d5da37f 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -412,6 +412,40 @@ Status LocateRow(const RunnerContext& context) {
return Status::OK();
}
+Status SetDiskSizeLimit(const RunnerContext& context) {
+ const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+ const string& disk_size_limit_str = FindOrDie(context.required_args,
"disk_size");
+ int64_t disk_size_limit;
+ if (iequals(disk_size_limit_str, "unlimited")) {
+ disk_size_limit = -1;
+ } else if (!safe_strto64(disk_size_limit_str, &disk_size_limit)) {
+ return Status::InvalidArgument(Substitute(
+ "Could not parse $0 as disk size limit", disk_size_limit_str));
+ }
+ client::sp::shared_ptr<KuduClient> client;
+ RETURN_NOT_OK(CreateKuduClient(context, &client));
+ unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
+ alterer->SetTableDiskSizeLimit(disk_size_limit);
+ return alterer->Alter();
+}
+
+Status SetRowCountLimit(const RunnerContext& context) {
+ const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+ const string& row_count_limit_str = FindOrDie(context.required_args,
"row_count");
+ int64_t row_count_limit;
+ if (iequals(row_count_limit_str, "unlimited")) {
+ row_count_limit = -1;
+ } else if (!safe_strto64(row_count_limit_str, &row_count_limit)) {
+ return Status::InvalidArgument(Substitute(
+ "Could not parse $0 as row count limit", row_count_limit_str));
+ }
+ client::sp::shared_ptr<KuduClient> client;
+ RETURN_NOT_OK(CreateKuduClient(context, &client));
+ unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
+ alterer->SetTableRowCountLimit(row_count_limit);
+ return alterer->Alter();
+}
+
Status RenameTable(const RunnerContext& context) {
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
const string& new_table_name = FindOrDie(context.required_args,
kNewTableNameArg);
@@ -1175,6 +1209,28 @@ Status CreateTable(const RunnerContext& context) {
} // anonymous namespace
+unique_ptr<Mode> BuildSetTableLimitMode() {
+ unique_ptr<Action> set_disk_size_limit =
+ ClusterActionBuilder("disk_size", &SetDiskSizeLimit)
+ .Description("Set the disk size limit")
+ .AddRequiredParameter({ kTableNameArg, "Name of the table to set limit"
})
+ .AddRequiredParameter({ "disk_size",
+ "The disk size limit, 'unlimited' for no write
limit" })
+ .Build();
+ unique_ptr<Action> set_row_count_limit =
+ ClusterActionBuilder("row_count", &SetRowCountLimit)
+ .Description("Set the row count limit")
+ .AddRequiredParameter({ kTableNameArg, "Name of the table to set limit"
})
+ .AddRequiredParameter({ "row_count",
+ "The row count limit, 'unlimited' for no write
limit" })
+ .Build();
+ return ModeBuilder("set_limit")
+ .Description("Set the write limit for a table")
+ .AddAction(std::move(set_disk_size_limit))
+ .AddAction(std::move(set_row_count_limit))
+ .Build();
+}
+
unique_ptr<Mode> BuildTableMode() {
unique_ptr<Action> delete_table =
ClusterActionBuilder("delete", &DeleteTable)
@@ -1393,6 +1449,7 @@ unique_ptr<Mode> BuildTableMode() {
return ModeBuilder("table")
.Description("Operate on Kudu tables")
+ .AddMode(BuildSetTableLimitMode())
.AddAction(std::move(add_range_partition))
.AddAction(std::move(column_remove_default))
.AddAction(std::move(column_set_block_size))