Repository: kudu
Updated Branches:
  refs/heads/master 68a4b0a2d -> 7e02ede33


KUDU-2021 test for negotiation timeout on Master RPCs

This patch adds integration tests to verify that the Kudu C++ client
behaves as described in KUDU-2021: in case of a multi-master Kudu
cluster, the client retries the RPC with another master if the connection
negotiation with leader master times out.

Change-Id: Ib62126c9d8c6c65f447c5d03a0377eaff823393c
Reviewed-on: http://gerrit.cloudera.org:8080/6927
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7e02ede3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7e02ede3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7e02ede3

Branch: refs/heads/master
Commit: 7e02ede3300ac180f393c93bf781a4b74556140b
Parents: 68a4b0a
Author: Alexey Serbin <[email protected]>
Authored: Tue May 23 13:44:19 2017 -0700
Committer: Alexey Serbin <[email protected]>
Committed: Fri May 26 02:49:33 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |   2 +
 .../client-negotiation-failover-itest.cc        | 274 +++++++++++++++++++
 .../integration-tests/client_failover-itest.cc  | 130 ---------
 3 files changed, 276 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7e02ede3/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 1aa6547..f3a3cc9 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -56,6 +56,8 @@ ADD_KUDU_TEST(alter_table-test)
 ADD_KUDU_TEST(authn_token_expire-itest)
 ADD_KUDU_TEST(catalog_manager_tsk-itest)
 ADD_KUDU_TEST(client_failover-itest)
