SINGA-233 Fix bugs in sending large messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d308e06d Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d308e06d Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d308e06d Branch: refs/heads/dev Commit: d308e06d6478203b82a0857efab9ce26e3375b09 Parents: 90a1cd5 Author: caiqc <[email protected]> Authored: Fri Aug 5 00:03:24 2016 +0800 Committer: caiqc <[email protected]> Committed: Wed Aug 10 09:42:46 2016 +0800 ---------------------------------------------------------------------- include/singa/io/network/endpoint.h | 5 + src/io/network/endpoint.cc | 178 +++++++++++++++++++++---------- test/CMakeLists.txt | 15 ++- test/singa/test_ep.cc | 8 +- 4 files changed, 142 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/include/singa/io/network/endpoint.h ---------------------------------------------------------------------- diff --git a/include/singa/io/network/endpoint.h b/include/singa/io/network/endpoint.h index 1079fcc..063ca11 100644 --- a/include/singa/io/network/endpoint.h +++ b/include/singa/io/network/endpoint.h @@ -50,6 +50,9 @@ namespace singa { #define MAX_RETRY_CNT 3 +#define EV_WATCHER_STOP 0 +#define EV_WATCHER_START 1 + class NetworkThread; class EndPointFactory; @@ -62,6 +65,7 @@ class EndPoint { std::mutex mtx_; struct sockaddr_in addr_; int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other + int pfd_ = -1; int conn_status_ = CONN_INIT; int pending_cnt_ = 0; int retry_cnt_ = 0; @@ -113,6 +117,7 @@ class NetworkThread{ void doWork(); int asyncSend(int); void asyncSendPendingMsg(EndPoint*); + void afterConnEst(EndPoint* ep, int fd, bool active); public: EndPointFactory* epf_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/src/io/network/endpoint.cc 
---------------------------------------------------------------------- diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc index 7fe72b3..a74f5c2 100644 --- a/src/io/network/endpoint.cc +++ b/src/io/network/endpoint.cc @@ -283,17 +283,21 @@ void NetworkThread::onNewEp() { ev_io_start(this->loop_, &this->fd_wwatcher_map_[fd]); } } else { + afterConnEst(ep, fd, true); + // connection established immediately - LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd; - ep->conn_status_ = CONN_EST; - ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]); + // LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd; + // ep->conn_status_ = CONN_EST; - // poll for new msgs - ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ); - ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]); + // //ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]); + // ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE); - asyncSendPendingMsg(ep); - ep->cv_.notify_all(); + // // poll for new msgs + // ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ); + // ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]); + + // asyncSendPendingMsg(ep); + // ep->cv_.notify_all(); } } } @@ -314,32 +318,21 @@ void NetworkThread::onConnEst(int fd) { handleConnLost(ep->fd_[0], ep); - switch(ep->conn_status_) { - case CONN_INIT: - case CONN_PENDING: - return; - default: - break; - } + if (ep->conn_status_ == CONN_EST && ep->conn_status_ == CONN_ERROR) + ep->cv_.notify_all(); } else { - LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << ", fd = "<< fd; - ep->conn_status_ = CONN_EST; - // connect established; poll for new msgs - ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]); - ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ); - ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]); - } + afterConnEst(ep, fd, true); - if (ep->conn_status_ == CONN_EST && ep->to_ack_.size() > 0) 
- // if there are pending message, it means these msgs were sent over - // previous sockets that have been lost now - // we need to resend these msgs to the remote side - asyncSendPendingMsg(ep); + //ep->conn_status_ = CONN_EST; + //// connect established; poll for new msgs + //ev_io_stop(this->loop_, &this->fd_wwatcher_map_[fd]); + //ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE); - // Finally notify all waiting threads - ep->cv_.notify_all(); + //ev_io_init(&this->fd_rwatcher_map_[fd], readable_cb, fd, EV_READ); + //ev_io_start(this->loop_, &this->fd_rwatcher_map_[fd]); + } } void NetworkThread::onNewConn() { @@ -363,35 +356,93 @@ void NetworkThread::onNewConn() { ep = epf_->getOrCreateEp(a); std::unique_lock<std::mutex> lock(ep->mtx_); - if (ep->fd_[1] >= 0) { - // the previous connection is lost - handleConnLost(ep->fd_[1], ep, false); + // Passive connection + afterConnEst(ep, fd, false); + + // This should not be put before afterConnEst otherwise it may have no + // effect + fd_ip_map_[fd] = a; + + // record the remote address + bcopy(&addr, &ep->addr_, len); +} + +/** + * @brief The processing for a connected socket + * + * @param ep + * @param fd + * @param active indicates whether this socket is locally initiated or not + */ +void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) { + + if (active) + LOG(INFO) << "Connected to " << inet_ntoa(ep->addr_.sin_addr) << ", fd = "<< fd; + + int sfd; + + if (active) { + ep->fd_[0] = fd; + sfd = ep->fd_[1]; + } else { + if (ep->fd_[1] >= 0) { + // the previous connection is lost + handleConnLost(ep->fd_[1], ep, false); + } + ep->fd_[1] = fd; + sfd = ep->fd_[0]; } - if (ep->fd_[0] == fd) { - // this fd is reused + if (sfd == fd) + // this fd is a reuse of a previous socket fd + // so we first need to clean up the resources for that fd handleConnLost(fd, ep, false); - } - fd_ip_map_[fd] = a; + // initialize io watchers and add the read watcher to the ev loop ev_io_init(&fd_rwatcher_map_[fd], 
readable_cb, fd, EV_READ); ev_io_start(loop_, &fd_rwatcher_map_[fd]); - // record the remote address - bcopy(&addr, &ep->addr_, len); + // stop watching the writable watcher if necessary + if (active) + ev_io_stop(loop_, &fd_wwatcher_map_[fd]); + ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE); + + // see whether there is already an established connection for this fd + if (ep->conn_status_ == CONN_EST && sfd >= 0) { + // check if fd and sfd are associated with the same socket + struct sockaddr_in addr; + socklen_t len; + if (getsockname(fd, (struct sockaddr*)&addr, &len)) { + LOG(INFO) << "Unable to get local socket address: " << strerror(errno); + return; + } - ep->conn_status_ = CONN_EST; - ep->fd_[1] = fd; + // see whether the local address of fd is the same as the remote side + // of sfd, which has already been stored in ep->addr_ + if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port == ep->addr_.sin_port) { + LOG(INFO) << fd << " and " << sfd << " are associated with the same socket"; + } else { + // this socket is redundant, so we close it manually + close(fd); + handleConnLost(fd, ep); + } + } else { + ep->pfd_ = fd; // set the primary fd + ep->conn_status_ = CONN_EST; + } - if (ep->to_ack_.size() > 0) - // see if there are any messages waiting for ack - // if yes, resend them - asyncSendPendingMsg(ep); + if (fd == ep->pfd_) { + this->asyncSendPendingMsg(ep); + } - // this connection is initiaed by remote side, - // so we dont need to notify the waiting thread + // Finally notify all waiting threads + // if this connection is initiated by remote side, + // we don't need to notify the waiting thread // later threads wanting to send to this ep, however, // are able to reuse this ep + if (active) { + ep->cv_.notify_all(); + } } void NetworkThread::onSend(int fd) { @@ -423,6 +474,9 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) { LOG(INFO) << "There are " << ep->send_.size() << " to-send msgs, and " << ep->to_ack_.size() << " 
to-ack msgs"; + if (ep->to_ack_.empty()) + return; + while (!ep->send_.empty()) { ep->to_ack_.push(ep->send_.front()); ep->send_.pop(); @@ -443,14 +497,16 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) { */ int NetworkThread::asyncSend(int fd) { - if (fd_ip_map_.count(fd) == 0) - return 0; - EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); std::unique_lock<std::mutex> ep_lock(ep->mtx_); + if (fd != ep->pfd_) + // we only send over the primary fd + return 0; + if (ep->conn_status_ != CONN_EST) + // This happens during reconnection goto out; while(!ep->send_.empty()) { @@ -465,12 +521,10 @@ int NetworkThread::asyncSend(int fd) { else nbytes = write(fd, msg.msg_ + msg.processed_, msg.getSize() - msg.processed_); - // LOG(INFO) << "Send " << nbytes << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; - if (nbytes == -1) { if (errno == EWOULDBLOCK) { - ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE); - ev_io_start(loop_, &fd_wwatcher_map_[fd]); + if (!ev_is_active(&fd_wwatcher_map_[fd]) && !ev_is_pending(&fd_wwatcher_map_[fd])) + ev_io_start(loop_, &fd_wwatcher_map_[fd]); goto out; } else { // this connection is lost; reset the send status @@ -480,14 +534,24 @@ int NetworkThread::asyncSend(int fd) { } } else msg.processed_ += nbytes; + + //std::size_t m, p; + //uint8_t type; + //uint32_t id; + //if (msg.msg_) { + // readInteger(msg.msg_, type, id, m, p); + // LOG(INFO) << "Send " << msg.processed_ << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd << " for the current DATA MSG " << msg.id_ << ", " << id << ", " << m << ", " << p; + //} } CHECK(msg.processed_ == msg.getSize()); if (msg.type_ != MSG_ACK) { + LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << msg.getSize();; msg.processed_ = 0; ep->to_ack_.push(&msg); } else { + //LOG(INFO) << "Send an ACK message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_; delete &msg; } @@ -497,7 
+561,7 @@ int NetworkThread::asyncSend(int fd) { // if (ep->retry_cnt_ == 0) { // LOG(INFO) << "Disconnect with Endpoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; // close(fd); - // handleConnLost(fd, ep); + // goto err; // } } out: @@ -566,7 +630,8 @@ void NetworkThread::onRecv(int fd) { // got the whole metadata; readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_); - // LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_; + + //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; } // start reading the real data @@ -589,6 +654,9 @@ void NetworkThread::onRecv(int fd) { } msg.processed_ += nread; + + //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << ", processed_ = " << msg.processed_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; + if (msg.processed_ == msg.getSize()) { LOG(INFO) << "Receive a " << msg.processed_ << " bytes DATA message from " << inet_ntoa(ep->addr_.sin_addr) << " with id " << msg.id_; ep->recv_.push(new Message(static_cast<Message&&>(msg))); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 044d65a..3bfd36c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -9,9 +9,14 @@ IF(NOT USE_OPENCL) LIST(REMOVE_ITEM singa_test_source "singa/test_opencl.cc") ENDIF() -ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source}) -ADD_DEPENDENCIES(test_singa singa_core singa_utils) -MESSAGE(STATUS "link libs" ${singa_linker_libs}) -TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model +#ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" 
${singa_test_source}) +#ADD_DEPENDENCIES(test_singa singa_core singa_utils) +#MESSAGE(STATUS "link libs" ${singa_linker_libs}) +#TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model +# singa_io proto protobuf ${SINGA_LINKER_LIBS}) +#SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread") + +ADD_EXECUTABLE(test_ep "singa/test_ep.cc") +ADD_DEPENDENCIES(test_ep singa_io) +TARGET_LINK_LIBRARIES(test_ep singa_core singa_utils singa_model singa_io proto protobuf ${SINGA_LINKER_LIBS}) -SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread") http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d308e06d/test/singa/test_ep.cc ---------------------------------------------------------------------- diff --git a/test/singa/test_ep.cc b/test/singa/test_ep.cc index 2435f28..e6d3e07 100644 --- a/test/singa/test_ep.cc +++ b/test/singa/test_ep.cc @@ -6,13 +6,13 @@ #include "singa/utils/logging.h" -#define SIZE 100 +#define SIZE 10000000 #define PORT 10000 using namespace singa; int main(int argc, char** argv) { - char md[SIZE]; - char payload[SIZE]; + char* md = new char[SIZE]; + char* payload = new char[SIZE]; char* host = "localhost"; int port = PORT; @@ -45,7 +45,7 @@ int main(int argc, char** argv) { int cnt = 0; - while(ep && cnt++ <= 100 && ep->send(m) > 0 ) { + while(ep && cnt++ <= 5 && ep->send(m) > 0 ) { LOG(INFO) << "Send a " << m->getSize() << " bytes message";
