SINGA-233 Some code cleaning
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4353ce99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4353ce99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4353ce99 Branch: refs/heads/dev Commit: 4353ce99c48c5813ef18afed77a6f522e77daf82 Parents: 1565e65 Author: caiqc <[email protected]> Authored: Tue Aug 9 22:45:47 2016 +0800 Committer: caiqc <[email protected]> Committed: Wed Aug 10 09:43:02 2016 +0800 ---------------------------------------------------------------------- include/singa/io/integer.h | 73 ++++++++++++++ include/singa/io/network.h | 164 +++++++++++++++++++++++++++++++ include/singa/io/network/endpoint.h | 139 -------------------------- include/singa/io/network/integer.h | 73 -------------- include/singa/io/network/message.h | 75 -------------- src/io/network/endpoint.cc | 4 +- src/io/network/message.cc | 28 +----- test/CMakeLists.txt | 18 ++-- test/singa/test_ep.cc | 26 ++++- 9 files changed, 273 insertions(+), 327 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/include/singa/io/integer.h ---------------------------------------------------------------------- diff --git a/include/singa/io/integer.h b/include/singa/io/integer.h new file mode 100644 index 0000000..9c2799d --- /dev/null +++ b/include/singa/io/integer.h @@ -0,0 +1,73 @@ +/************************************************************ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + *************************************************************/ + +#ifndef INTEGER_H_ +#define INTEGER_H_ + +#include <cstdint> + +namespace singa{ +static bool isNetworkOrder() { + int test = 1; + return (1 != *(uint8_t*)&test); +} + +template <typename T> +static inline T byteSwap(const T& v) { + int size = sizeof(v); + T ret; + uint8_t *dest = reinterpret_cast<uint8_t *>(&ret); + uint8_t *src = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(&v)); + for (int i = 0; i < size; ++i) { + dest[i] = src[size - i - 1]; + } + return ret; +} + +template <typename T> +static inline T hton(const T& v) +{ + return isNetworkOrder() ? v : byteSwap(v); +} + +template <typename T> +static inline T ntoh(const T& v) +{ + return hton(v); +} + +static inline int appendInteger(char* buf) {return 0;} +static inline int readInteger(char* buf) {return 0;} + +template<typename Type, typename... Types> +static int appendInteger(char* buf, Type value, Types... values) { + *(Type*)buf = hton(value); + return sizeof(Type) + appendInteger(buf + sizeof(Type), values...); +} + +template<typename Type, typename... Types> +static int readInteger(char* buf, Type& value, Types&... values) { + value = ntoh(*(Type*)buf); + return sizeof(Type) + readInteger(buf + sizeof(Type), values...); +} + +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/include/singa/io/network.h ---------------------------------------------------------------------- diff --git a/include/singa/io/network.h b/include/singa/io/network.h new file mode 100644 index 0000000..846c94b --- /dev/null +++ b/include/singa/io/network.h @@ -0,0 +1,164 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#ifndef SINGA_COMM_NETWORK_H_ +#define SINGA_COMM_NETWORK_H_ + +#include <ev.h> +#include <thread> +#include <unordered_map> +#include <map> +#include <vector> +#include <condition_variable> +#include <mutex> +#include <atomic> +#include <string> +#include <netinet/in.h> +#include <queue> + +namespace singa { + +#define LOCKED 1 +#define UNLOCKED 0 + +#define SIG_EP 1 +#define SIG_MSG 2 + +#define CONN_INIT 0 +#define CONN_PENDING 1 +#define CONN_EST 2 +#define CONN_ERROR 3 + +#define MAX_RETRY_CNT 3 + +#define EP_TIMEOUT 5. + +#define MSG_DATA 0 +#define MSG_ACK 1 + +class NetworkThread; +class EndPoint; +class EndPointFactory; + +class Message{ + private: + uint8_t type_; + uint32_t id_; + std::size_t msize_ = 0; + std::size_t psize_ = 0; + std::size_t processed_ = 0; + char* msg_ = nullptr; + static const int hsize_ = sizeof(id_) + 2 * sizeof(std::size_t) + sizeof(type_); + char mdata_[hsize_]; + friend class NetworkThread; + friend class EndPoint; + public: + Message(int = MSG_DATA, uint32_t = 0); + Message(const Message&) = delete; + Message(Message&&); + ~Message(); + + void setMetadata(const void*, int); + void setPayload(const void*, int); + + std::size_t getMetadata(void**); + std::size_t getPayload(void**); + + std::size_t getSize(); + void setId(uint32_t); +}; + +class EndPoint { + private: + std::queue<Message*> send_; + std::queue<Message*> recv_; + std::queue<Message*> to_ack_; + std::condition_variable cv_; + std::mutex mtx_; + struct sockaddr_in addr_; + ev_timer timer_; + ev_tstamp last_msg_time_; + int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other + int pfd_ = -1; + bool is_socket_loop_ = false; + int conn_status_ = CONN_INIT; + int pending_cnt_ = 0; + int retry_cnt_ = 0; + NetworkThread* thread_ = nullptr; + EndPoint(NetworkThread* t); + ~EndPoint(); + friend class NetworkThread; + friend class EndPointFactory; + public: + int send(Message*); + Message* recv(); +}; + +class EndPointFactory { + private: + std::unordered_map<uint32_t, EndPoint*> ip_ep_map_; + std::condition_variable map_cv_; + std::mutex map_mtx_; + NetworkThread* thread_; + EndPoint* getEp(uint32_t ip); + EndPoint* getOrCreateEp(uint32_t ip); + friend class NetworkThread; + public: + EndPointFactory(NetworkThread* thread) : thread_(thread) {} + ~EndPointFactory(); + EndPoint* getEp(const char* host); + void getNewEps(std::vector<EndPoint*>& neps); +}; + +class NetworkThread{ + private: + struct ev_loop *loop_; + ev_async ep_sig_; + ev_async msg_sig_; + ev_io socket_watcher_; + int port_; + int socket_fd_; + std::thread* thread_; + std::unordered_map<int, ev_io> fd_wwatcher_map_; + std::unordered_map<int, ev_io> fd_rwatcher_map_; + std::unordered_map<int, EndPoint*> fd_ep_map_; + std::map<int, Message> pending_msgs_; + + void handleConnLost(int, EndPoint*, bool = true); + void doWork(); + int asyncSend(int); + void asyncSendPendingMsg(EndPoint*); + void afterConnEst(EndPoint* ep, int fd, bool active); + public: + EndPointFactory* epf_; + + NetworkThread(int); + void notify(int signal); + + void onRecv(int fd); + void onSend(int fd = -1); + void onConnEst(int fd); + void onNewEp(); + void onNewConn(); + void onTimeout(struct ev_timer* timer); +}; +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/include/singa/io/network/endpoint.h ---------------------------------------------------------------------- diff --git a/include/singa/io/network/endpoint.h b/include/singa/io/network/endpoint.h deleted file mode 100644 index ac243ff..0000000 --- a/include/singa/io/network/endpoint.h +++ /dev/null @@ -1,139 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#ifndef SINGA_COMM_END_POINT_H_ -#define SINGA_COMM_END_POINT_H_ - -#include <ev.h> -#include <thread> -#include <unordered_map> -#include <map> -#include <vector> -#include <condition_variable> -#include <mutex> -#include <atomic> -#include <string> -#include <netinet/in.h> - -#include "singa/io/network/message.h" - -namespace singa { - -#define LOCKED 1 -#define UNLOCKED 0 - -#define SIG_EP 1 -#define SIG_MSG 2 - -#define CONN_INIT 0 -#define CONN_PENDING 1 -#define CONN_EST 2 -#define CONN_ERROR 3 - -#define MAX_RETRY_CNT 3 - -#define EP_TIMEOUT 5. - -class NetworkThread; -class EndPointFactory; - -class EndPoint { - private: - std::queue<Message*> send_; - std::queue<Message*> recv_; - std::queue<Message*> to_ack_; - std::condition_variable cv_; - std::mutex mtx_; - struct sockaddr_in addr_; - ev_timer timer_; - ev_tstamp last_msg_time_; - int fd_[2] = {-1, -1}; // two endpoints simultaneously connect to each other - int pfd_ = -1; - bool is_socket_loop_ = false; - int conn_status_ = CONN_INIT; - int pending_cnt_ = 0; - int retry_cnt_ = 0; - NetworkThread* thread_ = nullptr; - EndPoint(NetworkThread* t); - ~EndPoint(); - friend class NetworkThread; - friend class EndPointFactory; - public: - int send(Message*); - Message* recv(); -}; - -class EndPointFactory { - private: - std::unordered_map<uint32_t, EndPoint*> ip_ep_map_; - std::condition_variable map_cv_; - std::mutex map_mtx_; - NetworkThread* thread_; - EndPoint* getEp(uint32_t ip); - EndPoint* getOrCreateEp(uint32_t ip); - friend class NetworkThread; - public: - EndPointFactory(NetworkThread* thread) : thread_(thread) {} - ~EndPointFactory(); - EndPoint* getEp(const char* host); - void getNewEps(std::vector<EndPoint*>& neps); -}; - -class NetworkThread{ - private: - struct ev_loop *loop_; - ev_async ep_sig_; - ev_async msg_sig_; - - std::thread* thread_; - - std::unordered_map<int, ev_io> fd_wwatcher_map_; - std::unordered_map<int, ev_io> fd_rwatcher_map_; - - std::unordered_map<int, EndPoint*> fd_ep_map_; - - std::map<int, Message> pending_msgs_; - - ev_io socket_watcher_; - int port_; - int socket_fd_; - - void handleConnLost(int, EndPoint*, bool = true); - void doWork(); - int asyncSend(int); - void asyncSendPendingMsg(EndPoint*); - void afterConnEst(EndPoint* ep, int fd, bool active); - public: - EndPointFactory* epf_; - - NetworkThread(int); - //void join(); - void notify(int signal); - - void onRecv(int fd); - void onSend(int fd = -1); - void onConnEst(int fd); - void onNewEp(); - void onNewConn(); - void onTimeout(struct ev_timer* timer); -}; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/include/singa/io/network/integer.h ---------------------------------------------------------------------- diff --git a/include/singa/io/network/integer.h b/include/singa/io/network/integer.h deleted file mode 100644 index 9c2799d..0000000 --- a/include/singa/io/network/integer.h +++ /dev/null @@ -1,73 +0,0 @@ -/************************************************************ - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - *************************************************************/ - -#ifndef INTEGER_H_ -#define INTEGER_H_ - -#include <cstdint> - -namespace singa{ -static bool isNetworkOrder() { - int test = 1; - return (1 != *(uint8_t*)&test); -} - -template <typename T> -static inline T byteSwap(const T& v) { - int size = sizeof(v); - T ret; - uint8_t *dest = reinterpret_cast<uint8_t *>(&ret); - uint8_t *src = const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(&v)); - for (int i = 0; i < size; ++i) { - dest[i] = src[size - i - 1]; - } - return ret; -} - -template <typename T> -static inline T hton(const T& v) -{ - return isNetworkOrder() ? v : byteSwap(v); -} - -template <typename T> -static inline T ntoh(const T& v) -{ - return hton(v); -} - -static inline int appendInteger(char* buf) {return 0;} -static inline int readInteger(char* buf) {return 0;} - -template<typename Type, typename... Types> -static int appendInteger(char* buf, Type value, Types... values) { - *(Type*)buf = hton(value); - return sizeof(Type) + appendInteger(buf + sizeof(Type), values...); -} - -template<typename Type, typename... Types> -static int readInteger(char* buf, Type& value, Types&... values) { - value = ntoh(*(Type*)buf); - return sizeof(Type) + readInteger(buf + sizeof(Type), values...); -} - -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/include/singa/io/network/message.h ---------------------------------------------------------------------- diff --git a/include/singa/io/network/message.h b/include/singa/io/network/message.h deleted file mode 100644 index 0f691c0..0000000 --- a/include/singa/io/network/message.h +++ /dev/null @@ -1,75 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#ifndef SINGA_COMM_MESSAGE_H_ -#define SINGA_COMM_MESSAGE_H_ - -#include <mutex> -#include <queue> - -namespace singa { - -#define MSG_DATA 0 -#define MSG_ACK 1 - -class NetworkThread; -class EndPoint; -class Message{ - private: - uint8_t type_; - uint32_t id_; - std::size_t msize_ = 0; - std::size_t psize_ = 0; - std::size_t processed_ = 0; - char* msg_ = nullptr; - static const int hsize_ = sizeof(id_) + 2 * sizeof(std::size_t) + sizeof(type_); - char mdata_[hsize_]; - friend class NetworkThread; - friend class EndPoint; - public: - Message(int = MSG_DATA, uint32_t = 0); - Message(const Message&) = delete; - Message(Message&&); - ~Message(); - - void setMetadata(const void*, int); - void setPayload(const void*, int); - - std::size_t getMetadata(void**); - std::size_t getPayload(void**); - - std::size_t getSize(); - void setId(uint32_t); -}; - -class MessageQueue -{ - public: - void push(Message&); - Message& front(); - void pop(); - std::size_t size(); - private: - std::mutex lock_; - std::queue<Message> mqueue_; -}; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/src/io/network/endpoint.cc ---------------------------------------------------------------------- diff --git a/src/io/network/endpoint.cc b/src/io/network/endpoint.cc index c7e06f9..96a2e4a 100644 --- a/src/io/network/endpoint.cc +++ b/src/io/network/endpoint.cc @@ -19,8 +19,8 @@ * *************************************************************/ -#include "singa/io/network/endpoint.h" -#include "singa/io/network/integer.h" +#include "singa/io/network.h" +#include "singa/io/integer.h" #include "singa/utils/logging.h" #include <sys/socket.h> http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/src/io/network/message.cc ---------------------------------------------------------------------- diff --git a/src/io/network/message.cc b/src/io/network/message.cc index e63d176..5bf9b8e 100644 --- a/src/io/network/message.cc +++ b/src/io/network/message.cc @@ -24,8 +24,8 @@ #include <atomic> -#include "singa/io/network/message.h" -#include "singa/io/network/integer.h" +#include "singa/io/network.h" +#include "singa/io/integer.h" namespace singa { @@ -89,28 +89,4 @@ std::size_t Message::getPayload(void** p) { return this->psize_; } -void MessageQueue::push(Message& msg) { - this->lock_.lock(); - this->mqueue_.push(static_cast<Message&&>(msg)); - this->lock_.unlock(); -} - -void MessageQueue::pop() { - this->lock_.lock(); - this->mqueue_.pop(); - this->lock_.unlock(); -} - -Message& MessageQueue::front() { - this->lock_.lock(); - Message& ret = this->mqueue_.front(); - this->lock_.unlock(); - return ret; -} - -std::size_t MessageQueue::size() { - std::unique_lock<std::mutex> lock(lock_); - return mqueue_.size(); -} - } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3bfd36c..fda871d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -9,14 +9,14 @@ IF(NOT USE_OPENCL) LIST(REMOVE_ITEM singa_test_source "singa/test_opencl.cc") ENDIF() -#ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source}) -#ADD_DEPENDENCIES(test_singa singa_core singa_utils) -#MESSAGE(STATUS "link libs" ${singa_linker_libs}) -#TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model -# singa_io proto protobuf ${SINGA_LINKER_LIBS}) +ADD_EXECUTABLE(test_singa "gtest/gtest_main.cc" ${singa_test_source}) +ADD_DEPENDENCIES(test_singa singa_core singa_utils) +MESSAGE(STATUS "link libs" ${singa_linker_libs}) +TARGET_LINK_LIBRARIES(test_singa gtest singa_core singa_utils singa_model + singa_io proto protobuf ${SINGA_LINKER_LIBS}) #SET_TARGET_PROPERTIES(test_singa PROPERTIES LINK_FLAGS "${LINK_FLAGS} -pthread") -ADD_EXECUTABLE(test_ep "singa/test_ep.cc") -ADD_DEPENDENCIES(test_ep singa_io) -TARGET_LINK_LIBRARIES(test_ep singa_core singa_utils singa_model - singa_io proto protobuf ${SINGA_LINKER_LIBS}) +#ADD_EXECUTABLE(test_ep "singa/test_ep.cc") +#ADD_DEPENDENCIES(test_ep singa_io) +#TARGET_LINK_LIBRARIES(test_ep singa_core singa_utils singa_model +# singa_io proto protobuf ${SINGA_LINKER_LIBS}) http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4353ce99/test/singa/test_ep.cc ---------------------------------------------------------------------- diff --git a/test/singa/test_ep.cc b/test/singa/test_ep.cc index 63dbb43..cc04064 100644 --- a/test/singa/test_ep.cc +++ b/test/singa/test_ep.cc @@ -1,6 +1,26 @@ -#include "singa/io/network/endpoint.h" -#include "singa/io/network/integer.h" -#include "singa/io/network/message.h" +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#include "singa/io/integer.h" +#include "singa/io/network.h" #include <assert.h> #include <unistd.h>
