http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
new file mode 100644
index 0000000..c40f546
--- /dev/null
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -0,0 +1,585 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_TEST_BASE_H
+#define KUDU_RPC_RPC_TEST_BASE_H
+
+#include <algorithm>
+#include <atomic>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/proxy.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rtest.pb.h"
+#include "kudu/rpc/rtest.proxy.h"
+#include "kudu/rpc/rtest.service.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/security/security-test-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
+namespace kudu {
+namespace rpc {
+
+using kudu::rpc_test::AddRequestPB;
+using kudu::rpc_test::AddResponsePB;
+using kudu::rpc_test::CalculatorError;
+using kudu::rpc_test::CalculatorServiceIf;
+using kudu::rpc_test::CalculatorServiceProxy;
+using kudu::rpc_test::EchoRequestPB;
+using kudu::rpc_test::EchoResponsePB;
+using kudu::rpc_test::ExactlyOnceRequestPB;
+using kudu::rpc_test::ExactlyOnceResponsePB;
+using kudu::rpc_test::FeatureFlags;
+using kudu::rpc_test::PanicRequestPB;
+using kudu::rpc_test::PanicResponsePB;
+using kudu::rpc_test::PushTwoStringsRequestPB;
+using kudu::rpc_test::PushTwoStringsResponsePB;
+using kudu::rpc_test::SendTwoStringsRequestPB;
+using kudu::rpc_test::SendTwoStringsResponsePB;
+using kudu::rpc_test::SleepRequestPB;
+using kudu::rpc_test::SleepResponsePB;
+using kudu::rpc_test::TestInvalidResponseRequestPB;
+using kudu::rpc_test::TestInvalidResponseResponsePB;
+using kudu::rpc_test::WhoAmIRequestPB;
+using kudu::rpc_test::WhoAmIResponsePB;
+using kudu::rpc_test_diff_package::ReqDiffPackagePB;
+using kudu::rpc_test_diff_package::RespDiffPackagePB;
+
+// Implementation of CalculatorService which just implements the generic
+// RPC handler (no generated code).
class GenericCalculatorService : public ServiceIf {
 public:
  // Service/method name constants; definitions appear below the class.
  static const char *kFullServiceName;
  static const char *kAddMethodName;
  static const char *kSleepMethodName;
  static const char *kPushTwoStringsMethodName;
  static const char *kSendTwoStringsMethodName;
  static const char *kAddExactlyOnce;

  // Canned payload strings available to tests.
  static const char* kFirstString;
  static const char* kSecondString;

  GenericCalculatorService() {
  }

  // To match the argument list of the generated CalculatorService.
  explicit GenericCalculatorService(const scoped_refptr<MetricEntity>& entity,
                                    const scoped_refptr<ResultTracker>& result_tracker) {
    // this test doesn't generate metrics, so we ignore the argument.
  }

  // Dispatch an inbound call by method name; unknown methods are failed
  // with ERROR_NO_SUCH_METHOD. NOTE(review): kAddExactlyOnce is declared
  // above but not dispatched here — confirm whether that is intentional.
  void Handle(InboundCall *incoming) override {
    if (incoming->remote_method().method_name() == kAddMethodName) {
      DoAdd(incoming);
    } else if (incoming->remote_method().method_name() == kSleepMethodName) {
      DoSleep(incoming);
    } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
      DoSendTwoStrings(incoming);
    } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
      DoPushTwoStrings(incoming);
    } else {
      incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
                               Status::InvalidArgument("bad method"));
    }
  }

  std::string service_name() const override { return kFullServiceName; }
  static std::string static_service_name() { return kFullServiceName; }

 private:
  // Parse an AddRequestPB from the raw request payload and respond with
  // the sum of x and y. A parse failure is a test bug, hence LOG(FATAL).
  void DoAdd(InboundCall *incoming) {
    Slice param(incoming->serialized_request());
    AddRequestPB req;
    if (!req.ParseFromArray(param.data(), param.size())) {
      LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
    }

    AddResponsePB resp;
    resp.set_result(req.x() + req.y());
    incoming->RespondSuccess(resp);
  }

  // Generate two deterministic pseudo-random strings (seeded from the
  // request so the client can regenerate and verify them), attach them as
  // outbound sidecars, and return their sidecar indexes in the response.
  void DoSendTwoStrings(InboundCall* incoming) {
    Slice param(incoming->serialized_request());
    SendTwoStringsRequestPB req;
    if (!req.ParseFromArray(param.data(), param.size())) {
      LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
    }

    std::unique_ptr<faststring> first(new faststring);
    std::unique_ptr<faststring> second(new faststring);

    Random r(req.random_seed());
    first->resize(req.size1());
    RandomString(first->data(), req.size1(), &r);

    second->resize(req.size2());
    RandomString(second->data(), req.size2(), &r);

    SendTwoStringsResponsePB resp;
    int idx1, idx2;
    CHECK_OK(incoming->AddOutboundSidecar(
            RpcSidecar::FromFaststring(std::move(first)), &idx1));
    CHECK_OK(incoming->AddOutboundSidecar(
            RpcSidecar::FromFaststring(std::move(second)), &idx2));
    resp.set_sidecar1(idx1);
    resp.set_sidecar2(idx2);

    incoming->RespondSuccess(resp);
  }

  // Echo the two inbound sidecars back to the caller in the response body
  // (sizes and data), so the client can verify what arrived. Also checks
  // that an out-of-range sidecar index is rejected, and that discarding
  // the transfer before responding is safe.
  void DoPushTwoStrings(InboundCall* incoming) {
    Slice param(incoming->serialized_request());
    PushTwoStringsRequestPB req;
    if (!req.ParseFromArray(param.data(), param.size())) {
      LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
    }

    Slice sidecar1;
    CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1));

    Slice sidecar2;
    CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2));

    // Check that reading non-existent sidecars doesn't work.
    Slice tmp;
    CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok());

    PushTwoStringsResponsePB resp;
    resp.set_size1(sidecar1.size());
    resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size());
    resp.set_size2(sidecar2.size());
    resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size());

    // Drop the sidecars etc, just to confirm that it's safe to do so.
    // NOTE: this must happen after the sidecar data has been copied into
    // 'resp' above, since the Slices point into the transfer's buffer.
    CHECK_GT(incoming->GetTransferSize(), 0);
    incoming->DiscardTransfer();
    CHECK_EQ(0, incoming->GetTransferSize());
    incoming->RespondSuccess(resp);
  }

  // Sleep for the requested number of microseconds, then respond. Unlike
  // the other handlers, a parse failure here is reported back to the
  // client as ERROR_INVALID_REQUEST rather than crashing the process.
  void DoSleep(InboundCall *incoming) {
    Slice param(incoming->serialized_request());
    SleepRequestPB req;
    if (!req.ParseFromArray(param.data(), param.size())) {
      incoming->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST,
        Status::InvalidArgument("Couldn't parse pb",
                                req.InitializationErrorString()));
      return;
    }

    LOG(INFO) << "got call: " << SecureShortDebugString(req);
    SleepFor(MonoDelta::FromMicroseconds(req.sleep_micros()));
    SleepResponsePB resp;
    incoming->RespondSuccess(resp);
  }
};
+
+class CalculatorService : public CalculatorServiceIf {
+ public:
+  explicit CalculatorService(const scoped_refptr<MetricEntity>& entity,
+                             const scoped_refptr<ResultTracker> result_tracker)
+    : CalculatorServiceIf(entity, result_tracker),
+      exactly_once_test_val_(0) {
+  }
+
+  void Add(const AddRequestPB *req, AddResponsePB *resp, RpcContext *context) 
override {
+    resp->set_result(req->x() + req->y());
+    context->RespondSuccess();
+  }
+
+  void Sleep(const SleepRequestPB *req, SleepResponsePB *resp, RpcContext 
*context) override {
+    if (req->return_app_error()) {
+      CalculatorError my_error;
+      my_error.set_extra_error_data("some application-specific error data");
+      context->RespondApplicationError(CalculatorError::app_error_ext.number(),
+                                       "Got some error", my_error);
+      return;
+    }
+
+    // Respond w/ error if the RPC specifies that the client deadline is set,
+    // but it isn't.
+    if (req->client_timeout_defined()) {
+      MonoTime deadline = context->GetClientDeadline();
+      if (deadline == MonoTime::Max()) {
+        CalculatorError my_error;
+        my_error.set_extra_error_data("Timeout not set");
+        
context->RespondApplicationError(CalculatorError::app_error_ext.number(),
+                                        "Missing required timeout", my_error);
+        return;
+      }
+    }
+
+    if (req->deferred()) {
+      // Spawn a new thread which does the sleep and responds later.
+      scoped_refptr<Thread> thread;
+      CHECK_OK(Thread::Create("rpc-test", "deferred",
+                              &CalculatorService::DoSleep, this, req, context,
+                              &thread));
+      return;
+    }
+    DoSleep(req, context);
+  }
+
+  void Echo(const EchoRequestPB *req, EchoResponsePB *resp, RpcContext 
*context) override {
+    resp->set_data(req->data());
+    context->RespondSuccess();
+  }
+
+  void WhoAmI(const WhoAmIRequestPB* /*req*/,
+              WhoAmIResponsePB* resp,
+              RpcContext* context) override {
+    const RemoteUser& user = context->remote_user();
+    resp->mutable_credentials()->set_real_user(user.username());
+    resp->set_address(context->remote_address().ToString());
+    context->RespondSuccess();
+  }
+
+  void TestArgumentsInDiffPackage(const ReqDiffPackagePB *req,
+                                  RespDiffPackagePB *resp,
+                                  ::kudu::rpc::RpcContext *context) override {
+    context->RespondSuccess();
+  }
+
+  void Panic(const PanicRequestPB* req, PanicResponsePB* resp, RpcContext* 
context) override {
+    TRACE("Got panic request");
+    PANIC_RPC(context, "Test method panicking!");
+  }
+
+  void TestInvalidResponse(const TestInvalidResponseRequestPB* req,
+                           TestInvalidResponseResponsePB* resp,
+                           RpcContext* context) override {
+    switch (req->error_type()) {
+      case 
rpc_test::TestInvalidResponseRequestPB_ErrorType_MISSING_REQUIRED_FIELD:
+        // Respond without setting the 'resp->response' protobuf field, which 
is
+        // marked as required. This exercises the error path of invalid 
responses.
+        context->RespondSuccess();
+        break;
+      case rpc_test::TestInvalidResponseRequestPB_ErrorType_RESPONSE_TOO_LARGE:
+        resp->mutable_response()->resize(FLAGS_rpc_max_message_size + 1000);
+        context->RespondSuccess();
+        break;
+      default:
+        LOG(FATAL);
+    }
+  }
+
+  bool SupportsFeature(uint32_t feature) const override {
+    return feature == FeatureFlags::FOO;
+  }
+
+  void AddExactlyOnce(const ExactlyOnceRequestPB* req, ExactlyOnceResponsePB* 
resp,
+                      ::kudu::rpc::RpcContext* context) override {
+    if (req->sleep_for_ms() > 0) {
+      usleep(req->sleep_for_ms() * 1000);
+    }
+    // If failures are enabled, cause them some percentage of the time.
+    if (req->randomly_fail()) {
+      if (rand() % 10 < 3) {
+        context->RespondFailure(Status::ServiceUnavailable("Random injected 
failure."));
+        return;
+      }
+    }
+    int result = exactly_once_test_val_ += req->value_to_add();
+    resp->set_current_val(result);
+    resp->set_current_time_micros(GetCurrentTimeMicros());
+    context->RespondSuccess();
+  }
+
+  bool AuthorizeDisallowAlice(const google::protobuf::Message* /*req*/,
+                              google::protobuf::Message* /*resp*/,
+                              RpcContext* context) override {
+    if (context->remote_user().username() == "alice") {
+      context->RespondFailure(Status::NotAuthorized("alice is not allowed to 
call this method"));
+      return false;
+    }
+    return true;
+  }
+
+  bool AuthorizeDisallowBob(const google::protobuf::Message* /*req*/,
+                            google::protobuf::Message* /*resp*/,
+                            RpcContext* context) override {
+    if (context->remote_user().username() == "bob") {
+      context->RespondFailure(Status::NotAuthorized("bob is not allowed to 
call this method"));
+      return false;
+    }
+    return true;
+  }
+
+ private:
+  void DoSleep(const SleepRequestPB *req,
+               RpcContext *context) {
+    TRACE_COUNTER_INCREMENT("test_sleep_us", req->sleep_micros());
+    if (Trace::CurrentTrace()) {
+      scoped_refptr<Trace> child_trace(new Trace());
+      Trace::CurrentTrace()->AddChildTrace("test_child", child_trace.get());
+      ADOPT_TRACE(child_trace.get());
+      TRACE_COUNTER_INCREMENT("related_trace_metric", 1);
+    }
+
+    SleepFor(MonoDelta::FromMicroseconds(req->sleep_micros()));
+    context->RespondSuccess();
+  }
+
+  std::atomic_int exactly_once_test_val_;
+
+};
+
// Definitions for the service/method-name and payload constants declared
// in GenericCalculatorService above.
const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
const char *GenericCalculatorService::kAddMethodName = "Add";
const char *GenericCalculatorService::kSleepMethodName = "Sleep";
const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";

const char *GenericCalculatorService::kFirstString =
    "1111111111111111111111111111111111111111111111111111111111";
const char *GenericCalculatorService::kSecondString =
    "2222222222222222222222222222222222222222222222222222222222222222222222";
+
// Base fixture for RPC tests. Owns the server-side messenger, service
// pool, and metric machinery, and provides helpers for creating client
// messengers, starting test servers, and issuing common calls. Subclasses
// may tweak the protected knobs (thread counts, keepalive) before calling
// StartTestServer*().
class RpcTestBase : public KuduTest {
 public:
  RpcTestBase()
    : n_worker_threads_(3),
      service_queue_length_(100),
      n_server_reactor_threads_(3),
      keepalive_time_ms_(1000),
      metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
  }

  void SetUp() override {
    KuduTest::SetUp();
  }

  // Tear down in reverse order of construction: unregister the service
  // and stop its pool before shutting down the messenger.
  void TearDown() override {
    if (service_pool_) {
      server_messenger_->UnregisterService(service_name_);
      service_pool_->Shutdown();
    }
    if (server_messenger_) {
      server_messenger_->Shutdown();
    }
    KuduTest::TearDown();
  }

 protected:
  // Build a messenger with 'n_reactors' reactor threads, optionally with
  // inbound TLS enabled. Keepalive behavior is driven by keepalive_time_ms_.
  std::shared_ptr<Messenger> CreateMessenger(const string &name,
                                             int n_reactors = 1,
                                             bool enable_ssl = false) {
    MessengerBuilder bld(name);

    if (enable_ssl) {
      bld.enable_inbound_tls();
    }

    bld.set_num_reactors(n_reactors);
    bld.set_connection_keepalive_time(
      MonoDelta::FromMilliseconds(keepalive_time_ms_));
    // In order for the keepalive timing to be accurate, we need to scan connections
    // significantly more frequently than the keepalive time. This "coarse timer"
    // granularity determines this.
    bld.set_coarse_timer_granularity(MonoDelta::FromMilliseconds(
                                       std::min(keepalive_time_ms_ / 5, 100)));
    bld.set_metric_entity(metric_entity_);
    std::shared_ptr<Messenger> messenger;
    CHECK_OK(bld.Build(&messenger));
    return messenger;
  }

  // Issue a synchronous Add call with random operands through proxy 'p',
  // returning the RPC's status. On success, CHECKs that the returned sum
  // matches the operands. 10-second timeout.
  Status DoTestSyncCall(const Proxy &p, const char *method,
                        CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
    AddRequestPB req;
    req.set_x(rand());
    req.set_y(rand());
    AddResponsePB resp;
    RpcController controller;
    controller.set_timeout(MonoDelta::FromMilliseconds(10000));
    controller.set_credentials_policy(policy);
    RETURN_NOT_OK(p.SyncRequest(method, req, &resp, &controller));

    CHECK_EQ(req.x() + req.y(), resp.result());
    return Status::OK();
  }

  // Call SendTwoStrings and verify the returned sidecars match the
  // pseudo-random payloads the server is expected to generate (both sides
  // use the same seed, so the content is reproducible client-side).
  void DoTestSidecar(const Proxy &p, int size1, int size2) {
    const uint32_t kSeed = 12345;

    SendTwoStringsRequestPB req;
    req.set_size1(size1);
    req.set_size2(size2);
    req.set_random_seed(kSeed);

    SendTwoStringsResponsePB resp;
    RpcController controller;
    controller.set_timeout(MonoDelta::FromMilliseconds(10000));
    CHECK_OK(p.SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
                           req, &resp, &controller));

    Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1);
    Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2);
    Random rng(kSeed);
    faststring expected;

    expected.resize(size1);
    RandomString(expected.data(), size1, &rng);
    CHECK_EQ(0, first.compare(Slice(expected)));

    expected.resize(size2);
    RandomString(expected.data(), size2, &rng);
    CHECK_EQ(0, second.compare(Slice(expected)));
  }

  // Push two client-side sidecars ('a'*size1 and 'b'*size2) to the server
  // via PushTwoStrings and verify it echoes them back intact.
  void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
    PushTwoStringsRequestPB request;
    RpcController controller;

    int idx1;
    string s1(size1, 'a');
    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1));

    int idx2;
    string s2(size2, 'b');
    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2));

    request.set_sidecar1_idx(idx1);
    request.set_sidecar2_idx(idx2);

    PushTwoStringsResponsePB resp;
    CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
            request, &resp, &controller));
    CHECK_EQ(size1, resp.size1());
    CHECK_EQ(resp.data1(), s1);
    CHECK_EQ(size2, resp.size2());
    CHECK_EQ(resp.data2(), s2);
  }

  // Issue a Sleep call that outlasts 'timeout' and verify that the call
  // times out neither significantly early nor as late as the full sleep.
  // If 'is_negotiaton_error' is non-null, reports whether the failure
  // occurred during connection negotiation. (Parameter name carries a
  // pre-existing typo; kept as-is.)
  void DoTestExpectTimeout(const Proxy& p,
                           const MonoDelta& timeout,
                           bool* is_negotiaton_error = nullptr) {
    SleepRequestPB req;
    SleepResponsePB resp;
    // Sleep for 500ms longer than the call timeout.
    int sleep_micros = timeout.ToMicroseconds() + 500 * 1000;
    req.set_sleep_micros(sleep_micros);

    RpcController c;
    c.set_timeout(timeout);
    Stopwatch sw;
    sw.start();
    Status s = p.SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c);
    sw.stop();
    ASSERT_FALSE(s.ok());
    if (is_negotiaton_error != nullptr) {
      *is_negotiaton_error = c.negotiation_failed();
    }

    int expected_millis = timeout.ToMilliseconds();
    int elapsed_millis = sw.elapsed().wall_millis();

    // We shouldn't timeout significantly faster than our configured timeout.
    EXPECT_GE(elapsed_millis, expected_millis - 10);
    // And we also shouldn't take the full time that we asked for
    EXPECT_LT(elapsed_millis * 1000, sleep_micros);
    EXPECT_TRUE(s.IsTimedOut());
    LOG(INFO) << "status: " << s.ToString() << ", seconds elapsed: " << sw.elapsed().wall_seconds();
  }

  // Start a server running the generic (non-generated) calculator service.
  void StartTestServer(Sockaddr *server_addr, bool enable_ssl = false) {
    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl);
  }

  // Start a server running the generated-code calculator service.
  void StartTestServerWithGeneratedCode(Sockaddr *server_addr, bool enable_ssl = false) {
    DoStartTestServer<CalculatorService>(server_addr, enable_ssl);
  }

  // Start a simple socket listening on a local port, returning the address.
  // This isn't an RPC server -- just a plain socket which can be helpful for testing.
  Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) {
    Sockaddr bind_addr;
    bind_addr.set_port(0);
    RETURN_NOT_OK(listen_sock->Init(0));
    RETURN_NOT_OK(listen_sock->BindAndListen(bind_addr, 1));
    RETURN_NOT_OK(listen_sock->GetSocketAddress(listen_addr));
    LOG(INFO) << "Bound to: " << listen_addr->ToString();
    return Status::OK();
  }

 private:

  // Fetch inbound sidecar 'idx' from 'controller', CHECKing its size.
  static Slice GetSidecarPointer(const RpcController& controller, int idx,
                                 int expected_size) {
    Slice sidecar;
    CHECK_OK(controller.GetInboundSidecar(idx, &sidecar));
    CHECK_EQ(expected_size, sidecar.size());
    return Slice(sidecar.data(), expected_size);
  }

  // Build the server messenger, bind an acceptor pool on an ephemeral
  // port (returned via 'server_addr'), and register an instance of
  // ServiceClass backed by a fresh ResultTracker and ServicePool.
  template<class ServiceClass>
  void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false) {
    server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
    std::shared_ptr<AcceptorPool> pool;
    ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
    ASSERT_OK(pool->Start(2));
    *server_addr = pool->bind_address();
    mem_tracker_ = MemTracker::CreateTracker(-1, "result_tracker");
    result_tracker_.reset(new ResultTracker(mem_tracker_));

    gscoped_ptr<ServiceIf> service(new ServiceClass(metric_entity_, result_tracker_));
    service_name_ = service->service_name();
    scoped_refptr<MetricEntity> metric_entity = server_messenger_->metric_entity();
    service_pool_ = new ServicePool(std::move(service), metric_entity, service_queue_length_);
    server_messenger_->RegisterService(service_name_, service_pool_);
    ASSERT_OK(service_pool_->Init(n_worker_threads_));
  }

 protected:
  string service_name_;
  std::shared_ptr<Messenger> server_messenger_;
  scoped_refptr<ServicePool> service_pool_;
  std::shared_ptr<kudu::MemTracker> mem_tracker_;
  scoped_refptr<ResultTracker> result_tracker_;
  // Tunables; subclasses may adjust these before starting the server.
  int n_worker_threads_;
  int service_queue_length_;
  int n_server_reactor_threads_;
  int keepalive_time_ms_;

  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
