SINGA-21 Code Review: clean up msg.h and msg.cc -- remove the BaseMsg interface from msg.h -- move the ZMQ-related implementation to msg.cc -- formatting fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/767bad29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/767bad29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/767bad29 Branch: refs/heads/master Commit: 767bad29e4362e14341dfdf4e921c07591f88c75 Parents: 0f3a8ff Author: wang sheng <[email protected]> Authored: Sat Jun 20 19:23:41 2015 +0800 Committer: wang sheng <[email protected]> Committed: Sat Jun 20 19:23:41 2015 +0800 ---------------------------------------------------------------------- include/communication/msg.h | 244 +++++++++++------------------------- include/communication/socket.h | 6 +- src/communication/msg.cc | 47 ++++++- src/communication/socket.cc | 4 +- 4 files changed, 126 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/include/communication/msg.h ---------------------------------------------------------------------- diff --git a/include/communication/msg.h b/include/communication/msg.h index 61cbd01..ba7d064 100644 --- a/include/communication/msg.h +++ b/include/communication/msg.h @@ -1,196 +1,102 @@ -#ifndef INCLUDE_COMMUNICATION_MSG_H_ -#define INCLUDE_COMMUNICATION_MSG_H_ -#include <string> +#ifndef SINGA_COMMUNICATION_MSG_H_ +#define SINGA_COMMUNICATION_MSG_H_ + #include <czmq.h> #include <glog/logging.h> +#include <algorithm> +#include <string> -using std::string; namespace singa { -class BaseMsg{ - public: - /** - * Destructor to free memory - */ - virtual ~BaseMsg(){}; + +#define USE_ZMQ + +class Msg { + public: + Msg(); + ~Msg(); + /** * @param first worker/server group id - * @param id worker/server id within the group + * @param second worker/server id within the group * @param flag 0 for server, 1 for worker, 2 for stub */ - virtual void set_src(int first, int second, int flag)=0; - 
virtual void set_dst(int first, int second, int flag)=0; - virtual void set_src(int procs_id, int flag)=0; - virtual void set_dst(int procs_id, int flag)=0; - virtual int src_first() const=0; - virtual int dst_first() const=0; - virtual int src_second() const=0; - virtual int dst_second() const=0; - virtual int src_flag() const=0; - virtual int dst_flag() const=0; - virtual void set_type(int type)=0; - virtual int type() const=0; - virtual void set_target(int first, int second)=0; - virtual int target_first() const=0; - virtual int target_second() const=0; - + inline void set_src(int first, int second, int flag) { + src_ = (first << kOff1) | (second << kOff2) | flag; + } + inline void set_dst(int first, int second, int flag) { + dst_ = (first << kOff1) | (second << kOff2) | flag; + } + inline void set_src(int procs_id, int flag) { set_src(procs_id, 0, flag); } + inline void set_dst(int procs_id, int flag) { set_dst(procs_id, 0, flag); } + inline int src() const { return src_; } + inline int dst() const { return dst_; } + inline int src_first() const { return src_ >> kOff1; } + inline int dst_first() const { return dst_ >> kOff1; } + inline int src_second() const { return (src_ & kMask1) >> kOff2; } + inline int dst_second() const { return (dst_ & kMask1) >> kOff2; } + inline int src_flag() const { return src_&kMask2; } + inline int dst_flag() const { return dst_&kMask2; } + inline void SwapAddr() { std::swap(src_, dst_); } + inline void set_type(int type) { type_ = type; } + inline int type() const { return type_; } + inline void set_target(int first, int second) { + target_first_ = first; + target_second_ = second; + } + inline int target_first() const { return target_first_; } + inline int target_second() const { return target_second_; } /** * Copy src and dst address, including first, id, flag */ - virtual BaseMsg* CopyAddr()=0; - virtual void SetAddr(BaseMsg* msg)=0; + inline Msg* CopyAddr() { + Msg* msg = new Msg(); + msg->src_ = src_; + msg->dst_ = dst_; + 
return msg; + } + inline void SetAddr(Msg* msg) { + src_ = msg->src_; + dst_ = msg->dst_; + } /** * Add a frame (a chunck of bytes) into the message */ - virtual void add_frame(const void*, int nBytes)=0; - virtual int frame_size()=0; - virtual void* frame_data()=0; + void add_frame(const void* addr, int nBytes); + int frame_size(); + void* frame_data(); /** * Move the cursor to the next frame * @return true if the next frame is not NULL; otherwise false */ - virtual bool next_frame()=0; -}; -// TODO make it a compiler argument -#define USE_ZMQ - + bool next_frame(); #ifdef USE_ZMQ -class Msg : public BaseMsg{ - public: - Msg() { - msg_=zmsg_new(); - } - virtual ~Msg(){ - if(msg_!=NULL) - zmsg_destroy(&msg_); - } - virtual void set_src(int first, int second, int flag){ - src_=(first<<kOff1)|(second<<kOff2)|flag; - } - virtual void set_dst(int first, int second, int flag){ - dst_=(first<<kOff1)|(second<<kOff2)|flag; - } - virtual void set_src(int procs_id, int flag){ - set_src(procs_id, 0, flag); - } - virtual void set_dst(int procs_id, int flag){ - set_dst(procs_id, 0, flag); - } - int src() const { - return src_; - } - int dst() const { - return dst_; - } - virtual int src_first() const { - int ret=src_>>kOff1; - return ret; - } - - virtual int dst_first() const{ - int ret=dst_>>kOff1; - return ret; - } - virtual int src_second() const{ - int ret=(src_&kMask1)>>kOff2; - return ret; - } - virtual int dst_second() const{ - int ret=(dst_&kMask1)>>kOff2; - return ret; - } - virtual int src_flag() const{ - int ret=src_&kMask2; - return ret; - } - virtual int dst_flag() const{ - int ret=dst_&kMask2; - return ret; - } - - void SwapAddr(){ - std::swap(src_,dst_); - } - - virtual void set_type(int type){ - type_=type; - } - virtual int type() const{ - return type_; - } - - virtual void set_target(int first, int second){ - target_first_=first; - target_second_=second; - } - virtual int target_first() const{ - return target_first_; - } - virtual int target_second() const{ - 
return target_second_; - } - - virtual BaseMsg* CopyAddr(){ - Msg* msg=new Msg(); - msg->src_=src_; - msg->dst_=dst_; - return msg; - } - - virtual void SetAddr(BaseMsg* msg){ - src_=(static_cast<Msg*>(msg))->src_; - dst_=(static_cast<Msg*>(msg))->dst_; - } - - virtual void add_frame(const void* addr, int nBytes){ - zmsg_addmem(msg_, addr, nBytes); - } - virtual int frame_size(){ - return zframe_size(frame_); - } - - virtual void* frame_data(){ - return zframe_data(frame_); - } - - virtual bool next_frame(){ - frame_=zmsg_next(msg_); - return frame_!=NULL; - } - - void ParseFromZmsg(zmsg_t* msg){ - char* tmp=zmsg_popstr(msg); - sscanf(tmp, "%d %d %d %d %d", - &src_, &dst_, &type_, &target_first_, &target_second_); - //LOG(ERROR)<<"recv "<<src_<<" "<<dst_<<" "<<target_; - frame_=zmsg_next(msg); - msg_=msg; - } - - zmsg_t* DumpToZmsg(){ - zmsg_pushstrf(msg_, "%d %d %d %d %d", - src_, dst_, type_, target_first_, target_second_); - //LOG(ERROR)<<"send "<<src_<<" "<<dst_<<" "<<target_; - zmsg_t *tmp=msg_; - msg_=NULL; - return tmp; - } + void ParseFromZmsg(zmsg_t* msg); + zmsg_t* DumpToZmsg(); +#endif protected: - static const unsigned int kOff1=16, kOff2=4; - static const unsigned int kMask1=(1<<kOff1)-1, kMask2=(1<<kOff2)-1; - int src_, dst_; - int type_, target_first_, target_second_; - zmsg_t* msg_; - zframe_t *frame_; -}; + static const unsigned int kOff1 = 16; + static const unsigned int kOff2 = 4; + static const unsigned int kMask1 = (1 << kOff1) - 1; + static const unsigned int kMask2 = (1 << kOff2) - 1; + + int src_ = 0; + int dst_ = 0; + int type_ = 0; + int target_first_ = 0; + int target_second_ = 0; +#ifdef USE_ZMQ + zmsg_t* msg_ = nullptr; + zframe_t *frame_ = nullptr; #endif -inline void DeleteMsg(Msg** msg){ +}; + +inline void DeleteMsg(Msg** msg) { delete *msg; - *msg=nullptr; + *msg = nullptr; } +} // namespace singa -} /* singa */ - -#endif // INCLUDE_COMMUNICATION_MSG_H_ +#endif // SINGA_COMMUNICATION_MSG_H_ 
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/include/communication/socket.h ---------------------------------------------------------------------- diff --git a/include/communication/socket.h b/include/communication/socket.h index 4b1f467..09c71e3 100644 --- a/include/communication/socket.h +++ b/include/communication/socket.h @@ -5,7 +5,7 @@ #include "communication/msg.h" namespace singa { -const string kInprocRouterEndpoint="inproc://router"; +const std::string kInprocRouterEndpoint="inproc://router"; class Socket{ public: Socket(){} @@ -83,7 +83,7 @@ class Dealer : public Socket{ * format, i.e., IP:port, where IP is the connected process. * @return 1 connection sets up successfully; 0 otherwise */ - virtual int Connect(string endpoint); + virtual int Connect(std::string endpoint); virtual int Send(Msg** msg); virtual Msg* Receive(); virtual void* InternalID() const{ @@ -119,7 +119,7 @@ class Router : public Socket{ * intra-process connection. * @return number of connected dealers. */ - virtual int Bind(string endpoint); + virtual int Bind(std::string endpoint); /** * If the destination socket has not connected yet, buffer this the message. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/src/communication/msg.cc ---------------------------------------------------------------------- diff --git a/src/communication/msg.cc b/src/communication/msg.cc index 80f2304..2a22b05 100644 --- a/src/communication/msg.cc +++ b/src/communication/msg.cc @@ -1,5 +1,50 @@ #include "communication/msg.h" namespace singa { -} /* singa */ + +#ifdef USE_ZMQ +Msg::Msg() { + msg_ = zmsg_new(); +} + +Msg::~Msg() { + if (msg_ != nullptr) + zmsg_destroy(&msg_); +} + +void Msg::add_frame(const void* addr, int nBytes) { + zmsg_addmem(msg_, addr, nBytes); +} + +int Msg::frame_size() { + return zframe_size(frame_); +} + +void* Msg::frame_data() { + return zframe_data(frame_); +} + +bool Msg::next_frame() { + frame_ = zmsg_next(msg_); + return frame_ != nullptr; +} + +void Msg::ParseFromZmsg(zmsg_t* msg) { + char* tmp = zmsg_popstr(msg); + sscanf(tmp, "%d %d %d %d %d", + &src_, &dst_, &type_, &target_first_, &target_second_); + frame_ = zmsg_next(msg); + msg_ = msg; +} + +zmsg_t* Msg::DumpToZmsg() { + zmsg_pushstrf(msg_, "%d %d %d %d %d", + src_, dst_, type_, target_first_, target_second_); + zmsg_t *tmp = msg_; + msg_ = nullptr; + return tmp; +} +#endif + +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/src/communication/socket.cc ---------------------------------------------------------------------- diff --git a/src/communication/socket.cc b/src/communication/socket.cc index 385950b..30dff5d 100644 --- a/src/communication/socket.cc +++ b/src/communication/socket.cc @@ -24,7 +24,7 @@ Dealer::Dealer(int id):id_(id){ poller_=zpoller_new(dealer_); } -int Dealer::Connect(string endpoint){ +int Dealer::Connect(std::string endpoint){ if(endpoint.length()) CHECK_EQ(zsock_connect(dealer_,"%s", endpoint.c_str()),0); return 1; @@ -56,7 +56,7 @@ Router::Router(int bufsize){ CHECK_NOTNULL(router_); poller_=zpoller_new(router_); } -int Router::Bind(string endpoint){ +int 
Router::Bind(std::string endpoint){ if(endpoint.length()) CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()),0); return 1;
