Repository: kudu Updated Branches: refs/heads/master b9aa5dd31 -> dc8525358
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/reactor.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc index f178a2d..c8f523c 100644 --- a/src/kudu/rpc/reactor.cc +++ b/src/kudu/rpc/reactor.cc @@ -18,17 +18,19 @@ #include "kudu/rpc/reactor.h" #include <arpa/inet.h> -#include <boost/intrusive/list.hpp> -#include <ev++.h> -#include <glog/logging.h> -#include <mutex> #include <netinet/in.h> #include <stdlib.h> -#include <string> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> +#include <mutex> +#include <string> + +#include <boost/intrusive/list.hpp> +#include <ev++.h> +#include <glog/logging.h> + #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stringprintf.h" #include "kudu/rpc/client_negotiation.h" @@ -85,7 +87,7 @@ Status ShutdownError(bool aborted) { } } // anonymous namespace -ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder &bld) +ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld) : loop_(kDefaultLibEvFlags), cur_time_(MonoTime::Now()), last_unused_tcp_scan_(cur_time_), @@ -191,7 +193,7 @@ void ReactorThread::WakeThread() { // threads that want to bring something to our attention, like the fact that // we're shutting down, or the fact that there is a new outbound Transfer // ready to send. -void ReactorThread::AsyncHandler(ev::async &watcher, int revents) { +void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) { DCHECK(IsCurrentThread()); if (PREDICT_FALSE(reactor_->closing())) { @@ -204,13 +206,13 @@ void ReactorThread::AsyncHandler(ev::async &watcher, int revents) { reactor_->DrainTaskQueue(&tasks); while (!tasks.empty()) { - ReactorTask &task = tasks.front(); + ReactorTask& task = tasks.front(); tasks.pop_front(); task.Run(this); } } -void ReactorThread::RegisterConnection(const scoped_refptr<Connection>& conn) { +void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) { DCHECK(IsCurrentThread()); Status s = StartConnectionNegotiation(conn); @@ -219,10 +221,10 @@ void ReactorThread::RegisterConnection(const scoped_refptr<Connection>& conn) { DestroyConnection(conn.get(), s); return; } - server_conns_.push_back(conn); + server_conns_.emplace_back(std::move(conn)); } -void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall> &call) { +void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) { DCHECK(IsCurrentThread()); scoped_refptr<Connection> conn; @@ -242,7 +244,7 @@ void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall> &call) { // 2. every tcp_conn_timeo_ seconds, close down connections older than // tcp_conn_timeo_ seconds. // -void ReactorThread::TimerHandler(ev::timer &watcher, int revents) { +void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) { DCHECK(IsCurrentThread()); if (EV_ERROR & revents) { LOG(WARNING) << "Reactor " << name() << " got an error in " @@ -293,7 +295,7 @@ void ReactorThread::ScanIdleConnections() { VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections."; } -const std::string &ReactorThread::name() const { +const std::string& ReactorThread::name() const { return reactor_->name(); } @@ -321,7 +323,7 @@ void ReactorThread::RunThread() { reactor_->messenger_.reset(); } -Status ReactorThread::FindOrStartConnection(const ConnectionId &conn_id, +Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id, scoped_refptr<Connection>* conn) { DCHECK(IsCurrentThread()); conn_map_t::const_iterator c = client_conns_.find(conn_id); @@ -348,7 +350,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId &conn_id, } // Register the new connection in our map. - *conn = new Connection(this, conn_id.remote(), new_socket.release(), Connection::CLIENT); + *conn = new Connection(this, conn_id.remote(), std::move(new_socket), Connection::CLIENT); (*conn)->set_user_credentials(conn_id.user_credentials()); // Kick off blocking client connection negotiation. @@ -377,12 +379,12 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection> ADOPT_TRACE(trace.get()); TRACE("Submitting negotiation task for $0", conn->ToString()); RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure( - Bind(&Negotiation::RunNegotiation, conn, deadline))); + Bind(&Negotiation::RunNegotiation, conn, deadline))); return Status::OK(); } void ReactorThread::CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn, - const Status &status) { + const Status& status) { DCHECK(IsCurrentThread()); if (PREDICT_FALSE(!status.ok())) { DestroyConnection(conn.get(), status); @@ -396,6 +398,7 @@ void ReactorThread::CompleteConnectionNegotiation(const scoped_refptr<Connection DestroyConnection(conn.get(), s); return; } + conn->MarkNegotiationComplete(); conn->EpollRegister(loop_); } @@ -405,13 +408,13 @@ Status ReactorThread::CreateClientSocket(Socket *sock) { if (ret.ok()) { ret = sock->SetNoDelay(true); } - LOG_IF(WARNING, !ret.ok()) << "failed to create an " - "outbound connection because a new socket could not " - "be created: " << ret.ToString(); + LOG_IF(WARNING, !ret.ok()) + << "failed to create an outbound connection because a new socket could not be created: " + << ret.ToString(); return ret; } -Status ReactorThread::StartConnect(Socket *sock, const Sockaddr &remote, bool *in_progress) { +Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote, bool *in_progress) { Status ret = sock->Connect(remote); if (ret.ok()) { VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString(); @@ -432,7 +435,7 @@ Status ReactorThread::StartConnect(Socket *sock, const Sockaddr &remote, bool *i } void ReactorThread::DestroyConnection(Connection *conn, - const Status &conn_status) { + const Status& conn_status) { DCHECK(IsCurrentThread()); conn->Shutdown(conn_status); @@ -455,9 +458,12 @@ void ReactorThread::DestroyConnection(Connection *conn, } } -DelayedTask::DelayedTask(boost::function<void(const Status &)> func, +DelayedTask::DelayedTask(boost::function<void(const Status&)> func, MonoDelta when) - : func_(std::move(func)), when_(std::move(when)), thread_(nullptr) {} + : func_(std::move(func)), + when_(when), + thread_(nullptr) { +} void DelayedTask::Run(ReactorThread* thread) { DCHECK(thread_ == nullptr) << "Task has already been scheduled"; @@ -492,7 +498,7 @@ void DelayedTask::TimerHandler(ev::timer& watcher, int revents) { } Reactor::Reactor(const shared_ptr<Messenger>& messenger, - int index, const MessengerBuilder &bld) + int index, const MessengerBuilder& bld) : messenger_(messenger), name_(StringPrintf("%s_R%03d", messenger->name().c_str(), index)), closing_(false), @@ -529,7 +535,7 @@ Reactor::~Reactor() { Shutdown(); } -const std::string &Reactor::name() const { +const std::string& Reactor::name() const { return name_; } @@ -544,11 +550,11 @@ class RunFunctionTask : public ReactorTask { explicit RunFunctionTask(boost::function<Status()> f) : function_(std::move(f)), latch_(1) {} - virtual void Run(ReactorThread *reactor) OVERRIDE { + void Run(ReactorThread* /*reactor*/) override { status_ = function_(); latch_.CountDown(); } - virtual void Abort(const Status &status) OVERRIDE { + void Abort(const Status& status) override { status_ = status; latch_.CountDown(); } @@ -584,16 +590,16 @@ Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, class RegisterConnectionTask : public ReactorTask { public: - explicit RegisterConnectionTask(const scoped_refptr<Connection>& conn) : - conn_(conn) - {} + explicit RegisterConnectionTask(scoped_refptr<Connection> conn) + : conn_(std::move(conn)) { + } - virtual void Run(ReactorThread *thread) OVERRIDE { - thread->RegisterConnection(conn_); + void Run(ReactorThread* reactor) override { + reactor->RegisterConnection(std::move(conn_)); delete this; } - virtual void Abort(const Status &status) OVERRIDE { + void Abort(const Status& /*status*/) override { // We don't need to Shutdown the connection since it was never registered. // This is only used for inbound connections, and inbound connections will // never have any calls added to them until they've been registered. @@ -604,7 +610,7 @@ class RegisterConnectionTask : public ReactorTask { scoped_refptr<Connection> conn_; }; -void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr &remote) { +void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) { VLOG(3) << name_ << ": new inbound connection to " << remote.ToString(); std::unique_ptr<Socket> new_socket; if (messenger()->ssl_enabled()) { @@ -612,9 +618,8 @@ void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr &remote) { } else { new_socket.reset(new Socket(socket->Release())); } - scoped_refptr<Connection> conn( - new Connection(&thread_, remote, new_socket.release(), Connection::SERVER)); - auto task = new RegisterConnectionTask(conn); + auto task = new RegisterConnectionTask( + new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER)); ScheduleReactorTask(task); } @@ -625,12 +630,12 @@ class AssignOutboundCallTask : public ReactorTask { explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call) : call_(std::move(call)) {} - virtual void Run(ReactorThread *reactor) OVERRIDE { + void Run(ReactorThread* reactor) override { reactor->AssignOutboundCall(call_); delete this; } - virtual void Abort(const Status &status) OVERRIDE { + void Abort(const Status& status) override { call_->SetFailed(status); delete this; } @@ -639,7 +644,7 @@ class AssignOutboundCallTask : public ReactorTask { shared_ptr<OutboundCall> call_; }; -void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { +void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) { DVLOG(3) << name_ << ": queueing outbound call " << call->ToString() << " to remote " << call->conn_id().remote().ToString(); AssignOutboundCallTask *task = new AssignOutboundCallTask(call); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/reactor.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h index 54f3332..f9f5662 100644 --- a/src/kudu/rpc/reactor.h +++ b/src/kudu/rpc/reactor.h @@ -17,16 +17,18 @@ #ifndef KUDU_RPC_REACTOR_H #define KUDU_RPC_REACTOR_H -#include <boost/function.hpp> -#include <boost/intrusive/list.hpp> -#include <ev++.h> +#include <stdint.h> + #include <list> #include <map> #include <memory> #include <set> -#include <stdint.h> #include <string> +#include <boost/function.hpp> +#include <boost/intrusive/list.hpp> +#include <ev++.h> + #include "kudu/gutil/ref_counted.h" #include "kudu/rpc/connection.h" #include "kudu/rpc/transfer.h" @@ -37,9 +39,12 @@ #include "kudu/util/status.h" namespace kudu { + +class Socket; + namespace rpc { -typedef std::list<scoped_refptr<Connection> > conn_list_t; +typedef std::list<scoped_refptr<Connection>> conn_list_t; class DumpRunningRpcsRequestPB; class DumpRunningRpcsResponsePB; @@ -89,10 +94,10 @@ class DelayedTask : public ReactorTask { DelayedTask(boost::function<void(const Status &)> func, MonoDelta when); // Schedules the task for running later but doesn't actually run it yet. - virtual void Run(ReactorThread* reactor) OVERRIDE; + void Run(ReactorThread* thread) override; // Behaves like ReactorTask::Abort. - virtual void Abort(const Status& abort_status) OVERRIDE; + void Abort(const Status& abort_status) override; private: // libev callback for when the registered timer fires. @@ -172,7 +177,7 @@ class ReactorThread { // Transition back from negotiating to processing requests. // Must be called from the reactor thread. void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn, - const Status& status); + const Status& status); // Collect metrics. // Must be called from the reactor thread. @@ -217,7 +222,7 @@ class ReactorThread { void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call); // Register a new connection. - void RegisterConnection(const scoped_refptr<Connection>& conn); + void RegisterConnection(scoped_refptr<Connection> conn); // Actually perform shutdown of the thread, tearing down any connections, // etc. This is called from within the thread. http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index ea9d266..9769ffe 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -365,8 +365,8 @@ TEST_F(TestRpc, TestNegotiationTimeout) { // Create another thread to accept the connection on the fake server. scoped_refptr<Thread> acceptor_thread; ASSERT_OK(Thread::Create("test", "acceptor", - AcceptAndReadForever, &listen_sock, - &acceptor_thread)); + AcceptAndReadForever, &listen_sock, + &acceptor_thread)); // Set up client. shared_ptr<Messenger> client_messenger(CreateMessenger("Client")); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_common.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_common.cc b/src/kudu/rpc/sasl_common.cc index 359dc80..5e4aae6 100644 --- a/src/kudu/rpc/sasl_common.cc +++ b/src/kudu/rpc/sasl_common.cc @@ -30,6 +30,7 @@ #include "kudu/gutil/macros.h" #include "kudu/gutil/once.h" #include "kudu/gutil/stringprintf.h" +#include "kudu/rpc/constants.h" #include "kudu/util/flag_tags.h" #include "kudu/util/mutex.h" #include "kudu/util/net/sockaddr.h" @@ -46,11 +47,8 @@ const char* const kSaslMechGSSAPI = "GSSAPI"; static __thread string* g_auth_failure_capture = nullptr; // Determine whether initialization was ever called -struct InitializationData { - Status status; - string app_name; -}; -static struct InitializationData* sasl_init_data; +static Status sasl_init_status = Status::OK(); +static bool sasl_is_initialized = false; // If true, then we expect someone else has initialized SASL. static bool g_disable_sasl_init = false; @@ -203,14 +201,9 @@ static bool SaslMutexImplementationProvided() { #endif // Actually perform the initialization for the SASL subsystem. -// Meant to be called via GoogleOnceInitArg(). -static void DoSaslInit(void* app_name_char_array) { - // Explicitly cast from void* here so GoogleOnce doesn't have to deal with it. - // We were getting Clang 3.4 UBSAN errors when letting GoogleOnce cast. - const char* const app_name = reinterpret_cast<const char* const>(app_name_char_array); +// Meant to be called via GoogleOnceInit(). +static void DoSaslInit() { VLOG(3) << "Initializing SASL library"; - sasl_init_data = new InitializationData(); - sasl_init_data->app_name = app_name; bool sasl_initialized = SaslIsInitialized(); if (sasl_initialized && !g_disable_sasl_init) { @@ -222,7 +215,7 @@ static void DoSaslInit(void* app_name_char_array) { if (g_disable_sasl_init) { if (!sasl_initialized) { - sasl_init_data->status = Status::RuntimeError( + sasl_init_status = Status::RuntimeError( "SASL initialization was disabled, but SASL was not externally initialized."); return; } @@ -232,30 +225,29 @@ static void DoSaslInit(void* app_name_char_array) { << "but was not provided with a mutex implementation! If " << "manually initializing SASL, use sasl_set_mutex(3)."; } - sasl_init_data->status = Status::OK(); return; } internal::SaslSetMutex(); int result = sasl_client_init(&callbacks[0]); if (result != SASL_OK) { - sasl_init_data->status = Status::RuntimeError("Could not initialize SASL client", - sasl_errstring(result, nullptr, nullptr)); + sasl_init_status = Status::RuntimeError("Could not initialize SASL client", + sasl_errstring(result, nullptr, nullptr)); return; } - result = sasl_server_init(&callbacks[0], sasl_init_data->app_name.c_str()); + result = sasl_server_init(&callbacks[0], kSaslAppName); if (result != SASL_OK) { - sasl_init_data->status = Status::RuntimeError("Could not initialize SASL server", - sasl_errstring(result, nullptr, nullptr)); + sasl_init_status = Status::RuntimeError("Could not initialize SASL server", + sasl_errstring(result, nullptr, nullptr)); return; } - sasl_init_data->status = Status::OK(); + sasl_is_initialized = true; } Status DisableSaslInitialization() { if (g_disable_sasl_init) return Status::OK(); - if (sasl_init_data != nullptr) { + if (sasl_is_initialized) { return Status::IllegalState("SASL already initialized. Initialization can only be disabled " "before first usage."); } @@ -263,18 +255,11 @@ Status DisableSaslInitialization() { return Status::OK(); } -Status SaslInit(const char* app_name) { +Status SaslInit() { // Only execute SASL initialization once static GoogleOnceType once = GOOGLE_ONCE_INIT; - GoogleOnceInitArg(&once, - &DoSaslInit, - // This is a bit ugly, but Clang 3.4 UBSAN complains otherwise. - reinterpret_cast<void*>(const_cast<char*>(app_name))); - if (PREDICT_FALSE(sasl_init_data->app_name != app_name)) { - return Status::InvalidArgument("SaslInit called successively with different arguments", - StringPrintf("Previous: %s, current: %s", sasl_init_data->app_name.c_str(), app_name)); - } - return sasl_init_data->status; + GoogleOnceInit(&once, &DoSaslInit); + return sasl_init_status; } static string CleanSaslError(const string& err) { http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_common.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_common.h b/src/kudu/rpc/sasl_common.h index 419fc5f..53b713e 100644 --- a/src/kudu/rpc/sasl_common.h +++ b/src/kudu/rpc/sasl_common.h @@ -53,7 +53,7 @@ extern const char* const kSaslMechGSSAPI; // // This function is thread safe and uses a static lock. // This function should NOT be called during static initialization. -Status SaslInit(const char* app_name); +Status SaslInit(); // Disable Kudu's initialization of SASL. See equivalent method in client.h. Status DisableSaslInitialization(); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_helper.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_helper.cc b/src/kudu/rpc/sasl_helper.cc index 4e408f7..0614f3e 100644 --- a/src/kudu/rpc/sasl_helper.cc +++ b/src/kudu/rpc/sasl_helper.cc @@ -17,28 +17,24 @@ #include "kudu/rpc/sasl_helper.h" -#include <set> #include <string> #include <glog/logging.h> #include <google/protobuf/message_lite.h> -#include "kudu/gutil/endian.h" -#include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/rpc/blocking_ops.h" #include "kudu/rpc/constants.h" #include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/sasl_common.h" #include "kudu/rpc/serialization.h" -#include "kudu/util/faststring.h" -#include "kudu/util/monotime.h" #include "kudu/util/status.h" +using std::string; + namespace kudu { namespace rpc { @@ -46,13 +42,10 @@ using google::protobuf::MessageLite; SaslHelper::SaslHelper(PeerType peer_type) : peer_type_(peer_type), - conn_header_exchanged_(false), + global_mechs_(SaslListAvailableMechs()), plain_enabled_(false), gssapi_enabled_(false) { - tag_ = (peer_type_ == SERVER) ? "Sasl Server" : "Sasl Client"; -} - -SaslHelper::~SaslHelper() { + tag_ = (peer_type_ == SERVER) ? "Server" : "Client"; } void SaslHelper::set_local_addr(const Sockaddr& addr) { @@ -76,27 +69,11 @@ const char* SaslHelper::server_fqdn() const { return server_fqdn_.empty() ? nullptr : server_fqdn_.c_str(); } -const std::set<std::string>& SaslHelper::GlobalMechs() const { - if (!global_mechs_) { - global_mechs_.reset(new set<string>(SaslListAvailableMechs())); - } - return *global_mechs_; -} - -void SaslHelper::AddToLocalMechList(const string& mech) { - mechs_.insert(mech); -} - -const std::set<std::string>& SaslHelper::LocalMechs() const { - return mechs_; +const char* SaslHelper::EnabledMechsString() const { + JoinStrings(enabled_mechs_, " ", &enabled_mechs_string_); + return enabled_mechs_string_.c_str(); } -const char* SaslHelper::LocalMechListString() const { - JoinStrings(mechs_, " ", &mech_list_); - return mech_list_.c_str(); -} - - int SaslHelper::GetOptionCb(const char* plugin_name, const char* option, const char** result, unsigned* len) { DVLOG(4) << tag_ << ": GetOption Callback called. "; @@ -112,7 +89,7 @@ int SaslHelper::GetOptionCb(const char* plugin_name, const char* option, if (plugin_name == nullptr) { // SASL library option, not a plugin option if (strcmp(option, "mech_list") == 0) { - *result = LocalMechListString(); + *result = EnabledMechsString(); if (len != nullptr) *len = strlen(*result); VLOG(4) << tag_ << ": Enabled mech list: " << *result; return SASL_OK; @@ -137,10 +114,10 @@ Status SaslHelper::EnableGSSAPI() { } Status SaslHelper::EnableMechanism(const string& mech) { - if (PREDICT_FALSE(!ContainsKey(GlobalMechs(), mech))) { + if (PREDICT_FALSE(!ContainsKey(global_mechs_, mech))) { return Status::InvalidArgument("unable to find SASL plugin", mech); } - AddToLocalMechList(mech); + enabled_mechs_.insert(mech); return Status::OK(); } @@ -148,11 +125,11 @@ bool SaslHelper::IsPlainEnabled() const { return plain_enabled_; } -Status SaslHelper::SanityCheckNegotiateCallId(int32_t call_id) const { +Status SaslHelper::CheckNegotiateCallId(int32_t call_id) const { if (call_id != kNegotiateCallId) { Status s = Status::IllegalState(strings::Substitute( - "Non-Negotiate request during negotiation. Expected callId: $0, received callId: $1", - kNegotiateCallId, call_id)); + "Received illegal call-id during negotiation; expected: $0, received: $1", + kNegotiateCallId, call_id)); LOG(DFATAL) << tag_ << ": " << s.ToString(); return s; } @@ -167,27 +144,5 @@ Status SaslHelper::ParseNegotiatePB(const Slice& param_buf, NegotiatePB* msg) { return Status::OK(); } -Status SaslHelper::SendNegotiatePB(Socket* sock, - const MessageLite& header, - const MessageLite& msg, - const MonoTime& deadline) { - DCHECK(sock != nullptr); - DCHECK(header.IsInitialized()) << tag_ << ": Header must be initialized"; - DCHECK(msg.IsInitialized()) << tag_ << ": Message must be initialized"; - - // Write connection header, if needed - if (PREDICT_FALSE(peer_type_ == CLIENT && !conn_header_exchanged_)) { - const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength; - uint8_t buf[buflen]; - serialization::SerializeConnHeader(buf); - size_t nsent; - RETURN_NOT_OK(sock->BlockingWrite(buf, buflen, &nsent, deadline)); - conn_header_exchanged_ = true; - } - - RETURN_NOT_OK(SendFramedMessageBlocking(sock, header, msg, deadline)); - return Status::OK(); -} - } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/sasl_helper.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/sasl_helper.h b/src/kudu/rpc/sasl_helper.h index f0a676e..05d6904 100644 --- a/src/kudu/rpc/sasl_helper.h +++ b/src/kudu/rpc/sasl_helper.h @@ -23,30 +23,19 @@ #include <sasl/sasl.h> -#include "kudu/gutil/gscoped_ptr.h" -#include "kudu/gutil/macros.h" -#include "kudu/util/net/socket.h" - -namespace google { -namespace protobuf { -class MessageLite; -} // namespace protobuf -} // namespace google +#include "kudu/util/status.h" namespace kudu { -class MonoTime; class Sockaddr; -class Status; namespace rpc { -using std::string; - class NegotiatePB; -// Helper class which contains functionality that is common to SaslClient & SaslServer. -// Most of these methods are convenience methods for interacting with the libsasl2 library. +// Helper class which contains functionality that is common to client and server +// SASL negotiations. Most of these methods are convenience methods for +// interacting with the libsasl2 library. class SaslHelper { public: enum PeerType { @@ -55,7 +44,7 @@ class SaslHelper { }; explicit SaslHelper(PeerType peer_type); - ~SaslHelper(); + ~SaslHelper() = default; // Specify IP:port of local side of connection. void set_local_addr(const Sockaddr& addr); @@ -66,20 +55,18 @@ class SaslHelper { const char* remote_addr_string() const; // Specify the fully-qualified domain name of the remote server. - void set_server_fqdn(const string& domain_name); + void set_server_fqdn(const std::string& domain_name); const char* server_fqdn() const; // Globally-registered available SASL plugins. - const std::set<string>& GlobalMechs() const; + const std::set<std::string>& GlobalMechs() const { + return global_mechs_; + } // Helper functions for managing the list of active SASL mechanisms. - void AddToLocalMechList(const string& mech); - const std::set<string>& LocalMechs() const; - - // Returns space-delimited local mechanism list string suitable for passing - // to libsasl2, such as via "mech_list" callbacks. - // The returned pointer is valid only until the next call to LocalMechListString(). - const char* LocalMechListString() const; + const std::set<std::string>& EnabledMechs() const { + return enabled_mechs_; + } // Implements the client_mech_list / mech_list callbacks. int GetOptionCb(const char* plugin_name, const char* option, const char** result, unsigned* len); @@ -95,31 +82,29 @@ class SaslHelper { // Sanity check that the call ID is the negotiation call ID. // Logs DFATAL if call_id does not match. - Status SanityCheckNegotiateCallId(int32_t call_id) const; + Status CheckNegotiateCallId(int32_t call_id) const; // Parse msg from the given Slice. Status ParseNegotiatePB(const Slice& param_buf, NegotiatePB* msg); - // Encode and send a message over a socket, sending the connection header if necessary. - Status SendNegotiatePB(Socket* sock, - const google::protobuf::MessageLite& header, - const google::protobuf::MessageLite& msg, - const MonoTime& deadline); - private: Status EnableMechanism(const std::string& mech); - string local_addr_; - string remote_addr_; - string server_fqdn_; + // Returns space-delimited local mechanism list string suitable for passing + // to libsasl2, such as via "mech_list" callbacks. + // The returned pointer is valid only until the next call to EnabledMechsString(). + const char* EnabledMechsString() const; + + std::string local_addr_; + std::string remote_addr_; + std::string server_fqdn_; // Authentication types and data. const PeerType peer_type_; - bool conn_header_exchanged_; - string tag_; - mutable gscoped_ptr< std::set<string> > global_mechs_; // Cache of global mechanisms. - std::set<string> mechs_; // Active mechanisms. - mutable string mech_list_; // Mechanism list string returned by callbacks. + std::string tag_; + std::set<std::string> global_mechs_; // Cache of global mechanisms. + std::set<std::string> enabled_mechs_; // Active mechanisms. + mutable std::string enabled_mechs_string_; // Mechanism list string returned by callbacks. bool plain_enabled_; bool gssapi_enabled_; http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/server_negotiation.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc index 9e91481..837fb47 100644 --- a/src/kudu/rpc/server_negotiation.cc +++ b/src/kudu/rpc/server_negotiation.cc @@ -17,17 +17,18 @@ #include "kudu/rpc/server_negotiation.h" -#include <glog/logging.h> -#include <google/protobuf/message_lite.h> #include <limits> -#include <sasl/sasl.h> +#include <memory> #include <set> #include <string> +#include <glog/logging.h> +#include <google/protobuf/message_lite.h> +#include <sasl/sasl.h> + #include "kudu/gutil/endian.h" #include "kudu/gutil/map-util.h" -#include "kudu/gutil/stringprintf.h" -#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/blocking_ops.h" #include "kudu/rpc/constants.h" #include "kudu/rpc/serialization.h" @@ -36,242 +37,197 @@ #include "kudu/util/scoped_cleanup.h" #include "kudu/util/trace.h" +using std::set; +using std::string; +using std::unique_ptr; + namespace kudu { namespace rpc { -static int SaslServerGetoptCb(void* sasl_server, const char* plugin_name, const char* option, - const char** result, unsigned* len) { - return static_cast<SaslServer*>(sasl_server) - ->GetOptionCb(plugin_name, option, result, len); +static int ServerNegotiationGetoptCb(ServerNegotiation* server_negotiation, + const char* plugin_name, + const char* option, + const char** result, + unsigned* len) { + return server_negotiation->GetOptionCb(plugin_name, option, result, len); } -static int SaslServerPlainAuthCb(sasl_conn_t *conn, void *sasl_server, const char *user, - const char *pass, unsigned passlen, struct propctx *propctx) { - return static_cast<SaslServer*>(sasl_server) - ->PlainAuthCb(conn, user, pass, passlen, propctx); +static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn, + ServerNegotiation* server_negotiation, + const char* user, + const char* pass, + unsigned passlen, + struct propctx* propctx) { + return server_negotiation->PlainAuthCb(conn, user, pass, passlen, propctx); } -SaslServer::SaslServer(string app_name, Socket* socket) - : app_name_(std::move(app_name)), - sock_(socket), +ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket) + : socket_(std::move(socket)), helper_(SaslHelper::SERVER), - server_state_(SaslNegotiationState::NEW), negotiated_mech_(SaslMechanism::INVALID), deadline_(MonoTime::Max()) { callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT, - reinterpret_cast<int (*)()>(&SaslServerGetoptCb), this)); + reinterpret_cast<int (*)()>(&ServerNegotiationGetoptCb), this)); callbacks_.push_back(SaslBuildCallback(SASL_CB_SERVER_USERDB_CHECKPASS, - reinterpret_cast<int (*)()>(&SaslServerPlainAuthCb), this)); + reinterpret_cast<int (*)()>(&ServerNegotiationPlainAuthCb), this)); callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr)); } -Status SaslServer::EnablePlain() { - DCHECK_EQ(server_state_, SaslNegotiationState::NEW); +Status ServerNegotiation::EnablePlain() { RETURN_NOT_OK(helper_.EnablePlain()); return Status::OK(); } -Status SaslServer::EnableGSSAPI() { - DCHECK_EQ(server_state_, SaslNegotiationState::NEW); +Status ServerNegotiation::EnableGSSAPI() { return helper_.EnableGSSAPI(); } -SaslMechanism::Type SaslServer::negotiated_mechanism() const { - DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED); +SaslMechanism::Type ServerNegotiation::negotiated_mechanism() const { return negotiated_mech_; } -const std::string& SaslServer::authenticated_user() const { - DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED); +const string& ServerNegotiation::authenticated_user() const { return authenticated_user_; } -void SaslServer::set_local_addr(const Sockaddr& addr) { - DCHECK_EQ(server_state_, SaslNegotiationState::NEW); +void ServerNegotiation::set_local_addr(const Sockaddr& addr) { helper_.set_local_addr(addr); } -void SaslServer::set_remote_addr(const Sockaddr& addr) { - DCHECK_EQ(server_state_, SaslNegotiationState::NEW); +void ServerNegotiation::set_remote_addr(const Sockaddr& addr) { helper_.set_remote_addr(addr); } -void SaslServer::set_server_fqdn(const string& domain_name) { - DCHECK_EQ(server_state_, SaslNegotiationState::NEW); +void ServerNegotiation::set_server_fqdn(const string& domain_name) { helper_.set_server_fqdn(domain_name); } -void SaslServer::set_deadline(const MonoTime& deadline) { - DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED); +void ServerNegotiation::set_deadline(const MonoTime& deadline) { deadline_ = deadline; } -// calls sasl_server_init() and sasl_server_new() -Status SaslServer::Init(const string& service_type) { - RETURN_NOT_OK(SaslInit(app_name_.c_str())); - - // Ensure we are not called more than once. - if (server_state_ != SaslNegotiationState::NEW) { - return Status::IllegalState("Init() may only be called once per SaslServer object."); - } - - // TODO(unknown): Support security flags. - unsigned secflags = 0; - - sasl_conn_t* sasl_conn = nullptr; - Status s = WrapSaslCall(nullptr /* no conn */, [&]() { - return sasl_server_new( - // Registered name of the service using SASL. Required. - service_type.c_str(), - // The fully qualified domain name of this server. - helper_.server_fqdn(), - // Permits multiple user realms on server. NULL == use default. - nullptr, - // Local and remote IP address strings. (NULL disables - // mechanisms which require this info.) - helper_.local_addr_string(), - helper_.remote_addr_string(), - // Connection-specific callbacks. - &callbacks_[0], - // Security flags. - secflags, - &sasl_conn); - }); - - if (PREDICT_FALSE(!s.ok())) { - return Status::RuntimeError("Unable to create new SASL server", - s.message()); - } - sasl_conn_.reset(sasl_conn); - - server_state_ = SaslNegotiationState::INITIALIZED; - return Status::OK(); -} - -Status SaslServer::Negotiate() { - // After negotiation, we no longer need the SASL library object, so - // may as well free its memory since the connection may be long-lived. - auto cleanup = MakeScopedCleanup([&]() { - sasl_conn_.reset(); - }); - DVLOG(4) << "Called SaslServer::Negotiate()"; - - // Ensure we are called exactly once, and in the right order. - if (server_state_ == SaslNegotiationState::NEW) { - return Status::IllegalState("SaslServer: Init() must be called before calling Negotiate()"); - } - if (server_state_ == SaslNegotiationState::NEGOTIATED) { - return Status::IllegalState("SaslServer: Negotiate() may only be called once per object."); - } +Status ServerNegotiation::Negotiate() { + TRACE("Beginning negotiation"); // Ensure we can use blocking calls on the socket during negotiation. - RETURN_NOT_OK(EnsureBlockingMode(sock_)); + RETURN_NOT_OK(EnsureBlockingMode(socket_.get())); faststring recv_buf; - // Read connection header + // Step 1: Read the connection header. RETURN_NOT_OK(ValidateConnectionHeader(&recv_buf)); - nego_ok_ = false; - while (!nego_ok_) { - TRACE("Waiting for next Negotiation message..."); - RequestHeader header; - Slice param_buf; - RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, ¶m_buf, deadline_)); + { // Step 2: Receive and respond to the NEGOTIATE step message. + NegotiatePB request; + RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf)); + RETURN_NOT_OK(HandleNegotiate(request)); + } + // Step 3: SASL negotiation. + RETURN_NOT_OK(InitSaslServer()); + { NegotiatePB request; - RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &request)); - - switch (request.step()) { - // NEGOTIATE: They want a list of available mechanisms. - case NegotiatePB::NEGOTIATE: - RETURN_NOT_OK(HandleNegotiateRequest(request)); - break; - - // INITIATE: They want to initiate negotiation based on their specified mechanism. - case NegotiatePB::SASL_INITIATE: - RETURN_NOT_OK(HandleInitiateRequest(request)); - break; - - // RESPONSE: Client sent a new request as a follow-up to a SASL_CHALLENGE response. - case NegotiatePB::SASL_RESPONSE: - RETURN_NOT_OK(HandleResponseRequest(request)); - break; - - // Client sent us an unsupported Negotiation request. - default: { - TRACE("SASL Server: Received unsupported request from client"); - Status s = Status::InvalidArgument("RPC server doesn't support negotiation step in request", - NegotiatePB::NegotiateStep_Name(request.step())); - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); - return s; - } + RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf)); + Status s = HandleSaslInitiate(request); + + while (s.IsIncomplete()) { + RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf)); + s = HandleSaslResponse(request); } + RETURN_NOT_OK(s); } - const char* username = nullptr; int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME, reinterpret_cast<const void**>(&username)); // We expect that SASL_USERNAME will always get set. - CHECK(rc == SASL_OK && username != nullptr) - << "No username on authenticated connection"; + CHECK(rc == SASL_OK && username != nullptr) << "No username on authenticated connection"; authenticated_user_ = username; - TRACE("SASL Server: Successful negotiation"); - server_state_ = SaslNegotiationState::NEGOTIATED; + // Step 4: Receive connection context. + RETURN_NOT_OK(RecvConnectionContext(&recv_buf)); + + TRACE("Negotiation successful"); return Status::OK(); } -Status SaslServer::ValidateConnectionHeader(faststring* recv_buf) { - TRACE("Waiting for connection header"); - size_t num_read; - const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength; - recv_buf->resize(conn_header_len); - RETURN_NOT_OK(sock_->BlockingRecv(recv_buf->data(), conn_header_len, &num_read, deadline_)); - DCHECK_EQ(conn_header_len, num_read); +Status ServerNegotiation::PreflightCheckGSSAPI() { + // TODO(todd): the error messages that come from this function on el6 + // are relatively useless due to the following krb5 bug: + // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973 + // This may not be useful anymore given the keytab login that happens + // in security/init.cc. - RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf)); - TRACE("Connection header received"); - return Status::OK(); + // Initialize a ServerNegotiation with a null socket, and enable + // only GSSAPI. + // + // We aren't going to actually send/receive any messages, but + // this makes it easier to reuse the initialization code. + ServerNegotiation server(nullptr); + Status s = server.EnableGSSAPI(); + if (!s.ok()) { + return Status::RuntimeError(s.message()); + } + + RETURN_NOT_OK(server.InitSaslServer()); + + // Start the SASL server as if we were accepting a connection. + const char* server_out = nullptr; // ignored + uint32_t server_out_len = 0; + s = WrapSaslCall(server.sasl_conn_.get(), [&]() { + return sasl_server_start( + server.sasl_conn_.get(), + kSaslMechGSSAPI, + "", 0, // Pass a 0-length token. + &server_out, &server_out_len); + }); + + // We expect 'Incomplete' status to indicate that the first step of negotiation + // was correct. + if (s.IsIncomplete()) return Status::OK(); + + string err_msg = s.message().ToString(); + if (err_msg == "Permission denied") { + // For bad keytab permissions, we get a rather vague message. So, + // we make it more specific for better usability. + err_msg = "error accessing keytab: " + err_msg; + } + return Status::RuntimeError(err_msg); } -Status SaslServer::ParseNegotiatePB(const RequestHeader& header, - const Slice& param_buf, - NegotiatePB* request) { - Status s = helper_.SanityCheckNegotiateCallId(header.call_id()); +Status ServerNegotiation::RecvNegotiatePB(NegotiatePB* msg, faststring* recv_buf) { + RequestHeader header; + Slice param_buf; + RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), recv_buf, &header, ¶m_buf, deadline_)); + Status s = helper_.CheckNegotiateCallId(header.call_id()); if (!s.ok()) { - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s)); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s)); + return s; } - s = helper_.ParseNegotiatePB(param_buf, request); + s = helper_.ParseNegotiatePB(param_buf, msg); if (!s.ok()) { - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s)); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s)); return s; } + TRACE("Received $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg->step())); return Status::OK(); } -Status SaslServer::SendNegotiatePB(const NegotiatePB& msg) { - DCHECK_NE(server_state_, SaslNegotiationState::NEW) - << "Must not send Negotiate messages before calling Init()"; - DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED) - << "Must not send Negotiate messages after Negotiate() succeeds"; - - // Create header with negotiation-specific callId +Status ServerNegotiation::SendNegotiatePB(const NegotiatePB& msg) { ResponseHeader header; header.set_call_id(kNegotiateCallId); - return helper_.SendNegotiatePB(sock_, header, msg, deadline_); + + DCHECK(socket_); + DCHECK(msg.IsInitialized()) << "message must be initialized"; + DCHECK(msg.has_step()) << "message must have a step"; + + TRACE("Sending $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg.step())); + return SendFramedMessageBlocking(socket(), header, msg, deadline_); } -Status SaslServer::SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) { - DCHECK_NE(server_state_, SaslNegotiationState::NEW) - << "Must not send SASL messages before calling Init()"; - DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED) - << "Must not send SASL messages after Negotiate() succeeds"; - if (err.ok()) { - return Status::InvalidArgument("Cannot send error message using OK status"); - } +Status ServerNegotiation::SendError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) { + DCHECK(!err.ok()); // Create header with negotiation-specific callId ResponseHeader header; @@ -283,39 +239,87 @@ Status SaslServer::SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status msg.set_code(code); msg.set_message(err.ToString()); - RETURN_NOT_OK(helper_.SendNegotiatePB(sock_, header, msg, deadline_)); - TRACE("Sent SASL error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code)); + TRACE("Sending RPC error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code)); + RETURN_NOT_OK(SendFramedMessageBlocking(socket(), header, msg, deadline_)); + + return Status::OK(); +} + +Status ServerNegotiation::ValidateConnectionHeader(faststring* recv_buf) { + TRACE("Waiting for connection header"); + size_t num_read; + const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength; + recv_buf->resize(conn_header_len); + RETURN_NOT_OK(socket_->BlockingRecv(recv_buf->data(), conn_header_len, &num_read, deadline_)); + DCHECK_EQ(conn_header_len, num_read); + + RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf)); + TRACE("Connection header received"); return Status::OK(); } -Status SaslServer::HandleNegotiateRequest(const NegotiatePB& request) { - TRACE("SASL Server: Received NEGOTIATE request from client"); +// calls sasl_server_init() and sasl_server_new() +Status ServerNegotiation::InitSaslServer() { + RETURN_NOT_OK(SaslInit()); + + // TODO(unknown): Support security flags. + unsigned secflags = 0; + + sasl_conn_t* sasl_conn = nullptr; + RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() { + return sasl_server_new( + // Registered name of the service using SASL. Required. + kSaslProtoName, + // The fully qualified domain name of this server. + helper_.server_fqdn(), + // Permits multiple user realms on server. NULL == use default. + nullptr, + // Local and remote IP address strings. (NULL disables + // mechanisms which require this info.) + helper_.local_addr_string(), + helper_.remote_addr_string(), + // Connection-specific callbacks. + &callbacks_[0], + // Security flags. + secflags, + &sasl_conn); + }), "Unable to create new SASL server"); + sasl_conn_.reset(sasl_conn); + return Status::OK(); +} + +Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) { + if (request.step() != NegotiatePB::NEGOTIATE) { + return Status::NotAuthorized("expected NEGOTIATE step", + NegotiatePB::NegotiateStep_Name(request.step())); + } + TRACE("Received NEGOTIATE request from client"); // Fill in the set of features supported by the client. for (int flag : request.supported_features()) { // We only add the features that our local build knows about. RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ? static_cast<RpcFeatureFlag>(flag) : UNKNOWN; - if (ContainsKey(kSupportedServerRpcFeatureFlags, feature_flag)) { + if (feature_flag != UNKNOWN) { client_features_.insert(feature_flag); } } - set<string> server_mechs = helper_.LocalMechs(); + set<string> server_mechs = helper_.EnabledMechs(); if (PREDICT_FALSE(server_mechs.empty())) { // This will happen if no mechanisms are enabled before calling Init() - Status s = Status::IllegalState("SASL server mechanism list is empty!"); + Status s = Status::NotAuthorized("SASL server mechanism list is empty!"); LOG(ERROR) << s.ToString(); - TRACE("SASL Server: Sending FATAL_UNAUTHORIZED response to client"); - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + TRACE("Sending FATAL_UNAUTHORIZED response to client"); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); return s; } - RETURN_NOT_OK(SendNegotiateResponse(server_mechs)); + RETURN_NOT_OK(SendNegotiate(server_mechs)); return Status::OK(); } -Status SaslServer::SendNegotiateResponse(const set<string>& server_mechs) { +Status ServerNegotiation::SendNegotiate(const set<string>& server_mechs) { NegotiatePB response; response.set_step(NegotiatePB::NEGOTIATE); @@ -329,31 +333,32 @@ Status SaslServer::SendNegotiateResponse(const set<string>& server_mechs) { } RETURN_NOT_OK(SendNegotiatePB(response)); - TRACE("Sent NEGOTIATE response"); return Status::OK(); } - -Status SaslServer::HandleInitiateRequest(const NegotiatePB& request) { - TRACE("SASL Server: Received INITIATE request from client"); +Status ServerNegotiation::HandleSaslInitiate(const NegotiatePB& request) { + if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_INITIATE)) { + Status s = Status::NotAuthorized("expected SASL_INITIATE step", + NegotiatePB::NegotiateStep_Name(request.step())); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + return s; + } + TRACE("Received SASL_INITIATE request from client"); if (request.auths_size() != 1) { - Status s = Status::NotAuthorized(StringPrintf( - "SASL INITIATE request must include exactly one SaslAuth section, found: %d", - request.auths_size())); - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + Status s = Status::NotAuthorized( + "SASL_INITIATE request must include exactly one SaslAuth section, found", + std::to_string(request.auths_size())); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); return s; } const NegotiatePB::SaslAuth& auth = request.auths(0); - TRACE("SASL Server: Client requested to use mechanism: $0", auth.mechanism()); - - // Security issue to display this. Commented out but left for debugging purposes. - //DVLOG(3) << "SASL server: Client token: " << request.token(); + TRACE("Client requested to use mechanism: $0", auth.mechanism()); const char* server_out = nullptr; uint32_t server_out_len = 0; - TRACE("SASL Server: Calling sasl_server_start()"); + TRACE("Calling sasl_server_start()"); Status s = WrapSaslCall(sasl_conn_.get(), [&]() { return sasl_server_start( @@ -364,55 +369,41 @@ Status SaslServer::HandleInitiateRequest(const NegotiatePB& request) { &server_out, // The output of the SASL library, might not be NULL terminated &server_out_len); // Output len. }); + if (PREDICT_FALSE(!s.ok() && !s.IsIncomplete())) { - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); return s; } + negotiated_mech_ = SaslMechanism::value_of(auth.mechanism()); // We have a valid mechanism match if (s.ok()) { - nego_ok_ = true; - RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len)); + RETURN_NOT_OK(SendSaslSuccess(server_out, server_out_len)); } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE) - RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len)); + RETURN_NOT_OK(SendSaslChallenge(server_out, server_out_len)); } - return Status::OK(); -} - -Status SaslServer::SendChallengeResponse(const char* challenge, unsigned clen) { - NegotiatePB response; - response.set_step(NegotiatePB::SASL_CHALLENGE); - response.mutable_token()->assign(challenge, clen); - TRACE("SASL Server: Sending SASL_CHALLENGE response to client"); - RETURN_NOT_OK(SendNegotiatePB(response)); - return Status::OK(); + return s; } -Status SaslServer::SendSuccessResponse(const char* token, unsigned tlen) { - NegotiatePB response; - response.set_step(NegotiatePB::SASL_SUCCESS); - if (PREDICT_FALSE(tlen > 0)) { - response.mutable_token()->assign(token, tlen); +Status ServerNegotiation::HandleSaslResponse(const NegotiatePB& request) { + if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_RESPONSE)) { + Status s = Status::NotAuthorized("expected SASL_RESPONSE step", + NegotiatePB::NegotiateStep_Name(request.step())); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + return s; } - TRACE("SASL Server: Sending SASL_SUCCESS response to client"); - RETURN_NOT_OK(SendNegotiatePB(response)); - return Status::OK(); -} - - -Status SaslServer::HandleResponseRequest(const NegotiatePB& request) { - TRACE("SASL Server: Received RESPONSE request from client"); + TRACE("Received SASL_RESPONSE request from client"); if (!request.has_token()) { - Status s = Status::InvalidArgument("No token in SASL_RESPONSE from client"); - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + Status s = Status::NotAuthorized("no token in SASL_RESPONSE from client"); + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); return s; } const char* server_out = nullptr; uint32_t server_out_len = 0; - TRACE("SASL Server: Calling sasl_server_step()"); + TRACE("Calling sasl_server_step()"); Status s = WrapSaslCall(sasl_conn_.get(), [&]() { return sasl_server_step( sasl_conn_.get(), // The SASL connection context created by init() @@ -421,78 +412,76 @@ Status SaslServer::HandleResponseRequest(const NegotiatePB& request) { &server_out, // The output of the SASL library, might not be NULL terminated &server_out_len); // Output len }); - if (!s.ok() && !s.IsIncomplete()) { - RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); - return s; - } if (s.ok()) { - nego_ok_ = true; - RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len)); - } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE) - RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len)); + return SendSaslSuccess(server_out, server_out_len); } - return Status::OK(); + if (s.IsIncomplete()) { + return SendSaslChallenge(server_out, server_out_len); + } + RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s)); + return s; } -int SaslServer::GetOptionCb(const char* plugin_name, const char* option, - const char** result, unsigned* len) { - return helper_.GetOptionCb(plugin_name, option, result, len); +Status ServerNegotiation::SendSaslChallenge(const char* challenge, unsigned clen) { + NegotiatePB response; + response.set_step(NegotiatePB::SASL_CHALLENGE); + response.mutable_token()->assign(challenge, clen); + RETURN_NOT_OK(SendNegotiatePB(response)); + return Status::Incomplete(""); } -int SaslServer::PlainAuthCb(sasl_conn_t * /*conn*/, const char * /*user*/, const char * /*pass*/, - unsigned /*passlen*/, struct propctx * /*propctx*/) { - TRACE("SASL Server: Received PLAIN auth."); - if (PREDICT_FALSE(!helper_.IsPlainEnabled())) { - LOG(DFATAL) << "Password authentication callback called while PLAIN auth disabled"; - return SASL_BADPARAM; +Status ServerNegotiation::SendSaslSuccess(const char* token, unsigned tlen) { + NegotiatePB response; + response.set_step(NegotiatePB::SASL_SUCCESS); + if (PREDICT_FALSE(tlen > 0)) { + response.mutable_token()->assign(token, tlen); } - // We always allow PLAIN authentication to succeed. - return SASL_OK; + RETURN_NOT_OK(SendNegotiatePB(response)); + return Status::OK(); } -Status SaslServer::PreflightCheckGSSAPI(const string& app_name) { - // TODO(todd): the error messages that come from this function on el6 - // are relatively useless due to the following krb5 bug: - // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973 - // This may not be useful anymore given the keytab login that happens - // in security/init.cc. +Status ServerNegotiation::RecvConnectionContext(faststring* recv_buf) { + TRACE("Waiting for connection context"); + RequestHeader header; + Slice param_buf; + RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), recv_buf, &header, ¶m_buf, deadline_)); + DCHECK(header.IsInitialized()); - // Initialize a SaslServer with a null socket, and enable - // only GSSAPI. - // - // We aren't going to actually send/receive any messages, but - // this makes it easier to reuse the initialization code. - SaslServer server(app_name, nullptr); - Status s = server.EnableGSSAPI(); - if (!s.ok()) { - return Status::RuntimeError(s.message()); + if (header.call_id() != kConnectionContextCallId) { + return Status::NotAuthorized("expected ConnectionContext callid, received", + std::to_string(header.call_id())); } - RETURN_NOT_OK(server.Init(app_name)); + ConnectionContextPB conn_context; + if (!conn_context.ParseFromArray(param_buf.data(), param_buf.size())) { + return Status::NotAuthorized("invalid ConnectionContextPB message, missing fields", + conn_context.InitializationErrorString()); + } - // Start the SASL server as if we were accepting a connection. - const char* server_out = nullptr; // ignored - uint32_t server_out_len = 0; - s = WrapSaslCall(server.sasl_conn_.get(), [&]() { - return sasl_server_start( - server.sasl_conn_.get(), - kSaslMechGSSAPI, - "", 0, // Pass a 0-length token. - &server_out, &server_out_len); - }); + // Currently none of the fields of the connection context are used. + return Status::OK(); +} - // We expect 'Incomplete' status to indicate that the first step of negotiation - // was correct. - if (s.IsIncomplete()) return Status::OK(); +int ServerNegotiation::GetOptionCb(const char* plugin_name, + const char* option, + const char** result, + unsigned* len) { + return helper_.GetOptionCb(plugin_name, option, result, len); +} - string err_msg = s.message().ToString(); - if (err_msg == "Permission denied") { - // For bad keytab permissions, we get a rather vague message. So, - // we make it more specific for better usability. - err_msg = "error accessing keytab: " + err_msg; +int ServerNegotiation::PlainAuthCb(sasl_conn_t* /*conn*/, + const char* /*user*/, + const char* /*pass*/, + unsigned /*passlen*/, + struct propctx* /*propctx*/) { + TRACE("Received PLAIN auth."); + if (PREDICT_FALSE(!helper_.IsPlainEnabled())) { + LOG(DFATAL) << "Password authentication callback called while PLAIN auth disabled"; + return SASL_BADPARAM; } - return Status::RuntimeError(err_msg); + // We always allow PLAIN authentication to succeed. + return SASL_OK; } } // namespace rpc http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/rpc/server_negotiation.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h index 53f674d..089bc0b 100644 --- a/src/kudu/rpc/server_negotiation.h +++ b/src/kudu/rpc/server_negotiation.h @@ -15,15 +15,16 @@ // specific language governing permissions and limitations // under the License. -#ifndef KUDU_RPC_SASL_SERVER_H -#define KUDU_RPC_SASL_SERVER_H +#pragma once +#include <memory> #include <set> #include <string> #include <vector> #include <sasl/sasl.h> +#include "kudu/gutil/gscoped_ptr.h" #include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/sasl_common.h" #include "kudu/rpc/sasl_helper.h" @@ -37,23 +38,24 @@ class Slice; namespace rpc { -using std::string; - -// Class for doing SASL negotiation with a SaslClient over a bidirectional socket. +// Class for doing KRPC negotiation with a remote client over a bidirectional socket. // Operations on this class are NOT thread-safe. -class SaslServer { +class ServerNegotiation { public: - // Does not take ownership of 'socket'. - SaslServer(string app_name, Socket* socket); + // Creates a new server negotiation instance, taking ownership of the + // provided socket. After completing the negotiation process by setting the + // desired options and calling Negotiate((), the socket can be retrieved with + // release_socket(). + explicit ServerNegotiation(std::unique_ptr<Socket> socket); // Enable PLAIN authentication. // Despite PLAIN authentication taking a username and password, we disregard // the password and use this as a "unauthenticated" mode. - // Must be called after Init(). + // Must be called before Negotiate(). Status EnablePlain(); // Enable GSSAPI (Kerberos) authentication. - // Call after Init(). + // Must be called before Negotiate(). Status EnableGSSAPI(); // Returns mechanism negotiated by this connection. @@ -62,41 +64,49 @@ class SaslServer { // Returns the set of RPC system features supported by the remote client. // Must be called after Negotiate(). - const std::set<RpcFeatureFlag>& client_features() const { + std::set<RpcFeatureFlag> client_features() const { return client_features_; } + // Returns the set of RPC system features supported by the remote client. + // Must be called after Negotiate(). + // Subsequent calls to this method or client_features() will return an empty set. + std::set<RpcFeatureFlag> take_client_features() { + return std::move(client_features_); + } + // Name of the user that was authenticated. // Must be called after a successful Negotiate(). const std::string& authenticated_user() const; // Specify IP:port of local side of connection. - // Must be called before Init(). Required for some mechanisms. + // Must be called before Negotiate(). Required for some mechanisms. void set_local_addr(const Sockaddr& addr); // Specify IP:port of remote side of connection. - // Must be called before Init(). Required for some mechanisms. + // Must be called before Negotiate(). Required for some mechanisms. void set_remote_addr(const Sockaddr& addr); // Specify the fully-qualified domain name of the remote server. - // Must be called before Init(). Required for some mechanisms. - void set_server_fqdn(const string& domain_name); + // Must be called before Negotiate(). Required for some mechanisms. + void set_server_fqdn(const std::string& domain_name); // Set deadline for connection negotiation. void set_deadline(const MonoTime& deadline); - // Get deadline for connection negotiation. - const MonoTime& deadline() const { return deadline_; } + Socket* socket() const { return socket_.get(); } - // Initialize a new SASL server. Must be called before Negotiate(). - // Returns OK on success, otherwise RuntimeError. - Status Init(const string& service_type); + // Returns the socket owned by this server negotiation. The caller will own + // the socket after this call, and the negotiation instance should no longer + // be used. Must be called after Negotiate(). + std::unique_ptr<Socket> release_socket() { return std::move(socket_); } - // Begin negotiation with the SASL client on the other side of the fd socket - // that this server was constructed with. - // Returns OK on success. - // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status. - Status Negotiate(); + // Negotiate with the remote client. Should only be called once per + // ServerNegotiation and socket instance, after all options have been set. + // + // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or + // another non-OK status. + Status Negotiate() WARN_UNUSED_RESULT; // SASL callback for plugin options, supported mechanisms, etc. // Returns SASL_FAIL if the option is not handled, which does not fail the handshake. @@ -109,72 +119,71 @@ class SaslServer { // Perform a "pre-flight check" that everything required to act as a Kerberos // server is properly set up. - static Status PreflightCheckGSSAPI(const std::string& app_name); + static Status PreflightCheckGSSAPI() WARN_UNUSED_RESULT; private: - // Parse and validate connection header. - Status ValidateConnectionHeader(faststring* recv_buf); - // Parse request body. If malformed, sends an error message to the client. - Status ParseNegotiatePB(const RequestHeader& header, - const Slice& param_buf, - NegotiatePB* request); + // Parse a negotiate request from the client, deserializing it into 'msg'. + // If the request is malformed, sends an error message to the client. + Status RecvNegotiatePB(NegotiatePB* msg, faststring* recv_buf) WARN_UNUSED_RESULT; - // Encode and send the specified SASL message to the client. - Status SendNegotiatePB(const NegotiatePB& msg); + // Encode and send the specified negotiate response message to the server. + Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT; // Encode and send the specified RPC error message to the client. // Calls Status.ToString() for the embedded error message. - Status SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err); + Status SendError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) WARN_UNUSED_RESULT; + + // Parse and validate connection header. + Status ValidateConnectionHeader(faststring* recv_buf) WARN_UNUSED_RESULT; + + // Initialize the SASL server negotiation instance. + Status InitSaslServer() WARN_UNUSED_RESULT; // Handle case when client sends NEGOTIATE request. - Status HandleNegotiateRequest(const NegotiatePB& request); + Status HandleNegotiate(const NegotiatePB& request) WARN_UNUSED_RESULT; // Send a NEGOTIATE response to the client with the list of available mechanisms. - Status SendNegotiateResponse(const std::set<string>& server_mechs); + Status SendNegotiate(const std::set<std::string>& server_mechs) WARN_UNUSED_RESULT; + + // Handle case when client sends SASL_INITIATE request. + // Returns Status::OK if the SASL negotiation is complete, or + // Status::Incomplete if a SASL_RESPONSE step is expected. + Status HandleSaslInitiate(const NegotiatePB& request) WARN_UNUSED_RESULT; - // Handle case when client sends INITIATE request. - Status HandleInitiateRequest(const NegotiatePB& request); + // Handle case when client sends SASL_RESPONSE request. + Status HandleSaslResponse(const NegotiatePB& request) WARN_UNUSED_RESULT; - // Send a CHALLENGE response to the client with a challenge token. - Status SendChallengeResponse(const char* challenge, unsigned clen); + // Send a SASL_CHALLENGE response to the client with a challenge token. + Status SendSaslChallenge(const char* challenge, unsigned clen) WARN_UNUSED_RESULT; - // Send a SUCCESS response to the client with an token (typically empty). - Status SendSuccessResponse(const char* token, unsigned tlen); + // Send a SASL_SUCCESS response to the client with an token (typically empty). + Status SendSaslSuccess(const char* token, unsigned tlen) WARN_UNUSED_RESULT; - // Handle case when client sends RESPONSE request. - Status HandleResponseRequest(const NegotiatePB& request); + // Receive and validate the ConnectionContextPB. + Status RecvConnectionContext(faststring* recv_buf) WARN_UNUSED_RESULT; - string app_name_; - Socket* sock_; + // The socket to the remote client. + std::unique_ptr<Socket> socket_; + + // SASL state. std::vector<sasl_callback_t> callbacks_; - // The SASL connection object. This is initialized in Init() and - // freed after Negotiate() completes (regardless whether it was successful). gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_; SaslHelper helper_; - // The set of features that the client supports. Filled in - // after we receive the NEGOTIATE request from the client. + // The set of features supported by the client. Filled in during negotiation. std::set<RpcFeatureFlag> client_features_; - // The successfully-authenticated user, if applicable. - string authenticated_user_; - - SaslNegotiationState::Type server_state_; + // The successfully-authenticated user, if applicable. Filled in during + // negotiation. + std::string authenticated_user_; - // The mechanism we negotiated with the client. + // The SASL mechanism. Filled in during negotiation. SaslMechanism::Type negotiated_mech_; - // Intra-negotiation state. - bool nego_ok_; // During negotiation: did we get a SASL_OK response from the SASL library? - // Negotiation timeout deadline. MonoTime deadline_; - - DISALLOW_COPY_AND_ASSIGN(SaslServer); }; } // namespace rpc } // namespace kudu - -#endif // KUDU_RPC_SASL_SERVER_H http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/security/ssl_socket.cc ---------------------------------------------------------------------- diff --git a/src/kudu/security/ssl_socket.cc b/src/kudu/security/ssl_socket.cc index 126d2f1..c2725c2 100644 --- a/src/kudu/security/ssl_socket.cc +++ b/src/kudu/security/ssl_socket.cc @@ -44,6 +44,7 @@ SSLSocket::SSLSocket(int fd, SSL* ssl, bool is_server) : } SSLSocket::~SSLSocket() { + WARN_NOT_OK(Close(), "unable to close SSL socket in destructor"); } Status SSLSocket::DoHandshake() { @@ -154,7 +155,10 @@ Status SSLSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) { } Status SSLSocket::Close() { - CHECK(ssl_); + if (!ssl_) { + // Socket is already closed. + return Status::OK(); + } ERR_clear_error(); errno = 0; int32_t ret = SSL_shutdown(ssl_); http://git-wip-us.apache.org/repos/asf/kudu/blob/dc852535/src/kudu/security/ssl_socket.h ---------------------------------------------------------------------- diff --git a/src/kudu/security/ssl_socket.h b/src/kudu/security/ssl_socket.h index 4f67d48..f7570fb 100644 --- a/src/kudu/security/ssl_socket.h +++ b/src/kudu/security/ssl_socket.h @@ -32,9 +32,10 @@ class Sockaddr; class SSLSocket : public Socket { public: + SSLSocket(int fd, SSL* ssl, bool is_server); - ~SSLSocket(); + ~SSLSocket() override; // Do the SSL handshake as a client or a server and verify that the credentials were correctly // verified.