Introduce libevent SSL socket. Requires: configure --enable-libevent --enable-libevent-socket --enable-ssl
New environment variables: ``` SSL_ENABLED=(false|0,true|1) SSL_CERT_FILE=(path to certificate) SSL_KEY_FILE=(path to key) SSL_VERIFY_CERT=(false|0,true|1) SSL_REQUIRE_CERT=(false|0,true|1) SSL_VERIFY_DEPTH=(4) SSL_CA_DIR=(path to CA directory) SSL_CA_FILE=(path to CA file) SSL_CIPHERS=(accepted ciphers separated by ':') SSL_ENABLE_SSL_V2=(false|0,true|1) SSL_ENABLE_SSL_V3=(false|0,true|1) SSL_ENABLE_TLS_V1_0=(false|0,true|1) SSL_ENABLE_TLS_V1_1=(false|0,true|1) SSL_ENABLE_TLS_V1_2=(false|0,true|1) ``` Only TLSV1.2 is enabled by default. Use the `ENABLE_SSL_V*` and `ENABLE_TLS_V*` environment variables to open up more protocols. Use the `SSL_CIPHERS` environment variable to restrict or open up the supported ciphers. Review: https://reviews.apache.org/r/29406 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/654cabf9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/654cabf9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/654cabf9 Branch: refs/heads/master Commit: 654cabf9092f03a1857bd99ee510a204bfaaba51 Parents: 1f6c99b Author: Joris Van Remoortere <[email protected]> Authored: Sun Jun 14 03:11:00 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Sun Jun 14 04:12:01 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/Makefile.am | 26 +- 3rdparty/libprocess/include/process/address.hpp | 21 + 3rdparty/libprocess/include/process/socket.hpp | 49 +- 3rdparty/libprocess/src/libevent.cpp | 94 +- 3rdparty/libprocess/src/libevent.hpp | 27 +- 3rdparty/libprocess/src/libevent_ssl_socket.cpp | 926 +++++++++++++++++++ 3rdparty/libprocess/src/libevent_ssl_socket.hpp | 165 ++++ 3rdparty/libprocess/src/openssl.cpp | 563 +++++++++++ 3rdparty/libprocess/src/openssl.hpp | 79 ++ 3rdparty/libprocess/src/process.cpp | 28 +- 3rdparty/libprocess/src/socket.cpp | 24 +- 11 files changed, 1971 insertions(+), 31 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am index 489ce35..c781f6c 100644 --- a/3rdparty/libprocess/Makefile.am +++ b/3rdparty/libprocess/Makefile.am @@ -53,6 +53,22 @@ libprocess_la_SOURCES = \ src/subprocess.cpp \ src/timeseries.cpp +if ENABLE_LIBEVENT +else +if WITH_BUNDLED_LIBEV + EVENT_LIB = $(LIBEV)/libev.la +else + EVENT_LIB = -lev +endif +endif + +if ENABLE_SSL +libprocess_la_SOURCES += \ + src/libevent_ssl_socket.cpp \ + src/openssl.cpp \ + src/openssl.hpp +endif + libprocess_la_CPPFLAGS = \ -I$(srcdir)/include \ -I$(srcdir)/$(STOUT)/include \ @@ -89,16 +105,6 @@ else HTTP_PARSER_LIB = -lhttp_parser endif -if ENABLE_LIBEVENT - EVENT_LIB = -levent -levent_pthreads -else -if WITH_BUNDLED_LIBEV - EVENT_LIB = $(LIBEV)/libev.la -else - EVENT_LIB = -lev -endif -endif - libprocess_la_LIBADD = \ $(LIBGLOG) \ $(HTTP_PARSER_LIB) \ http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/include/process/address.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp index 729f5cd..88946d5 100644 --- a/3rdparty/libprocess/include/process/address.hpp +++ b/3rdparty/libprocess/include/process/address.hpp @@ -14,6 +14,7 @@ #include <stout/abort.hpp> #include <stout/ip.hpp> +#include <stout/net.hpp> #include <stout/stringify.hpp> namespace process { @@ -56,6 +57,26 @@ public: return ip.family(); } + /** + * Returns the hostname of this address's IP. + * + * @returns the hostname of this address's IP. + */ + // TODO(jmlvanre): Consider making this return a Future in order to + // deal with slow name resolution. 
+ Try<std::string> hostname() const + { + const Try<std::string> hostname = ip == net::IP(INADDR_ANY) + ? net::hostname() + : net::getHostname(ip); + + if (hostname.isError()) { + return Error(hostname.error()); + } + + return hostname.get(); + } + // Returns the storage size (i.e., either sizeof(sockaddr_in) or // sizeof(sockaddr_in6) depending on the family) of this address. size_t size() const http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index b8c2274..2cf3c10 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -24,7 +24,9 @@ public: // Available kinds of implementations. enum Kind { POLL, - // TODO(jmlvanre): Add libevent SSL socket. +#ifdef USE_SSL_SOCKET + SSL +#endif }; // Returns an instance of a Socket using the specified kind of @@ -58,9 +60,11 @@ public: return s; } + // Interface functions implemented by this base class. + Try<Address> address() const; + Try<Address> bind(const Address& address); + // Socket::Impl interface. - virtual Try<Address> address() const; - virtual Try<Address> bind(const Address& address); virtual Try<Nothing> listen(int backlog) = 0; virtual Future<Socket> accept() = 0; virtual Future<Nothing> connect(const Address& address) = 0; @@ -93,12 +97,46 @@ public: // enabling reuse of a pool of preallocated strings/buffers. virtual Future<Nothing> send(const std::string& data); + virtual void shutdown() + { + if (::shutdown(s, SHUT_RD) < 0) { + PLOG(ERROR) << "Shutdown failed on fd=" << s; + } + } + + // Construct a new Socket from the given impl. This is a proxy + // function, as Impls derived from this won't have access to the + // Socket::Socket(...) constructors. 
+ // + // TODO(jmlvanre): These should be protected; however, gcc + // complains when using them from within a lambda of a derived + // class. + static Socket socket(std::shared_ptr<Impl>&& that) + { + return Socket(std::move(that)); + } + + static Socket socket(const std::shared_ptr<Impl>& that) + { + return Socket(that); + } + protected: explicit Impl(int _s) : s(_s) { CHECK(s >= 0); } // Construct a Socket wrapper from this implementation. Socket socket() { return Socket(shared_from_this()); } + // Returns a std::shared_ptr<T> from this implementation. + template <typename T> + static std::shared_ptr<T> shared(T* t) + { + std::shared_ptr<T> pointer = + std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this()); + CHECK(pointer); + return pointer; + } + int s; }; @@ -167,6 +205,11 @@ public: return impl->send(data); } + void shutdown() + { + impl->shutdown(); + } + private: explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {} http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp index fb03859..2b82930 100644 --- a/3rdparty/libprocess/src/libevent.cpp +++ b/3rdparty/libprocess/src/libevent.cpp @@ -1,10 +1,14 @@ +#include <signal.h> #include <unistd.h> +#include <mutex> + #include <event2/event.h> #include <event2/thread.h> #include <process/logging.hpp> +#include <stout/os/signals.hpp> #include <stout/synchronized.hpp> #include "event_loop.hpp" @@ -12,24 +16,100 @@ namespace process { -struct event_base* base = NULL; +event_base* base = NULL; + + +static std::mutex* functions_mutex = new std::mutex(); +std::queue<lambda::function<void(void)>>* functions = + new std::queue<lambda::function<void(void)>>(); + + +ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>(); + + +void async_function(int socket, short which, void* arg) +{ + 
event* ev = reinterpret_cast<event*>(arg); + event_free(ev); + + std::queue<lambda::function<void(void)>> q; + + synchronized (functions_mutex) { + std::swap(q, *functions); + } + + while (!q.empty()) { + q.front()(); + q.pop(); + } +} + + +void run_in_event_loop( + const lambda::function<void(void)>& f, + EventLoopLogicFlow event_loop_logic_flow) +{ + if (__in_event_loop__ && event_loop_logic_flow == ALLOW_SHORT_CIRCUIT) { + f(); + return; + } + + synchronized (functions_mutex) { + functions->push(f); + + // Add an event and activate it to interrupt the event loop. + // TODO(jmlvanre): after libevent v 2.1 we can use + // event_self_cbarg instead of re-assigning the event. For now we + // manually re-assign the event to pass in the pointer to the + // event itself as the callback argument. + event* ev = evtimer_new(base, async_function, NULL); + + // 'event_assign' is only valid on non-pending AND non-active + // events. This means we have to assign the callback before + // calling 'event_active'. + if (evtimer_assign(ev, base, async_function, ev) < 0) { + LOG(FATAL) << "Failed to assign callback on event"; + } + + event_active(ev, EV_TIMEOUT, 0); + } +} void* EventLoop::run(void*) { + __in_event_loop__ = true; + + // Block SIGPIPE in the event loop because we can not force + // underlying implementations such as SSL bufferevents to use + // MSG_NOSIGNAL. + bool unblock = os::signals::block(SIGPIPE); + do { int result = event_base_loop(base, EVLOOP_ONCE); if (result < 0) { LOG(FATAL) << "Failed to run event loop"; - } else if (result == 1) { - VLOG(1) << "All events handled, continuing event loop"; + } else if (result > 0) { + // All events are handled, continue event loop. 
continue; - } else if (event_base_got_break(base)) { - break; - } else if (event_base_got_exit(base)) { - break; + } else { + CHECK_EQ(0, result); + if (event_base_got_break(base)) { + break; + } else if (event_base_got_exit(base)) { + break; + } } } while (true); + + __in_event_loop__ = false; + + if (unblock) { + if (!os::signals::unblock(SIGPIPE)) { + LOG(FATAL) << "Failure to unblock SIGPIPE"; + } + } + return NULL; } http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libevent.hpp b/3rdparty/libprocess/src/libevent.hpp index f6cc721..a20f2c3 100644 --- a/3rdparty/libprocess/src/libevent.hpp +++ b/3rdparty/libprocess/src/libevent.hpp @@ -1,10 +1,35 @@ #ifndef __LIBEVENT_HPP__ #define __LIBEVENT_HPP__ +#include <stout/lambda.hpp> +#include <stout/thread.hpp> + namespace process { // Event loop. -extern struct event_base* base; +extern event_base* base; + + +// Per thread bool pointer. The extra level of indirection from +// _in_event_loop_ to __in_event_loop__ is used in order to take +// advantage of the ThreadLocal operators without needing the extra +// dereference as well as lazily construct the actual bool. +extern ThreadLocal<bool>* _in_event_loop_; + + +#define __in_event_loop__ *(*_in_event_loop_ == NULL ? 
\ + *_in_event_loop_ = new bool(false) : *_in_event_loop_) + + +enum EventLoopLogicFlow { + ALLOW_SHORT_CIRCUIT, + DISALLOW_SHORT_CIRCUIT +}; + + +void run_in_event_loop( + const lambda::function<void(void)>& f, + EventLoopLogicFlow event_loop_logic_flow = ALLOW_SHORT_CIRCUIT); } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent_ssl_socket.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp new file mode 100644 index 0000000..5955796 --- /dev/null +++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp @@ -0,0 +1,926 @@ +#include <event2/buffer.h> +#include <event2/bufferevent_ssl.h> +#include <event2/event.h> +#include <event2/listener.h> +#include <event2/thread.h> +#include <event2/util.h> + +#include <openssl/ssl.h> +#include <openssl/err.h> + +#include <process/queue.hpp> +#include <process/socket.hpp> + +#include <stout/net.hpp> +#include <stout/synchronized.hpp> + +#include "libevent.hpp" +#include "libevent_ssl_socket.hpp" +#include "openssl.hpp" + +// Locking: +// +// We use the BEV_OPT_THREADSAFE flag when constructing bufferevents +// so that all **functions that are called from the event loop that +// take a bufferevent as a parameter will automatically have the +// lock acquired**. +// +// This means that everywhere that the libevent library does not +// already lock the bev, we need to manually 'synchronized (bev) {'. +// To further complicate matters, due to a deadlock scenario in +// libevent-openssl (v 2.0.21) we currently modify bufferevents using +// continuations in the event loop, but these functions, while run +// from within the event loop, are not passed the 'bev' as a parameter +// and thus MUST use 'synchronized (bev)'.
See 'Continuation' comment +// below for more details on why we need to invoke these continuations +// from within the event loop. + +// Continuations via 'run_in_event_loop(...)': +// +// There is a deadlock scenario in libevent-openssl (v 2.0.21) when +// modifying the bufferevent (bev) from another thread (not the event +// loop). To avoid this we run all bufferevent manipulation logic in +// continuations that are executed within the event loop. + +// Connection Extra FD: +// +// In libevent-openssl (v 2.0.21) we've had issues using the +// 'bufferevent_openssl_socket_new' call with the CONNECTING state and +// an existing socket. Therefore we allow it to construct its own +// fd and clean it up along with the Impl object when the bev is +// freed using the BEV_OPT_CLOSE_ON_FREE option. + +// DISALLOW_SHORT_CIRCUIT: +// +// We disallow short-circuiting in 'run_in_event_loop' due to a bug in +// libevent_openssl with deferred callbacks still being called (still +// in the run queue) even though a bev has been disabled. + +using std::queue; +using std::string; + +// Specialization of 'synchronize' to use bufferevent with the +// 'synchronized' macro. +static Synchronized<bufferevent> synchronize(bufferevent* bev) +{ + return Synchronized<bufferevent>( + bev, + [](bufferevent* bev) { bufferevent_lock(bev); }, + [](bufferevent* bev) { bufferevent_unlock(bev); }); +} + +namespace process { +namespace network { + +Try<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::create(int s) +{ + openssl::initialize(); + auto socket = std::make_shared<LibeventSSLSocketImpl>(s); + // See comment at 'initialize' declaration for why we call this. + socket->initialize(); + return socket; +} + + +LibeventSSLSocketImpl::~LibeventSSLSocketImpl() +{ + // We defer termination and destruction of all event loop specific + // calls and structures. 
This is a safety against the socket being + // destroyed before existing event loop calls have completed since + // they require valid data structures (the weak pointer). + + // Copy the members that we are interested in. This is necessary + // because 'this' points to memory that may be re-allocated and + // invalidate any reference to 'this->XXX'. We want to manipulate + // or use these data structures within the finalization lambda + // below. + evconnlistener* _listener = listener; + bufferevent* _bev = bev; + bool _accepted = accepted; + std::weak_ptr<LibeventSSLSocketImpl>* _event_loop_handle = event_loop_handle; + + run_in_event_loop( + [_listener, _bev, _accepted, _event_loop_handle]() { + // Once this lambda is called, it should not be possible for + // more event loop callbacks to be triggered with 'this->bev'. + // This is important because we delete event_loop_handle which + // is the callback argument for any event loop callbacks. + // This lambda is responsible for ensuring 'this->bev' is + // disabled, and cleaning up any remaining state associated + // with the event loop. + + CHECK(__in_event_loop__); + + if (_listener != NULL) { + evconnlistener_free(_listener); + } + + if (_bev != NULL) { + SSL* ssl = bufferevent_openssl_get_ssl(_bev); + // Workaround for SSL shutdown, see http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html // NOLINT + SSL_set_shutdown(ssl, SSL_RECEIVED_SHUTDOWN); + SSL_shutdown(ssl); + + // NOTE: Removes all future callbacks using 'this->bev'. + bufferevent_disable(_bev, EV_READ | EV_WRITE); + + // Since we are using a separate fd for the connecting socket we + // end up using BEV_OPT_CLOSE_ON_FREE for the connecting, but + // not for the accepting side. since the BEV_OPT_CLOSE_ON_FREE + // also frees the SSL object, we need to manually free it for + // the accepting case. See the 'Connection Extra FD' note at top + // of file. 
+ if (_accepted) { + SSL_free(ssl); + } + + // For the connecting socket BEV_OPT_CLOSE_ON_FREE will close + // the fd. See note below. + bufferevent_free(_bev); + } + + delete _event_loop_handle; + }, + DISALLOW_SHORT_CIRCUIT); +} + + +void LibeventSSLSocketImpl::initialize() +{ + event_loop_handle = new std::weak_ptr<LibeventSSLSocketImpl>(shared(this)); +} + + +void LibeventSSLSocketImpl::shutdown() +{ + // Nothing to do if this socket was never initialized. + synchronized (lock) { + if (bev == NULL) { + // If it was not initialized, then there should also be no + // requests. + CHECK(connect_request.get() == NULL); + CHECK(recv_request.get() == NULL); + CHECK(send_request.get() == NULL); + + return; + } + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self]() { + CHECK(__in_event_loop__); + CHECK(self); + + CHECK_NOTNULL(self->bev); + + synchronized (self->bev) { + Owned<RecvRequest> request; + + // Swap the 'recv_request' under the object lock. + synchronized (self->lock) { + std::swap(request, self->recv_request); + } + + // If there is still a pending receive request then close it. + if (request.get() != NULL) { + request->promise + .set(bufferevent_read(self->bev, request->data, request->size)); + } + } + }, + DISALLOW_SHORT_CIRCUIT); +} + + +// Only runs in event loop. No locks required. See 'Locking' note at +// top of file. 
+void LibeventSSLSocketImpl::recv_callback(bufferevent* /*bev*/, void* arg) +{ + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + // Don't call the 'recv_callback' unless the socket is still valid. + if (impl != NULL) { + impl->recv_callback(); + } +} + + +// Only runs in event loop. Member function continuation of static +// 'recv_callback'. +void LibeventSSLSocketImpl::recv_callback() +{ + CHECK(__in_event_loop__); + + Owned<RecvRequest> request; + + synchronized (lock) { + std::swap(request, recv_request); + } + + if (request.get() != NULL) { + // There is an invariant that if we are executing a + // 'recv_callback' and we have a request there must be data here + // because we should not be getting a spurious receive callback + // invocation. Even if we discarded a request, the manual + // invocation of 'recv_callback' guarantees that there is a + // non-zero amount of data available in the bufferevent. + size_t length = bufferevent_read(bev, request->data, request->size); + CHECK(length > 0); + + request->promise.set(length); + } +} + + +// Only runs in event loop. No locks required. See 'Locking' note at +// top of file. +void LibeventSSLSocketImpl::send_callback(bufferevent* /*bev*/, void* arg) +{ + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + // Don't call the 'send_callback' unless the socket is still valid. + if (impl != NULL) { + impl->send_callback(); + } +} + + +// Only runs in event loop. Member function continuation of static +// 'send_callback'.
+void LibeventSSLSocketImpl::send_callback() +{ + CHECK(__in_event_loop__); + + Owned<SendRequest> request; + + synchronized (lock) { + std::swap(request, send_request); + } + + if (request.get() != NULL) { + request->promise.set(request->size); + } +} + + +// Only runs in event loop. No locks required. See 'Locking' note at +// top of file. +void LibeventSSLSocketImpl::event_callback( + bufferevent* /*bev*/, + short events, + void* arg) +{ + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + // Don't call the 'event_callback' unless the socket is still valid. + if (impl != NULL) { + impl->event_callback(events); + } +} + + +// Only runs in event loop. Member function continuation of static +// 'event_callback'. +void LibeventSSLSocketImpl::event_callback(short events) +{ + CHECK(__in_event_loop__); + + Owned<RecvRequest> current_recv_request; + Owned<SendRequest> current_send_request; + Owned<ConnectRequest> current_connect_request; + + // In all of the following conditions, we're interested in swapping + // the value of the requests with null (if they are already null, + // then there's no harm). + if (events & BEV_EVENT_EOF || + events & BEV_EVENT_CONNECTED || + events & BEV_EVENT_ERROR) { + synchronized (lock) { + std::swap(current_recv_request, recv_request); + std::swap(current_send_request, send_request); + std::swap(current_connect_request, connect_request); + } + } + + // If a request below is null, then no such request is in progress, + // either because it was never created, it has already been + // completed, or it has been discarded. + + if (events & BEV_EVENT_EOF || + (events & BEV_EVENT_ERROR && EVUTIL_SOCKET_ERROR() == 0)) { + // At end of file, close the connection.
+ if (current_recv_request.get() != NULL) { + current_recv_request->promise.set(0); + } + + if (current_send_request.get() != NULL) { + current_send_request->promise.set(0); + } + + if (current_connect_request.get() != NULL) { + bufferevent_free(CHECK_NOTNULL(bev)); + bev = NULL; + current_connect_request->promise.fail( + "Failed connect: connection closed"); + } + } else if (events & BEV_EVENT_CONNECTED) { + // We should not have receiving or sending request while still + // connecting. + CHECK(current_recv_request.get() == NULL); + CHECK(current_send_request.get() == NULL); + CHECK_NOTNULL(current_connect_request.get()); + + // If we're connecting, then we've succeeded. Time to do + // post-verification. + CHECK_NOTNULL(bev); + + // Do post-validation of connection. + SSL* ssl = bufferevent_openssl_get_ssl(bev); + + Try<Nothing> verify = openssl::verify(ssl, peer_hostname); + if (verify.isError()) { + VLOG(1) << "Failed connect, verification error: " << verify.error(); + bufferevent_free(bev); + bev = NULL; + current_connect_request->promise.fail(verify.error()); + return; + } + + current_connect_request->promise.set(Nothing()); + } else if (events & BEV_EVENT_ERROR) { + CHECK(EVUTIL_SOCKET_ERROR() != 0); + std::ostringstream error_stream; + error_stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()); + + // If there is a valid error, fail any requests and log the error. 
+ VLOG(1) << "Socket error: " << error_stream.str(); + + if (current_recv_request.get() != NULL) { + current_recv_request->promise.fail( + "Failed recv, connection error: " + + error_stream.str()); + } + + if (current_send_request.get() != NULL) { + current_send_request->promise.fail( + "Failed send, connection error: " + + error_stream.str()); + } + + if (current_connect_request.get() != NULL) { + bufferevent_free(CHECK_NOTNULL(bev)); + bev = NULL; + current_connect_request->promise.fail( + "Failed connect, connection error: " + + error_stream.str()); + } + } +} + + +// For the connecting socket we currently don't use the fd associated +// with 'Socket'. See the 'Connection Extra FD' note at top of file. +LibeventSSLSocketImpl::LibeventSSLSocketImpl(int _s) + : Socket::Impl(_s), + bev(NULL), + listener(NULL), + lock(ATOMIC_FLAG_INIT), + recv_request(NULL), + send_request(NULL), + connect_request(NULL), + event_loop_handle(NULL), + accepted(false) {} + + +// For the connecting socket we currently don't use the fd associated +// with 'Socket'. See the 'Connection Extra FD' note at top of file. +LibeventSSLSocketImpl::LibeventSSLSocketImpl( + int _s, + bufferevent* _bev, + Option<std::string>&& _peer_hostname) + : Socket::Impl(_s), + bev(_bev), + listener(NULL), + lock(ATOMIC_FLAG_INIT), + recv_request(NULL), + send_request(NULL), + connect_request(NULL), + event_loop_handle(NULL), + accepted(true), + peer_hostname(std::move(_peer_hostname)) {} + + +Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address) +{ + if (bev != NULL) { + return Failure("Socket is already connected"); + } + + if (connect_request.get() != NULL) { + return Failure("Socket is already connecting"); + } + + SSL* ssl = SSL_new(openssl::context()); + if (ssl == NULL) { + return Failure("Failed to connect: SSL_new"); + } + + // Construct the bufferevent in the connecting state. We don't use + // the existing FD due to an issue in libevent-openssl. 
See the + // 'Connection Extra FD' note at top of file. + CHECK(bev == NULL); + bev = bufferevent_openssl_socket_new( + base, + -1, + ssl, + BUFFEREVENT_SSL_CONNECTING, + BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); + + if (bev == NULL) { + // We need to free 'ssl' here because the bev won't clean it up + // for us. + SSL_free(ssl); + return Failure("Failed to connect: bufferevent_openssl_socket_new"); + } + + // From this point on, as long as 'bev' is freed properly, it will + // free 'ssl' along with it due to the BEV_OPT_CLOSE_ON_FREE' flag. + + // Assign the callbacks for the bufferevent. + bufferevent_setcb( + bev, + &LibeventSSLSocketImpl::recv_callback, + &LibeventSSLSocketImpl::send_callback, + &LibeventSSLSocketImpl::event_callback, + CHECK_NOTNULL(event_loop_handle)); + + // Try and determine the 'peer_hostname' from the address we're + // connecting to in order to properly verify the SSL connection later. + const Try<string> hostname = address.hostname(); + + if (hostname.isError()) { + VLOG(2) << "Could not determine hostname of peer: " << hostname.error(); + } else { + VLOG(2) << "Connecting to " << hostname.get(); + peer_hostname = hostname.get(); + } + + // Optimistically construct a 'ConnectRequest' and future. + Owned<ConnectRequest> request(new ConnectRequest()); + Future<Nothing> future = request->promise.future(); + + // Assign 'connect_request' under lock, fail on error. 
+ synchronized (lock) { + if (connect_request.get() != NULL) { + bufferevent_free(bev); + bev = NULL; + return Failure("Socket is already connecting"); + } + std::swap(request, connect_request); + } + + sockaddr_storage addr = net::createSockaddrStorage(address.ip, address.port); + + if (bufferevent_socket_connect( + bev, + reinterpret_cast<sockaddr*>(&addr), + sizeof(addr)) < 0) { + bufferevent_free(bev); + bev = NULL; + return Failure("Failed to connect: bufferevent_socket_connect"); + } + + return future; +} + + +Future<size_t> LibeventSSLSocketImpl::recv(char* data, size_t size) +{ + // Optimistically construct a 'RecvRequest' and future. + Owned<RecvRequest> request(new RecvRequest(data, size)); + std::weak_ptr<LibeventSSLSocketImpl> weak_self(shared(this)); + + // If the user of the future decides to 'discard', then we want to + // test whether the request was already satisfied. + // We capture a 'weak_ptr' to 'this' (as opposed to a 'shared_ptr') + // because the socket could be destroyed before this lambda is + // executed. If we used a 'shared_ptr' then this lambda could extend + // the life-time of 'this' unnecessarily. + Future<size_t> future = request->promise.future() + .onDiscard([weak_self]() { + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be + // explicitly captured because we're not using it in the body of + // the lambda. We can use a 'shared_ptr' because + // run_in_event_loop is guaranteed to execute. + std::shared_ptr<LibeventSSLSocketImpl> self(weak_self.lock()); + + if (self != NULL) { + run_in_event_loop( + [self]() { + CHECK(__in_event_loop__); + CHECK(self); + + Owned<RecvRequest> request; + + synchronized (self->lock) { + std::swap(request, self->recv_request); + } + + // Only discard if the request hasn't already been + // satisfied. + if (request.get() != NULL) { + // Discard the promise outside of the object lock as + // the callbacks can be expensive. 
+ request->promise.discard(); + } + }, + DISALLOW_SHORT_CIRCUIT); + } + }); + + // Assign 'recv_request' under lock, fail on error. + synchronized (lock) { + if (recv_request.get() != NULL) { + return Failure("Socket is already receiving"); + } + std::swap(request, recv_request); + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self]() { + CHECK(__in_event_loop__); + CHECK(self); + + bool recv = false; + + // We check to see if 'recv_request' is null. It would be null + // if a 'discard' happened before this lambda was executed. + synchronized (self->lock) { + recv = self->recv_request.get() != NULL; + } + + // Only try to read existing data from the bufferevent if the + // request has not already been discarded. + if (recv) { + synchronized (self->bev) { + evbuffer* input = bufferevent_get_input(self->bev); + size_t length = evbuffer_get_length(input); + + // If there is already data in the buffer, fulfill the + // 'recv_request' by calling 'recv_callback()'. Otherwise + // do nothing and wait for the 'recv_callback' to run when + // we receive data over the network. + if (length > 0) { + self->recv_callback(); + } + } + } + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Future<size_t> LibeventSSLSocketImpl::send(const char* data, size_t size) +{ + // Optimistically construct a 'SendRequest' and future. + Owned<SendRequest> request(new SendRequest(size)); + Future<size_t> future = request->promise.future(); + + // We don't add an 'onDiscard' continuation to send because we can + // not accurately detect how many bytes have been sent. Once we pass + // the data to the bufferevent, there is the possibility that parts + // of it have been sent. 
Another reason is that if we send partial + // messages (discard only a part of the data), then it is likely + // that the receiving end will fail parsing the message. + + // Assign 'send_request' under lock, fail on error. + synchronized (lock) { + if (send_request.get() != NULL) { + return Failure("Socket is already sending"); + } + std::swap(request, send_request); + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. + auto self = shared(this); + + run_in_event_loop( + [self, data, size]() { + CHECK(__in_event_loop__); + CHECK(self); + + // We check that send_request is valid, because we do not + // allow discards. This means there is no race between the + // entry of 'send' and the execution of this lambda. + synchronized (self->lock) { + CHECK_NOTNULL(self->send_request.get()); + } + + bufferevent_write(self->bev, data, size); + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Future<size_t> LibeventSSLSocketImpl::sendfile( + int fd, + off_t offset, + size_t size) +{ + // Optimistically construct a 'SendRequest' and future. + Owned<SendRequest> request(new SendRequest(size)); + Future<size_t> future = request->promise.future(); + + // Assign 'send_request' under lock, fail on error. + synchronized (lock) { + if (send_request.get() != NULL) { + return Failure("Socket is already sending"); + } + std::swap(request, send_request); + } + + // Extend the life-time of 'this' through the execution of the + // lambda in the event loop. Note: The 'self' needs to be explicitly + // captured because we're not using it in the body of the lambda. We + // can use a 'shared_ptr' because run_in_event_loop is guaranteed to + // execute. 
+ auto self = shared(this); + + run_in_event_loop( + [self, fd, offset, size]() { + CHECK(__in_event_loop__); + CHECK(self); + + // We check that send_request is valid, because we do not + // allow discards. This means there is no race between the + // entry of 'sendfile' and the execution of this lambda. + synchronized (self->lock) { + CHECK_NOTNULL(self->send_request.get()); + } + + evbuffer_add_file( + bufferevent_get_output(self->bev), + fd, + offset, + size); + }, + DISALLOW_SHORT_CIRCUIT); + + return future; +} + + +Try<Nothing> LibeventSSLSocketImpl::listen(int backlog) +{ + if (listener != NULL) { + return Error("Socket is already listening"); + } + + CHECK(bev == NULL); + + listener = evconnlistener_new( + base, + [](evconnlistener* listener, + int socket, + sockaddr* addr, + int addr_length, + void* arg) { + CHECK(__in_event_loop__); + + std::weak_ptr<LibeventSSLSocketImpl>* handle = + reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>( + CHECK_NOTNULL(arg)); + + std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock()); + + if (impl != NULL) { + Try<net::IP> ip = net::IP::create(*addr); + if (ip.isError()) { + VLOG(2) << "Could not convert sockaddr to net::IP: " << ip.error(); + } + + // We pass the 'listener' into the 'AcceptRequest' because + // this function could be executed before 'this->listener' + // is set. + AcceptRequest* request = + new AcceptRequest( + socket, + listener, + ip.isSome() ? Option<net::IP>(ip.get()) : None()); + + impl->accept_callback(request); + } + }, + event_loop_handle, + LEV_OPT_REUSEABLE, + backlog, + s); + + if (listener == NULL) { + return Error("Failed to listen on socket"); + } + + // TODO(jmlvanre): attach an error callback. + + return Nothing(); +} + + +Future<Socket> LibeventSSLSocketImpl::accept() +{ + return accept_queue.get() + .then([](const Future<Socket>& future) { return future; }); +} + + +// Only runs in event loop. 
+void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request) +{ + CHECK(__in_event_loop__); + + // Enqueue a potential socket that we will set up SSL state for and + // verify. + accept_queue.put(request->promise.future()); + + // Set up SSL object. + SSL* ssl = SSL_new(openssl::context()); + if (ssl == NULL) { + request->promise.fail("Accept failed, SSL_new"); + delete request; + return; + } + + // We use 'request->listener' because 'this->listener' may not have + // been set by the time this function is executed. See comment in + // the lambda for evconnlistener_new in + // 'LibeventSSLSocketImpl::listen'. + event_base* ev_base = evconnlistener_get_base(request->listener); + + // Construct the bufferevent in the accepting state. + bufferevent* bev = bufferevent_openssl_socket_new( + ev_base, + request->socket, + ssl, + BUFFEREVENT_SSL_ACCEPTING, + BEV_OPT_THREADSAFE); + + if (bev == NULL) { + request->promise.fail("Accept failed: bufferevent_openssl_socket_new"); + SSL_free(ssl); + delete request; + return; + } + + bufferevent_setcb( + bev, + NULL, + NULL, + [](bufferevent* bev, short events, void* arg) { + // This handles error states or 'BEV_EVENT_CONNECTED' events + // and satisfies the promise by constructing a new socket if + // the connection was successfuly established. + CHECK(__in_event_loop__); + + AcceptRequest* request = + reinterpret_cast<AcceptRequest*>(CHECK_NOTNULL(arg)); + + if (events & BEV_EVENT_EOF) { + request->promise.fail("Failed accept: connection closed"); + } else if (events & BEV_EVENT_CONNECTED) { + // We will receive a 'CONNECTED' state on an accepting socket + // once the connection is established. Time to do + // post-verification. First, we need to determine the peer + // hostname. 
+ Option<string> peer_hostname = None(); + if (request->ip.isSome()) { + Try<string> hostname = net::getHostname(request->ip.get()); + if (hostname.isError()) { + VLOG(2) << "Could not determine hostname of peer: " + << hostname.error(); + } else { + VLOG(2) << "Accepting from " << hostname.get(); + peer_hostname = hostname.get(); + } + } + + SSL* ssl = bufferevent_openssl_get_ssl(bev); + CHECK_NOTNULL(ssl); + + Try<Nothing> verify = openssl::verify(ssl, peer_hostname); + if (verify.isError()) { + VLOG(1) << "Failed accept, verification error: " << verify.error(); + request->promise.fail(verify.error()); + SSL_free(ssl); + bufferevent_free(bev); + // TODO(jmlvanre): Clean up for readability. Consider RAII + // or constructing the impl earlier. + CHECK(request->socket >= 0); + Try<Nothing> close = os::close(request->socket); + if (close.isError()) { + LOG(FATAL) + << "Failed to close socket " << stringify(request->socket) + << ": " << close.error(); + } + delete request; + return; + } + + auto impl = std::shared_ptr<LibeventSSLSocketImpl>( + new LibeventSSLSocketImpl( + request->socket, + bev, + std::move(peer_hostname))); + + // See comment at 'initialize' declaration for why we call + // this. + impl->initialize(); + + // We have to wait till after 'initialize()' is invoked for + // event_loop_handle to be valid as a callback argument for + // the callbacks. 
+ bufferevent_setcb( + CHECK_NOTNULL(impl->bev), + &LibeventSSLSocketImpl::recv_callback, + &LibeventSSLSocketImpl::send_callback, + &LibeventSSLSocketImpl::event_callback, + CHECK_NOTNULL(impl->event_loop_handle)); + + Socket socket = Socket::Impl::socket(std::move(impl)); + + request->promise.set(socket); + } else if (events & BEV_EVENT_ERROR) { + std::ostringstream stream; + if (EVUTIL_SOCKET_ERROR() != 0) { + stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()); + } else { + char buffer[1024] = {}; + unsigned long error = bufferevent_get_openssl_error(bev); + ERR_error_string_n(error, buffer, sizeof(buffer)); + stream << buffer; + } + + // Fail the accept request and log the error. + VLOG(1) << "Socket error: " << stream.str(); + + SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev)); + SSL_free(ssl); + bufferevent_free(bev); + + // TODO(jmlvanre): Clean up for readability. Consider RAII + // or constructing the impl earlier. + CHECK(request->socket >= 0); + Try<Nothing> close = os::close(request->socket); + if (close.isError()) { + LOG(FATAL) + << "Failed to close socket " << stringify(request->socket) + << ": " << close.error(); + } + request->promise.fail( + "Failed accept: connection error: " + stream.str()); + } + + delete request; + }, + request); +} + +} // namespace network { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent_ssl_socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp new file mode 100644 index 0000000..d65638b --- /dev/null +++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp @@ -0,0 +1,165 @@ +#ifndef __LIBEVENT_SSL_SOCKET_HPP__ +#define __LIBEVENT_SSL_SOCKET_HPP__ + +#include <event2/buffer.h> +#include <event2/bufferevent_ssl.h> +#include <event2/event.h> +#include <event2/listener.h> +#include <event2/util.h> + 
+#include <atomic> +#include <memory> + +#include <process/queue.hpp> +#include <process/socket.hpp> + +namespace process { +namespace network { + +class LibeventSSLSocketImpl : public Socket::Impl +{ +public: + // See 'Socket::create()'. + static Try<std::shared_ptr<Socket::Impl>> create(int s); + + LibeventSSLSocketImpl(int _s); + + virtual ~LibeventSSLSocketImpl(); + + // Implement 'Socket::Impl' interface. + virtual Future<Nothing> connect(const Address& address); + virtual Future<size_t> recv(char* data, size_t size); + // Send does not currently support discard. See implementation. + virtual Future<size_t> send(const char* data, size_t size); + virtual Future<size_t> sendfile(int fd, off_t offset, size_t size); + virtual Try<Nothing> listen(int backlog); + virtual Future<Socket> accept(); + virtual Socket::Kind kind() const { return Socket::SSL; } + + // This call is used to do the equivalent of shutting down the read + // end. This means finishing the future of any outstanding read + // request. + virtual void shutdown(); + + // We need a post-initializer because 'shared_from_this()' is not + // valid until the constructor has finished. + void initialize(); + +private: + // A set of helper functions that transitions an accepted socket to + // an SSL connected socket. With the libevent-openssl library, once + // we return from the 'accept_callback()' which is scheduled by + // 'listen' then we still need to wait for the 'BEV_EVENT_CONNECTED' + // state before we know the SSL connection has been established. 
+ struct AcceptRequest + { + AcceptRequest( + int _socket, + evconnlistener* _listener, + const Option<net::IP>& _ip) + : listener(_listener), + socket(_socket), + ip(_ip) {} + Promise<Socket> promise; + evconnlistener* listener; + int socket; + Option<net::IP> ip; + }; + + struct RecvRequest + { + RecvRequest(char* _data, size_t _size) + : data(_data), size(_size) {} + Promise<size_t> promise; + char* data; + size_t size; + }; + + struct SendRequest + { + SendRequest(size_t _size) + : size(_size) {} + Promise<size_t> promise; + size_t size; + }; + + struct ConnectRequest + { + Promise<Nothing> promise; + }; + + // This is a private constructor used by the accept helper + // functions. + LibeventSSLSocketImpl( + int _s, + bufferevent* bev, + Option<std::string>&& peer_hostname); + + // This is called when the equivalent of 'accept' returns. The role + // of this function is to set up the SSL object and bev. + void accept_callback(AcceptRequest* request); + + // The following are function pairs of static functions to member + // functions. The static functions test and hold the weak pointer to + // the socket before calling the member functions. This protects + // against the socket being destroyed before the event loop calls + // the callbacks. + static void recv_callback(bufferevent* bev, void* arg); + void recv_callback(); + + static void send_callback(bufferevent* bev, void* arg); + void send_callback(); + + static void event_callback(bufferevent* bev, short events, void* arg); + void event_callback(short events); + + bufferevent* bev; + + evconnlistener* listener; + + // Protects the following instance variables. + std::atomic_flag lock; + Owned<RecvRequest> recv_request; + Owned<SendRequest> send_request; + Owned<ConnectRequest> connect_request; + + // This is a weak pointer to 'this', i.e., ourselves, this class + // instance. 
We need this for our event loop callbacks because it's + // possible that we'll actually want to cleanup this socket impl + // before the event loop callback gets executed ... and we'll check + // in each event loop callback whether or not this weak_ptr is valid + // by attempting to upgrade it to shared_ptr. It is the + // responsibility of the event loop through the deferred lambda in + // the destructor to clean up this pointer. + // 1) It is a 'weak_ptr' as opposed to a 'shared_ptr' because we + // want to test whether the object is still around from within the + // event loop. If it was a 'shared_ptr' then we would be + // contributing to the lifetime of the object and would no longer be + // able to test the lifetime correctly. + // 2) This is a pointer to a 'weak_ptr' so that we can pass this + // through to the event loop through the C-interface. We need access + // to the 'weak_ptr' from outside the object (in the event loop) to + // test if the object is still alive. By maintaining this 'weak_ptr' + // on the heap we can be sure it is safe to access from the + // event loop until it is destroyed. + std::weak_ptr<LibeventSSLSocketImpl>* event_loop_handle; + + // This queue stores buffered accepted sockets. 'Queue' is a thread + // safe queue implementation, and the event loop pushes connected + // sockets onto it, the 'accept()' call pops them off. We wrap these + // sockets with futures so that we can pass errors through and chain + // futures as well. + Queue<Future<Socket>> accept_queue; + + // This bool represents whether this socket was created through an + // 'accept' flow. We use this in the destructor to change + // the clean-up behavior for the SSL context object. 
+ bool accepted; + + Option<std::string> peer_hostname; +}; + +} // namespace network { +} // namespace process { + +#endif // __LIBEVENT_SSL_SOCKET_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/openssl.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/openssl.cpp b/3rdparty/libprocess/src/openssl.cpp new file mode 100644 index 0000000..090e985 --- /dev/null +++ b/3rdparty/libprocess/src/openssl.cpp @@ -0,0 +1,563 @@ +#include "openssl.hpp" + +#include <sys/param.h> + +#include <openssl/err.h> +#include <openssl/rand.h> +#include <openssl/ssl.h> +#include <openssl/x509v3.h> + +#include <mutex> +#include <string> + +#include <process/once.hpp> + +#include <stout/flags.hpp> + +using std::ostringstream; +using std::string; + +// Must be defined by us for OpenSSL in order to capture the necessary +// data for doing locking. Note, this needs to be defined in the +// global namespace as well. +struct CRYPTO_dynlock_value +{ + std::mutex mutex; +}; + + +namespace process { +namespace network { +namespace openssl { + +// _Global_ OpenSSL context, initialized via 'initialize'. +static SSL_CTX* ctx = NULL; + + +Flags::Flags() +{ + add(&Flags::enabled, + "enabled", + "Whether SSL is enabled.", + false); + + add(&Flags::cert_file, + "cert_file", + "Path to certifcate."); + + add(&Flags::key_file, + "key_file", + "Path to key."); + + add(&Flags::verify_cert, + "verify_cert", + "Whether or not to verify peer certificates.", + false); + + add(&Flags::require_cert, + "require_cert", + "Whether or not to require peer certificates. 
Requiring a peer " + "certificate implies verifying it.", + false); + + add(&Flags::verification_depth, + "verification_depth", + "Maximum depth for the certificate chain verification that shall be " + "allowed.", + 4); + + add(&Flags::ca_dir, + "ca_dir", + "Path to certifcate authority (CA) directory."); + + add(&Flags::ca_file, + "ca_file", + "Path to certifcate authority (CA) file."); + + add(&Flags::ciphers, + "ciphers", + "Cryptographic ciphers to use.", + // Default TLSv1 ciphers chosen based on Amazon's security + // policy, see: + // http://docs.aws.amazon.com/ElasticLoadBalancing/latest/ + // DeveloperGuide/elb-security-policy-table.html + "AES128-SHA:AES256-SHA:RC4-SHA:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA:" + "DHE-RSA-AES256-SHA:DHE-DSS-AES256-SHA"); + + add(&Flags::enable_ssl_v2, + "enable_ssl_v2", + "Enable SSLV2.", + false); + + add(&Flags::enable_ssl_v3, + "enable_ssl_v3", + "Enable SSLV3.", + false); + + add(&Flags::enable_tls_v1_0, + "enable_tls_v1_0", + "Enable SSLV1.0.", + false); + + add(&Flags::enable_tls_v1_1, + "enable_tls_v1_1", + "Enable SSLV1.1.", + false); + + add(&Flags::enable_tls_v1_2, + "enable_tls_v1_2", + "Enable SSLV1.2.", + true); +} + + +static Flags* ssl_flags = new Flags(); + + +const Flags& flags() +{ + openssl::initialize(); + return *ssl_flags; +} + + +// Mutexes necessary to support OpenSSL locking on shared data +// structures. See 'locking_function' for more information. +static std::mutex* mutexes = NULL; + + +// Callback needed to perform locking on shared data structures. From +// the OpenSSL documentation: +// +// OpenSSL uses a number of global data structures that will be +// implicitly shared whenever multiple threads use OpenSSL. +// Multi-threaded applications will crash at random if [the locking +// function] is not set. 
+void locking_function(int mode, int n, const char* /*file*/, int /*line*/) +{ + if (mode & CRYPTO_LOCK) { + mutexes[n].lock(); + } else { + mutexes[n].unlock(); + } +} + + +// OpenSSL callback that returns the current thread ID, necessary for +// OpenSSL threading. +unsigned long id_function() +{ + pthread_t pthread = pthread_self(); +#ifdef __APPLE__ + mach_port_t id = pthread_mach_thread_np(pthread); +#else + pthread_t id = pthread; +#endif // __APPLE__ + return static_cast<unsigned long>(id); +} + + +// OpenSSL callback for creating new dynamic "locks", abstracted by +// the CRYPTO_dynlock_value structure. +CRYPTO_dynlock_value* dyn_create_function(const char* /*file*/, int /*line*/) +{ + CRYPTO_dynlock_value* value = new CRYPTO_dynlock_value(); + + if (value == NULL) { + return NULL; + } + + return value; +} + + +// OpenSSL callback for locking and unlocking dynamic "locks", +// abstracted by the CRYPTO_dynlock_value structure. +void dyn_lock_function( + int mode, + CRYPTO_dynlock_value* value, + const char* /*file*/, + int /*line*/) +{ + if (mode & CRYPTO_LOCK) { + value->mutex.lock(); + } else { + value->mutex.unlock(); + } +} + + +// OpenSSL callback for destroying dynamic "locks", abstracted by the +// CRYPTO_dynlock_value structure. +void dyn_destroy_function( + CRYPTO_dynlock_value* value, + const char* /*file*/, + int /*line*/) +{ + delete value; +} + + +// Callback for OpenSSL peer certificate verification. +int verify_callback(int ok, X509_STORE_CTX* store) +{ + if (ok != 1) { + // Construct and log a warning message. + ostringstream message; + + X509* cert = X509_STORE_CTX_get_current_cert(store); + int error = X509_STORE_CTX_get_error(store); + int depth = X509_STORE_CTX_get_error_depth(store); + + message << "Error with certificate at depth: " << stringify(depth) << "\n"; + + char buffer[256] {}; + + // TODO(jmlvanre): use X509_NAME_print_ex instead. 
+ X509_NAME_oneline(X509_get_issuer_name(cert), buffer, sizeof(buffer) - 1); + + message << "Issuer: " << stringify(buffer) << "\n"; + + // TODO(jmlvanre): use X509_NAME_print_ex instead. + memset(buffer, 0, sizeof(buffer)); + X509_NAME_oneline(X509_get_subject_name(cert), buffer, sizeof(buffer) - 1); + + message << "Subject: " << stringify(buffer) << "\n"; + + message << "Error (" << stringify(error) << "): " << + stringify(X509_verify_cert_error_string(error)); + + LOG(WARNING) << message.str(); + } + + return ok; +} + + +string error_string(unsigned long code) +{ + // SSL library guarantees to stay within 120 bytes. + char buffer[128]; + + ERR_error_string_n(code, buffer, sizeof(buffer)); + string s(buffer); + + if (code == SSL_ERROR_SYSCALL) { + s += error_string(ERR_get_error()); + } + + return s; +} + + +void initialize() +{ + static Once* initialized = new Once(); + + if (initialized->once()) { + return; + } + + // Load all the flags prefixed by SSL_ from the environment. See + // comment at top of openssl.hpp for a full list. + Try<Nothing> load = ssl_flags->load("SSL_"); + if (load.isError()) { + EXIT(EXIT_FAILURE) + << "Failed to load flags from environment variables (prefixed by SSL_):" + << load.error(); + } + + // Exit early if SSL is not enabled. + if (!ssl_flags->enabled) { + initialized->done(); + return; + } + + // We MUST have entropy, or else there's no point to crypto. + if (!RAND_poll()) { + EXIT(EXIT_FAILURE) << "SSL socket requires entropy"; + } + + // Initialize the OpenSSL library. + SSL_library_init(); + SSL_load_error_strings(); + + // Prepare mutexes for threading callbacks. + mutexes = new std::mutex[CRYPTO_num_locks()]; + + // Install SSL threading callbacks. + // TODO(jmlvanre): the id mechanism is deprecated in OpenSSL. 
+ CRYPTO_set_id_callback(&id_function); + CRYPTO_set_locking_callback(&locking_function); + CRYPTO_set_dynlock_create_callback(&dyn_create_function); + CRYPTO_set_dynlock_lock_callback(&dyn_lock_function); + CRYPTO_set_dynlock_destroy_callback(&dyn_destroy_function); + + ctx = SSL_CTX_new(SSLv23_method()); + CHECK(ctx) << "Failed to create SSL context: " + << ERR_error_string(ERR_get_error(), NULL); + + // Disable SSL session caching. + SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF); + + // Set a session id to avoid connection termination upon + // re-connect. We can use something more relevant when we care + // about session caching. + const uint64_t session_ctx = 7; + + const unsigned char* session_id = + reinterpret_cast<const unsigned char*>(&session_ctx); + + if (SSL_CTX_set_session_id_context( + ctx, + session_id, + sizeof(session_ctx)) != 1) { + LOG(FATAL) << "Session id context size exceeds maximum"; + } + + // Now do some validation of the flags/environment variables. + if (ssl_flags->key_file.isNone()) { + EXIT(EXIT_FAILURE) << "SSL requires key! NOTE: Set path with SSL_KEY_FILE"; + } + + if (ssl_flags->cert_file.isNone()) { + EXIT(EXIT_FAILURE) + << "SSL requires certificate! NOTE: Set path with SSL_CERT_FILE"; + } + + if (ssl_flags->ca_file.isNone()) { + VLOG(2) << "CA file path is unspecified! NOTE: " + << "Set CA file path with SSL_CA_FILE=<filepath>"; + } + + if (ssl_flags->ca_dir.isNone()) { + VLOG(2) << "CA directory path unspecified! 
NOTE: " + << "Set CA directory path with SSL_CA_DIR=<dirpath>"; + } + + if (!ssl_flags->verify_cert) { + VLOG(2) << "Will not verify peer certificate!\n" + << "NOTE: Set SSL_VERIFY_CERT=1 to enable peer certificate " + << "verification"; + } + + if (!ssl_flags->require_cert) { + VLOG(2) << "Will only verify peer certificate if presented!\n" + << "NOTE: Set SSL_REQUIRE_CERT=1 to require peer certificate " + << "verification"; + } + + if (ssl_flags->require_cert && !ssl_flags->verify_cert) { + // Requiring a certificate implies that is should be verified. + ssl_flags->verify_cert = true; + + VLOG(2) << "SSL_REQUIRE_CERT implies peer certificate verification.\n" + << "SSL_VERIFY_CERT set to true"; + } + + // Initialize OpenSSL if we've been asked to do verification of peer + // certificates. + if (ssl_flags->verify_cert) { + // Set CA locations. + if (ssl_flags->ca_file.isSome() || ssl_flags->ca_dir.isSome()) { + const char* ca_file = ssl_flags->ca_file.isSome() + ? ssl_flags->ca_file.get().c_str() + : NULL; + + const char* ca_dir = ssl_flags->ca_dir.isSome() + ? ssl_flags->ca_dir.get().c_str() + : NULL; + + if (SSL_CTX_load_verify_locations(ctx, ca_file, ca_dir) != 1) { + unsigned long error = ERR_get_error(); + EXIT(EXIT_FAILURE) + << "Could not load CA file and/or directory (" + << stringify(error) << "): " + << error_string(error) << " -> " + << (ca_file != NULL ? (stringify("FILE: ") + ca_file) : "") + << (ca_dir != NULL ? (stringify("DIR: ") + ca_dir) : ""); + } + + if (ca_file != NULL) { + VLOG(2) << "Using CA file: " << ca_file; + } + if (ca_dir != NULL) { + VLOG(2) << "Using CA dir: " << ca_dir; + } + } else { + if (SSL_CTX_set_default_verify_paths(ctx) != 1) { + EXIT(EXIT_FAILURE) << "Could not load default CA file and/or directory"; + } + + VLOG(2) << "Using default CA file and/or directory"; + } + + // Set SSL peer verification callback. 
+ int mode = SSL_VERIFY_PEER; + + if (ssl_flags->require_cert) { + mode |= SSL_VERIFY_FAIL_IF_NO_PEER_CERT; + } + + SSL_CTX_set_verify(ctx, mode, &verify_callback); + + SSL_CTX_set_verify_depth(ctx, ssl_flags->verification_depth); + } else { + SSL_CTX_set_verify(ctx, SSL_VERIFY_NONE, NULL); + } + + // Set certificate chain. + if (SSL_CTX_use_certificate_chain_file( + ctx, + ssl_flags->cert_file.get().c_str()) != 1) { + EXIT(EXIT_FAILURE) << "Could not load cert file"; + } + + // Set private key. + if (SSL_CTX_use_PrivateKey_file( + ctx, + ssl_flags->key_file.get().c_str(), + SSL_FILETYPE_PEM) != 1) { + EXIT(EXIT_FAILURE) << "Could not load key file"; + } + + // Validate key. + if (SSL_CTX_check_private_key(ctx) != 1) { + EXIT(EXIT_FAILURE) + << "Private key does not match the certificate public key"; + } + + VLOG(2) << "Using ciphers: " << ssl_flags->ciphers; + + if (SSL_CTX_set_cipher_list(ctx, ssl_flags->ciphers.c_str()) == 0) { + EXIT(EXIT_FAILURE) << "Could not set ciphers: " << ssl_flags->ciphers; + } + + // Use server preference for cipher. + long ssl_options = SSL_OP_CIPHER_SERVER_PREFERENCE; + // Disable SSLv2. + if (!ssl_flags->enable_ssl_v2) { ssl_options |= SSL_OP_NO_SSLv2; } + // Disable SSLv3. + if (!ssl_flags->enable_ssl_v3) { ssl_options |= SSL_OP_NO_SSLv3; } + // Disable TLSv1. + if (!ssl_flags->enable_tls_v1_0) { ssl_options |= SSL_OP_NO_TLSv1; } + // Disable TLSv1.1. + if (!ssl_flags->enable_tls_v1_1) { ssl_options |= SSL_OP_NO_TLSv1_1; } + // Disable TLSv1.2. + if (!ssl_flags->enable_tls_v1_2) { ssl_options |= SSL_OP_NO_TLSv1_2; } + + SSL_CTX_set_options(ctx, ssl_options); + + initialized->done(); +} + + +SSL_CTX* context() +{ + // TODO(benh): Always call 'initialize' just in case? + return ctx; +} + + +Try<Nothing> verify(const SSL* const ssl, const Option<string>& hostname) +{ + // Return early if we don't need to verify. + if (!ssl_flags->verify_cert) { + return Nothing(); + } + + // The X509 object must be freed if this call succeeds. 
+ // TODO(jmlvanre): handle this better. How about RAII? + X509* cert = SSL_get_peer_certificate(ssl); + + if (cert == NULL) { + return ssl_flags->require_cert + ? Error("Peer did not provide certificate") + : Try<Nothing>(Nothing()); + } + + if (SSL_get_verify_result(ssl) != X509_V_OK) { + X509_free(cert); + return Error("Could not verify peer certificate"); + } + + if (hostname.isNone()) { + X509_free(cert); + return ssl_flags->require_cert + ? Error("Cannot verify peer certificate: peer hostname unknown") + : Try<Nothing>(Nothing()); + } + + int extcount = X509_get_ext_count(cert); + if (extcount <= 0) { + X509_free(cert); + return Error("X509_get_ext_count failed: " + stringify(extcount)); + } + + for (int i = 0; i < extcount; i++) { + X509_EXTENSION* ext = X509_get_ext(cert, i); + + const string extstr = + OBJ_nid2sn(OBJ_obj2nid(X509_EXTENSION_get_object(ext))); + + if (extstr == "subjectAltName") { +#if OPENSSL_VERSION_NUMBER <= 0x00909000L + X509V3_EXT_METHOD* method = X509V3_EXT_get(ext); +#else + const X509V3_EXT_METHOD* method = X509V3_EXT_get(ext); +#endif + if (method == NULL) { + break; + } + + const unsigned char* data = ext->value->data; + + STACK_OF(CONF_VALUE)* values = method->i2v( + method, + method->d2i(NULL, &data, ext->value->length), + NULL); + + for (int j = 0; j < sk_CONF_VALUE_num(values); j++) { + CONF_VALUE* value = sk_CONF_VALUE_value(values, j); + if ((strcmp(value->name, "DNS") == 0) && + (value->value == hostname.get())) { + X509_free(cert); + return Nothing(); + } + } + } + } + + // If we still haven't verified the hostname, try doing it via + // the certificate subject name. 
+ X509_NAME* name = X509_get_subject_name(cert); + + if (name != NULL) { + char text[_POSIX_HOST_NAME_MAX] {}; + + if (X509_NAME_get_text_by_NID( + name, + NID_commonName, + text, + sizeof(text)) > 0) { + if (hostname.get() != text) { + X509_free(cert); + return Error( + "Presented Certificate Name: " + stringify(text) + + " does not match peer hostname name: " + hostname.get()); + } + + X509_free(cert); + return Nothing(); + } + } + + // If we still haven't exited, we haven't verified it, and we give up. + X509_free(cert); + return Error( + "Could not verify presented certificate with hostname " + hostname.get()); +} + +} // namespace openssl { +} // namespace network { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/openssl.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/openssl.hpp b/3rdparty/libprocess/src/openssl.hpp new file mode 100644 index 0000000..60c7b07 --- /dev/null +++ b/3rdparty/libprocess/src/openssl.hpp @@ -0,0 +1,79 @@ +#ifndef __OPENSSL_HPP__ +#define __OPENSSL_HPP__ + +#include <openssl/ssl.h> + +#include <string> + +#include <stout/flags.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> +#include <stout/try.hpp> + +namespace process { +namespace network { +namespace openssl { + +// Capture the environment variables that influence how we initialize +// the OpenSSL library via flags. 
class Flags : public virtual flags::FlagsBase
{
public:
  // Registers the flags below (names, help text and defaults); see
  // the definition in openssl.cpp.
  Flags();

