Repository: kudu Updated Branches: refs/heads/master 68a4b0a2d -> 7e02ede33
KUDU-2021 test for negotiation timeout on Master RPCs This patch adds integration tests to verify that the Kudu C++ client behaves as described in KUDU-2021: in case of a multi-master Kudu cluster, the client retries an RPC with other master if the connection negotiation with leader master times out. Change-Id: Ib62126c9d8c6c65f447c5d03a0377eaff823393c Reviewed-on: http://gerrit.cloudera.org:8080/6927 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7e02ede3 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7e02ede3 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7e02ede3 Branch: refs/heads/master Commit: 7e02ede3300ac180f393c93bf781a4b74556140b Parents: 68a4b0a Author: Alexey Serbin <[email protected]> Authored: Tue May 23 13:44:19 2017 -0700 Committer: Alexey Serbin <[email protected]> Committed: Fri May 26 02:49:33 2017 +0000 ---------------------------------------------------------------------- src/kudu/integration-tests/CMakeLists.txt | 2 + .../client-negotiation-failover-itest.cc | 274 +++++++++++++++++++ .../integration-tests/client_failover-itest.cc | 130 --------- 3 files changed, 276 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/7e02ede3/src/kudu/integration-tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt index 1aa6547..f3a3cc9 100644 --- a/src/kudu/integration-tests/CMakeLists.txt +++ b/src/kudu/integration-tests/CMakeLists.txt @@ -56,6 +56,8 @@ ADD_KUDU_TEST(alter_table-test) ADD_KUDU_TEST(authn_token_expire-itest) ADD_KUDU_TEST(catalog_manager_tsk-itest) ADD_KUDU_TEST(client_failover-itest) +ADD_KUDU_TEST(client-negotiation-failover-itest + RESOURCE_LOCK 
"master-rpc-ports") ADD_KUDU_TEST(client-stress-test RESOURCE_LOCK "master-rpc-ports" RUN_SERIAL true) http://git-wip-us.apache.org/repos/asf/kudu/blob/7e02ede3/src/kudu/integration-tests/client-negotiation-failover-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc b/src/kudu/integration-tests/client-negotiation-failover-itest.cc new file mode 100644 index 0000000..8f5ff13 --- /dev/null +++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include <glog/logging.h> + +#include <gflags/gflags_declare.h> +#include <gtest/gtest.h> + +#include "kudu/client/client.h" +#include "kudu/client/client-test-util.h" +#include "kudu/common/wire_protocol.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/integration-tests/cluster_itest_util.h" +#include "kudu/integration-tests/external_mini_cluster-itest-base.h" +#include "kudu/integration-tests/test_workload.h" +#include "kudu/integration-tests/ts_itest-base.h" +#include "kudu/tablet/key_value_test_schema.h" +#include "kudu/util/scoped_cleanup.h" + +using kudu::client::KuduClient; +using kudu::client::KuduClientBuilder; +using kudu::client::KuduInsert; +using kudu::client::KuduSession; +using kudu::client::KuduSchema; +using kudu::client::KuduTable; +using kudu::client::KuduTableCreator; +using kudu::client::sp::shared_ptr; +using std::string; +using std::thread; +using std::vector; +using std::unique_ptr; +using strings::Substitute; + +DECLARE_bool(rpc_reopen_outbound_connections); +DECLARE_int64(rpc_negotiation_timeout_ms); + +namespace kudu { + +// Series of tests to verify that client fails over to another available server +// if it experiences a timeout on connection negotiation with current server. +// The 'server' can be a master or a tablet server. +class ClientFailoverOnNegotiationTimeoutITest : public KuduTest { + public: + ClientFailoverOnNegotiationTimeoutITest() { + // Since we want to catch timeout during connection negotiation phase, + // let's make the client re-establishing connections on every RPC. + FLAGS_rpc_reopen_outbound_connections = true; + + // Set the connection negotiation timeout shorter than its default value + // to run the test faster. 
For sanitizer builds we don't want the timeout
+    // to be too short: running the test concurrently with other activities
+    // might lead the client to fail even if the client retries again and again.
+#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+    FLAGS_rpc_negotiation_timeout_ms = 3000;
+#else
+    FLAGS_rpc_negotiation_timeout_ms = 1000;
+#endif
+
+    cluster_opts_.extra_tserver_flags = {
+      // Speed up Raft elections.
+      "--raft_heartbeat_interval_ms=25",
+      "--leader_failure_exp_backoff_max_delta_ms=1000",
+      // Decreasing TS->master heartbeat interval speeds up the test.
+      "--heartbeat_interval_ms=25",
+    };
+    cluster_opts_.extra_master_flags = {
+      // Speed up Raft elections.
+      "--raft_heartbeat_interval_ms=25",
+      "--leader_failure_exp_backoff_max_delta_ms=1000",
+    };
+  }
+
+  Status CreateAndStartCluster() {
+    cluster_.reset(new ExternalMiniCluster(cluster_opts_));
+    return cluster_->Start();
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+ protected:
+  ExternalMiniClusterOptions cluster_opts_;
+  shared_ptr<ExternalMiniCluster> cluster_;
+};
+
+// Regression test for KUDU-1580: if a client times out on negotiating a connection
+// to a tablet server, it should retry with another available tablet server.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) {
+  static const int kNumTabletServers = 3;
+  static const int kTimeoutMs = 5 * 60 * 1000;
+  static const char* kTableName = "kudu1580";
+
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  cluster_opts_.num_tablet_servers = kNumTabletServers;
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(MonoDelta::FromMilliseconds(kTimeoutMs))
+          .default_rpc_timeout(MonoDelta::FromMilliseconds(kTimeoutMs)),
+      &client));
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  KuduSchema schema(client::KuduSchemaFromSchema(CreateKeyValueTestSchema()));
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema)
+            .add_hash_partitions({ "key" }, kNumTabletServers)
+            .num_replicas(kNumTabletServers)
+            .Create());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+
+  shared_ptr<KuduSession> session = client->NewSession();
+  session->SetTimeoutMillis(kTimeoutMs);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  // Running multiple iterations to cover possible variations of tablet leader
+  // placement among tablet servers.
+  for (int i = 0; i < 8 * kNumTabletServers; ++i) {
+    vector<unique_ptr<ScopedResumeExternalDaemon>> resumers;
+    for (int tsi = 0; tsi < kNumTabletServers; ++tsi) {
+      ExternalTabletServer* ts(cluster_->tablet_server(tsi));
+      ASSERT_NE(nullptr, ts);
+      ASSERT_OK(ts->Pause());
+      resumers.emplace_back(new ScopedResumeExternalDaemon(ts));
+    }
+
+    // Resume 2 out of 3 tablet servers (i.e. the majority), so the client
+    // could eventually succeed with its write operations.
+    thread resume_thread([&]() {
+        const int idx0 = rand() % kNumTabletServers;
+        unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
+        const int idx1 = (idx0 + 1) % kNumTabletServers;
+        unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
+        SleepFor(MonoDelta::FromSeconds(1));
+    });
+    // An automatic clean-up to handle both success and failure cases
+    // in the code below.
+    auto cleanup = MakeScopedCleanup([&]() {
+        resume_thread.join();
+    });
+
+    // Since the table is hash-partitioned with kNumTabletServer partitions,
+    // hopefully three sequential numbers would go into different partitions.
+    for (int ii = 0; ii < kNumTabletServers; ++ii) {
+      unique_ptr<KuduInsert> ins(table->NewInsert());
+      ASSERT_OK(ins->mutable_row()->SetInt32(0, kNumTabletServers * i + ii));
+      ASSERT_OK(ins->mutable_row()->SetInt32(1, 0));
+      ASSERT_OK(session->Apply(ins.release()));
+    }
+  }
+}
+
+// Regression test for KUDU-2021: if client times out on establishing a
+// connection to the leader master, it should retry with another master in case
+// of a multi-master configuration.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021ConnectToMaster) {
+  static const int kNumMasters = 3;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+
+  cluster_opts_.num_masters = kNumMasters;
+  cluster_opts_.num_tablet_servers = 1;
+  cluster_opts_.master_rpc_ports = { 32037, 32038, 32039 };
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(kTimeout)
+          .default_rpc_timeout(kTimeout),
+      &client));
+
+  // Make a call to the master to populate the client's metadata cache.
+  vector<string> tables;
+  ASSERT_OK(client->ListTables(&tables));
+
+  // Pause the leader master so the client's next call would time out on
+  // connection negotiation. Do that a few times.
+  for (int i = 0; i < kNumMasters; ++i) {
+    int leader_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    ASSERT_OK(cluster_->master(leader_idx)->Pause());
+    ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx));
+
+    ASSERT_OK(client->ListTables(&tables));
+  }
+}
+
+// Regression test for KUDU-2021: if client times out on negotiating a
+// connection with the master, it should retry with another master in case
+// of a multi-master configuration.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021NegotiateWithMaster) {
+  static const int kNumMasters = 3;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+
+  cluster_opts_.num_masters = kNumMasters;
+  cluster_opts_.num_tablet_servers = 1;
+  cluster_opts_.master_rpc_ports = { 31037, 31038, 31039 };
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(kTimeout)
+          .default_rpc_timeout(kTimeout),
+      &client));
+
+  // Check client can successfully call ListTables().
+  vector<string> tables;
+  ASSERT_OK(client->ListTables(&tables));
+
+  // The test sets the client-side RPC negotiation timeout via the flag
+  // 'rpc_negotiation_timeout_ms'. We want the client to open a TCP connection
+  // and start the negotiation process with the leader master and then time out
+  // during the negotiation process. For the test to check the client's behavior
+  // on timing out while establishing a TCP connection see the
+  // Kudu2021ConnectToMaster test above.
+  //
+  // So, after the client times out on the negotiation process, it should re-resolve
+  // the leader master and connect to the new leader. Since the former leader
+  // has been paused in the middle of the negotiation, the client is supposed to
+  // connect to the new leader and succeed with its ListTables() call.
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  ExternalMaster* m = cluster_->master(leader_idx);
+  ASSERT_OK(
+      cluster_->SetFlag(m, "rpc_negotiation_inject_delay_ms",
+                        Substitute("$0", FLAGS_rpc_negotiation_timeout_ms * 2)));
+  thread pause_thread([&]() {
+      SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms / 2));
+      CHECK_OK(m->Pause());
+  });
+  // An automatic clean-up to handle both success and failure cases.
+  auto pause_thread_cleanup = MakeScopedCleanup([&]() {
+      pause_thread.join();
+      CHECK_OK(m->Resume());
+  });
+
+  // After an attempt to negotiate with the former leader master, timing out,
+  // and re-resolving the leader master, the client will eventually connect to
+  // a new leader master elected after the former one is paused. The new leader
+  // master doesn't impose any negotiation delay, so the client should succeed
+  // with the ListTables() call.
+  ASSERT_OK(client->ListTables(&tables));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7e02ede3/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index 7f25fe7..bd6d0b8 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -18,14 +18,10 @@
 #include <memory>
 #include <set>
 #include <string>
