Repository: kudu Updated Branches: refs/heads/master 12683435f -> e3c5dd18c
KUDU-1034 client does not failover due to timeout This patch fixes the issue described by KUDU-1034: the client does not mark the failed tablet server as 'failed' in case of timeout and continues to use it over and over again to send further requests, even if other tablet replicas might be available. Besides the actual fix, this patch incorporates an integration test (ClientFailoverTServerTimeoutITest.FailoverOnLeaderTimeout) written by Mike. Change-Id: Icfcece485e4053d921ffdc865612b3e7b9a992a3 Reviewed-on: http://gerrit.cloudera.org:8080/6924 Reviewed-by: Mike Percy <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4263b037 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4263b037 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4263b037 Branch: refs/heads/master Commit: 4263b037844fca595a35f99479fbb5765ba7a443 Parents: 1268343 Author: Alexey Serbin <[email protected]> Authored: Thu May 18 13:50:01 2017 -0700 Committer: Alexey Serbin <[email protected]> Committed: Thu May 25 01:07:43 2017 +0000 ---------------------------------------------------------------------- src/kudu/client/meta_cache.h | 5 + .../integration-tests/client_failover-itest.cc | 98 +++++++++++++++++++- src/kudu/integration-tests/tablet_copy-itest.cc | 7 +- src/kudu/rpc/retriable_rpc.h | 18 +++- 4 files changed, 125 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/client/meta_cache.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h index a8a67f1..2e68f6c 100644 --- a/src/kudu/client/meta_cache.h +++ b/src/kudu/client/meta_cache.h @@ -130,7 +130,12 @@ class MetaCacheServerPicker : public rpc::ServerPicker<RemoteTabletServer> { virtual ~MetaCacheServerPicker() {} void 
PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override; + + // For MetaCacheServerPicker, this method is selective: it marks the server + // as failed only for the remote tablet this picker is bound to, not for the + // entire RemoteTabletServer. void MarkServerFailed(RemoteTabletServer* replica, const Status& status) override; + void MarkReplicaNotLeader(RemoteTabletServer* replica) override; void MarkResourceNotFound(RemoteTabletServer* replica) override; private: http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/integration-tests/client_failover-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc index 0066d2f..bd0657b 100644 --- a/src/kudu/integration-tests/client_failover-itest.cc +++ b/src/kudu/integration-tests/client_failover-itest.cc @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. -#include <boost/optional.hpp> #include <memory> #include <set> +#include <string> #include <unordered_map> +#include <vector> + +#include <boost/optional.hpp> +#include <glog/logging.h> #include "kudu/client/client-test-util.h" #include "kudu/common/wire_protocol.h" @@ -26,6 +30,7 @@ #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/external_mini_cluster-itest-base.h" #include "kudu/integration-tests/test_workload.h" +#include "kudu/tserver/tserver.pb.h" using kudu::client::CountTableRows; using kudu::client::KuduInsert; @@ -245,4 +250,95 @@ TEST_F(ClientFailoverITest, TestClusterCrashDuringWorkload) { workload.StopAndJoin(); } +class ClientFailoverTServerTimeoutITest : public ExternalMiniClusterITestBase { + public: + void SetUp() override { + ExternalMiniClusterITestBase::SetUp(); + + // Shorter RPC timeouts and heartbeat intervals to speed up the test.
+ const vector<string> extra_flags_tserver = { + "--consensus_rpc_timeout_ms=250", + "--heartbeat_interval_ms=10", + "--raft_heartbeat_interval_ms=25", + "--leader_failure_exp_backoff_max_delta_ms=1000", + }; + const vector<string> extra_flags_master = { + "--raft_heartbeat_interval_ms=25", + "--leader_failure_exp_backoff_max_delta_ms=1000", + }; + NO_FATALS(StartCluster(extra_flags_tserver, extra_flags_master, kTSNum)); + } + + protected: + static const int kTSNum = 3; + + Status GetLeaderReplica(TServerDetails** leader) { + string tablet_id; + RETURN_NOT_OK(GetTabletId(&tablet_id)); + Status s; + for (int i = 0; i < 128; ++i) { + // FindTabletLeader tries to connect to the reported leader to verify + // that it thinks it's the leader; retry with a linearly growing sleep. + s = itest::FindTabletLeader( + ts_map_, tablet_id, MonoDelta::FromMilliseconds(100), leader); + if (s.ok()) { + break; + } + SleepFor(MonoDelta::FromMilliseconds(10L * (i + 1))); + } + return s; + } + + private: + // Get the identifier of a tablet (the first one listed) running on the tablet server with index 0. + Status GetTabletId(string* tablet_id) { + TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()]; + vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets; + RETURN_NOT_OK(itest::WaitForNumTabletsOnTS( + ts, 1, MonoDelta::FromSeconds(32), &tablets)); + *tablet_id = tablets[0].tablet_status().tablet_id(); + + return Status::OK(); + } +}; + +// Test that a client fails over to other available tablet replicas when an RPC +// with the former leader times out. This is a regression test for KUDU-1034.
+TEST_F(ClientFailoverTServerTimeoutITest, FailoverOnLeaderTimeout) { + TestWorkload workload(cluster_.get()); + workload.set_num_replicas(kTSNum); + workload.Setup(); + workload.Start(); + while (workload.rows_inserted() < 100) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + workload.StopAndJoin(); + + TServerDetails* leader; + ASSERT_OK(GetLeaderReplica(&leader)); + ASSERT_NE(nullptr, leader); + + // Pause the leader: this will cause the client to get timeout errors + // if trying to send RPCs to the corresponding tablet server. + ExternalTabletServer* ts(cluster_->tablet_server_by_uuid(leader->uuid())); + ASSERT_NE(nullptr, ts); + ScopedResumeExternalDaemon leader_resumer(ts); + ASSERT_OK(ts->Pause()); + + // Write 100 more rows while the leader is paused; write timeouts are tolerated. + int rows_target = workload.rows_inserted() + 100; + workload.set_timeout_allowed(true); + workload.set_write_timeout_millis(500); + workload.Start(); + for (int i = 0; i < 1000; ++i) { + if (workload.rows_inserted() >= rows_target) { + break; + } + SleepFor(MonoDelta::FromMilliseconds(10)); + } + + // Verify all the new rows have reached the destination. + EXPECT_GE(workload.rows_inserted(), rows_target); +} + } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/integration-tests/tablet_copy-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc index 85e25f7..2f747c5 100644 --- a/src/kudu/integration-tests/tablet_copy-itest.cc +++ b/src/kudu/integration-tests/tablet_copy-itest.cc @@ -157,7 +157,12 @@ TEST_F(TabletCopyITest, TestRejectRogueLeader) { // leader's tablet copy request when we bring it back online. int log_index = workload.batches_completed() + 2; // 2 terms == 2 additional NO_OP entries. ASSERT_OK(WaitForServersToAgree(timeout, active_ts_map, tablet_id, log_index)); - // TODO: Write more rows to the new leader once KUDU-1034 is fixed.
+ // Now that KUDU-1034 is fixed, write more rows to the new leader. NOTE(review): rows_inserted() looks cumulative across Start() calls; if >= 100 rows were already written above, this loop is a no-op -- consider a relative target. + workload.Start(); + while (workload.rows_inserted() < 100) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + workload.StopAndJoin(); // Now kill the new leader and tombstone the replica on TS 0. cluster_->tablet_server(new_leader_index)->Shutdown(); http://git-wip-us.apache.org/repos/asf/kudu/blob/4263b037/src/kudu/rpc/retriable_rpc.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h index a5ff7ad..c896027 100644 --- a/src/kudu/rpc/retriable_rpc.h +++ b/src/kudu/rpc/retriable_rpc.h @@ -179,6 +179,9 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded( // test coverage here to understand why the back-off is not taking effect. if (server != nullptr) { VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString(); + // Mark the server as failed. For details on the only existing + // implementation of ServerPicker::MarkServerFailed(), see the note on + // the MetaCacheServerPicker::MarkServerFailed() method. server_picker_->MarkServerFailed(server, result.status); } break; @@ -209,8 +212,21 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded( return false; } + case RetriableRpcStatus::NON_RETRIABLE_ERROR: + if (server != nullptr && result.status.IsTimedOut()) { + // On a TimedOut status, mark the server as failed so that subsequent + // RPCs fail over to another replica instead of reusing it (KUDU-1034). + // For details on the only existing implementation of ServerPicker:: + // MarkServerFailed(), see the note on MetaCacheServerPicker::MarkServerFailed(). + VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString(); + server_picker_->MarkServerFailed(server, result.status); + } + // A non-retriable error is never retried, even after marking the server failed.
 + return false; + default: - // For the OK and NON_RETRIABLE_ERROR cases we can't/won't retry. + // Only the OK case is expected here, and it is never retried.
+ DCHECK(result.result == RetriableRpcStatus::OK); return false; } resp_.Clear();
