This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new cc1e091 KUDU-3192: [client] Expose the cluster ID in the client
KuduTable
cc1e091 is described below
commit cc1e091904ceb6b6c527498d8ff849bb42412115
Author: Grant Henke <[email protected]>
AuthorDate: Tue Sep 22 21:37:52 2020 -0500
KUDU-3192: [client] Expose the cluster ID in the client KuduTable
This patch adds a cluster ID field to the C++ and Java Kudu
clients. This field will be used in a follow-on change to better
enable the HMS integration to use the cluster ID.
Change-Id: Ia1f5a451aaa44834534d2387ee1c9aa9cf95dd37
Reviewed-on: http://gerrit.cloudera.org:8080/16493
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Grant Henke <[email protected]>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 25 +++++++++++++++++++++-
.../java/org/apache/kudu/client/KuduClient.java | 12 +++++++++++
.../org/apache/kudu/client/TestKuduClient.java | 8 +++++++
src/kudu/client/client-internal.cc | 6 ++++++
src/kudu/client/client-internal.h | 10 +++++++--
src/kudu/client/client-test.cc | 13 +++++++++++
src/kudu/client/client.cc | 5 +++++
src/kudu/client/client.h | 5 +++++
src/kudu/integration-tests/cluster_itest_util.cc | 10 +++++++++
src/kudu/integration-tests/cluster_itest_util.h | 5 +++++
.../integration-tests/master_failover-itest.cc | 19 ++++++----------
src/kudu/master/master-test.cc | 9 ++++++++
src/kudu/master/master.proto | 3 +++
src/kudu/master/master_service.cc | 4 ++++
14 files changed, 119 insertions(+), 15 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 9260189..0d4d00c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -340,6 +340,15 @@ public class AsyncKuduClient implements AutoCloseable {
private String location = "";
/**
+ * The ID of the cluster that this client is connected to.
+ *
+ * It will be an empty string if the client is not connected
+ * or the client is connected to a cluster that doesn't support
+ * cluster IDs.
+ */
+ private String clusterId = "";
+
+ /**
* Semaphore used to rate-limit master lookups
* Once we have more than this number of concurrent master lookups, we'll
* start to throttle ourselves slightly.
@@ -476,6 +485,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
synchronized (AsyncKuduClient.this) {
location = masterResponsePB.getClientLocation();
+ clusterId = masterResponsePB.getClusterId();
}
return null;
}
@@ -526,11 +536,23 @@ public class AsyncKuduClient implements AutoCloseable {
*
* @return a string representation of this client's location
*/
- public String getLocationString() {
+ public synchronized String getLocationString() {
return location;
}
/**
+ * Returns the ID of the cluster that this client is connected to.
+ * It will be an empty string if the client is not connected or
+ * the client is connected to a cluster that doesn't support
+ * cluster IDs.
+ *
+ * @return the ID of the cluster that this client is connected to
+ */
+ public synchronized String getClusterId() {
+ return clusterId;
+ }
+
+ /**
* Returns the {@link Timer} instance held by this client. This timer should
* be used everywhere for scheduling tasks after a delay, e.g., for
* timeouts.
@@ -1807,6 +1829,7 @@ public class AsyncKuduClient implements AutoCloseable {
synchronized (AsyncKuduClient.this) {
AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig;
location = respPb.getClientLocation();
+ clusterId = respPb.getClusterId();
}
hasConnectedToMaster = true;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index ebc9096..8062f16 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -93,6 +93,18 @@ public class KuduClient implements AutoCloseable {
}
/**
+ * Returns the ID of the cluster that this client is connected to.
+ * It will be an empty string if the client is not connected or
+ * the client is connected to a cluster that doesn't support
+ * cluster IDs.
+ *
+ * @return the ID of the cluster that this client is connected to
+ */
+ public String getClusterId() {
+ return asyncClient.getClusterId();
+ }
+
+ /**
* Returns the unique client id assigned to this client.
* @return the unique client id assigned to this client.
*/
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 4a7a473..32e75e3 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -1324,6 +1324,14 @@ public class TestKuduClient {
}
@Test(timeout = 100000)
+ public void testClusterId() throws Exception {
+ assertTrue(client.getClusterId().isEmpty());
+ // Do something that will cause the client to connect to the cluster.
+ client.listTabletServers();
+ assertFalse(client.getClusterId().isEmpty());
+ }
+
+ @Test(timeout = 100000)
public void testSessionOnceClosed() throws Exception {
client.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
KuduTable table = client.openTable(TABLE_NAME);
diff --git a/src/kudu/client/client-internal.cc
b/src/kudu/client/client-internal.cc
index 524d50d..afd961a 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -660,6 +660,7 @@ void KuduClient::Data::ConnectedToClusterCb(
hive_metastore_uuid_ = hive_config.hms_uuid();
location_ = connect_response.client_location();
+ cluster_id_ = connect_response.cluster_id();
master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr,
leader_hostname));
master_proxy_->set_user_credentials(user_credentials_);
@@ -810,6 +811,11 @@ string KuduClient::Data::location() const {
return location_;
}
+string KuduClient::Data::cluster_id() const {
+ std::lock_guard<simple_spinlock> l(leader_master_lock_);
+ return cluster_id_;
+}
+
shared_ptr<master::MasterServiceProxy> KuduClient::Data::master_proxy() const {
std::lock_guard<simple_spinlock> l(leader_master_lock_);
return master_proxy_;
diff --git a/src/kudu/client/client-internal.h
b/src/kudu/client/client-internal.h
index 4f2a85e..3adeb62 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -238,6 +238,8 @@ class KuduClient::Data {
std::string location() const;
+ std::string cluster_id() const;
+
uint64_t GetLatestObservedTimestamp() const;
void UpdateLatestObservedTimestamp(uint64_t timestamp);
@@ -249,6 +251,10 @@ class KuduClient::Data {
// been assigned by the leader master. Protected by 'leader_master_lock_'.
std::string location_;
+ // The ID of the cluster that this client is connected to.
+ // Protected by 'leader_master_lock_'.
+ std::string cluster_id_;
+
// The user credentials of the client. This field is constant after the
client
// is built.
rpc::UserCredentials user_credentials_;
@@ -302,8 +308,8 @@ class KuduClient::Data {
std::vector<StatusCallback> leader_master_callbacks_primary_creds_;
// Protects 'leader_master_rpc_{any,primary}_creds_',
- // 'leader_master_hostport_', 'master_hostports_', 'master_proxy_', and
- // 'location_'.
+ // 'leader_master_hostport_', 'master_hostports_', 'master_proxy_',
+ // 'location_', and 'cluster_id_'.
//
// See: KuduClient::Data::ConnectToClusterAsync for a more
// in-depth explanation of why this is needed and how it works.
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index b9cf55f..4c76efe 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -82,6 +82,7 @@
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/data_gen_util.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
@@ -176,6 +177,7 @@ using boost::none;
using google::protobuf::util::MessageDifferencer;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
+using kudu::itest::GetClusterId;
using kudu::master::CatalogManager;
using kudu::master::GetTableLocationsRequestPB;
using kudu::master::GetTableLocationsResponsePB;
@@ -786,6 +788,17 @@ class ClientTest : public KuduTest {
const char *ClientTest::kTableName = "client-testtb";
const int32_t ClientTest::kNoBound = kint32max;
+TEST_F(ClientTest, TestClusterId) {
+ int leader_idx;
+ ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+ string cluster_id;
+ ASSERT_OK(GetClusterId(cluster_->master_proxy(leader_idx),
+ MonoDelta::FromSeconds(30),
+ &cluster_id));
+ ASSERT_TRUE(!cluster_id.empty());
+ ASSERT_EQ(cluster_id, client_->cluster_id());
+}
+
TEST_F(ClientTest, TestListTables) {
const char* kTable2Name = "client-testtb2";
shared_ptr<KuduTable> second_table;
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 2d62546..b460165 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -29,6 +29,7 @@
#include <vector>
#include <boost/optional/optional.hpp>
+#include <boost/type_traits/decay.hpp>
#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>
@@ -683,6 +684,10 @@ string KuduClient::location() const {
return data_->location();
}
+string KuduClient::cluster_id() const {
+ return data_->cluster_id();
+}
+
////////////////////////////////////////////////////////////
// KuduTableCreator
////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index cd5df70..92addfd 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -594,6 +594,11 @@ class KUDU_EXPORT KuduClient : public
sp::enable_shared_from_this<KuduClient> {
/// assigned yet, or if the leader master did not assign a location to
/// the client.
std::string location() const KUDU_NO_EXPORT;
+
+ /// Private API.
+ ///
+ /// @return The ID of the cluster that this client is connected to.
+ std::string cluster_id() const KUDU_NO_EXPORT;
/// @endcond
private:
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc
b/src/kudu/integration-tests/cluster_itest_util.cc
index 75a411d..b8ad744 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -1329,6 +1329,16 @@ Status GetMasterRegistration(const
shared_ptr<MasterServiceProxy>& master_proxy,
return Status::OK();
}
+Status GetClusterId(const shared_ptr<MasterServiceProxy>& master_proxy,
+ const MonoDelta& timeout,
+ string* cluster_id) {
+ master::GetMasterRegistrationResponsePB registration;
+ RETURN_NOT_OK(GetMasterRegistration(master_proxy, timeout, &registration));
+ CHECK(registration.has_cluster_id());
+ *cluster_id = registration.cluster_id();
+ return Status::OK();
+}
+
Status GetTsCounterValue(ExternalTabletServer* ets,
MetricPrototype* metric,
int64_t* value) {
diff --git a/src/kudu/integration-tests/cluster_itest_util.h
b/src/kudu/integration-tests/cluster_itest_util.h
index dc89e1c..f960df7 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -460,6 +460,11 @@ Status GetMasterRegistration(const
std::shared_ptr<master::MasterServiceProxy>&
const MonoDelta& timeout,
master::GetMasterRegistrationResponsePB*
registration);
+// Get the cluster ID from the Master.
+Status GetClusterId(const std::shared_ptr<master::MasterServiceProxy>&
master_proxy,
+ const MonoDelta& timeout,
+ std::string* cluster_id);
+
// Alter the table name.
Status AlterTableName(const std::shared_ptr<master::MasterServiceProxy>&
master_proxy,
const std::string& table_id,
diff --git a/src/kudu/integration-tests/master_failover-itest.cc
b/src/kudu/integration-tests/master_failover-itest.cc
index 04d581c..d10d5e7 100644
--- a/src/kudu/integration-tests/master_failover-itest.cc
+++ b/src/kudu/integration-tests/master_failover-itest.cc
@@ -36,7 +36,6 @@
#include "kudu/gutil/strings/strip.h" // IWYU pragma: keep
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/master/master.pb.h"
#include "kudu/master/sys_catalog.h" // IWYU pragma: keep
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/metrics.h"
@@ -58,7 +57,7 @@ using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::ScopedResumeExternalDaemon;
using kudu::itest::GetInt64Metric;
-using kudu::itest::GetMasterRegistration;
+using kudu::itest::GetClusterId;
using std::set;
using std::string;
using std::unique_ptr;
@@ -157,14 +156,6 @@ class MasterFailoverTest : public KuduTest,
->Alter();
}
- std::string GetClusterId(const int master_idx) {
- master::GetMasterRegistrationResponsePB registration;
- CHECK_OK(GetMasterRegistration(cluster_->master_proxy(master_idx),
- kOperationTimeout, &registration));
- CHECK(registration.has_cluster_id());
- return registration.cluster_id();
- }
-
protected:
ExternalMiniClusterOptions opts_;
unique_ptr<ExternalMiniCluster> cluster_;
@@ -503,7 +494,9 @@ TEST_P(MasterFailoverTest, TestClusterIdOnFailover) {
// Validate and store the initial cluster ID.
int leader_idx;
ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
- string original_cluster_id = GetClusterId(leader_idx);
+ string original_cluster_id;
+ ASSERT_OK(GetClusterId(cluster_->master_proxy(leader_idx), kOperationTimeout,
+ &original_cluster_id));
ASSERT_TRUE(!original_cluster_id.empty());
LOG(INFO) << "Shutdown the leader master";
@@ -513,7 +506,9 @@ TEST_P(MasterFailoverTest, TestClusterIdOnFailover) {
int new_leader_idx;
ASSERT_OK(cluster_->GetLeaderMasterIndex(&new_leader_idx));
ASSERT_NE(leader_idx, new_leader_idx);
- string new_cluster_id = GetClusterId(new_leader_idx);
+ string new_cluster_id;
+ ASSERT_OK(GetClusterId(cluster_->master_proxy(new_leader_idx),
kOperationTimeout,
+ &new_cluster_id));
ASSERT_TRUE(!new_cluster_id.empty());
ASSERT_EQ(original_cluster_id, new_cluster_id);
}
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 064a41e..7198740 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -57,6 +57,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
@@ -96,6 +97,7 @@
using boost::none;
using boost::optional;
using kudu::consensus::ReplicaManagementInfoPB;
+using kudu::itest::GetClusterId;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::Messenger;
@@ -1871,6 +1873,13 @@ TEST_F(MasterTest, TestConnectToMaster) {
// The returned location should be empty because no location mapping command
// is defined.
ASSERT_TRUE(resp.client_location().empty());
+
+ // The cluster ID should match the master's cluster ID.
+ string cluster_id;
+ const std::shared_ptr<MasterServiceProxy> master_proxy = std::move(proxy_);
+ ASSERT_OK(GetClusterId(master_proxy, MonoDelta::FromSeconds(30),
&cluster_id));
+ ASSERT_TRUE(!cluster_id.empty());
+ ASSERT_EQ(cluster_id, resp.cluster_id());
}
TEST_F(MasterTest, TestConnectToMasterAndAssignLocation) {
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index b569686..60fadbc 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -804,6 +804,9 @@ message ConnectToMasterResponsePB {
// The location of the client assigned by the master.
optional string client_location = 7;
+
+ // The cluster ID of this master.
+ optional string cluster_id = 8;
}
// Hive Metastore integration options and configuration.
diff --git a/src/kudu/master/master_service.cc
b/src/kudu/master/master_service.cc
index c8533a8..e1d8987 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -752,6 +752,10 @@ void MasterServiceImpl::ConnectToMaster(const
ConnectToMasterRequestPB* /*req*/,
// until we have taken over all the associated responsibilities.
resp->set_role(is_leader ? consensus::RaftPeerPB::LEADER
: consensus::RaftPeerPB::FOLLOWER);
+
+ // Add the cluster ID.
+ resp->set_cluster_id(server_->cluster_id());
+
rpc->RespondSuccess();
}