SINGA-233 Add support for detecting endpoint timeouts
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/1565e659 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/1565e659 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/1565e659 Branch: refs/heads/dev Commit: 1565e659659a3b142e7a68bd27b6dce20eb10b43 Parents: d308e06 Author: caiqc <[email protected]> Authored: Fri Aug 5 16:41:52 2016 +0800 Committer: caiqc <[email protected]> Committed: Wed Aug 10 09:42:46 2016 +0800 ---------------------------------------------------------------------- include/singa/io/network/endpoint.h | 12 ++- src/io/network/endpoint.cc | 158 ++++++++++++++++++++++--------- src/io/network/message.cc | 1 + test/singa/test_ep.cc | 61 ++++++++---- 4 files changed, 165 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/include/singa/io/network/endpoint.h ---------------------------------------------------------------------- diff --git a/include/singa/io/network/endpoint.h b/include/singa/io/network/endpoint.h index 063ca11..ac243ff 100644 --- a/include/singa/io/network/endpoint.h +++ b/include/singa/io/network/endpoint.h @@ -50,8 +50,7 @@ namespace singa { #define MAX_RETRY_CNT 3 -#define EV_WATCHER_STOP 0 -#define EV_WATCHER_START 1 +#define EP_TIMEOUT 5. 
class NetworkThread; class EndPointFactory; @@ -64,13 +63,16 @@ class EndPoint { std::condition_variable cv_; std::mutex mtx_; struct sockaddr_in addr_; + ev_timer timer_; + ev_tstamp last_msg_time_; int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other int pfd_ = -1; + bool is_socket_loop_ = false; int conn_status_ = CONN_INIT; int pending_cnt_ = 0; int retry_cnt_ = 0; NetworkThread* thread_ = nullptr; - EndPoint(NetworkThread* t):thread_(t){} + EndPoint(NetworkThread* t); ~EndPoint(); friend class NetworkThread; friend class EndPointFactory; @@ -105,7 +107,8 @@ class NetworkThread{ std::unordered_map<int, ev_io> fd_wwatcher_map_; std::unordered_map<int, ev_io> fd_rwatcher_map_; - std::unordered_map<int, uint32_t> fd_ip_map_; + + std::unordered_map<int, EndPoint*> fd_ep_map_; std::map<int, Message> pending_msgs_; @@ -130,6 +133,7 @@ class NetworkThread{ void onConnEst(int fd); void onNewEp(); void onNewConn(); + void onTimeout(struct ev_timer* timer); }; } #endif http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/src/io/network/endpoint.cc ---------------------------------------------------------------------- diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc index a74f5c2..c7e06f9 100644 --- a/src/io/network/endpoint.cc +++ b/src/io/network/endpoint.cc @@ -58,6 +58,14 @@ static void accept_cb(struct ev_loop* loop, ev_io* ev, int revent) { reinterpret_cast<NetworkThread*>(ev_userdata(loop))->onNewConn(); } +static void timeout_cb(struct ev_loop* loop, ev_timer* ev, int revent) { + reinterpret_cast<NetworkThread*>(ev_userdata(loop))->onTimeout(ev); +} + +EndPoint::EndPoint(NetworkThread* t) : thread_(t) { + this->timer_.data = reinterpret_cast<void*>(this); +} + EndPoint::~EndPoint() { while(!recv_.empty()) { delete send_.front(); @@ -262,7 +270,7 @@ void NetworkThread::onNewEp() { // set this fd non-blocking fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); - this->fd_ip_map_[fd] = 
ntohl(ep->addr_.sin_addr.s_addr); + this->fd_ep_map_[fd] = ep; // initialize the addess ep->addr_.sin_family = AF_INET; @@ -305,7 +313,9 @@ void NetworkThread::onNewEp() { void NetworkThread::onConnEst(int fd) { - EndPoint* ep = epf_->getEp(this->fd_ip_map_[fd]); + //EndPoint* ep = epf_->getEp(this->fd_ip_map_[fd]); + CHECK(fd_ep_map_.count(fd) > 0); + EndPoint* ep = fd_ep_map_.at(fd); std::unique_lock<std::mutex> lock(ep->mtx_); @@ -359,14 +369,41 @@ void NetworkThread::onNewConn() { // Passive connection afterConnEst(ep, fd, false); - // This should not be put before afterConnEst otherwise it may have no - // effect - fd_ip_map_[fd] = a; - // record the remote address bcopy(&addr, &ep->addr_, len); } +void NetworkThread::onTimeout(struct ev_timer* timer) { + + EndPoint* ep = reinterpret_cast<EndPoint*>(timer->data); + + ev_tstamp timeout = EP_TIMEOUT + ep->last_msg_time_; + ev_tstamp now = ev_now(loop_); + + std::unique_lock<std::mutex> lock(ep->mtx_); + if (now > timeout) { + if (!ep->to_ack_.empty() || !ep->send_.empty()) { + + LOG(INFO) << "EndPoint " << inet_ntoa(ep->addr_.sin_addr) << " timeouts"; + // we consider this ep has been disconnected + for (int i = 0; i < 2; ++i) + { + int fd = ep->fd_[i]; + if (fd >= 0) + handleConnLost(fd, ep); + } + return; + } + + timer->repeat = EP_TIMEOUT; + + } else { + timer->repeat = timeout - now; + } + + ev_timer_again(loop_, &ep->timer_); +} + /** * @brief The processing for a connected socket * @@ -393,10 +430,14 @@ void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) { sfd = ep->fd_[0]; } - if (sfd == fd) + if (sfd == fd) { // this fd is a reuse of a previous socket fd // so we first need to clean the resouce for that fd - handleConnLost(fd, ep, false); + // we duplicate this fd to let the resouce of the oldf fd can be freed + // also indicate there is no need to reconnect + fd = dup(fd); + handleConnLost(sfd, ep, false); + } // initialize io watchers and add the read watcher to the ev loop 
ev_io_init(&fd_rwatcher_map_[fd], readable_cb, fd, EV_READ); @@ -407,6 +448,8 @@ void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) { ev_io_stop(loop_, &fd_wwatcher_map_[fd]); ev_io_init(&fd_wwatcher_map_[fd], writable_cb, fd, EV_WRITE); + ep->last_msg_time_ = ev_now(loop_); + // see whether there is already a established connection for this fd if (ep->conn_status_ == CONN_EST && sfd >= 0) { // check if fd and sfd are associate with the same socket @@ -414,27 +457,37 @@ void NetworkThread::afterConnEst(EndPoint* ep, int fd, bool active) { socklen_t len; if (getsockname(fd, (struct sockaddr*)&addr, &len)) { LOG(INFO) << "Unable to get local socket address: " << strerror(errno); - return; - } - - // see whether the local address of fd is the same as the remote side - // of sfd, which has already been stored in ep->addr_ - if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port == ep->addr_.sin_port) { - LOG(INFO) << fd << " and " << sfd << " are associated with the same socket"; } else { - // this socket is redundant, we close it maunally - close(fd); - handleConnLost(fd, ep); + // see whether the local address of fd is the same as the remote side + // of sfd, which has already been stored in ep->addr_ + if (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port == ep->addr_.sin_port) { + LOG(INFO) << fd << " and " << sfd << " are associated with the same socket"; + ep->is_socket_loop_ = true; + } else { + // this socket is redundant, we close it maunally if the local ip + // is smaller than the peer ip + if ((addr.sin_addr.s_addr < ep->addr_.sin_addr.s_addr) + || (addr.sin_addr.s_addr == ep->addr_.sin_addr.s_addr && addr.sin_port < ep->addr_.sin_port)) + handleConnLost(fd, ep, false); + } } } else { ep->pfd_ = fd; // set the primary fd ep->conn_status_ = CONN_EST; + + // start timeout watcher to detect the liveness of EndPoint + ev_init(&ep->timer_, timeout_cb); + ep->timer_.repeat = EP_TIMEOUT; + ev_timer_start(loop_, 
&ep->timer_); + //timeout_cb(loop_, &ep->timer_, EV_TIMER); } if (fd == ep->pfd_) { this->asyncSendPendingMsg(ep); } + fd_ep_map_[fd] = ep; + // Finally notify all waiting threads // if this connection is initiaed by remote side, // we dont need to notify the waiting thread @@ -451,7 +504,7 @@ void NetworkThread::onSend(int fd) { if (fd == -1) { //LOG(INFO) << "There are " << fd_ip_map_.size() << " connections"; // this is a signal of new message to send - for(auto& p : fd_ip_map_) { + for(auto& p : fd_ep_map_) { // send message //LOG(INFO) << "Try to send over fd " << p.first; if (asyncSend(p.first) < 0) @@ -463,7 +516,8 @@ void NetworkThread::onSend(int fd) { } for (auto& p : invalid_fd) { - EndPoint* ep = epf_->getEp(fd_ip_map_.at(p)); + //EndPoint* ep = epf_->getEp(fd_ip_map_.at(p)); + EndPoint* ep = fd_ep_map_.at(p); std::unique_lock<std::mutex> lock(ep->mtx_); handleConnLost(p, ep); } @@ -474,16 +528,14 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) { LOG(INFO) << "There are " << ep->send_.size() << " to-send msgs, and " << ep->to_ack_.size() << " to-ack msgs"; - if (ep->to_ack_.empty()) - return; - - while (!ep->send_.empty()) { - ep->to_ack_.push(ep->send_.front()); - ep->send_.pop(); + if (!ep->to_ack_.empty()) { + while (!ep->send_.empty()) { + ep->to_ack_.push(ep->send_.front()); + ep->send_.pop(); + } + std::swap(ep->send_, ep->to_ack_); } - std::swap(ep->send_, ep->to_ack_); - if (ep->send_.size() > 0) { notify(SIG_MSG); } @@ -497,13 +549,16 @@ void NetworkThread::asyncSendPendingMsg(EndPoint* ep) { */ int NetworkThread::asyncSend(int fd) { - EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); + //EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); + CHECK(fd_ep_map_.count(fd) > 0); + EndPoint* ep = fd_ep_map_.at(fd); std::unique_lock<std::mutex> ep_lock(ep->mtx_); - if (fd != ep->pfd_) + if (fd != ep->pfd_ ) // we only send over the primary fd - return 0; + // return -1 to indicate this fd is redundant + return ep->is_socket_loop_ ? 
0 : -1; if (ep->conn_status_ != CONN_EST) // This happens during reconnection @@ -532,8 +587,10 @@ int NetworkThread::asyncSend(int fd) { msg.processed_ = 0; goto err; } - } else + } else { + ep->last_msg_time_ = ev_now(loop_); msg.processed_ += nbytes; + } //std::size_t m, p; //uint8_t type; @@ -547,7 +604,7 @@ int NetworkThread::asyncSend(int fd) { CHECK(msg.processed_ == msg.getSize()); if (msg.type_ != MSG_ACK) { - LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << msg.getSize();; + LOG(INFO) << "Send a DATA message to " << inet_ntoa(ep->addr_.sin_addr) << " for MSG " << msg.id_ << ", len = " << msg.getSize() << " over fd " << fd; msg.processed_ = 0; ep->to_ack_.push(&msg); } else { @@ -557,7 +614,7 @@ int NetworkThread::asyncSend(int fd) { ep->send_.pop(); - // for test + //for test // if (ep->retry_cnt_ == 0) { // LOG(INFO) << "Disconnect with Endpoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; // close(fd); @@ -577,13 +634,17 @@ void NetworkThread::onRecv(int fd) { Message* m = &pending_msgs_[fd]; Message& msg = (*m); int nread; - EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); + //EndPoint* ep = epf_->getEp(fd_ip_map_[fd]); + + CHECK(fd_ep_map_.count(fd) > 0); + EndPoint* ep = fd_ep_map_.at(fd); //LOG(INFO) << "Start to read from EndPoint " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; std::unique_lock<std::mutex> lock(ep->mtx_); - while(1) { + ep->last_msg_time_ = ev_now(loop_); + while(1) { if (msg.processed_ < Message::hsize_) { nread = read(fd, msg.mdata_ + msg.processed_, Message::hsize_ - msg.processed_); @@ -631,7 +692,7 @@ void NetworkThread::onRecv(int fd) { // got the whole metadata; readInteger(msg.mdata_, msg.type_, msg.id_, msg.msize_, msg.psize_); - //LOG(INFO) << "Receive a message: id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; + LOG(INFO) << "Receive a message: 
id = " << msg.id_ << ", msize_ = " << msg.msize_ << ", psize_ = " << msg.psize_ << " from " << inet_ntoa(ep->addr_.sin_addr) << " over fd " << fd; } // start reading the real data @@ -681,14 +742,14 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) { LOG(INFO) << "Lost connection to EndPoint " << inet_ntoa(ep->addr_.sin_addr) << ", fd = " << fd; this->pending_msgs_.erase(fd); - this->fd_ip_map_.erase(fd); + this->fd_ep_map_.erase(fd); ev_io_stop(loop_, &this->fd_wwatcher_map_[fd]); ev_io_stop(loop_, &this->fd_rwatcher_map_[fd]); fd_wwatcher_map_.erase(fd); fd_rwatcher_map_.erase(fd); - //close(fd); + close(fd); - if (fd == ep->fd_[0] || ep->fd_[0] < 0) { + if (fd == ep->pfd_) { if (!ep->send_.empty()) ep->send_.front()->processed_ = 0; } @@ -700,8 +761,10 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) { ep->fd_[1] = -1; if (reconn) { - // see if the other fd is ok or not + // see if the other fd is alive or not if (sfd < 0) { + if (ep->conn_status_ == CONN_EST) + ev_timer_stop(loop_, &ep->timer_); if (ep->retry_cnt_ < MAX_RETRY_CNT) { // notify myself for retry ep->retry_cnt_++; @@ -711,13 +774,20 @@ void NetworkThread::handleConnLost(int fd, EndPoint* ep, bool reconn) { } else { LOG(INFO) << "Maximum retry count achieved for EndPoint " << inet_ntoa(ep->addr_.sin_addr); ep->conn_status_ = CONN_ERROR; + // notify all threads that this ep is no longer connected ep->cv_.notify_all(); } } else { - // if there is another working fd, try to send data over this fd - if (!ep->send_.empty()) - this->notify(SIG_MSG); + if (!ep->is_socket_loop_) { + // if there is another working fd, set this fd as primary and + // send data over this fd + ep->pfd_ = sfd; + ep->last_msg_time_ = ev_now(loop_); + asyncSendPendingMsg(ep); + } else { + handleConnLost(sfd, ep); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/src/io/network/message.cc 
---------------------------------------------------------------------- diff --git a/src/io/network/message.cc b/src/io/network/message.cc index ac3fc14..e63d176 100644 --- a/src/io/network/message.cc +++ b/src/io/network/message.cc @@ -34,6 +34,7 @@ Message::Message(Message&& msg) { std::swap(psize_, msg.psize_); std::swap(msg_, msg.msg_); std::swap(type_, msg.type_); + std::swap(id_, msg.id_); } Message::Message(int type, uint32_t ack_msg_id): type_(type), id_(ack_msg_id) { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1565e659/test/singa/test_ep.cc ---------------------------------------------------------------------- diff --git a/test/singa/test_ep.cc b/test/singa/test_ep.cc index e6d3e07..63dbb43 100644 --- a/test/singa/test_ep.cc +++ b/test/singa/test_ep.cc @@ -8,6 +8,7 @@ #define SIZE 10000000 #define PORT 10000 +#define ITER 10 using namespace singa; int main(int argc, char** argv) { @@ -30,10 +31,6 @@ int main(int argc, char** argv) { memset(md, 'a', SIZE); memset(payload, 'b', SIZE); - Message* m = new Message(); - m->setMetadata(md, SIZE); - m->setPayload(payload, SIZE); - NetworkThread* t = new NetworkThread(port); EndPointFactory* epf = t->epf_; @@ -43,27 +40,53 @@ int main(int argc, char** argv) { EndPoint* ep = epf->getEp(host); - int cnt = 0; + Message* m[ITER]; + for (int i = 0; i < ITER; ++i) + { + m[i] = new Message(); + m[i]->setMetadata(md, SIZE); + m[i]->setPayload(payload, SIZE); + } + + while (1) { + for (int i = 0; i < ITER; ++i) + { + if (ep->send(m[i]) < 0) return 1; + delete m[i]; + } + + for (int i = 0; i < ITER; ++i) + { + m[i] = ep->recv(); + if (!m[i]) + return 1; + char *p; + CHECK(m[i]->getMetadata((void**)&p) == SIZE); + CHECK(0 == strncmp(p, md, SIZE)); + CHECK(m[i]->getPayload((void**)&p) == SIZE); + CHECK(0 == strncmp(p, payload, SIZE)); + } + } - while(ep && cnt++ <= 5 && ep->send(m) > 0 ) { + //while(ep && cnt++ <= 5 && ep->send(m) > 0 ) { - LOG(INFO) << "Send a " << m->getSize() << " bytes message"; + // 
LOG(INFO) << "Send a " << m->getSize() << " bytes message"; - Message* m1 = ep->recv(); + // Message* m1 = ep->recv(); - if (!m1) - break; + // if (!m1) + // break; - char *p; + // char *p; - LOG(INFO) << "Receive a " << m1->getSize() << " bytes message"; + // LOG(INFO) << "Receive a " << m1->getSize() << " bytes message"; - CHECK(m1->getMetadata((void**)&p) == SIZE); - CHECK(0 == strncmp(p, md, SIZE)); - CHECK(m1->getPayload((void**)&p) == SIZE); - CHECK(0 == strncmp(p, payload, SIZE)); + // CHECK(m1->getMetadata((void**)&p) == SIZE); + // CHECK(0 == strncmp(p, md, SIZE)); + // CHECK(m1->getPayload((void**)&p) == SIZE); + // CHECK(0 == strncmp(p, payload, SIZE)); - delete m; - m = m1; - } + // delete m; + // m = m1; + //} }
