This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new cf9597511 KUDU-3639 Add table operation functions follow-up
cf9597511 is described below
commit cf95975114e2ae38ad4e4cf0d81e2e59bafa49af
Author: gabriellalotz <[email protected]>
AuthorDate: Mon Feb 24 09:59:20 2025 +0000
KUDU-3639 Add table operation functions follow-up
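Refactored the table operation helpers to take a plain rpc::RpcContext*
instead of an unnecessary std::optional<rpc::RpcContext*>, ensuring
consistency across function signatures. Introduced a dedicated timeout
flag (--hive_metastore_notification_log_listener_catch_up_deadline_ms,
default 30 seconds) that bounds the wait for the HMS notification log
listener to catch up when no RPC context is provided, replacing the
previous MonoTime::Max() deadline to prevent excessive wait times. Moved
the leader lock assertions (leader_lock_.AssertAcquiredForReading())
into the helper functions for better code organization and reuse.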
Change-Id: I39ed6526fc66113667c21051972da191d4ed209b
Reviewed-on: http://gerrit.cloudera.org:8080/22529
Reviewed-by: Zoltan Chovan <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/master/catalog_manager-test.cc | 15 ++++---
src/kudu/master/catalog_manager.cc | 51 ++++++++++++------------
src/kudu/master/catalog_manager.h | 12 +++---
src/kudu/master/hms_notification_log_listener.cc | 7 ++++
4 files changed, 46 insertions(+), 39 deletions(-)
diff --git a/src/kudu/master/catalog_manager-test.cc
b/src/kudu/master/catalog_manager-test.cc
index 03d2adeb1..3d60693ca 100644
--- a/src/kudu/master/catalog_manager-test.cc
+++ b/src/kudu/master/catalog_manager-test.cc
@@ -294,22 +294,21 @@ class CatalogManagerRpcAndUserFunctionsTest : public
KuduTest {
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
- KUDU_CHECK_OK(b.Build(&schema));
+ RETURN_NOT_OK(b.Build(&schema));
vector<string> columnNames;
columnNames.emplace_back("key");
- KuduTableCreator* tableCreator = client_->NewTableCreator();
+ std::unique_ptr<KuduTableCreator> tableCreator(client_->NewTableCreator());
tableCreator->table_name(kTableName).schema(&schema).set_range_partition_columns(columnNames);
int32_t increment = 1000 / 10;
for (int32_t i = 1; i < 10; i++) {
KuduPartialRow* row = schema.NewRow();
- KUDU_CHECK_OK(row->SetInt32(0, i * increment));
+ RETURN_NOT_OK(row->SetInt32(0, i * increment));
tableCreator->add_range_partition_split(row);
}
tableCreator->num_replicas(1);
Status s = tableCreator->Create();
- delete tableCreator;
return s;
}
@@ -333,7 +332,7 @@ class CatalogManagerRpcAndUserFunctionsTest : public
KuduTest {
};
TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTable) {
- CreateTestTable();
+ ASSERT_OK(CreateTestTable());
DeleteTableRequestPB req;
DeleteTableResponsePB resp;
req.mutable_table()->set_table_name("test_table");
@@ -342,7 +341,7 @@ TEST_F(CatalogManagerRpcAndUserFunctionsTest,
TestDeleteTable) {
}
TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTableWithUser) {
- CreateTestTable();
+ ASSERT_OK(CreateTestTable());
DeleteTableRequestPB req;
DeleteTableResponsePB resp;
req.mutable_table()->set_table_name("test_table");
@@ -369,7 +368,7 @@ TEST_F(CatalogManagerRpcAndUserFunctionsTest,
TestCreateTableWithUser) {
}
TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableRpc) {
- CreateTestTable();
+ ASSERT_OK(CreateTestTable());
AlterTableRequestPB req;
AlterTableResponsePB resp;
@@ -383,7 +382,7 @@ TEST_F(CatalogManagerRpcAndUserFunctionsTest,
TestAlterTableRpc) {
}
TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableWithUser) {
- CreateTestTable();
+ ASSERT_OK(CreateTestTable());
AlterTableRequestPB req;
AlterTableResponsePB resp;
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 901551c99..024ece53a 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -65,7 +65,6 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/arena.h>
-#include <google/protobuf/stubs/common.h>
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/common.pb.h"
@@ -440,6 +439,8 @@ TAG_FLAG(default_deleted_table_reserve_seconds, runtime);
DECLARE_string(hive_metastore_uris);
+DECLARE_int32(hive_metastore_notification_log_listener_catch_up_deadline_ms);
+
bool ValidateDeletedTableReserveSeconds() {
if (FLAGS_default_deleted_table_reserve_seconds > 0 &&
!FLAGS_hive_metastore_uris.empty()) {
@@ -1899,14 +1900,16 @@ Status ValidateClientSchema(const optional<string>&
name,
Status CatalogManager::CreateTableHelper(const CreateTableRequestPB* orig_req,
CreateTableResponsePB* resp,
- optional<rpc::RpcContext*> rpc,
+ rpc::RpcContext* rpc,
const optional<string>& username) {
- // Copy the request so we can fill in some defaults.
+ leader_lock_.AssertAcquiredForReading();
+
+ // Copy the request, so we can fill in some defaults.
CreateTableRequestPB req = *orig_req;
optional<string> user;
- if (rpc && rpc.value() != nullptr) {
- user.emplace(rpc.value()->remote_user().username());
+ if (rpc) {
+ user.emplace(rpc->remote_user().username());
} else {
user = username;
}
@@ -1929,7 +1932,7 @@ Status CatalogManager::CreateTableHelper(const
CreateTableRequestPB* orig_req,
const string& normalized_table_name = NormalizeTableName(req.name());
if (is_user_table) {
// a. Validate the user request.
- if ((rpc && rpc.value() != nullptr) || username) {
+ if (rpc) {
DCHECK(user.has_value());
RETURN_NOT_OK(SetupError(
authz_provider_->AuthorizeCreateTable(normalized_table_name, *user,
req.owner()),
@@ -2228,7 +2231,6 @@ Status CatalogManager::CreateTable(const
CreateTableRequestPB* orig_req,
rpc::RpcContext* rpc) {
LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
RequestorString(rpc), SecureDebugString(*orig_req));
- leader_lock_.AssertAcquiredForReading();
return CreateTableHelper(orig_req, resp, rpc, std::nullopt);
}
@@ -2239,7 +2241,6 @@ Status CatalogManager::CreateTableWithUser(const
CreateTableRequestPB* orig_req,
LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
user ? *user : "<default>",
SecureDebugString(*orig_req));
- leader_lock_.AssertAcquiredForReading();
return CreateTableHelper(orig_req, resp, nullptr, user);
}
@@ -2661,11 +2662,13 @@ Status
CatalogManager::EnableCompactionforRecalledTable(const RecallDeletedTable
Status CatalogManager::DeleteTableHelper(const DeleteTableRequestPB& req,
DeleteTableResponsePB* resp,
- optional<rpc::RpcContext*> rpc,
+ rpc::RpcContext* rpc,
const optional<string>& username) {
+ leader_lock_.AssertAcquiredForReading();
+
optional<string> user;
- if (rpc && rpc.value() != nullptr) {
- user.emplace(rpc.value()->remote_user().username());
+ if (rpc) {
+ user.emplace(rpc->remote_user().username());
} else {
user = username;
}
@@ -2727,7 +2730,6 @@ Status CatalogManager::DeleteTableRpc(const
DeleteTableRequestPB& req,
LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
RequestorString(rpc),
SecureShortDebugString(req));
- leader_lock_.AssertAcquiredForReading();
return DeleteTableHelper(req, resp, rpc, /*username=*/nullopt);
}
@@ -2736,7 +2738,6 @@ Status CatalogManager::DeleteTableWithUser(const
DeleteTableRequestPB& req,
const optional<string>& user) {
LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
user ? *user : "<default>",
SecureShortDebugString(req));
- leader_lock_.AssertAcquiredForReading();
return DeleteTableHelper(req, resp, /*rpc=*/nullptr, user);
}
@@ -3375,7 +3376,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
Status CatalogManager::AlterTableHelper(const AlterTableRequestPB& req,
AlterTableResponsePB* resp,
- optional<rpc::RpcContext*> rpc,
+ rpc::RpcContext* rpc,
const optional<string>& username,
AlterType alter_type) {
Status s;
@@ -3393,6 +3394,8 @@ Status CatalogManager::AlterTableHelper(const
AlterTableRequestPB& req,
}
}
+ leader_lock_.AssertAcquiredForReading();
+
if (req.modify_external_catalogs()) {
// If the HMS integration is enabled, wait for the notification log
listener
// to catch up. This reduces the likelihood of attempting to apply an
@@ -3401,8 +3404,8 @@ Status CatalogManager::AlterTableHelper(const
AlterTableRequestPB& req,
}
optional<const string> user;
- if (rpc && rpc.value() != nullptr) {
- user.emplace(rpc.value()->remote_user().username());
+ if (rpc) {
+ user.emplace(rpc->remote_user().username());
} else if (username) {
user.emplace(*username);
}
@@ -3497,8 +3500,6 @@ Status CatalogManager::AlterTableRpc(const
AlterTableRequestPB& req,
AlterType alter_type) {
LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
RequestorString(rpc), SecureShortDebugString(req));
-
- leader_lock_.AssertAcquiredForReading();
return AlterTableHelper(req, resp, rpc, nullopt, alter_type);
}
@@ -3508,7 +3509,6 @@ Status CatalogManager::AlterTableWithUser(const
AlterTableRequestPB& req,
AlterType alter_type) {
LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
user ? *user : "<default>",
SecureShortDebugString(req));
- leader_lock_.AssertAcquiredForReading();
return AlterTableHelper(req, resp, nullptr, user, alter_type);
}
@@ -6845,13 +6845,14 @@ void CatalogManager::AbortAndWaitForAllTasks(
}
template<typename RespClass>
-Status CatalogManager::WaitForNotificationLogListenerCatchUp(
- RespClass* resp, const std::optional<rpc::RpcContext*>& rpc_opt) {
+Status CatalogManager::WaitForNotificationLogListenerCatchUp(RespClass* resp,
+ rpc::RpcContext*
rpc) {
if (hms_catalog_) {
- // If an RPC context is provided, use its deadline; otherwise, use
MonoTime::Max().
- MonoTime deadline = (rpc_opt && rpc_opt.value() != nullptr)
- ? rpc_opt.value()->GetClientDeadline()
- : MonoTime::Max();
+ MonoTime deadline =
+ rpc ? rpc->GetClientDeadline()
+ : MonoTime::Now() +
+ MonoDelta::FromMilliseconds(
+
FLAGS_hive_metastore_notification_log_listener_catch_up_deadline_ms);
Status s = hms_notification_log_listener_->WaitForCatchUp(deadline);
// ServiceUnavailable indicates the master has lost leadership.
MasterErrorPB::Code code = s.IsServiceUnavailable() ?
MasterErrorPB::NOT_THE_LEADER
diff --git a/src/kudu/master/catalog_manager.h
b/src/kudu/master/catalog_manager.h
index c4dc090b4..6d9ea7485 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -1057,7 +1057,7 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// an explicitly provided username for authentication.
Status CreateTableHelper(const CreateTableRequestPB* req,
CreateTableResponsePB* resp,
- std::optional<rpc::RpcContext*> rpc,
+ rpc::RpcContext* rpc,
const std::optional<std::string>& username);
// Common logic for DeleteTableRpc and DeleteTableWithUser.
@@ -1067,7 +1067,7 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// an explicitly provided username for authentication.
Status DeleteTableHelper(const DeleteTableRequestPB& req,
DeleteTableResponsePB* resp,
- std::optional<rpc::RpcContext*> rpc,
+ rpc::RpcContext* rpc,
const std::optional<std::string>& username);
// Delete the specified table in the catalog. If 'user' is provided,
@@ -1089,7 +1089,7 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// an explicitly provided username for authentication.
Status AlterTableHelper(const AlterTableRequestPB& req,
AlterTableResponsePB* resp,
- std::optional<rpc::RpcContext*> rpc,
+ rpc::RpcContext* rpc,
const std::optional<std::string>& username,
AlterType alter_type = AlterType::kNormal);
@@ -1386,9 +1386,9 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// Wait for the Hive Metastore notification log listener to process the
latest
// events, if the HMS integration is enabled. Handles setting the correct
// response code in the case of an error.
- template<typename RespClass>
- Status WaitForNotificationLogListenerCatchUp(
- RespClass* resp, const std::optional<rpc::RpcContext*>& rpc_opt)
WARN_UNUSED_RESULT;
+ template <typename RespClass>
+ Status WaitForNotificationLogListenerCatchUp(RespClass* resp,
+ rpc::RpcContext* rpc)
WARN_UNUSED_RESULT;
enum class ValidateType {
kCreateTable = 0,
diff --git a/src/kudu/master/hms_notification_log_listener.cc
b/src/kudu/master/hms_notification_log_listener.cc
index f0f0a58d3..67e369478 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -66,6 +66,13 @@
TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, hidden);
TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, unsafe);
TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, runtime);
+DEFINE_int32(hive_metastore_notification_log_listener_catch_up_deadline_ms,
30000,
+ "The deadline in milliseconds for the HMS log listener to catch up with the "
+ "latest log entry.");
+TAG_FLAG(hive_metastore_notification_log_listener_catch_up_deadline_ms,
advanced);
+TAG_FLAG(hive_metastore_notification_log_listener_catch_up_deadline_ms,
experimental);
+TAG_FLAG(hive_metastore_notification_log_listener_catch_up_deadline_ms,
runtime);
+
using rapidjson::Document;
using rapidjson::Value;
using std::optional;