This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a827b1ee2ec1528e54514b38e01873970e7cfaf6 Author: Joseph Wu <[email protected]> AuthorDate: Wed Oct 23 18:01:35 2019 -0700 SSL Socket: Implemented send/recv and shutdown. This completes a fully functional SSL socket, satisfying all the existing SSL socket tests in libprocess. Review: https://reviews.apache.org/r/71666 --- 3rdparty/libprocess/src/ssl/openssl_socket.cpp | 140 +++++++++++++++++++++++-- 3rdparty/libprocess/src/ssl/openssl_socket.hpp | 9 ++ 2 files changed, 141 insertions(+), 8 deletions(-) diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp index e1d8ad3..42a1918 100644 --- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp +++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp @@ -299,7 +299,8 @@ Try<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::create(int_fd socket) OpenSSLSocketImpl::OpenSSLSocketImpl(int_fd socket) : PollSocketImpl(socket), - ssl(nullptr) {} + ssl(nullptr), + dirty_shutdown(false) {} OpenSSLSocketImpl::~OpenSSLSocketImpl() @@ -308,6 +309,11 @@ OpenSSLSocketImpl::~OpenSSLSocketImpl() SSL_free(ssl); ssl = nullptr; } + + if (compute_thread.isSome()) { + process::terminate(compute_thread.get()); + compute_thread = None(); + } } @@ -410,15 +416,74 @@ Future<Nothing> OpenSSLSocketImpl::connect( } -Future<size_t> OpenSSLSocketImpl::recv(char* data, size_t size) +Future<size_t> OpenSSLSocketImpl::recv(char* output, size_t size) { - UNIMPLEMENTED; + if (dirty_shutdown) { + return Failure("Socket is shutdown"); + } + + // Hold a weak pointer since the incoming socket is not guaranteed + // to terminate before the receiving end does. + std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); + + return process::loop( + compute_thread, + [weak_self, output, size]() -> Future<int> { + std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); + if (self == nullptr) { + return Failure("Socket destroyed while receiving"); + } + + ERR_clear_error(); + return SSL_read(self->ssl, output, size); + }, + [weak_self](int result) -> Future<ControlFlow<size_t>> { + std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); + if (self == nullptr) { + return Failure("Socket destroyed while receiving"); + } + + if (result == 0) { + // Check if EOF has been reached. + BIO* bio = SSL_get_rbio(self->ssl); + if (BIO_eof(bio) == 1) { + return Break(0u); + } + } + + return self->handle_ssl_return_result(result, true); + }); } -Future<size_t> OpenSSLSocketImpl::send(const char* data, size_t size) +Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size) { - UNIMPLEMENTED; + if (dirty_shutdown) { + return Failure("Socket is shutdown"); + } + + // Hold a weak pointer since a write may become backlogged indefinitely. + std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); + + return process::loop( + compute_thread, + [weak_self, input, size]() -> Future<int> { + std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); + if (self == nullptr) { + return Failure("Socket destroyed while sending"); + } + + ERR_clear_error(); + return SSL_write(self->ssl, input, size); + }, + [weak_self](int result) -> Future<ControlFlow<size_t>> { + std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); + if (self == nullptr) { + return Failure("Socket destroyed while sending"); + } + + return self->handle_ssl_return_result(result, false); + }); } @@ -442,7 +507,7 @@ Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept() // we do not wait for the accept logic to complete before accepting a // new socket. process::loop( - None(), + compute_thread, [weak_self]() -> Future<std::shared_ptr<SocketImpl>> { std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); @@ -571,12 +636,68 @@ Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept() Try<Nothing, SocketError> OpenSSLSocketImpl::shutdown(int how) { - UNIMPLEMENTED; + if (dirty_shutdown) { + return Nothing(); + } + + // Treat this as a dirty shutdown (i.e. closing the socket before sending + // the SSL close notification) because we are not guaranteed to properly + // shutdown synchronously. + dirty_shutdown = true; + + // Hold a weak pointer since we are ok with closing the socket before + // shutdown is completed gracefully. + std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); + + // The shutdown itself will happen asynchronously. + process::loop( + compute_thread, + [weak_self]() -> Future<int> { + std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); + if (self == nullptr) { + return Failure("Socket destroyed while doing shutdown"); + } + + ERR_clear_error(); + return SSL_shutdown(self->ssl); + }, + [weak_self](int result) -> Future<ControlFlow<size_t>> { + std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); + if (self == nullptr) { + return Failure("Socket destroyed while doing shutdown"); + } + + // A successful shutdown will return 0 if the close notification + // was sent, or 1 if both sides of the connection have closed. + // Either case is sufficient for a clean shutdown. + if (result >= 0) { + return Break(0u); + } + + // Check if EOF has been reached. + BIO* bio = SSL_get_rbio(self->ssl); + if (BIO_eof(bio) == 1) { + return Break(0u); + } + + return self->handle_ssl_return_result(result, false); + }); + + return Nothing(); } Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl) { + // NOTE: We would normally create this UPID in the socket's constructor. + // However, during libprocess initialization, the libprocess listening socket + // is constructed before spawning processes is allowed. + // This function is guaranteed to be called for any SSL socket that may + // transmit encrypted data. Listening sockets will not create a UPID. + if (compute_thread.isNone()) { + compute_thread = spawn(new ProcessBase(), true); + } + // Save a reference to the SSL object. ssl = _ssl; @@ -591,7 +712,7 @@ Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl) std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); return process::loop( - None(), + compute_thread, [weak_self]() -> Future<int> { std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); if (self == nullptr) { @@ -677,8 +798,11 @@ Future<ControlFlow<size_t>> OpenSSLSocketImpl::handle_ssl_return_result( // to any interaction with the OpenSSL library. UNREACHABLE(); case SSL_ERROR_SSL: + dirty_shutdown = true; return Failure("Protocol error"); case SSL_ERROR_SYSCALL: + dirty_shutdown = true; + // NOTE: If there is an error (`ERR_peek_error() != 0`), // we fall through to the default error handling case. if (ERR_peek_error() == 0) { diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.hpp b/3rdparty/libprocess/src/ssl/openssl_socket.hpp index 9fafcd7..2d0259d 100644 --- a/3rdparty/libprocess/src/ssl/openssl_socket.hpp +++ b/3rdparty/libprocess/src/ssl/openssl_socket.hpp @@ -84,6 +84,15 @@ private: // We wrap the socket in a 'Future' so that we can pass failures or // discards through. Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue; + + // Set to true whenever the connection is terminated before a proper + // SSL shutdown can be sent. This will also prevent `shutdown` from + // doing anything, as the connection will be presumed dead. + bool dirty_shutdown; + + // An actor used to dispatch the compute-heavy work of encryption and + // decryption, like `SSL_read` and `SSL_write`. + Option<UPID> compute_thread; }; } // namespace internal {
