This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.15.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.15.x by this push:
new 5cc9652 KUDU-1884: set sasl_protocol_name for the TxnSystemClient
5cc9652 is described below
commit 5cc96523299445e5ee203042522a9f0f0652ee19
Author: Grant Henke <[email protected]>
AuthorDate: Thu May 20 22:04:58 2021 -0500
KUDU-1884: set sasl_protocol_name for the TxnSystemClient
In clusters with a custom principal we need to set the
sasl_protocol_name on the embedded client within the TxnSystemClient.
This logic matches that in server_base.cc for server messenger
configuration.
Change-Id: Ic7b7c7fd154796215bd3a3ce748a2fd3155adebf
Reviewed-on: http://gerrit.cloudera.org:8080/17480
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
(cherry picked from commit 0569c7055cfd22be86a95fabb41e27afd570d784)
Reviewed-on: http://gerrit.cloudera.org:8080/17509
Reviewed-by: Bankim Bhavsar <[email protected]>
---
.../integration-tests/auth_token_expire-itest.cc | 4 +-
.../client-negotiation-failover-itest.cc | 4 +-
src/kudu/integration-tests/master_authz-itest.cc | 8 +-
src/kudu/integration-tests/master_hms-itest.cc | 4 +-
src/kudu/integration-tests/security-itest.cc | 93 +++++++++++++---------
src/kudu/integration-tests/txn_commit-itest.cc | 5 +-
.../integration-tests/txn_participant-itest.cc | 41 +++++++---
.../integration-tests/txn_status_table-itest.cc | 17 +++-
src/kudu/master/txn_manager.cc | 5 +-
src/kudu/mini-cluster/external_mini_cluster.h | 7 ++
src/kudu/tools/tool_action_txn.cc | 5 +-
src/kudu/transactions/txn_system_client.cc | 7 +-
src/kudu/transactions/txn_system_client.h | 1 +
13 files changed, 140 insertions(+), 61 deletions(-)
diff --git a/src/kudu/integration-tests/auth_token_expire-itest.cc
b/src/kudu/integration-tests/auth_token_expire-itest.cc
index b0f04c7..9e347c4 100644
--- a/src/kudu/integration-tests/auth_token_expire-itest.cc
+++ b/src/kudu/integration-tests/auth_token_expire-itest.cc
@@ -450,7 +450,9 @@ TEST_F(TokenBasedConnectionITest, ReacquireAuthnToken) {
TEST_F(TokenBasedConnectionITest, TxnSystemClientReacquireAuthnToken) {
SKIP_IF_SLOW_NOT_ALLOWED();
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->service_principal(),
+ &txn_client));
ASSERT_OK(txn_client->CreateTxnStatusTable(10));
ASSERT_OK(txn_client->OpenTxnStatusTable());
diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc
b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
index 30afd1c..a09c595 100644
--- a/src/kudu/integration-tests/client-negotiation-failover-itest.cc
+++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
@@ -204,7 +204,9 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest,
TestTxnSystemClientRetryOnPause)
ASSERT_OK(CreateAndStartCluster());
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->service_principal(),
+ &txn_client));
ASSERT_OK(txn_client->CreateTxnStatusTable(100, kNumTabletServers));
ASSERT_OK(txn_client->OpenTxnStatusTable());
diff --git a/src/kudu/integration-tests/master_authz-itest.cc
b/src/kudu/integration-tests/master_authz-itest.cc
index 9ea64f6..3c174de 100644
--- a/src/kudu/integration-tests/master_authz-itest.cc
+++ b/src/kudu/integration-tests/master_authz-itest.cc
@@ -746,7 +746,9 @@ TEST_P(MasterAuthzITest, TestCreateTransactionStatusTable) {
// NotAuthorized error.
{
unique_ptr<TxnSystemClient> non_admin_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&non_admin_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->service_principal(),
+ &non_admin_client));
Status s = non_admin_client->CreateTxnStatusTable(100);
ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
s = non_admin_client->AddTxnStatusTableRange(100, 200);
@@ -755,7 +757,9 @@ TEST_P(MasterAuthzITest, TestCreateTransactionStatusTable) {
// But as service user, we should have no trouble making the calls.
ASSERT_OK(this->cluster_->kdc()->Kinit(kAdminUser));
unique_ptr<TxnSystemClient> txn_sys_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_sys_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->service_principal(),
+ &txn_sys_client));
ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100));
ASSERT_OK(txn_sys_client->AddTxnStatusTableRange(100, 200));
}
diff --git a/src/kudu/integration-tests/master_hms-itest.cc
b/src/kudu/integration-tests/master_hms-itest.cc
index 3729235..4dedec2 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -709,7 +709,9 @@ TEST_F(MasterHmsTest, TestUppercaseIdentifiers) {
TEST_F(MasterHmsTest, TestTransactionStatusTableDoesntSync) {
// Create a transaction status table.
unique_ptr<TxnSystemClient> txn_sys_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_sys_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->service_principal(),
+ &txn_sys_client));
ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100));
// We shouldn't see the table in the HMS catalog.
diff --git a/src/kudu/integration-tests/security-itest.cc
b/src/kudu/integration-tests/security-itest.cc
index 2607886..0f5fad1 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -21,6 +21,7 @@
#include <cstdio>
#include <cstdlib>
#include <functional>
+#include <initializer_list>
#include <memory>
#include <ostream>
#include <string>
@@ -93,7 +94,7 @@ using strings::Substitute;
namespace kudu {
-static const char* kTableName = "test-table";
+static const char* const kTableName = "test-table";
static const Schema kTestSchema = CreateKeyValueTestSchema();
static const KuduSchema kTestKuduSchema =
client::KuduSchema::FromSchema(kTestSchema);
@@ -126,7 +127,7 @@ class SecurityITest : public KuduTest {
return proxy.SetFlag(req, &resp, &controller);
}
- Status CreateTestTable(const client::sp::shared_ptr<KuduClient>& client) {
+ static Status CreateTestTable(const shared_ptr<KuduClient>& client) {
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
return table_creator->table_name(kTableName)
.set_range_partition_columns({ "key" })
@@ -135,10 +136,9 @@ class SecurityITest : public KuduTest {
.Create();
}
- // Create a table, insert a row, scan it back, and delete the table.
- void SmokeTestCluster(const client::sp::shared_ptr<KuduClient>& client,
+ // Create a table, insert a row, scan it back, and then drop the table.
+ void SmokeTestCluster(const shared_ptr<KuduClient>& client = {},
bool transactional = false);
- void SmokeTestCluster();
Status TryRegisterAsTS() {
// Make a new messenger so that we don't reuse any cached connections from
@@ -202,21 +202,27 @@ class SecurityITest : public KuduTest {
unique_ptr<ExternalMiniCluster> cluster_;
};
-void SecurityITest::SmokeTestCluster(const client::sp::shared_ptr<KuduClient>&
client,
+void SecurityITest::SmokeTestCluster(const shared_ptr<KuduClient>& client,
const bool transactional) {
+ shared_ptr<KuduClient> new_client;
+ const shared_ptr<KuduClient>& c = client ? client : new_client;
+ if (!client) {
+ ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+ }
+
// Create a table.
- ASSERT_OK(CreateTestTable(client));
+ ASSERT_OK(CreateTestTable(c));
// Insert a row.
- client::sp::shared_ptr<KuduTable> table;
- ASSERT_OK(client->OpenTable(kTableName, &table));
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(c->OpenTable(kTableName, &table));
shared_ptr<KuduTransaction> txn;
- client::sp::shared_ptr<KuduSession> session;
+ shared_ptr<KuduSession> session;
if (transactional) {
- ASSERT_OK(client->NewTransaction(&txn));
+ ASSERT_OK(c->NewTransaction(&txn));
ASSERT_OK(txn->CreateSession(&session));
} else {
- session = client->NewSession();
+ session = c->NewSession();
}
session->SetTimeoutMillis(60000);
unique_ptr<KuduInsert> ins(table->NewInsert());
@@ -228,18 +234,11 @@ void SecurityITest::SmokeTestCluster(const
client::sp::shared_ptr<KuduClient>& c
ASSERT_OK(txn->Commit());
}
- // Read it back.
+ // Read the inserted row back.
ASSERT_EQ(1, CountTableRows(table.get()));
- // Delete the table.
- ASSERT_OK(client->DeleteTable(kTableName));
-}
-
-void SecurityITest::SmokeTestCluster() {
- client::sp::shared_ptr<KuduClient> client;
- ASSERT_OK(cluster_->CreateClient(nullptr, &client));
-
- SmokeTestCluster(client);
+ // Drop the table.
+ ASSERT_OK(c->DeleteTable(kTableName));
}
// Test authorizing list tablets.
@@ -259,7 +258,7 @@ TEST_F(SecurityITest, TestAuthorizationOnListTablets) {
TEST_F(SecurityITest, TestAuthorizationOnChecksum) {
cluster_opts_.extra_tserver_flags.emplace_back("--tserver_enforce_access_control");
ASSERT_OK(StartCluster());
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
ASSERT_OK(CreateTestTable(client));
vector<string> tablet_ids;
@@ -288,7 +287,7 @@ TEST_F(SecurityITest, SmokeTestAsAuthorizedUser) {
ASSERT_OK(StartCluster());
ASSERT_OK(cluster_->kdc()->Kinit("test-user"));
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
NO_FATALS(SmokeTestCluster(client));
NO_FATALS(SmokeTestCluster(client, /* transactional */ true));
@@ -309,7 +308,7 @@ TEST_F(SecurityITest, TestNoKerberosCredentials) {
ASSERT_OK(StartCluster());
ASSERT_OK(cluster_->kdc()->Kdestroy());
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
Status s = cluster_->CreateClient(nullptr, &client);
ASSERT_STR_MATCHES(s.ToString(),
"Not authorized: Could not connect to the cluster: "
@@ -329,7 +328,7 @@ TEST_F(SecurityITest, SaslPlainFallback) {
ASSERT_OK(StartCluster());
ASSERT_OK(cluster_->kdc()->Kdestroy());
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
// Check client can successfully call ListTables().
@@ -341,7 +340,7 @@ TEST_F(SecurityITest, SaslPlainFallback) {
TEST_F(SecurityITest, TestUnauthorizedClientKerberosCredentials) {
ASSERT_OK(StartCluster());
ASSERT_OK(cluster_->kdc()->Kinit("joe-interloper"));
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
Status s = cluster_->CreateClient(nullptr, &client);
ASSERT_EQ("Remote error: Could not connect to the cluster: "
"Not authorized: unauthorized access to method: ConnectToMaster",
@@ -472,11 +471,22 @@ TEST_F(SecurityITest, TestCorruptKerberosCC) {
TEST_F(SecurityITest, TestNonDefaultPrincipal) {
const string kPrincipal = "oryx";
cluster_opts_.principal = kPrincipal;
+ // Enable TxnManager in Kudu masters: it's necessary to test txn-related
+ // operations along with others.
+ cluster_opts_.extra_master_flags.emplace_back("--txn_manager_enabled=true");
+
cluster_opts_.extra_tserver_flags.emplace_back("--enable_txn_system_client_init=true");
ASSERT_OK(StartCluster());
- // A client with the default SASL proto shouldn't be able to connect
- {
- client::sp::shared_ptr<KuduClient> client;
+ // A client with the default SASL proto shouldn't be able to connect to
+ // a cluster using custom Kerberos principal for Kudu service user.
+ for (const auto& username : {"test-user", "test-admin"}) {
+ // Verify that for both the regular and the super-user.
+ ASSERT_OK(cluster_->kdc()->Kinit(username));
+
+ // Instantiate a KuduClientBuilder outside of this cluster's context, so
+ // the custom service principals for this cluster don't affect the default
+ // SASL proto name when creating this separate client instance.
+ shared_ptr<KuduClient> client;
KuduClientBuilder builder;
for (auto i = 0; i < cluster_->num_masters(); ++i) {
builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
@@ -486,12 +496,19 @@ TEST_F(SecurityITest, TestNonDefaultPrincipal) {
ASSERT_STR_CONTAINS(s.ToString(), "not found in Kerberos database");
}
+ // Create a client with the matching SASL proto name and verify it's able to
+ // connect to the cluster and perform basic actions.
{
- // Create a client with the matching SASL proto name and verify it's able
to
- // connect to the cluster and perform basic actions.
- client::sp::shared_ptr<KuduClient> client;
+ // StartCluster() does 'kinit' as test-admin super-user. Anyways, let's
+ // switch to a regular user credentials to perform user-specific tasks.
+ ASSERT_OK(cluster_->kdc()->Kinit("test-user"));
+
+ // Here we don't use out-of-this-cluster KuduClientBuilder instance,
+ // so the client is created with matching SASL proto name.
+ shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
SmokeTestCluster(client);
+ SmokeTestCluster(client, /*transactional*/ true);
}
}
@@ -679,7 +696,7 @@ TEST_P(AuthTokenIssuingTest, ChannelConfidentiality) {
// In current implementation, KuduClientBuilder calls ConnectToCluster() on
// the newly created instance of the KuduClient.
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
string authn_creds;
@@ -747,7 +764,7 @@ TEST_P(ConnectToFollowerMasterTest,
AuthnTokenVerifierHaveKeys) {
// Get authentication credentials.
string authn_creds;
{
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
ASSERT_OK(client->ExportAuthenticationCredentials(&authn_creds));
}
@@ -758,7 +775,7 @@ TEST_P(ConnectToFollowerMasterTest,
AuthnTokenVerifierHaveKeys) {
// Make sure it's not possible to connect without authn token at this point:
// the server side is configured to require authentication.
{
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
KuduClientBuilder builder;
for (auto i = 0; i < cluster_->num_masters(); ++i) {
builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
@@ -775,7 +792,7 @@ TEST_P(ConnectToFollowerMasterTest,
AuthnTokenVerifierHaveKeys) {
// of masters' endpoints while trying to connect to a multi-master Kudu
cluster.
ASSERT_EVENTUALLY([&] {
for (auto i = 1; i < cluster_->num_masters(); ++i) {
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
const auto s = KuduClientBuilder()
.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString())
.import_authentication_credentials(authn_creds)
@@ -791,7 +808,7 @@ TEST_P(ConnectToFollowerMasterTest,
AuthnTokenVerifierHaveKeys) {
// to connect and perform basic operations (like listing tables) when using
// secondary credentials only (i.e. authn token).
{
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
KuduClientBuilder builder;
for (auto i = 0; i < cluster_->num_masters(); ++i) {
builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc
b/src/kudu/integration-tests/txn_commit-itest.cc
index 48b5781..1a01867 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -49,6 +49,7 @@
#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
@@ -142,7 +143,9 @@ class TxnCommitITest : public KuduTest {
}
ASSERT_FALSE(tsm_id_.empty());
});
- TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client_);
+ TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client_);
ASSERT_OK(txn_client_->OpenTxnStatusTable());
client::KuduClientBuilder builder;
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc
b/src/kudu/integration-tests/txn_participant-itest.cc
index a8e7267..5a0fd26 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -47,6 +47,7 @@
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/mvcc.h"
@@ -833,7 +834,9 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) {
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
constexpr const auto kTxnId = 0;
constexpr const int kLeaderIdx = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
@@ -902,7 +905,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientBeginTxnDoesntLock) {
// Start a transaction and make sure it results in the expected state
// server-side.
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } }));
@@ -926,7 +931,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientCommitSequence) {
// Start a transaction and make sure it results in the expected state
// server-side.
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));
@@ -995,7 +1002,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientAbortSequence) {
const auto tablet_id = leader_replica->tablet_id();
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
@@ -1049,7 +1058,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientErrorWhenNotBegun) {
const auto tablet_id = leader_replica->tablet_id();
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
for (auto type : { ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::FINALIZE_COMMIT }) {
@@ -1073,7 +1084,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientRepeatCalls) {
const auto tablet_id = leader_replica->tablet_id();
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
// Repeat each op twice. There should be no issues here since each op is
// idempotent. There should also be no issues with the partition lock.
for (const auto& type : kCommitSequence) {
@@ -1109,7 +1122,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientTimeoutWhenNoMajority) {
cluster_->mini_tablet_server(i)->Shutdown();
}
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
MonoDelta::FromSeconds(1));
@@ -1165,7 +1180,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientSucceedsOnBootstrap) {
// Start a thread that sends participant ops to the tablet.
int next_txn_id = 0;
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
CountDownLatch stop(1);
Status client_error;
thread t([&] {
@@ -1217,7 +1234,9 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientRetriesWhenReplicaNotFound) {
// Start a thread that sends participant ops to the tablet.
int next_txn_id = 0;
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
CountDownLatch stop(1);
Status client_error;
thread t([&] {
@@ -1432,7 +1451,9 @@ TEST_F(TxnParticipantElectionStormITest,
TestTxnSystemClientRetriesThroughStorm)
}
const auto kTimeout = MonoDelta::FromSeconds(10);
unique_ptr<TxnSystemClient> txn_client;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_client));
// Start injecting latency to Raft-related traffic to spur elections.
FLAGS_raft_enable_pre_election = false;
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc
b/src/kudu/integration-tests/txn_status_table-itest.cc
index c478cea..e18bc28 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -49,6 +49,7 @@
#include "kudu/master/ts_manager.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
@@ -125,7 +126,9 @@ class TxnStatusTableITest : public KuduTest {
ASSERT_OK(cluster_->Start());
// Create the txn system client with which to communicate with the cluster.
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_sys_client_));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_sys_client_));
}
// Ensures that all replicas have the right table type set.
@@ -803,7 +806,9 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) {
// Behind the scenes, create tablets for the next transaction IDs range
// and start a new transaction.
unique_ptr<TxnSystemClient> tsc;
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &tsc));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &tsc));
// Re-open the system table.
ASSERT_OK(tsc->OpenTxnStatusTable());
ASSERT_OK(tsc->AddTxnStatusTableRange(kPartitionWidth, 2 *
kPartitionWidth));
@@ -865,7 +870,9 @@ class MultiServerTxnStatusTableITest : public
TxnStatusTableITest {
opts.num_tablet_servers = 4;
cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
ASSERT_OK(cluster_->Start());
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_sys_client_));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_sys_client_));
// Create the initial transaction status table partitions and start an
// initial transaction.
@@ -1039,7 +1046,9 @@ class TxnStatusTableElectionStormITest : public
TxnStatusTableITest {
opts.num_tablet_servers = 3;
cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
ASSERT_OK(cluster_->Start());
- ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_sys_client_));
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
+ cluster_->messenger()->sasl_proto_name(),
+ &txn_sys_client_));
// Create the initial transaction status table partitions.
ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100, 3));
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index 0d4f871..33477a0 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -30,6 +30,7 @@
#include "kudu/gutil/port.h"
#include "kudu/master/master.h"
#include "kudu/master/txn_manager.pb.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/transactions/txn_system_client.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
@@ -246,7 +247,9 @@ Status TxnManager::Init() {
}
vector<HostPort> hostports;
RETURN_NOT_OK(server_->GetMasterHostPorts(&hostports));
- RETURN_NOT_OK(TxnSystemClient::Create(hostports, &txn_sys_client_));
+ RETURN_NOT_OK(TxnSystemClient::Create(hostports,
+
server_->messenger()->sasl_proto_name(),
+ &txn_sys_client_));
DCHECK(txn_sys_client_);
auto s = txn_sys_client_->CreateTxnStatusTable(
FLAGS_txn_manager_status_table_range_partition_span,
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h
b/src/kudu/mini-cluster/external_mini_cluster.h
index d1cc42a..3057edf 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -369,6 +369,13 @@ class ExternalMiniCluster : public MiniCluster {
return opts_.cluster_root;
}
+ // Kerberos principal prefix name whose credentials are used to run Kudu
+ // servers in the cluster. Matches the SASL protocol name used for connection
+ // negotiation.
+ const std::string& service_principal() const {
+ return opts_.principal;
+ }
+
int num_tablet_servers() const override {
return tablet_servers_.size();
}
diff --git a/src/kudu/tools/tool_action_txn.cc
b/src/kudu/tools/tool_action_txn.cc
index a1cf35a..346ea38 100644
--- a/src/kudu/tools/tool_action_txn.cc
+++ b/src/kudu/tools/tool_action_txn.cc
@@ -69,6 +69,7 @@ DEFINE_string(included_states,
"open,abort_in_progress,commit_in_progress,finali
"currently active transactions.");
DECLARE_int64(timeout_ms);
+DECLARE_string(sasl_protocol_name);
using kudu::client::sp::shared_ptr;
using kudu::client::KuduClient;
@@ -339,7 +340,9 @@ Status ShowTxn(const RunnerContext& context) {
hp.ParseString(m, master::Master::kDefaultPort);
master_hps.emplace_back(hp);
}
- RETURN_NOT_OK(TxnSystemClient::Create(master_hps, &txn_client));
+ RETURN_NOT_OK(TxnSystemClient::Create(master_hps,
+ FLAGS_sasl_protocol_name,
+ &txn_client));
RETURN_NOT_OK(txn_client->OpenTxnStatusTable());
shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
diff --git a/src/kudu/transactions/txn_system_client.cc
b/src/kudu/transactions/txn_system_client.cc
index 35fe87d..232cfb9 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -41,6 +41,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/transactions/coordinator_rpc.h"
#include "kudu/transactions/participant_rpc.h"
@@ -95,6 +96,7 @@ namespace kudu {
namespace transactions {
Status TxnSystemClient::Create(const vector<HostPort>& master_addrs,
+ const string& sasl_protocol_name,
unique_ptr<TxnSystemClient>* sys_client) {
vector<string> master_strings;
for (const auto& hp : master_addrs) {
@@ -103,6 +105,7 @@ Status TxnSystemClient::Create(const vector<HostPort>&
master_addrs,
DCHECK(!master_addrs.empty());
KuduClientBuilder builder;
builder.master_server_addrs(master_strings);
+ builder.sasl_protocol_name(sasl_protocol_name);
client::sp::shared_ptr<KuduClient> client;
RETURN_NOT_OK(builder.Build(&client));
sys_client->reset(new TxnSystemClient(std::move(client)));
@@ -467,7 +470,9 @@ Status TxnSystemClientInitializer::Init(const
shared_ptr<Messenger>& messenger,
// Only if we can reach at least one of the masters should we try
// connecting.
if (PREDICT_TRUE(s.ok())) {
- s = TxnSystemClient::Create(master_addrs, &txn_client);
+ s = TxnSystemClient::Create(master_addrs,
+ messenger->sasl_proto_name(),
+ &txn_client);
}
if (PREDICT_TRUE(s.ok())) {
txn_client_ = std::move(txn_client);
diff --git a/src/kudu/transactions/txn_system_client.h
b/src/kudu/transactions/txn_system_client.h
index 6c919a8..a40384d 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -71,6 +71,7 @@ class TxnStatusEntryPB;
class TxnSystemClient {
public:
static Status Create(const std::vector<HostPort>& master_addrs,
+ const std::string& sasl_protocol_name,
std::unique_ptr<TxnSystemClient>* sys_client);
// Creates the transaction status table with a single range partition of the