This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 323d8ea  KUDU-3223: management of per-table limit by kudu CLI
323d8ea is described below

commit 323d8ea7312678b7a910e55ed4f74acaf3813ffc
Author: Hongjiang Zhang <[email protected]>
AuthorDate: Fri May 14 18:26:14 2021 +0800

    KUDU-3223: management of per-table limit by kudu CLI
    
    Use kudu CLI to set disk_size_limit and row_count_limit on table level.
    
    If '--enable_table_write_limit' is not set 'true' for kudu-master, kudu
    CLI fails to set the limit. The table's disk_size_limit and
    row_count_limit are 'N/A' when calling 'kudu table statistics'.
    
    Only when '--enable_table_write_limit' is set 'true', can the kudu CLI
    change the table limit. Use 'kudu table statistics' to check the
    updated result.
    
    Change-Id: I59bb64cc85d5fca48ae8ec980f5cfc62e4b3a1d3
    Reviewed-on: http://gerrit.cloudera.org:8080/17444
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/master/catalog_manager.cc  |  30 ++++++-
 src/kudu/tools/kudu-tool-test.cc    | 157 ++++++++++++++++++++++++++++++++++++
 src/kudu/tools/tool_action_table.cc |  57 +++++++++++++
 3 files changed, 240 insertions(+), 4 deletions(-)

diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index 12666d9..447126b 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2894,9 +2894,15 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
                              !req.new_extra_configs().empty() ||
                              !alter_schema_steps.empty() ||
                              !alter_partitioning_steps.empty();
