Repository: kudu
Updated Branches:
  refs/heads/master b9aa5dd31 -> dc8525358


http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index f178a2d..c8f523c 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -18,17 +18,19 @@
 #include "kudu/rpc/reactor.h"
 
 #include <arpa/inet.h>
-#include <boost/intrusive/list.hpp>
-#include <ev++.h>
-#include <glog/logging.h>
-#include <mutex>
 #include <netinet/in.h>
 #include <stdlib.h>
-#include <string>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
 
+#include <mutex>
+#include <string>
+
+#include <boost/intrusive/list.hpp>
+#include <ev++.h>
+#include <glog/logging.h>
+
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/rpc/client_negotiation.h"
@@ -85,7 +87,7 @@ Status ShutdownError(bool aborted) {
 }
 } // anonymous namespace
 
-ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder &bld)
+ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
   : loop_(kDefaultLibEvFlags),
     cur_time_(MonoTime::Now()),
     last_unused_tcp_scan_(cur_time_),
@@ -191,7 +193,7 @@ void ReactorThread::WakeThread() {
 // threads that want to bring something to our attention, like the fact that
 // we're shutting down, or the fact that there is a new outbound Transfer
 // ready to send.
-void ReactorThread::AsyncHandler(ev::async &watcher, int revents) {
+void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) {
   DCHECK(IsCurrentThread());
 
   if (PREDICT_FALSE(reactor_->closing())) {
@@ -204,13 +206,13 @@ void ReactorThread::AsyncHandler(ev::async &watcher, int revents) {
   reactor_->DrainTaskQueue(&tasks);
 
   while (!tasks.empty()) {
-    ReactorTask &task = tasks.front();
+    ReactorTask& task = tasks.front();
     tasks.pop_front();
     task.Run(this);
   }
 }
 
-void ReactorThread::RegisterConnection(const scoped_refptr<Connection>& conn) {
+void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
   DCHECK(IsCurrentThread());
 
   Status s = StartConnectionNegotiation(conn);
@@ -219,10 +221,10 @@ void ReactorThread::RegisterConnection(const scoped_refptr<Connection>& conn) {
     DestroyConnection(conn.get(), s);
     return;
   }
-  server_conns_.push_back(conn);
+  server_conns_.emplace_back(std::move(conn));
 }
 
-void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall> &call) {
+void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
   DCHECK(IsCurrentThread());
   scoped_refptr<Connection> conn;
 
@@ -242,7 +244,7 @@ void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall> &call) {
 // 2. every tcp_conn_timeo_ seconds, close down connections older than
 //    tcp_conn_timeo_ seconds.
 //
-void ReactorThread::TimerHandler(ev::timer &watcher, int revents) {
+void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
   DCHECK(IsCurrentThread());
   if (EV_ERROR & revents) {
     LOG(WARNING) << "Reactor " << name() << " got an error in "
@@ -293,7 +295,7 @@ void ReactorThread::ScanIdleConnections() {
   VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
 }
 
-const std::string &ReactorThread::name() const {
+const std::string& ReactorThread::name() const {
   return reactor_->name();
 }
 
@@ -321,7 +323,7 @@ void ReactorThread::RunThread() {
   reactor_->messenger_.reset();
 }
 
-Status ReactorThread::FindOrStartConnection(const ConnectionId &conn_id,
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
                                             scoped_refptr<Connection>* conn) {
   DCHECK(IsCurrentThread());
   conn_map_t::const_iterator c = client_conns_.find(conn_id);
@@ -348,7 +350,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId &conn_id,
   }
 
   // Register the new connection in our map.
-  *conn = new Connection(this, conn_id.remote(), new_socket.release(), Connection::CLIENT);
+  *conn = new Connection(this, conn_id.remote(), std::move(new_socket), Connection::CLIENT);
   (*conn)->set_user_credentials(conn_id.user_credentials());
 
   // Kick off blocking client connection negotiation.
@@ -377,12 +379,12 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
   ADOPT_TRACE(trace.get());
   TRACE("Submitting negotiation task for $0", conn->ToString());
   RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure(
-      Bind(&Negotiation::RunNegotiation, conn, deadline)));
+        Bind(&Negotiation::RunNegotiation, conn, deadline)));
   return Status::OK();
 }
 
 void ReactorThread::CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
-      const Status &status) {
+                                                  const Status& status) {
   DCHECK(IsCurrentThread());
   if (PREDICT_FALSE(!status.ok())) {
     DestroyConnection(conn.get(), status);
@@ -396,6 +398,7 @@ void ReactorThread::CompleteConnectionNegotiation(const scoped_refptr<Connection
     DestroyConnection(conn.get(), s);
     return;
   }
+
   conn->MarkNegotiationComplete();
   conn->EpollRegister(loop_);
 }
@@ -405,13 +408,13 @@ Status ReactorThread::CreateClientSocket(Socket *sock) {
   if (ret.ok()) {
     ret = sock->SetNoDelay(true);
   }
-  LOG_IF(WARNING, !ret.ok()) << "failed to create an "
-    "outbound connection because a new socket could not "
-    "be created: " << ret.ToString();
+  LOG_IF(WARNING, !ret.ok())
+      << "failed to create an outbound connection because a new socket could not be created: "
+      << ret.ToString();
   return ret;
 }
 
-Status ReactorThread::StartConnect(Socket *sock, const Sockaddr &remote, bool *in_progress) {
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote, bool *in_progress) {
   Status ret = sock->Connect(remote);
   if (ret.ok()) {
     VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
@@ -432,7 +435,7 @@ Status ReactorThread::StartConnect(Socket *sock, const Sockaddr &remote, bool *i
 }
 
 void ReactorThread::DestroyConnection(Connection *conn,
-                                      const Status &conn_status) {
+                                      const Status& conn_status) {
   DCHECK(IsCurrentThread());
 
   conn->Shutdown(conn_status);
@@ -455,9 +458,12 @@ void ReactorThread::DestroyConnection(Connection *conn,
   }
 }
 
-DelayedTask::DelayedTask(boost::function<void(const Status &)> func,
+DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
                          MonoDelta when)
-    : func_(std::move(func)), when_(std::move(when)), thread_(nullptr) {}
+    : func_(std::move(func)),
+      when_(when),
+      thread_(nullptr) {
+}
 
 void DelayedTask::Run(ReactorThread* thread) {
   DCHECK(thread_ == nullptr) << "Task has already been scheduled";
@@ -492,7 +498,7 @@ void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
 }
 
 Reactor::Reactor(const shared_ptr<Messenger>& messenger,
-                 int index, const MessengerBuilder &bld)
+                 int index, const MessengerBuilder& bld)
   : messenger_(messenger),
     name_(StringPrintf("%s_R%03d", messenger->name().c_str(), index)),
     closing_(false),
@@ -529,7 +535,7 @@ Reactor::~Reactor() {
   Shutdown();
 }
 
-const std::string &Reactor::name() const {
+const std::string& Reactor::name() const {
   return name_;
 }
 
@@ -544,11 +550,11 @@ class RunFunctionTask : public ReactorTask {
   explicit RunFunctionTask(boost::function<Status()> f)
       : function_(std::move(f)), latch_(1) {}
 
-  virtual void Run(ReactorThread *reactor) OVERRIDE {
+  void Run(ReactorThread* /*reactor*/) override {
     status_ = function_();
     latch_.CountDown();
   }
-  virtual void Abort(const Status &status) OVERRIDE {
+  void Abort(const Status& status) override {
     status_ = status;
     latch_.CountDown();
   }
@@ -584,16 +590,16 @@ Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
 
 class RegisterConnectionTask : public ReactorTask {
  public:
-  explicit RegisterConnectionTask(const scoped_refptr<Connection>& conn) :
-    conn_(conn)
-  {}
+  explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
+      : conn_(std::move(conn)) {
+  }
 
-  virtual void Run(ReactorThread *thread) OVERRIDE {
-    thread->RegisterConnection(conn_);
+  void Run(ReactorThread* reactor) override {
+    reactor->RegisterConnection(std::move(conn_));
     delete this;
   }
 
-  virtual void Abort(const Status &status) OVERRIDE {
+  void Abort(const Status& /*status*/) override {
     // We don't need to Shutdown the connection since it was never registered.
     // This is only used for inbound connections, and inbound connections will
     // never have any calls added to them until they've been registered.
@@ -604,7 +610,7 @@ class RegisterConnectionTask : public ReactorTask {
   scoped_refptr<Connection> conn_;
 };
 
-void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr &remote) {
+void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
   VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
   std::unique_ptr<Socket> new_socket;
   if (messenger()->ssl_enabled()) {
@@ -612,9 +618,8 @@ void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr &remote) {
   } else {
     new_socket.reset(new Socket(socket->Release()));
   }
-  scoped_refptr<Connection> conn(
-    new Connection(&thread_, remote, new_socket.release(), Connection::SERVER));
-  auto task = new RegisterConnectionTask(conn);
+  auto task = new RegisterConnectionTask(
+      new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER));
   ScheduleReactorTask(task);
 }
 
@@ -625,12 +630,12 @@ class AssignOutboundCallTask : public ReactorTask {
   explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call)
       : call_(std::move(call)) {}
 
-  virtual void Run(ReactorThread *reactor) OVERRIDE {
+  void Run(ReactorThread* reactor) override {
     reactor->AssignOutboundCall(call_);
     delete this;
   }
 
-  virtual void Abort(const Status &status) OVERRIDE {
+  void Abort(const Status& status) override {
     call_->SetFailed(status);
     delete this;
   }
@@ -639,7 +644,7 @@ class AssignOutboundCallTask : public ReactorTask {
   shared_ptr<OutboundCall> call_;
 };
 
-void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
+void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
   DVLOG(3) << name_ << ": queueing outbound call "
            << call->ToString() << " to remote " << call->conn_id().remote().ToString();
   AssignOutboundCallTask *task = new AssignOutboundCallTask(call);

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index 54f3332..f9f5662 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -17,16 +17,18 @@
 #ifndef KUDU_RPC_REACTOR_H
 #define KUDU_RPC_REACTOR_H
 
-#include <boost/function.hpp>
-#include <boost/intrusive/list.hpp>
-#include <ev++.h>
+#include <stdint.h>
+
 #include <list>
 #include <map>
 #include <memory>
 #include <set>
-#include <stdint.h>
 #include <string>
 
+#include <boost/function.hpp>
+#include <boost/intrusive/list.hpp>
+#include <ev++.h>
+
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/transfer.h"
@@ -37,9 +39,12 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+class Socket;
+
 namespace rpc {
 
-typedef std::list<scoped_refptr<Connection> > conn_list_t;
+typedef std::list<scoped_refptr<Connection>> conn_list_t;
 
 class DumpRunningRpcsRequestPB;
 class DumpRunningRpcsResponsePB;
@@ -89,10 +94,10 @@ class DelayedTask : public ReactorTask {
   DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);
 
   // Schedules the task for running later but doesn't actually run it yet.
-  virtual void Run(ReactorThread* reactor) OVERRIDE;
+  void Run(ReactorThread* thread) override;
 
   // Behaves like ReactorTask::Abort.
-  virtual void Abort(const Status& abort_status) OVERRIDE;
+  void Abort(const Status& abort_status) override;
 
  private:
   // libev callback for when the registered timer fires.
@@ -172,7 +177,7 @@ class ReactorThread {
   // Transition back from negotiating to processing requests.
   // Must be called from the reactor thread.
   void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
-      const Status& status);
+                                     const Status& status);
 
   // Collect metrics.
   // Must be called from the reactor thread.
@@ -217,7 +222,7 @@ class ReactorThread {
   void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
   // Register a new connection.
-  void RegisterConnection(const scoped_refptr<Connection>& conn);
+  void RegisterConnection(scoped_refptr<Connection> conn);
 
   // Actually perform shutdown of the thread, tearing down any connections,
   // etc. This is called from within the thread.

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index ea9d266..9769ffe 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -365,8 +365,8 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
   // Create another thread to accept the connection on the fake server.
   scoped_refptr<Thread> acceptor_thread;
   ASSERT_OK(Thread::Create("test", "acceptor",
-                                  AcceptAndReadForever, &listen_sock,
-                                  &acceptor_thread));
+                           AcceptAndReadForever, &listen_sock,
+                           &acceptor_thread));
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.cc b/src/kudu/rpc/sasl_common.cc
index 359dc80..5e4aae6 100644
--- a/src/kudu/rpc/sasl_common.cc
+++ b/src/kudu/rpc/sasl_common.cc
@@ -30,6 +30,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/once.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/rpc/constants.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/net/sockaddr.h"
@@ -46,11 +47,8 @@ const char* const kSaslMechGSSAPI = "GSSAPI";
 static __thread string* g_auth_failure_capture = nullptr;
 
 // Determine whether initialization was ever called
-struct InitializationData {
-  Status status;
-  string app_name;
-};
-static struct InitializationData* sasl_init_data;
+static Status sasl_init_status = Status::OK();
+static bool sasl_is_initialized = false;
 
 // If true, then we expect someone else has initialized SASL.
 static bool g_disable_sasl_init = false;
@@ -203,14 +201,9 @@ static bool SaslMutexImplementationProvided() {
 #endif
 
 // Actually perform the initialization for the SASL subsystem.
-// Meant to be called via GoogleOnceInitArg().
-static void DoSaslInit(void* app_name_char_array) {
-  // Explicitly cast from void* here so GoogleOnce doesn't have to deal with it.
-  // We were getting Clang 3.4 UBSAN errors when letting GoogleOnce cast.
-  const char* const app_name = reinterpret_cast<const char* const>(app_name_char_array);
+// Meant to be called via GoogleOnceInit().
+static void DoSaslInit() {
   VLOG(3) << "Initializing SASL library";
-  sasl_init_data = new InitializationData();
-  sasl_init_data->app_name = app_name;
 
   bool sasl_initialized = SaslIsInitialized();
   if (sasl_initialized && !g_disable_sasl_init) {
@@ -222,7 +215,7 @@ static void DoSaslInit(void* app_name_char_array) {
 
   if (g_disable_sasl_init) {
     if (!sasl_initialized) {
-      sasl_init_data->status = Status::RuntimeError(
+      sasl_init_status = Status::RuntimeError(
           "SASL initialization was disabled, but SASL was not externally initialized.");
       return;
     }
@@ -232,30 +225,29 @@ static void DoSaslInit(void* app_name_char_array) {
           << "but was not provided with a mutex implementation! If "
           << "manually initializing SASL, use sasl_set_mutex(3).";
     }
-    sasl_init_data->status = Status::OK();
     return;
   }
   internal::SaslSetMutex();
   int result = sasl_client_init(&callbacks[0]);
   if (result != SASL_OK) {
-    sasl_init_data->status = Status::RuntimeError("Could not initialize SASL client",
-        sasl_errstring(result, nullptr, nullptr));
+    sasl_init_status = Status::RuntimeError("Could not initialize SASL client",
+                                            sasl_errstring(result, nullptr, nullptr));
     return;
   }
 
-  result = sasl_server_init(&callbacks[0], sasl_init_data->app_name.c_str());
+  result = sasl_server_init(&callbacks[0], kSaslAppName);
   if (result != SASL_OK) {
-    sasl_init_data->status = Status::RuntimeError("Could not initialize SASL server",
-        sasl_errstring(result, nullptr, nullptr));
+    sasl_init_status = Status::RuntimeError("Could not initialize SASL server",
+                                            sasl_errstring(result, nullptr, nullptr));
     return;
   }
 
-  sasl_init_data->status = Status::OK();
+  sasl_is_initialized = true;
 }
 
 Status DisableSaslInitialization() {
   if (g_disable_sasl_init) return Status::OK();
-  if (sasl_init_data != nullptr) {
+  if (sasl_is_initialized) {
     return Status::IllegalState("SASL already initialized. Initialization can only be disabled "
                                 "before first usage.");
   }
@@ -263,18 +255,11 @@ Status DisableSaslInitialization() {
   return Status::OK();
 }
 
-Status SaslInit(const char* app_name) {
+Status SaslInit() {
   // Only execute SASL initialization once
   static GoogleOnceType once = GOOGLE_ONCE_INIT;
-  GoogleOnceInitArg(&once,
-                    &DoSaslInit,
-                    // This is a bit ugly, but Clang 3.4 UBSAN complains otherwise.
-                    reinterpret_cast<void*>(const_cast<char*>(app_name)));
-  if (PREDICT_FALSE(sasl_init_data->app_name != app_name)) {
-    return Status::InvalidArgument("SaslInit called successively with different arguments",
-        StringPrintf("Previous: %s, current: %s", sasl_init_data->app_name.c_str(), app_name));
-  }
-  return sasl_init_data->status;
+  GoogleOnceInit(&once, &DoSaslInit);
+  return sasl_init_status;
 }
 
 static string CleanSaslError(const string& err) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.h b/src/kudu/rpc/sasl_common.h
index 419fc5f..53b713e 100644
--- a/src/kudu/rpc/sasl_common.h
+++ b/src/kudu/rpc/sasl_common.h
@@ -53,7 +53,7 @@ extern const char* const kSaslMechGSSAPI;
 //
 // This function is thread safe and uses a static lock.
 // This function should NOT be called during static initialization.
-Status SaslInit(const char* app_name);
+Status SaslInit();
 
 // Disable Kudu's initialization of SASL. See equivalent method in client.h.
 Status DisableSaslInitialization();

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_helper.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_helper.cc b/src/kudu/rpc/sasl_helper.cc
index 4e408f7..0614f3e 100644
--- a/src/kudu/rpc/sasl_helper.cc
+++ b/src/kudu/rpc/sasl_helper.cc
@@ -17,28 +17,24 @@
 
 #include "kudu/rpc/sasl_helper.h"
 
-#include <set>
 #include <string>
 
 #include <glog/logging.h>
 #include <google/protobuf/message_lite.h>
 
-#include "kudu/gutil/endian.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/blocking_ops.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/serialization.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
+using std::string;
+
 namespace kudu {
 namespace rpc {
 
@@ -46,13 +42,10 @@ using google::protobuf::MessageLite;
 
 SaslHelper::SaslHelper(PeerType peer_type)
   : peer_type_(peer_type),
-    conn_header_exchanged_(false),
+    global_mechs_(SaslListAvailableMechs()),
     plain_enabled_(false),
     gssapi_enabled_(false) {
-  tag_ = (peer_type_ == SERVER) ? "Sasl Server" : "Sasl Client";
-}
-
-SaslHelper::~SaslHelper() {
+  tag_ = (peer_type_ == SERVER) ? "Server" : "Client";
 }
 
 void SaslHelper::set_local_addr(const Sockaddr& addr) {
@@ -76,27 +69,11 @@ const char* SaslHelper::server_fqdn() const {
   return server_fqdn_.empty() ? nullptr : server_fqdn_.c_str();
 }
 
-const std::set<std::string>& SaslHelper::GlobalMechs() const {
-  if (!global_mechs_) {
-    global_mechs_.reset(new set<string>(SaslListAvailableMechs()));
-  }
-  return *global_mechs_;
-}
-
-void SaslHelper::AddToLocalMechList(const string& mech) {
-  mechs_.insert(mech);
-}
-
-const std::set<std::string>& SaslHelper::LocalMechs() const {
-  return mechs_;
+const char* SaslHelper::EnabledMechsString() const {
+  JoinStrings(enabled_mechs_, " ", &enabled_mechs_string_);
+  return enabled_mechs_string_.c_str();
 }
 
-const char* SaslHelper::LocalMechListString() const {
-  JoinStrings(mechs_, " ", &mech_list_);
-  return mech_list_.c_str();
-}
-
-
 int SaslHelper::GetOptionCb(const char* plugin_name, const char* option,
                             const char** result, unsigned* len) {
   DVLOG(4) << tag_ << ": GetOption Callback called. ";
@@ -112,7 +89,7 @@ int SaslHelper::GetOptionCb(const char* plugin_name, const char* option,
   if (plugin_name == nullptr) {
     // SASL library option, not a plugin option
     if (strcmp(option, "mech_list") == 0) {
-      *result = LocalMechListString();
+      *result = EnabledMechsString();
       if (len != nullptr) *len = strlen(*result);
       VLOG(4) << tag_ << ": Enabled mech list: " << *result;
       return SASL_OK;
@@ -137,10 +114,10 @@ Status SaslHelper::EnableGSSAPI() {
 }
 
 Status SaslHelper::EnableMechanism(const string& mech) {
-  if (PREDICT_FALSE(!ContainsKey(GlobalMechs(), mech))) {
+  if (PREDICT_FALSE(!ContainsKey(global_mechs_, mech))) {
     return Status::InvalidArgument("unable to find SASL plugin", mech);
   }
-  AddToLocalMechList(mech);
+  enabled_mechs_.insert(mech);
   return Status::OK();
 }
 
@@ -148,11 +125,11 @@ bool SaslHelper::IsPlainEnabled() const {
   return plain_enabled_;
 }
 
-Status SaslHelper::SanityCheckNegotiateCallId(int32_t call_id) const {
+Status SaslHelper::CheckNegotiateCallId(int32_t call_id) const {
   if (call_id != kNegotiateCallId) {
     Status s = Status::IllegalState(strings::Substitute(
-          "Non-Negotiate request during negotiation. Expected callId: $0, received callId: $1",
-          kNegotiateCallId, call_id));
+        "Received illegal call-id during negotiation; expected: $0, received: $1",
+        kNegotiateCallId, call_id));
     LOG(DFATAL) << tag_ << ": " << s.ToString();
     return s;
   }
@@ -167,27 +144,5 @@ Status SaslHelper::ParseNegotiatePB(const Slice& param_buf, NegotiatePB* msg) {
   return Status::OK();
 }
 
-Status SaslHelper::SendNegotiatePB(Socket* sock,
-                                   const MessageLite& header,
-                                   const MessageLite& msg,
-                                   const MonoTime& deadline) {
-  DCHECK(sock != nullptr);
-  DCHECK(header.IsInitialized()) << tag_ << ": Header must be initialized";
-  DCHECK(msg.IsInitialized()) << tag_ << ": Message must be initialized";
-
-  // Write connection header, if needed
-  if (PREDICT_FALSE(peer_type_ == CLIENT && !conn_header_exchanged_)) {
-    const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength;
-    uint8_t buf[buflen];
-    serialization::SerializeConnHeader(buf);
-    size_t nsent;
-    RETURN_NOT_OK(sock->BlockingWrite(buf, buflen, &nsent, deadline));
-    conn_header_exchanged_ = true;
-  }
-
-  RETURN_NOT_OK(SendFramedMessageBlocking(sock, header, msg, deadline));
-  return Status::OK();
-}
-
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_helper.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_helper.h b/src/kudu/rpc/sasl_helper.h
index f0a676e..05d6904 100644
--- a/src/kudu/rpc/sasl_helper.h
+++ b/src/kudu/rpc/sasl_helper.h
@@ -23,30 +23,19 @@
 
 #include <sasl/sasl.h>
 
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/util/net/socket.h"
-
-namespace google {
-namespace protobuf {
-class MessageLite;
-} // namespace protobuf
-} // namespace google
+#include "kudu/util/status.h"
 
 namespace kudu {
 
-class MonoTime;
 class Sockaddr;
-class Status;
 
 namespace rpc {
 
-using std::string;
-
 class NegotiatePB;
 
-// Helper class which contains functionality that is common to SaslClient & SaslServer.
-// Most of these methods are convenience methods for interacting with the libsasl2 library.
+// Helper class which contains functionality that is common to client and server
+// SASL negotiations. Most of these methods are convenience methods for
+// interacting with the libsasl2 library.
 class SaslHelper {
  public:
   enum PeerType {
@@ -55,7 +44,7 @@ class SaslHelper {
   };
 
   explicit SaslHelper(PeerType peer_type);
-  ~SaslHelper();
+  ~SaslHelper() = default;
 
   // Specify IP:port of local side of connection.
   void set_local_addr(const Sockaddr& addr);
@@ -66,20 +55,18 @@ class SaslHelper {
   const char* remote_addr_string() const;
 
   // Specify the fully-qualified domain name of the remote server.
-  void set_server_fqdn(const string& domain_name);
+  void set_server_fqdn(const std::string& domain_name);
   const char* server_fqdn() const;
 
   // Globally-registered available SASL plugins.
-  const std::set<string>& GlobalMechs() const;
+  const std::set<std::string>& GlobalMechs() const {
+    return global_mechs_;
+  }
 
   // Helper functions for managing the list of active SASL mechanisms.
-  void AddToLocalMechList(const string& mech);
-  const std::set<string>& LocalMechs() const;
-
-  // Returns space-delimited local mechanism list string suitable for passing
-  // to libsasl2, such as via "mech_list" callbacks.
-  // The returned pointer is valid only until the next call to LocalMechListString().
-  const char* LocalMechListString() const;
+  const std::set<std::string>& EnabledMechs() const {
+    return enabled_mechs_;
+  }
 
   // Implements the client_mech_list / mech_list callbacks.
   int GetOptionCb(const char* plugin_name, const char* option, const char** result, unsigned* len);
@@ -95,31 +82,29 @@ class SaslHelper {
 
   // Sanity check that the call ID is the negotiation call ID.
   // Logs DFATAL if call_id does not match.
-  Status SanityCheckNegotiateCallId(int32_t call_id) const;
+  Status CheckNegotiateCallId(int32_t call_id) const;
 
   // Parse msg from the given Slice.
   Status ParseNegotiatePB(const Slice& param_buf, NegotiatePB* msg);
 
-  // Encode and send a message over a socket, sending the connection header if necessary.
-  Status SendNegotiatePB(Socket* sock,
-                         const google::protobuf::MessageLite& header,
-                         const google::protobuf::MessageLite& msg,
-                         const MonoTime& deadline);
-
  private:
   Status EnableMechanism(const std::string& mech);
 
-  string local_addr_;
-  string remote_addr_;
-  string server_fqdn_;
+  // Returns space-delimited local mechanism list string suitable for passing
+  // to libsasl2, such as via "mech_list" callbacks.
+  // The returned pointer is valid only until the next call to EnabledMechsString().
+  const char* EnabledMechsString() const;
+
+  std::string local_addr_;
+  std::string remote_addr_;
+  std::string server_fqdn_;
 
   // Authentication types and data.
   const PeerType peer_type_;
-  bool conn_header_exchanged_;
-  string tag_;
-  mutable gscoped_ptr< std::set<string> > global_mechs_;  // Cache of global mechanisms.
-  std::set<string> mechs_;    // Active mechanisms.
-  mutable string mech_list_;  // Mechanism list string returned by callbacks.
+  std::string tag_;
+  std::set<std::string> global_mechs_;       // Cache of global mechanisms.
+  std::set<std::string> enabled_mechs_;      // Active mechanisms.
+  mutable std::string enabled_mechs_string_; // Mechanism list string returned by callbacks.
 
   bool plain_enabled_;
   bool gssapi_enabled_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 9e91481..837fb47 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -17,17 +17,18 @@
 
 #include "kudu/rpc/server_negotiation.h"
 
-#include <glog/logging.h>
-#include <google/protobuf/message_lite.h>
 #include <limits>
-#include <sasl/sasl.h>
+#include <memory>
 #include <set>
 #include <string>
 
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+#include <sasl/sasl.h>
+
 #include "kudu/gutil/endian.h"
 #include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/blocking_ops.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/serialization.h"
@@ -36,242 +37,197 @@
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/trace.h"
 
+using std::set;
+using std::string;
+using std::unique_ptr;
+
 namespace kudu {
 namespace rpc {
 
-static int SaslServerGetoptCb(void* sasl_server, const char* plugin_name, const char* option,
-                       const char** result, unsigned* len) {
-  return static_cast<SaslServer*>(sasl_server)
-    ->GetOptionCb(plugin_name, option, result, len);
+static int ServerNegotiationGetoptCb(ServerNegotiation* server_negotiation,
+                                     const char* plugin_name,
+                                     const char* option,
+                                     const char** result,
+                                     unsigned* len) {
+  return server_negotiation->GetOptionCb(plugin_name, option, result, len);
 }
 
-static int SaslServerPlainAuthCb(sasl_conn_t *conn, void *sasl_server, const char *user,
-    const char *pass, unsigned passlen, struct propctx *propctx) {
-  return static_cast<SaslServer*>(sasl_server)
-    ->PlainAuthCb(conn, user, pass, passlen, propctx);
+static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
+                                        ServerNegotiation* server_negotiation,
+                                        const char* user,
+                                        const char* pass,
+                                        unsigned passlen,
+                                        struct propctx* propctx) {
+  return server_negotiation->PlainAuthCb(conn, user, pass, passlen, propctx);
 }
 
-SaslServer::SaslServer(string app_name, Socket* socket)
-    : app_name_(std::move(app_name)),
-      sock_(socket),
+ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket)
+    : socket_(std::move(socket)),
       helper_(SaslHelper::SERVER),
-      server_state_(SaslNegotiationState::NEW),
       negotiated_mech_(SaslMechanism::INVALID),
       deadline_(MonoTime::Max()) {
   callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
-      reinterpret_cast<int (*)()>(&SaslServerGetoptCb), this));
+      reinterpret_cast<int (*)()>(&ServerNegotiationGetoptCb), this));
   callbacks_.push_back(SaslBuildCallback(SASL_CB_SERVER_USERDB_CHECKPASS,
-      reinterpret_cast<int (*)()>(&SaslServerPlainAuthCb), this));
+      reinterpret_cast<int (*)()>(&ServerNegotiationPlainAuthCb), this));
   callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
 }
 
-Status SaslServer::EnablePlain() {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
+Status ServerNegotiation::EnablePlain() {
   RETURN_NOT_OK(helper_.EnablePlain());
   return Status::OK();
 }
 
-Status SaslServer::EnableGSSAPI() {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
+Status ServerNegotiation::EnableGSSAPI() {
   return helper_.EnableGSSAPI();
 }
 
-SaslMechanism::Type SaslServer::negotiated_mechanism() const {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED);
+SaslMechanism::Type ServerNegotiation::negotiated_mechanism() const {
   return negotiated_mech_;
 }
 
-const std::string& SaslServer::authenticated_user() const {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED);
+const string& ServerNegotiation::authenticated_user() const {
   return authenticated_user_;
 }
 
-void SaslServer::set_local_addr(const Sockaddr& addr) {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
+void ServerNegotiation::set_local_addr(const Sockaddr& addr) {
   helper_.set_local_addr(addr);
 }
 
-void SaslServer::set_remote_addr(const Sockaddr& addr) {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
+void ServerNegotiation::set_remote_addr(const Sockaddr& addr) {
   helper_.set_remote_addr(addr);
 }
 
-void SaslServer::set_server_fqdn(const string& domain_name) {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
+void ServerNegotiation::set_server_fqdn(const string& domain_name) {
   helper_.set_server_fqdn(domain_name);
 }
 
-void SaslServer::set_deadline(const MonoTime& deadline) {
-  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED);
+void ServerNegotiation::set_deadline(const MonoTime& deadline) {
   deadline_ = deadline;
 }
 
-// calls sasl_server_init() and sasl_server_new()
-Status SaslServer::Init(const string& service_type) {
-  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
-
-  // Ensure we are not called more than once.
-  if (server_state_ != SaslNegotiationState::NEW) {
-    return Status::IllegalState("Init() may only be called once per SaslServer 
object.");
-  }
-
-  // TODO(unknown): Support security flags.
-  unsigned secflags = 0;
-
-  sasl_conn_t* sasl_conn = nullptr;
-  Status s = WrapSaslCall(nullptr /* no conn */, [&]() {
-      return sasl_server_new(
-          // Registered name of the service using SASL. Required.
-          service_type.c_str(),
-          // The fully qualified domain name of this server.
-          helper_.server_fqdn(),
-          // Permits multiple user realms on server. NULL == use default.
-          nullptr,
-          // Local and remote IP address strings. (NULL disables
-          // mechanisms which require this info.)
-          helper_.local_addr_string(),
-          helper_.remote_addr_string(),
-          // Connection-specific callbacks.
-          &callbacks_[0],
-          // Security flags.
-          secflags,
-          &sasl_conn);
-    });
-
-  if (PREDICT_FALSE(!s.ok())) {
-    return Status::RuntimeError("Unable to create new SASL server",
-                                s.message());
-  }
-  sasl_conn_.reset(sasl_conn);
-
-  server_state_ = SaslNegotiationState::INITIALIZED;
-  return Status::OK();
-}
-
-Status SaslServer::Negotiate() {
-  // After negotiation, we no longer need the SASL library object, so
-  // may as well free its memory since the connection may be long-lived.
-  auto cleanup = MakeScopedCleanup([&]() {
-      sasl_conn_.reset();
-    });
-  DVLOG(4) << "Called SaslServer::Negotiate()";
-
-  // Ensure we are called exactly once, and in the right order.
-  if (server_state_ == SaslNegotiationState::NEW) {
-    return Status::IllegalState("SaslServer: Init() must be called before 
calling Negotiate()");
-  }
-  if (server_state_ == SaslNegotiationState::NEGOTIATED) {
-    return Status::IllegalState("SaslServer: Negotiate() may only be called 
once per object.");
-  }
+Status ServerNegotiation::Negotiate() {
+  TRACE("Beginning negotiation");
 
   // Ensure we can use blocking calls on the socket during negotiation.
-  RETURN_NOT_OK(EnsureBlockingMode(sock_));
+  RETURN_NOT_OK(EnsureBlockingMode(socket_.get()));
 
   faststring recv_buf;
 
-  // Read connection header
+  // Step 1: Read the connection header.
   RETURN_NOT_OK(ValidateConnectionHeader(&recv_buf));
 
-  nego_ok_ = false;
-  while (!nego_ok_) {
-    TRACE("Waiting for next Negotiation message...");
-    RequestHeader header;
-    Slice param_buf;
-    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, 
&param_buf, deadline_));
+  { // Step 2: Receive and respond to the NEGOTIATE step message.
+    NegotiatePB request;
+    RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+    RETURN_NOT_OK(HandleNegotiate(request));
+  }
 
+  // Step 3: SASL negotiation.
+  RETURN_NOT_OK(InitSaslServer());
+  {
     NegotiatePB request;
-    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &request));
-
-    switch (request.step()) {
-      // NEGOTIATE: They want a list of available mechanisms.
-      case NegotiatePB::NEGOTIATE:
-        RETURN_NOT_OK(HandleNegotiateRequest(request));
-        break;
-
-      // INITIATE: They want to initiate negotiation based on their specified 
mechanism.
-      case NegotiatePB::SASL_INITIATE:
-        RETURN_NOT_OK(HandleInitiateRequest(request));
-        break;
-
-      // RESPONSE: Client sent a new request as a follow-up to a 
SASL_CHALLENGE response.
-      case NegotiatePB::SASL_RESPONSE:
-        RETURN_NOT_OK(HandleResponseRequest(request));
-        break;
-
-      // Client sent us an unsupported Negotiation request.
-      default: {
-        TRACE("SASL Server: Received unsupported request from client");
-        Status s = Status::InvalidArgument("RPC server doesn't support 
negotiation step in request",
-                                           
NegotiatePB::NegotiateStep_Name(request.step()));
-        RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-        return s;
-      }
+    RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+    Status s = HandleSaslInitiate(request);
+
+    while (s.IsIncomplete()) {
+      RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+      s = HandleSaslResponse(request);
     }
+    RETURN_NOT_OK(s);
   }
-
   const char* username = nullptr;
   int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME,
                         reinterpret_cast<const void**>(&username));
   // We expect that SASL_USERNAME will always get set.
-  CHECK(rc == SASL_OK && username != nullptr)
-      << "No username on authenticated connection";
+  CHECK(rc == SASL_OK && username != nullptr) << "No username on authenticated 
connection";
   authenticated_user_ = username;
 
-  TRACE("SASL Server: Successful negotiation");
-  server_state_ = SaslNegotiationState::NEGOTIATED;
+  // Step 4: Receive connection context.
+  RETURN_NOT_OK(RecvConnectionContext(&recv_buf));
+
+  TRACE("Negotiation successful");
   return Status::OK();
 }
 
-Status SaslServer::ValidateConnectionHeader(faststring* recv_buf) {
-  TRACE("Waiting for connection header");
-  size_t num_read;
-  const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength;
-  recv_buf->resize(conn_header_len);
-  RETURN_NOT_OK(sock_->BlockingRecv(recv_buf->data(), conn_header_len, 
&num_read, deadline_));
-  DCHECK_EQ(conn_header_len, num_read);
+Status ServerNegotiation::PreflightCheckGSSAPI() {
+  // TODO(todd): the error messages that come from this function on el6
+  // are relatively useless due to the following krb5 bug:
+  // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+  // This may not be useful anymore given the keytab login that happens
+  // in security/init.cc.
 
-  RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf));
-  TRACE("Connection header received");
-  return Status::OK();
+  // Initialize a ServerNegotiation with a null socket, and enable
+  // only GSSAPI.
+  //
+  // We aren't going to actually send/receive any messages, but
+  // this makes it easier to reuse the initialization code.
+  ServerNegotiation server(nullptr);
+  Status s = server.EnableGSSAPI();
+  if (!s.ok()) {
+    return Status::RuntimeError(s.message());
+  }
+
+  RETURN_NOT_OK(server.InitSaslServer());
+
+  // Start the SASL server as if we were accepting a connection.
+  const char* server_out = nullptr; // ignored
+  uint32_t server_out_len = 0;
+  s = WrapSaslCall(server.sasl_conn_.get(), [&]() {
+      return sasl_server_start(
+          server.sasl_conn_.get(),
+          kSaslMechGSSAPI,
+          "", 0,  // Pass a 0-length token.
+          &server_out, &server_out_len);
+    });
+
+  // We expect 'Incomplete' status to indicate that the first step of negotiation
+  // was correct.
+  if (s.IsIncomplete()) return Status::OK();
+
+  string err_msg = s.message().ToString();
+  if (err_msg == "Permission denied") {
+    // For bad keytab permissions, we get a rather vague message. So,
+    // we make it more specific for better usability.
+    err_msg = "error accessing keytab: " + err_msg;
+  }
+  return Status::RuntimeError(err_msg);
 }
 
-Status SaslServer::ParseNegotiatePB(const RequestHeader& header,
-                                    const Slice& param_buf,
-                                    NegotiatePB* request) {
-  Status s = helper_.SanityCheckNegotiateCallId(header.call_id());
+Status ServerNegotiation::RecvNegotiatePB(NegotiatePB* msg, faststring* 
recv_buf) {
+  RequestHeader header;
+  Slice param_buf;
+  RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), recv_buf, &header, 
&param_buf, deadline_));
+  Status s = helper_.CheckNegotiateCallId(header.call_id());
   if (!s.ok()) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s));
+    return s;
   }
 
-  s = helper_.ParseNegotiatePB(param_buf, request);
+  s = helper_.ParseNegotiatePB(param_buf, msg);
   if (!s.ok()) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s));
     return s;
   }
 
+  TRACE("Received $0 NegotiatePB request", 
NegotiatePB::NegotiateStep_Name(msg->step()));
   return Status::OK();
 }
 
-Status SaslServer::SendNegotiatePB(const NegotiatePB& msg) {
-  DCHECK_NE(server_state_, SaslNegotiationState::NEW)
-      << "Must not send Negotiate messages before calling Init()";
-  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED)
-      << "Must not send Negotiate messages after Negotiate() succeeds";
-
-  // Create header with negotiation-specific callId
+Status ServerNegotiation::SendNegotiatePB(const NegotiatePB& msg) {
   ResponseHeader header;
   header.set_call_id(kNegotiateCallId);
-  return helper_.SendNegotiatePB(sock_, header, msg, deadline_);
+
+  DCHECK(socket_);
+  DCHECK(msg.IsInitialized()) << "message must be initialized";
+  DCHECK(msg.has_step()) << "message must have a step";
+
+  TRACE("Sending $0 NegotiatePB response", 
NegotiatePB::NegotiateStep_Name(msg.step()));
+  return SendFramedMessageBlocking(socket(), header, msg, deadline_);
 }
 
-Status SaslServer::SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const 
Status& err) {
-  DCHECK_NE(server_state_, SaslNegotiationState::NEW)
-      << "Must not send SASL messages before calling Init()";
-  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED)
-      << "Must not send SASL messages after Negotiate() succeeds";
-  if (err.ok()) {
-    return Status::InvalidArgument("Cannot send error message using OK 
status");
-  }
+Status ServerNegotiation::SendError(ErrorStatusPB::RpcErrorCodePB code, const 
Status& err) {
+  DCHECK(!err.ok());
 
   // Create header with negotiation-specific callId
   ResponseHeader header;
@@ -283,39 +239,87 @@ Status 
SaslServer::SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status
   msg.set_code(code);
   msg.set_message(err.ToString());
 
-  RETURN_NOT_OK(helper_.SendNegotiatePB(sock_, header, msg, deadline_));
-  TRACE("Sent SASL error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code));
+  TRACE("Sending RPC error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code));
+  RETURN_NOT_OK(SendFramedMessageBlocking(socket(), header, msg, deadline_));
+
+  return Status::OK();
+}
+
+Status ServerNegotiation::ValidateConnectionHeader(faststring* recv_buf) {
+  TRACE("Waiting for connection header");
+  size_t num_read;
+  const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength;
+  recv_buf->resize(conn_header_len);
+  RETURN_NOT_OK(socket_->BlockingRecv(recv_buf->data(), conn_header_len, 
&num_read, deadline_));
+  DCHECK_EQ(conn_header_len, num_read);
+
+  RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf));
+  TRACE("Connection header received");
   return Status::OK();
 }
 
-Status SaslServer::HandleNegotiateRequest(const NegotiatePB& request) {
-  TRACE("SASL Server: Received NEGOTIATE request from client");
+// calls sasl_server_init() and sasl_server_new()
+Status ServerNegotiation::InitSaslServer() {
+  RETURN_NOT_OK(SaslInit());
+
+  // TODO(unknown): Support security flags.
+  unsigned secflags = 0;
+
+  sasl_conn_t* sasl_conn = nullptr;
+  RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() {
+      return sasl_server_new(
+          // Registered name of the service using SASL. Required.
+          kSaslProtoName,
+          // The fully qualified domain name of this server.
+          helper_.server_fqdn(),
+          // Permits multiple user realms on server. NULL == use default.
+          nullptr,
+          // Local and remote IP address strings. (NULL disables
+          // mechanisms which require this info.)
+          helper_.local_addr_string(),
+          helper_.remote_addr_string(),
+          // Connection-specific callbacks.
+          &callbacks_[0],
+          // Security flags.
+          secflags,
+          &sasl_conn);
+    }), "Unable to create new SASL server");
+  sasl_conn_.reset(sasl_conn);
+  return Status::OK();
+}
+
+Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
+  if (request.step() != NegotiatePB::NEGOTIATE) {
+    return Status::NotAuthorized("expected NEGOTIATE step",
+                                 
NegotiatePB::NegotiateStep_Name(request.step()));
+  }
+  TRACE("Received NEGOTIATE request from client");
 
   // Fill in the set of features supported by the client.
   for (int flag : request.supported_features()) {
     // We only add the features that our local build knows about.
     RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
                                   static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
-    if (ContainsKey(kSupportedServerRpcFeatureFlags, feature_flag)) {
+    if (feature_flag != UNKNOWN) {
       client_features_.insert(feature_flag);
     }
   }
 
-  set<string> server_mechs = helper_.LocalMechs();
+  set<string> server_mechs = helper_.EnabledMechs();
   if (PREDICT_FALSE(server_mechs.empty())) {
     // This will happen if no mechanisms are enabled before calling Init()
-    Status s = Status::IllegalState("SASL server mechanism list is empty!");
+    Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
     LOG(ERROR) << s.ToString();
-    TRACE("SASL Server: Sending FATAL_UNAUTHORIZED response to client");
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    TRACE("Sending FATAL_UNAUTHORIZED response to client");
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
     return s;
   }
 
-  RETURN_NOT_OK(SendNegotiateResponse(server_mechs));
+  RETURN_NOT_OK(SendNegotiate(server_mechs));
   return Status::OK();
 }
 
-Status SaslServer::SendNegotiateResponse(const set<string>& server_mechs) {
+Status ServerNegotiation::SendNegotiate(const set<string>& server_mechs) {
   NegotiatePB response;
   response.set_step(NegotiatePB::NEGOTIATE);
 
@@ -329,31 +333,32 @@ Status SaslServer::SendNegotiateResponse(const 
set<string>& server_mechs) {
   }
 
   RETURN_NOT_OK(SendNegotiatePB(response));
-  TRACE("Sent NEGOTIATE response");
   return Status::OK();
 }
 
-
-Status SaslServer::HandleInitiateRequest(const NegotiatePB& request) {
-  TRACE("SASL Server: Received INITIATE request from client");
+Status ServerNegotiation::HandleSaslInitiate(const NegotiatePB& request) {
+  if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_INITIATE)) {
+    Status s =  Status::NotAuthorized("expected SASL_INITIATE step",
+                                      
NegotiatePB::NegotiateStep_Name(request.step()));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+  TRACE("Received SASL_INITIATE request from client");
 
   if (request.auths_size() != 1) {
-    Status s = Status::NotAuthorized(StringPrintf(
-          "SASL INITIATE request must include exactly one SaslAuth section, 
found: %d",
-          request.auths_size()));
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    Status s = Status::NotAuthorized(
+        "SASL_INITIATE request must include exactly one SaslAuth section, 
found",
+        std::to_string(request.auths_size()));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
     return s;
   }
 
   const NegotiatePB::SaslAuth& auth = request.auths(0);
-  TRACE("SASL Server: Client requested to use mechanism: $0", 
auth.mechanism());
-
-  // Security issue to display this. Commented out but left for debugging 
purposes.
-  //DVLOG(3) << "SASL server: Client token: " << request.token();
+  TRACE("Client requested to use mechanism: $0", auth.mechanism());
 
   const char* server_out = nullptr;
   uint32_t server_out_len = 0;
-  TRACE("SASL Server: Calling sasl_server_start()");
+  TRACE("Calling sasl_server_start()");
 
   Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
       return sasl_server_start(
@@ -364,55 +369,41 @@ Status SaslServer::HandleInitiateRequest(const 
NegotiatePB& request) {
           &server_out,              // The output of the SASL library, might 
not be NULL terminated
           &server_out_len);         // Output len.
     });
+
   if (PREDICT_FALSE(!s.ok() && !s.IsIncomplete())) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
     return s;
   }
+
   negotiated_mech_ = SaslMechanism::value_of(auth.mechanism());
 
   // We have a valid mechanism match
   if (s.ok()) {
-    nego_ok_ = true;
-    RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
+    RETURN_NOT_OK(SendSaslSuccess(server_out, server_out_len));
   } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
-    RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
+    RETURN_NOT_OK(SendSaslChallenge(server_out, server_out_len));
   }
-  return Status::OK();
-}
-
-Status SaslServer::SendChallengeResponse(const char* challenge, unsigned clen) 
{
-  NegotiatePB response;
-  response.set_step(NegotiatePB::SASL_CHALLENGE);
-  response.mutable_token()->assign(challenge, clen);
-  TRACE("SASL Server: Sending SASL_CHALLENGE response to client");
-  RETURN_NOT_OK(SendNegotiatePB(response));
-  return Status::OK();
+  return s;
 }
 
-Status SaslServer::SendSuccessResponse(const char* token, unsigned tlen) {
-  NegotiatePB response;
-  response.set_step(NegotiatePB::SASL_SUCCESS);
-  if (PREDICT_FALSE(tlen > 0)) {
-    response.mutable_token()->assign(token, tlen);
+Status ServerNegotiation::HandleSaslResponse(const NegotiatePB& request) {
+  if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_RESPONSE)) {
+    Status s =  Status::NotAuthorized("expected SASL_RESPONSE step",
+                                      
NegotiatePB::NegotiateStep_Name(request.step()));
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
   }
-  TRACE("SASL Server: Sending SASL_SUCCESS response to client");
-  RETURN_NOT_OK(SendNegotiatePB(response));
-  return Status::OK();
-}
-
-
-Status SaslServer::HandleResponseRequest(const NegotiatePB& request) {
-  TRACE("SASL Server: Received RESPONSE request from client");
+  TRACE("Received SASL_RESPONSE request from client");
 
   if (!request.has_token()) {
-    Status s = Status::InvalidArgument("No token in SASL_RESPONSE from 
client");
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    Status s = Status::NotAuthorized("no token in SASL_RESPONSE from client");
+    RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
     return s;
   }
 
   const char* server_out = nullptr;
   uint32_t server_out_len = 0;
-  TRACE("SASL Server: Calling sasl_server_step()");
+  TRACE("Calling sasl_server_step()");
   Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
       return sasl_server_step(
           sasl_conn_.get(),         // The SASL connection context created by 
init()
@@ -421,78 +412,76 @@ Status SaslServer::HandleResponseRequest(const 
NegotiatePB& request) {
           &server_out,              // The output of the SASL library, might 
not be NULL terminated
           &server_out_len);         // Output len
     });
-  if (!s.ok() && !s.IsIncomplete()) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
 
   if (s.ok()) {
-    nego_ok_ = true;
-    RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
-  } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
-    RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
+    return SendSaslSuccess(server_out, server_out_len);
   }
-  return Status::OK();
+  if (s.IsIncomplete()) {
+    return SendSaslChallenge(server_out, server_out_len);
+  }
+  RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+  return s;
 }
 
-int SaslServer::GetOptionCb(const char* plugin_name, const char* option,
-                            const char** result, unsigned* len) {
-  return helper_.GetOptionCb(plugin_name, option, result, len);
+Status ServerNegotiation::SendSaslChallenge(const char* challenge, unsigned 
clen) {
+  NegotiatePB response;
+  response.set_step(NegotiatePB::SASL_CHALLENGE);
+  response.mutable_token()->assign(challenge, clen);
+  RETURN_NOT_OK(SendNegotiatePB(response));
+  return Status::Incomplete("");
 }
 
-int SaslServer::PlainAuthCb(sasl_conn_t * /*conn*/, const char * /*user*/, 
const char * /*pass*/,
-                            unsigned /*passlen*/, struct propctx * 
/*propctx*/) {
-  TRACE("SASL Server: Received PLAIN auth.");
-  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
-    LOG(DFATAL) << "Password authentication callback called while PLAIN auth 
disabled";
-    return SASL_BADPARAM;
+Status ServerNegotiation::SendSaslSuccess(const char* token, unsigned tlen) {
+  NegotiatePB response;
+  response.set_step(NegotiatePB::SASL_SUCCESS);
+  if (PREDICT_FALSE(tlen > 0)) {
+    response.mutable_token()->assign(token, tlen);
   }
-  // We always allow PLAIN authentication to succeed.
-  return SASL_OK;
+  RETURN_NOT_OK(SendNegotiatePB(response));
+  return Status::OK();
 }
 
-Status SaslServer::PreflightCheckGSSAPI(const string& app_name) {
-  // TODO(todd): the error messages that come from this function on el6
-  // are relatively useless due to the following krb5 bug:
-  // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
-  // This may not be useful anymore given the keytab login that happens
-  // in security/init.cc.
+Status ServerNegotiation::RecvConnectionContext(faststring* recv_buf) {
+  TRACE("Waiting for connection context");
+  RequestHeader header;
+  Slice param_buf;
+  RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), recv_buf, &header, 
&param_buf, deadline_));
+  DCHECK(header.IsInitialized());
 
-  // Initialize a SaslServer with a null socket, and enable
-  // only GSSAPI.
-  //
-  // We aren't going to actually send/receive any messages, but
-  // this makes it easier to reuse the initialization code.
-  SaslServer server(app_name, nullptr);
-  Status s = server.EnableGSSAPI();
-  if (!s.ok()) {
-    return Status::RuntimeError(s.message());
+  if (header.call_id() != kConnectionContextCallId) {
+    return Status::NotAuthorized("expected ConnectionContext callid, received",
+                                 std::to_string(header.call_id()));
   }
 
-  RETURN_NOT_OK(server.Init(app_name));
+  ConnectionContextPB conn_context;
+  if (!conn_context.ParseFromArray(param_buf.data(), param_buf.size())) {
+    return Status::NotAuthorized("invalid ConnectionContextPB message, missing 
fields",
+                                 conn_context.InitializationErrorString());
+  }
 
-  // Start the SASL server as if we were accepting a connection.
-  const char* server_out = nullptr; // ignored
-  uint32_t server_out_len = 0;
-  s = WrapSaslCall(server.sasl_conn_.get(), [&]() {
-      return sasl_server_start(
-          server.sasl_conn_.get(),
-          kSaslMechGSSAPI,
-          "", 0,  // Pass a 0-length token.
-          &server_out, &server_out_len);
-    });
+  // Currently none of the fields of the connection context are used.
+  return Status::OK();
+}
 
-  // We expect 'Incomplete' status to indicate that the first step of 
negotiation
-  // was correct.
-  if (s.IsIncomplete()) return Status::OK();
+int ServerNegotiation::GetOptionCb(const char* plugin_name,
+                                   const char* option,
+                                   const char** result,
+                                   unsigned* len) {
+  return helper_.GetOptionCb(plugin_name, option, result, len);
+}
 
-  string err_msg = s.message().ToString();
-  if (err_msg == "Permission denied") {
-    // For bad keytab permissions, we get a rather vague message. So,
-    // we make it more specific for better usability.
-    err_msg = "error accessing keytab: " + err_msg;
+int ServerNegotiation::PlainAuthCb(sasl_conn_t* /*conn*/,
+                                   const char*  /*user*/,
+                                   const char*  /*pass*/,
+                                   unsigned /*passlen*/,
+                                   struct propctx*  /*propctx*/) {
+  TRACE("Received PLAIN auth.");
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Password authentication callback called while PLAIN auth 
disabled";
+    return SASL_BADPARAM;
   }
-  return Status::RuntimeError(err_msg);
+  // We always allow PLAIN authentication to succeed.
+  return SASL_OK;
 }
 
 } // namespace rpc

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.h 
b/src/kudu/rpc/server_negotiation.h
index 53f674d..089bc0b 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -15,15 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_RPC_SASL_SERVER_H
-#define KUDU_RPC_SASL_SERVER_H
+#pragma once
 
+#include <memory>
 #include <set>
 #include <string>
 #include <vector>
 
 #include <sasl/sasl.h>
 
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
@@ -37,23 +38,24 @@ class Slice;
 
 namespace rpc {
 
-using std::string;
-
-// Class for doing SASL negotiation with a SaslClient over a bidirectional 
socket.
+// Class for doing KRPC negotiation with a remote client over a bidirectional socket.
 // Operations on this class are NOT thread-safe.
-class SaslServer {
+class ServerNegotiation {
  public:
-  // Does not take ownership of 'socket'.
-  SaslServer(string app_name, Socket* socket);
+  // Creates a new server negotiation instance, taking ownership of the
+  // provided socket. After completing the negotiation process by setting the
+  // desired options and calling Negotiate(), the socket can be retrieved with
+  // release_socket().
+  explicit ServerNegotiation(std::unique_ptr<Socket> socket);
 
   // Enable PLAIN authentication.
   // Despite PLAIN authentication taking a username and password, we disregard
   // the password and use this as a "unauthenticated" mode.
-  // Must be called after Init().
+  // Must be called before Negotiate().
   Status EnablePlain();
 
   // Enable GSSAPI (Kerberos) authentication.
-  // Call after Init().
+  // Must be called before Negotiate().
   Status EnableGSSAPI();
 
   // Returns mechanism negotiated by this connection.
@@ -62,41 +64,49 @@ class SaslServer {
 
   // Returns the set of RPC system features supported by the remote client.
   // Must be called after Negotiate().
-  const std::set<RpcFeatureFlag>& client_features() const {
+  std::set<RpcFeatureFlag> client_features() const {
     return client_features_;
   }
 
+  // Returns the set of RPC system features supported by the remote client.
+  // Must be called after Negotiate().
+  // Subsequent calls to this method or client_features() will return an empty set.
+  std::set<RpcFeatureFlag> take_client_features() {
+    return std::move(client_features_);
+  }
+
   // Name of the user that was authenticated.
   // Must be called after a successful Negotiate().
   const std::string& authenticated_user() const;
 
   // Specify IP:port of local side of connection.
-  // Must be called before Init(). Required for some mechanisms.
+  // Must be called before Negotiate(). Required for some mechanisms.
   void set_local_addr(const Sockaddr& addr);
 
   // Specify IP:port of remote side of connection.
-  // Must be called before Init(). Required for some mechanisms.
+  // Must be called before Negotiate(). Required for some mechanisms.
   void set_remote_addr(const Sockaddr& addr);
 
   // Specify the fully-qualified domain name of the remote server.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_server_fqdn(const string& domain_name);
+  // Must be called before Negotiate(). Required for some mechanisms.
+  void set_server_fqdn(const std::string& domain_name);
 
   // Set deadline for connection negotiation.
   void set_deadline(const MonoTime& deadline);
 
-  // Get deadline for connection negotiation.
-  const MonoTime& deadline() const { return deadline_; }
+  Socket* socket() const { return socket_.get(); }
 
-  // Initialize a new SASL server. Must be called before Negotiate().
-  // Returns OK on success, otherwise RuntimeError.
-  Status Init(const string& service_type);
+  // Returns the socket owned by this server negotiation. The caller will own
+  // the socket after this call, and the negotiation instance should no longer
+  // be used. Must be called after Negotiate().
+  std::unique_ptr<Socket> release_socket() { return std::move(socket_); }
 
-  // Begin negotiation with the SASL client on the other side of the fd socket
-  // that this server was constructed with.
-  // Returns OK on success.
-  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK 
status.
-  Status Negotiate();
+  // Negotiate with the remote client. Should only be called once per
+  // ServerNegotiation and socket instance, after all options have been set.
+  //
+  // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or
+  // another non-OK status.
+  Status Negotiate() WARN_UNUSED_RESULT;
 
   // SASL callback for plugin options, supported mechanisms, etc.
   // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
@@ -109,72 +119,71 @@ class SaslServer {
 
   // Perform a "pre-flight check" that everything required to act as a Kerberos
   // server is properly set up.
-  static Status PreflightCheckGSSAPI(const std::string& app_name);
+  static Status PreflightCheckGSSAPI() WARN_UNUSED_RESULT;
 
  private:
-  // Parse and validate connection header.
-  Status ValidateConnectionHeader(faststring* recv_buf);
 
-  // Parse request body. If malformed, sends an error message to the client.
-  Status ParseNegotiatePB(const RequestHeader& header,
-                          const Slice& param_buf,
-                          NegotiatePB* request);
+  // Parse a negotiate request from the client, deserializing it into 'msg'.
+  // If the request is malformed, sends an error message to the client.
+  Status RecvNegotiatePB(NegotiatePB* msg, faststring* recv_buf) 
WARN_UNUSED_RESULT;
 
-  // Encode and send the specified SASL message to the client.
-  Status SendNegotiatePB(const NegotiatePB& msg);
+  // Encode and send the specified negotiate response message to the client.
+  Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT;
 
   // Encode and send the specified RPC error message to the client.
   // Calls Status.ToString() for the embedded error message.
-  Status SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err);
+  Status SendError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) 
WARN_UNUSED_RESULT;
+
+  // Parse and validate connection header.
+  Status ValidateConnectionHeader(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+  // Initialize the SASL server negotiation instance.
+  Status InitSaslServer() WARN_UNUSED_RESULT;
 
   // Handle case when client sends NEGOTIATE request.
-  Status HandleNegotiateRequest(const NegotiatePB& request);
+  Status HandleNegotiate(const NegotiatePB& request) WARN_UNUSED_RESULT;
 
   // Send a NEGOTIATE response to the client with the list of available 
mechanisms.
-  Status SendNegotiateResponse(const std::set<string>& server_mechs);
+  Status SendNegotiate(const std::set<std::string>& server_mechs) 
WARN_UNUSED_RESULT;
+
+  // Handle case when client sends SASL_INITIATE request.
+  // Returns Status::OK if the SASL negotiation is complete, or
+  // Status::Incomplete if a SASL_RESPONSE step is expected.
+  Status HandleSaslInitiate(const NegotiatePB& request) WARN_UNUSED_RESULT;
 
-  // Handle case when client sends INITIATE request.
-  Status HandleInitiateRequest(const NegotiatePB& request);
+  // Handle case when client sends SASL_RESPONSE request.
+  Status HandleSaslResponse(const NegotiatePB& request) WARN_UNUSED_RESULT;
 
-  // Send a CHALLENGE response to the client with a challenge token.
-  Status SendChallengeResponse(const char* challenge, unsigned clen);
+  // Send a SASL_CHALLENGE response to the client with a challenge token.
+  Status SendSaslChallenge(const char* challenge, unsigned clen) 
WARN_UNUSED_RESULT;
 
-  // Send a SUCCESS response to the client with an token (typically empty).
-  Status SendSuccessResponse(const char* token, unsigned tlen);
+  // Send a SASL_SUCCESS response to the client with a token (typically empty).
+  Status SendSaslSuccess(const char* token, unsigned tlen) WARN_UNUSED_RESULT;
 
-  // Handle case when client sends RESPONSE request.
-  Status HandleResponseRequest(const NegotiatePB& request);
+  // Receive and validate the ConnectionContextPB.
+  Status RecvConnectionContext(faststring* recv_buf) WARN_UNUSED_RESULT;
 
-  string app_name_;
-  Socket* sock_;
+  // The socket to the remote client.
+  std::unique_ptr<Socket> socket_;
+
+  // SASL state.
   std::vector<sasl_callback_t> callbacks_;
-  // The SASL connection object. This is initialized in Init() and
-  // freed after Negotiate() completes (regardless whether it was successful).
   gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
   SaslHelper helper_;
 
-  // The set of features that the client supports. Filled in
-  // after we receive the NEGOTIATE request from the client.
+  // The set of features supported by the client. Filled in during negotiation.
   std::set<RpcFeatureFlag> client_features_;
 
-  // The successfully-authenticated user, if applicable.
-  string authenticated_user_;
-
-  SaslNegotiationState::Type server_state_;
+  // The successfully-authenticated user, if applicable. Filled in during
+  // negotiation.
+  std::string authenticated_user_;
 
-  // The mechanism we negotiated with the client.
+  // The SASL mechanism. Filled in during negotiation.
   SaslMechanism::Type negotiated_mech_;
 
-  // Intra-negotiation state.
-  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from 
the SASL library?
-
   // Negotiation timeout deadline.
   MonoTime deadline_;
-
-  DISALLOW_COPY_AND_ASSIGN(SaslServer);
 };
 
 } // namespace rpc
 } // namespace kudu
-
-#endif  // KUDU_RPC_SASL_SERVER_H

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/security/ssl_socket.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/ssl_socket.cc b/src/kudu/security/ssl_socket.cc
index 126d2f1..c2725c2 100644
--- a/src/kudu/security/ssl_socket.cc
+++ b/src/kudu/security/ssl_socket.cc
@@ -44,6 +44,7 @@ SSLSocket::SSLSocket(int fd, SSL* ssl, bool is_server) :
 }
 
 SSLSocket::~SSLSocket() {
+  WARN_NOT_OK(Close(), "unable to close SSL socket in destructor");
 }
 
 Status SSLSocket::DoHandshake() {
@@ -154,7 +155,10 @@ Status SSLSocket::Recv(uint8_t *buf, int32_t amt, int32_t 
*nread) {
 }
 
 Status SSLSocket::Close() {
-  CHECK(ssl_);
+  if (!ssl_) {
+    // Socket is already closed.
+    return Status::OK();
+  }
   ERR_clear_error();
   errno = 0;
   int32_t ret = SSL_shutdown(ssl_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/security/ssl_socket.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/ssl_socket.h b/src/kudu/security/ssl_socket.h
index 4f67d48..f7570fb 100644
--- a/src/kudu/security/ssl_socket.h
+++ b/src/kudu/security/ssl_socket.h
@@ -32,9 +32,10 @@ class Sockaddr;
 
 class SSLSocket : public Socket {
  public:
+
   SSLSocket(int fd, SSL* ssl, bool is_server);
 
-  ~SSLSocket();
+  ~SSLSocket() override;
 
   // Do the SSL handshake as a client or a server and verify that the 
credentials were correctly
   // verified.

Reply via email to