-#include <thread>
 #include <unordered_map>
 #include <vector>
 
-#include <boost/optional.hpp>
 #include <glog/logging.h>
-
-#include <gflags/gflags_declare.h>
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
@@ -35,33 +31,22 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
-#include "kudu/integration-tests/ts_itest-base.h"
-#include 
"kudu/tablet/key_value_test_schema.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/util/scoped_cleanup.h" using kudu::client::CountTableRows; -using kudu::client::KuduClient; -using kudu::client::KuduClientBuilder; using kudu::client::KuduInsert; using kudu::client::KuduSession; -using kudu::client::KuduSchema; using kudu::client::KuduTable; -using kudu::client::KuduTableCreator; using kudu::client::KuduUpdate; using kudu::client::sp::shared_ptr; using kudu::itest::TServerDetails; using kudu::tablet::TABLET_DATA_TOMBSTONED; using std::set; using std::string; -using std::thread; using std::vector; -using std::unique_ptr; using std::unordered_map; -DECLARE_bool(rpc_reopen_outbound_connections); -DECLARE_int64(rpc_negotiation_timeout_ms); - namespace kudu { enum ClientTestBehavior { @@ -358,119 +343,4 @@ TEST_F(ClientFailoverTServerTimeoutITest, FailoverOnLeaderTimeout) { EXPECT_GE(workload.rows_inserted(), rows_target); } -// Series of tests to verify that client fails over to another available server -// if it experiences a timeout on connection negotiation with current server. -// The 'server' can be both a master and a tablet server. -class ClientFailoverOnNegotiationTimeoutITest : public KuduTest { - public: - ClientFailoverOnNegotiationTimeoutITest() { - // Since we want to catch timeout during connection negotiation phase, - // let's make the client re-establishing connections on every RPC. - FLAGS_rpc_reopen_outbound_connections = true; - // Set the connection negotiation timeout shorter than its default value - // to run the test faster. - FLAGS_rpc_negotiation_timeout_ms = 1000; - - cluster_opts_.extra_tserver_flags = { - // Speed up Raft elections. - "--raft_heartbeat_interval_ms=25", - "--leader_failure_exp_backoff_max_delta_ms=1000", - // Decreasing TS->master heartbeat interval speeds up the test. - "--heartbeat_interval_ms=25", - }; - cluster_opts_.extra_master_flags = { - // Speed up Raft elections. 
- "--raft_heartbeat_interval_ms=25", - "--leader_failure_exp_backoff_max_delta_ms=1000", - }; - } - - Status CreateAndStartCluster() { - cluster_.reset(new ExternalMiniCluster(cluster_opts_)); - return cluster_->Start(); - } - - void TearDown() override { - if (cluster_) { - cluster_->Shutdown(); - } - KuduTest::TearDown(); - } - protected: - ExternalMiniClusterOptions cluster_opts_; - shared_ptr<ExternalMiniCluster> cluster_; -}; - -// Regression test for KUDU-1580: if a client times out on negotiating a connection -// to a tablet server, it should retry with other available tablet server. -TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) { - static const int kNumTabletServers = 3; - static const int kTimeoutMs = 5 * 60 * 1000; - static const char* kTableName = "kudu1580"; - - if (!AllowSlowTests()) { - LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; - return; - } - - cluster_opts_.num_tablet_servers = kNumTabletServers; - ASSERT_OK(CreateAndStartCluster()); - - shared_ptr<KuduClient> client; - ASSERT_OK(cluster_->CreateClient( - &KuduClientBuilder() - .default_admin_operation_timeout(MonoDelta::FromMilliseconds(kTimeoutMs)) - .default_rpc_timeout(MonoDelta::FromMilliseconds(kTimeoutMs)), - &client)); - unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); - KuduSchema schema(client::KuduSchemaFromSchema(CreateKeyValueTestSchema())); - ASSERT_OK(table_creator->table_name(kTableName) - .schema(&schema) - .add_hash_partitions({ "key" }, kNumTabletServers) - .num_replicas(kNumTabletServers) - .Create()); - shared_ptr<KuduTable> table; - ASSERT_OK(client->OpenTable(kTableName, &table)); - - shared_ptr<KuduSession> session = client->NewSession(); - session->SetTimeoutMillis(kTimeoutMs); - ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC)); - - // Running multiple iterations to cover possible variations of tablet leader - // placement among tablet servers. 
- for (int i = 0; i < 8 * kNumTabletServers; ++i) { - vector<unique_ptr<ScopedResumeExternalDaemon>> resumers; - for (int tsi = 0; tsi < kNumTabletServers; ++tsi) { - ExternalTabletServer* ts(cluster_->tablet_server(tsi)); - ASSERT_NE(nullptr, ts); - ASSERT_OK(ts->Pause()); - resumers.emplace_back(new ScopedResumeExternalDaemon(ts)); - } - - // Resume 2 out of 3 tablet servers (i.e. the majority), so the client - // could eventially succeed with its write operations. - thread resume_thread([&]() { - const int idx0 = rand() % kNumTabletServers; - unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release()); - const int idx1 = (idx0 + 1) % kNumTabletServers; - unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release()); - SleepFor(MonoDelta::FromSeconds(1)); - }); - // An automatic clean-up to handle both success and failure cases - // in the code below. - auto cleanup = MakeScopedCleanup([&]() { - resume_thread.join(); - }); - - // Since the table is hash-partitioned with kNumTabletServer partitions, - // hopefully three sequential numbers would go into different partitions. - for (int ii = 0; ii < kNumTabletServers; ++ii) { - unique_ptr<KuduInsert> ins(table->NewInsert()); - ASSERT_OK(ins->mutable_row()->SetInt32(0, kNumTabletServers * i + ii)); - ASSERT_OK(ins->mutable_row()->SetInt32(1, 0)); - ASSERT_OK(session->Apply(ins.release())); - } - } -} - } // namespace kudu
