This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new cf9597511 KUDU-3639 Add table operation functions follow-up
cf9597511 is described below

commit cf95975114e2ae38ad4e4cf0d81e2e59bafa49af
Author: gabriellalotz <[email protected]>
AuthorDate: Mon Feb 24 09:59:20 2025 +0000

    KUDU-3639 Add table operation functions follow-up
    
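    Replaced the std::optional<rpc::RpcContext*> parameters with plain
    rpc::RpcContext* pointers (nullptr when there is no RPC context), keeping
    the helper function signatures consistent. Introduced a dedicated timeout
    flag, --hive_metastore_notification_log_listener_catch_up_deadline_ms
    (default 30 seconds), which bounds the wait for the HMS notification log
    listener to catch up when no RPC deadline is available, instead of
    waiting indefinitely. Moved the leader lock assertions into the helper
    functions for better code organization and reuse.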
    
    Change-Id: I39ed6526fc66113667c21051972da191d4ed209b
    Reviewed-on: http://gerrit.cloudera.org:8080/22529
    Reviewed-by: Zoltan Chovan <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/master/catalog_manager-test.cc          | 15 ++++---
 src/kudu/master/catalog_manager.cc               | 51 ++++++++++++------------
 src/kudu/master/catalog_manager.h                | 12 +++---
 src/kudu/master/hms_notification_log_listener.cc |  7 ++++
 4 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/src/kudu/master/catalog_manager-test.cc b/src/kudu/master/catalog_manager-test.cc
index 03d2adeb1..3d60693ca 100644
--- a/src/kudu/master/catalog_manager-test.cc
+++ b/src/kudu/master/catalog_manager-test.cc
@@ -294,22 +294,21 @@ class CatalogManagerRpcAndUserFunctionsTest : public KuduTest {
     KuduSchemaBuilder b;
     b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
     b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
-    KUDU_CHECK_OK(b.Build(&schema));
+    RETURN_NOT_OK(b.Build(&schema));
     vector<string> columnNames;
     columnNames.emplace_back("key");
 
-    KuduTableCreator* tableCreator = client_->NewTableCreator();
+    std::unique_ptr<KuduTableCreator> tableCreator(client_->NewTableCreator());
     
tableCreator->table_name(kTableName).schema(&schema).set_range_partition_columns(columnNames);
 
     int32_t increment = 1000 / 10;
     for (int32_t i = 1; i < 10; i++) {
       KuduPartialRow* row = schema.NewRow();
-      KUDU_CHECK_OK(row->SetInt32(0, i * increment));
+      RETURN_NOT_OK(row->SetInt32(0, i * increment));
       tableCreator->add_range_partition_split(row);
     }
     tableCreator->num_replicas(1);
     Status s = tableCreator->Create();
-    delete tableCreator;
     return s;
   }
 
@@ -333,7 +332,7 @@ class CatalogManagerRpcAndUserFunctionsTest : public KuduTest {
 };
 
 TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTable) {
-  CreateTestTable();
+  ASSERT_OK(CreateTestTable());
   DeleteTableRequestPB req;
   DeleteTableResponsePB resp;
   req.mutable_table()->set_table_name("test_table");
@@ -342,7 +341,7 @@ TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTable) {
 }
 
 TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestDeleteTableWithUser) {
-  CreateTestTable();
+  ASSERT_OK(CreateTestTable());
   DeleteTableRequestPB req;
   DeleteTableResponsePB resp;
   req.mutable_table()->set_table_name("test_table");
@@ -369,7 +368,7 @@ TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestCreateTableWithUser) {
 }
 
 TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableRpc) {
-  CreateTestTable();
+  ASSERT_OK(CreateTestTable());
   AlterTableRequestPB req;
   AlterTableResponsePB resp;
 
@@ -383,7 +382,7 @@ TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableRpc) {
 }
 
 TEST_F(CatalogManagerRpcAndUserFunctionsTest, TestAlterTableWithUser) {
-  CreateTestTable();
+  ASSERT_OK(CreateTestTable());
   AlterTableRequestPB req;
   AlterTableResponsePB resp;
 
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 901551c99..024ece53a 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -65,7 +65,6 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/arena.h>
-#include <google/protobuf/stubs/common.h>
 
 #include "kudu/cfile/type_encodings.h"
 #include "kudu/common/common.pb.h"
@@ -440,6 +439,8 @@ TAG_FLAG(default_deleted_table_reserve_seconds, runtime);
 
 DECLARE_string(hive_metastore_uris);
 
+DECLARE_int32(hive_metastore_notification_log_listener_catch_up_deadline_ms);
+
 bool ValidateDeletedTableReserveSeconds()  {
   if (FLAGS_default_deleted_table_reserve_seconds > 0 &&
       !FLAGS_hive_metastore_uris.empty()) {
@@ -1899,14 +1900,16 @@ Status ValidateClientSchema(const optional<string>& name,
 
 Status CatalogManager::CreateTableHelper(const CreateTableRequestPB* orig_req,
                                          CreateTableResponsePB* resp,
-                                         optional<rpc::RpcContext*> rpc,
+                                         rpc::RpcContext* rpc,
                                          const optional<string>& username) {
-  // Copy the request so we can fill in some defaults.
+  leader_lock_.AssertAcquiredForReading();
+
+  // Copy the request, so we can fill in some defaults.
   CreateTableRequestPB req = *orig_req;
   optional<string> user;
 
-  if (rpc && rpc.value() != nullptr) {
-    user.emplace(rpc.value()->remote_user().username());
+  if (rpc) {
+    user.emplace(rpc->remote_user().username());
   } else {
     user = username;
   }
@@ -1929,7 +1932,7 @@ Status CatalogManager::CreateTableHelper(const CreateTableRequestPB* orig_req,
   const string& normalized_table_name = NormalizeTableName(req.name());
   if (is_user_table) {
     // a. Validate the user request.
-    if ((rpc && rpc.value() != nullptr) || username) {
+    if (rpc) {
       DCHECK(user.has_value());
       RETURN_NOT_OK(SetupError(
           authz_provider_->AuthorizeCreateTable(normalized_table_name, *user, 
req.owner()),
@@ -2228,7 +2231,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
                                    rpc::RpcContext* rpc) {
   LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
                           RequestorString(rpc), SecureDebugString(*orig_req));
-  leader_lock_.AssertAcquiredForReading();
   return CreateTableHelper(orig_req, resp, rpc, std::nullopt);
 }
 
@@ -2239,7 +2241,6 @@ Status CatalogManager::CreateTableWithUser(const CreateTableRequestPB* orig_req,
   LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
                           user ? *user : "<default>",
                           SecureDebugString(*orig_req));
-  leader_lock_.AssertAcquiredForReading();
   return CreateTableHelper(orig_req, resp, nullptr, user);
 }
 
@@ -2661,11 +2662,13 @@ Status CatalogManager::EnableCompactionforRecalledTable(const RecallDeletedTable
 
 Status CatalogManager::DeleteTableHelper(const DeleteTableRequestPB& req,
                                          DeleteTableResponsePB* resp,
-                                         optional<rpc::RpcContext*> rpc,
+                                         rpc::RpcContext* rpc,
                                          const optional<string>& username) {
+  leader_lock_.AssertAcquiredForReading();
+
   optional<string> user;
-  if (rpc && rpc.value() != nullptr) {
-    user.emplace(rpc.value()->remote_user().username());
+  if (rpc) {
+    user.emplace(rpc->remote_user().username());
   } else {
     user = username;
   }
@@ -2727,7 +2730,6 @@ Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
   LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
                           RequestorString(rpc),
                           SecureShortDebugString(req));
-  leader_lock_.AssertAcquiredForReading();
   return DeleteTableHelper(req, resp, rpc, /*username=*/nullopt);
 }
 
@@ -2736,7 +2738,6 @@ Status CatalogManager::DeleteTableWithUser(const DeleteTableRequestPB& req,
                                            const optional<string>& user) {
   LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
                           user ? *user : "<default>", 
SecureShortDebugString(req));
-  leader_lock_.AssertAcquiredForReading();
   return DeleteTableHelper(req, resp, /*rpc=*/nullptr, user);
 }
 
@@ -3375,7 +3376,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
 
 Status CatalogManager::AlterTableHelper(const AlterTableRequestPB& req,
                                         AlterTableResponsePB* resp,
-                                        optional<rpc::RpcContext*> rpc,
+                                        rpc::RpcContext* rpc,
                                         const optional<string>& username,
                                         AlterType alter_type) {
   Status s;
@@ -3393,6 +3394,8 @@ Status CatalogManager::AlterTableHelper(const AlterTableRequestPB& req,
     }
   }
 
+  leader_lock_.AssertAcquiredForReading();
+
   if (req.modify_external_catalogs()) {
     // If the HMS integration is enabled, wait for the notification log 
listener
     // to catch up. This reduces the likelihood of attempting to apply an
@@ -3401,8 +3404,8 @@ Status CatalogManager::AlterTableHelper(const AlterTableRequestPB& req,
   }
 
   optional<const string> user;
-  if (rpc && rpc.value() != nullptr) {
-    user.emplace(rpc.value()->remote_user().username());
+  if (rpc) {
+    user.emplace(rpc->remote_user().username());
   } else if (username) {
     user.emplace(*username);
   }
@@ -3497,8 +3500,6 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
                                      AlterType alter_type) {
   LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
                           RequestorString(rpc), SecureShortDebugString(req));
-
-  leader_lock_.AssertAcquiredForReading();
   return AlterTableHelper(req, resp, rpc, nullopt, alter_type);
 }
 
@@ -3508,7 +3509,6 @@ Status CatalogManager::AlterTableWithUser(const AlterTableRequestPB& req,
                                          AlterType alter_type) {
   LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
                           user ? *user : "<default>", 
SecureShortDebugString(req));
-  leader_lock_.AssertAcquiredForReading();
   return AlterTableHelper(req, resp, nullptr, user, alter_type);
 }
 
@@ -6845,13 +6845,14 @@ void CatalogManager::AbortAndWaitForAllTasks(
 }
 
 template<typename RespClass>
-Status CatalogManager::WaitForNotificationLogListenerCatchUp(
-    RespClass* resp, const std::optional<rpc::RpcContext*>& rpc_opt) {
+Status CatalogManager::WaitForNotificationLogListenerCatchUp(RespClass* resp,
+                                                             rpc::RpcContext* 
rpc) {
   if (hms_catalog_) {
-    // If an RPC context is provided, use its deadline; otherwise, use 
MonoTime::Max().
-    MonoTime deadline = (rpc_opt && rpc_opt.value() != nullptr)
-                            ? rpc_opt.value()->GetClientDeadline()
-                            : MonoTime::Max();
+    MonoTime deadline =
+        rpc ? rpc->GetClientDeadline()
+            : MonoTime::Now() +
+                  MonoDelta::FromMilliseconds(
+                      
FLAGS_hive_metastore_notification_log_listener_catch_up_deadline_ms);
     Status s = hms_notification_log_listener_->WaitForCatchUp(deadline);
     // ServiceUnavailable indicates the master has lost leadership.
     MasterErrorPB::Code code = s.IsServiceUnavailable() ? 
MasterErrorPB::NOT_THE_LEADER
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index c4dc090b4..6d9ea7485 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -1057,7 +1057,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // an explicitly provided username for authentication.
   Status CreateTableHelper(const CreateTableRequestPB* req,
                           CreateTableResponsePB* resp,
-                          std::optional<rpc::RpcContext*> rpc,
+                          rpc::RpcContext* rpc,
                           const std::optional<std::string>& username);
 
   // Common logic for DeleteTableRpc and DeleteTableWithUser.
@@ -1067,7 +1067,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // an explicitly provided username for authentication.
   Status DeleteTableHelper(const DeleteTableRequestPB& req,
                            DeleteTableResponsePB* resp,
-                           std::optional<rpc::RpcContext*> rpc,
+                           rpc::RpcContext* rpc,
                            const std::optional<std::string>& username);
 
   // Delete the specified table in the catalog. If 'user' is provided,
@@ -1089,7 +1089,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // an explicitly provided username for authentication.
   Status AlterTableHelper(const AlterTableRequestPB& req,
                           AlterTableResponsePB* resp,
-                          std::optional<rpc::RpcContext*> rpc,
+                          rpc::RpcContext* rpc,
                           const std::optional<std::string>& username,
                           AlterType alter_type = AlterType::kNormal);
 
@@ -1386,9 +1386,9 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Wait for the Hive Metastore notification log listener to process the 
latest
   // events, if the HMS integration is enabled. Handles setting the correct
   // response code in the case of an error.
-  template<typename RespClass>
-  Status WaitForNotificationLogListenerCatchUp(
-      RespClass* resp, const std::optional<rpc::RpcContext*>& rpc_opt) 
WARN_UNUSED_RESULT;
+  template <typename RespClass>
+  Status WaitForNotificationLogListenerCatchUp(RespClass* resp,
+                                               rpc::RpcContext* rpc) 
WARN_UNUSED_RESULT;
 
   enum class ValidateType {
     kCreateTable = 0,
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index f0f0a58d3..67e369478 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -66,6 +66,13 @@ TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, hidden);
 TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, unsafe);
 TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, runtime);
 
+DEFINE_int32(hive_metastore_notification_log_listener_catch_up_deadline_ms, 
30000,
+  "The deadline in milliseconds for the HMS log listener to catch up with the "
+  "latest log entry.");
+TAG_FLAG(hive_metastore_notification_log_listener_catch_up_deadline_ms, 
advanced);
+TAG_FLAG(hive_metastore_notification_log_listener_catch_up_deadline_ms, 
experimental);
+TAG_FLAG(hive_metastore_notification_log_listener_catch_up_deadline_ms, 
runtime);
+
 using rapidjson::Document;
 using rapidjson::Value;
 using std::optional;

Reply via email to