+  if (table_limit_change && !FLAGS_enable_table_write_limit) {
+    return SetupError(Status::NotSupported(
+                      "altering table limit is not supported because "
+                      "--enable_table_write_limit is not enabled"),
+                      resp, MasterErrorPB::UNKNOWN_ERROR);
+  }
   if (table_limit_change && other_schema_change) {
     return SetupError(Status::ConfigurationError(
-                      "Alter table limit cannot be combined with other 
alterations"),
+                      "altering table limit cannot be combined with other 
alterations"),
                       resp, MasterErrorPB::UNKNOWN_ERROR);
   }
 
@@ -2944,13 +2950,17 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
   string normalized_table_name = NormalizeTableName(l.data().name());
   *resp->mutable_table_id() = table->id();
 
-  // Modify the table limit.
+  // Set the table limit.
   if (table_limit_change) {
     if (req.has_disk_size_limit()) {
       if (req.disk_size_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
         l.mutable_data()->pb.clear_table_disk_size_limit();
+        LOG(INFO) << Substitute("Resetting table $0 disk_size_limit to the 
default setting",
+                                normalized_table_name);
       } else if (req.disk_size_limit() >= 0) {
         l.mutable_data()->pb.set_table_disk_size_limit(req.disk_size_limit());
+        LOG(INFO) << Substitute("Setting table $0 disk_size_limit to $1",
+                                 normalized_table_name, req.disk_size_limit());
       } else {
         return SetupError(Status::InvalidArgument("disk size limit must "
             "be greater than or equal to -1"),
@@ -2960,8 +2970,12 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
     if (req.has_row_count_limit()) {
       if (req.row_count_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
         l.mutable_data()->pb.clear_table_row_count_limit();
+        LOG(INFO) << Substitute("Resetting table $0 row_count_limit to the 
default setting",
+                                normalized_table_name);
       } else if (req.row_count_limit() >= 0) {
         l.mutable_data()->pb.set_table_row_count_limit(req.row_count_limit());
+        LOG(INFO) << Substitute("Setting table $0 row_count_limit to $1",
+                                 normalized_table_name, req.row_count_limit());
       } else {
         return SetupError(Status::InvalidArgument("row count limit must "
             "be greater than or equal to -1"),
@@ -3463,8 +3477,16 @@ Status CatalogManager::GetTableStatistics(const 
GetTableStatisticsRequestPB* req
     }
   }
   if (FLAGS_enable_table_write_limit) {
-    resp->set_disk_size_limit(l.data().pb.table_disk_size_limit());
-    resp->set_row_count_limit(l.data().pb.table_row_count_limit());
+    if (l.data().pb.has_table_disk_size_limit()) {
+      resp->set_disk_size_limit(l.data().pb.table_disk_size_limit());
+    } else {
+      resp->set_disk_size_limit(TableInfo::TABLE_WRITE_DEFAULT_LIMIT);
+    }
+    if (l.data().pb.has_table_row_count_limit()) {
+      resp->set_row_count_limit(l.data().pb.table_row_count_limit());
+    } else {
+      resp->set_row_count_limit(TableInfo::TABLE_WRITE_DEFAULT_LIMIT);
+    }
   }
   return Status::OK();
 }
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index b11ac15..f619251 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -164,6 +164,7 @@ using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
+using kudu::client::KuduTableStatistics;
 using kudu::client::KuduValue;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalMiniCluster;
@@ -1195,6 +1196,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "rename_table.*Rename a table",
         "scan.*Scan rows from a table",
         "set_extra_config.*Change a extra configuration value on a table",
+        "set_limit.*Set the write limit for a table",
         "statistics.*Get table statistics",
     };
     NO_FATALS(RunTestHelp(kCmd, kTableModeRegexes));
@@ -1223,6 +1225,13 @@ TEST_F(ToolTest, TestModeHelp) {
         }));
   }
   {
+    const vector<string> kSetLimitModeRegexes = {
+        "disk_size.*Set the disk size limit",
+        "row_count.*Set the row count limit",
+    };
+    NO_FATALS(RunTestHelp("table set_limit", kSetLimitModeRegexes));
+  }
+  {
     const string kCmd = "tablet";
     const vector<string> kTabletModeRegexes = {
         "change_config.*Change.*Raft configuration",
@@ -3365,6 +3374,7 @@ TEST_F(ToolTest, TestMasterList) {
 // (6)copy a table
 // (7)alter a column
 // (8)delete a column
+// (9)set the table limit, both when kudu-master does not support it and when it does
 TEST_F(ToolTest, TestDeleteTable) {
   NO_FATALS(StartExternalMiniCluster());
   shared_ptr<KuduClient> client;
@@ -3958,6 +3968,153 @@ TEST_F(ToolTest, TestDeleteColumn) {
   ASSERT_STR_NOT_CONTAINS(table->schema().ToString(), kColumnName);
 }
 
+TEST_F(ToolTest, TestChangeTableLimitNotSupported) {
+  const string kTableName = "kudu.table.failtochangelimit";
+  // The table write limit is disabled by default, so setting the limit will not take effect.
+  NO_FATALS(StartExternalMiniCluster());
+  // Create the table.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_replicas(1);
+  workload.Setup();
+
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  // Report unsupported table limit through kudu CLI.
+  string stdout;
+  NO_FATALS(RunActionStdoutString(
+      Substitute("table statistics $0 $1",
+                 master_addr, kTableName),
+      &stdout));
+  ASSERT_STR_CONTAINS(stdout, "on disk size limit: N/A");
+  ASSERT_STR_CONTAINS(stdout, "live row count limit: N/A");
+
+  // Report unsupported table limit through kudu client.
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(master_addr)
+                .Build(&client));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  unique_ptr<KuduTableStatistics> statistics;
+  KuduTableStatistics *table_statistics;
+  ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+  statistics.reset(table_statistics);
+  ASSERT_EQ(-1, statistics->live_row_count_limit());
+  ASSERT_EQ(-1, statistics->on_disk_size_limit());
+
+  // Fail to set table limit if kudu-master does not enable it. Verify the 
error message.
+  string stderr;
+  Status s = RunActionStderrString(
+      Substitute("table set_limit disk_size $0 $1 1000000",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "altering table limit is not supported");
+  s = RunActionStderrString(
+      Substitute("table set_limit row_count $0 $1 1000000",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "altering table limit is not supported");
+}
+
+TEST_F(ToolTest, TestChangeTableLimitSupported) {
+  const string kTableName = "kudu.table.changelimit";
+  const int64_t kDiskSizeLimit = 999999;
+  const int64_t kRowCountLimit = 100000;
+
+  ExternalMiniClusterOptions opts;
+  opts.extra_master_flags.emplace_back("--enable_table_write_limit=true");
+  opts.extra_master_flags.emplace_back("--superuser_acl=*");
+
+  NO_FATALS(StartExternalMiniCluster(opts));
+
+  // Create the table.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_replicas(1);
+  workload.Setup();
+
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(master_addr)
+                .Build(&client));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  unique_ptr<KuduTableStatistics> statistics;
+  KuduTableStatistics *table_statistics;
+  ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+  statistics.reset(table_statistics);
+  ASSERT_EQ(-1, statistics->live_row_count_limit());
+  ASSERT_EQ(-1, statistics->on_disk_size_limit());
+  // Check the invalid disk_size_limit.
+  string stderr;
+  Status s = RunActionStderrString(
+      Substitute("table set_limit disk_size $0 $1 abc",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(stderr, "Could not parse");
+  stderr.clear();
+  // Check the invalid row_count_limit.
+  s = RunActionStderrString(
+      Substitute("table set_limit row_count $0 $1 abc",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "Could not parse");
+  stderr.clear();
+  // Check the invalid setting parameter: negative value
+  s = RunActionStderrString(
+      Substitute("table set_limit disk_size $0 $1 -1",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "ERROR: unknown command line flag");
+  stderr.clear();
+  s = RunActionStderrString(
+      Substitute("table set_limit row_count $0 $1 -1",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "ERROR: unknown command line flag");
+  stderr.clear();
+  // Check the invalid setting parameter: larger than INT64_MAX
+  s = RunActionStderrString(
+      Substitute("table set_limit disk_size $0 $1 99999999999999999999999999",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "Could not parse");
+  stderr.clear();
+  s = RunActionStderrString(
+      Substitute("table set_limit row_count $0 $1 99999999999999999999999999",
+                 master_addr, kTableName),
+      &stderr);
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(stderr, "Could not parse");
+  stderr.clear();
+  // Check the normal limit setting.
+  NO_FATALS(RunActionStdoutNone(Substitute("table set_limit disk_size $0 $1 
$2",
+                                           master_addr, kTableName, 
kDiskSizeLimit)));
+  NO_FATALS(RunActionStdoutNone(Substitute("table set_limit row_count $0 $1 
$2",
+                                           master_addr, kTableName, 
kRowCountLimit)));
+  ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+  statistics.reset(table_statistics);
+  ASSERT_EQ(kRowCountLimit, statistics->live_row_count_limit());
+  ASSERT_EQ(kDiskSizeLimit, statistics->on_disk_size_limit());
+  // Check unlimited setting.
+  NO_FATALS(RunActionStdoutNone(Substitute("table set_limit disk_size $0 $1 
unlimited",
+                                           master_addr, kTableName)));
+  NO_FATALS(RunActionStdoutNone(Substitute("table set_limit row_count $0 $1 
unlimited",
+                                           master_addr, kTableName)));
+  ASSERT_OK(client->GetTableStatistics(kTableName, &table_statistics));
+  statistics.reset(table_statistics);
+  ASSERT_EQ(-1, statistics->live_row_count_limit());
+  ASSERT_EQ(-1, statistics->on_disk_size_limit());
+}
+
 Status CreateLegacyHmsTable(HmsClient* client,
                             const string& hms_database_name,
                             const string& hms_table_name,
diff --git a/src/kudu/tools/tool_action_table.cc 
b/src/kudu/tools/tool_action_table.cc
index 656a342..d5da37f 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -412,6 +412,40 @@ Status LocateRow(const RunnerContext& context) {
   return Status::OK();
 }
 
+Status SetDiskSizeLimit(const RunnerContext& context) {
+  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+  const string& disk_size_limit_str = FindOrDie(context.required_args, 
"disk_size");
+  int64_t disk_size_limit;
+  if (iequals(disk_size_limit_str, "unlimited")) {
+    disk_size_limit = -1;
+  } else if (!safe_strto64(disk_size_limit_str, &disk_size_limit)) {
+    return Status::InvalidArgument(Substitute(
+        "Could not parse $0 as disk size limit", disk_size_limit_str));
+  }
+  client::sp::shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(CreateKuduClient(context, &client));
+  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
+  alterer->SetTableDiskSizeLimit(disk_size_limit);
+  return alterer->Alter();
+}
+
+Status SetRowCountLimit(const RunnerContext& context) {
+  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+  const string& row_count_limit_str = FindOrDie(context.required_args, 
"row_count");
+  int64_t row_count_limit;
+  if (iequals(row_count_limit_str, "unlimited")) {
+    row_count_limit = -1;
+  } else if (!safe_strto64(row_count_limit_str, &row_count_limit)) {
+    return Status::InvalidArgument(Substitute(
+        "Could not parse $0 as row count limit", row_count_limit_str));
+  }
+  client::sp::shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(CreateKuduClient(context, &client));
+  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
+  alterer->SetTableRowCountLimit(row_count_limit);
+  return alterer->Alter();
+}
+
 Status RenameTable(const RunnerContext& context) {
   const string& table_name = FindOrDie(context.required_args, kTableNameArg);
   const string& new_table_name = FindOrDie(context.required_args, 
kNewTableNameArg);
@@ -1175,6 +1209,28 @@ Status CreateTable(const RunnerContext& context) {
 
 } // anonymous namespace
 
+unique_ptr<Mode> BuildSetTableLimitMode() {
+  unique_ptr<Action> set_disk_size_limit =
+      ClusterActionBuilder("disk_size", &SetDiskSizeLimit)
+      .Description("Set the disk size limit")
+      .AddRequiredParameter({ kTableNameArg, "Name of the table to set limit" 
})
+      .AddRequiredParameter({ "disk_size",
+                              "The disk size limit, 'unlimited' for no write 
limit" })
+      .Build();
+  unique_ptr<Action> set_row_count_limit =
+      ClusterActionBuilder("row_count", &SetRowCountLimit)
+      .Description("Set the row count limit")
+      .AddRequiredParameter({ kTableNameArg, "Name of the table to set limit" 
})
+      .AddRequiredParameter({ "row_count",
+                              "The row count limit, 'unlimited' for no write 
limit" })
+      .Build();
+  return ModeBuilder("set_limit")
+      .Description("Set the write limit for a table")
+      .AddAction(std::move(set_disk_size_limit))
+      .AddAction(std::move(set_row_count_limit))
+      .Build();
+}
+
 unique_ptr<Mode> BuildTableMode() {
   unique_ptr<Action> delete_table =
       ClusterActionBuilder("delete", &DeleteTable)
@@ -1393,6 +1449,7 @@ unique_ptr<Mode> BuildTableMode() {
 
   return ModeBuilder("table")
       .Description("Operate on Kudu tables")
+      .AddMode(BuildSetTableLimitMode())
       .AddAction(std::move(add_range_partition))
       .AddAction(std::move(column_remove_default))
       .AddAction(std::move(column_set_block_size))

Reply via email to