SINGA-233 Fix a minor bug in processing lost connections
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/90a1cd54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/90a1cd54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/90a1cd54 Branch: refs/heads/dev Commit: 90a1cd54cb590cb70ef1fd2af64061f46e4d52fe Parents: 889abf8 Author: caiqc <[email protected]> Authored: Thu Aug 4 17:21:47 2016 +0800 Committer: caiqc <[email protected]> Committed: Wed Aug 10 09:42:46 2016 +0800 ---------------------------------------------------------------------- src/io/network/endpoint.cc | 43 ++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/90a1cd54/src/io/network/endpoint.cc ---------------------------------------------------------------------- diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc index 2926a05..7fe72b3 100644 --- a/src/io/network/endpoint.cc +++ b/src/io/network/endpoint.cc @@ -269,6 +269,7 @@ void NetworkThread::onNewEp() { ep->addr_.sin_port = htons(port_); bzero(&(ep->addr_.sin_zero), 8); + LOG(INFO) << "Connecting to " << inet_ntoa(ep->addr_.sin_addr) << " fd = "<< fd; if (connect(fd, (struct sockaddr*)&ep->addr_, sizeof(struct sockaddr)) ) { LOG(INFO) << "Connect Error: " << strerror(errno); @@ -397,9 +398,11 @@ void NetworkThread::onSend(int fd) { std::vector<int> invalid_fd; if (fd == -1) { + //LOG(INFO) << "There are " << fd_ip_map_.size() << " connections"; // this is a signal of new message to send for(auto& p : fd_ip_map_) { // send message + //LOG(INFO) << "Try to send over fd " << p.first; if (asyncSend(p.first) < 0) invalid_fd.push_back(p.first); } @@ -440,6 +443,9 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) { */ int NetworkThread::asyncSend(int fd) { + if (fd_ip_map_.count(fd) == 0) + return 0; + EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); std::unique_lock<std::mutex> ep_lock(ep->mtx_); @@ -459,7 +465,7 @@ int NetworkThread::asyncSend(int fd) { else nbytes = write(fd, msg.msg_ + msg.processed_, msg.getSize() - msg.processed_); - LOG(INFO) << "Send " << nbytes << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; + // LOG(INFO) << "Send " << nbytes << " bytes to " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; if (nbytes == -1) { if (errno == EWOULDBLOCK) { @@ -488,8 +494,11 @@ int NetworkThread::asyncSend(int fd) { ep->send_.pop(); // for test - if (ep->retry_cnt_ == 0) - close(fd); + // if (ep->retry_cnt_ == 0) { + // LOG(INFO) << "Disconnect with Endpoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; + // close(fd); + // handleConnLost(fd, ep); + // } } out: if (ep->send_.empty()) @@ -506,7 +515,7 @@ void NetworkThread::onRecv(int fd) { int nread; EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); - LOG(INFO) << "Start to read from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; + //LOG(INFO) << "Start to read from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; std::unique_lock<std::mutex> lock(ep->mtx_); while(1) { @@ -519,9 +528,9 @@ void NetworkThread::onRecv(int fd) { if (errno != EWOULDBLOCK || nread == 0) { // socket error or shuts down if (nread < 0) - LOG(INFO) << "Faile to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno); + LOG(INFO) << "Fail to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno); else - LOG(INFO) << "Faile to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side"; + LOG(INFO) << "Fail to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side"; handleConnLost(fd, ep); } break; @@ -557,7 +566,7 @@ void NetworkThread::onRecv(int fd) { // got the whole metadata; readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_); - LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_; + // LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_; } // start reading the real data @@ -571,9 +580,9 @@ void NetworkThread::onRecv(int fd) { if (errno != EWOULDBLOCK || nread == 0) { // socket error or shuts down if (nread < 0) - LOG(INFO) << "Faile to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno); + LOG(INFO) << "Fail to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno); else - LOG(INFO) << "Faile to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side"; + LOG(INFO) << "Fail to receive from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side"; handleConnLost(fd, ep); } break; @@ -609,22 +618,20 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) { ev_io_stop(loop_, &this->fd_rwatcher_map_[fd]); fd_wwatcher_map_.erase(fd); fd_rwatcher_map_.erase(fd); - close(fd); + //close(fd); if (fd == ep->fd_[0] || ep->fd_[0] < 0) { if (!ep->send_.empty()) ep->send_.front()->processed_ = 0; } - if (reconn) { - - int sfd = (fd == ep->fd_[0]) ? ep->fd_[1] : ep->fd_[0]; - - if (fd == ep->fd_[0]) - ep->fd_[0] = -1; - else - ep->fd_[1] = -1; + int sfd = (fd == ep->fd_[0]) ? ep->fd_[1] : ep->fd_[0]; + if (fd == ep->fd_[0]) + ep->fd_[0] = -1; + else + ep->fd_[1] = -1; + if (reconn) { // see if the other fd is ok or not if (sfd < 0) { if (ep->retry_cnt_ < MAX_RETRY_CNT) {