  // Whether SSL is enabled.
  bool enabled;

  // Path to the certificate.
  Option<std::string> cert_file;

  // Path to the key.
  Option<std::string> key_file;

  // Whether or not to verify peer certificates.
  bool verify_cert;

  // Whether or not to require peer certificates. Requiring a peer
  // certificate implies verifying it.
  bool require_cert;

  // Maximum depth allowed for the certificate chain verification.
  unsigned int verification_depth;

  // Paths to the certificate authority (CA) directory and file.
  Option<std::string> ca_dir;
  Option<std::string> ca_file;

  // Cryptographic ciphers to use.
  std::string ciphers;

  // Protocol versions to allow.
  bool enable_ssl_v2;
  bool enable_ssl_v3;
  bool enable_tls_v1_0;
  bool enable_tls_v1_1;
  bool enable_tls_v1_2;
};

// Returns the flags after making sure 'initialize' has run.
const Flags& flags();

// Initializes the _global_ OpenSSL context (SSL_CTX) as well as the
// crypto library in order to support multi-threading. The global
// context gets initialized using the environment variables:
//
//    SSL_ENABLED=(false|0,true|1)
//    SSL_CERT_FILE=(path to certificate)
//    SSL_KEY_FILE=(path to key)
//    SSL_VERIFY_CERT=(false|0,true|1)
//    SSL_REQUIRE_CERT=(false|0,true|1)
//    SSL_VERIFICATION_DEPTH=(4)
//    SSL_CA_DIR=(path to CA directory)
//    SSL_CA_FILE=(path to CA file)
//    SSL_CIPHERS=(accepted ciphers separated by ':')
//    SSL_ENABLE_SSL_V2=(false|0,true|1)
//    SSL_ENABLE_SSL_V3=(false|0,true|1)
//    SSL_ENABLE_TLS_V1_0=(false|0,true|1)
//    SSL_ENABLE_TLS_V1_1=(false|0,true|1)
//    SSL_ENABLE_TLS_V1_2=(false|0,true|1)
//
// NOTE: Each variable is the flag name registered in 'Flags::Flags',
// upper-cased and prefixed with "SSL_"; the depth flag is registered
// as 'verification_depth'.
//
// TODO(benh): When/If we need to support multiple contexts in the
// same process, for example for Server Name Indication (SNI), then
// we'll add other functions for initializing an SSL_CTX based on
// these environment variables.
// TODO(nneilsen): Support certification revocation.
void initialize();

