Repository: incubator-singa Updated Branches: refs/heads/master 56d32e8a0 -> 7d39f8813
SINGA-21 Code review review socket.h and socket.cc -- remove BasePoll class -- rename Socket to SocketInterface -- refine functions: Dealer.Connect, Router.Bind -- formatting Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/b2d7332c Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/b2d7332c Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/b2d7332c Branch: refs/heads/master Commit: b2d7332c62c8ec8531c1190babb85873fd3f8aac Parents: b0483f2 Author: wang sheng <[email protected]> Authored: Mon Jun 22 16:16:36 2015 +0800 Committer: wang wei <[email protected]> Committed: Wed Jun 24 16:57:59 2015 +0800 ---------------------------------------------------------------------- include/communication/msg.h | 12 +-- include/communication/socket.h | 161 +++++++++++++++++++----------------- src/communication/socket.cc | 159 ++++++++++++++++++++--------------- 3 files changed, 184 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2d7332c/include/communication/msg.h ---------------------------------------------------------------------- diff --git a/include/communication/msg.h b/include/communication/msg.h index e63c3cf..6ff887f 100644 --- a/include/communication/msg.h +++ b/include/communication/msg.h @@ -1,14 +1,17 @@ #ifndef SINGA_COMMUNICATION_MSG_H_ #define SINGA_COMMUNICATION_MSG_H_ -#include <czmq.h> -#include <glog/logging.h> +// TODO(wangwei): make it a compiler argument +#define USE_ZMQ + #include <algorithm> #include <string> -namespace singa { +#ifdef USE_ZMQ +#include <czmq.h> +#endif -#define USE_ZMQ +namespace singa { class Msg { public: @@ -60,7 +63,6 @@ class Msg { src_ = msg->src_; dst_ = msg->dst_; } - /** * Add a frame (a chunck of bytes) into the message */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2d7332c/include/communication/socket.h ---------------------------------------------------------------------- diff --git a/include/communication/socket.h b/include/communication/socket.h index 09c71e3..77c701a 100644 --- a/include/communication/socket.h +++ b/include/communication/socket.h @@ -1,81 +1,82 @@ -#ifndef INCLUDE_COMMUNICATION_SOCKET_H_ -#define INCLUDE_COMMUNICATION_SOCKET_H_ +#ifndef SINGA_COMMUNICATION_SOCKET_H_ +#define SINGA_COMMUNICATION_SOCKET_H_ + #include <map> +#include <string> #include <vector> + #include "communication/msg.h" + +#ifdef USE_ZMQ +#include <czmq.h> +#endif + namespace singa { -const std::string kInprocRouterEndpoint="inproc://router"; -class Socket{ - public: - Socket(){} - virtual ~Socket(){} +const char kInprocRouterEndpoint[] = "inproc://router"; + +class SocketInterface { + public: + virtual ~SocketInterface() {} /** - * Send a message to connected socket(s), non-blocking. The message will - * be deallocated after sending, thus should not be used after calling Send(); - * @param the message to be sent + * Send a message to connected socket(s), non-blocking. The message + * will be deallocated after sending, thus should not be used after + * calling Send(); + * + * @param msg The message to be sent * @return 1 for success queuing the message for sending, 0 for failure */ - virtual int Send(Msg** msg)=0; + virtual int Send(Msg** msg) = 0; /** * Receive a message from any connected socket. * * @return a message pointer if success; nullptr if failure */ - virtual Msg* Receive()=0; + virtual Msg* Receive() = 0; /** * @return Identifier of the implementation dependent socket. E.g., zsock_t* * for ZeroMQ implementation and rank for MPI implementation. */ - virtual void* InternalID() const=0; - - protected: - int local_id_; + virtual void* InternalID() const = 0; }; -class BasePoller{ +class Poller { public: + Poller(); /** * Add a socket for polling; Multiple sockets can be polled together by * adding them into the same poller. */ - virtual void Add(Socket* socket)=0; + void Add(SocketInterface* socket); /** * Poll for all sockets added into this poller. - * @param timeout stop after this number of mseconds - * @return pointer to the socket if it has one message in the receiving + * @param timeout Stop after this number of mseconds + * @return pointer To the socket if it has one message in the receiving * queue; nullptr if no message in any sockets, */ - virtual Socket* Wait(int timeout)=0; -}; + SocketInterface* Wait(int duration); -#define USE_ZMQ -#include <czmq.h> - -#ifdef USE_ZMQ -class Poller: public BasePoller{ - public: - Poller(); - virtual void Add(Socket* socket); - virtual Socket* Wait(int duration); protected: +#ifdef USE_ZMQ zpoller_t *poller_; - std::map<zsock_t*, Socket*> zsock2Socket_; + std::map<zsock_t*, SocketInterface*> zsock2Socket_; +#endif }; -class Dealer : public Socket{ +class Dealer : public SocketInterface { public: /* - * @param id local dealer ID within a procs if the dealer is from worker or + * @param id Local dealer ID within a procs if the dealer is from worker or * server thread, starts from 1 (0 is used by the router); or the connected * remote procs ID for inter-process dealers from the stub thread. */ - Dealer(int id=-1); - virtual ~Dealer(); + Dealer(); + explicit Dealer(int id); + ~Dealer() override; /** * Setup the connection with the router. * - * @param endpoint identifier of the router. For intra-process + * @param endpoint Identifier of the router. For intra-process * connection, the endpoint follows the format of ZeroMQ, i.e., * starting with "inproc://"; in Singa, since each process has one * router, hence we can fix the endpoint to be "inproc://router" for @@ -83,62 +84,66 @@ class Dealer : public Socket{ * format, i.e., IP:port, where IP is the connected process. * @return 1 connection sets up successfully; 0 otherwise */ - virtual int Connect(std::string endpoint); - virtual int Send(Msg** msg); - virtual Msg* Receive(); - virtual void* InternalID() const{ - return dealer_; - } + int Connect(const std::string& endpoint); + int Send(Msg** msg) override; + Msg* Receive() override; + void* InternalID() const override; + protected: - int id_; - zsock_t* dealer_; - zpoller_t* poller_; + int id_ = -1; +#ifdef USE_ZMQ + zsock_t* dealer_ = nullptr; + zpoller_t* poller_ = nullptr; +#endif }; -class Router : public Socket{ +class Router : public SocketInterface { public: - virtual ~Router(); + Router(); /** - * Constructor. - * * There is only one router per procs, hence its local id is 0 and is not set * explicitly. * - * @param bufsize buffer at most this number of messages + * @param bufsize Buffer at most this number of messages + */ + explicit Router(int bufsize); + ~Router() override; + /** + * Setup the connection with dealers. + * + * It automatically binds to the endpoint for intra-process communication, + * i.e., "inproc://router". + * + * @param endpoint The identifier for the Dealer socket in other process + * to connect. It has the format IP:Port, where IP is the host machine. + * If endpoint is empty, it means that all connections are + * intra-process connection. + * @return number of connected dealers. */ - Router(int bufsize=100); - /** - * Setup the connection with dealers. - * - * It automatically binds to the endpoint for intra-process communication, - * i.e., "inproc://router". - * - * @param endpoint the identifier for the Dealer socket in other process - * to connect. It has the format IP:Port, where IP is the host machine. - * If endpoint is empty, it means that all connections are - * intra-process connection. - * @return number of connected dealers. - */ - virtual int Bind(std::string endpoint); - /** + int Bind(const std::string& endpoint); + /** * If the destination socket has not connected yet, buffer this the message. */ - virtual int Send(Msg** msg); - virtual Msg* Receive(); - virtual void* InternalID() const{ - return router_; - } + int Send(Msg** msg) override; + Msg* Receive() override; + void* InternalID() const override; + protected: - zsock_t* router_; - zpoller_t* poller_; + int nBufmsg_ = 0; + int bufsize_ = 100; +#ifdef USE_ZMQ + zsock_t* router_ = nullptr; + zpoller_t* poller_ = nullptr; std::map<int, zframe_t*> id2addr_; std::map<int, std::vector<zmsg_t*>> bufmsg_; - int nBufmsg_, bufsize_; +#endif }; -#elif USE_MPI -vector<shared_ptr<SafeQueue>> MPIQueues; +#ifdef USE_MPI +// TODO(wangsheng): add intra-process communication using shared queue +std::vector<SafeQueue*> MPIQueues; #endif -} /* singa */ -#endif // INCLUDE_COMMUNICATION_SOCKET_H_ +} // namespace singa + +#endif // SINGA_COMMUNICATION_SOCKET_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2d7332c/src/communication/socket.cc ---------------------------------------------------------------------- diff --git a/src/communication/socket.cc b/src/communication/socket.cc index 30dff5d..5321724 100644 --- a/src/communication/socket.cc +++ b/src/communication/socket.cc @@ -1,120 +1,149 @@ #include "communication/socket.h" +#include <glog/logging.h> + namespace singa { -Poller::Poller(){ - poller_=zpoller_new(NULL); + +#ifdef USE_ZMQ +Poller::Poller() { + poller_ = zpoller_new(nullptr); } -void Poller::Add(Socket* socket){ - zsock_t* zsock=static_cast<zsock_t*>(socket->InternalID()); +void Poller::Add(SocketInterface* socket) { + zsock_t* zsock = static_cast<zsock_t*>(socket->InternalID()); zpoller_add(poller_, zsock); - zsock2Socket_[zsock]=socket; + zsock2Socket_[zsock] = socket; } -Socket* Poller::Wait(int timeout){ - zsock_t* sock=(zsock_t*)zpoller_wait(poller_, timeout); - if(sock!=NULL) +SocketInterface* Poller::Wait(int timeout) { + zsock_t* sock = static_cast<zsock_t*>(zpoller_wait(poller_, timeout)); + if (sock != nullptr) return zsock2Socket_[sock]; - else return nullptr; + return nullptr; } -Dealer::Dealer(int id):id_(id){ - dealer_=zsock_new(ZMQ_DEALER); +Dealer::Dealer() : Dealer(-1) {} + +Dealer::Dealer(int id) : id_(id) { + dealer_ = zsock_new(ZMQ_DEALER); CHECK_NOTNULL(dealer_); - poller_=zpoller_new(dealer_); + poller_ = zpoller_new(dealer_); + CHECK_NOTNULL(poller_); } -int Dealer::Connect(std::string endpoint){ - if(endpoint.length()) - CHECK_EQ(zsock_connect(dealer_,"%s", endpoint.c_str()),0); - return 1; +Dealer::~Dealer() { + zsock_destroy(&dealer_); +} + +int Dealer::Connect(const std::string& endpoint) { + CHECK_GT(endpoint.length(), 0); + if (endpoint.length()) { + CHECK_EQ(zsock_connect(dealer_, "%s", endpoint.c_str()), 0); + return 1; + } + return 0; } -int Dealer::Send(Msg** msg){ - zmsg_t* zmsg=(*msg)->DumpToZmsg(); + +int Dealer::Send(Msg** msg) { + zmsg_t* zmsg = (*msg)->DumpToZmsg(); zmsg_send(&zmsg, dealer_); delete *msg; - *msg=NULL; + *msg = nullptr; return 1; } -Msg* Dealer::Receive(){ - zmsg_t* zmsg=zmsg_recv(dealer_); - if(zmsg==NULL) +Msg* Dealer::Receive() { + zmsg_t* zmsg = zmsg_recv(dealer_); + if (zmsg == nullptr) return nullptr; - Msg* msg=new Msg(); + Msg* msg = new Msg(); msg->ParseFromZmsg(zmsg); return msg; } -Dealer::~Dealer(){ - zsock_destroy(&dealer_); + +void* Dealer::InternalID() const { + return dealer_; } -Router::Router(int bufsize){ - nBufmsg_=0; - bufsize_=bufsize; - router_=zsock_new(ZMQ_ROUTER); +Router::Router() : Router(100) {} + +Router::Router(int bufsize) { + nBufmsg_ = 0; + bufsize_ = bufsize; + router_ = zsock_new(ZMQ_ROUTER); CHECK_NOTNULL(router_); - poller_=zpoller_new(router_); + poller_ = zpoller_new(router_); + CHECK_NOTNULL(poller_); } -int Router::Bind(std::string endpoint){ - if(endpoint.length()) - CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()),0); - return 1; + +Router::~Router() { + zsock_destroy(&router_); + for (auto it : id2addr_) + zframe_destroy(&it.second); + for (auto it : bufmsg_) { + for (auto *msg : it.second) + zmsg_destroy(&msg); + } +} + +int Router::Bind(const std::string& endpoint) { + CHECK_GT(endpoint.length(), 0); + if (endpoint.length()) { + CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()), 0); + return 1; + } + return 0; } -int Router::Send(Msg **msg){ - zmsg_t* zmsg=(*msg)->DumpToZmsg(); - int dstid=(*msg)->dst(); - if(id2addr_.find(dstid)!=id2addr_.end()){ +int Router::Send(Msg **msg) { + zmsg_t* zmsg = (*msg)->DumpToZmsg(); + int dstid = (*msg)->dst(); + if (id2addr_.find(dstid) != id2addr_.end()) { // the connection has already been set up - zframe_t* addr=zframe_dup(id2addr_[dstid]); + zframe_t* addr = zframe_dup(id2addr_[dstid]); zmsg_prepend(zmsg, &addr); zmsg_send(&zmsg, router_); - }else{ + } else { // the connection is not ready, buffer the message - if(bufmsg_.size()==0) - nBufmsg_=0; + if (bufmsg_.size() == 0) + nBufmsg_ = 0; bufmsg_[dstid].push_back(zmsg); - nBufmsg_++; + ++nBufmsg_; CHECK_LE(nBufmsg_, bufsize_); } delete *msg; - *msg=NULL; + *msg = nullptr; return 1; } -Msg* Router::Receive(){ - zmsg_t* zmsg=zmsg_recv(router_); - if(zmsg==NULL) +Msg* Router::Receive() { + zmsg_t* zmsg = zmsg_recv(router_); + if (zmsg == nullptr) return nullptr; - zframe_t* dealer=zmsg_pop(zmsg); - Msg* msg=new Msg(); + zframe_t* dealer = zmsg_pop(zmsg); + Msg* msg = new Msg(); msg->ParseFromZmsg(zmsg); - if (id2addr_.find(msg->src())==id2addr_.end()){ + if (id2addr_.find(msg->src()) == id2addr_.end()) { // new connection, store the sender's identfier and send buffered messages // for it - id2addr_[msg->src()]=dealer; - if(bufmsg_.find(msg->src())!=bufmsg_.end()){ - for(auto& it: bufmsg_.at(msg->src())){ - zframe_t* addr=zframe_dup(dealer); + id2addr_[msg->src()] = dealer; + if (bufmsg_.find(msg->src()) != bufmsg_.end()) { + for (auto& it : bufmsg_.at(msg->src())) { + zframe_t* addr = zframe_dup(dealer); zmsg_prepend(it, &addr); zmsg_send(&it, router_); } bufmsg_.erase(msg->src()); } - } - else + } else { zframe_destroy(&dealer); + } return msg; } -Router::~Router(){ - zsock_destroy(&router_); - for(auto it: id2addr_) - zframe_destroy(&it.second); - for(auto it: bufmsg_){ - for(auto *msg: it.second) - zmsg_destroy(&msg); - } +void* Router::InternalID() const { + return router_; } -} /* singa */ +#endif + +} // namespace singa