new file mode 100644
index 0000000..63b7b73
--- /dev/null
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -0,0 +1,808 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/test_util.h"
+
+METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_histogram(rpc_incoming_queue_time);
+
+DECLARE_bool(rpc_reopen_outbound_connections);
+DECLARE_int32(rpc_negotiation_inject_delay_ms);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
// Parameterized fixture: the bool parameter selects whether SSL/TLS is
// enabled on the messengers each test creates (see INSTANTIATE below).
class TestRpc : public RpcTestBase, public ::testing::WithParamInterface<bool> {
};
+
+// This is used to run all parameterized tests with and without SSL.
+INSTANTIATE_TEST_CASE_P(OptionalSSL, TestRpc, testing::Values(false, true));
+
+TEST_F(TestRpc, TestSockaddr) {
+  Sockaddr addr1, addr2;
+  addr1.set_port(1000);
+  addr2.set_port(2000);
+  // port is ignored when comparing Sockaddr objects
+  ASSERT_FALSE(addr1 < addr2);
+  ASSERT_FALSE(addr2 < addr1);
+  ASSERT_EQ(1000, addr1.port());
+  ASSERT_EQ(2000, addr2.port());
+  ASSERT_EQ(string("0.0.0.0:1000"), addr1.ToString());
+  ASSERT_EQ(string("0.0.0.0:2000"), addr2.ToString());
+  Sockaddr addr3(addr1);
+  ASSERT_EQ(string("0.0.0.0:1000"), addr3.ToString());
+}
+
+TEST_P(TestRpc, TestMessengerCreateDestroy) {
+  shared_ptr<Messenger> messenger(CreateMessenger("TestCreateDestroy", 1, 
GetParam()));
+  LOG(INFO) << "started messenger " << messenger->name();
+  messenger->Shutdown();
+}
+
+// Test starting and stopping a messenger. This is a regression
+// test for a segfault seen in early versions of the RPC code,
+// in which shutting down the acceptor would trigger an assert,
+// making our tests flaky.
+TEST_P(TestRpc, TestAcceptorPoolStartStop) {
+  int n_iters = AllowSlowTests() ? 100 : 5;
+  for (int i = 0; i < n_iters; i++) {
+    shared_ptr<Messenger> 
messenger(CreateMessenger("TestAcceptorPoolStartStop", 1, GetParam()));
+    shared_ptr<AcceptorPool> pool;
+    ASSERT_OK(messenger->AddAcceptorPool(Sockaddr(), &pool));
+    Sockaddr bound_addr;
+    ASSERT_OK(pool->GetBoundAddress(&bound_addr));
+    ASSERT_NE(0, bound_addr.port());
+    ASSERT_OK(pool->Start(2));
+    messenger->Shutdown();
+  }
+}
+
+TEST_F(TestRpc, TestConnHeaderValidation) {
+  MessengerBuilder mb("TestRpc.TestConnHeaderValidation");
+  const int conn_hdr_len = kMagicNumberLength + kHeaderFlagsLength;
+  uint8_t buf[conn_hdr_len];
+  serialization::SerializeConnHeader(buf);
+  ASSERT_OK(serialization::ValidateConnHeader(Slice(buf, conn_hdr_len)));
+}
+
+// Test making successful RPC calls.
+TEST_P(TestRpc, TestCall) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, 
GenericCalculatorService::static_service_name());
+  ASSERT_STR_CONTAINS(p.ToString(), 
strings::Substitute("kudu.rpc.GenericCalculatorService@"
+                                                            "{remote=$0, 
user_credentials=",
+                                                        
server_addr.ToString()));
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  }
+}
+
+// Test that connecting to an invalid server properly throws an error.
+TEST_P(TestRpc, TestCallToBadServer) {
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
GetParam()));
+  Sockaddr addr;
+  addr.set_port(0);
+  Proxy p(client_messenger, addr, 
GenericCalculatorService::static_service_name());
+
+  // Loop a few calls to make sure that we properly set up and tear down
+  // the connections.
+  for (int i = 0; i < 5; i++) {
+    Status s = DoTestSyncCall(p, GenericCalculatorService::kAddMethodName);
+    LOG(INFO) << "Status: " << s.ToString();
+    ASSERT_TRUE(s.IsNetworkError()) << "unexpected status: " << s.ToString();
+  }
+}
+
+// Test that RPC calls can be failed with an error status on the server.
+TEST_P(TestRpc, TestInvalidMethodCall) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, 
GenericCalculatorService::static_service_name());
+
+  // Call the method which fails.
+  Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "bad method");
+}
+
+// Test that the error message returned when connecting to the wrong service
+// is reasonable.
+TEST_P(TestRpc, TestWrongService) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client with the wrong service name.
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, "WrongServiceName");
+
+  // Call the method which fails.
+  Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "Service unavailable: service WrongServiceName "
+                      "not registered on TestServer");
+}
+
+// Test that we can still make RPC connections even if many fds are in use.
+// This is a regression test for KUDU-650.
+TEST_P(TestRpc, TestHighFDs) {
+  // This test can only run if ulimit is set high.
+  const int kNumFakeFiles = 3500;
+  const int kMinUlimit = kNumFakeFiles + 100;
+  if (env_->GetOpenFileLimit() < kMinUlimit) {
+    LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << 
kMinUlimit;
+    return;
+  }
+
+  // Open a bunch of fds just to increase our fd count.
+  vector<RandomAccessFile*> fake_files;
+  ElementDeleter d(&fake_files);
+  for (int i = 0; i < kNumFakeFiles; i++) {
+    unique_ptr<RandomAccessFile> f;
+    CHECK_OK(Env::Default()->NewRandomAccessFile("/dev/zero", &f));
+    fake_files.push_back(f.release());
+  }
+
+  // Set up server and client, and verify we can make a successful call.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, 
GenericCalculatorService::static_service_name());
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+}
+
+// Test that connections are kept alive between calls.
// Test that connections are kept alive between calls.
TEST_P(TestRpc, TestConnectionKeepalive) {
  // Only run one reactor per messenger, so we can grab the metrics from that
  // one without having to check all.
  n_server_reactor_threads_ = 1;
  keepalive_time_ms_ = 500;

  // Set up server.
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServer(&server_addr, enable_ssl);

  // Set up client.
  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));

  // Brief pause to let the connection settle before sampling metrics.
  SleepFor(MonoDelta::FromMilliseconds(5));

  // Immediately after the call: the server sees one inbound connection,
  // the client sees one outbound connection, and neither sees the reverse.
  ReactorMetrics metrics;
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connection";
  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";

  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
  ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";

  // Sleep out well past the keepalive interval (2x to avoid flakiness).
  SleepFor(MonoDelta::FromMilliseconds(2 * keepalive_time_ms_));

  // After sleeping, the keepalive timer should have closed both sides of
  // the connection.
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.num_server_connections_) << "Server should have 0 server connections";
  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";

  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
  ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
}
+
// Test that outbound connections to the same server are reopened upon every RPC
// call when the 'rpc_reopen_outbound_connections' flag is set.
TEST_P(TestRpc, TestReopenOutboundConnections) {
  // Set the flag to enable special mode: close and reopen already established
  // outbound connections.
  FLAGS_rpc_reopen_outbound_connections = true;

  // Only run one reactor per messenger, so we can grab the metrics from that
  // one without having to check all.
  n_server_reactor_threads_ = 1;

  // Set up server.
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServer(&server_addr, enable_ssl);

  // Set up client.
  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // Verify the initial counters: no connections have been made yet on either
  // side.
  ReactorMetrics metrics;
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.total_client_connections_);
  ASSERT_EQ(0, metrics.total_server_connections_);
  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.total_client_connections_);
  ASSERT_EQ(0, metrics.total_server_connections_);

  // Run several iterations, just in case. Each call should bump the
  // cumulative connection counters by exactly one on each side, proving a
  // fresh connection was established per call.
  for (int i = 0; i < 32; ++i) {
    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
    ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
    ASSERT_EQ(0, metrics.total_client_connections_);
    ASSERT_EQ(i + 1, metrics.total_server_connections_);
    ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
    ASSERT_EQ(i + 1, metrics.total_client_connections_);
    ASSERT_EQ(0, metrics.total_server_connections_);
  }
}
+
// Test that an outbound connection is closed and a new one is opened if going
// from ANY_CREDENTIALS to PRIMARY_CREDENTIALS policy for RPC calls to the same
// destination.
// Test that changing from PRIMARY_CREDENTIALS policy to ANY_CREDENTIALS policy
// re-uses the connection established with PRIMARY_CREDENTIALS policy.
TEST_P(TestRpc, TestCredentialsPolicy) {
  // Only run one reactor per messenger, so we can grab the metrics from that
  // one without having to check all.
  n_server_reactor_threads_ = 1;

  // Set up server.
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServer(&server_addr, enable_ssl);

  // Set up client.
  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // Verify the initial counters: no connections yet on either side.
  ReactorMetrics metrics;
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.total_client_connections_);
  ASSERT_EQ(0, metrics.total_server_connections_);
  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.total_client_connections_);
  ASSERT_EQ(0, metrics.total_server_connections_);

  // Make an RPC call with ANY_CREDENTIALS policy. Exactly one connection
  // should be established (cumulative == current == 1 on each side).
  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  EXPECT_EQ(0, metrics.total_client_connections_);
  EXPECT_EQ(1, metrics.total_server_connections_);
  EXPECT_EQ(1, metrics.num_server_connections_);
  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  EXPECT_EQ(1, metrics.total_client_connections_);
  EXPECT_EQ(0, metrics.total_server_connections_);
  EXPECT_EQ(1, metrics.num_client_connections_);

  // This is to allow all the data to be sent so the connection becomes idle.
  SleepFor(MonoDelta::FromMilliseconds(5));

  // Make an RPC call with PRIMARY_CREDENTIALS policy. Currently open connection
  // with ANY_CREDENTIALS policy should be closed and a new one established
  // with PRIMARY_CREDENTIALS policy: cumulative count rises to 2 while the
  // number of live connections stays at 1.
  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName,
                           CredentialsPolicy::PRIMARY_CREDENTIALS));
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  EXPECT_EQ(0, metrics.total_client_connections_);
  EXPECT_EQ(2, metrics.total_server_connections_);
  EXPECT_EQ(1, metrics.num_server_connections_);
  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  EXPECT_EQ(2, metrics.total_client_connections_);
  EXPECT_EQ(0, metrics.total_server_connections_);
  EXPECT_EQ(1, metrics.num_client_connections_);

  // Make another RPC call with ANY_CREDENTIALS policy. The already established
  // connection with PRIMARY_CREDENTIALS policy should be re-used because
  // the ANY_CREDENTIALS policy satisfies the PRIMARY_CREDENTIALS policy which
  // the currently open connection has been established with. Hence all
  // counters must be unchanged from the previous check.
  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  EXPECT_EQ(0, metrics.total_client_connections_);
  EXPECT_EQ(2, metrics.total_server_connections_);
  EXPECT_EQ(1, metrics.num_server_connections_);
  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
  EXPECT_EQ(2, metrics.total_client_connections_);
  EXPECT_EQ(0, metrics.total_server_connections_);
  EXPECT_EQ(1, metrics.num_client_connections_);
}
+
+// Test that a call which takes longer than the keepalive time
+// succeeds -- i.e that we don't consider a connection to be "idle" on the
+// server if there is a call outstanding on it.
+TEST_P(TestRpc, TestCallLongerThanKeepalive) {
+  // Set a short keepalive.
+  keepalive_time_ms_ = 1000;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, 
GenericCalculatorService::static_service_name());
+
+  // Make a call which sleeps longer than the keepalive.
+  RpcController controller;
+  SleepRequestPB req;
+  req.set_sleep_micros(3 * 1000 * 1000); // 3 seconds.
+  req.set_deferred(true);
+  SleepResponsePB resp;
+  ASSERT_OK(p.SyncRequest(GenericCalculatorService::kSleepMethodName,
+                                 req, &resp, &controller));
+}
+
+// Test that the RpcSidecar transfers the expected messages.
+TEST_P(TestRpc, TestRpcSidecar) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
GetParam()));
+  Proxy p(client_messenger, server_addr, 
GenericCalculatorService::static_service_name());
+
+  // Test a zero-length sidecar
+  DoTestSidecar(p, 0, 0);
+
+  // Test some small sidecars
+  DoTestSidecar(p, 123, 456);
+
+  // Test some larger sidecars to verify that we properly handle the case where
+  // we can't write the whole response to the socket in a single call.
+  DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
+
+  DoTestOutgoingSidecar(p, 0, 0);
+  DoTestOutgoingSidecar(p, 123, 456);
+  DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024);
+}
+
+TEST_P(TestRpc, TestRpcSidecarLimits) {
+  {
+    // Test that the limits on the number of sidecars is respected.
+    RpcController controller;
+    string s = "foo";
+    int idx;
+    for (int i = 0; i < TransferLimits::kMaxSidecars; ++i) {
+      CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), 
&idx));
+    }
+
+    CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), 
&idx).ok());
+  }
+
+  {
+    // Test that the payload may not exceed --rpc_max_message_size.
+    // Set up server.
+    Sockaddr server_addr;
+    bool enable_ssl = GetParam();
+    StartTestServer(&server_addr, enable_ssl);
+
+    // Set up client.
+    shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
GetParam()));
+    Proxy p(client_messenger, server_addr, 
GenericCalculatorService::static_service_name());
+
+    RpcController controller;
+    string s(FLAGS_rpc_max_message_size + 1, 'a');
+    int idx;
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), 
&idx));
+
+    PushTwoStringsRequestPB request;
+    request.set_sidecar1_idx(idx);
+    request.set_sidecar2_idx(idx);
+    PushTwoStringsResponsePB resp;
+    Status status = 
p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+        request, &resp, &controller);
+    ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << 
status.ToString();
+    // Remote responds to extra-large payloads by closing the connection.
+    ASSERT_STR_MATCHES(status.ToString(),
+                       // Linux
+                       "Connection reset by peer"
+                       // macOS, while reading from socket.
+                       "|got EOF from remote"
+                       // macOS, while writing to socket.
+                       "|Protocol wrong type for socket");
+  }
+}
+
// Test that timeouts are properly handled.
TEST_P(TestRpc, TestCallTimeout) {
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServer(&server_addr, enable_ssl);
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // Test a very short timeout - we expect this will time out while the
  // call is still trying to connect, or in the send queue. This was triggering ASAN failures
  // before.
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromNanoseconds(1)));

  // Test a longer timeout - expect this will time out after we send the request,
  // but shorter than our threshold for two-stage timeout handling.
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(200)));

  // Test a longer timeout - expect this will trigger the "two-stage timeout"
  // code path.
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1500)));
}
+
// Inject 500ms delay in negotiation, and send a call with a short timeout, followed by
// one with a long timeout. The call with the long timeout should succeed even though
// the previous one failed.
//
// This is a regression test against prior behavior where the connection negotiation
// was assigned the timeout of the first call on that connection. So, if the first
// call had a short timeout, the later call would also inherit the timed-out negotiation.
TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) {
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServer(&server_addr, enable_ssl);
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // Delay negotiation longer than the first call's 50ms timeout, but short
  // enough that the second (default-timeout) call can still complete.
  FLAGS_rpc_negotiation_inject_delay_ms = 500;
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50)));
  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));

  // Only the second call should have been received by the server, because we
  // don't bother sending an already-timed-out call. The incoming-queue-time
  // histogram counts one entry per received call, so it must show exactly 1.
  auto metric_map = server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
  auto* metric = FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time).get();
  ASSERT_EQ(1, down_cast<Histogram*>(metric)->TotalCount());
}
+
// Helper for TestNegotiationTimeout: accepts a single TCP connection on
// 'listen_sock' and reads (and discards) whatever the client sends, without
// ever participating in RPC negotiation.
static void AcceptAndReadForever(Socket* listen_sock) {
  // Accept the TCP connection.
  Socket server_sock;
  Sockaddr remote;
  CHECK_OK(listen_sock->Accept(&server_sock, &remote, 0));

  // Bound the read loop so this thread cannot hang the test indefinitely.
  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(10);

  size_t nread;
  uint8_t buf[1024];
  // Drain incoming bytes until the peer disconnects or the deadline passes.
  while (server_sock.BlockingRecv(buf, sizeof(buf), &nread, deadline).ok()) {
  }
}
+
// Starts a fake listening socket which never actually negotiates.
// Ensures that the client gets a reasonable status code in this case.
TEST_F(TestRpc, TestNegotiationTimeout) {
  // Set up a simple socket server which accepts a connection.
  Sockaddr server_addr;
  Socket listen_sock;
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));

  // Create another thread to accept the connection on the fake server.
  // It reads and discards data but never completes negotiation, so the
  // client's negotiation is guaranteed to stall until its timeout fires.
  scoped_refptr<Thread> acceptor_thread;
  ASSERT_OK(Thread::Create("test", "acceptor",
                           AcceptAndReadForever, &listen_sock,
                           &acceptor_thread));

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // The timeout must surface as a negotiation error, not a generic one.
  bool is_negotiation_error = false;
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(
      p, MonoDelta::FromMilliseconds(100), &is_negotiation_error));
  EXPECT_TRUE(is_negotiation_error);

  acceptor_thread->Join();
}
+
// Test that client calls get failed properly when the server they're connected to
// shuts down.
TEST_F(TestRpc, TestServerShutsDown) {
  // Set up a simple socket server which accepts a connection.
  Sockaddr server_addr;
  Socket listen_sock;
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));

  // Set up client.
  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // Send a call.
  AddRequestPB req;
  req.set_x(rand());
  req.set_y(rand());
  AddResponsePB resp;

  vector<unique_ptr<RpcController>> controllers;

  // We'll send several calls async, and ensure that they all
  // get the error status when the connection drops.
  int n_calls = 5;

  CountDownLatch latch(n_calls);
  for (int i = 0; i < n_calls; i++) {
    controllers.emplace_back(new RpcController());
    p.AsyncRequest(GenericCalculatorService::kAddMethodName, req, &resp, controllers.back().get(),
                   boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
  }

  // Accept the TCP connection.
  Socket server_sock;
  Sockaddr remote;
  ASSERT_OK(listen_sock.Accept(&server_sock, &remote, 0));

  // The call is still in progress at this point (the fake server never
  // responds, so nothing can have completed successfully).
  for (const auto& controller : controllers) {
    ASSERT_FALSE(controller->finished());
  }

  // Shut down the socket.
  ASSERT_OK(listen_sock.Close());
  ASSERT_OK(server_sock.Close());

  // Wait for the call to be marked finished.
  latch.Wait();

  // Should get the appropriate error on the client for all calls:
  for (const auto& controller : controllers) {
    ASSERT_TRUE(controller->finished());
    Status s = controller->status();
    ASSERT_TRUE(s.IsNetworkError()) <<
      "Unexpected status: " << s.ToString();

    // Any of these errors could happen, depending on whether we were
    // in the middle of sending a call while the connection died, or
    // if we were already waiting for responses.
    //
    // ECONNREFUSED is possible because the sending of the calls is async.
    // For example, the following interleaving:
    // - Enqueue 3 calls
    // - Reactor wakes up, creates connection, starts writing calls
    // - Enqueue 2 more calls
    // - Shut down socket
    // - Reactor wakes up, tries to write more of the first 3 calls, gets error
    // - Reactor shuts down connection
    // - Reactor sees the 2 remaining calls, makes a new connection
    // - Because the socket is shut down, gets ECONNREFUSED.
    //
    // EINVAL is possible if the controller socket had already disconnected by
    // the time it tries to set the SO_SNDTIMEO socket option as part of the
    // normal blocking SASL handshake.
    ASSERT_TRUE(s.posix_code() == EPIPE ||
                s.posix_code() == ECONNRESET ||
                s.posix_code() == ESHUTDOWN ||
                s.posix_code() == ECONNREFUSED ||
                s.posix_code() == EINVAL)
      << "Unexpected status: " << s.ToString();
  }
}
+
+// Test handler latency metric.
+TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
+
+  const uint64_t sleep_micros = 20 * 1000;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServerWithGeneratedCode(&server_addr, enable_ssl);
+
+  // Set up client.
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, 
CalculatorService::static_service_name());
+
+  RpcController controller;
+  SleepRequestPB req;
+  req.set_sleep_micros(sleep_micros);
+  req.set_deferred(true);
+  SleepResponsePB resp;
+  ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller));
+
+  const unordered_map<const MetricPrototype*, scoped_refptr<Metric> > 
metric_map =
+    server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
+
+  scoped_refptr<Histogram> latency_histogram = down_cast<Histogram *>(
+      FindOrDie(metric_map,
+                
&METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+
+  LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests();
+  LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests();
+  LOG(INFO) << "Sleep() max lat: " << latency_histogram->MaxValueForTests();
+  LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount();
+
+  ASSERT_EQ(1, latency_histogram->TotalCount());
+  ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros);
+  ASSERT_TRUE(latency_histogram->MinValueForTests() == 
latency_histogram->MaxValueForTests());
+
+  // TODO: Implement an incoming queue latency test.
+  // For now we just assert that the metric exists.
+  ASSERT_TRUE(FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time));
+}
+
// RPC callback which destroys the messenger it was invoked from, then signals
// 'latch'. Used to verify a messenger can be torn down from within one of its
// own response callbacks.
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
                                     CountDownLatch* latch) {
  // Order matters: drop the messenger reference first, and only then wake the
  // test thread waiting on the latch.
  messenger->reset();
  latch->CountDown();
}
+
// Test that the messenger may be destroyed from within an RPC callback
// without deadlocking or crashing.
TEST_P(TestRpc, TestRpcCallbackDestroysMessenger) {
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
  // Uninitialized address: the call is guaranteed to fail quickly, and the
  // 1ms timeout below bounds how long we wait for the failure callback.
  Sockaddr bad_addr;
  CountDownLatch latch(1);

  AddRequestPB req;
  req.set_x(rand());
  req.set_y(rand());
  AddResponsePB resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(1));
  {
    // Scope the proxy so it is destroyed before the callback runs; the
    // callback then drops the last messenger reference.
    Proxy p(client_messenger, bad_addr, "xxx");
    p.AsyncRequest("my-fake-method", req, &resp, &controller,
                   boost::bind(&DestroyMessengerCallback, &client_messenger, &latch));
  }
  latch.Wait();
}
+
// Test that setting the client timeout / deadline gets propagated to RPC
// services.
TEST_P(TestRpc, TestRpcContextClientDeadline) {
  const uint64_t sleep_micros = 20 * 1000;

  // Set up server.
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServerWithGeneratedCode(&server_addr, enable_ssl);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());

  // With 'client_timeout_defined' set, the Sleep service asserts that the
  // request carried a timeout. First call sends none, so the service is
  // expected to reject it with a remote error.
  SleepRequestPB req;
  req.set_sleep_micros(sleep_micros);
  req.set_client_timeout_defined(true);
  SleepResponsePB resp;
  RpcController controller;
  Status s = p.SyncRequest("Sleep", req, &resp, &controller);
  ASSERT_TRUE(s.IsRemoteError());
  ASSERT_STR_CONTAINS(s.ToString(), "Missing required timeout");

  // Retry with a timeout set; the service should now see it and succeed.
  controller.Reset();
  controller.set_timeout(MonoDelta::FromMilliseconds(1000));
  ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller));
}
+
// Test that setting an call-level application feature flag to an unknown value
// will make the server reject the call.
TEST_P(TestRpc, TestApplicationFeatureFlag) {
  // Set up server.
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServerWithGeneratedCode(&server_addr, enable_ssl);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());

  { // Supported flag: the server advertises FOO, so the call goes through.
    AddRequestPB req;
    req.set_x(1);
    req.set_y(2);
    AddResponsePB resp;
    RpcController controller;
    controller.RequireServerFeature(FeatureFlags::FOO);
    Status s = p.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
    ASSERT_TRUE(s.ok());
    ASSERT_EQ(resp.result(), 3);
  }

  { // Unsupported flag: 99 is not a feature the server knows, so it must
    // reject the call with a remote error.
    AddRequestPB req;
    req.set_x(1);
    req.set_y(2);
    AddResponsePB resp;
    RpcController controller;
    controller.RequireServerFeature(FeatureFlags::FOO);
    controller.RequireServerFeature(99);
    Status s = p.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("unsupported response: $0", s.ToString()));
    ASSERT_TRUE(s.IsRemoteError());
  }
}
+
// Test behavior against a server which advertises no feature flags at all:
// calls requiring a feature must fail with NotSupported, while calls with no
// requirements must still succeed.
TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
  // Temporarily clear the server's advertised feature set; the scoped cleanup
  // restores it so other tests are unaffected.
  auto savedFlags = kSupportedServerRpcFeatureFlags;
  auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; });
  kSupportedServerRpcFeatureFlags = {};

  // Set up server.
  Sockaddr server_addr;
  bool enable_ssl = GetParam();
  StartTestServerWithGeneratedCode(&server_addr, enable_ssl);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());

  { // Required flag: server supports nothing, so this must fail.
    // NOTE(review): the trace label "supported response" looks copy-pasted
    // from the previous test; consider renaming for clearer failure output.
    AddRequestPB req;
    req.set_x(1);
    req.set_y(2);
    AddResponsePB resp;
    RpcController controller;
    controller.RequireServerFeature(FeatureFlags::FOO);
    Status s = p.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
    ASSERT_TRUE(s.IsNotSupported());
  }

  { // No required flag: the call must succeed regardless of the server's
    // (empty) feature set.
    AddRequestPB req;
    req.set_x(1);
    req.set_y(2);
    AddResponsePB resp;
    RpcController controller;
    Status s = p.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
    ASSERT_TRUE(s.ok());
  }
}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc.cc b/be/src/kudu/rpc/rpc.cc
new file mode 100644
index 0000000..685da13
--- /dev/null
+++ b/be/src/kudu/rpc/rpc.cc
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc.h"
+
+#include <boost/bind.hpp>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+
+using std::shared_ptr;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+namespace kudu {
+
+namespace rpc {
+
// Examines the controller status after an RPC attempt. If the remote reported
// a transient condition (server too busy / service unavailable), schedules a
// delayed retry and returns true; callers must then keep 'rpc' alive.
// Otherwise, copies the final status into 'out_status' and returns false.
bool RpcRetrier::HandleResponse(Rpc* rpc, Status* out_status) {
  DCHECK(rpc);
  DCHECK(out_status);

  // Always retry TOO_BUSY and UNAVAILABLE errors.
  const Status controller_status = controller_.status();
  if (controller_status.IsRemoteError()) {
    const ErrorStatusPB* err = controller_.error_response();
    if (err &&
        err->has_code() &&
        (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
         err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
      // The UNAVAILABLE code is a broader counterpart of the
      // SERVER_TOO_BUSY. In both cases it's necessary to retry a bit later.
      DelayedRetry(rpc, controller_status);
      return true;
    }
  }

  // Not a retriable error (or success): hand the status back to the caller.
  *out_status = controller_status;
  return false;
}
+
// Schedules 'rpc' to be retried on a reactor thread after a short delay.
// 'why_status', if not OK, is remembered as the most recent retry cause and
// is surfaced to the caller if the RPC ultimately times out.
void RpcRetrier::DelayedRetry(Rpc* rpc, const Status& why_status) {
  // Server-reported errors take precedence: only overwrite the remembered
  // error if we have none yet or the remembered one is a mere timeout.
  if (!why_status.ok() && (last_error_.ok() || last_error_.IsTimedOut())) {
    last_error_ = why_status;
  }
  // Add some jitter to the retry delay.
  //
  // If the delay causes us to miss our deadline, RetryCb will fail the
  // RPC on our behalf.
  //
  // The delay grows with the attempt number (simple linear backoff) plus
  // 0-4ms of jitter. Note this also increments attempt_num_ for the next try.
  int num_ms = ++attempt_num_ + ((rand() % 5));
  messenger_->ScheduleOnReactor(boost::bind(&RpcRetrier::DelayedRetryCb,
                                            this,
                                            rpc, _1),
                                MonoDelta::FromMilliseconds(num_ms));
}
+
+void RpcRetrier::DelayedRetryCb(Rpc* rpc, const Status& status) {
+  Status new_status = status;
+  if (new_status.ok()) {
+    // Has this RPC timed out?
+    if (deadline_.Initialized()) {
+      if (MonoTime::Now() > deadline_) {
+        string err_str = Substitute("$0 passed its deadline", rpc->ToString());
+        if (!last_error_.ok()) {
+          SubstituteAndAppend(&err_str, ": $0", last_error_.ToString());
+        }
+        new_status = Status::TimedOut(err_str);
+      }
+    }
+  }
+  if (new_status.ok()) {
+    controller_.Reset();
+    rpc->SendRpc();
+  } else {
+    rpc->SendRpcCb(new_status);
+  }
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc.h b/be/src/kudu/rpc/rpc.h
new file mode 100644
index 0000000..7bc484f
--- /dev/null
+++ b/be/src/kudu/rpc/rpc.h
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_H
+#define KUDU_RPC_RPC_H
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+
+namespace rpc {
+
+class Messenger;
+class Rpc;
+
// Result status of a retriable Rpc.
//
// TODO Consider merging this with ScanRpcStatus.
struct RetriableRpcStatus {
  // Classification of the outcome of one RPC attempt, used by retry logic to
  // decide whether (and how) to try again.
  enum Result {
    // There was no error, i.e. the Rpc was successful.
    OK,

    // The Rpc got an error and it's not retriable.
    NON_RETRIABLE_ERROR,

    // The server couldn't be reached, i.e. there was a network error while
    // reaching the replica or a DNS resolution problem.
    SERVER_NOT_ACCESSIBLE,

    // The server received the request but it was not ready to serve it right
    // away. It might happen that the server was too busy and did not have
    // necessary resources or information to serve the request but it
    // anticipates it should be ready to serve the request really soon, so it's
    // worth retrying the request at a later time.
    SERVICE_UNAVAILABLE,

    // For RPCs that are meant only for the leader of a shared resource, when the server
    // we're interacting with is not the leader.
    REPLICA_NOT_LEADER,

    // The server doesn't know the resource we're interacting with. For instance a TabletServer
    // is not part of the config for the tablet we're trying to write to.
    RESOURCE_NOT_FOUND,

    // The authentication token supplied with the operation was found invalid
    // by the server. Most likely, the token has expired. If so, get a new token
    // using client credentials and retry the operation with it.
    INVALID_AUTHENTICATION_TOKEN,
  };

  // The coarse classification (drives retry decisions).
  Result result;
  // The underlying detailed status (for logging / error reporting).
  Status status;
};
+
// This class picks a server among a possible set of servers serving a given resource.
//
// TODO Currently this only picks the leader, though it wouldn't be unfeasible to have this
// have an enum so that it can pick any server.
template <class Server>
class ServerPicker : public RefCountedThreadSafe<ServerPicker<Server>> {
 public:
  virtual ~ServerPicker() {}

  // Callback invoked once a pick completes: OK + the chosen server, or an
  // error status + nullptr.
  typedef Callback<void(const Status& status, Server* server)> ServerPickedCallback;

  // Picks the leader among the replicas serving a resource.
  // If the leader was found, it calls the callback with Status::OK() and
  // with 'server' set to the current leader, otherwise calls the callback
  // with 'status' set to the failure reason, and 'server' set to nullptr.
  // If picking a leader takes longer than 'deadline' the callback is called with
  // Status::TimedOut().
  virtual void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) = 0;

  // Marks a server as failed/inaccessible.
  virtual void MarkServerFailed(Server *server, const Status &status) = 0;

  // Marks a server as not the leader of config serving the resource we're trying to interact with.
  virtual void MarkReplicaNotLeader(Server* replica) = 0;

  // Marks a server as not serving the resource we want.
  virtual void MarkResourceNotFound(Server *replica) = 0;
};
+
// Provides utilities for retrying failed RPCs.
//
// All RPCs should use HandleResponse() to retry certain generic errors.
class RpcRetrier {
 public:
  // 'deadline' may be uninitialized, meaning the RPC has no deadline.
  // 'messenger' is used both for sending attempts and scheduling retries.
  RpcRetrier(MonoTime deadline, std::shared_ptr<rpc::Messenger> messenger)
      : attempt_num_(1),
        deadline_(deadline),
        messenger_(std::move(messenger)) {
    if (deadline_.Initialized()) {
      controller_.set_deadline(deadline_);
    }
    // NOTE(review): Reset() is called after set_deadline(); presumably
    // RpcController::Reset() preserves the configured deadline -- confirm
    // against the RpcController implementation.
    controller_.Reset();
  }

  // Tries to handle a failed RPC.
  //
  // If it was handled (e.g. scheduled for retry in the future), returns
  // true. In this case, callers should ensure that 'rpc' remains alive.
  //
  // Otherwise, returns false and writes the controller status to
  // 'out_status'.
  bool HandleResponse(Rpc* rpc, Status* out_status);

  // Retries an RPC at some point in the near future. If 'why_status' is not OK,
  // records it as the most recent error causing the RPC to retry. This is
  // reported to the caller eventually if the RPC never succeeds.
  //
  // If the RPC's deadline expires, the callback will fire with a timeout
  // error when the RPC comes up for retrying. This is true even if the
  // deadline has already expired at the time that Retry() was called.
  //
  // Callers should ensure that 'rpc' remains alive.
  void DelayedRetry(Rpc* rpc, const Status& why_status);

  RpcController* mutable_controller() { return &controller_; }
  const RpcController& controller() const { return controller_; }

  const MonoTime& deadline() const { return deadline_; }

  const std::shared_ptr<Messenger>& messenger() const {
    return messenger_;
  }

  int attempt_num() const { return attempt_num_; }

  // Called when an RPC comes up for retrying. Actually sends the RPC.
  void DelayedRetryCb(Rpc* rpc, const Status& status);

 private:
  // The next sent rpc will be the nth attempt (indexed from 1).
  int attempt_num_;

  // If the remote end is busy, the RPC will be retried (with a small
  // delay) until this deadline is reached.
  //
  // May be uninitialized.
  MonoTime deadline_;

  // Messenger to use when sending the RPC.
  std::shared_ptr<Messenger> messenger_;

  // RPC controller to use when sending the RPC.
  RpcController controller_;

  // In case any retries have already happened, remembers the last error.
  // Errors from the server take precedence over timeout errors.
  Status last_error_;

  DISALLOW_COPY_AND_ASSIGN(RpcRetrier);
};
+
// An in-flight remote procedure call to some server.
//
// Abstract base: subclasses implement SendRpc()/SendRpcCb() and use the
// embedded RpcRetrier to handle transient failures.
class Rpc {
 public:
  // 'deadline' may be uninitialized (no deadline); both arguments are
  // forwarded to the internal RpcRetrier.
  Rpc(const MonoTime& deadline,
      std::shared_ptr<rpc::Messenger> messenger)
      : retrier_(deadline, std::move(messenger)) {
  }

  virtual ~Rpc() {}

  // Asynchronously sends the RPC to the remote end.
  //
  // Subclasses should use SendRpcCb() below as the callback function.
  virtual void SendRpc() = 0;

  // Returns a string representation of the RPC.
  virtual std::string ToString() const = 0;

  // Returns the number of times this RPC has been sent. Will always be at
  // least one.
  int num_attempts() const { return retrier().attempt_num(); }

 protected:
  const RpcRetrier& retrier() const { return retrier_; }
  RpcRetrier* mutable_retrier() { return &retrier_; }

 private:
  // RpcRetrier calls SendRpcCb() directly when a retry fails permanently.
  friend class RpcRetrier;

  // Callback for SendRpc(). If 'status' is not OK, something failed
  // before the RPC was sent.
  virtual void SendRpcCb(const Status& status) = 0;

  // Used to retry some failed RPCs.
  RpcRetrier retrier_;

  DISALLOW_COPY_AND_ASSIGN(Rpc);
};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_RPC_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.cc b/be/src/kudu/rpc/rpc_context.cc
new file mode 100644
index 0000000..06fd8c5
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_context.cc
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_context.h"
+
+#include <memory>
+#include <ostream>
+#include <sstream>
+
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/hdr_histogram.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/trace.h"
+
+using google::protobuf::Message;
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+RpcContext::RpcContext(InboundCall *call,
+                       const google::protobuf::Message *request_pb,
+                       google::protobuf::Message *response_pb,
+                       const scoped_refptr<ResultTracker>& result_tracker)
+  : call_(CHECK_NOTNULL(call)),
+    request_pb_(request_pb),
+    response_pb_(response_pb),
+    result_tracker_(result_tracker) {
+  VLOG(4) << call_->remote_method().service_name() << ": Received RPC request 
for "
+          << call_->ToString() << ":" << std::endl << 
SecureDebugString(*request_pb_);
+  TRACE_EVENT_ASYNC_BEGIN2("rpc_call", "RPC", this,
+                           "call", call_->ToString(),
+                           "request", 
pb_util::PbTracer::TracePb(*request_pb_));
+}
+
+RpcContext::~RpcContext() {
+}
+
+void RpcContext::RespondSuccess() {
+  if (AreResultsTracked()) {
+    result_tracker_->RecordCompletionAndRespond(call_->header().request_id(),
+                                                response_pb_.get());
+  } else {
+    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success 
response for "
+        << call_->ToString() << ":" << std::endl << 
SecureDebugString(*response_pb_);
+    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                           "response", 
pb_util::PbTracer::TracePb(*response_pb_),
+                           "trace", trace()->DumpToString());
+    call_->RespondSuccess(*response_pb_);
+    delete this;
+  }
+}
+
+void RpcContext::RespondNoCache() {
+  if (AreResultsTracked()) {
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    response_pb_.get());
+  } else {
+    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure 
response for "
+        << call_->ToString() << ": " << SecureDebugString(*response_pb_);
+    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                           "response", 
pb_util::PbTracer::TracePb(*response_pb_),
+                           "trace", trace()->DumpToString());
+    // This is a bit counter intuitive, but when we get the failure but set 
the error on the
+    // call's response we call RespondSuccess() instead of RespondFailure().
+    call_->RespondSuccess(*response_pb_);
+    delete this;
+  }
+}
+
+void RpcContext::RespondFailure(const Status &status) {
+  if (AreResultsTracked()) {
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    ErrorStatusPB::ERROR_APPLICATION, status);
+  } else {
+    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure 
response for "
+        << call_->ToString() << ": " << status.ToString();
+    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                           "status", status.ToString(),
+                           "trace", trace()->DumpToString());
+    call_->RespondFailure(ErrorStatusPB::ERROR_APPLICATION, status);
+    delete this;
+  }
+}
+
+void RpcContext::RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const 
Status& status) {
+  if (AreResultsTracked()) {
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    err, status);
+  } else {
+    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure 
response for "
+        << call_->ToString() << ": " << status.ToString();
+    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                           "status", status.ToString(),
+                           "trace", trace()->DumpToString());
+    call_->RespondFailure(err, status);
+    delete this;
+  }
+}
+
+void RpcContext::RespondApplicationError(int error_ext_id, const std::string& 
message,
+                                         const Message& app_error_pb) {
+  if (AreResultsTracked()) {
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    error_ext_id, message, app_error_pb);
+  } else {
+    if (VLOG_IS_ON(4)) {
+      ErrorStatusPB err;
+      InboundCall::ApplicationErrorToPB(error_ext_id, message, app_error_pb, 
&err);
+      VLOG(4) << call_->remote_method().service_name()
+          << ": Sending application error response for " << call_->ToString()
+          << ":" << std::endl << SecureDebugString(err);
+    }
+    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                           "response", 
pb_util::PbTracer::TracePb(app_error_pb),
+                           "trace", trace()->DumpToString());
+    call_->RespondApplicationError(error_ext_id, message, app_error_pb);
+    delete this;
+  }
+}
+
+const rpc::RequestIdPB* RpcContext::request_id() const {
+  return call_->header().has_request_id() ? &call_->header().request_id() : 
nullptr;
+}
+
+Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  return call_->AddOutboundSidecar(std::move(car), idx);
+}
+
+Status RpcContext::GetInboundSidecar(int idx, Slice* slice) {
+  return call_->GetInboundSidecar(idx, slice);
+}
+
+const RemoteUser& RpcContext::remote_user() const {
+  return call_->remote_user();
+}
+
+void RpcContext::DiscardTransfer() {
+  call_->DiscardTransfer();
+}
+
+const Sockaddr& RpcContext::remote_address() const {
+  return call_->remote_address();
+}
+
+std::string RpcContext::requestor_string() const {
+  return call_->remote_user().ToString() + " at " +
+    call_->remote_address().ToString();
+}
+
+std::string RpcContext::method_name() const {
+  return call_->remote_method().method_name();
+}
+
+std::string RpcContext::service_name() const {
+  return call_->remote_method().service_name();
+}
+
+MonoTime RpcContext::GetClientDeadline() const {
+  return call_->GetClientDeadline();
+}
+
+Trace* RpcContext::trace() {
+  return call_->trace();
+}
+
// Logs the offending call, its request, and its trace at ERROR severity and
// then aborts the process via LOG(FATAL)-equivalent machinery. Never returns
// (declared __attribute__((noreturn)) in the header).
//
// 'filepath'/'line_number' identify the call site (supplied by PANIC_RPC()).
void RpcContext::Panic(const char* filepath, int line_number, const string& message) {
  // Use the LogMessage class directly so that the log messages appear to come from
  // the line of code which caused the panic, not this code.
#define MY_ERROR google::LogMessage(filepath, line_number, google::GLOG_ERROR).stream()
#define MY_FATAL google::LogMessageFatal(filepath, line_number).stream()

  MY_ERROR << "Panic handling " << call_->ToString() << ": " << message;
  MY_ERROR << "Request:\n" << SecureDebugString(*request_pb_);
  Trace* t = trace();
  if (t) {
    MY_ERROR << "RPC trace:";
    t->Dump(&MY_ERROR, true);
  }
  MY_FATAL << "Exiting due to panic.";

#undef MY_ERROR
#undef MY_FATAL
}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.h b/be/src/kudu/rpc/rpc_context.h
new file mode 100644
index 0000000..ac895fc
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_context.h
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_CONTEXT_H
+#define KUDU_RPC_RPC_CONTEXT_H
+
+#include <string>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Sockaddr;
+class Trace;
+
+namespace rpc {
+
+class InboundCall;
+class RemoteUser;
+class ResultTracker;
+class RpcSidecar;
+
// Panics the server, attributing the fatal log to the call site. Routes
// through RpcContext::Panic() when a context is available (so the request,
// requestor and trace are logged too); otherwise falls back to LOG(FATAL).
#define PANIC_RPC(rpc_context, message) \
  do { \
    if (rpc_context) {                              \
      rpc_context->Panic(__FILE__, __LINE__, (message));  \
    } else { \
      LOG(FATAL) << message; \
    } \
  } while (0)
+
// The context provided to a generated ServiceIf. This provides
// methods to respond to the RPC. In the future, this will also
// include methods to access information about the caller: e.g
// authentication info, tracing info, and cancellation status.
//
// This is the server-side analogue to the RpcController class.
class RpcContext {
 public:
  // Create an RpcContext. This is called only from generated code
  // and is not a public API.
  //
  // Takes ownership of 'request_pb' and 'response_pb'; 'call' must outlive
  // this context (it is held as a raw pointer).
  RpcContext(InboundCall *call,
             const google::protobuf::Message *request_pb,
             google::protobuf::Message *response_pb,
             const scoped_refptr<ResultTracker>& result_tracker);

  ~RpcContext();

  // Return the trace buffer for this call.
  Trace* trace();

  // Send a response to the call. The service may call this method
  // before or after returning from the original handler method,
  // and it may call this method from a different thread.
  //
  // The response should be prepared already in the response PB pointer
  // which was passed to the handler method.
  //
  // After this method returns, this RpcContext object is destroyed. The request
  // and response protobufs are also destroyed.
  void RespondSuccess();

  // Like the above, but doesn't store the results of the service call, if results
  // are being tracked.
  // Used in cases where a call specific error was set on the response protobuf,
  // the call should be considered failed, thus results shouldn't be cached.
  void RespondNoCache();

  // Respond with an error to the client. This sends back an error with the code
  // ERROR_APPLICATION. Because there is no more specific error code passed back
  // to the client, most applications should create a custom error PB extension
  // and use RespondApplicationError(...) below. This method should only be used
  // for unexpected errors where the server doesn't expect the client to do any
  // more advanced handling.
  //
  // After this method returns, this RpcContext object is destroyed. The request
  // and response protobufs are also destroyed.
  void RespondFailure(const Status &status);

  // Respond with an RPC-level error. This typically manifests to the client as
  // a remote error, one whose handling is agnostic to the particulars of the
  // sent RPC. For example, both ERROR_SERVER_TOO_BUSY and ERROR_UNAVAILABLE
  // usually cause the client to retry the RPC at a later time.
  //
  // After this method returns, this RpcContext object is destroyed. The request
  // and response protobufs are also destroyed.
  void RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status);

  // Respond with an application-level error. This causes the caller to get a
  // RemoteError status with the provided string message. Additionally, a
  // service-specific error extension is passed back to the client. The
  // extension must be registered with the ErrorStatusPB protobuf. For
  // example:
  //
  //   message MyServiceError {
  //     extend kudu.rpc.ErrorStatusPB {
  //       optional MyServiceError my_service_error_ext = 101;
  //     }
  //     // Add any extra fields or status codes you want to pass back to
  //     // the client here.
  //     required string extra_error_data = 1;
  //   }
  //
  // NOTE: the numeric '101' above must be an integer greater than 101
  // and must be unique across your code base.
  //
  // Given the above definition in your service protobuf file, you would
  // use this method like:
  //
  //   MyServiceError err;
  //   err.set_extra_error_data("foo bar");
  //   ctx->RespondApplicationError(MyServiceError::my_service_error_ext.number(),
  //                                "Some error occurred", err);
  //
  // The client side may then retrieve the error by calling:
  //   const MyServiceError& err_details =
  //     controller->error_response()->GetExtension(MyServiceError::my_service_error_ext);
  //
  // After this method returns, this RpcContext object is destroyed. The request
  // and response protobufs are also destroyed.
  void RespondApplicationError(int error_ext_id, const std::string& message,
                               const google::protobuf::Message& app_error_pb);


  // Adds an RpcSidecar to the response. This is the preferred method for
  // transferring large amounts of binary data, because this avoids additional
  // copies made by serializing the protobuf.
  //
  // Assumes no changes to the sidecar's data are made after insertion.
  //
  // Upon success, writes the index of the sidecar (necessary to be retrieved
  // later) to 'idx'. Call may fail if all sidecars have already been used
  // by the RPC response.
  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);

  // Fills 'sidecar' with a sidecar sent by the client. Returns an error if 'idx' is out
  // of bounds.
  Status GetInboundSidecar(int idx, Slice* slice);

  // Return the identity of remote user who made this call.
  const RemoteUser& remote_user() const;

  // Discards the memory associated with the inbound call's payload. All previously
  // obtained sidecar slices will be invalidated by this call. It is an error to call
  // GetInboundSidecar() after this method. request_pb() remains valid.
  // This is useful in the case where the server wishes to delay responding to an RPC
  // (perhaps to control the rate of RPC requests), but knows that the RPC payload itself
  // won't be processed any further.
  void DiscardTransfer();

  // Return the remote IP address and port which sent the current RPC call.
  const Sockaddr& remote_address() const;

  // A string identifying the requestor -- both the user info and the IP address.
  // Suitable for use in log messages.
  std::string requestor_string() const;

  // Return the name of the RPC service method being called.
  std::string method_name() const;

  // Return the name of the RPC service being called.
  std::string service_name() const;

  // Accessors for the request/response protobufs owned by this context.
  const google::protobuf::Message *request_pb() const { return request_pb_.get(); }
  google::protobuf::Message *response_pb() const { return response_pb_.get(); }

  // Return an upper bound on the client timeout deadline. This does not
  // account for transmission delays between the client and the server.
  // If the client did not specify a deadline, returns MonoTime::Max().
  MonoTime GetClientDeadline() const;

  // Whether the results of this RPC are tracked with a ResultTracker.
  // If this returns true, both result_tracker() and request_id() should return non-null results.
  bool AreResultsTracked() const { return result_tracker_.get() != nullptr; }

  // Returns this call's result tracker, if it is set.
  const scoped_refptr<ResultTracker>& result_tracker() const {
    return result_tracker_;
  }

  // Returns this call's request id, if it is set.
  const rpc::RequestIdPB* request_id() const;

  // Panic the server. This logs a fatal error with the given message, and
  // also includes the current RPC request, requestor, trace information, etc,
  // to make it easier to debug.
  //
  // Call this via the PANIC_RPC() macro.
  void Panic(const char* filepath, int line_number, const std::string& message)
    __attribute__((noreturn));

 private:
  friend class ResultTracker;
  // The inbound call being serviced. Not owned.
  InboundCall* const call_;
  // Request/response protobufs, owned by this context.
  const gscoped_ptr<const google::protobuf::Message> request_pb_;
  const gscoped_ptr<google::protobuf::Message> response_pb_;
  // May be null; see AreResultsTracked().
  scoped_refptr<ResultTracker> result_tracker_;
};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.cc 
b/be/src/kudu/rpc/rpc_controller.cc
new file mode 100644
index 0000000..505db22
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_controller.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_controller.h"
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+
+#include <glog/logging.h>
+
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/rpc_header.pb.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
// A fresh controller starts with the default credentials policy and no
// bound call; the DVLOG lines pair construction/destruction for debugging
// controller lifetime issues.
RpcController::RpcController()
    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS) {
  DVLOG(4) << "RpcController " << this << " constructed";
}