// Returns the _global_ OpenSSL context.
SSL_CTX* context();

// Verify that the hostname is properly associated with the peer
// certificate associated with the specified SSL connection.
+Try<Nothing> verify(const SSL* const ssl, const Option<std::string>& hostname); + +} // namespace openssl { +} // namespace network { +} // namespace process { + +#endif // __OPENSSL_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index c2baa6c..f919b99 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1646,15 +1646,22 @@ Encoder* SocketManager::next(int s) } dispose.erase(s); + auto iterator = sockets.find(s); - delete iterator->second; - sockets.erase(iterator); // We don't actually close the socket (we wait for the Socket // abstraction to close it once there are no more references), // but we do shutdown the receiving end so any DataDecoder // will get cleaned up (which might have the last reference). - shutdown(s, SHUT_RD); + + // Hold on to the Socket and remove it from the 'sockets' + // map so that in the case where 'shutdown()' ends up + // calling close the termination logic is not run twice. + Socket* socket = iterator->second; + sockets.erase(iterator); + socket->shutdown(); + + delete socket; } } } @@ -1714,6 +1721,9 @@ void SocketManager::close(int s) proxies.erase(s); } + dispose.erase(s); + auto iterator = sockets.find(s); + // We need to stop any 'ignore_data' receivers as they may have // the last Socket reference so we shutdown recvs but don't do a // full close (since that will be taken care of by ~Socket, see @@ -1722,12 +1732,16 @@ void SocketManager::close(int s) // from the socket. Note we need to do this before we call // 'sockets.erase(s)' to avoid the potential race with the last // reference being in 'sockets'. 
- shutdown(s, SHUT_RD); - dispose.erase(s); - auto iterator = sockets.find(s); - delete iterator->second; + + // Hold on to the Socket and remove it from the 'sockets' map so + // that in the case where 'shutdown()' ends up calling close the + // termination logic is not run twice. + Socket* socket = iterator->second; sockets.erase(iterator); + socket->shutdown(); + + delete socket; } } http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/socket.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp index 0e1cebb..b2a27b5 100644 --- a/3rdparty/libprocess/src/socket.cpp +++ b/3rdparty/libprocess/src/socket.cpp @@ -9,6 +9,10 @@ #include <process/owned.hpp> #include <process/socket.hpp> +#ifdef USE_SSL_SOCKET +#include "libevent_ssl_socket.hpp" +#include "openssl.hpp" +#endif #include "poll_socket.hpp" using std::string; @@ -55,6 +59,16 @@ Try<Socket> Socket::create(Kind kind, int s) } return Socket(socket.get()); } +#ifdef USE_SSL_SOCKET + case SSL: { + Try<std::shared_ptr<Socket::Impl>> socket = + LibeventSSLSocketImpl::create(s); + if (socket.isError()) { + return Error(socket.error()); + } + return Socket(socket.get()); + } +#endif // By not setting a default we leverage the compiler errors when // the enumeration is augmented to find all the cases we need to // provide. @@ -64,9 +78,13 @@ Try<Socket> Socket::create(Kind kind, int s) const Socket::Kind& Socket::DEFAULT_KIND() { - // TODO(jmlvanre): Change the default based on configure or - // environment flags. - static const Kind DEFAULT = POLL; + static const Kind DEFAULT = +#ifdef USE_SSL_SOCKET + network::openssl::flags().enabled ? Socket::SSL : Socket::POLL; +#else + Socket::POLL; +#endif + return DEFAULT; }
