Repository: mesos Updated Branches: refs/heads/master fefcf370c -> a60da5487
Fixed use of Future::get on failed Socket::connect future. Minor bug fix after checking that a future had failed we still called 'get()' on it which causes an abort. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a60da548 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a60da548 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a60da548 Branch: refs/heads/master Commit: a60da54875dc1fb5872ef380abddcbbf9a3ef00c Parents: fefcf37 Author: Benjamin Hindman <[email protected]> Authored: Sat Nov 15 22:06:41 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 22:06:44 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/socket.hpp | 4 +- 3rdparty/libprocess/src/process.cpp | 58 +++++++++++++-------- 2 files changed, 38 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a60da548/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index c022924..20136ba 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -74,7 +74,7 @@ public: return s >= 0 ? s : create().get(); } - Future<Socket> connect(const Node& node); + Future<Nothing> connect(const Node& node); Future<size_t> read(char* data, size_t size); @@ -129,7 +129,7 @@ public: return impl->get(); } - Future<Socket> connect(const Node& node) + Future<Nothing> connect(const Node& node) { return impl->connect(node); } http://git-wip-us.apache.org/repos/asf/mesos/blob/a60da548/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 5d3b947..404a32d 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1247,7 +1247,7 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request) namespace internal { -Future<Socket> connect(const Socket& socket) +Future<Nothing> connect(const Socket& socket) { // Now check that a successful connection was made. int opt; @@ -1260,13 +1260,13 @@ Future<Socket> connect(const Socket& socket) return Failure("Socket error while connecting"); } - return socket; + return Nothing(); } } // namespace internal { -Future<Socket> Socket::Impl::connect(const Node& node) +Future<Nothing> Socket::Impl::connect(const Node& node) { sockaddr_in addr; memset(&addr, 0, sizeof(addr)); @@ -1283,7 +1283,7 @@ Future<Socket> Socket::Impl::connect(const Node& node) .then(lambda::bind(&internal::connect, Socket(shared_from_this()))); } - return Socket(shared_from_this()); + return Nothing(); } @@ -1511,23 +1511,25 @@ void ignore_read_data( } -void link_connect(const Future<Socket>& socket) +void link_connect(const Future<Nothing>& future, Socket* socket) { - if (socket.isDiscarded() || socket.isFailed()) { - if (socket.isFailed()) { - VLOG(1) << "Failed to link, connect: " << socket.failure(); + if (future.isDiscarded() || future.isFailed()) { + if (future.isFailed()) { + VLOG(1) << "Failed to link, connect: " << future.failure(); } - socket_manager->close(socket.get()); + socket_manager->close(*socket); + delete socket; return; } size_t size = 80 * 1024; char* data = new char[size]; - socket.get().read(data, size) + + socket->read(data, size) .onAny(lambda::bind( &ignore_read_data, lambda::_1, - new Socket(socket.get()), + socket, data, size)); } @@ -1562,7 +1564,10 @@ void SocketManager::link(ProcessBase* process, const UPID& to) persists[to.node] = s; socket.connect(to.node) - .onAny(lambda::bind(&internal::link_connect, lambda::_1)); + .onAny(lambda::bind( + &internal::link_connect, + lambda::_1, + new Socket(socket))); } } } @@ -1725,33 +1730,38 @@ void SocketManager::send( namespace internal { -void send_connect(const Future<Socket>& socket, Message* message) +void send_connect( + const Future<Nothing>& future, + Socket* socket, + Message* message) { - if (socket.isDiscarded() || socket.isFailed()) { - if (socket.isFailed()) { - VLOG(1) << "Failed to send, connect: " << socket.failure(); + if (future.isDiscarded() || future.isFailed()) { + if (future.isFailed()) { + VLOG(1) << "Failed to send, connect: " << future.failure(); } - socket_manager->close(socket.get()); + socket_manager->close(*socket); + delete socket; delete message; return; } - Encoder* encoder = new MessageEncoder(socket.get(), message); + Encoder* encoder = new MessageEncoder(*socket, message); // Read and ignore data from this socket. Note that we don't // expect to receive anything other than HTTP '202 Accepted' // responses which we just ignore. size_t size = 80 * 1024; char* data = new char[size]; - socket.get().read(data, size) + + socket->read(data, size) .onAny(lambda::bind( &ignore_read_data, lambda::_1, - new Socket(socket.get()), + new Socket(*socket), data, size)); - internal::send(encoder, new Socket(socket.get())); + internal::send(encoder, socket); } } // namespace internal { @@ -1787,7 +1797,11 @@ void SocketManager::send(Message* message) outgoing[s]; socket.connect(node) - .onAny(lambda::bind(&internal::send_connect, lambda::_1, message)); + .onAny(lambda::bind( + &internal::send_connect, + lambda::_1, + new Socket(socket), + message)); } } }
