http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-test-base.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h new file mode 100644 index 0000000..c40f546 --- /dev/null +++ b/be/src/kudu/rpc/rpc-test-base.h @@ -0,0 +1,585 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_RPC_RPC_TEST_BASE_H +#define KUDU_RPC_RPC_TEST_BASE_H + +#include <algorithm> +#include <atomic> +#include <memory> +#include <string> + +#include "kudu/gutil/walltime.h" +#include "kudu/rpc/acceptor_pool.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/proxy.h" +#include "kudu/rpc/reactor.h" +#include "kudu/rpc/remote_method.h" +#include "kudu/rpc/result_tracker.h" +#include "kudu/rpc/rpc_context.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/rpc/rtest.pb.h" +#include "kudu/rpc/rtest.proxy.h" +#include "kudu/rpc/rtest.service.h" +#include "kudu/rpc/service_if.h" +#include "kudu/rpc/service_pool.h" +#include "kudu/security/security-test-util.h" +#include "kudu/util/env.h" +#include "kudu/util/faststring.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/path_util.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" +#include "kudu/util/trace.h" + +namespace kudu { +namespace rpc { + +using kudu::rpc_test::AddRequestPB; +using kudu::rpc_test::AddResponsePB; +using kudu::rpc_test::CalculatorError; +using kudu::rpc_test::CalculatorServiceIf; +using kudu::rpc_test::CalculatorServiceProxy; +using kudu::rpc_test::EchoRequestPB; +using kudu::rpc_test::EchoResponsePB; +using kudu::rpc_test::ExactlyOnceRequestPB; +using kudu::rpc_test::ExactlyOnceResponsePB; +using kudu::rpc_test::FeatureFlags; +using kudu::rpc_test::PanicRequestPB; +using kudu::rpc_test::PanicResponsePB; +using kudu::rpc_test::PushTwoStringsRequestPB; +using kudu::rpc_test::PushTwoStringsResponsePB; +using kudu::rpc_test::SendTwoStringsRequestPB; +using kudu::rpc_test::SendTwoStringsResponsePB; +using kudu::rpc_test::SleepRequestPB; +using kudu::rpc_test::SleepResponsePB; +using kudu::rpc_test::TestInvalidResponseRequestPB; +using kudu::rpc_test::TestInvalidResponseResponsePB; 
+using kudu::rpc_test::WhoAmIRequestPB; +using kudu::rpc_test::WhoAmIResponsePB; +using kudu::rpc_test_diff_package::ReqDiffPackagePB; +using kudu::rpc_test_diff_package::RespDiffPackagePB; + +// Implementation of CalculatorService which just implements the generic +// RPC handler (no generated code). +class GenericCalculatorService : public ServiceIf { + public: + static const char *kFullServiceName; + static const char *kAddMethodName; + static const char *kSleepMethodName; + static const char *kPushTwoStringsMethodName; + static const char *kSendTwoStringsMethodName; + static const char *kAddExactlyOnce; + + static const char* kFirstString; + static const char* kSecondString; + + GenericCalculatorService() { + } + + // To match the argument list of the generated CalculatorService. + explicit GenericCalculatorService(const scoped_refptr<MetricEntity>& entity, + const scoped_refptr<ResultTracker>& result_tracker) { + // this test doesn't generate metrics, so we ignore the argument. + } + + void Handle(InboundCall *incoming) override { + if (incoming->remote_method().method_name() == kAddMethodName) { + DoAdd(incoming); + } else if (incoming->remote_method().method_name() == kSleepMethodName) { + DoSleep(incoming); + } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) { + DoSendTwoStrings(incoming); + } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) { + DoPushTwoStrings(incoming); + } else { + incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD, + Status::InvalidArgument("bad method")); + } + } + + std::string service_name() const override { return kFullServiceName; } + static std::string static_service_name() { return kFullServiceName; } + + private: + void DoAdd(InboundCall *incoming) { + Slice param(incoming->serialized_request()); + AddRequestPB req; + if (!req.ParseFromArray(param.data(), param.size())) { + LOG(FATAL) << "couldn't parse: " << param.ToDebugString(); + } + + AddResponsePB 
resp; + resp.set_result(req.x() + req.y()); + incoming->RespondSuccess(resp); + } + + void DoSendTwoStrings(InboundCall* incoming) { + Slice param(incoming->serialized_request()); + SendTwoStringsRequestPB req; + if (!req.ParseFromArray(param.data(), param.size())) { + LOG(FATAL) << "couldn't parse: " << param.ToDebugString(); + } + + std::unique_ptr<faststring> first(new faststring); + std::unique_ptr<faststring> second(new faststring); + + Random r(req.random_seed()); + first->resize(req.size1()); + RandomString(first->data(), req.size1(), &r); + + second->resize(req.size2()); + RandomString(second->data(), req.size2(), &r); + + SendTwoStringsResponsePB resp; + int idx1, idx2; + CHECK_OK(incoming->AddOutboundSidecar( + RpcSidecar::FromFaststring(std::move(first)), &idx1)); + CHECK_OK(incoming->AddOutboundSidecar( + RpcSidecar::FromFaststring(std::move(second)), &idx2)); + resp.set_sidecar1(idx1); + resp.set_sidecar2(idx2); + + incoming->RespondSuccess(resp); + } + + void DoPushTwoStrings(InboundCall* incoming) { + Slice param(incoming->serialized_request()); + PushTwoStringsRequestPB req; + if (!req.ParseFromArray(param.data(), param.size())) { + LOG(FATAL) << "couldn't parse: " << param.ToDebugString(); + } + + Slice sidecar1; + CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1)); + + Slice sidecar2; + CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2)); + + // Check that reading non-existant sidecars doesn't work. + Slice tmp; + CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok()); + + PushTwoStringsResponsePB resp; + resp.set_size1(sidecar1.size()); + resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size()); + resp.set_size2(sidecar2.size()); + resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size()); + + // Drop the sidecars etc, just to confirm that it's safe to do so. 
+ CHECK_GT(incoming->GetTransferSize(), 0); + incoming->DiscardTransfer(); + CHECK_EQ(0, incoming->GetTransferSize()); + incoming->RespondSuccess(resp); + } + + void DoSleep(InboundCall *incoming) { + Slice param(incoming->serialized_request()); + SleepRequestPB req; + if (!req.ParseFromArray(param.data(), param.size())) { + incoming->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST, + Status::InvalidArgument("Couldn't parse pb", + req.InitializationErrorString())); + return; + } + + LOG(INFO) << "got call: " << SecureShortDebugString(req); + SleepFor(MonoDelta::FromMicroseconds(req.sleep_micros())); + SleepResponsePB resp; + incoming->RespondSuccess(resp); + } +}; + +class CalculatorService : public CalculatorServiceIf { + public: + explicit CalculatorService(const scoped_refptr<MetricEntity>& entity, + const scoped_refptr<ResultTracker> result_tracker) + : CalculatorServiceIf(entity, result_tracker), + exactly_once_test_val_(0) { + } + + void Add(const AddRequestPB *req, AddResponsePB *resp, RpcContext *context) override { + resp->set_result(req->x() + req->y()); + context->RespondSuccess(); + } + + void Sleep(const SleepRequestPB *req, SleepResponsePB *resp, RpcContext *context) override { + if (req->return_app_error()) { + CalculatorError my_error; + my_error.set_extra_error_data("some application-specific error data"); + context->RespondApplicationError(CalculatorError::app_error_ext.number(), + "Got some error", my_error); + return; + } + + // Respond w/ error if the RPC specifies that the client deadline is set, + // but it isn't. 
+ if (req->client_timeout_defined()) { + MonoTime deadline = context->GetClientDeadline(); + if (deadline == MonoTime::Max()) { + CalculatorError my_error; + my_error.set_extra_error_data("Timeout not set"); + context->RespondApplicationError(CalculatorError::app_error_ext.number(), + "Missing required timeout", my_error); + return; + } + } + + if (req->deferred()) { + // Spawn a new thread which does the sleep and responds later. + scoped_refptr<Thread> thread; + CHECK_OK(Thread::Create("rpc-test", "deferred", + &CalculatorService::DoSleep, this, req, context, + &thread)); + return; + } + DoSleep(req, context); + } + + void Echo(const EchoRequestPB *req, EchoResponsePB *resp, RpcContext *context) override { + resp->set_data(req->data()); + context->RespondSuccess(); + } + + void WhoAmI(const WhoAmIRequestPB* /*req*/, + WhoAmIResponsePB* resp, + RpcContext* context) override { + const RemoteUser& user = context->remote_user(); + resp->mutable_credentials()->set_real_user(user.username()); + resp->set_address(context->remote_address().ToString()); + context->RespondSuccess(); + } + + void TestArgumentsInDiffPackage(const ReqDiffPackagePB *req, + RespDiffPackagePB *resp, + ::kudu::rpc::RpcContext *context) override { + context->RespondSuccess(); + } + + void Panic(const PanicRequestPB* req, PanicResponsePB* resp, RpcContext* context) override { + TRACE("Got panic request"); + PANIC_RPC(context, "Test method panicking!"); + } + + void TestInvalidResponse(const TestInvalidResponseRequestPB* req, + TestInvalidResponseResponsePB* resp, + RpcContext* context) override { + switch (req->error_type()) { + case rpc_test::TestInvalidResponseRequestPB_ErrorType_MISSING_REQUIRED_FIELD: + // Respond without setting the 'resp->response' protobuf field, which is + // marked as required. This exercises the error path of invalid responses. 
+ context->RespondSuccess(); + break; + case rpc_test::TestInvalidResponseRequestPB_ErrorType_RESPONSE_TOO_LARGE: + resp->mutable_response()->resize(FLAGS_rpc_max_message_size + 1000); + context->RespondSuccess(); + break; + default: + LOG(FATAL); + } + } + + bool SupportsFeature(uint32_t feature) const override { + return feature == FeatureFlags::FOO; + } + + void AddExactlyOnce(const ExactlyOnceRequestPB* req, ExactlyOnceResponsePB* resp, + ::kudu::rpc::RpcContext* context) override { + if (req->sleep_for_ms() > 0) { + usleep(req->sleep_for_ms() * 1000); + } + // If failures are enabled, cause them some percentage of the time. + if (req->randomly_fail()) { + if (rand() % 10 < 3) { + context->RespondFailure(Status::ServiceUnavailable("Random injected failure.")); + return; + } + } + int result = exactly_once_test_val_ += req->value_to_add(); + resp->set_current_val(result); + resp->set_current_time_micros(GetCurrentTimeMicros()); + context->RespondSuccess(); + } + + bool AuthorizeDisallowAlice(const google::protobuf::Message* /*req*/, + google::protobuf::Message* /*resp*/, + RpcContext* context) override { + if (context->remote_user().username() == "alice") { + context->RespondFailure(Status::NotAuthorized("alice is not allowed to call this method")); + return false; + } + return true; + } + + bool AuthorizeDisallowBob(const google::protobuf::Message* /*req*/, + google::protobuf::Message* /*resp*/, + RpcContext* context) override { + if (context->remote_user().username() == "bob") { + context->RespondFailure(Status::NotAuthorized("bob is not allowed to call this method")); + return false; + } + return true; + } + + private: + void DoSleep(const SleepRequestPB *req, + RpcContext *context) { + TRACE_COUNTER_INCREMENT("test_sleep_us", req->sleep_micros()); + if (Trace::CurrentTrace()) { + scoped_refptr<Trace> child_trace(new Trace()); + Trace::CurrentTrace()->AddChildTrace("test_child", child_trace.get()); + ADOPT_TRACE(child_trace.get()); + 
TRACE_COUNTER_INCREMENT("related_trace_metric", 1); + } + + SleepFor(MonoDelta::FromMicroseconds(req->sleep_micros())); + context->RespondSuccess(); + } + + std::atomic_int exactly_once_test_val_; + +}; + +const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService"; +const char *GenericCalculatorService::kAddMethodName = "Add"; +const char *GenericCalculatorService::kSleepMethodName = "Sleep"; +const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings"; +const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings"; +const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce"; + +const char *GenericCalculatorService::kFirstString = + "1111111111111111111111111111111111111111111111111111111111"; +const char *GenericCalculatorService::kSecondString = + "2222222222222222222222222222222222222222222222222222222222222222222222"; + +class RpcTestBase : public KuduTest { + public: + RpcTestBase() + : n_worker_threads_(3), + service_queue_length_(100), + n_server_reactor_threads_(3), + keepalive_time_ms_(1000), + metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) { + } + + void SetUp() override { + KuduTest::SetUp(); + } + + void TearDown() override { + if (service_pool_) { + server_messenger_->UnregisterService(service_name_); + service_pool_->Shutdown(); + } + if (server_messenger_) { + server_messenger_->Shutdown(); + } + KuduTest::TearDown(); + } + + protected: + std::shared_ptr<Messenger> CreateMessenger(const string &name, + int n_reactors = 1, + bool enable_ssl = false) { + MessengerBuilder bld(name); + + if (enable_ssl) { + bld.enable_inbound_tls(); + } + + bld.set_num_reactors(n_reactors); + bld.set_connection_keepalive_time( + MonoDelta::FromMilliseconds(keepalive_time_ms_)); + // In order for the keepalive timing to be accurate, we need to scan connections + // significantly more frequently than the keepalive time. 
This "coarse timer" + // granularity determines this. + bld.set_coarse_timer_granularity(MonoDelta::FromMilliseconds( + std::min(keepalive_time_ms_ / 5, 100))); + bld.set_metric_entity(metric_entity_); + std::shared_ptr<Messenger> messenger; + CHECK_OK(bld.Build(&messenger)); + return messenger; + } + + Status DoTestSyncCall(const Proxy &p, const char *method, + CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) { + AddRequestPB req; + req.set_x(rand()); + req.set_y(rand()); + AddResponsePB resp; + RpcController controller; + controller.set_timeout(MonoDelta::FromMilliseconds(10000)); + controller.set_credentials_policy(policy); + RETURN_NOT_OK(p.SyncRequest(method, req, &resp, &controller)); + + CHECK_EQ(req.x() + req.y(), resp.result()); + return Status::OK(); + } + + void DoTestSidecar(const Proxy &p, int size1, int size2) { + const uint32_t kSeed = 12345; + + SendTwoStringsRequestPB req; + req.set_size1(size1); + req.set_size2(size2); + req.set_random_seed(kSeed); + + SendTwoStringsResponsePB resp; + RpcController controller; + controller.set_timeout(MonoDelta::FromMilliseconds(10000)); + CHECK_OK(p.SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName, + req, &resp, &controller)); + + Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1); + Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2); + Random rng(kSeed); + faststring expected; + + expected.resize(size1); + RandomString(expected.data(), size1, &rng); + CHECK_EQ(0, first.compare(Slice(expected))); + + expected.resize(size2); + RandomString(expected.data(), size2, &rng); + CHECK_EQ(0, second.compare(Slice(expected))); + } + + void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) { + PushTwoStringsRequestPB request; + RpcController controller; + + int idx1; + string s1(size1, 'a'); + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1)); + + int idx2; + string s2(size2, 'b'); + 
CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2)); + + request.set_sidecar1_idx(idx1); + request.set_sidecar2_idx(idx2); + + PushTwoStringsResponsePB resp; + CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + request, &resp, &controller)); + CHECK_EQ(size1, resp.size1()); + CHECK_EQ(resp.data1(), s1); + CHECK_EQ(size2, resp.size2()); + CHECK_EQ(resp.data2(), s2); + } + + void DoTestExpectTimeout(const Proxy& p, + const MonoDelta& timeout, + bool* is_negotiaton_error = nullptr) { + SleepRequestPB req; + SleepResponsePB resp; + // Sleep for 500ms longer than the call timeout. + int sleep_micros = timeout.ToMicroseconds() + 500 * 1000; + req.set_sleep_micros(sleep_micros); + + RpcController c; + c.set_timeout(timeout); + Stopwatch sw; + sw.start(); + Status s = p.SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c); + sw.stop(); + ASSERT_FALSE(s.ok()); + if (is_negotiaton_error != nullptr) { + *is_negotiaton_error = c.negotiation_failed(); + } + + int expected_millis = timeout.ToMilliseconds(); + int elapsed_millis = sw.elapsed().wall_millis(); + + // We shouldn't timeout significantly faster than our configured timeout. + EXPECT_GE(elapsed_millis, expected_millis - 10); + // And we also shouldn't take the full time that we asked for + EXPECT_LT(elapsed_millis * 1000, sleep_micros); + EXPECT_TRUE(s.IsTimedOut()); + LOG(INFO) << "status: " << s.ToString() << ", seconds elapsed: " << sw.elapsed().wall_seconds(); + } + + void StartTestServer(Sockaddr *server_addr, bool enable_ssl = false) { + DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl); + } + + void StartTestServerWithGeneratedCode(Sockaddr *server_addr, bool enable_ssl = false) { + DoStartTestServer<CalculatorService>(server_addr, enable_ssl); + } + + // Start a simple socket listening on a local port, returning the address. + // This isn't an RPC server -- just a plain socket which can be helpful for testing. 
+ Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) { + Sockaddr bind_addr; + bind_addr.set_port(0); + RETURN_NOT_OK(listen_sock->Init(0)); + RETURN_NOT_OK(listen_sock->BindAndListen(bind_addr, 1)); + RETURN_NOT_OK(listen_sock->GetSocketAddress(listen_addr)); + LOG(INFO) << "Bound to: " << listen_addr->ToString(); + return Status::OK(); + } + + private: + + static Slice GetSidecarPointer(const RpcController& controller, int idx, + int expected_size) { + Slice sidecar; + CHECK_OK(controller.GetInboundSidecar(idx, &sidecar)); + CHECK_EQ(expected_size, sidecar.size()); + return Slice(sidecar.data(), expected_size); + } + + template<class ServiceClass> + void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false) { + server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl); + std::shared_ptr<AcceptorPool> pool; + ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool)); + ASSERT_OK(pool->Start(2)); + *server_addr = pool->bind_address(); + mem_tracker_ = MemTracker::CreateTracker(-1, "result_tracker"); + result_tracker_.reset(new ResultTracker(mem_tracker_)); + + gscoped_ptr<ServiceIf> service(new ServiceClass(metric_entity_, result_tracker_)); + service_name_ = service->service_name(); + scoped_refptr<MetricEntity> metric_entity = server_messenger_->metric_entity(); + service_pool_ = new ServicePool(std::move(service), metric_entity, service_queue_length_); + server_messenger_->RegisterService(service_name_, service_pool_); + ASSERT_OK(service_pool_->Init(n_worker_threads_)); + } + + protected: + string service_name_; + std::shared_ptr<Messenger> server_messenger_; + scoped_refptr<ServicePool> service_pool_; + std::shared_ptr<kudu::MemTracker> mem_tracker_; + scoped_refptr<ResultTracker> result_tracker_; + int n_worker_threads_; + int service_queue_length_; + int n_server_reactor_threads_; + int keepalive_time_ms_; + + MetricRegistry metric_registry_; + scoped_refptr<MetricEntity> metric_entity_; +}; + 
+} // namespace rpc +} // namespace kudu +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc new file mode 100644 index 0000000..63b7b73 --- /dev/null +++ b/be/src/kudu/rpc/rpc-test.cc @@ -0,0 +1,808 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/rpc-test-base.h" + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include <boost/bind.hpp> +#include <gtest/gtest.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/serialization.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/env.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/test_util.h" + +METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep); +METRIC_DECLARE_histogram(rpc_incoming_queue_time); + +DECLARE_bool(rpc_reopen_outbound_connections); +DECLARE_int32(rpc_negotiation_inject_delay_ms); + +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::unordered_map; +using std::vector; + +namespace kudu { +namespace rpc { + +class TestRpc : public RpcTestBase, public ::testing::WithParamInterface<bool> { +}; + +// This is used to run all parameterized tests with and without SSL. +INSTANTIATE_TEST_CASE_P(OptionalSSL, TestRpc, testing::Values(false, true)); + +TEST_F(TestRpc, TestSockaddr) { + Sockaddr addr1, addr2; + addr1.set_port(1000); + addr2.set_port(2000); + // port is ignored when comparing Sockaddr objects + ASSERT_FALSE(addr1 < addr2); + ASSERT_FALSE(addr2 < addr1); + ASSERT_EQ(1000, addr1.port()); + ASSERT_EQ(2000, addr2.port()); + ASSERT_EQ(string("0.0.0.0:1000"), addr1.ToString()); + ASSERT_EQ(string("0.0.0.0:2000"), addr2.ToString()); + Sockaddr addr3(addr1); + ASSERT_EQ(string("0.0.0.0:1000"), addr3.ToString()); +} + +TEST_P(TestRpc, TestMessengerCreateDestroy) { + shared_ptr<Messenger> messenger(CreateMessenger("TestCreateDestroy", 1, GetParam())); + LOG(INFO) << "started messenger " << messenger->name(); + messenger->Shutdown(); +} + +// Test starting and stopping a messenger. 
This is a regression +// test for a segfault seen in early versions of the RPC code, +// in which shutting down the acceptor would trigger an assert, +// making our tests flaky. +TEST_P(TestRpc, TestAcceptorPoolStartStop) { + int n_iters = AllowSlowTests() ? 100 : 5; + for (int i = 0; i < n_iters; i++) { + shared_ptr<Messenger> messenger(CreateMessenger("TestAcceptorPoolStartStop", 1, GetParam())); + shared_ptr<AcceptorPool> pool; + ASSERT_OK(messenger->AddAcceptorPool(Sockaddr(), &pool)); + Sockaddr bound_addr; + ASSERT_OK(pool->GetBoundAddress(&bound_addr)); + ASSERT_NE(0, bound_addr.port()); + ASSERT_OK(pool->Start(2)); + messenger->Shutdown(); + } +} + +TEST_F(TestRpc, TestConnHeaderValidation) { + MessengerBuilder mb("TestRpc.TestConnHeaderValidation"); + const int conn_hdr_len = kMagicNumberLength + kHeaderFlagsLength; + uint8_t buf[conn_hdr_len]; + serialization::SerializeConnHeader(buf); + ASSERT_OK(serialization::ValidateConnHeader(Slice(buf, conn_hdr_len))); +} + +// Test making successful RPC calls. +TEST_P(TestRpc, TestCall) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@" + "{remote=$0, user_credentials=", + server_addr.ToString())); + + for (int i = 0; i < 10; i++) { + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + } +} + +// Test that connecting to an invalid server properly throws an error. 
+TEST_P(TestRpc, TestCallToBadServer) { + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam())); + Sockaddr addr; + addr.set_port(0); + Proxy p(client_messenger, addr, GenericCalculatorService::static_service_name()); + + // Loop a few calls to make sure that we properly set up and tear down + // the connections. + for (int i = 0; i < 5; i++) { + Status s = DoTestSyncCall(p, GenericCalculatorService::kAddMethodName); + LOG(INFO) << "Status: " << s.ToString(); + ASSERT_TRUE(s.IsNetworkError()) << "unexpected status: " << s.ToString(); + } +} + +// Test that RPC calls can be failed with an error status on the server. +TEST_P(TestRpc, TestInvalidMethodCall) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Call the method which fails. + Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist"); + ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "bad method"); +} + +// Test that the error message returned when connecting to the wrong service +// is reasonable. +TEST_P(TestRpc, TestWrongService) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client with the wrong service name. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, "WrongServiceName"); + + // Call the method which fails. 
+ Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist"); + ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), + "Service unavailable: service WrongServiceName " + "not registered on TestServer"); +} + +// Test that we can still make RPC connections even if many fds are in use. +// This is a regression test for KUDU-650. +TEST_P(TestRpc, TestHighFDs) { + // This test can only run if ulimit is set high. + const int kNumFakeFiles = 3500; + const int kMinUlimit = kNumFakeFiles + 100; + if (env_->GetOpenFileLimit() < kMinUlimit) { + LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << kMinUlimit; + return; + } + + // Open a bunch of fds just to increase our fd count. + vector<RandomAccessFile*> fake_files; + ElementDeleter d(&fake_files); + for (int i = 0; i < kNumFakeFiles; i++) { + unique_ptr<RandomAccessFile> f; + CHECK_OK(Env::Default()->NewRandomAccessFile("/dev/zero", &f)); + fake_files.push_back(f.release()); + } + + // Set up server and client, and verify we can make a successful call. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); +} + +// Test that connections are kept alive between calls. +TEST_P(TestRpc, TestConnectionKeepalive) { + // Only run one reactor per messenger, so we can grab the metrics from that + // one without having to check all. + n_server_reactor_threads_ = 1; + keepalive_time_ms_ = 500; + + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. 
+ LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + + SleepFor(MonoDelta::FromMilliseconds(5)); + + ReactorMetrics metrics; + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connection"; + ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections"; + + ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections"; + ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections"; + + SleepFor(MonoDelta::FromMilliseconds(2 * keepalive_time_ms_)); + + // After sleeping, the keepalive timer should have closed both sides of + // the connection. + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.num_server_connections_) << "Server should have 0 server connections"; + ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections"; + + ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections"; + ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections"; +} + +// Test that outbound connections to the same server are reopen upon every RPC +// call when the 'rpc_reopen_outbound_connections' flag is set. +TEST_P(TestRpc, TestReopenOutboundConnections) { + // Set the flag to enable special mode: close and reopen already established + // outbound connections. 
+ FLAGS_rpc_reopen_outbound_connections = true; + + // Only run one reactor per messenger, so we can grab the metrics from that + // one without having to check all. + n_server_reactor_threads_ = 1; + + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Verify the initial counters. + ReactorMetrics metrics; + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.total_client_connections_); + ASSERT_EQ(0, metrics.total_server_connections_); + ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.total_client_connections_); + ASSERT_EQ(0, metrics.total_server_connections_); + + // Run several iterations, just in case. + for (int i = 0; i < 32; ++i) { + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.total_client_connections_); + ASSERT_EQ(i + 1, metrics.total_server_connections_); + ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(i + 1, metrics.total_client_connections_); + ASSERT_EQ(0, metrics.total_server_connections_); + } +} + +// Test that an outbound connection is closed and a new one is open if going +// from ANY_CREDENTIALS to PRIMARY_CREDENTIALS policy for RPC calls to the same +// destination. +// Test that changing from PRIMARY_CREDENTIALS policy to ANY_CREDENTIALS policy +// re-uses the connection established with PRIMARY_CREDENTIALS policy. +TEST_P(TestRpc, TestCredentialsPolicy) { + // Only run one reactor per messenger, so we can grab the metrics from that + // one without having to check all. 
+ n_server_reactor_threads_ = 1; + + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Verify the initial counters. + ReactorMetrics metrics; + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.total_client_connections_); + ASSERT_EQ(0, metrics.total_server_connections_); + ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + ASSERT_EQ(0, metrics.total_client_connections_); + ASSERT_EQ(0, metrics.total_server_connections_); + + // Make an RPC call with ANY_CREDENTIALS policy. + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + EXPECT_EQ(0, metrics.total_client_connections_); + EXPECT_EQ(1, metrics.total_server_connections_); + EXPECT_EQ(1, metrics.num_server_connections_); + EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + EXPECT_EQ(1, metrics.total_client_connections_); + EXPECT_EQ(0, metrics.total_server_connections_); + EXPECT_EQ(1, metrics.num_client_connections_); + + // This is to allow all the data to be sent so the connection becomes idle. + SleepFor(MonoDelta::FromMilliseconds(5)); + + // Make an RPC call with PRIMARY_CREDENTIALS policy. Currently open connection + // with ANY_CREDENTIALS policy should be closed and a new one established + // with PRIMARY_CREDENTIALS policy. 
+ ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName, + CredentialsPolicy::PRIMARY_CREDENTIALS)); + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + EXPECT_EQ(0, metrics.total_client_connections_); + EXPECT_EQ(2, metrics.total_server_connections_); + EXPECT_EQ(1, metrics.num_server_connections_); + EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + EXPECT_EQ(2, metrics.total_client_connections_); + EXPECT_EQ(0, metrics.total_server_connections_); + EXPECT_EQ(1, metrics.num_client_connections_); + + // Make another RPC call with ANY_CREDENTIALS policy. The already established + // connection with PRIMARY_CREDENTIALS policy should be re-used because + // the ANY_CREDENTIALS policy satisfies the PRIMARY_CREDENTIALS policy which + // the currently open connection has been established with. + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); + EXPECT_EQ(0, metrics.total_client_connections_); + EXPECT_EQ(2, metrics.total_server_connections_); + EXPECT_EQ(1, metrics.num_server_connections_); + EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics)); + EXPECT_EQ(2, metrics.total_client_connections_); + EXPECT_EQ(0, metrics.total_server_connections_); + EXPECT_EQ(1, metrics.num_client_connections_); +} + +// Test that a call which takes longer than the keepalive time +// succeeds -- i.e that we don't consider a connection to be "idle" on the +// server if there is a call outstanding on it. +TEST_P(TestRpc, TestCallLongerThanKeepalive) { + // Set a short keepalive. + keepalive_time_ms_ = 1000; + + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. 
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Make a call which sleeps longer than the keepalive. + RpcController controller; + SleepRequestPB req; + req.set_sleep_micros(3 * 1000 * 1000); // 3 seconds. + req.set_deferred(true); + SleepResponsePB resp; + ASSERT_OK(p.SyncRequest(GenericCalculatorService::kSleepMethodName, + req, &resp, &controller)); +} + +// Test that the RpcSidecar transfers the expected messages. +TEST_P(TestRpc, TestRpcSidecar) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam())); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Test a zero-length sidecar + DoTestSidecar(p, 0, 0); + + // Test some small sidecars + DoTestSidecar(p, 123, 456); + + // Test some larger sidecars to verify that we properly handle the case where + // we can't write the whole response to the socket in a single call. + DoTestSidecar(p, 3000 * 1024, 2000 * 1024); + + DoTestOutgoingSidecar(p, 0, 0); + DoTestOutgoingSidecar(p, 123, 456); + DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024); +} + +TEST_P(TestRpc, TestRpcSidecarLimits) { + { + // Test that the limits on the number of sidecars is respected. + RpcController controller; + string s = "foo"; + int idx; + for (int i = 0; i < TransferLimits::kMaxSidecars; ++i) { + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); + } + + CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx).ok()); + } + + { + // Test that the payload may not exceed --rpc_max_message_size. + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. 
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam())); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + RpcController controller; + string s(FLAGS_rpc_max_message_size + 1, 'a'); + int idx; + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); + + PushTwoStringsRequestPB request; + request.set_sidecar1_idx(idx); + request.set_sidecar2_idx(idx); + PushTwoStringsResponsePB resp; + Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + request, &resp, &controller); + ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString(); + // Remote responds to extra-large payloads by closing the connection. + ASSERT_STR_MATCHES(status.ToString(), + // Linux + "Connection reset by peer" + // macOS, while reading from socket. + "|got EOF from remote" + // macOS, while writing to socket. + "|Protocol wrong type for socket"); + } +} + +// Test that timeouts are properly handled. +TEST_P(TestRpc, TestCallTimeout) { + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Test a very short timeout - we expect this will time out while the + // call is still trying to connect, or in the send queue. This was triggering ASAN failures + // before. + ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromNanoseconds(1))); + + // Test a longer timeout - expect this will time out after we send the request, + // but shorter than our threshold for two-stage timeout handling. + ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(200))); + + // Test a longer timeout - expect this will trigger the "two-stage timeout" + // code path. 
+ ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1500))); +} + +// Inject 500ms delay in negotiation, and send a call with a short timeout, followed by +// one with a long timeout. The call with the long timeout should succeed even though +// the previous one failed. +// +// This is a regression test against prior behavior where the connection negotiation +// was assigned the timeout of the first call on that connection. So, if the first +// call had a short timeout, the later call would also inherit the timed-out negotiation. +TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) { + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + FLAGS_rpc_negotiation_inject_delay_ms = 500; + ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50))); + ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + + // Only the second call should have been received by the server, because we + // don't bother sending an already-timed-out call. + auto metric_map = server_messenger_->metric_entity()->UnsafeMetricsMapForTests(); + auto* metric = FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time).get(); + ASSERT_EQ(1, down_cast<Histogram*>(metric)->TotalCount()); +} + +static void AcceptAndReadForever(Socket* listen_sock) { + // Accept the TCP connection. + Socket server_sock; + Sockaddr remote; + CHECK_OK(listen_sock->Accept(&server_sock, &remote, 0)); + + MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(10); + + size_t nread; + uint8_t buf[1024]; + while (server_sock.BlockingRecv(buf, sizeof(buf), &nread, deadline).ok()) { + } +} + +// Starts a fake listening socket which never actually negotiates. +// Ensures that the client gets a reasonable status code in this case. 
+TEST_F(TestRpc, TestNegotiationTimeout) { + // Set up a simple socket server which accepts a connection. + Sockaddr server_addr; + Socket listen_sock; + ASSERT_OK(StartFakeServer(&listen_sock, &server_addr)); + + // Create another thread to accept the connection on the fake server. + scoped_refptr<Thread> acceptor_thread; + ASSERT_OK(Thread::Create("test", "acceptor", + AcceptAndReadForever, &listen_sock, + &acceptor_thread)); + + // Set up client. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client")); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + bool is_negotiation_error = false; + ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout( + p, MonoDelta::FromMilliseconds(100), &is_negotiation_error)); + EXPECT_TRUE(is_negotiation_error); + + acceptor_thread->Join(); +} + +// Test that client calls get failed properly when the server they're connected to +// shuts down. +TEST_F(TestRpc, TestServerShutsDown) { + // Set up a simple socket server which accepts a connection. + Sockaddr server_addr; + Socket listen_sock; + ASSERT_OK(StartFakeServer(&listen_sock, &server_addr)); + + // Set up client. + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("Client")); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + // Send a call. + AddRequestPB req; + req.set_x(rand()); + req.set_y(rand()); + AddResponsePB resp; + + vector<unique_ptr<RpcController>> controllers; + + // We'll send several calls async, and ensure that they all + // get the error status when the connection drops. + int n_calls = 5; + + CountDownLatch latch(n_calls); + for (int i = 0; i < n_calls; i++) { + controllers.emplace_back(new RpcController()); + p.AsyncRequest(GenericCalculatorService::kAddMethodName, req, &resp, controllers.back().get(), + boost::bind(&CountDownLatch::CountDown, boost::ref(latch))); + } + + // Accept the TCP connection. 
+ Socket server_sock; + Sockaddr remote; + ASSERT_OK(listen_sock.Accept(&server_sock, &remote, 0)); + + // The call is still in progress at this point. + for (const auto& controller : controllers) { + ASSERT_FALSE(controller->finished()); + } + + // Shut down the socket. + ASSERT_OK(listen_sock.Close()); + ASSERT_OK(server_sock.Close()); + + // Wait for the call to be marked finished. + latch.Wait(); + + // Should get the appropriate error on the client for all calls. + for (const auto& controller : controllers) { + ASSERT_TRUE(controller->finished()); + Status s = controller->status(); + ASSERT_TRUE(s.IsNetworkError()) << + "Unexpected status: " << s.ToString(); + + // Any of these errors could happen, depending on whether we were + // in the middle of sending a call while the connection died, or + // if we were already waiting for responses. + // + // ECONNREFUSED is possible because the sending of the calls is async. + // For example, the following interleaving: + // - Enqueue 3 calls + // - Reactor wakes up, creates connection, starts writing calls + // - Enqueue 2 more calls + // - Shut down socket + // - Reactor wakes up, tries to write more of the first 3 calls, gets error + // - Reactor shuts down connection + // - Reactor sees the 2 remaining calls, makes a new connection + // - Because the socket is shut down, gets ECONNREFUSED. + // + // EINVAL is possible if the controller socket had already disconnected by + // the time it tries to set the SO_SNDTIMEO socket option as part of the + // normal blocking SASL handshake. + ASSERT_TRUE(s.posix_code() == EPIPE || + s.posix_code() == ECONNRESET || + s.posix_code() == ESHUTDOWN || + s.posix_code() == ECONNREFUSED || + s.posix_code() == EINVAL) + << "Unexpected status: " << s.ToString(); + } +} + +// Test handler latency metric. +TEST_P(TestRpc, TestRpcHandlerLatencyMetric) { + + const uint64_t sleep_micros = 20 * 1000; + + // Set up server.
+ Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); + + // Set up client. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); + + RpcController controller; + SleepRequestPB req; + req.set_sleep_micros(sleep_micros); + req.set_deferred(true); + SleepResponsePB resp; + ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller)); + + const unordered_map<const MetricPrototype*, scoped_refptr<Metric> > metric_map = + server_messenger_->metric_entity()->UnsafeMetricsMapForTests(); + + scoped_refptr<Histogram> latency_histogram = down_cast<Histogram *>( + FindOrDie(metric_map, + &METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get()); + + LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests(); + LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests(); + LOG(INFO) << "Sleep() max lat: " << latency_histogram->MaxValueForTests(); + LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount(); + + ASSERT_EQ(1, latency_histogram->TotalCount()); + ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros); + ASSERT_TRUE(latency_histogram->MinValueForTests() == latency_histogram->MaxValueForTests()); + + // TODO: Implement an incoming queue latency test. + // For now we just assert that the metric exists. 
+ ASSERT_TRUE(FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time)); +} + +static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger, + CountDownLatch* latch) { + messenger->reset(); + latch->CountDown(); +} + +TEST_P(TestRpc, TestRpcCallbackDestroysMessenger) { + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam())); + Sockaddr bad_addr; + CountDownLatch latch(1); + + AddRequestPB req; + req.set_x(rand()); + req.set_y(rand()); + AddResponsePB resp; + RpcController controller; + controller.set_timeout(MonoDelta::FromMilliseconds(1)); + { + Proxy p(client_messenger, bad_addr, "xxx"); + p.AsyncRequest("my-fake-method", req, &resp, &controller, + boost::bind(&DestroyMessengerCallback, &client_messenger, &latch)); + } + latch.Wait(); +} + +// Test that setting the client timeout / deadline gets propagated to RPC +// services. +TEST_P(TestRpc, TestRpcContextClientDeadline) { + const uint64_t sleep_micros = 20 * 1000; + + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); + + // Set up client. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); + + SleepRequestPB req; + req.set_sleep_micros(sleep_micros); + req.set_client_timeout_defined(true); + SleepResponsePB resp; + RpcController controller; + Status s = p.SyncRequest("Sleep", req, &resp, &controller); + ASSERT_TRUE(s.IsRemoteError()); + ASSERT_STR_CONTAINS(s.ToString(), "Missing required timeout"); + + controller.Reset(); + controller.set_timeout(MonoDelta::FromMilliseconds(1000)); + ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller)); +} + +// Test that setting an call-level application feature flag to an unknown value +// will make the server reject the call. +TEST_P(TestRpc, TestApplicationFeatureFlag) { + // Set up server. 
+ Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); + + // Set up client. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); + + { // Supported flag + AddRequestPB req; + req.set_x(1); + req.set_y(2); + AddResponsePB resp; + RpcController controller; + controller.RequireServerFeature(FeatureFlags::FOO); + Status s = p.SyncRequest("Add", req, &resp, &controller); + SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString())); + ASSERT_TRUE(s.ok()); + ASSERT_EQ(resp.result(), 3); + } + + { // Unsupported flag + AddRequestPB req; + req.set_x(1); + req.set_y(2); + AddResponsePB resp; + RpcController controller; + controller.RequireServerFeature(FeatureFlags::FOO); + controller.RequireServerFeature(99); + Status s = p.SyncRequest("Add", req, &resp, &controller); + SCOPED_TRACE(strings::Substitute("unsupported response: $0", s.ToString())); + ASSERT_TRUE(s.IsRemoteError()); + } +} + +TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) { + auto savedFlags = kSupportedServerRpcFeatureFlags; + auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; }); + kSupportedServerRpcFeatureFlags = {}; + + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServerWithGeneratedCode(&server_addr, enable_ssl); + + // Set up client. 
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl)); + Proxy p(client_messenger, server_addr, CalculatorService::static_service_name()); + + { // Required flag + AddRequestPB req; + req.set_x(1); + req.set_y(2); + AddResponsePB resp; + RpcController controller; + controller.RequireServerFeature(FeatureFlags::FOO); + Status s = p.SyncRequest("Add", req, &resp, &controller); + SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString())); + ASSERT_TRUE(s.IsNotSupported()); + } + + { // No required flag + AddRequestPB req; + req.set_x(1); + req.set_y(2); + AddResponsePB resp; + RpcController controller; + Status s = p.SyncRequest("Add", req, &resp, &controller); + SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString())); + ASSERT_TRUE(s.ok()); + } +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc.cc b/be/src/kudu/rpc/rpc.cc new file mode 100644 index 0000000..685da13 --- /dev/null +++ b/be/src/kudu/rpc/rpc.cc @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/rpc.h" + +#include <boost/bind.hpp> +#include <string> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/rpc_header.pb.h" + +using std::shared_ptr; +using strings::Substitute; +using strings::SubstituteAndAppend; + +namespace kudu { + +namespace rpc { + +bool RpcRetrier::HandleResponse(Rpc* rpc, Status* out_status) { + DCHECK(rpc); + DCHECK(out_status); + + // Always retry TOO_BUSY and UNAVAILABLE errors. + const Status controller_status = controller_.status(); + if (controller_status.IsRemoteError()) { + const ErrorStatusPB* err = controller_.error_response(); + if (err && + err->has_code() && + (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY || + err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) { + // The UNAVAILABLE code is a broader counterpart of the + // SERVER_TOO_BUSY. In both cases it's necessary to retry a bit later. + DelayedRetry(rpc, controller_status); + return true; + } + } + + *out_status = controller_status; + return false; +} + +void RpcRetrier::DelayedRetry(Rpc* rpc, const Status& why_status) { + if (!why_status.ok() && (last_error_.ok() || last_error_.IsTimedOut())) { + last_error_ = why_status; + } + // Add some jitter to the retry delay. + // + // If the delay causes us to miss our deadline, RetryCb will fail the + // RPC on our behalf. + int num_ms = ++attempt_num_ + ((rand() % 5)); + messenger_->ScheduleOnReactor(boost::bind(&RpcRetrier::DelayedRetryCb, + this, + rpc, _1), + MonoDelta::FromMilliseconds(num_ms)); +} + +void RpcRetrier::DelayedRetryCb(Rpc* rpc, const Status& status) { + Status new_status = status; + if (new_status.ok()) { + // Has this RPC timed out? 
+ if (deadline_.Initialized()) { + if (MonoTime::Now() > deadline_) { + string err_str = Substitute("$0 passed its deadline", rpc->ToString()); + if (!last_error_.ok()) { + SubstituteAndAppend(&err_str, ": $0", last_error_.ToString()); + } + new_status = Status::TimedOut(err_str); + } + } + } + if (new_status.ok()) { + controller_.Reset(); + rpc->SendRpc(); + } else { + rpc->SendRpcCb(new_status); + } +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc.h b/be/src/kudu/rpc/rpc.h new file mode 100644 index 0000000..7bc484f --- /dev/null +++ b/be/src/kudu/rpc/rpc.h @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_RPC_H +#define KUDU_RPC_RPC_H + +#include <memory> +#include <string> + +#include "kudu/gutil/callback.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status_callback.h" + +namespace kudu { + +namespace rpc { + +class Messenger; +class Rpc; + +// Result status of a retriable Rpc. +// +// TODO Consider merging this with ScanRpcStatus. 
+struct RetriableRpcStatus { + enum Result { + // There was no error, i.e. the Rpc was successful. + OK, + + // The Rpc got an error and it's not retriable. + NON_RETRIABLE_ERROR, + + // The server couldn't be reached, i.e. there was a network error while + // reaching the replica or a DNS resolution problem. + SERVER_NOT_ACCESSIBLE, + + // The server received the request but it was not ready to serve it right + // away. It might happen that the server was too busy and did not have + // necessary resources or information to serve the request but it + // anticipates it should be ready to serve the request really soon, so it's + // worth retrying the request at a later time. + SERVICE_UNAVAILABLE, + + // For rpc's that are meant only for the leader of a shared resource, when the server + // we're interacting with is not the leader. + REPLICA_NOT_LEADER, + + // The server doesn't know the resource we're interacting with. For instance a TabletServer + // is not part of the config for the tablet we're trying to write to. + RESOURCE_NOT_FOUND, + + // The authentication token supplied with the operation was found invalid + // by the server. Most likely, the token has expired. If so, get a new token + // using client credentials and retry the operation with it. + INVALID_AUTHENTICATION_TOKEN, + }; + + Result result; + Status status; +}; + +// This class picks a server among a possible set of servers serving a given resource. +// +// TODO Currently this only picks the leader, though it wouldn't be unfeasible to have this +// have an enum so that it can pick any server. +template <class Server> +class ServerPicker : public RefCountedThreadSafe<ServerPicker<Server>> { + public: + virtual ~ServerPicker() {} + + typedef Callback<void(const Status& status, Server* server)> ServerPickedCallback; + + // Picks the leader among the replicas serving a resource. 
+ // If the leader was found, it calls the callback with Status::OK() and + // with 'server' set to the current leader, otherwise calls the callback + // with 'status' set to the failure reason, and 'server' set to nullptr. + // If picking a leader takes longer than 'deadline' the callback is called with + // Status::TimedOut(). + virtual void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) = 0; + + // Marks a server as failed/inaccessible. + virtual void MarkServerFailed(Server *server, const Status &status) = 0; + + // Marks a server as not the leader of config serving the resource we're trying to interact with. + virtual void MarkReplicaNotLeader(Server* replica) = 0; + + // Marks a server as not serving the resource we want. + virtual void MarkResourceNotFound(Server *replica) = 0; +}; + +// Provides utilities for retrying failed RPCs. +// +// All RPCs should use HandleResponse() to retry certain generic errors. +class RpcRetrier { + public: + RpcRetrier(MonoTime deadline, std::shared_ptr<rpc::Messenger> messenger) + : attempt_num_(1), + deadline_(deadline), + messenger_(std::move(messenger)) { + if (deadline_.Initialized()) { + controller_.set_deadline(deadline_); + } + controller_.Reset(); + } + + // Tries to handle a failed RPC. + // + // If it was handled (e.g. scheduled for retry in the future), returns + // true. In this case, callers should ensure that 'rpc' remains alive. + // + // Otherwise, returns false and writes the controller status to + // 'out_status'. + bool HandleResponse(Rpc* rpc, Status* out_status); + + // Retries an RPC at some point in the near future. If 'why_status' is not OK, + // records it as the most recent error causing the RPC to retry. This is + // reported to the caller eventually if the RPC never succeeds. + // + // If the RPC's deadline expires, the callback will fire with a timeout + // error when the RPC comes up for retrying.
This is true even if the + // deadline has already expired at the time that Retry() was called. + // + // Callers should ensure that 'rpc' remains alive. + void DelayedRetry(Rpc* rpc, const Status& why_status); + + RpcController* mutable_controller() { return &controller_; } + const RpcController& controller() const { return controller_; } + + const MonoTime& deadline() const { return deadline_; } + + const std::shared_ptr<Messenger>& messenger() const { + return messenger_; + } + + int attempt_num() const { return attempt_num_; } + + // Called when an RPC comes up for retrying. Actually sends the RPC. + void DelayedRetryCb(Rpc* rpc, const Status& status); + + private: + // The next sent rpc will be the nth attempt (indexed from 1). + int attempt_num_; + + // If the remote end is busy, the RPC will be retried (with a small + // delay) until this deadline is reached. + // + // May be uninitialized. + MonoTime deadline_; + + // Messenger to use when sending the RPC. + std::shared_ptr<Messenger> messenger_; + + // RPC controller to use when sending the RPC. + RpcController controller_; + + // In case any retries have already happened, remembers the last error. + // Errors from the server take precedence over timeout errors. + Status last_error_; + + DISALLOW_COPY_AND_ASSIGN(RpcRetrier); +}; + +// An in-flight remote procedure call to some server. +class Rpc { + public: + Rpc(const MonoTime& deadline, + std::shared_ptr<rpc::Messenger> messenger) + : retrier_(deadline, std::move(messenger)) { + } + + virtual ~Rpc() {} + + // Asynchronously sends the RPC to the remote end. + // + // Subclasses should use SendRpcCb() below as the callback function. + virtual void SendRpc() = 0; + + // Returns a string representation of the RPC. + virtual std::string ToString() const = 0; + + // Returns the number of times this RPC has been sent. Will always be at + // least one. 
+ int num_attempts() const { return retrier().attempt_num(); } + + protected: + const RpcRetrier& retrier() const { return retrier_; } + RpcRetrier* mutable_retrier() { return &retrier_; } + + private: + friend class RpcRetrier; + + // Callback for SendRpc(). If 'status' is not OK, something failed + // before the RPC was sent. + virtual void SendRpcCb(const Status& status) = 0; + + // Used to retry some failed RPCs. + RpcRetrier retrier_; + + DISALLOW_COPY_AND_ASSIGN(Rpc); +}; + +} // namespace rpc +} // namespace kudu + +#endif // KUDU_RPC_RPC_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_context.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_context.cc b/be/src/kudu/rpc/rpc_context.cc new file mode 100644 index 0000000..06fd8c5 --- /dev/null +++ b/be/src/kudu/rpc/rpc_context.cc @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/rpc_context.h" + +#include <memory> +#include <ostream> +#include <sstream> + +#include "kudu/rpc/outbound_call.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/rpc/remote_user.h" +#include "kudu/rpc/result_tracker.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/rpc/service_if.h" +#include "kudu/util/hdr_histogram.h" +#include "kudu/util/metrics.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/trace.h" + +using google::protobuf::Message; +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +RpcContext::RpcContext(InboundCall *call, + const google::protobuf::Message *request_pb, + google::protobuf::Message *response_pb, + const scoped_refptr<ResultTracker>& result_tracker) + : call_(CHECK_NOTNULL(call)), + request_pb_(request_pb), + response_pb_(response_pb), + result_tracker_(result_tracker) { + VLOG(4) << call_->remote_method().service_name() << ": Received RPC request for " + << call_->ToString() << ":" << std::endl << SecureDebugString(*request_pb_); + TRACE_EVENT_ASYNC_BEGIN2("rpc_call", "RPC", this, + "call", call_->ToString(), + "request", pb_util::PbTracer::TracePb(*request_pb_)); +} + +RpcContext::~RpcContext() { +} + +void RpcContext::RespondSuccess() { + if (AreResultsTracked()) { + result_tracker_->RecordCompletionAndRespond(call_->header().request_id(), + response_pb_.get()); + } else { + VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success response for " + << call_->ToString() << ":" << std::endl << SecureDebugString(*response_pb_); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "response", pb_util::PbTracer::TracePb(*response_pb_), + "trace", trace()->DumpToString()); + call_->RespondSuccess(*response_pb_); + delete this; + } +} + +void RpcContext::RespondNoCache() { + if (AreResultsTracked()) { + result_tracker_->FailAndRespond(call_->header().request_id(), + response_pb_.get()); + } else { + VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure 
response for " + << call_->ToString() << ": " << SecureDebugString(*response_pb_); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "response", pb_util::PbTracer::TracePb(*response_pb_), + "trace", trace()->DumpToString()); + // This is a bit counter intuitive, but when we get the failure but set the error on the + // call's response we call RespondSuccess() instead of RespondFailure(). + call_->RespondSuccess(*response_pb_); + delete this; + } +} + +void RpcContext::RespondFailure(const Status &status) { + if (AreResultsTracked()) { + result_tracker_->FailAndRespond(call_->header().request_id(), + ErrorStatusPB::ERROR_APPLICATION, status); + } else { + VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for " + << call_->ToString() << ": " << status.ToString(); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "status", status.ToString(), + "trace", trace()->DumpToString()); + call_->RespondFailure(ErrorStatusPB::ERROR_APPLICATION, status); + delete this; + } +} + +void RpcContext::RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status) { + if (AreResultsTracked()) { + result_tracker_->FailAndRespond(call_->header().request_id(), + err, status); + } else { + VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for " + << call_->ToString() << ": " << status.ToString(); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "status", status.ToString(), + "trace", trace()->DumpToString()); + call_->RespondFailure(err, status); + delete this; + } +} + +void RpcContext::RespondApplicationError(int error_ext_id, const std::string& message, + const Message& app_error_pb) { + if (AreResultsTracked()) { + result_tracker_->FailAndRespond(call_->header().request_id(), + error_ext_id, message, app_error_pb); + } else { + if (VLOG_IS_ON(4)) { + ErrorStatusPB err; + InboundCall::ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err); + VLOG(4) << call_->remote_method().service_name() + 
<< ": Sending application error response for " << call_->ToString() + << ":" << std::endl << SecureDebugString(err); + } + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "response", pb_util::PbTracer::TracePb(app_error_pb), + "trace", trace()->DumpToString()); + call_->RespondApplicationError(error_ext_id, message, app_error_pb); + delete this; + } +} + +const rpc::RequestIdPB* RpcContext::request_id() const { + return call_->header().has_request_id() ? &call_->header().request_id() : nullptr; +} + +Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { + return call_->AddOutboundSidecar(std::move(car), idx); +} + +Status RpcContext::GetInboundSidecar(int idx, Slice* slice) { + return call_->GetInboundSidecar(idx, slice); +} + +const RemoteUser& RpcContext::remote_user() const { + return call_->remote_user(); +} + +void RpcContext::DiscardTransfer() { + call_->DiscardTransfer(); +} + +const Sockaddr& RpcContext::remote_address() const { + return call_->remote_address(); +} + +std::string RpcContext::requestor_string() const { + return call_->remote_user().ToString() + " at " + + call_->remote_address().ToString(); +} + +std::string RpcContext::method_name() const { + return call_->remote_method().method_name(); +} + +std::string RpcContext::service_name() const { + return call_->remote_method().service_name(); +} + +MonoTime RpcContext::GetClientDeadline() const { + return call_->GetClientDeadline(); +} + +Trace* RpcContext::trace() { + return call_->trace(); +} + +void RpcContext::Panic(const char* filepath, int line_number, const string& message) { + // Use the LogMessage class directly so that the log messages appear to come from + // the line of code which caused the panic, not this code. 
+#define MY_ERROR google::LogMessage(filepath, line_number, google::GLOG_ERROR).stream() +#define MY_FATAL google::LogMessageFatal(filepath, line_number).stream() + + MY_ERROR << "Panic handling " << call_->ToString() << ": " << message; + MY_ERROR << "Request:\n" << SecureDebugString(*request_pb_); + Trace* t = trace(); + if (t) { + MY_ERROR << "RPC trace:"; + t->Dump(&MY_ERROR, true); + } + MY_FATAL << "Exiting due to panic."; + +#undef MY_ERROR +#undef MY_FATAL +} + + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_context.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_context.h b/be/src/kudu/rpc/rpc_context.h new file mode 100644 index 0000000..ac895fc --- /dev/null +++ b/be/src/kudu/rpc/rpc_context.h @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_RPC_RPC_CONTEXT_H +#define KUDU_RPC_RPC_CONTEXT_H + +#include <string> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/service_if.h" +#include "kudu/util/status.h" + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace kudu { + +class Sockaddr; +class Trace; + +namespace rpc { + +class InboundCall; +class RemoteUser; +class ResultTracker; +class RpcSidecar; + +#define PANIC_RPC(rpc_context, message) \ + do { \ + if (rpc_context) { \ + rpc_context->Panic(__FILE__, __LINE__, (message)); \ + } else { \ + LOG(FATAL) << message; \ + } \ + } while (0) + +// The context provided to a generated ServiceIf. This provides +// methods to respond to the RPC. In the future, this will also +// include methods to access information about the caller: e.g +// authentication info, tracing info, and cancellation status. +// +// This is the server-side analogue to the RpcController class. +class RpcContext { + public: + // Create an RpcContext. This is called only from generated code + // and is not a public API. + RpcContext(InboundCall *call, + const google::protobuf::Message *request_pb, + google::protobuf::Message *response_pb, + const scoped_refptr<ResultTracker>& result_tracker); + + ~RpcContext(); + + // Return the trace buffer for this call. + Trace* trace(); + + // Send a response to the call. The service may call this method + // before or after returning from the original handler method, + // and it may call this method from a different thread. + // + // The response should be prepared already in the response PB pointer + // which was passed to the handler method. + // + // After this method returns, this RpcContext object is destroyed. The request + // and response protobufs are also destroyed. + void RespondSuccess(); + + // Like the above, but doesn't store the results of the service call, if results + // are being tracked. 
+ // Used in cases where a call specific error was set on the response protobuf, + // the call should be considered failed, thus results shouldn't be cached. + void RespondNoCache(); + + // Respond with an error to the client. This sends back an error with the code + // ERROR_APPLICATION. Because there is no more specific error code passed back + // to the client, most applications should create a custom error PB extension + // and use RespondApplicationError(...) below. This method should only be used + // for unexpected errors where the server doesn't expect the client to do any + // more advanced handling. + // + // After this method returns, this RpcContext object is destroyed. The request + // and response protobufs are also destroyed. + void RespondFailure(const Status &status); + + // Respond with an RPC-level error. This typically manifests to the client as + // a remote error, one whose handling is agnostic to the particulars of the + // sent RPC. For example, both ERROR_SERVER_TOO_BUSY and ERROR_UNAVAILABLE + // usually cause the client to retry the RPC at a later time. + // + // After this method returns, this RpcContext object is destroyed. The request + // and response protobufs are also destroyed. + void RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status); + + // Respond with an application-level error. This causes the caller to get a + // RemoteError status with the provided string message. Additionally, a + // service-specific error extension is passed back to the client. The + // extension must be registered with the ErrorStatusPB protobuf. For + // example: + // + // message MyServiceError { + // extend kudu.rpc.ErrorStatusPB { + // optional MyServiceError my_service_error_ext = 101; + // } + // // Add any extra fields or status codes you want to pass back to + // // the client here. 
+ // required string extra_error_data = 1; + // } + // + // NOTE: the numeric '101' above must be an integer greater than 101 + // and must be unique across your code base. + // + // Given the above definition in your service protobuf file, you would + // use this method like: + // + // MyServiceError err; + // err.set_extra_error_data("foo bar"); + // ctx->RespondApplicationError(MyServiceError::my_service_error_ext.number(), + // "Some error occurred", err); + // + // The client side may then retreieve the error by calling: + // const MyServiceError& err_details = + // controller->error_response()->GetExtension(MyServiceError::my_service_error_ext); + // + // After this method returns, this RpcContext object is destroyed. The request + // and response protobufs are also destroyed. + void RespondApplicationError(int error_ext_id, const std::string& message, + const google::protobuf::Message& app_error_pb); + + + // Adds an RpcSidecar to the response. This is the preferred method for + // transferring large amounts of binary data, because this avoids additional + // copies made by serializing the protobuf. + // + // Assumes no changes to the sidecar's data are made after insertion. + // + // Upon success, writes the index of the sidecar (necessary to be retrieved + // later) to 'idx'. Call may fail if all sidecars have already been used + // by the RPC response. + Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); + + // Fills 'sidecar' with a sidecar sent by the client. Returns an error if 'idx' is out + // of bounds. + Status GetInboundSidecar(int idx, Slice* slice); + + // Return the identity of remote user who made this call. + const RemoteUser& remote_user() const; + + // Discards the memory associated with the inbound call's payload. All previously + // obtained sidecar slices will be invalidated by this call. It is an error to call + // GetInboundSidecar() after this method. request_pb() remains valid. 
+ // This is useful in the case where the server wishes to delay responding to an RPC + // (perhaps to control the rate of RPC requests), but knows that the RPC payload itself + // won't be processed any further. + void DiscardTransfer(); + + // Return the remote IP address and port which sent the current RPC call. + const Sockaddr& remote_address() const; + + // A string identifying the requestor -- both the user info and the IP address. + // Suitable for use in log messages. + std::string requestor_string() const; + + // Return the name of the RPC service method being called. + std::string method_name() const; + + // Return the name of the RPC service being called. + std::string service_name() const; + + const google::protobuf::Message *request_pb() const { return request_pb_.get(); } + google::protobuf::Message *response_pb() const { return response_pb_.get(); } + + // Return an upper bound on the client timeout deadline. This does not + // account for transmission delays between the client and the server. + // If the client did not specify a deadline, returns MonoTime::Max(). + MonoTime GetClientDeadline() const; + + // Whether the results of this RPC are tracked with a ResultTracker. + // If this returns true, both result_tracker() and request_id() should return non-null results. + bool AreResultsTracked() const { return result_tracker_.get() != nullptr; } + + // Returns this call's result tracker, if it is set. + const scoped_refptr<ResultTracker>& result_tracker() const { + return result_tracker_; + } + + // Returns this call's request id, if it is set. + const rpc::RequestIdPB* request_id() const; + + // Panic the server. This logs a fatal error with the given message, and + // also includes the current RPC request, requestor, trace information, etc, + // to make it easier to debug. + // + // Call this via the PANIC_RPC() macro. 
+ void Panic(const char* filepath, int line_number, const std::string& message) + __attribute__((noreturn)); + + private: + friend class ResultTracker; + InboundCall* const call_; + const gscoped_ptr<const google::protobuf::Message> request_pb_; + const gscoped_ptr<google::protobuf::Message> response_pb_; + scoped_refptr<ResultTracker> result_tracker_; +}; + +} // namespace rpc +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_controller.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_controller.cc b/be/src/kudu/rpc/rpc_controller.cc new file mode 100644 index 0000000..505db22 --- /dev/null +++ b/be/src/kudu/rpc/rpc_controller.cc @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/rpc_controller.h" + +#include <algorithm> +#include <memory> +#include <mutex> + +#include <glog/logging.h> + +#include "kudu/rpc/outbound_call.h" +#include "kudu/rpc/rpc_header.pb.h" + +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +RpcController::RpcController() + : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS) { + DVLOG(4) << "RpcController " << this << " constructed"; +} + +RpcController::~RpcController() { + DVLOG(4) << "RpcController " << this << " destroyed"; +} + +void RpcController::Swap(RpcController* other) { + // Cannot swap RPC controllers while they are in-flight. + if (call_) { + CHECK(finished()); + } + if (other->call_) { + CHECK(other->finished()); + } + + std::swap(outbound_sidecars_, other->outbound_sidecars_); + std::swap(timeout_, other->timeout_); + std::swap(credentials_policy_, other->credentials_policy_); + std::swap(call_, other->call_); +} + +void RpcController::Reset() { + std::lock_guard<simple_spinlock> l(lock_); + if (call_) { + CHECK(finished()); + } + call_.reset(); + required_server_features_.clear(); + credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS; +} + +bool RpcController::finished() const { + if (call_) { + return call_->IsFinished(); + } + return false; +} + +bool RpcController::negotiation_failed() const { + if (call_) { + DCHECK(finished()); + return call_->IsNegotiationError(); + } + return false; +} + +Status RpcController::status() const { + if (call_) { + return call_->status(); + } + return Status::OK(); +} + +const ErrorStatusPB* RpcController::error_response() const { + if (call_) { + return call_->error_pb(); + } + return nullptr; +} + +Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const { + return call_->call_response_->GetSidecar(idx, sidecar); +} + +void RpcController::set_timeout(const MonoDelta& timeout) { + std::lock_guard<simple_spinlock> l(lock_); + DCHECK(!call_ || call_->state() == OutboundCall::READY); + timeout_ = timeout; +} + 
+void RpcController::set_deadline(const MonoTime& deadline) { + set_timeout(deadline - MonoTime::Now()); +} + +void RpcController::SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id) { + request_id_ = std::move(request_id); +} + +bool RpcController::has_request_id() const { + return request_id_ != nullptr; +} + +const RequestIdPB& RpcController::request_id() const { + DCHECK(has_request_id()); + return *request_id_; +} + +void RpcController::RequireServerFeature(uint32_t feature) { + DCHECK(!call_ || call_->state() == OutboundCall::READY); + required_server_features_.insert(feature); +} + +MonoDelta RpcController::timeout() const { + std::lock_guard<simple_spinlock> l(lock_); + return timeout_; +} + +Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { + if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) { + return Status::RuntimeError("All available sidecars already used"); + } + outbound_sidecars_.emplace_back(std::move(car)); + *idx = outbound_sidecars_.size() - 1; + return Status::OK(); +} + +void RpcController::SetRequestParam(const google::protobuf::Message& req) { + DCHECK(call_ != nullptr); + call_->SetRequestPayload(req, std::move(outbound_sidecars_)); +} + +} // namespace rpc +} // namespace kudu
