This is an automated email from the ASF dual-hosted git repository.
mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 7dbf89e9e KUDU-3639 Add REST-compatible table operation functions
7dbf89e9e is described below
commit 7dbf89e9ecff1c05e78e522fc77c409387b88ca5
Author: gabriellalotz <[email protected]>
AuthorDate: Thu Jan 23 15:32:44 2025 +0000
KUDU-3639 Add REST-compatible table operation functions
Added new functions to CatalogManager (CreateTableWithUser,
DeleteTableWithUser, and AlterTableWithUser) to support the REST API
implementation. These functions accept a username parameter instead of
relying on RpcContext, making them suitable for contexts where
RpcContext is unavailable.
Shared logic between the RPC-based and REST-compatible functions has
been refactored into reusable helper methods to minimize code
duplication.
Change-Id: If52359603e3aa8af2dedc41ecc5eb78d03151fe5
Reviewed-on: http://gerrit.cloudera.org:8080/22420
Tested-by: Marton Greber <[email protected]>
Reviewed-by: Marton Greber <[email protected]>
Reviewed-by: Zoltan Martonka <[email protected]>
---
src/kudu/master/catalog_manager-test.cc | 153 +++++++++++++++++++++++++++++++-
src/kudu/master/catalog_manager.cc | 141 ++++++++++++++++++++---------
src/kudu/master/catalog_manager.h | 51 ++++++++++-
3 files changed, 298 insertions(+), 47 deletions(-)
diff --git a/src/kudu/master/catalog_manager-test.cc
b/src/kudu/master/catalog_manager-test.cc
index 96898656e..03d2adeb1 100644
--- a/src/kudu/master/catalog_manager-test.cc
+++ b/src/kudu/master/catalog_manager-test.cc
@@ -17,23 +17,35 @@
#include "kudu/master/catalog_manager.h"
+#include <cstdint>
+#include <memory>
#include <numeric>
#include <ostream>
#include <string>
+#include <type_traits>
#include <vector>
-#include <glog/logging.h>
#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.pb.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
+#include "kudu/master/mini_master.h"
#include "kudu/master/ts_descriptor.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/util/cow_object.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -44,6 +56,18 @@ using std::iota;
using std::string;
using std::vector;
using strings::Substitute;
+using std::unique_ptr;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::Status;
+using kudu::KuduPartialRow;
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
namespace kudu {
namespace master {
@@ -250,5 +274,128 @@ TEST(TableInfoTest, MaxReturnedLocationsNotSpecified) {
}
}
-} // namespace master
-} // namespace kudu
+// Test fixture that starts an InternalMiniCluster with default options and
+// connects a KuduClient to its master, so the RPC-style and user-style table
+// operations of CatalogManager can be exercised against a running master.
+class CatalogManagerRpcAndUserFunctionsTest : public KuduTest {
+ protected:
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
+    ASSERT_OK(cluster_->Start());
+    master_ = cluster_->mini_master()->master();
+
+    ASSERT_OK(KuduClientBuilder()
+        .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+        .Build(&client_));
+  }
+
+  // Creates 'test_table' through the client API: two INT32 columns ('key' as
+  // the primary key, plus 'int_val'), range-partitioned on 'key' with split
+  // rows at 100, 200, ..., 900, and a single replica.
+  //
+  // Any failure is returned to the caller instead of aborting the process, so
+  // tests are expected to wrap the call in ASSERT_OK().
+  Status CreateTestTable() {
+    const string kTableName = "test_table";
+    KuduSchema schema;
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
+    KUDU_RETURN_NOT_OK(b.Build(&schema));
+    const vector<string> column_names = { "key" };
+
+    // Owned by a smart pointer so that every return path releases it.
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    table_creator->table_name(kTableName)
+        .schema(&schema)
+        .set_range_partition_columns(column_names);
+
+    constexpr int32_t kIncrement = 1000 / 10;
+    for (int32_t i = 1; i < 10; i++) {
+      unique_ptr<KuduPartialRow> row(schema.NewRow());
+      KUDU_RETURN_NOT_OK(row->SetInt32(0, i * kIncrement));
+      // The table creator takes ownership of the split row.
+      table_creator->add_range_partition_split(row.release());
+    }
+    table_creator->num_replicas(1);
+    return table_creator->Create();
+  }
+
+  // Fills 'req' with the protobuf equivalent of the test table: an INT32 key
+  // column 'key', an INT32 column 'int_val', the name 'test_table', the owner
+  // 'default' and one replica.
+  static void PopulateCreateTableRequest(CreateTableRequestPB* req) {
+    SchemaPB* schema = req->mutable_schema();
+    ColumnSchemaPB* col = schema->add_columns();
+    col->set_name("key");
+    col->set_type(INT32);
+    col->set_is_key(true);
+    ColumnSchemaPB* col2 = schema->add_columns();
+    col2->set_name("int_val");
+    col2->set_type(INT32);
+    req->set_name("test_table");
+    req->set_owner("default");
+    req->set_num_replicas(1);
+  }
+
+  unique_ptr<InternalMiniCluster> cluster_;
+  // Not owned; points into 'cluster_' and is assigned in SetUp().
+  Master* master_ = nullptr;
+  shared_ptr<KuduClient> client_;
+};
+
+// Verifies that DeleteTableRpc() succeeds for an existing table when it is
+// called without an RpcContext.
+TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTable) {
+  // Stop here if the table could not be created; otherwise a setup problem
+  // would be reported as a failure of the delete call below.
+  ASSERT_OK(CreateTestTable());
+  DeleteTableRequestPB req;
+  DeleteTableResponsePB resp;
+  req.mutable_table()->set_table_name("test_table");
+  CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+  ASSERT_OK(master_->catalog_manager()->DeleteTableRpc(req, &resp, nullptr));
+}
+
+// Verifies that DeleteTableWithUser() succeeds for an existing table when an
+// explicit user name is supplied.
+TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTableWithUser) {
+  // Stop here if the table could not be created; otherwise a setup problem
+  // would be reported as a failure of the delete call below.
+  ASSERT_OK(CreateTestTable());
+  DeleteTableRequestPB req;
+  DeleteTableResponsePB resp;
+  req.mutable_table()->set_table_name("test_table");
+  CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+  const string user = "test_user";
+  ASSERT_OK(master_->catalog_manager()->DeleteTableWithUser(req, &resp, user));
+}
+
+// Checks that CreateTable() accepts a populated request when no RpcContext
+// is passed.
+TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestCreateTableRpc) {
+  CreateTableRequestPB create_req;
+  PopulateCreateTableRequest(&create_req);
+  CreateTableResponsePB create_resp;
+
+  auto* const catalog = master_->catalog_manager();
+  CatalogManager::ScopedLeaderSharedLock leader_lock(catalog);
+  ASSERT_OK(catalog->CreateTable(&create_req, &create_resp, nullptr));
+}
+
+// Checks that CreateTableWithUser() accepts a populated request together with
+// an explicit user name.
+TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestCreateTableWithUser) {
+  CreateTableRequestPB create_req;
+  PopulateCreateTableRequest(&create_req);
+  CreateTableResponsePB create_resp;
+
+  auto* const catalog = master_->catalog_manager();
+  CatalogManager::ScopedLeaderSharedLock leader_lock(catalog);
+  const string user_name = "test_user";
+  ASSERT_OK(catalog->CreateTableWithUser(&create_req, &create_resp, user_name));
+}
+
+// Verifies that AlterTableRpc() can add an INT32 column named 'int_val2' to
+// an existing table when it is called without an RpcContext.
+TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableRpc) {
+  // Stop here if the table could not be created; otherwise a setup problem
+  // would be reported as a failure of the alter call below.
+  ASSERT_OK(CreateTestTable());
+  AlterTableRequestPB req;
+  AlterTableResponsePB resp;
+
+  req.mutable_table()->set_table_name("test_table");
+  AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+  step->set_type(AlterTableRequestPB::ADD_COLUMN);
+  ColumnSchemaToPB(ColumnSchema("int_val2", INT32, true),
+                   step->mutable_add_column()->mutable_schema());
+  CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+  ASSERT_OK(master_->catalog_manager()->AlterTableRpc(req, &resp, nullptr));
+}
+
+// Verifies that AlterTableWithUser() can add an INT32 column named 'int_val2'
+// to an existing table when an explicit user name is supplied.
+TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableWithUser) {
+  // Stop here if the table could not be created; otherwise a setup problem
+  // would be reported as a failure of the alter call below.
+  ASSERT_OK(CreateTestTable());
+  AlterTableRequestPB req;
+  AlterTableResponsePB resp;
+
+  req.mutable_table()->set_table_name("test_table");
+  AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+  step->set_type(AlterTableRequestPB::ADD_COLUMN);
+  ColumnSchemaToPB(ColumnSchema("int_val2", INT32, true),
+                   step->mutable_add_column()->mutable_schema());
+  CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+  const string user = "test_user";
+  ASSERT_OK(master_->catalog_manager()->AlterTableWithUser(req, &resp, user));
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index d275b61a9..901551c99 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1897,21 +1897,18 @@ Status ValidateClientSchema(const optional<string>&
name,
} // anonymous namespace
-// Create a new table.
-// See README file in this directory for a description of the design.
-Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
- CreateTableResponsePB* resp,
- rpc::RpcContext* rpc) {
- leader_lock_.AssertAcquiredForReading();
-
- // Copy the request, so we can fill in some defaults.
+Status CatalogManager::CreateTableHelper(const CreateTableRequestPB* orig_req,
+ CreateTableResponsePB* resp,
+ optional<rpc::RpcContext*> rpc,
+ const optional<string>& username) {
+ // Copy the request so we can fill in some defaults.
CreateTableRequestPB req = *orig_req;
- LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
- RequestorString(rpc), SecureDebugString(req));
+ optional<string> user;
- optional<const string> user;
- if (rpc) {
- user.emplace(rpc->remote_user().username());
+ if (rpc && rpc.value() != nullptr) {
+ user.emplace(rpc.value()->remote_user().username());
+ } else {
+ user = username;
}
// Default the owner if it isn't set.
if (user && !req.has_owner()) {
@@ -1932,7 +1929,7 @@ Status CatalogManager::CreateTable(const
CreateTableRequestPB* orig_req,
const string& normalized_table_name = NormalizeTableName(req.name());
if (is_user_table) {
// a. Validate the user request.
- if (rpc) {
+ if ((rpc && rpc.value() != nullptr) || username) {
DCHECK(user.has_value());
RETURN_NOT_OK(SetupError(
authz_provider_->AuthorizeCreateTable(normalized_table_name, *user,
req.owner()),
@@ -2224,6 +2221,28 @@ Status CatalogManager::CreateTable(const
CreateTableRequestPB* orig_req,
return Status::OK();
}
+// RPC entry point for table creation. Logs the request, asserts that the
+// leader lock is held for reading and hands the work to CreateTableHelper().
+// See README file in this directory for a description of the design.
+Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
+                                   CreateTableResponsePB* resp,
+                                   rpc::RpcContext* rpc) {
+  const auto requestor = RequestorString(rpc);
+  LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
+                          requestor, SecureDebugString(*orig_req));
+  leader_lock_.AssertAcquiredForReading();
+  // No explicit user name: the helper reads it from 'rpc' when that is set.
+  return CreateTableHelper(orig_req, resp, rpc, /*username=*/std::nullopt);
+}
+
+// Table creation entry point for callers that have no RpcContext. Logs the
+// request (printing "<default>" when 'user' is unset), asserts that the leader
+// lock is held for reading and hands the work to CreateTableHelper().
+Status CatalogManager::CreateTableWithUser(const CreateTableRequestPB* orig_req,
+                                           CreateTableResponsePB* resp,
+                                           const optional<string>& user) {
+  const string requestor = user.value_or("<default>");
+  LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
+                          requestor, SecureDebugString(*orig_req));
+  leader_lock_.AssertAcquiredForReading();
+  // A null RPC context makes the helper fall back to 'user'.
+  return CreateTableHelper(orig_req, resp, /*rpc=*/nullptr, user);
+}
+
Status CatalogManager::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
IsCreateTableDoneResponsePB* resp,
const optional<string>& user) {
@@ -2640,17 +2659,15 @@ Status
CatalogManager::EnableCompactionforRecalledTable(const RecallDeletedTable
return Status::OK();
}
-Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
- DeleteTableResponsePB* resp,
- rpc::RpcContext* rpc) {
- LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
- RequestorString(rpc), SecureShortDebugString(req));
-
- leader_lock_.AssertAcquiredForReading();
-
+Status CatalogManager::DeleteTableHelper(const DeleteTableRequestPB& req,
+ DeleteTableResponsePB* resp,
+ optional<rpc::RpcContext*> rpc,
+ const optional<string>& username) {
optional<string> user;
- if (rpc) {
- user.emplace(rpc->remote_user().username());
+ if (rpc && rpc.value() != nullptr) {
+ user.emplace(rpc.value()->remote_user().username());
+ } else {
+ user = username;
}
// If the HMS integration is enabled and the table should be deleted in the
HMS,
@@ -2704,6 +2721,25 @@ Status CatalogManager::DeleteTableRpc(const
DeleteTableRequestPB& req,
return DeleteTable(req, resp, /*hms_notification_log_event_id=*/nullopt,
user);
}
+// RPC entry point for table deletion. Logs the request, asserts that the
+// leader lock is held for reading and hands the work to DeleteTableHelper().
+Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
+                                      DeleteTableResponsePB* resp,
+                                      rpc::RpcContext* rpc) {
+  const auto requestor = RequestorString(rpc);
+  LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
+                          requestor, SecureShortDebugString(req));
+  leader_lock_.AssertAcquiredForReading();
+  // No explicit user name: the helper reads it from 'rpc' when that is set.
+  return DeleteTableHelper(req, resp, rpc, /*username=*/nullopt);
+}
+
+// Table deletion entry point for callers that have no RpcContext. Logs the
+// request (printing "<default>" when 'user' is unset), asserts that the leader
+// lock is held for reading and hands the work to DeleteTableHelper().
+Status CatalogManager::DeleteTableWithUser(const DeleteTableRequestPB& req,
+                                           DeleteTableResponsePB* resp,
+                                           const optional<string>& user) {
+  const string requestor = user.value_or("<default>");
+  LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
+                          requestor, SecureShortDebugString(req));
+  leader_lock_.AssertAcquiredForReading();
+  // A null RPC context makes the helper fall back to 'user'.
+  return DeleteTableHelper(req, resp, /*rpc=*/nullptr, user);
+}
+
Status CatalogManager::DeleteTableHms(const string& table_name,
const string& table_id,
int64_t notification_log_event_id) {
@@ -3337,13 +3373,11 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
return Status::OK();
}
-Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
- AlterTableResponsePB* resp,
- rpc::RpcContext* rpc,
- AlterType alter_type) {
- LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
- RequestorString(rpc), SecureShortDebugString(req));
-
+Status CatalogManager::AlterTableHelper(const AlterTableRequestPB& req,
+ AlterTableResponsePB* resp,
+ optional<rpc::RpcContext*> rpc,
+ const optional<string>& username,
+ AlterType alter_type) {
Status s;
if (alter_type == AlterType::kNormal) {
bool is_soft_deleted_table = false;
@@ -3359,8 +3393,6 @@ Status CatalogManager::AlterTableRpc(const
AlterTableRequestPB& req,
}
}
- leader_lock_.AssertAcquiredForReading();
-
if (req.modify_external_catalogs()) {
// If the HMS integration is enabled, wait for the notification log
listener
// to catch up. This reduces the likelihood of attempting to apply an
@@ -3369,8 +3401,10 @@ Status CatalogManager::AlterTableRpc(const
AlterTableRequestPB& req,
}
optional<const string> user;
- if (rpc) {
- user.emplace(rpc->remote_user().username());
+ if (rpc && rpc.value() != nullptr) {
+ user.emplace(rpc.value()->remote_user().username());
+ } else if (username) {
+ user.emplace(*username);
}
// If the HMS integration is enabled, the alteration includes a table
// rename and the table should be altered in the HMS, then don't directly
@@ -3457,6 +3491,27 @@ Status CatalogManager::AlterTableRpc(const
AlterTableRequestPB& req,
return AlterTable(req, resp, /*hms_notification_log_event_id=*/nullopt,
user);
}
+// RPC entry point for table alteration. Logs the request, asserts that the
+// leader lock is held for reading and hands the work to AlterTableHelper().
+Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
+                                     AlterTableResponsePB* resp,
+                                     rpc::RpcContext* rpc,
+                                     AlterType alter_type) {
+  const auto requestor = RequestorString(rpc);
+  LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
+                          requestor, SecureShortDebugString(req));
+
+  leader_lock_.AssertAcquiredForReading();
+  // No explicit user name: the helper reads it from 'rpc' when that is set.
+  return AlterTableHelper(req, resp, rpc, /*username=*/nullopt, alter_type);
+}
+
+// Table alteration entry point for callers that have no RpcContext. Logs the
+// request (printing "<default>" when 'user' is unset), asserts that the leader
+// lock is held for reading and hands the work to AlterTableHelper().
+Status CatalogManager::AlterTableWithUser(const AlterTableRequestPB& req,
+                                          AlterTableResponsePB* resp,
+                                          const optional<string>& user,
+                                          AlterType alter_type) {
+  const string requestor = user.value_or("<default>");
+  LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
+                          requestor, SecureShortDebugString(req));
+  leader_lock_.AssertAcquiredForReading();
+  // A null RPC context makes the helper fall back to 'user'.
+  return AlterTableHelper(req, resp, /*rpc=*/nullptr, user, alter_type);
+}
+
Status CatalogManager::AlterTableHms(const string& table_id,
const string& table_name,
const optional<string>& new_table_name,
@@ -6790,15 +6845,17 @@ void CatalogManager::AbortAndWaitForAllTasks(
}
template<typename RespClass>
-Status CatalogManager::WaitForNotificationLogListenerCatchUp(RespClass* resp,
- rpc::RpcContext*
rpc) {
+Status CatalogManager::WaitForNotificationLogListenerCatchUp(
+ RespClass* resp, const std::optional<rpc::RpcContext*>& rpc_opt) {
if (hms_catalog_) {
- CHECK(rpc);
- Status s =
hms_notification_log_listener_->WaitForCatchUp(rpc->GetClientDeadline());
+ // If an RPC context is provided, use its deadline; otherwise, use
MonoTime::Max().
+ MonoTime deadline = (rpc_opt && rpc_opt.value() != nullptr)
+ ? rpc_opt.value()->GetClientDeadline()
+ : MonoTime::Max();
+ Status s = hms_notification_log_listener_->WaitForCatchUp(deadline);
// ServiceUnavailable indicates the master has lost leadership.
- MasterErrorPB::Code code = s.IsServiceUnavailable() ?
- MasterErrorPB::NOT_THE_LEADER :
- MasterErrorPB::HIVE_METASTORE_ERROR;
+ MasterErrorPB::Code code = s.IsServiceUnavailable() ?
MasterErrorPB::NOT_THE_LEADER
+ :
MasterErrorPB::HIVE_METASTORE_ERROR;
return SetupError(s, resp, code);
}
return Status::OK();
diff --git a/src/kudu/master/catalog_manager.h
b/src/kudu/master/catalog_manager.h
index 0d6848749..c4dc090b4 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -666,6 +666,11 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
CreateTableResponsePB* resp,
rpc::RpcContext* rpc);
+ // Create a new table with the specified attributes on behalf of 'user',
+ // for callers that have no RpcContext (e.g. the REST API). The
+ // implementation asserts that the leader lock is held for reading and
+ // forwards to CreateTableHelper() with a null RPC context, so 'user' is
+ // used wherever the RPC variant uses the remote user's name.
+ Status CreateTableWithUser(const CreateTableRequestPB* req,
+ CreateTableResponsePB* resp,
+ const std::optional<std::string>& user);
+
// Get the information about an in-progress create operation. If 'user' is
// provided, checks that the user is authorized to get such information.
Status IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
@@ -680,6 +685,11 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
DeleteTableResponsePB* resp,
rpc::RpcContext* rpc) WARN_UNUSED_RESULT;
+  // Delete the specified table on behalf of 'user', for callers that have no
+  // RpcContext (e.g. the REST API). The implementation asserts that the leader
+  // lock is held for reading and forwards to DeleteTableHelper() with a null
+  // RPC context. Like DeleteTableRpc(), the returned status must be checked.
+  Status DeleteTableWithUser(const DeleteTableRequestPB& req,
+                             DeleteTableResponsePB* resp,
+                             const std::optional<std::string>& user) WARN_UNUSED_RESULT;
+
// Mark the table as soft-deleted with ability to restore it back within
// the soft-delete reservation period.
Status SoftDeleteTableRpc(const DeleteTableRequestPB& req,
@@ -718,6 +728,12 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
rpc::RpcContext* rpc,
AlterType alter_type = AlterType::kNormal);
+ // Alter the specified table in response to an AlterTableRequest on behalf
+ // of 'user', for callers that have no RpcContext (e.g. the REST API). The
+ // implementation asserts that the leader lock is held for reading and
+ // forwards to AlterTableHelper() with a null RPC context.
+ Status AlterTableWithUser(const AlterTableRequestPB& req,
+ AlterTableResponsePB* resp,
+ const std::optional<std::string>& user,
+ AlterType alter_type = AlterType::kNormal);
+
// Alter the specified table in response to an 'ALTER TABLE' HMS
// notification log listener event.
Status AlterTableHms(const std::string& table_id,
@@ -1034,6 +1050,26 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
ColumnId* next_col_id,
bool* needs_range_bounds_refresh);
+ // Common logic for CreateTable and CreateTableWithUser.
+ //
+ // The RPC context is optional because this function serves both
+ // CreateTable (RPC API) and CreateTableWithUser (REST API). When 'rpc'
+ // holds a non-null context, the user name is taken from its remote user;
+ // otherwise the explicitly provided 'username' is used.
+ Status CreateTableHelper(const CreateTableRequestPB* req,
+ CreateTableResponsePB* resp,
+ std::optional<rpc::RpcContext*> rpc,
+ const std::optional<std::string>& username);
+
+ // Common logic for DeleteTableRpc and DeleteTableWithUser.
+ //
+ // The RPC context is optional because this function serves both
+ // DeleteTableRpc (RPC API) and DeleteTableWithUser (REST API). When 'rpc'
+ // holds a non-null context, the user name is taken from its remote user;
+ // otherwise the explicitly provided 'username' is used.
+ Status DeleteTableHelper(const DeleteTableRequestPB& req,
+ DeleteTableResponsePB* resp,
+ std::optional<rpc::RpcContext*> rpc,
+ const std::optional<std::string>& username);
+
// Delete the specified table in the catalog. If 'user' is provided,
// checks that the user is authorized to delete the table. Otherwise,
// it indicates its an internal operation (originates from catalog
@@ -1046,6 +1082,17 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
std::optional<int64_t> hms_notification_log_event_id,
const std::optional<std::string>& user)
WARN_UNUSED_RESULT;
+ // Common logic for AlterTableRpc and AlterTableWithUser.
+ //
+ // The RPC context is optional because this function serves both
+ // AlterTableRpc (RPC API) and AlterTableWithUser (REST API). When 'rpc'
+ // holds a non-null context, the user name is taken from its remote user;
+ // otherwise 'username' is used if it is set.
+ Status AlterTableHelper(const AlterTableRequestPB& req,
+ AlterTableResponsePB* resp,
+ std::optional<rpc::RpcContext*> rpc,
+ const std::optional<std::string>& username,
+ AlterType alter_type = AlterType::kNormal);
+
// Alter the specified table in the catalog. If 'user' is provided,
// checks that the user is authorized to alter the table. Otherwise,
// it indicates its an internal operation (originates from catalog
@@ -1340,8 +1387,8 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// events, if the HMS integration is enabled. Handles setting the correct
// response code in the case of an error.
template<typename RespClass>
- Status WaitForNotificationLogListenerCatchUp(RespClass* resp,
- rpc::RpcContext* rpc)
WARN_UNUSED_RESULT;
+ Status WaitForNotificationLogListenerCatchUp(
+ RespClass* resp, const std::optional<rpc::RpcContext*>& rpc_opt)
WARN_UNUSED_RESULT;
enum class ValidateType {
kCreateTable = 0,