RpcController::~RpcController() {
  DVLOG(4) << "RpcController " << this << " destroyed";
}
+
// Exchanges state with 'other'. Both controllers must be idle (no RPC in
// flight), which is CHECK'd below.
//
// NOTE(review): unlike Reset(), this does not touch required_server_features_
// or request_id_, and it takes no lock while other accessors lock lock_ --
// presumably callers guarantee exclusive access here; confirm intentional.
void RpcController::Swap(RpcController* other) {
  // Cannot swap RPC controllers while they are in-flight.
  if (call_) {
    CHECK(finished());
  }
  if (other->call_) {
    CHECK(other->finished());
  }

  std::swap(outbound_sidecars_, other->outbound_sidecars_);
  std::swap(timeout_, other->timeout_);
  std::swap(credentials_policy_, other->credentials_policy_);
  std::swap(call_, other->call_);
}
+
// Returns the controller to a reusable state: drops the bound call, clears
// required server features, and restores the default credentials policy.
// Must not be invoked while an RPC is in flight (CHECK'd).
//
// NOTE(review): timeout_ is not reset here -- it survives Reset(); confirm
// this is intentional.
void RpcController::Reset() {
  std::lock_guard<simple_spinlock> l(lock_);
  if (call_) {
    CHECK(finished());
  }
  call_.reset();
  required_server_features_.clear();
  credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;
}
+
+bool RpcController::finished() const {
+  if (call_) {
+    return call_->IsFinished();
+  }
+  return false;
+}
+
+bool RpcController::negotiation_failed() const {
+  if (call_) {
+    DCHECK(finished());
+    return call_->IsNegotiationError();
+  }
+  return false;
+}
+
+Status RpcController::status() const {
+  if (call_) {
+    return call_->status();
+  }
+  return Status::OK();
+}
+
+const ErrorStatusPB* RpcController::error_response() const {
+  if (call_) {
+    return call_->error_pb();
+  }
+  return nullptr;
+}
+
// Fetches response sidecar 'idx' into 'sidecar'.
//
// NOTE(review): unlike the accessors above, this dereferences call_ (and its
// response) without a null check -- presumably only valid after the call has
// completed successfully; confirm callers guarantee this.
Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
  return call_->call_response_->GetSidecar(idx, sidecar);
}
+
+void RpcController::set_timeout(const MonoDelta& timeout) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  DCHECK(!call_ || call_->state() == OutboundCall::READY);
+  timeout_ = timeout;
+}
+
+void RpcController::set_deadline(const MonoTime& deadline) {
+  set_timeout(deadline - MonoTime::Now());
+}
+
+void RpcController::SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id) {
+  request_id_ = std::move(request_id);
+}
+
+bool RpcController::has_request_id() const {
+  return request_id_ != nullptr;
+}
+
+const RequestIdPB& RpcController::request_id() const {
+  DCHECK(has_request_id());
+  return *request_id_;
+}
+
+void RpcController::RequireServerFeature(uint32_t feature) {
+  DCHECK(!call_ || call_->state() == OutboundCall::READY);
+  required_server_features_.insert(feature);
+}
+
+MonoDelta RpcController::timeout() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return timeout_;
+}
+
+Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) 
{
+  if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+    return Status::RuntimeError("All available sidecars already used");
+  }
+  outbound_sidecars_.emplace_back(std::move(car));
+  *idx = outbound_sidecars_.size() - 1;
+  return Status::OK();
+}
+
+void RpcController::SetRequestParam(const google::protobuf::Message& req) {
+  DCHECK(call_ != nullptr);
+  call_->SetRequestPayload(req, std::move(outbound_sidecars_));
+}
+
+} // namespace rpc
+} // namespace kudu


Reply via email to