SINGA-233 Fix a minor bug in processing lost connections

Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/90a1cd54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/90a1cd54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/90a1cd54

Branch: refs/heads/dev
Commit: 90a1cd54cb590cb70ef1fd2af64061f46e4d52fe
Parents: 889abf8
Author: caiqc <[email protected]>
Authored: Thu Aug 4 17:21:47 2016 +0800
Committer: caiqc <[email protected]>
Committed: Wed Aug 10 09:42:46 2016 +0800

----------------------------------------------------------------------
 src/io/network/endpoint.cc | 43 ++++++++++++++++++++++++-----------------
 1 file changed, 25 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/90a1cd54/src/io/network/endpoint.cc
----------------------------------------------------------------------
diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc
index 2926a05..7fe72b3 100644
--- a/src/io/network/endpoint.cc
+++ b/src/io/network/endpoint.cc
@@ -269,6 +269,7 @@ void NetworkThread::onNewEp() {
             ep->addr_.sin_port = htons(port_);
             bzero(&(ep->addr_.sin_zero), 8);
 
+            LOG(INFO) << "Connecting to " << inet_ntoa(ep->addr_.sin_addr) << 
" fd = "<< fd;
             if (connect(fd, (struct sockaddr*)&ep->addr_, 
                     sizeof(struct sockaddr)) ) {
                 LOG(INFO) << "Connect Error: " << strerror(errno);
@@ -397,9 +398,11 @@ void NetworkThread::onSend(int fd) {
     std::vector<int> invalid_fd;
 
     if (fd == -1) {
+        //LOG(INFO) << "There are " << fd_ip_map_.size() << " connections";
         // this is a signal of new message to send
         for(auto& p : fd_ip_map_) {
             // send message
+            //LOG(INFO) << "Try to send over fd " << p.first;
             if (asyncSend(p.first) < 0)
                 invalid_fd.push_back(p.first);
         }
@@ -440,6 +443,9 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
  */
 int NetworkThread::asyncSend(int fd) {
 
+    if (fd_ip_map_.count(fd) == 0)
+        return 0;
+
     EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
 
     std::unique_lock<std::mutex> ep_lock(ep->mtx_);
@@ -459,7 +465,7 @@ int NetworkThread::asyncSend(int fd) {
             else
                 nbytes = write(fd, msg.msg_ + msg.processed_, msg.getSize() - 
msg.processed_);
 
-            LOG(INFO) << "Send " << nbytes << " bytes to " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
+            // LOG(INFO) << "Send " << nbytes << " bytes to " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
 
             if (nbytes == -1) {
                 if (errno == EWOULDBLOCK) {
@@ -488,8 +494,11 @@ int NetworkThread::asyncSend(int fd) {
         ep->send_.pop();
 
         // for test
-        if (ep->retry_cnt_ == 0)
-            close(fd);
+        // if (ep->retry_cnt_ == 0) {
+        //     LOG(INFO) << "Disconnect with Endpoint " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
+        //     close(fd);
+        //     handleConnLost(fd, ep);
+        // }
     }
 out:
     if (ep->send_.empty())
@@ -506,7 +515,7 @@ void NetworkThread::onRecv(int fd) {
     int nread;
     EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
 
-    LOG(INFO) << "Start to read from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; 
+    //LOG(INFO) << "Start to read from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
 
     std::unique_lock<std::mutex> lock(ep->mtx_);
     while(1) {
@@ -519,9 +528,9 @@ void NetworkThread::onRecv(int fd) {
                 if (errno != EWOULDBLOCK || nread == 0) {
                     // socket error or shuts down 
                     if (nread < 0)
-                        LOG(INFO) << "Faile to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno);
+                        LOG(INFO) << "Fail to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno);
                     else
-                        LOG(INFO) << "Faile to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side";
+                        LOG(INFO) << "Fail to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side";
                     handleConnLost(fd, ep);
                 }
                 break;
@@ -557,7 +566,7 @@ void NetworkThread::onRecv(int fd) {
 
             // got the whole metadata; 
             readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, 
msg.psize_);
-            LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_;
+            // LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_;
         }
 
         // start reading the real data
@@ -571,9 +580,9 @@ void NetworkThread::onRecv(int fd) {
             if (errno != EWOULDBLOCK || nread == 0) {
                 // socket error or shuts down 
                 if (nread < 0)
-                    LOG(INFO) << "Faile to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno);
+                    LOG(INFO) << "Fail to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": " << strerror(errno);
                 else
-                    LOG(INFO) << "Faile to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side";
+                    LOG(INFO) << "Fail to receive from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ": Connection reset by remote side";
                 handleConnLost(fd, ep);
             }
             break;
@@ -609,22 +618,20 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) {
     ev_io_stop(loop_, &this->fd_rwatcher_map_[fd]);
     fd_wwatcher_map_.erase(fd);
     fd_rwatcher_map_.erase(fd);
-    close(fd);
+    //close(fd);
 
     if (fd == ep->fd_[0] || ep->fd_[0] < 0) {
         if (!ep->send_.empty())
             ep->send_.front()->processed_ = 0;
     }
 
-    if (reconn) {
-
-        int sfd = (fd == ep->fd_[0]) ? ep->fd_[1] : ep->fd_[0];
-
-        if (fd == ep->fd_[0])
-            ep->fd_[0] = -1;
-        else
-            ep->fd_[1] = -1;
+    int sfd = (fd == ep->fd_[0]) ? ep->fd_[1] : ep->fd_[0];
+    if (fd == ep->fd_[0])
+        ep->fd_[0] = -1;
+    else
+        ep->fd_[1] = -1;
 
+    if (reconn) {
         // see if the other fd is ok or not
         if (sfd < 0) {
             if (ep->retry_cnt_ < MAX_RETRY_CNT) {

Reply via email to