SINGA-233 Add support for detecting endpoint timeouts

Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/1565e659
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/1565e659
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/1565e659

Branch: refs/heads/dev
Commit: 1565e659659a3b142e7a68bd27b6dce20eb10b43
Parents: d308e06
Author: caiqc <[email protected]>
Authored: Fri Aug 5 16:41:52 2016 +0800
Committer: caiqc <[email protected]>
Committed: Wed Aug 10 09:42:46 2016 +0800

----------------------------------------------------------------------
 include/singa/io/network/endpoint.h |  12 ++-
 src/io/network/endpoint.cc          | 158 ++++++++++++++++++++++---------
 src/io/network/message.cc           |   1 +
 test/singa/test_ep.cc               |  61 ++++++++----
 4 files changed, 165 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/include/singa/io/network/endpoint.h
----------------------------------------------------------------------
diff --git a/include/singa/io/network/endpoint.h b/include/singa/io/network/endpoint.h
index 063ca11..ac243ff 100644
--- a/include/singa/io/network/endpoint.h
+++ b/include/singa/io/network/endpoint.h
@@ -50,8 +50,7 @@ namespace singa {
 
 #define MAX_RETRY_CNT 3
 
-#define EV_WATCHER_STOP 0
-#define EV_WATCHER_START 1
+#define EP_TIMEOUT 5.
 
 class NetworkThread;
 class EndPointFactory;
@@ -64,13 +63,16 @@ class EndPoint {
         std::condition_variable cv_;
         std::mutex mtx_;
         struct sockaddr_in addr_;
+        ev_timer timer_;
+        ev_tstamp last_msg_time_;
         int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other
         int pfd_ = -1;
+        bool is_socket_loop_ = false;
         int conn_status_ = CONN_INIT;
         int pending_cnt_ = 0;
         int retry_cnt_ = 0;
         NetworkThread* thread_ = nullptr;
-        EndPoint(NetworkThread* t):thread_(t){}
+        EndPoint(NetworkThread* t);
         ~EndPoint();
         friend class NetworkThread;
         friend class EndPointFactory;
@@ -105,7 +107,8 @@ class NetworkThread{
 
         std::unordered_map<int, ev_io> fd_wwatcher_map_;
         std::unordered_map<int, ev_io> fd_rwatcher_map_;
-        std::unordered_map<int, uint32_t> fd_ip_map_;
+
+        std::unordered_map<int, EndPoint*> fd_ep_map_;
 
         std::map<int, Message> pending_msgs_;
 
@@ -130,6 +133,7 @@ class NetworkThread{
         void onConnEst(int fd);
         void onNewEp();
         void onNewConn();
+        void onTimeout(struct ev_timer* timer);
 };
 }
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/src/io/network/endpoint.cc
----------------------------------------------------------------------
diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc
index a74f5c2..c7e06f9 100644
--- a/src/io/network/endpoint.cc
+++ b/src/io/network/endpoint.cc
@@ -58,6 +58,14 @@ static void accept_cb(struct ev_loop* loop, ev_io* ev, int revent) {
     reinterpret_cast<NetworkThread*>(ev_userdata(loop))->onNewConn();
 }
 
+static void timeout_cb(struct ev_loop* loop, ev_timer* ev, int revent) {
+    reinterpret_cast<NetworkThread*>(ev_userdata(loop))->onTimeout(ev);
+}
+
+EndPoint::EndPoint(NetworkThread* t) : thread_(t) { 
+    this->timer_.data = reinterpret_cast<void*>(this);
+}
+
 EndPoint::~EndPoint() {
     while(!recv_.empty()) {
         delete send_.front();
@@ -262,7 +270,7 @@ void NetworkThread::onNewEp() {
             // set this fd non-blocking
             fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
 
-            this->fd_ip_map_[fd] = ntohl(ep->addr_.sin_addr.s_addr);
+            this->fd_ep_map_[fd] = ep;
 
             // initialize the addess
             ep->addr_.sin_family = AF_INET;
@@ -305,7 +313,9 @@ void NetworkThread::onNewEp() {
 
 void NetworkThread::onConnEst(int fd) {
 
-    EndPoint* ep = epf_->getEp(this->fd_ip_map_[fd]);
+    //EndPoint* ep = epf_->getEp(this->fd_ip_map_[fd]);
+    CHECK(fd_ep_map_.count(fd) > 0);
+    EndPoint* ep = fd_ep_map_.at(fd);
 
     std::unique_lock<std::mutex> lock(ep->mtx_);
 
@@ -359,14 +369,41 @@ void NetworkThread::onNewConn() {
     // Passive connection
     afterConnEst(ep, fd, false);
 
-    // This should not be put before afterConnEst otherwise it may have no
-    // effect
-    fd_ip_map_[fd] = a;
-
     // record the remote address
     bcopy(&addr, &ep->addr_, len);
 }
 
+void NetworkThread::onTimeout(struct ev_timer* timer) {
+        
+    EndPoint* ep = reinterpret_cast<EndPoint*>(timer->data);
+
+    ev_tstamp timeout = EP_TIMEOUT + ep->last_msg_time_;
+    ev_tstamp now = ev_now(loop_);
+
+    std::unique_lock<std::mutex> lock(ep->mtx_);
+    if (now > timeout) {
+        if (!ep->to_ack_.empty() || !ep->send_.empty()) {
+
+            LOG(INFO) << "EndPoint " << inet_ntoa(ep->addr_.sin_addr) << " 
timeouts";
+            // we consider this ep has been disconnected
+            for (int i = 0; i < 2; ++i)
+            {
+                int fd = ep->fd_[i];
+                if (fd >= 0)
+                    handleConnLost(fd, ep);
+            }
+            return;
+        }
+
+        timer->repeat = EP_TIMEOUT;
+
+    } else {
+        timer->repeat = timeout - now;
+    }
+
+    ev_timer_again(loop_, &ep->timer_);
+}
+
 /**
  * @brief The processing for a connected socket
  *
@@ -393,10 +430,14 @@ void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) {
         sfd = ep->fd_[0];
     }
 
-    if (sfd == fd)
+    if (sfd == fd) {
         // this fd is a reuse of a previous socket fd
         // so we first need to clean the resouce for that fd
-        handleConnLost(fd, ep, false);
+        // duplicate this fd so that the resources bound to the old fd can be
+        // freed; call handleConnLost with reconn = false since no reconnect is needed
+        fd = dup(fd);
+        handleConnLost(sfd, ep, false);
+    }
 
     // initialize io watchers and add the read watcher to the ev loop
     ev_io_init(&fd_rwatcher_map_[fd], readable_cb, fd, EV_READ);
@@ -407,6 +448,8 @@ void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) {
         ev_io_stop(loop_, &fd_wwatcher_map_[fd]);
     ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE);
 
+    ep->last_msg_time_ = ev_now(loop_);
+
     // see whether there is already a established connection for this fd
     if (ep->conn_status_ == CONN_EST && sfd >= 0) {
         // check if fd and sfd are associate with the same socket
@@ -414,27 +457,37 @@ void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) {
         socklen_t len;
         if (getsockname(fd, (struct sockaddr*)&addr, &len)) {
             LOG(INFO) << "Unable to get local socket address: " << 
strerror(errno);
-            return;
-        }
-
-        // see whether the local address of fd is the same as the remote side
-        // of sfd, which has already been stored in ep->addr_
-        if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port 
== ep->addr_.sin_port) {
-            LOG(INFO) << fd << " and " << sfd << " are associated with the 
same socket";
         } else {
-            // this socket is redundant, we close it maunally
-            close(fd);
-            handleConnLost(fd, ep);
+            // see whether the local address of fd is the same as the remote side
+            // of sfd, which has already been stored in ep->addr_
+            if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && 
addr.sin_port == ep->addr_.sin_port) {
+                LOG(INFO) << fd << " and " << sfd << " are associated with the 
same socket";
+                ep->is_socket_loop_ = true;
+            } else {
+                // this socket is redundant; close it manually if the local ip is
+                // smaller than the peer ip (ties are broken by comparing ports)
+                if ((addr.sin_addr.s_addr < ep->addr_.sin_addr.s_addr) 
+                        || (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr 
&& addr.sin_port < ep->addr_.sin_port))
+                    handleConnLost(fd, ep, false);
+            }
         }
     } else {
         ep->pfd_ = fd; // set the primary fd
         ep->conn_status_ = CONN_EST;
+
+        // start timeout watcher to detect the liveness of EndPoint
+        ev_init(&ep->timer_, timeout_cb);
+        ep->timer_.repeat = EP_TIMEOUT;
+        ev_timer_start(loop_, &ep->timer_);
+        //timeout_cb(loop_, &ep->timer_, EV_TIMER);
     }
 
     if (fd == ep->pfd_) {
         this->asyncSendPendingMsg(ep);
     }
 
+    fd_ep_map_[fd] = ep;
+
     // Finally notify all waiting threads
     // if this connection is initiaed by remote side, 
     // we dont need to notify the waiting thread
@@ -451,7 +504,7 @@ void NetworkThread::onSend(int fd) {
     if (fd == -1) {
         //LOG(INFO) << "There are " << fd_ip_map_.size() << " connections";
         // this is a signal of new message to send
-        for(auto& p : fd_ip_map_) {
+        for(auto& p : fd_ep_map_) {
             // send message
             //LOG(INFO) << "Try to send over fd " << p.first;
             if (asyncSend(p.first) < 0)
@@ -463,7 +516,8 @@ void NetworkThread::onSend(int fd) {
     }
 
     for (auto& p : invalid_fd) {
-        EndPoint* ep = epf_->getEp(fd_ip_map_.at(p));
+        //EndPoint* ep = epf_->getEp(fd_ip_map_.at(p));
+        EndPoint* ep = fd_ep_map_.at(p);
         std::unique_lock<std::mutex> lock(ep->mtx_);
         handleConnLost(p, ep);
     } 
@@ -474,16 +528,14 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
 
     LOG(INFO) << "There are " << ep->send_.size() << " to-send msgs, and " << 
ep->to_ack_.size() << " to-ack msgs";
 
-    if (ep->to_ack_.empty())
-        return;
-
-    while (!ep->send_.empty()) {
-        ep->to_ack_.push(ep->send_.front());
-        ep->send_.pop();
+    if (!ep->to_ack_.empty()) {
+        while (!ep->send_.empty()) {
+            ep->to_ack_.push(ep->send_.front());
+            ep->send_.pop();
+        }
+        std::swap(ep->send_, ep->to_ack_);
     }
 
-    std::swap(ep->send_, ep->to_ack_);
-
     if (ep->send_.size() > 0) {
         notify(SIG_MSG);
     }
@@ -497,13 +549,16 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) {
  */
 int NetworkThread::asyncSend(int fd) {
 
-    EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
+    //EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
+    CHECK(fd_ep_map_.count(fd) > 0);
+    EndPoint* ep = fd_ep_map_.at(fd);
 
     std::unique_lock<std::mutex> ep_lock(ep->mtx_);
 
-    if (fd != ep->pfd_)
+    if (fd != ep->pfd_ )
         // we only send over the primary fd
-        return 0;
+        // return -1 to indicate this fd is redundant
+        return ep->is_socket_loop_ ? 0 : -1;
 
     if (ep->conn_status_ != CONN_EST)
         // This happens during reconnection
@@ -532,8 +587,10 @@ int NetworkThread::asyncSend(int fd) {
                     msg.processed_ = 0;
                     goto err;
                 }
-            } else 
+            } else  {
+                ep->last_msg_time_ = ev_now(loop_);
                 msg.processed_ += nbytes;
+            }
 
             //std::size_t m, p;
             //uint8_t type; 
@@ -547,7 +604,7 @@ int NetworkThread::asyncSend(int fd) {
         CHECK(msg.processed_ == msg.getSize());
 
         if (msg.type_ != MSG_ACK) {
-            LOG(INFO) << "Send a DATA message to " << 
inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << 
msg.getSize();;
+            LOG(INFO) << "Send a DATA message to " << 
inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << 
msg.getSize() << " over fd " << fd;
             msg.processed_ = 0;
             ep->to_ack_.push(&msg);
         } else {
@@ -557,7 +614,7 @@ int NetworkThread::asyncSend(int fd) {
 
         ep->send_.pop();
 
-        // for test
+        //for test
         // if (ep->retry_cnt_ == 0) {
         //     LOG(INFO) << "Disconnect with Endpoint " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
         //     close(fd);
@@ -577,13 +634,17 @@ void NetworkThread::onRecv(int fd) {
     Message* m = &pending_msgs_[fd];
     Message& msg = (*m);
     int nread;
-    EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
+    //EndPoint* ep = epf_->getEp(fd_ip_map_[fd]);
+
+    CHECK(fd_ep_map_.count(fd) > 0);
+    EndPoint* ep = fd_ep_map_.at(fd);
 
     //LOG(INFO) << "Start to read from EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
 
     std::unique_lock<std::mutex> lock(ep->mtx_);
-    while(1) {
 
+    ep->last_msg_time_ = ev_now(loop_);
+    while(1) {
         if (msg.processed_ < Message::hsize_) {
             nread = read(fd, msg.mdata_ + msg.processed_, 
                     Message::hsize_ - msg.processed_);
@@ -631,7 +692,7 @@ void NetworkThread::onRecv(int fd) {
             // got the whole metadata; 
             readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, 
msg.psize_);
 
-            //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ 
= " << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
+            LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = 
" << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << 
inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd;
         }
 
         // start reading the real data
@@ -681,14 +742,14 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) {
     LOG(INFO) << "Lost connection to EndPoint " << 
inet_ntoa(ep->addr_.sin_addr) << ", fd = " << fd;
 
     this->pending_msgs_.erase(fd);
-    this->fd_ip_map_.erase(fd);
+    this->fd_ep_map_.erase(fd);
     ev_io_stop(loop_, &this->fd_wwatcher_map_[fd]);
     ev_io_stop(loop_, &this->fd_rwatcher_map_[fd]);
     fd_wwatcher_map_.erase(fd);
     fd_rwatcher_map_.erase(fd);
-    //close(fd);
+    close(fd);
 
-    if (fd == ep->fd_[0] || ep->fd_[0] < 0) {
+    if (fd == ep->pfd_) {
         if (!ep->send_.empty())
             ep->send_.front()->processed_ = 0;
     }
@@ -700,8 +761,10 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) {
         ep->fd_[1] = -1;
 
     if (reconn) {
-        // see if the other fd is ok or not
+        // see if the other fd is alive or not
         if (sfd < 0) {
+            if (ep->conn_status_ == CONN_EST)
+                ev_timer_stop(loop_, &ep->timer_);
             if (ep->retry_cnt_ < MAX_RETRY_CNT) {
                 // notify myself for retry
                 ep->retry_cnt_++;
@@ -711,13 +774,20 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) {
             } else {
                 LOG(INFO) << "Maximum retry count achieved for EndPoint " << 
inet_ntoa(ep->addr_.sin_addr);
                 ep->conn_status_ = CONN_ERROR;
+
                 // notify all threads that this ep is no longer connected
                 ep->cv_.notify_all();
             }
         } else {
-            // if there is another working fd, try to send data over this fd
-            if (!ep->send_.empty())
-                this->notify(SIG_MSG);
+            if (!ep->is_socket_loop_) {
+                // if there is another working fd, set this fd as primary and
+                // send data over this fd
+                ep->pfd_ = sfd;
+                ep->last_msg_time_ = ev_now(loop_);
+                asyncSendPendingMsg(ep);
+            } else {
+                handleConnLost(sfd, ep);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/src/io/network/message.cc
----------------------------------------------------------------------
diff --git a/src/io/network/message.cc b/src/io/network/message.cc
index ac3fc14..e63d176 100644
--- a/src/io/network/message.cc
+++ b/src/io/network/message.cc
@@ -34,6 +34,7 @@ Message::Message(Message&& msg) {
     std::swap(psize_, msg.psize_);
     std::swap(msg_, msg.msg_);
     std::swap(type_, msg.type_);
+    std::swap(id_, msg.id_);
 }
 
 Message::Message(int type, uint32_t ack_msg_id): type_(type), id_(ack_msg_id) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/test/singa/test_ep.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_ep.cc b/test/singa/test_ep.cc
index e6d3e07..63dbb43 100644
--- a/test/singa/test_ep.cc
+++ b/test/singa/test_ep.cc
@@ -8,6 +8,7 @@
 
 #define SIZE 10000000
 #define PORT 10000
+#define ITER 10
 
 using namespace singa;
 int main(int argc, char** argv) {
@@ -30,10 +31,6 @@ int main(int argc, char** argv) {
     memset(md, 'a', SIZE);
     memset(payload, 'b', SIZE);
 
-    Message* m = new Message();
-    m->setMetadata(md, SIZE);
-    m->setPayload(payload, SIZE);
-
     NetworkThread* t = new NetworkThread(port);
 
     EndPointFactory* epf = t->epf_;
@@ -43,27 +40,53 @@ int main(int argc, char** argv) {
 
     EndPoint* ep = epf->getEp(host);
 
-    int cnt = 0;
+    Message* m[ITER];
+    for (int i = 0; i < ITER; ++i)
+    {
+        m[i] = new Message();
+        m[i]->setMetadata(md, SIZE);
+        m[i]->setPayload(payload, SIZE);
+    }
+
+    while (1) {
+        for (int i = 0; i < ITER; ++i)
+        {
+            if (ep->send(m[i]) < 0) return 1;
+            delete m[i];
+        }
+
+        for (int i = 0; i < ITER; ++i)
+        {
+            m[i] = ep->recv();
+            if (!m[i])
+                return 1;
+            char *p;
+            CHECK(m[i]->getMetadata((void**)&p) == SIZE);
+            CHECK(0 == strncmp(p, md, SIZE));
+            CHECK(m[i]->getPayload((void**)&p) == SIZE);
+            CHECK(0 == strncmp(p, payload, SIZE));
+        }
+    }
 
-    while(ep && cnt++ <= 5 && ep->send(m) > 0 ) {
+    //while(ep && cnt++ <= 5 && ep->send(m) > 0 ) {
 
-        LOG(INFO) << "Send a " << m->getSize() << " bytes message";
+    //    LOG(INFO) << "Send a " << m->getSize() << " bytes message";
 
-        Message* m1 = ep->recv();
+    //    Message* m1 = ep->recv();
 
-        if (!m1)
-            break;
+    //    if (!m1)
+    //        break;
 
-        char *p;
+    //    char *p;
 
-        LOG(INFO) << "Receive a " << m1->getSize() << " bytes message";
+    //    LOG(INFO) << "Receive a " << m1->getSize() << " bytes message";
 
-        CHECK(m1->getMetadata((void**)&p) == SIZE);
-        CHECK(0 == strncmp(p, md, SIZE));
-        CHECK(m1->getPayload((void**)&p) == SIZE);
-        CHECK(0 == strncmp(p, payload, SIZE));
+    //    CHECK(m1->getMetadata((void**)&p) == SIZE);
+    //    CHECK(0 == strncmp(p, md, SIZE));
+    //    CHECK(m1->getPayload((void**)&p) == SIZE);
+    //    CHECK(0 == strncmp(p, payload, SIZE));
 
-        delete m;
-        m = m1;
-    }
+    //    delete m;
+    //    m = m1;
+    //}
 }

Reply via email to