This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 0c8d2eee1ee2b03d0de369d5975f153b00e26b36 Author: Alexey Serbin <[email protected]> AuthorDate: Thu Dec 7 10:22:37 2023 -0800 [rpc] extra test scenarios for exactly-once RPC logic In the context of troubleshooting one issue, it was necessary to know how the exactly-once RPC logic handles the re-ordering of RPC requests, so I added these extra scenarios to make sure the implementation behaves as expected. I think it's worth it adding the new tests into the code base to explicitly state how the system behaves in particular cases and to catch possible regressions if the corresponding code modified in the future. Change-Id: I3eedeb786420175b604fc67a9de948cedba9c53a Reviewed-on: http://gerrit.cloudera.org:8080/20831 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> --- src/kudu/rpc/exactly_once_rpc-test.cc | 203 +++++++++++++++++++++++++++++++--- 1 file changed, 187 insertions(+), 16 deletions(-) diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc index 8308b66bb..e9a0a6d13 100644 --- a/src/kudu/rpc/exactly_once_rpc-test.cc +++ b/src/kudu/rpc/exactly_once_rpc-test.cc @@ -73,16 +73,24 @@ namespace { const char* kClientId = "test-client"; -void AddRequestId(RpcController* controller, - const std::string& client_id, - ResultTracker::SequenceNumber sequence_number, - int64_t attempt_no) { - unique_ptr<RequestIdPB> request_id(new RequestIdPB()); +void AddRequestIdImpl(RpcController* ctl, + const std::string& client_id, + ResultTracker::SequenceNumber seq_num, + ResultTracker::SequenceNumber first_incomplete_seq_num, + int64_t attempt_num = 0) { + unique_ptr<RequestIdPB> request_id(new RequestIdPB); request_id->set_client_id(client_id); - request_id->set_seq_no(sequence_number); - request_id->set_attempt_no(attempt_no); - request_id->set_first_incomplete_seq_no(sequence_number); - controller->SetRequestIdPB(std::move(request_id)); + request_id->set_seq_no(seq_num); + 
request_id->set_first_incomplete_seq_no(first_incomplete_seq_num); + request_id->set_attempt_no(attempt_num); + ctl->SetRequestIdPB(std::move(request_id)); +} + +void AddRequestId(RpcController* ctl, + const std::string& client_id, + ResultTracker::SequenceNumber seq_num, + int64_t attempt_num = 0) { + return AddRequestIdImpl(ctl, client_id, seq_num, seq_num, attempt_num); } class TestServerPicker : public ServerPicker<CalculatorServiceProxy> { @@ -248,13 +256,13 @@ class ExactlyOnceRpcTest : public RpcTestBase { // and makes sure that only one of the calls was successful. struct SimultaneousExactlyOnceAdder { SimultaneousExactlyOnceAdder(CalculatorServiceProxy* p, - ResultTracker::SequenceNumber sequence_number, - int value, - uint64_t client_sleep, - uint64_t server_sleep, - int64_t attempt_no) - : proxy(p), - client_sleep_for_ms(client_sleep) { + ResultTracker::SequenceNumber sequence_number, + int value, + uint64_t client_sleep, + uint64_t server_sleep, + int64_t attempt_no) + : proxy(p), + client_sleep_for_ms(client_sleep) { req.set_value_to_add(value); req.set_sleep_for_ms(server_sleep); AddRequestId(&controller, kClientId, sequence_number, attempt_no); @@ -614,6 +622,169 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) NO_PENDING_FATALS(); } +// Requests might be re-ordered due to rare network conditions if sent to +// the server as separate WriteRequest RPCs or due to scheduling anomalies. +// The latter can happen because the requests are dispatched to a multitude +// of worker threads from a queue. This test scenario makes sure that if such +// a situation occurs, the exactly-once logic behaves as expected. +TEST_F(ExactlyOnceRpcTest, ReorderedRequestsOne) { + ASSERT_OK(StartServer()); + + // The request with ID 1 is dispatched first. 
+ { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/1, /*first_incomplete_seq_num=*/0, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(1, resp.current_val()); + } + + // The processing of the request with ID 0 starts a bit later. + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/0, /*first_incomplete_seq_num=*/0, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(2, resp.current_val()); + } + + // And here comes the retransmission of the request with ID 1 + // (attempt_num == 1). The response should be the same as the server sent + // for attempt_num == 0, even though the current value of the counter is 2: + // the request with ID 0 has already been received and processed by the + // server in the meantime, but the request with ID 1 must not be re-applied. + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/1, /*first_incomplete_seq_num=*/0, /*attempt_num=*/1); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(1, resp.current_val()); + } + + // Now send a request whose ID leaves a gap after the last request ID + // seen from this client, pretending the client has received responses + // for requests 0 and 1 and some requests the server hasn't seen yet are + // still in flight. Since the retry above wasn't re-applied, 2 + 10 == 12. + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(10); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/10, /*first_incomplete_seq_num=*/8, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(12, resp.current_val()); + } +} + +// This is similar to ReorderedRequestsOne above, but with more variations. 
+TEST_F(ExactlyOnceRpcTest, ReorderedRequestsTwo) { + ASSERT_OK(StartServer()); + + // The request with ID 1 is dispatched first. + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(2); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/1, /*first_incomplete_seq_num=*/0, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(2, resp.current_val()); + } + + // The processing of the request with ID 0 starts a bit later. + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/0, /*first_incomplete_seq_num=*/0, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(3, resp.current_val()); + } + + // However, the request with ID 0 is responded to first even though its + // processing started later, because the worker thread that was processing + // the request with ID 1 was stuck for a while. So, the request with ID 2 + // has first_incomplete_seq_num == 1 on its attempt_num == 0. + // On top of that, the two attempts of the request with ID 2 are re-ordered + // as well when the exactly-once logic receives them: attempt_num == 1 (with + // first_incomplete_seq_num == 2) arrives first. Both get the same response. 
+ { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(3); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/2, /*first_incomplete_seq_num=*/2, /*attempt_num=*/1); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(6, resp.current_val()); + } + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(3); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/2, /*first_incomplete_seq_num=*/1, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(6, resp.current_val()); + } + + // Send two more requests to make sure that the exactly-once state machine + // isn't broken at this point: request 2 must have been applied only once. + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(4); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/3, /*first_incomplete_seq_num=*/2, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(10, resp.current_val()); + } + { + RpcController ctl; + ExactlyOnceResponsePB resp; + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + + AddRequestIdImpl(&ctl, kClientId, + /*seq_num=*/4, /*first_incomplete_seq_num=*/3, /*attempt_num=*/0); + + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &ctl)); + ASSERT_EQ(11, resp.current_val()); + } +} } // namespace rpc } // namespace kudu