+ADD_KUDU_TEST(client-negotiation-failover-itest
+  RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(client-stress-test
   RESOURCE_LOCK "master-rpc-ports"
   RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/kudu/blob/7e02ede3/src/kudu/integration-tests/client-negotiation-failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
new file mode 100644
index 0000000..8f5ff13
--- /dev/null
+++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/client-test-util.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/util/scoped_cleanup.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using std::string;
+using std::thread;
+using std::vector;
+using std::unique_ptr;
+using strings::Substitute;
+
+DECLARE_bool(rpc_reopen_outbound_connections);
+DECLARE_int64(rpc_negotiation_timeout_ms);
+
+namespace kudu {
+
+// Series of tests to verify that client fails over to another available server
+// if it experiences a timeout on connection negotiation with current server.
+// The 'server' can be a master or a tablet server.
+class ClientFailoverOnNegotiationTimeoutITest : public KuduTest {
+ public:
+  ClientFailoverOnNegotiationTimeoutITest() {
+    // Since we want to catch timeout during connection negotiation phase,
+    // let's make the client re-establishing connections on every RPC.
+    FLAGS_rpc_reopen_outbound_connections = true;
+
+    // Set the connection negotiation timeout shorter than its default value
+    // to run the test faster. For sanitizer builds we don't want the timeout
+    // to be too short: running the test concurrently with other activities
+    // might lead client to fail even if the client retries again and again.
+#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+    FLAGS_rpc_negotiation_timeout_ms = 3000;
+#else
+    FLAGS_rpc_negotiation_timeout_ms = 1000;
+#endif
+
+    cluster_opts_.extra_tserver_flags = {
+        // Speed up Raft elections.
+        "--raft_heartbeat_interval_ms=25",
+        "--leader_failure_exp_backoff_max_delta_ms=1000",
+        // Decreasing TS->master heartbeat interval speeds up the test.
+        "--heartbeat_interval_ms=25",
+    };
+    cluster_opts_.extra_master_flags = {
+        // Speed up Raft elections.
+        "--raft_heartbeat_interval_ms=25",
+        "--leader_failure_exp_backoff_max_delta_ms=1000",
+    };
+  }
+
+  Status CreateAndStartCluster() {
+    cluster_.reset(new ExternalMiniCluster(cluster_opts_));
+    return cluster_->Start();
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+ protected:
+  ExternalMiniClusterOptions cluster_opts_;
+  shared_ptr<ExternalMiniCluster> cluster_;
+};
+
+// Regression test for KUDU-1580: if a client times out on negotiating a connection
+// to a tablet server, it should retry with other available tablet server.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) {
+  static const int kNumTabletServers = 3;
+  static const int kTimeoutMs = 5 * 60 * 1000;
+  static const char* kTableName = "kudu1580";
+
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  cluster_opts_.num_tablet_servers = kNumTabletServers;
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(MonoDelta::FromMilliseconds(kTimeoutMs))
+          .default_rpc_timeout(MonoDelta::FromMilliseconds(kTimeoutMs)),
+      &client));
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  KuduSchema schema(client::KuduSchemaFromSchema(CreateKeyValueTestSchema()));
+  ASSERT_OK(table_creator->table_name(kTableName)
+      .schema(&schema)
+      .add_hash_partitions({ "key" }, kNumTabletServers)
+      .num_replicas(kNumTabletServers)
+      .Create());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+
+  shared_ptr<KuduSession> session = client->NewSession();
+  session->SetTimeoutMillis(kTimeoutMs);
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  // Running multiple iterations to cover possible variations of tablet leader
+  // placement among tablet servers.
+  for (int i = 0; i < 8 * kNumTabletServers; ++i) {
+    vector<unique_ptr<ScopedResumeExternalDaemon>> resumers;
+    for (int tsi = 0; tsi < kNumTabletServers; ++tsi) {
+      ExternalTabletServer* ts(cluster_->tablet_server(tsi));
+      ASSERT_NE(nullptr, ts);
+      ASSERT_OK(ts->Pause());
+      resumers.emplace_back(new ScopedResumeExternalDaemon(ts));
+    }
+
+    // Resume 2 out of 3 tablet servers (i.e. the majority), so the client
+    // could eventually succeed with its write operations.
+    thread resume_thread([&]() {
+        const int idx0 = rand() % kNumTabletServers;
+        unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
+        const int idx1 = (idx0 + 1) % kNumTabletServers;
+        unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
+        SleepFor(MonoDelta::FromSeconds(1));
+      });
+    // An automatic clean-up to handle both success and failure cases
+    // in the code below.
+    auto cleanup = MakeScopedCleanup([&]() {
+        resume_thread.join();
+      });
+
+    // Since the table is hash-partitioned with kNumTabletServer partitions,
+    // hopefully three sequential numbers would go into different partitions.
+    for (int ii = 0; ii < kNumTabletServers; ++ii) {
+      unique_ptr<KuduInsert> ins(table->NewInsert());
+      ASSERT_OK(ins->mutable_row()->SetInt32(0, kNumTabletServers * i + ii));
+      ASSERT_OK(ins->mutable_row()->SetInt32(1, 0));
+      ASSERT_OK(session->Apply(ins.release()));
+    }
+  }
+}
+
+// Regression test for KUDU-2021: if client times out on establishing a
+// connection to the leader master, it should retry with other master in case of
+// a multi-master configuration.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021ConnectToMaster) {
+  static const int kNumMasters = 3;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+
+  cluster_opts_.num_masters = kNumMasters;
+  cluster_opts_.num_tablet_servers = 1;
+  cluster_opts_.master_rpc_ports = { 32037, 32038, 32039 };
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(kTimeout)
+          .default_rpc_timeout(kTimeout),
+      &client));
+
+  // Make a call to the master to populate the client's metadata cache.
+  vector<string> tables;
+  ASSERT_OK(client->ListTables(&tables));
+
+  // Pause the leader master so next call client would time out on connection
+  // negotiation. Do that few times.
+  for (int i = 0; i < kNumMasters; ++i) {
+    int leader_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    ASSERT_OK(cluster_->master(leader_idx)->Pause());
+    ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx));
+
+    ASSERT_OK(client->ListTables(&tables));
+  }
+}
+
+// Regression test for KUDU-2021: if client times out on negotiating a
+// connection with the master, it should retry with other master in case of
+// a multi-master configuration.
+TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021NegotiateWithMaster) {
+  static const int kNumMasters = 3;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+
+  cluster_opts_.num_masters = kNumMasters;
+  cluster_opts_.num_tablet_servers = 1;
+  cluster_opts_.master_rpc_ports = { 31037, 31038, 31039 };
+  ASSERT_OK(CreateAndStartCluster());
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(kTimeout)
+          .default_rpc_timeout(kTimeout),
+      &client));
+
+  // Check client can successfully call ListTables().
+  vector<string> tables;
+  ASSERT_OK(client->ListTables(&tables));
+
+  // The test sets the client-side RPC negotiation timeout via the flag
+  // 'rpc_negotiation_timeout_ms'. We want the client to open a TCP connection
+  // and start the negotiation process with the leader master and then time out
+// during the negotiation process. For the test to check the client's behavior
+  // on timing out while establishing a TCP connection see the
+  // Kudu2021ConnectToMaster test above.
+  //
+// So, after the client times out on the negotiation process, it should re-resolve
+  // the leader master and connect to the new leader. Since the former leader
+// has been paused in the middle of the negotiation, the client is supposed to
+  // connect to the new leader and succeed with its ListTables() call.
+  int leader_idx;
+  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+  ExternalMaster* m = cluster_->master(leader_idx);
+  ASSERT_OK(
+      cluster_->SetFlag(m, "rpc_negotiation_inject_delay_ms",
+                        Substitute("$0", FLAGS_rpc_negotiation_timeout_ms * 2)));
+  thread pause_thread([&]() {
+      SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms / 2));
+      CHECK_OK(m->Pause());
+    });
+  // An automatic clean-up to handle both success and failure cases.
+  auto pause_thread_cleanup = MakeScopedCleanup([&]() {
+      pause_thread.join();
+      CHECK_OK(m->Resume());
+    });
+
+  // After an attempt to negotiate with the former leader master, timing out,
+  // and re-resolving the leader master, the client will eventually connect to
+  // a new leader master elected after the former one is paused. The new leader
+  // master doesn't impose any negotiation delay, so the client should succeed
+  // with the ListTables() call.
+  ASSERT_OK(client->ListTables(&tables));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7e02ede3/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index 7f25fe7..bd6d0b8 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -18,14 +18,10 @@
 #include <memory>
 #include <set>
 #include <string>
