SINGA-156 Remove the dependency on ZMQ for single process training: fix bugs in the communication part; apply cpplint style fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/914c1e72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/914c1e72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/914c1e72 Branch: refs/heads/master Commit: 914c1e722d3c5e81c2f2bb4b1ffbd14a63f4aa3a Parents: 42f5253 Author: WANG Sheng <[email protected]> Authored: Mon Apr 4 13:53:06 2016 +0800 Committer: WANG Sheng <[email protected]> Committed: Mon Apr 4 16:54:19 2016 +0800 ---------------------------------------------------------------------- include/singa/comm/socket.h | 18 ++-- include/singa/utils/safe_queue.h | 141 +++++++++++++++++++------------- src/comm/msg.cc | 5 +- src/comm/socket.cc | 4 +- src/stub.cc | 3 +- src/test/test_connection_layers.cc | 12 ++- 6 files changed, 107 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/include/singa/comm/socket.h ---------------------------------------------------------------------- diff --git a/include/singa/comm/socket.h b/include/singa/comm/socket.h index 3194d8c..40d4cc3 100644 --- a/include/singa/comm/socket.h +++ b/include/singa/comm/socket.h @@ -43,17 +43,17 @@ class Dealer { /** * @param id used for identifying the msg queue of this dealer. */ - Dealer(int id); + explicit Dealer(int id); ~Dealer(); /** - * Setup the connection with the remote router. - * - * For local router, there is no need to connect it. - * - * @param endpoint Identifier of the remote router to connect. It follows - * ZeroMQ's format, i.e., IP:port, where IP is the connected process. - * @return 1 connection sets up successfully; 0 otherwise - */ + * Setup the connection with the remote router. + * + * For local router, there is no need to connect it. + * + * @param endpoint Identifier of the remote router to connect. 
It follows + * ZeroMQ's format, i.e., IP:port, where IP is the connected process. + * @return 1 connection sets up successfully; 0 otherwise + */ int Connect(const std::string& endpoint); /** * Send a message to the local router (id=-1) or remote outer. It is http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/include/singa/utils/safe_queue.h ---------------------------------------------------------------------- diff --git a/include/singa/utils/safe_queue.h b/include/singa/utils/safe_queue.h index 99adbf0..31df1ef 100644 --- a/include/singa/utils/safe_queue.h +++ b/include/singa/utils/safe_queue.h @@ -1,7 +1,35 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+* +*************************************************************/ + +/** + * The code is adapted from following source: + * http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html + * under Creative Commons Attribution 4.0 International Public License + */ + #ifndef SINGA_UTILS_SAFE_QUEUE_H_ #define SINGA_UTILS_SAFE_QUEUE_H_ // source: http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html +#include <algorithm> #include <queue> #include <list> #include <mutex> @@ -12,33 +40,31 @@ /** A thread-safe asynchronous queue */ template <class T, class Container = std::list<T>> class SafeQueue { - typedef typename Container::value_type value_type; typedef typename Container::size_type size_type; typedef Container container_type; public: - /*! Create safe queue. */ SafeQueue() = default; - SafeQueue (SafeQueue&& sq) { - m_queue = std::move (sq.m_queue); + SafeQueue(SafeQueue&& sq) { + m_queue = std::move(sq.m_queue); } - SafeQueue (const SafeQueue& sq) { - std::lock_guard<std::mutex> lock (sq.m_mutex); + SafeQueue(const SafeQueue& sq) { + std::lock_guard<std::mutex> lock(sq.m_mutex); m_queue = sq.m_queue; } /*! Destroy safe queue. */ ~SafeQueue() { - std::lock_guard<std::mutex> lock (m_mutex); + std::lock_guard<std::mutex> lock(m_mutex); } /** * Sets the maximum number of items in the queue. Defaults is 0: No limit * \param[in] item An item. */ - void set_max_num_items (unsigned int max_num_items) { + void set_max_num_items(unsigned int max_num_items) { m_max_num_items = max_num_items; } @@ -47,13 +73,13 @@ class SafeQueue { * \param[in] item An item. 
* \return true if an item was pushed into the queue */ - bool push (const value_type& item) { - std::lock_guard<std::mutex> lock (m_mutex); + bool push(const value_type& item) { + std::lock_guard<std::mutex> lock(m_mutex); if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) return false; - m_queue.push (item); + m_queue.push(item); m_condition.notify_one(); return true; } @@ -63,13 +89,13 @@ class SafeQueue { * \param[in] item An item. * \return true if an item was pushed into the queue */ - bool push (const value_type&& item) { - std::lock_guard<std::mutex> lock (m_mutex); + bool push(const value_type&& item) { + std::lock_guard<std::mutex> lock(m_mutex); if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) return false; - m_queue.push (item); + m_queue.push(item); m_condition.notify_one(); return true; } @@ -78,12 +104,11 @@ class SafeQueue { * Pops item from the queue. If queue is empty, this function blocks until item becomes available. * \param[out] item The item. */ - void pop (value_type& item) { - std::unique_lock<std::mutex> lock (m_mutex); - m_condition.wait (lock, [this]() // Lambda funct - { + void pop(value_type& item) { + std::unique_lock<std::mutex> lock(m_mutex); + m_condition.wait(lock, [this]() { // Lambda funct return !m_queue.empty(); - }); + }); item = m_queue.front(); m_queue.pop(); } @@ -94,13 +119,12 @@ class SafeQueue { * If queue is empty, this function blocks until item becomes available. * \param[out] item The item. */ - void move_pop (value_type& item) { - std::unique_lock<std::mutex> lock (m_mutex); - m_condition.wait (lock, [this]() // Lambda funct - { + void move_pop(value_type& item) { + std::unique_lock<std::mutex> lock(m_mutex); + m_condition.wait(lock, [this]() { // Lambda funct return !m_queue.empty(); - }); - item = std::move (m_queue.front()); + }); + item = std::move(m_queue.front()); m_queue.pop(); } @@ -109,8 +133,8 @@ class SafeQueue { * \param[out] item The item. 
* \return False is returned if no item is available. */ - bool try_pop (value_type& item) { - std::unique_lock<std::mutex> lock (m_mutex); + bool try_pop(value_type& item) { + std::unique_lock<std::mutex> lock(m_mutex); if (m_queue.empty()) return false; @@ -126,13 +150,13 @@ class SafeQueue { * \param[out] item The item. * \return False is returned if no item is available. */ - bool try_move_pop (value_type& item) { - std::unique_lock<std::mutex> lock (m_mutex); + bool try_move_pop(value_type& item) { + std::unique_lock<std::mutex> lock(m_mutex); if (m_queue.empty()) return false; - item = std::move (m_queue.front()); + item = std::move(m_queue.front()); m_queue.pop(); return true; } @@ -143,15 +167,15 @@ class SafeQueue { * \param[in] timeout The number of microseconds to wait. * \return true if get an item from the queue, false if no item is received before the timeout. */ - bool timeout_pop (value_type& item, std::uint64_t timeout) { - std::unique_lock<std::mutex> lock (m_mutex); + bool timeout_pop(value_type& item, std::uint64_t timeout) { + std::unique_lock<std::mutex> lock(m_mutex); - if (m_queue.empty()) - { + if (m_queue.empty()) { if (timeout == 0) return false; - if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout) + if (m_condition.wait_for(lock, std::chrono::microseconds(timeout)) + == std::cv_status::timeout) return false; } @@ -168,19 +192,19 @@ class SafeQueue { * \param[in] timeout The number of microseconds to wait. * \return true if get an item from the queue, false if no item is received before the timeout. 
*/ - bool timeout_move_pop (value_type& item, std::uint64_t timeout) { - std::unique_lock<std::mutex> lock (m_mutex); + bool timeout_move_pop(value_type& item, std::uint64_t timeout) { + std::unique_lock<std::mutex> lock(m_mutex); - if (m_queue.empty()) - { + if (m_queue.empty()) { if (timeout == 0) return false; - if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout) + if (m_condition.wait_for(lock, std::chrono::microseconds(timeout)) + == std::cv_status::timeout) return false; } - item = std::move (m_queue.front()); + item = std::move(m_queue.front()); m_queue.pop(); return true; } @@ -190,7 +214,7 @@ class SafeQueue { * \return Number of items in the queue. */ size_type size() const { - std::lock_guard<std::mutex> lock (m_mutex); + std::lock_guard<std::mutex> lock(m_mutex); return m_queue.size(); } @@ -199,7 +223,7 @@ class SafeQueue { * \return true if queue is empty. */ bool empty() const { - std::lock_guard<std::mutex> lock (m_mutex); + std::lock_guard<std::mutex> lock(m_mutex); return m_queue.empty(); } @@ -207,11 +231,11 @@ class SafeQueue { * Swaps the contents. * \param[out] sq The SafeQueue to swap with 'this'. */ - void swap (SafeQueue& sq) { + void swap(SafeQueue& sq) { if (this != &sq) { - std::lock_guard<std::mutex> lock1 (m_mutex); - std::lock_guard<std::mutex> lock2 (sq.m_mutex); - m_queue.swap (sq.m_queue); + std::lock_guard<std::mutex> lock1(m_mutex); + std::lock_guard<std::mutex> lock2(sq.m_mutex); + m_queue.swap(sq.m_queue); if (!m_queue.empty()) m_condition.notify_all(); @@ -224,10 +248,10 @@ class SafeQueue { /*! 
The copy assignment operator */ SafeQueue& operator= (const SafeQueue& sq) { if (this != &sq) { - std::lock_guard<std::mutex> lock1 (m_mutex); - std::lock_guard<std::mutex> lock2 (sq.m_mutex); - std::queue<T, Container> temp {sq.m_queue}; - m_queue.swap (temp); + std::lock_guard<std::mutex> lock1(m_mutex); + std::lock_guard<std::mutex> lock2(sq.m_mutex); + std::queue<T, Container> temp{sq.m_queue}; + m_queue.swap(temp); if (!m_queue.empty()) m_condition.notify_all(); @@ -238,17 +262,15 @@ class SafeQueue { /*! The move assignment operator */ SafeQueue& operator= (SafeQueue && sq) { - std::lock_guard<std::mutex> lock (m_mutex); - m_queue = std::move (sq.m_queue); + std::lock_guard<std::mutex> lock(m_mutex); + m_queue = std::move(sq.m_queue); - if (!m_queue.empty()) m_condition.notify_all(); + if (!m_queue.empty()) m_condition.notify_all(); return *this; } - private: - std::queue<T, Container> m_queue; mutable std::mutex m_mutex; std::condition_variable m_condition; @@ -257,7 +279,8 @@ class SafeQueue { /*! Swaps the contents of two SafeQueue objects. 
*/ template <class T, class Container> -void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2) { - q1.swap (q2); +void swap(SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2) { + q1.swap(q2); } -#endif // SINGA_UTILS_SAFE_QUEUE_H_ + +#endif // SINGA_UTILS_SAFE_QUEUE_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/comm/msg.cc ---------------------------------------------------------------------- diff --git a/src/comm/msg.cc b/src/comm/msg.cc index 94f3074..8128b46 100644 --- a/src/comm/msg.cc +++ b/src/comm/msg.cc @@ -128,7 +128,10 @@ int Msg::FrameSize() { } char* Msg::FrameStr() { - return static_cast<char*>(frames_.at(idx_).first); + char* ret = new char[frames_.at(idx_).second]; + memcpy(ret, static_cast<char*>(frames_.at(idx_).first), + frames_.at(idx_).second); + return ret; } void* Msg::FrameData() { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/comm/socket.cc ---------------------------------------------------------------------- diff --git a/src/comm/socket.cc b/src/comm/socket.cc index aa1ee85..9afc54c 100644 --- a/src/comm/socket.cc +++ b/src/comm/socket.cc @@ -23,7 +23,7 @@ #include <glog/logging.h> namespace singa { -const int TIME_OUT = 2; // max blocking time in milliseconds. +const int TIME_OUT = 2; // max blocking time in milliseconds. 
std::unordered_map<int, SafeQueue<Msg*>> msgQueues; Dealer::~Dealer() { #ifdef USE_ZMQ @@ -68,7 +68,7 @@ int Dealer::Send(Msg** msg) { Msg* Dealer::Receive(int timeout) { Msg* msg = nullptr; if (timeout > 0) { - if(!msgQueues.at(id_).timeout_pop(msg, timeout)) + if (!msgQueues.at(id_).timeout_pop(msg, timeout)) return nullptr; } else { msgQueues.at(id_).pop(msg); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/stub.cc ---------------------------------------------------------------------- diff --git a/src/stub.cc b/src/stub.cc index c7658fc..4bc8c3d 100644 --- a/src/stub.cc +++ b/src/stub.cc @@ -48,8 +48,9 @@ void Stub::Setup() { const string hostip = cluster->hostip(); int port = router_->Bind("tcp://" + hostip + ":*"); endpoint_ = hostip + ":" + std::to_string(port); - } else + } else { endpoint_ = "localhost"; + } } /** * Get a hash id for a Param object from a group. http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/test/test_connection_layers.cc ---------------------------------------------------------------------- diff --git a/src/test/test_connection_layers.cc b/src/test/test_connection_layers.cc index 6529840..cd7f5f5 100644 --- a/src/test/test_connection_layers.cc +++ b/src/test/test_connection_layers.cc @@ -118,11 +118,15 @@ TEST(ConnectionLayerTest, BridgeTest) { ASSERT_EQ(dst.data(nullptr).shape(0), N); ASSERT_EQ(dst.data(nullptr).shape(1), M); + msgQueues[-1]; + msgQueues[Addr(0, 0, kWorkerLayer)]; + // bind bridges to socket - Router router(N); - router.Bind("inproc://router"); - Dealer dealer(0); - dealer.Connect("inproc://router"); + // Router router(N); + Router router; + // router.Bind("inproc://router"); + Dealer dealer(Addr(0, 0, kWorkerLayer)); + // dealer.Connect("inproc://router"); std::unordered_map<std::string, Layer*> name2bridge; name2bridge[src.name()] = &src; name2bridge[dst.name()] = &dst;
