http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/include/singa/utils/zk_service.h ---------------------------------------------------------------------- diff --git a/include/singa/utils/zk_service.h b/include/singa/utils/zk_service.h deleted file mode 100644 index 789215b..0000000 --- a/include/singa/utils/zk_service.h +++ /dev/null @@ -1,116 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#ifndef SINGA_UTILS_ZK_SERVICE_H_ -#define SINGA_UTILS_ZK_SERVICE_H_ - -#include <zookeeper/zookeeper.h> -#include <string> -#include <vector> - -#include "singa/utils/cluster_rt.h" - -namespace singa { - -const int kZKBufSize = 100; -// following paths are global -const std::string kZKPathSinga = "/singa"; -const std::string kZKPathSys = "/singa/sys"; -const std::string kZKPathJLock = "/singa/sys/job-lock"; -const std::string kZKPathHostIdx = "/singa/sys/host-idx"; -const std::string kZKPathApp = "/singa/app"; -const std::string kZKPathJob = "/singa/app/job-"; -// following paths are local under /singa/app/job-X -const std::string kZKPathJobGroup = "/group"; -const std::string kZKPathJobProc = "/proc"; -const std::string kZKPathJobPLock = "/proc-lock"; - -inline std::string GetZKJobWorkspace(int job_id) { - char buf[kZKBufSize]; - snprintf(buf, kZKBufSize, "%010d", job_id); - return kZKPathJob + buf; -} - -/* - * A wrapper for zookeeper service which handles error code and reconnections - */ -class ZKService { - public: - static void ChildChanges(zhandle_t* zh, int type, int state, - const char *path, void* watcherCtx); - - ~ZKService(); - bool Init(const std::string& host, int timeout); - bool CreateNode(const char* path, const char* val, int flag, char* output); - bool DeleteNode(const char* path); - bool Exist(const char* path); - bool UpdateNode(const char* path, const char* val); - bool GetNode(const char* path, char* output); - bool GetChild(const char* path, std::vector<std::string>* vt); - bool WGetChild(const char* path, std::vector<std::string>* vt, - RTCallback *cb); - - private: - const int kNumRetry = 5; - const int kSleepSec = 1; - - static void WatcherGlobal(zhandle_t* zh, int type, int state, - const char *path, void* watcherCtx); - - zhandle_t* zkhandle_ = nullptr; -}; - -/* - * A ClusterRuntime implementation using zookeeper - */ -class ZKClusterRT : public ClusterRuntime { - public: - ZKClusterRT(const std::string& host, int job_id); - ~ZKClusterRT(); - - bool Init() override; - int RegistProc(const std::string& host_addr, int pid) override; - std::string GetProcHost(int proc_id) override; - bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override; - bool JoinSGroup(int gid, int wid, int s_group) override; - bool LeaveSGroup(int gid, int wid, int s_group) override; - - private: - inline std::string groupPath(int gid) { - return group_path_ + "/sg" + std::to_string(gid); - } - inline std::string workerPath(int gid, int wid) { - return "/g" + std::to_string(gid) + "_w" + std::to_string(wid); - } - - int timeout_ = 30000; - std::string host_ = ""; - ZKService zk_; - std::string workspace_ = ""; - std::string group_path_ = ""; - std::string proc_path_ = ""; - std::string proc_lock_path_ = ""; - std::vector<RTCallback*> cb_vec_; -}; - -} // namespace singa - -#endif // SINGA_UTILS_ZK_SERVICE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/include/singa/worker.h ---------------------------------------------------------------------- diff --git a/include/singa/worker.h b/include/singa/worker.h deleted file mode 100644 index d53e54b..0000000 --- a/include/singa/worker.h +++ /dev/null @@ -1,340 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#ifndef SINGA_WORKER_H_ -#define SINGA_WORKER_H_ - -#include <string> -#include <unordered_map> -#include <vector> -#include "singa/comm/socket.h" -#include "singa/neuralnet/neuralnet.h" -#include "singa/proto/job.pb.h" -#include "singa/neuralnet/connection_layer.h" -#include "singa/neuralnet/neuron_layer.h" - -namespace singa { - -//!< sleep 5 milliseconds if the Param is not updated to the expected version -const int kCollectSleepTime = 5; -/** - * The Worker class which runs the training algorithm. - * The first worker group will initialize parameters of the Net, - * and put them into the distributed memory/table. - * The virtual function TrainOneBatch and TestOneBatch implement the - * training and test algorithm for one mini-batch data. - * - * Child workers override the two functions to implement their training - * algorithms, e.g., the BPWorker/CDWorker/BPTTWorker implements the BP/CD/BPTT - * algorithm respectively. - */ -class Worker { - public: - /** - * Create an instance of the subclass of Worker. - * - * @param[in] conf configuration of the TrainOneBatch algorithm. Different - * Worker subclasses implement different algorithms. Hence the creation is - * based on the TrainOneBatch algorithm type. Currently SINGA - * provides two algorithms: - * -# Back-propagation for the feed-forward models, e.g., CNN and MLP, and the - * recurrent neural networks. - * -# Contrastive divergence for the energy models, e.g., RBM. - * - * @return a pointer to the instance of the Worker subclass. - */ - static Worker* CreateWorker(const std::string str); - static Worker* Create(const AlgProto& conf); - virtual ~Worker(); - /** - * @param[in] grp_id global worker group ID - * @param[in] id worker ID within the group - * @param[in] conf job configuration - * @param[in] train_net pointer to the training neural net, which could be - * shared with other workers from the same group. Different workers run over - * differnt subset of layers. - * @param[in] val_net pointer to the validation neural net. Currently only the - * first worker from the first group would have validation neural net. All - * other workers receive nullptr for this argument. - * @param[in] test_net pointer to the test neural net. Currently only the - * first worker from the first group would have test neural net. All other - * workers receive nullptr for this argument. - */ - virtual void Setup(int grp_id, int id, const JobProto& conf, - NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net); - /** - * Main function of Worker. - * - * Train the neuralnet step by step, test/validation is done periodically. - */ - void Run(); - /** - * Run TestOneBatch() over the a neural net for a total number of steps. - * - * @param[in] steps total number of test iterations. - * @param[in] phase kVal or kTest - * @param[in] net run test over the passed in neural net - */ - void Test(int steps, Phase phase, NeuralNet* net); - /** - * Init sockets in a worker, including: - * 1. a global socket communicates with stub - * 2. a bridge socket dedicated for bridge layer communications - * - * the bridge socket will be binded to each bridge layer - * - * @param[in] net pointer to a neural net whose bridge layer will be binded - * with a socket. - */ - void InitSockets(const NeuralNet* net); - /** - * Init values of Param instances assocaited with local layers (i.e., layers - * dispatched to this worker). - * - * If one Param is owned by the worker, then it should be initialized and put - * to servers. Otherwise Get() should be called to get the Param. The Get() - * may not send get requests if the Param owner is in the same procs, for - * which case the memory space of the Param objects are shared. But if this - * worker and the Param owner worker run on different devices (e.g., GPUs), - * then the get request would be sent. - * - * If the training starts from scrath, every Param object is initialzed using - * ParamGenerator. After that, the worker may - * train for a couple of steps to warmup the params before put - * them to servers (warmup of JobProto controls this). - * - * If one Param object's name matches that of one Param object from the - * checkpoint files, its Param values would be loaded from checkpoint files. - * - * @param[in] job_conf job configuration which provides settings for - * checkpoint file paths, warmup steps and Param versions. - * @param[out] net pointer to a neural net whose Param values will be - * initialized. - */ - void InitNetParams(const JobProto& job_conf, NeuralNet* net); - void InitNetParams(const std::string& folder, vector<Layer*> net); - /** - * Checkpoint all Param objects owned by the worker onto disk. - * The serialization is done using BlobProtos which includes the name, version - * and values of each Param object. - * Different workers would generate different checkpoint files. The file path - * is <workspace>/checkpoint-<jobname>-step<step>-worker<worker_id>.bin - * @param[in] step training step - * @param[in] folder directory to put the checkpoint file - * @param net the training net whose Param objects will be dumped. - */ - void Checkpoint(int step, const std::string& folder, NeuralNet* net); - void Checkpoint(int step, const std::string& folder, vector<Layer*> net); - /** - * Train one mini-batch. - * Test/Validation is done before training. - * - * @param[in] step training step. - * @param[in] net neural net to be trained. - */ - virtual void TrainOneBatch(int step, NeuralNet* net) = 0; - /** - * Test/validate one mini-batch data. - * - * @param[in] step test step. - * @param[in] phase test could be done for validation or test phase. - * @param[in] net neural net for test - */ - virtual void TestOneBatch(int step, Phase phase, NeuralNet* net) = 0; - /** - * Display infomation from layers. - * - * @param flag could be a combination of multiple phases, e.g, kTest|kForward, - * it is passed to the Layer::ToString() function for each layer to decide - * what to display . - * @param prefix display prefix, e.g., 'Train step 100', 'Test step 90'. - * @param net display layers from this neural net. - */ - virtual void Display(int flag, const std::string& prefix, NeuralNet* net); - /** - * Put Param values to server. - * - * @param param - * @param step used as current param version for the put request - */ - int Put(int step, Param* param); - /** - * Get Param with specific version from server - * If the current version >= the requested version, then return. - * Otherwise send a get request to stub who would forwards it to servers. - * @param param - * @param step requested param version - */ - int Get(int step, Param* param); - /** - * Update Param. - * - * @param param - * @param step training step used for updating (e.g., deciding learning rate). - */ - int Update(int step, Param* param); - /** - * Wait for the response of the update/get requests. - * - * @param param - * @param step not used now. - */ - int Collect(int step, Param* param); - /** - * Call Collect() for every param of net - */ - int CollectAll(int step, NeuralNet* net); - /** - * @param[in] step - * @return true if it is time to display training info, e.g., loss; otherwise - * false. - */ - inline bool DisplayNow(int step) const { - return job_conf_.disp_freq() > 0 - && step >= job_conf_.disp_after() - && ((step - job_conf_.disp_after()) % job_conf_.disp_freq() == 0); - } - /** - * @param[in] step - * @return true if it is time to finish the training; otherwise false. - */ - inline bool StopNow(int step) const { - return step >= job_conf_.train_steps(); - } - /** - * @param[in] step - * @return true if it is time to do checkpoint Param objects; otherwise false. - */ - inline bool CheckpointNow(int step) const { - return job_conf_.checkpoint_freq() > 0 - && step >= job_conf_.checkpoint_after() - && ((step - job_conf_.checkpoint_after()) - % job_conf_.checkpoint_freq() == 0); - } - /** - * @param[in] step - * @return true if it is time to do test over the test dataset. - */ - inline bool TestNow(int step) const { - return job_conf_.test_freq() > 0 - && job_conf_.test_steps() > 0 - && step >= job_conf_.test_after() - && ((step - job_conf_.test_after()) % job_conf_.test_freq() == 0); - } - /** - * @param[in] step - * @return true if it is time to do test over the validation dataset. - */ - inline bool ValidateNow(int step) const { - return job_conf_.validate_freq() > 0 - && job_conf_.validate_steps() > 0 - && step >= job_conf_.validate_after() - && ((step - job_conf_.validate_after()) % job_conf_.validate_freq() == 0); - } - /** - * @return a vector with pointers to all neural nets. - */ - const std::vector<NeuralNet*> GetNets() const { - return std::vector<NeuralNet*> {train_net_, val_net_, test_net_}; - } - /** - * @return training net. - */ - inline NeuralNet* train_net() const { - return train_net_; - } - /** - * @return group ID - */ - inline int grp_id() const { return grp_id_; } - /** - * @reutrn worker ID within the worker group. - */ - inline int id() const { return id_; } - - protected: - int grp_id_ = -1, id_ = -1; - int step_ = 0; - JobProto job_conf_; - NeuralNet* train_net_ = nullptr; - NeuralNet* test_net_ = nullptr; - NeuralNet* val_net_ = nullptr; - Dealer* dealer_ = nullptr; - // bridge layer related - Dealer* bridge_dealer_ = nullptr; - std::unordered_map<std::string, Layer*> name2bridge_; -}; - -class BPWorker: public Worker { - public: - void TrainOneBatch(int step, NeuralNet* net) override; - void TestOneBatch(int step, Phase phase, NeuralNet* net) override; - virtual void Forward(int step, Phase phase, NeuralNet* net); - virtual void Backward(int step, NeuralNet* net); -}; - -/** - * Subclass of Worker that implements BPTT (Backpropagation through time) - * algorithm for computing gradients of RNN models. - * Max BPTT/unrolling length is configured by users. - */ -class BPTTWorker: public BPWorker { - public: - void Forward(int step, Phase phase, NeuralNet* net) override; - void Backward(int step, NeuralNet* net) override; - void Display(int flag, const std::string& prefix, NeuralNet* net) override; - - private: - /* - * indicator used in truncted BPTT, which feeds the hidden state of the last - * unrolled unit to the first unit in Forward() for the next iteration. - * currently always feed the last hidden state to the first. - */ - bool full_state_ = false; - //!< indicator used for the starting of a new pass of the dataset. - bool begin_ = false; -}; -/** - * Subclass of Worker that implements the Contrastive Divergence algorithm for - * computing the gradients of paramters of energy models. - */ -class CDWorker: public Worker { - public: - void TrainOneBatch(int step, NeuralNet* net) override; - void TestOneBatch(int step, Phase phase, NeuralNet* net) override; -}; - -inline int BlobTrgt(int grp, int layer) { - return (grp << 16) | layer; -} - -inline int BlobGrp(int blob_trgt) { - return blob_trgt >> 16; -} - -inline int BlobLayer(int blob_trgt) { - static int mask = (1 << 16) -1; - return blob_trgt & mask; -} - -} // namespace singa - -#endif // SINGA_WORKER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/comm/msg.cc ---------------------------------------------------------------------- diff --git a/src/comm/msg.cc b/src/comm/msg.cc deleted file mode 100644 index 8128b46..0000000 --- a/src/comm/msg.cc +++ /dev/null @@ -1,265 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/comm/msg.h" - -#include <glog/logging.h> -#include <stdarg.h> - -namespace singa { - -Msg::~Msg() { -#ifdef USE_ZMQ - if (msg_ != nullptr) - zmsg_destroy(&msg_); - frame_ = nullptr; -#else - for (auto& frame : frames_) - delete static_cast<char*>(frame.first); -#endif -} - -Msg::Msg() { -#ifdef USE_ZMQ - msg_ = zmsg_new(); -#endif -} - -Msg::Msg(const Msg& msg) { - src_ = msg.src_; - dst_ = msg.dst_; - type_ = msg.type_; - trgt_val_ = msg.trgt_val_; - trgt_version_ = msg.trgt_version_; -#ifdef USE_ZMQ - msg_ = zmsg_dup(msg.msg_); -#endif -} - -Msg::Msg(int src, int dst) { - src_ = src; - dst_ = dst; -#ifdef USE_ZMQ - msg_ = zmsg_new(); -#endif -} - -void Msg::SwapAddr() { - std::swap(src_, dst_); -} - -#ifdef USE_ZMQ -int Msg::size() const { - return zmsg_content_size(msg_); -} -void Msg::AddFrame(const void* addr, int nBytes) { - zmsg_addmem(msg_, addr, nBytes); -} -int Msg::FrameSize() { - return zframe_size(frame_); -} -char* Msg::FrameStr() { - return zframe_strdup(frame_); -} -void* Msg::FrameData() { - return zframe_data(frame_); -} -bool Msg::NextFrame() { - frame_ = zmsg_next(msg_); - return frame_ != nullptr; -} -void Msg::FirstFrame() { - frame_ = zmsg_first(msg_); -} -void Msg::LastFrame() { - frame_ = zmsg_last(msg_); -} -void Msg::ParseFromZmsg(zmsg_t* msg) { - char* tmp = zmsg_popstr(msg); - sscanf(tmp, "%d %d %d %d %d", - &src_, &dst_, &type_, &trgt_val_, &trgt_version_); - frame_ = zmsg_first(msg); - msg_ = msg; -} - -zmsg_t* Msg::DumpToZmsg() { - zmsg_pushstrf(msg_, "%d %d %d %d %d", - src_, dst_, type_, trgt_val_, trgt_version_); - zmsg_t *tmp = msg_; - msg_ = nullptr; - return tmp; -} - -#else - -int Msg::size() const { - int s = 0; - for (auto& entry : frames_) - s += entry.second; - return s; -} - -void Msg::AddFrame(const void* addr, int nBytes) { - char* tmp = new char[nBytes]; - memcpy(tmp, addr, nBytes); - frames_.push_back(std::make_pair(tmp, nBytes)); -} - -int Msg::FrameSize() { - return frames_.at(idx_).second; -} - -char* Msg::FrameStr() { - char* ret = new char[frames_.at(idx_).second]; - memcpy(ret, static_cast<char*>(frames_.at(idx_).first), - frames_.at(idx_).second); - return ret; -} - -void* Msg::FrameData() { - return frames_.at(idx_).first; -} - -bool Msg::NextFrame() { - idx_++; -// LOG(ERROR) << "idx " << idx_ << " vs size " << frames_.size(); - return idx_ < frames_.size(); -} - -void Msg::FirstFrame() { - idx_ = 0; -} - -void Msg::LastFrame() { - idx_ = frames_.size() - 1; -} - -#endif - -// frame marker indicating this frame is serialize like printf -#define FMARKER "*singa*" - -#define kMaxFrameLen 2048 - -int Msg::AddFormatFrame(const char *format, ...) { - va_list argptr; - va_start(argptr, format); - int size = strlen(FMARKER); - char dst[kMaxFrameLen]; - memcpy(dst, FMARKER, size); - dst[size++] = 0; - while (*format) { - if (*format == 'i') { - int x = va_arg(argptr, int); - dst[size++] = 'i'; - memcpy(dst + size, &x, sizeof(x)); - size += sizeof(x); - } else if (*format == 'f') { - float x = static_cast<float> (va_arg(argptr, double)); - dst[size++] = 'f'; - memcpy(dst + size, &x, sizeof(x)); - size += sizeof(x); - } else if (*format == '1') { - uint8_t x = va_arg(argptr, int); - memcpy(dst + size, &x, sizeof(x)); - size += sizeof(x); - } else if (*format == '2') { - uint16_t x = va_arg(argptr, int); - memcpy(dst + size, &x, sizeof(x)); - size += sizeof(x); - } else if (*format == '4') { - uint32_t x = va_arg(argptr, uint32_t); - memcpy(dst + size, &x, sizeof(x)); - size += sizeof(x); - } else if (*format == 's') { - char* x = va_arg(argptr, char *); - dst[size++] = 's'; - memcpy(dst + size, x, strlen(x)); - size += strlen(x); - dst[size++] = 0; - } else if (*format == 'p') { - void* x = va_arg(argptr, void *); - dst[size++] = 'p'; - memcpy(dst + size, &x, sizeof(x)); - size += sizeof(x); - } else { - LOG(ERROR) << "Unknown format " << *format; - } - format++; - CHECK_LE(size, kMaxFrameLen); - } - va_end(argptr); - AddFrame(dst, size); - return size; -} - -int Msg::ParseFormatFrame(const char *format, ...) { - va_list argptr; - va_start(argptr, format); - char* src = FrameStr(); - CHECK_STREQ(FMARKER, src); - int size = strlen(FMARKER) + 1; - while (*format) { - if (*format == 'i') { - int *x = va_arg(argptr, int *); - CHECK_EQ(src[size++], 'i'); - memcpy(x, src + size, sizeof(*x)); - size += sizeof(*x); - } else if (*format == 'f') { - float *x = va_arg(argptr, float *); - CHECK_EQ(src[size++], 'f'); - memcpy(x, src + size, sizeof(*x)); - size += sizeof(*x); - } else if (*format == '1') { - uint8_t *x = va_arg(argptr, uint8_t *); - memcpy(x, src + size, sizeof(*x)); - size += sizeof(*x); - } else if (*format == '2') { - uint16_t *x = va_arg(argptr, uint16_t *); - memcpy(x, src + size, sizeof(*x)); - size += sizeof(*x); - } else if (*format == '4') { - uint32_t *x = va_arg(argptr, uint32_t *); - memcpy(x, src + size, sizeof(*x)); - size += sizeof(*x); - } else if (*format == 's') { - char* x = va_arg(argptr, char *); - CHECK_EQ(src[size++], 's'); - int len = strlen(src + size); - memcpy(x, src + size, len); - x[len] = 0; - size += len + 1; - } else if (*format == 'p') { - void** x = va_arg(argptr, void **); - CHECK_EQ(src[size++], 'p'); - memcpy(x, src + size, sizeof(*x)); - size += sizeof(*x); - } else { - LOG(ERROR) << "Unknown format type " << *format; - } - format++; - } - va_end(argptr); - // delete src; - return size; -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/comm/socket.cc ---------------------------------------------------------------------- diff --git a/src/comm/socket.cc b/src/comm/socket.cc deleted file mode 100644 index eba6a0c..0000000 --- a/src/comm/socket.cc +++ /dev/null @@ -1,146 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ -#include "singa/comm/socket.h" - -#include <glog/logging.h> - -namespace singa { -const int TIME_OUT = 2; // max blocking time in milliseconds. -std::unordered_map<int, SafeQueue<Msg*>> msgQueues; -Dealer::~Dealer() { -#ifdef USE_ZMQ - zsock_destroy(&dealer_); -#endif -} - -Dealer::Dealer(int id) : id_ (id) { - msgQueues[id]; -} - -int Dealer::Connect(const std::string& endpoint) { - if (endpoint.length() > 0) { -#ifdef USE_ZMQ - dealer_ = zsock_new(ZMQ_DEALER); - CHECK_NOTNULL(dealer_); - CHECK_EQ(zsock_connect(dealer_, "%s", endpoint.c_str()), 0); -#else - LOG(FATAL) << "No message passing lib is linked"; -#endif - endpoint_ = endpoint; - } - return 1; -} - -int Dealer::Send(Msg** msg) { - if (endpoint_.length()) { -#ifdef USE_ZMQ - zmsg_t* zmsg = (*msg)->DumpToZmsg(); - zmsg_send(&zmsg, dealer_); -#else - LOG(FATAL) << "No message passing lib is linked"; -#endif - delete *msg; - *msg = nullptr; - } else { - msgQueues.at(-1).Push(*msg); - } - return 1; -} - -Msg* Dealer::Receive(int timeout) { - Msg* msg = nullptr; - if (timeout > 0) { - if (!msgQueues.at(id_).Pop(msg, timeout)) - return nullptr; - } else { - msgQueues.at(id_).Pop(msg); - } - msg->FirstFrame(); - return msg; -} - -Router::~Router() { -#ifdef USE_ZMQ - zsock_destroy(&router_); -#endif -} - -Router::Router() { - msgQueues[-1]; -} - -int Router::Bind(const std::string& endpoint) { - int port = -1; - if (endpoint.length() > 0) { - endpoint_ = endpoint; -#ifdef USE_ZMQ - router_ = zsock_new(ZMQ_ROUTER); - CHECK_NOTNULL(router_); - port = zsock_bind(router_, "%s", endpoint.c_str()); - CHECK_NE(port, -1) << endpoint; - LOG(INFO) << "bind successfully to " << zsock_endpoint(router_); - poller_ = zpoller_new(router_); -#else - LOG(FATAL) << "No message passing lib is linked"; -#endif - } - return port; -} - -int Router::Send(Msg **msg) { - int dstid = (*msg)->dst(); - if (msgQueues.find(dstid) != msgQueues.end()) { - msgQueues.at(dstid).Push(*msg); - } else { - LOG(FATAL) << "The dst queue not exist for dstid = " << dstid; - } - return 1; -} - -Msg* Router::Receive(int timeout) { - Msg* msg = nullptr; - if (timeout == 0) - timeout = TIME_OUT; - while (msg == nullptr) { -#ifdef USE_ZMQ - if (router_ != nullptr) { - zsock_t* sock = static_cast<zsock_t*>(zpoller_wait(poller_, timeout)); - if (sock != NULL) { - zmsg_t* zmsg = zmsg_recv(router_); - if (zmsg == nullptr) { - LOG(ERROR) << "Connection broken!"; - exit(0); - } - zframe_t* dealer = zmsg_pop(zmsg); - zframe_destroy(&dealer); - Msg* remote_msg = new Msg(); - remote_msg->ParseFromZmsg(zmsg); - msgQueues.at(-1).Push(remote_msg); - } - } -#endif - msgQueues.at(-1).Pop(msg, timeout * 10); - } - msg->FirstFrame(); - return msg; -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/core/tensor.cc ---------------------------------------------------------------------- diff --git a/src/core/tensor.cc b/src/core/tensor.cc new file mode 100644 index 0000000..d1a7d2c --- /dev/null +++ b/src/core/tensor.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +namespace singa { + + + + + + +} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/driver.cc ---------------------------------------------------------------------- diff --git a/src/driver.cc b/src/driver.cc deleted file mode 100644 index 2e38e53..0000000 --- a/src/driver.cc +++ /dev/null @@ -1,402 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/driver.h" - -#include <glog/logging.h> -#include <set> -#include <string> -#include <vector> -#include "singa/comm/socket.h" -#include "singa/neuralnet/layer.h" -#include "singa/utils/common.h" -#include "singa/utils/tinydir.h" -#include "singa/utils/cluster.h" -#include "singa/utils/context.h" -#include "singa/proto/job.pb.h" -#include "singa/server.h" -#include "singa/stub.h" -#include "singa/worker.h" - -#include "singa/neuralnet/connection_layer.h" -#include "singa/neuralnet/input_layer.h" -#include "singa/neuralnet/loss_layer.h" -#include "singa/neuralnet/neuron_layer.h" -#include "singa/neuralnet/output_layer.h" - -extern "C" void openblas_set_num_threads(int num); - -namespace singa { - -void Driver::Init(int argc, char **argv) { - // unique job ID generated from singa-run.sh, passed in as "-singa_job <id>" - int arg_pos = ArgPos(argc, argv, "-singa_job"); - job_id_ = (arg_pos != -1) ? atoi(argv[arg_pos + 1]) : -1; - // global signa conf passed by singa-run.sh as "-singa_conf <path>" - arg_pos = ArgPos(argc, argv, "-singa_conf"); - if (arg_pos != -1) - ReadProtoFromTextFile(argv[arg_pos + 1], &singa_conf_); - // set log path - if (singa_conf_.has_log_dir()) - SetupLog(singa_conf_.log_dir(), "driver"); - // job conf passed by users as "-conf <path>" - arg_pos = ArgPos(argc, argv, "-conf"); - if (arg_pos != -1) - ReadProtoFromTextFile(argv[arg_pos + 1], &job_conf_); - arg_pos = ArgPos(argc, argv, "-host"); - if (arg_pos != -1) - hostip_ = argv[arg_pos + 1]; - else - hostip_ = "localhost"; - - // register layers - - // input and output layers - RegisterLayer<RecordInputLayer, int>(kRecordInput); - RegisterLayer<CSVInputLayer, int>(kCSVInput); - RegisterLayer<ImagePreprocessLayer, int>(kImagePreprocess); - RegisterLayer<RecordOutputLayer, int>(kRecordOutput); - RegisterLayer<CSVOutputLayer, int>(kCSVOutput); - RegisterLayer<CharRNNInputLayer, int>(kCharRNN); - RegisterLayer<RNNLabelLayer, int>(kRNNLabel); - RegisterLayer<OneHotLayer, int>(kOneHot); - RegisterLayer<CharRNNOutputLayer, int>(kCharRNNOutput); - - // connection layers - RegisterLayer<BridgeDstLayer, int>(kBridgeDst); - RegisterLayer<BridgeSrcLayer, int>(kBridgeSrc); - RegisterLayer<ConcateLayer, int>(kConcate); - RegisterLayer<SliceLayer, int>(kSlice); - RegisterLayer<SplitLayer, int>(kSplit); - RegisterLayer<RNNDummyLayer, int>(kRNNDummy); - - RegisterLayer<AccuracyLayer, int>(kAccuracy); - RegisterLayer<ArgSortLayer, int>(kArgSort); - RegisterLayer<ConvolutionLayer, int>(kConvolution); - RegisterLayer<CConvolutionLayer, int>(kCConvolution); - RegisterLayer<CPoolingLayer, int>(kCPooling); - RegisterLayer<EmbeddingLayer, int>(kEmbedding); - RegisterLayer<ActivationLayer, int>(kActivation); - -#ifdef USE_CUDNN - RegisterLayer<CudnnActivationLayer, int>(kCudnnActivation); - RegisterLayer<CudnnConvLayer, int>(kCudnnConv); - RegisterLayer<CudnnPoolLayer, int>(kCudnnPool); - RegisterLayer<CudnnLRNLayer, int>(kCudnnLRN); - RegisterLayer<CudnnSoftmaxLayer, int>(kCudnnSoftmax); - RegisterLayer<CudnnSoftmaxLossLayer, int>(kCudnnSoftmaxLoss); -#if CUDNN_MAJOR == 4 - RegisterLayer<CudnnBMLayer, int>(kCudnnBM); -#endif -#endif - - RegisterLayer<DropoutLayer, int>(kDropout); - RegisterLayer<DummyLayer, int>(kDummy); - RegisterLayer<EuclideanLossLayer, int>(kEuclideanLoss); - RegisterLayer<InnerProductLayer, int>(kInnerProduct); - RegisterLayer<LabelLayer, int>(kLabel); - RegisterLayer<LRNLayer, int>(kLRN); - RegisterLayer<MnistLayer, int>(kMnist); - RegisterLayer<PoolingLayer, int>(kPooling); - RegisterLayer<RBMHidLayer, int>(kRBMHid); - RegisterLayer<RBMVisLayer, int>(kRBMVis); - RegisterLayer<RGBImageLayer, int>(kRGBImage); - RegisterLayer<ReLULayer, int>(kReLU); - RegisterLayer<ShardDataLayer, int>(kShardData); - RegisterLayer<SigmoidLayer, int>(kSigmoid); - RegisterLayer<SoftmaxLossLayer, int>(kSoftmaxLoss); - RegisterLayer<STanhLayer, int>(kSTanh); - RegisterLayer<SoftmaxLayer, int>(kSoftmax); - RegisterLayer<GRULayer, int>(kGRU); - RegisterLayer<BMLayer, int>(kBM); - -#ifdef USE_LMDB - RegisterLayer<LMDBDataLayer, int>(kLMDBData); -#endif - - // register updaters - RegisterUpdater<AdaGradUpdater>(kAdaGrad); - RegisterUpdater<NesterovUpdater>(kNesterov); - RegisterUpdater<RMSPropUpdater>(kRMSProp); - RegisterUpdater<AdaDeltaUpdater>(kAdaDelta); - RegisterUpdater<AdamUpdater>(kAdam); - RegisterUpdater<AdamMaxUpdater>(kAdamMax); - - RegisterUpdater<SGDUpdater>(kSGD); - - // register learning rate change methods - RegisterLRGenerator<LRGenerator>(kFixed); - RegisterLRGenerator<FixedStepLRGen>(kFixedStep); - RegisterLRGenerator<StepLRGen>(kStep); - RegisterLRGenerator<LinearLRGen>(kLinear); - RegisterLRGenerator<ExpLRGen>(kExponential); - RegisterLRGenerator<InvLRGen>(kInverse); - RegisterLRGenerator<InvTLRGen>(kInverseT); - - // register workers - RegisterWorker<BPWorker>(kBP); - RegisterWorker<BPTTWorker>(kBPTT); - RegisterWorker<CDWorker>(kCD); - - // register params - RegisterParam<Param>(0); - - // register param init methods - RegisterParamGenerator<ParamGenerator>(kConstant); - RegisterParamGenerator<GaussianGen>(kGaussian); - RegisterParamGenerator<UniformGen>(kUniform); - RegisterParamGenerator<GaussianSqrtFanInGen>(kGaussianSqrtFanIn); - RegisterParamGenerator<UniformSqrtFanInGen>(kUniformSqrtFanIn); - RegisterParamGenerator<UniformSqrtFanInOutGen>(kUniformSqrtFanInOut); -} - -void Driver::InitLog(char* arg) { - google::InitGoogleLogging(arg); -} - -void Driver::Train(bool resume, const std::string str) { - JobProto job_conf; - job_conf.ParseFromString(str); - Train(resume, job_conf); -} - -void Driver::Train(bool resume, const JobProto& job_conf) { - if (singa_conf_.has_log_dir()) - SetupLog(singa_conf_.log_dir(), - std::to_string(job_id_) + "-" + job_conf.name()); - Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); - tinydir_dir workspace; - if (tinydir_open(&workspace, job_conf.cluster().workspace().c_str()) == -1) - LOG(FATAL) << "workspace not exist: " << job_conf.cluster().workspace(); - if (job_conf.num_openblas_threads() != 1) - LOG(WARNING) << "openblas luanches " - << job_conf.num_openblas_threads() << " threads"; - openblas_set_num_threads(job_conf.num_openblas_threads()); - - JobProto job; - job.CopyFrom(job_conf); - if (resume) - SetupForResume(&job); - job.set_id(job_id_); - Train(job); -} - -void Driver::Test(const std::string str) { - JobProto job_conf; - job_conf.ParseFromString(str); - Test(job_conf); -} - -void Driver::Test(const JobProto& job_conf) { - Cluster::Setup(job_id_, singa_conf_, job_conf.cluster()); - Cluster::Get()->Register(getpid(), "localhost"); - // TODO(wangwei) extend to a group with multiple workers - auto worker = Worker::Create(job_conf.train_one_batch()); - worker->Setup(0, 0, job_conf, nullptr, nullptr, nullptr); - auto net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); - WriteStringToTextFile(Cluster::Get()->vis_folder() + "/test_net.json", - net->ToGraph(true).ToJson()); - vector<string> paths; - for (const auto& p : job_conf.checkpoint_path()) - paths.push_back(p); - net->Load(paths); - worker->Test(job_conf.test_steps(), kTest, net); -} - -void Driver::Train(const JobProto& job_conf) { - auto cluster = Cluster::Get(); - int nserver_grps = cluster->nserver_groups(); - int grp_size = cluster->nworkers_per_group(); - Stub stub; - // no need to create Stub if there is only a single worker without servers, - // i.e., the training will be conducted by the single worker. - if (grp_size > 1 || nserver_grps > 0) { - auto router = new Router(); - if (cluster->nprocs() > 1) { - int binding_port = router->Bind("tcp://" + hostip_ + ":*"); - cluster->Register(getpid(), hostip_ + ":" + std::to_string(binding_port)); - } else { - cluster->Register(getpid(), hostip_ + ":0"); // fake endpoint - } - stub.set_router(router); - } - - NeuralNet* net = NeuralNet::Create(job_conf.neuralnet(), kTrain, grp_size); - WriteStringToTextFile(cluster->vis_folder() + "/train_net.json", - net->ToGraph(true).ToJson()); - const vector<Worker*> workers = CreateWorkers(job_conf, net); - const vector<Server*> servers = CreateServers(job_conf, net); - - vector<std::thread> threads; - for (auto server : servers) - threads.push_back(std::thread(&Server::Run, server)); - int gpu = 0; - auto context = Singleton<Context>::Instance(); - // CHECK_LE(workers.size(), job_conf.gpu_size()); - for (auto worker : workers) { - threads.push_back(std::thread(&Worker::Run, worker)); - int device_id = -1; - if (gpu < job_conf.gpu_size()) { - device_id = job_conf.gpu(gpu++); - } - context->SetupDevice(threads.back().get_id(), device_id); - } - if (grp_size > 1 || nserver_grps > 0) { - int nservers_per_grp = cluster->nservers_per_group(); - int lcm = LeastCommonMultiple(nservers_per_grp, nserver_grps); - auto slices = Param::ComputeSlices(lcm, net->params()); - auto slice2server = PartitionSlices(nservers_per_grp, slices); - stub.Run(slice2server, workers, servers); - } - - for (auto& thread : threads) - thread.join(); - for (auto server : servers) - delete server; - delete net; - std::set<NeuralNet*> deleted{net, nullptr}; - for (auto worker : workers) { - for (auto ptr : worker->GetNets()) - if (deleted.find(ptr) == deleted.end()) { - delete ptr; - deleted.insert(ptr); - } - delete worker; - } -} - -void Driver::SetupForResume(JobProto* job_conf) { - tinydir_dir dir; - std::string folder = Cluster::Get()->checkpoint_folder(); - tinydir_open(&dir, folder.c_str()); - int latest_step = 0; - // there would be multi checkpoint files (from diff workers) for one step - vector<std::string> ck_files; - // iterate all files to get the files for the last checkpoint - while (dir.has_next) { - tinydir_file file; - tinydir_readfile(&dir, &file); - tinydir_next(&dir); - char* ch = strstr(file.name, "step"); - if (ch == nullptr) { - if (file.name[0] != '.') - LOG(INFO) << "Irregular file in checkpoint folder: " << file.name; - continue; - } - LOG(INFO) << "Add checkpoint file for resume: " << ch; - int step = atoi(ch+4); - if (step == latest_step) { - ck_files.push_back(file.name); - } else if (step > latest_step) { - latest_step = step; - ck_files.clear(); - ck_files.push_back(std::string(file.name)); - } - } - if (latest_step > 0) { - job_conf->set_step(latest_step); - if (!job_conf->has_reset_param_version()) - job_conf->set_reset_param_version(false); - job_conf->clear_checkpoint_path(); - for (auto ck_file : ck_files) - job_conf->add_checkpoint_path(folder + "/" + ck_file); - } - tinydir_close(&dir); -} - -const vector<Worker*> Driver::CreateWorkers(const JobProto& job_conf, - NeuralNet* net) { - auto cluster = Cluster::Get(); - vector<Worker*> workers; - if (!cluster->has_worker()) return workers; - int wgrp_size = cluster->nworkers_per_group(); - int nservers_per_grp = cluster->nservers_per_group(); - int nserver_grps = cluster->nserver_groups(); - int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); - const vector<int> rng = cluster->ExecutorRng(cluster->procs_id(), - cluster->nworkers_per_group(), cluster->nworkers_per_procs()); - int gstart = rng[0], gend = rng[1], wstart = rng[2], wend = rng[3]; - for (int gid = gstart; gid < gend; gid++) { - NeuralNet* train_net = nullptr, *test_net = nullptr, *val_net = nullptr; - if (gid == gstart) { - train_net = net; - Param::SliceParams(lcm, train_net->params()); - // test and validation are performed by the 1st group. - if (gid == 0 && job_conf.test_steps() > 0) { - test_net = NeuralNet::Create(job_conf.neuralnet(), kTest, 1); - test_net->ShareParamsFrom(train_net, false); - } - if (gid == 0 && job_conf.validate_steps() > 0) { - val_net = NeuralNet::Create(job_conf.neuralnet(), kVal, 1); - val_net->ShareParamsFrom(train_net, false); - } - } else { - train_net = NeuralNet::Create(job_conf.neuralnet(), kTrain, wgrp_size); - if (cluster->share_memory()) { - train_net->ShareParamsFrom(net, true); - } else { - Param::SliceParams(lcm, train_net->params()); - } - } - for (int wid = wstart; wid < wend; wid++) { - auto *worker = Worker::Create(job_conf.train_one_batch()); - // TODO(wangwei) extend to test among workers in a grp - if (wid == 0) - worker->Setup(gid, wid, job_conf, train_net, val_net, test_net); - else - worker->Setup(gid, wid, job_conf, train_net, nullptr, nullptr); - workers.push_back(worker); - } - } - return workers; -} - -const vector<Server*> Driver::CreateServers(const JobProto& job_conf, - NeuralNet* net) { - auto cluster = Cluster::Get(); - vector<Server*> servers; - if (!cluster->has_server()) return servers; - int nservers_per_grp = cluster->nservers_per_group(); - int nserver_grps = cluster->nserver_groups(); - int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); - auto slices = Param::ComputeSlices(lcm, net->params()); - // partition among server groups, each group maintains one sub-set for sync - auto slice2group = PartitionSlices(nserver_grps, slices); - // partition within one server group, each server updates for one sub-set - auto slice2server = PartitionSlices(nservers_per_grp, slices); - - int server_procs = cluster->procs_id(); - // if true, server procs (logical) id starts after worker procs - if (cluster->server_worker_separate()) - server_procs -= cluster->nworker_procs(); - const vector<int> rng = cluster->ExecutorRng(server_procs, - cluster->nservers_per_group(), cluster->nservers_per_procs()); - int gstart = rng[0], gend = rng[1], start = rng[2], end = rng[3]; - for (int gid = gstart; gid < gend; gid++) { - for (int sid = start; sid < end; sid++) { - auto server = new Server(gid, sid, job_conf, slice2group, slice2server); - servers.push_back(server); - } - } - return servers; -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/io/hdfsfile.cc ---------------------------------------------------------------------- diff --git a/src/io/hdfsfile.cc b/src/io/hdfsfile.cc deleted file mode 100644 index e093d81..0000000 --- a/src/io/hdfsfile.cc +++ /dev/null @@ -1,135 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, - -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/io/hdfsfile.h" - -#include <glog/logging.h> -#include <iostream> -namespace singa { -namespace io { - -HDFSFile::HDFSFile(const std::string& path, Mode mode): path_(path), - mode_(mode) { - // check that path starts with hdfs:// - CHECK_EQ(path.find("hdfs://"), 0); - - // extract namenode from path - int path_idx = path.find_first_of("/", 7); - int colon_idx = path.find_first_of(":", 7); - std::string namenode = path.substr(7, colon_idx-7); - int port = atoi(path.substr(colon_idx+1, path_idx-colon_idx-1).c_str()); - std::string filepath = path.substr(path_idx); - - // connect to HDFS - fs_ = hdfsConnect(namenode.c_str(), port); - CHECK_NOTNULL(fs_); - - if (mode == HDFSFile::kRead) { - file_ = hdfsOpenFile(fs_, filepath.c_str(), O_RDONLY, 0, 0, 0); - } else { - // check if the directory exists, create it if not. - int file_idx = path.find_last_of("/"); - std::string hdfs_directory_path = path.substr(path_idx, file_idx-path_idx); - if (hdfsExists(fs_, hdfs_directory_path.c_str()) == -1) - CHECK_EQ(hdfsCreateDirectory(fs_, hdfs_directory_path.c_str()), 0); - file_ = hdfsOpenFile(fs_, filepath.c_str(), O_WRONLY, 0, 0, 0); - } - - CHECK_NOTNULL(file_); -} - -HDFSFile::~HDFSFile() { - if (mode_ != HDFSFile::kRead) - Flush(); - hdfsCloseFile(fs_, file_); -} - -#ifdef USE_PROTOBUF -bool HDFSFile::Next(google::protobuf::Message* val) { - // read from file_, then turns it to a message - // red size, then content - int size; - if (hdfsRead(fs_, file_, &size, sizeof(int)) <= 0) - return false; - char *temp_buf = reinterpret_cast<char*>(malloc(size*sizeof(char))); - CHECK(hdfsRead(fs_, file_, temp_buf, size)); - val->ParseFromArray(temp_buf, size); - free(temp_buf); - return true; -} - -bool HDFSFile::Insert(const google::protobuf::Message& val) { - std::string str; - val.SerializeToString(&str); - return Insert(str); -} -#endif - -bool HDFSFile::Next(std::string* val) { - char size_buf[sizeof(int)]; - // a hack to read across blocks. The first read my return in complete data, - // so try the second read. - int read_size_size = hdfsRead(fs_, file_, size_buf, sizeof(int)); - - if (read_size_size == 0) - return false; - - if (read_size_size < (static_cast<int>(sizeof(int)))) - CHECK_EQ(hdfsRead(fs_, file_, size_buf+read_size_size, - sizeof(int)-read_size_size), - sizeof(int)-read_size_size); - int size; - memcpy(&size, size_buf, sizeof(int)); - - char *temp_buf = reinterpret_cast<char*>(malloc(size*sizeof(char))); - - int read_size = hdfsRead(fs_, file_, temp_buf, size); - if (read_size < size) - CHECK_EQ(hdfsRead(fs_, file_, temp_buf+read_size, size-read_size), - size-read_size); - val->clear(); - val->append(temp_buf, size); - free(temp_buf); - return true; -} - -// append one record to the end of the file -bool HDFSFile::Insert(const std::string& val) { - CHECK(mode_ != HDFSFile::kRead); - // write length, then content - int size = val.length(); - CHECK_EQ(hdfsWrite(fs_, file_, &size, sizeof(int)), sizeof(int)); - CHECK_EQ(hdfsWrite(fs_, file_, val.c_str(), val.length()), val.length()); - return true; -} - -void HDFSFile::Seek(int offset) { - CHECK_EQ(mode_, kRead); - // seek back to the parition offset - CHECK_EQ(hdfsSeek(fs_, file_, offset), 0); -} - -void HDFSFile::Flush() { - CHECK_EQ(hdfsFlush(fs_, file_), 0); -} - -} // namespace io -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/io/hdfsfile_store.cc ---------------------------------------------------------------------- diff --git a/src/io/hdfsfile_store.cc b/src/io/hdfsfile_store.cc deleted file mode 100644 index 9464169..0000000 --- a/src/io/hdfsfile_store.cc +++ /dev/null @@ -1,75 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include <glog/logging.h> -#include "singa/io/hdfs_store.h" - -namespace singa { -namespace io { - -bool HDFSStore::Open(const std::string& source, Mode mode) { - CHECK(file_ == nullptr); - if (mode == kRead) - file_ = new HDFSFile(source, HDFSFile::kRead); - else if (mode == kCreate) - file_ = new HDFSFile(source, HDFSFile::kCreate); - else if (mode == kAppend) - file_ = new HDFSFile(source, HDFSFile::kAppend); - mode_ = mode; - return file_ != nullptr; -} - -void HDFSStore::Close() { - if (file_ != nullptr) - delete file_; - file_ = nullptr; -} - -bool HDFSStore::Read(std::string* key, std::string* value) { - CHECK_EQ(mode_, kRead); - CHECK(file_ != nullptr); - return file_->Next(value); -} - -void HDFSStore::SeekToFirst() { - CHECK_EQ(mode_, kRead); - CHECK(file_ != nullptr); - file_->Seek(0); -} - -void HDFSStore::Seek(int offset) { - file_->Seek(offset); -} - -bool HDFSStore::Write(const std::string& key, const std::string& value) { - CHECK_NE(mode_, kRead); - CHECK(file_ != nullptr); - return file_->Insert(value); -} - -void HDFSStore::Flush() { - CHECK_NE(mode_, kRead); - CHECK(file_!= nullptr); - file_->Flush(); -} - -} // namespace io -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/io/kvfile.cc ---------------------------------------------------------------------- diff --git a/src/io/kvfile.cc b/src/io/kvfile.cc deleted file mode 100644 index 3120be1..0000000 --- a/src/io/kvfile.cc +++ /dev/null @@ -1,219 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/io/kvfile.h" - -#include <glog/logging.h> - -namespace singa { -namespace io { - -KVFile::KVFile(const std::string& path, Mode mode, int capacity) : -path_(path), mode_(mode), capacity_(capacity) { - buf_ = new char[capacity]; - switch (mode) { - case KVFile::kRead: - fdat_.open(path_, std::ios::in | std::ios::binary); - if (!fdat_.is_open()) { - // path may be a directory - path_ = path + "/shard.dat"; - fdat_.open(path_, std::ios::in | std::ios::binary); - } - CHECK(fdat_.is_open()) << "Cannot create file " << path_; - break; - case KVFile::kCreate: - fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc); - CHECK(fdat_.is_open()) << "Cannot create file " << path_; - break; - case KVFile::kAppend: - fdat_.open(path_, std::ios::in | std::ios::binary); - if (!fdat_.is_open()) { - // path may be a directory - path_ = path + "/shard.dat"; - fdat_.open(path_, std::ios::in | std::ios::binary); - } - CHECK(fdat_.is_open()) << "Cannot open file " << path_; - fdat_.close(); - { - int last_tuple = PrepareForAppend(path_); - fdat_.open(path_, std::ios::binary | std::ios::out - | std::ios::in | std::ios::ate); - fdat_.seekp(last_tuple); - } - break; - default: - LOG(FATAL) << "unknown model to open KVFile " << mode; - break; - } -} - -KVFile::~KVFile() { - if (mode_ != kRead) - Flush(); - delete[] buf_; - fdat_.close(); -} -#ifdef USE_PROTOBUF -bool KVFile::Next(std::string* key, google::protobuf::Message* val) { - int vallen = Next(key); - if (vallen == 0) return false; - val->ParseFromArray(buf_ + offset_, vallen); - offset_ += vallen; - return true; -} - -bool KVFile::Insert(const std::string& key, - const google::protobuf::Message& val) { - std::string str; - val.SerializeToString(&str); - return Insert(key, str); -} -#endif - -bool KVFile::Next(std::string *key, std::string* val) { - int vallen = Next(key); - if (vallen == 0) return false; - val->clear(); - for (int i = 0; i < vallen; ++i) - val->push_back(buf_[offset_ + i]); - offset_ += vallen; - return true; -} - -// insert one complete tuple -bool KVFile::Insert(const std::string& key, const std::string& val) { - if (keys_.find(key) != keys_.end() || val.size() == 0) - return false; - int size = key.size() + val.size() + 2*sizeof(size_t); - if (bufsize_ + size > capacity_) { - fdat_.write(buf_, bufsize_); - bufsize_ = 0; - CHECK_LE(size, capacity_) << "Tuple size is larger than capacity " - << "Try a larger capacity size"; - } - *reinterpret_cast<size_t*>(buf_ + bufsize_) = key.size(); - bufsize_ += sizeof(size_t); - memcpy(buf_ + bufsize_, key.data(), key.size()); - bufsize_ += key.size(); - *reinterpret_cast<size_t*>(buf_ + bufsize_) = val.size(); - bufsize_ += sizeof(size_t); - memcpy(buf_ + bufsize_, val.data(), val.size()); - bufsize_ += val.size(); - return true; -} - -void KVFile::SeekToFirst() { - CHECK_EQ(mode_, kRead); - bufsize_ = 0; - offset_ = 0; - fdat_.clear(); - fdat_.seekg(0); - CHECK(fdat_.is_open()) << "Cannot create file " << path_; -} - -void KVFile::Flush() { - fdat_.write(buf_, bufsize_); - fdat_.flush(); - bufsize_ = 0; -} - -int KVFile::Count() { - std::ifstream fin(path_, std::ios::in | std::ios::binary); - CHECK(fdat_.is_open()) << "Cannot create file " << path_; - int count = 0; - while (true) { - size_t len; - fin.read(reinterpret_cast<char*>(&len), sizeof(len)); - if (!fin.good()) break; - fin.seekg(len, std::ios_base::cur); - if (!fin.good()) break; - fin.read(reinterpret_cast<char*>(&len), sizeof(len)); - if (!fin.good()) break; - fin.seekg(len, std::ios_base::cur); - if (!fin.good()) break; - count++; - } - fin.close(); - return count; -} - -int KVFile::Next(std::string *key) { - key->clear(); - int ssize = sizeof(size_t); - if (!PrepareNextField(ssize)) return 0; - int keylen = *reinterpret_cast<size_t*>(buf_ + offset_); - offset_ += ssize; - if (!PrepareNextField(keylen)) return 0; - for (int i = 0; i < keylen; ++i) - key->push_back(buf_[offset_ + i]); - offset_ += keylen; - if (!PrepareNextField(ssize)) return 0; - int vallen = *reinterpret_cast<size_t*>(buf_ + offset_); - offset_ += ssize; - if (!PrepareNextField(vallen)) return 0; - return vallen; -} - -int KVFile::PrepareForAppend(const std::string& path) { - std::ifstream fin(path, std::ios::in | std::ios::binary); - if (!fin.is_open()) return 0; - int last_tuple_offset = 0; - char buf[256]; - size_t len; - while (true) { - fin.read(reinterpret_cast<char*>(&len), sizeof(len)); - if (!fin.good()) break; - fin.read(buf, len); - buf[len] = '\0'; - if (!fin.good()) break; - fin.read(reinterpret_cast<char*>(&len), sizeof(len)); - if (!fin.good()) break; - fin.seekg(len, std::ios_base::cur); - if (!fin.good()) break; - keys_.insert(std::string(buf)); - last_tuple_offset = fin.tellg(); - } - fin.close(); - return last_tuple_offset; -} - -// if the buf does not have the next complete field, read data from disk -bool KVFile::PrepareNextField(int size) { - if (offset_ + size > bufsize_) { - bufsize_ -= offset_; - // wangsh: commented, not sure what this check does - // CHECK_LE(bufsize_, offset_); - for (int i = 0; i < bufsize_; ++i) - buf_[i] = buf_[i + offset_]; - offset_ = 0; - if (fdat_.eof()) { - return false; - } else { - fdat_.read(buf_ + bufsize_, capacity_ - bufsize_); - bufsize_ += fdat_.gcount(); - if (size > bufsize_) return false; - } - } - return true; -} - -} // namespace io -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/io/kvfile_store.cc ---------------------------------------------------------------------- diff --git a/src/io/kvfile_store.cc b/src/io/kvfile_store.cc deleted file mode 100644 index a2a40cd..0000000 --- a/src/io/kvfile_store.cc +++ /dev/null @@ -1,76 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/io/kvfile_store.h" - -#include <glog/logging.h> - -namespace singa { -namespace io { - -bool KVFileStore::Open(const std::string& source, Mode mode) { - CHECK(file_ == nullptr); - if (mode == kRead) - file_ = new KVFile(source, KVFile::kRead); - else if (mode == kCreate) - file_ = new KVFile(source, KVFile::kCreate); - else if (mode == kAppend) - file_ = new KVFile(source, KVFile::kAppend); - mode_ = mode; - return file_ != nullptr; -} - -void KVFileStore::Close() { - if (file_ != nullptr) - delete file_; - file_ = nullptr; -} - -bool KVFileStore::Read(std::string* key, std::string* value) { - CHECK_EQ(mode_, kRead); - CHECK(file_ != nullptr); - return file_->Next(key, value); -} - -void KVFileStore::SeekToFirst() { - CHECK_EQ(mode_, kRead); - CHECK(file_ != nullptr); - file_->SeekToFirst(); -} - -void KVFileStore::Seek(int offset) { - LOG(FATAL) << "Operation not supported."; -} - -bool KVFileStore::Write(const std::string& key, const std::string& value) { - CHECK_NE(mode_, kRead); - CHECK(file_ != nullptr); - return file_->Insert(key, value); -} - -void KVFileStore::Flush() { - CHECK_NE(mode_, kRead); - CHECK(file_!= nullptr); - file_->Flush(); -} - -} // namespace io -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/io/store.cc ---------------------------------------------------------------------- diff --git a/src/io/store.cc b/src/io/store.cc deleted file mode 100644 index 4621772..0000000 --- a/src/io/store.cc +++ /dev/null @@ -1,70 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/io/store.h" -#include <glog/logging.h> -#include "singa/io/kvfile_store.h" -#include "singa/io/textfile_store.h" -#ifdef USE_HDFS -#include "singa/io/hdfs_store.h" -#endif - -namespace singa { -namespace io { - -Store* CreateStore(const std::string& backend) { - Store *store = nullptr; - if (backend.compare("textfile") == 0) { - store = new TextFileStore(); - } else if (backend.compare("kvfile") == 0) { - store = new KVFileStore(); - } - -#ifdef USE_LMDB - if (backend == "lmdb") { - store = new LMDBStore(); - } -#endif - -#ifdef USE_OPENCV - if (backend == "imagefolder") { - store = new ImageFolderStore(); - } -#endif - -#ifdef USE_HDFS - if (backend == "hdfsfile") { - store = new HDFSStore(); - } -#endif - - CHECK(store) << "Backend type (" << backend << ") not recognized"; - return store; -} - -Store* OpenStore(const string& backend, const string& path, Mode mode) { - auto store = CreateStore(backend); - store->Open(path, mode); - return store; -} - -} // namespace io -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/io/textfile_store.cc ---------------------------------------------------------------------- diff --git a/src/io/textfile_store.cc b/src/io/textfile_store.cc deleted file mode 100644 index 4c2f1b9..0000000 --- a/src/io/textfile_store.cc +++ /dev/null @@ -1,89 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - - -#include "singa/io/textfile_store.h" -#include <glog/logging.h> - -namespace singa { -namespace io { - -bool TextFileStore::Open(const std::string& source, Mode mode) { - if (mode == kRead) - fs_ = new std::fstream(source, std::fstream::in); - else if (mode == kCreate) - fs_ = new std::fstream(source, std::fstream::out); - mode_ = mode; - return fs_->is_open(); -} - -void TextFileStore::Close() { - if (fs_ != nullptr) { - if (fs_->is_open()) { - if (mode_ != kRead) - fs_->flush(); - fs_->close(); - } - delete fs_; - fs_ = nullptr; - } -} - -bool TextFileStore::Read(std::string* key, std::string* value) { - CHECK_EQ(mode_, kRead); - CHECK(fs_ != nullptr); - CHECK(value != nullptr); - CHECK(key != nullptr); - if (!std::getline(*fs_, *value)) { - if (fs_->eof()) - return false; - else - LOG(FATAL) << "error in reading csv file"; - } - *key = std::to_string(lineNo_++); - return true; -} - -void TextFileStore::SeekToFirst() { - CHECK_EQ(mode_, kRead); - CHECK(fs_ != nullptr); - lineNo_ = 0; - fs_->clear(); - fs_->seekg(0); -} - -void TextFileStore::Seek(int offset) { -} - -bool TextFileStore::Write(const std::string& key, const std::string& value) { - CHECK_NE(mode_, kRead); - CHECK(fs_ != nullptr); - // csv store does not write key - *fs_ << value << '\n'; - return true; -} - -void TextFileStore::Flush() { - fs_->flush(); -} - -} // namespace io -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/layer/conv.cc ---------------------------------------------------------------------- diff --git a/src/layer/conv.cc b/src/layer/conv.cc new file mode 100644 index 0000000..d1a7d2c --- /dev/null +++ b/src/layer/conv.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +namespace singa { + + + + + + +} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/main.cc ---------------------------------------------------------------------- diff --git a/src/main.cc b/src/main.cc deleted file mode 100644 index 0ce7d19..0000000 --- a/src/main.cc +++ /dev/null @@ -1,79 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include <glog/logging.h> -#include <iostream> -#include "singa/singa.h" - -/** - * \file main.cc provides an example main function. - * - * Like the main func of Hadoop, it prepares the job configuration and submit it - * to the Driver which starts the training. - * - * Users can define their own main func to prepare the job configuration in - * different ways other than reading it from a configuration file. But the main - * func must call Driver::Init at the beginning, and pass the job configuration - * and resume option to the Driver for job submission. - * - * Optionally, users can register their own implemented subclasses of Layer, - * Updater, etc. through the registration function provided by the Driver. - * - * Users must pass at least one argument to the singa-run.sh, i.e., the job - * configuration file which includes the cluster topology setting. Other fields - * e.g, neuralnet, updater can be configured in main.cc. - * - * TODO - * Add helper functions for users to generate configurations for popular models - * easily, e.g., MLP(layer1_size, layer2_size, tanh, loss); - */ -int main(int argc, char **argv) { - if (argc < 2) { - std::cout << "Args: -conf JOB_CONF [-singa SINGA_CONF] [-job_id JOB_ID] " - << " [-resume|-test]\n" - << "-resume\t resume training from latest checkpoint files\n" - << "-test\t test performance or extract features\n"; - return 0; - } - - // initialize glog before creating the driver - google::InitGoogleLogging(argv[0]); - - // must create driver at the beginning and call its Init method. - singa::Driver driver; - driver.Init(argc, argv); - - // users can register new subclasses of layer, updater, etc. - - // get the job conf, and custmize it if need - singa::JobProto jobConf = driver.job_conf(); - - if (singa::ArgPos(argc, argv, "-test") != -1) { - driver.Test(jobConf); - } else { - // if -resume in argument list, set resume to true; otherwise false - int resume_pos = singa::ArgPos(argc, argv, "-resume"); - bool resume = (resume_pos != -1); - // submit the job for training - driver.Train(resume, jobConf); - } - return 0; -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/model/rnn.cc ---------------------------------------------------------------------- diff --git a/src/model/rnn.cc b/src/model/rnn.cc new file mode 100644 index 0000000..d1a7d2c --- /dev/null +++ b/src/model/rnn.cc @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +namespace singa { + + + + + + +} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/neuralnet/connection_layer/bridge.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/connection_layer/bridge.cc b/src/neuralnet/connection_layer/bridge.cc deleted file mode 100644 index 2cfd55a..0000000 --- a/src/neuralnet/connection_layer/bridge.cc +++ /dev/null @@ -1,108 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/neuralnet/connection_layer.h" -#include "singa/comm/msg.h" - -namespace singa { - -using std::vector; - -void BridgeLayer::MakePaired(Layer* pair, int grp_id, Dealer* dealer, - std::unordered_map<std::string, Layer*>* name2bridge) { - pair_ = pair; - group_id_ = grp_id; - dealer_ = dealer; - name2bridge_ = name2bridge; -} - -void BridgeLayer::SendBlobs(bool handle_data) { - CHECK(dealer_) << "NULL dealer for bridges in worker (" << group_id_ - << ", " << partition_id() << ")"; - Msg *msg = new Msg(); - msg->set_src(Addr(group_id_, partition_id(), kWorkerLayer)); - msg->set_dst(Addr(group_id_, pair_->partition_id(), kWorkerLayer)); - msg->AddFrame(pair_->name().c_str(), pair_->name().length()); - auto const& blob = handle_data ? data(nullptr) : grad(nullptr); - msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float)); - dealer_->Send(&msg); -} - -void BridgeLayer::ReceiveBlobs(bool handle_data) { - CHECK(dealer_) << "NULL dealer for bridges in worker (" << group_id_ - << ", " << partition_id() << ")"; - while (!ready()) { - auto msg = dealer_->Receive(); - CHECK_EQ(AddrGrp(msg->src()), group_id_); - string name(static_cast<char*>(msg->FrameData()), msg->FrameSize()); - auto receive_layer = name2bridge_->at(name); - auto blob = handle_data ? receive_layer->mutable_data(nullptr) : - receive_layer -> mutable_grad(nullptr); - msg->NextFrame(); - memcpy(blob->mutable_cpu_data(), msg->FrameData(), msg->FrameSize()); - dynamic_cast<BridgeLayer*>(receive_layer)->set_ready(true); - delete msg; - } -} - -void BridgeSrcLayer::Setup(const LayerProto& conf, - const vector<Layer*>& srclayers) { - CHECK_GE(srclayers.size(), 1); - Layer::Setup(conf, srclayers); - data_.Reshape(srclayers[0]->data(this).shape()); - grad_.ReshapeLike(data_); - data_.ShareData(srclayers[0]->mutable_data(this), false); - grad_.ShareData(srclayers[0]->mutable_grad(this), false); -} - -void BridgeSrcLayer::ComputeFeature(int flag, const vector<Layer*>& srcs) { - // send data - SendBlobs(true); - // reset flag for receiving gradient in compute gradient phase - set_ready(false); -} - -void BridgeSrcLayer::ComputeGradient(int flag, const vector<Layer*>& srcs) { - // receive gradient - ReceiveBlobs(false); -} - -void BridgeDstLayer::Setup(const LayerProto& conf, - const vector<Layer*>& srclayers) { - CHECK_EQ(srclayers.size(), 1); - Layer::Setup(conf, srclayers); - data_.Reshape(srclayers[0]->data(this).shape()); - grad_.ReshapeLike(data_); -} - -void BridgeDstLayer::ComputeFeature(int flag, const vector<Layer*>& srcs) { - // receive data - ReceiveBlobs(true); -} - -void BridgeDstLayer::ComputeGradient(int flag, const vector<Layer*>& srcs) { - // send gradient - SendBlobs(false); - // reset flag for receiving data in compute feature phase - set_ready(false); -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/neuralnet/connection_layer/concate.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/connection_layer/concate.cc b/src/neuralnet/connection_layer/concate.cc deleted file mode 100644 index 9d3fd0c..0000000 --- a/src/neuralnet/connection_layer/concate.cc +++ /dev/null @@ -1,118 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/neuralnet/connection_layer.h" -#include "singa/utils/singleton.h" -#include "singa/utils/context.h" - -namespace singa { - -void ConcateLayer::Setup(const LayerProto& conf, - const vector<Layer*>& srclayers) { - CHECK_GT(srclayers.size(), 1); - Layer::Setup(conf, srclayers); - vector<int> shape = srclayers[0]->data(this).shape(); - concate_dim_ = conf.concate_conf().concate_dim(); - num_concates_ = conf.concate_conf().num_concates(); - CHECK_GE(concate_dim_, 0); - CHECK_LT(concate_dim_, shape.size()); - CHECK_EQ(num_concates_, srclayers.size()); - for (size_t i = 1; i < srclayers.size(); i++) { - const vector<int>& src_shape = srclayers[i]->data(this).shape(); - for (size_t j = 0; j < shape.size(); j++) - if (static_cast<int>(j) == concate_dim_) - shape[j] += src_shape[j]; - else - CHECK_EQ(shape[j], src_shape[j]); - } - data_.Reshape(shape); - grad_.Reshape(shape); -} - -void ConcateLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { - CHECK_GT(srclayers.size(), 1); - CHECK_EQ(num_concates_, srclayers.size()); - // calculate step for each memcpy - int step = srclayers[0]->data(this).shape()[concate_dim_]; - for (unsigned i = concate_dim_ + 1; i < data_.shape().size(); ++i) - step *= data_.shape()[i]; - int srclayer_offset = 0; - int concate_offset = 0; - auto context = Singleton<Context>::Instance(); - int device = context->device_id(std::this_thread::get_id()); - while (concate_offset < data_.count()) { - for (size_t i = 0; i < srclayers.size(); ++i) { - if (device < 0) { - const float* src = srclayers[i]->data(this).cpu_data() - + srclayer_offset; - float* dst = data_.mutable_cpu_data() + concate_offset; - memcpy(dst, src, step * sizeof(float)); - } else { -#ifdef USE_GPU - const float* src = srclayers[i]->data(this).gpu_data() - + srclayer_offset; - float* dst = data_.mutable_gpu_data() + concate_offset; - cudaMemcpy(dst, src, step * sizeof(float), cudaMemcpyDefault); -#else - LOG(FATAL) << "GPU is not supported"; -#endif - } - concate_offset += step; - } - srclayer_offset += step; - } -} - -void ConcateLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { - CHECK_GT(srclayers.size(), 1); - CHECK_EQ(num_concates_, srclayers.size()); - // calculate step for each memcpy - int step = srclayers[0]->grad(this).shape()[concate_dim_]; - for (unsigned i = concate_dim_ + 1; i < grad_.shape().size(); ++i) - step *= grad_.shape()[i]; - int srclayer_offset = 0; - int concate_offset = 0; - auto context = Singleton<Context>::Instance(); - int device = context->device_id(std::this_thread::get_id()); - while (concate_offset < grad_.count()) { - for (size_t i = 0; i < srclayers.size(); ++i) { - if (device < 0) { - const float* src = grad_.cpu_data() + concate_offset; - float* dst = srclayers[i]->mutable_grad(this)->mutable_cpu_data() - + srclayer_offset; - memcpy(dst, src, step * sizeof(float)); - } else { -#ifdef USE_GPU - const float* src = grad_.gpu_data() + concate_offset; - float* dst = srclayers[i]->mutable_grad(this)->mutable_gpu_data() - + srclayer_offset; - cudaMemcpy(dst, src, step * sizeof(float), cudaMemcpyDefault); -#else - LOG(FATAL) << "GPU is not supported"; -#endif - } - concate_offset += step; - } - srclayer_offset += step; - } -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/neuralnet/connection_layer/rnn_dummy.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/connection_layer/rnn_dummy.cc b/src/neuralnet/connection_layer/rnn_dummy.cc deleted file mode 100644 index 865066f..0000000 --- a/src/neuralnet/connection_layer/rnn_dummy.cc +++ /dev/null @@ -1,67 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/neuralnet/connection_layer.h" -#include "singa/utils/math_blob.h" - -namespace singa { - -void RNNDummyLayer::Setup(const LayerProto& conf, - const vector<Layer*>& srclayers) { - Layer::Setup(conf, srclayers); - dynamic_src_ = AddPrefixSuffix(unroll_index(), partition_id(), - conf.rnn_dummy_conf().dynamic_srclayer()); - LOG(ERROR) << dynamic_src_; - vector<int> shape; - for (int s : conf.rnn_dummy_conf().shape()) - shape.push_back(s); - integer_ = conf.rnn_dummy_conf().integer(); - low_ = conf.rnn_dummy_conf().low(); - high_ = conf.rnn_dummy_conf().high(); - // if no src layer, then it will genereate data by itself based on shape - // and random range - if (srclayers.size() == 0) { - CHECK(shape.size()); - CHECK_NE(low_, high_); - data_.Reshape(shape); - srclayer_ = nullptr; - } else { - srclayer_ = srclayers.at(0); - data_.ReshapeLike(srclayer_->data(this)); - data_.ShareData(srclayer_->mutable_data(this), false); - } -} - -void RNNDummyLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { - if (srclayers.size() == 0) { - SampleUniform(low_, high_, &data_); - if (integer_) { - for (int i = 0; i < data_.count(); i ++) { - data_.mutable_cpu_data()[i] = floor(data_.cpu_data()[i]); - } - } - } else if (srclayer_ != srclayers.at(0)) { - srclayer_ = srclayers.at(0); - data_.ShareData(srclayer_->mutable_data(this), false); - } -} -} // namespace singa -