-#include <thread>
 #include <unordered_map>
 #include <vector>
 
-#include <boost/optional.hpp>
 #include <glog/logging.h>
-
-#include <gflags/gflags_declare.h>
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
@@ -35,33 +31,22 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
-#include "kudu/integration-tests/ts_itest-base.h"
-#include "kudu/tablet/key_value_test_schema.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/scoped_cleanup.h"
 
 using kudu::client::CountTableRows;
-using kudu::client::KuduClient;
-using kudu::client::KuduClientBuilder;
 using kudu::client::KuduInsert;
 using kudu::client::KuduSession;
-using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
-using kudu::client::KuduTableCreator;
 using kudu::client::KuduUpdate;
 using kudu::client::sp::shared_ptr;
 using kudu::itest::TServerDetails;
 using kudu::tablet::TABLET_DATA_TOMBSTONED;
 using std::set;
 using std::string;
-using std::thread;
 using std::vector;
-using std::unique_ptr;
 using std::unordered_map;
 
-DECLARE_bool(rpc_reopen_outbound_connections);
-DECLARE_int64(rpc_negotiation_timeout_ms);
-
 namespace kudu {
 
 enum ClientTestBehavior {
@@ -358,119 +343,4 @@ TEST_F(ClientFailoverTServerTimeoutITest, FailoverOnLeaderTimeout) {
   EXPECT_GE(workload.rows_inserted(), rows_target);
 }
 
-// Series of tests to verify that client fails over to another available server
-// if it experiences a timeout on connection negotiation with current server.
-// The 'server' can be both a master and a tablet server.
-class ClientFailoverOnNegotiationTimeoutITest : public KuduTest {
- public:
-  ClientFailoverOnNegotiationTimeoutITest() {
-    // Since we want to catch timeout during connection negotiation phase,
-    // let's make the client re-establishing connections on every RPC.
-    FLAGS_rpc_reopen_outbound_connections = true;
-    // Set the connection negotiation timeout shorter than its default value
-    // to run the test faster.
-    FLAGS_rpc_negotiation_timeout_ms = 1000;
-
-    cluster_opts_.extra_tserver_flags = {
-        // Speed up Raft elections.
-        "--raft_heartbeat_interval_ms=25",
-        "--leader_failure_exp_backoff_max_delta_ms=1000",
-        // Decreasing TS->master heartbeat interval speeds up the test.
-        "--heartbeat_interval_ms=25",
-    };
-    cluster_opts_.extra_master_flags = {
-        // Speed up Raft elections.
-        "--raft_heartbeat_interval_ms=25",
-        "--leader_failure_exp_backoff_max_delta_ms=1000",
-    };
-  }
-
-  Status CreateAndStartCluster() {
-    cluster_.reset(new ExternalMiniCluster(cluster_opts_));
-    return cluster_->Start();
-  }
-
-  void TearDown() override {
-    if (cluster_) {
-      cluster_->Shutdown();
-    }
-    KuduTest::TearDown();
-  }
- protected:
-  ExternalMiniClusterOptions cluster_opts_;
-  shared_ptr<ExternalMiniCluster> cluster_;
-};
-
-// Regression test for KUDU-1580: if a client times out on negotiating a connection
-// to a tablet server, it should retry with other available tablet server.
-TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu1580ConnectToTServer) {
-  static const int kNumTabletServers = 3;
-  static const int kTimeoutMs = 5 * 60 * 1000;
-  static const char* kTableName = "kudu1580";
-
-  if (!AllowSlowTests()) {
-    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
-    return;
-  }
-
-  cluster_opts_.num_tablet_servers = kNumTabletServers;
-  ASSERT_OK(CreateAndStartCluster());
-
-  shared_ptr<KuduClient> client;
-  ASSERT_OK(cluster_->CreateClient(
-      &KuduClientBuilder()
-          .default_admin_operation_timeout(MonoDelta::FromMilliseconds(kTimeoutMs))
-          .default_rpc_timeout(MonoDelta::FromMilliseconds(kTimeoutMs)),
-      &client));
-  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
-  KuduSchema schema(client::KuduSchemaFromSchema(CreateKeyValueTestSchema()));
-  ASSERT_OK(table_creator->table_name(kTableName)
-      .schema(&schema)
-      .add_hash_partitions({ "key" }, kNumTabletServers)
-      .num_replicas(kNumTabletServers)
-      .Create());
-  shared_ptr<KuduTable> table;
-  ASSERT_OK(client->OpenTable(kTableName, &table));
-
-  shared_ptr<KuduSession> session = client->NewSession();
-  session->SetTimeoutMillis(kTimeoutMs);
-  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
-
-  // Running multiple iterations to cover possible variations of tablet leader
-  // placement among tablet servers.
-  for (int i = 0; i < 8 * kNumTabletServers; ++i) {
-    vector<unique_ptr<ScopedResumeExternalDaemon>> resumers;
-    for (int tsi = 0; tsi < kNumTabletServers; ++tsi) {
-      ExternalTabletServer* ts(cluster_->tablet_server(tsi));
-      ASSERT_NE(nullptr, ts);
-      ASSERT_OK(ts->Pause());
-      resumers.emplace_back(new ScopedResumeExternalDaemon(ts));
-    }
-
-    // Resume 2 out of 3 tablet servers (i.e. the majority), so the client
-    // could eventially succeed with its write operations.
-    thread resume_thread([&]() {
-        const int idx0 = rand() % kNumTabletServers;
-        unique_ptr<ScopedResumeExternalDaemon> r0(resumers[idx0].release());
-        const int idx1 = (idx0 + 1) % kNumTabletServers;
-        unique_ptr<ScopedResumeExternalDaemon> r1(resumers[idx1].release());
-        SleepFor(MonoDelta::FromSeconds(1));
-      });
-    // An automatic clean-up to handle both success and failure cases
-    // in the code below.
-    auto cleanup = MakeScopedCleanup([&]() {
-        resume_thread.join();
-      });
-
-    // Since the table is hash-partitioned with kNumTabletServer partitions,
-    // hopefully three sequential numbers would go into different partitions.
-    for (int ii = 0; ii < kNumTabletServers; ++ii) {
-      unique_ptr<KuduInsert> ins(table->NewInsert());
-      ASSERT_OK(ins->mutable_row()->SetInt32(0, kNumTabletServers * i + ii));
-      ASSERT_OK(ins->mutable_row()->SetInt32(1, 0));
-      ASSERT_OK(session->Apply(ins.release()));
-    }
-  }
-}
-
 } // namespace kudu

Reply via email to