http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
new file mode 100644
index 0000000..993c153
--- /dev/null
+++ b/include/utils/common.h
@@ -0,0 +1,67 @@
+#ifndef INCLUDE_UTILS_COMMON_H_
+#define INCLUDE_UTILS_COMMON_H_
+#pragma once
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+#include <google/protobuf/message.h>
+#include <stdarg.h>
+#include <thread> // std::this_thread::sleep_for
+#include <chrono>
+#include <cmath>   // std::nextafter
+#include <cstdlib> // rand, RAND_MAX
+#include <string>
+#include <vector>
+#include <mutex>
+#include <queue>
+#include <sys/stat.h>
+#include <map>
+
+using std::vector;
+using std::string;
+using std::map;
+using google::protobuf::Message;
+
+#ifndef GFLAGS_GFLAGS_H_
+namespace gflags = google;
+#endif // GFLAGS_GFLAGS_H_
+
+
+namespace singa {
+
+void ReadProtoFromTextFile(const char* filename, Message* proto) ;
+void WriteProtoToTextFile(const Message& proto, const char* filename) ;
+void ReadProtoFromBinaryFile(const char* filename, Message* proto) ;
+void WriteProtoToBinaryFile(const Message& proto, const char* filename);
+
+std::string IntVecToString(const vector<int>& vec) ;
+string StringPrintf(string fmt, ...) ;
+void Debug() ;
+/**
+ * @return true if stat() succeeds on `name`, i.e. the path exists and is
+ * reachable by the calling process.
+ */
+inline bool check_exists(const std::string& name) {
+  struct stat buffer;
+  return (stat (name.c_str(), &buffer) == 0);
+}
+
+/**
+ * Block the calling thread for `millisec` milliseconds (default 1).
+ */
+inline void Sleep(int millisec=1){
+  std::this_thread::sleep_for(std::chrono::milliseconds(millisec));
+}
+
+/**
+ * @return a pseudo-random number from rand() scaled into [0, 1).
+ * The division is done in double and the result is clamped below 1.0f:
+ * rand()/(RAND_MAX+1.0f) evaluated in float rounds up to exactly 1.0f when
+ * rand() is close to a large RAND_MAX (float has a 24-bit significand).
+ */
+inline float rand_real(){
+  const float r = static_cast<float>(rand() / (RAND_MAX + 1.0));
+  return r < 1.0f ? r : std::nextafter(1.0f, 0.0f);
+}
+
+} /* singa */
+#endif // INCLUDE_UTILS_COMMON_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/data_shard.h
----------------------------------------------------------------------
diff --git a/include/utils/data_shard.h b/include/utils/data_shard.h
new file mode 100644
index 0000000..2ebade9
--- /dev/null
+++ b/include/utils/data_shard.h
@@ -0,0 +1,145 @@
+#ifndef INCLUDE_UTILS_SHARD_H_
+#define INCLUDE_UTILS_SHARD_H_
+
+#include <google/protobuf/message.h>
+#include <fstream>
+#include <string>
+#include <unordered_set>
+
+
+using google::protobuf::Message;
+
+namespace singa {
+
+/**
+ * Data shard stores training/validation/test tuples.
+ * Every worker node should have a training shard (validation/test shard
+ * is optional). The shard file for training is
+ * singa::Cluster::workspace()/train/shard.dat; the shard file for validation
+ * presumably lives in its own folder, not train/ (TODO confirm); same for test.
+ *
+ * shard.dat consists of a set of unordered tuples. Each tuple is
+ * encoded as [key_len key record_len val] (key_len and record_len are of type
+ * uint32, which indicate the bytes of key and record respectively).
+ *
+ * When a DataShard obj is created, it will remove the last key if the record size and
+ * key size do not match because the last write of tuple crashed.
+ *
+ * TODO
+ * 1. split one shard into multiple shards.
+ * 2. add threading to prefetch and parse records
+ *
+ */
+class DataShard {
+ public:
+  enum {
+    //!< read only mode used in training
+    kRead=0,
+    //!< write mode used in creating shard (will overwrite previous one)
+    kCreate=1,
+    //!< append mode, e.g. used when previous creating crashes
+    kAppend=2
+  };
+
+ public:
+  /**
+   * Init the shard obj.
+   * @param folder shard folder (path excluding shard.dat) on worker node
+   * @param mode shard open mode, DataShard::kRead, kCreate or kAppend
+   * @param capacity bytes of the internal buffer used for every disk op
+   * (read or write), default is 100MB (104857600 bytes)
+   */
+  DataShard(std::string folder, char mode, int capacity=104857600);
+  ~DataShard();
+
+  /**
+   * read next tuple from the shard.
+   * @param key [out] key of the tuple
+   * @param val record of type Message
+   * @return true if read success otherwise false, e.g., the tuple was not
+   * inserted completely.
+   */
+  bool Next(std::string *key, Message* val);
+  /**
+   * read next tuple from the shard.
+   * @param key [out] key of the tuple
+   * @param val record of type string
+   * @return true if read success otherwise false, e.g., the tuple was not
+   * inserted completely.
+   */
+  bool Next(std::string *key, std::string* val);
+
+  /**
+   * Append one tuple to the shard.
+   * @param key e.g., image path
+   * @param tuple record to append
+   * @return true if success, otherwise false, e.g., the key was inserted before
+   */
+  bool Insert(const std::string& key, const Message& tuple);
+  /**
+   * Append one tuple to the shard.
+   * @param key e.g., image path
+   * @param tuple record to append
+   * @return true if success, otherwise false, e.g., the key was inserted before
+   */
+  bool Insert(const std::string& key, const std::string& tuple);
+  /**
+   * Move the read pointer to the head of the shard file.
+   * Used for repeated reading.
+   */
+  void SeekToFirst();
+  /**
+   * Flush buffered data to disk.
+   * Used only for kCreate or kAppend.
+   */
+  void Flush() ;
+  /**
+   * Iterate through all tuples to get the num of all tuples.
+   * @return num of tuples
+   */
+  const int Count();
+  /**
+   * @return path to shard file
+   */
+  const std::string& path() const {
+    return path_;
+  }
+
+ protected:
+  /**
+   * Read the next key and prepare buffer for reading value.
+   * @param key
+   * @return length (i.e., bytes) of value field.
+   */
+  int Next(std::string *key);
+  /**
+   * Setup the disk pointer to the right position for append in case that
+   * the previous write crashes.
+   * @param path shard path.
+   * @return offset (end pos) of the last success written record.
+   */
+  int PrepareForAppend(std::string path);
+  /**
+   * Read data from disk if the current data in the buffer is not a full field.
+   * @param size size of the next field.
+   */
+  bool PrepareNextField(int size);
+
+ private:
+  char mode_;
+  std::string path_;
+  // either ifstream or ofstream
+  std::fstream fdat_;
+  // keys seen so far, to avoid duplicated records
+  std::unordered_set<std::string> keys_;
+  // internal buffer
+  char* buf_;
+  // offset inside the buf_
+  int offset_;
+  // allocated bytes for the buf_
+  int capacity_;
+  // bytes in buf_, used in reading
+  int bufsize_;
+};
+} /* singa */
+#endif // INCLUDE_UTILS_SHARD_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/factory.h
----------------------------------------------------------------------
diff --git a/include/utils/factory.h b/include/utils/factory.h
new file mode 100644
index 0000000..c8fef32
--- /dev/null
+++ b/include/utils/factory.h
@@ -0,0 +1,63 @@
+#ifndef INCLUDE_UTILS_FACTORY_H_
+#define INCLUDE_UTILS_FACTORY_H_
+#include <glog/logging.h>
+
+#include <functional>
+#include <map>
+#include <string>
+#include <utility>
+/**
+ * macro that creates a function which instantiates a subclass instance and
+ * returns pointer to the base class.
+ */
+#define CreateInstance(SubClass, BaseClass) \
+  [](void)->BaseClass* {return new SubClass();}
+
+/**
+ * factory template to generate class (or a sub-class) object based on id.
+ * 1. register class creation function that generates a class
+ * object based on id.
+ * 2. call Create() func to call the creation function and return
+ * a pointer to the base class.
+ */
+
+template<typename T>
+class Factory{
+  //template<Factory<T>> friend class Singleton;
+ public:
+  /**
+   * Register functions to create user defined classes.
+   * This function is called by the REGISTER_FACTORY macro.
+   * @param id identifier of the creating function/class
+   * @param func a function that creates an instance; it replaces any function
+   * previously registered under the same id.
+   */
+  void Register(const std::string id, std::function<T*(void)> func);
+  /**
+   * create an instance by calling the function registered under id.
+   * @param id the identifier of the class to be created
+   * @return the pointer returned by that function (a new object if it was made by CreateInstance)
+   * CHECK-fails if no function has been registered under id.
+   */
+  T *Create(const std::string id);
+
+ private:
+  //! Map that stores the registered creation functions
+  std::map<std::string, std::function<T*(void)>> str2func_;
+};
+
+template<typename T>
+void Factory<T>::Register(const std::string id,
+    std::function<T*(void)> func) {
+  str2func_[id] = std::move(func);
+}
+
+template<typename T>
+T *Factory<T>::Create(const std::string id) {
+  // single lookup: reuse the iterator instead of find() followed by operator[]
+  auto it = str2func_.find(id);
+  CHECK(it != str2func_.end())
+      << "The creation function for " << id << " has not been registered";
+  return it->second();
+}
+#endif // INCLUDE_UTILS_FACTORY_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/graph.h
----------------------------------------------------------------------
diff --git a/include/utils/graph.h b/include/utils/graph.h
new file mode 100644
index 0000000..ca582b5
--- /dev/null
+++ b/include/utils/graph.h
@@ -0,0 +1,153 @@
+#ifndef INCLUDE_UTILS_GRAPH_H_
+#define INCLUDE_UTILS_GRAPH_H_
+#include <glog/logging.h>
+#include <algorithm>
+#include <vector>
+#include <string>
+#include <map>
+#include <stack>
+#include <memory>
+
+using std::vector;
+using std::string;
+using std::map;
+using std::pair;
+using std::shared_ptr;
+using std::make_shared;
+
+
+typedef struct _LayerInfo{
+  // origin identifies the origin of this node, i.e., the corresponding layer
+  string origin;
+  int locationid;  // location id
+  int partitionid;
+  int slice_dimension;
+  int concate_dimension;
+}LayerInfo;
+typedef LayerInfo V;
+
+
+class Node;
+typedef shared_ptr<Node> SNode;
+
+class Node{
+ public:
+  typedef shared_ptr<Node> SNode;
+  Node(string name): name_(name){}
+  Node(string name, const V& v):
+    name_(name), val_(v){}
+
+  void AddDstNode(SNode dstnode){
+    dstnodes_.push_back(dstnode);
+  }
+  void AddSrcNode(SNode srcnode){
+    srcnodes_.push_back(srcnode);
+  }
+
+  //! Remove the first out-edge to a node named dst->name(); CHECK-fails if none (end() is never dereferenced).
+  void RemoveDstNode(SNode dst){
+    auto iter=std::find_if(dstnodes_.begin(), dstnodes_.end(),
+        [&dst](const SNode& n){ return n->name_==dst->name_; });
+    CHECK(iter!=dstnodes_.end())<<"can't find dst node "<<dst->name_;
+    dstnodes_.erase(iter);
+  }
+  //! Remove the first in-edge from a node named src->name(); CHECK-fails if none (end() is never dereferenced).
+  void RemoveSrcNode(SNode src){
+    auto iter=std::find_if(srcnodes_.begin(), srcnodes_.end(),
+        [&src](const SNode& n){ return n->name_==src->name_; });
+    CHECK(iter!=srcnodes_.end())<<"can't find src node "<<src->name_;
+    srcnodes_.erase(iter);
+  }
+  const string& name() const {return name_;}
+  const V& val() const {return val_;}
+  const SNode srcnodes(int k) const {return srcnodes_[k]; }
+  const SNode dstnodes(int k) const {return dstnodes_[k]; }
+  const vector<SNode>& srcnodes() const {return srcnodes_; }
+  const vector<SNode>& dstnodes() const {return dstnodes_; }
+  int dstnodes_size() const {return dstnodes_.size(); }
+  int srcnodes_size() const {return srcnodes_.size(); }
+
+ private:
+  string name_;
+  vector<SNode> srcnodes_;
+  vector<SNode> dstnodes_;
+
+  V val_;
+  // properties
+  string color_, weight_, shape_;
+};
+
+
+/**
+ * For partition neuralnet and displaying the neuralnet structure
+ */
+class Graph{
+ public:
+  Graph(){}
+  void Sort();
+  const SNode& AddNode(string name, V origin){
+    nodes_.push_back(make_shared<Node>(name, origin));
+    name2node_[name]=nodes_.back();
+    return nodes_.back();
+  }
+  const SNode& AddNode(string name){
+    nodes_.push_back(make_shared<Node>(name));
+    name2node_[name]=nodes_.back();
+    return nodes_.back();
+  }
+
+  void AddEdge(SNode srcnode, SNode dstnode){
+    srcnode->AddDstNode(dstnode);
+    dstnode->AddSrcNode(srcnode);
+  }
+
+  void AddEdge(const string& src, const string& dst){
+    CHECK(name2node_.find(src)!=name2node_.end())<<"can't find src node "<<src;
+    CHECK(name2node_.find(dst)!=name2node_.end())<<"can't find dst node "<<dst;
+
+    SNode srcnode=name2node_[src], dstnode=name2node_[dst];
+    AddEdge(srcnode, dstnode);
+  }
+
+  void RemoveEdge(const string &src, const string& dst){
+    CHECK(name2node_.find(src)!=name2node_.end())<<"can't find src node "<<src;
+    CHECK(name2node_.find(dst)!=name2node_.end())<<"can't find dst node "<<dst;
+
+    SNode srcnode=name2node_[src], dstnode=name2node_[dst];
+    RemoveEdge(srcnode, dstnode);
+  }
+
+  void RemoveEdge(SNode src, SNode dst){
+    src->RemoveDstNode(dst);
+    dst->RemoveSrcNode(src);
+  }
+
+  const vector<SNode>& nodes() const{
+    return nodes_;
+  }
+
+  const SNode& node(string name) const{
+    CHECK(name2node_.find(name)!= name2node_.end())
+      <<"can't find node "<<name;
+    return name2node_.at(name);
+  }
+
+  const string ToString() const;
+  const string ToString(const map<string, string>& info) const ;
+
+  bool Check() const;
+
+  SNode InsertSliceNode(SNode srcnode, const vector<SNode>& dstnodes,
+      const V& info, bool connect_dst=true);
+  SNode InsertConcateNode(const vector<SNode>&srcnodes, SNode dstnode,
+      const V& info);
+  SNode InsertSplitNode(SNode srcnode, const vector<SNode>& dstnodes);
+  std::pair<SNode, SNode> InsertBridgeNode(SNode srcnode, SNode dstnode);
+  void topology_sort_inner(SNode node, map<string, bool> *visited,
+      std::stack<string> *stack);
+
+ private:
+  vector<SNode> nodes_;
+  map<string, SNode> name2node_;
+};
+#endif // INCLUDE_UTILS_GRAPH_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
new file mode 100644
index 0000000..907ef8c
--- /dev/null
+++ b/include/utils/param.h
@@ -0,0 +1,172 @@
+#ifndef INCLUDE_UTILS_PARAM_H_
+#define INCLUDE_UTILS_PARAM_H_
+#include <vector>
+#include <string>
+#include <map>
+#include <functional>
+#include "proto/model.pb.h"
+#include "utils/blob.h"
+#include "communication/msg.h"
+// Base parameter class.
+namespace singa {
+class Param {
+ public:
+  Param();
+  virtual ~Param();
+
+  virtual Msg* GenGetMsg(void* arg=nullptr);
+  virtual Msg* GenPutMsg(void* arg=nullptr);
+  virtual Msg* GenUpdateMsg(void* arg=nullptr);
+  virtual Msg* GenSyncMsg(void* arg=nullptr);
+
+  virtual Msg* HandleGetMsg(Msg** msg);
+  virtual Msg* HandlePutMsg(Msg** msg);
+  virtual int ParseUpdateMsg(Msg** msg);
+  virtual Msg* GenUpdateResponseMsg(void* arg=nullptr);
+  virtual Msg* HandleSyncMsg(Msg** msg);
+
+  virtual int ParseGetResponseMsg(Msg** msg);
+  virtual int ParsePutResponseMsg(Msg** msg);
+  virtual int ParseUpdateResponseMsg(Msg** msg);
+  virtual int ParseSyncResponseMsg(Msg** msg);
+
+  /**
+   * setup param shape
+   */
+  virtual void Setup(const ParamProto& proto, const std::vector<int>& shape, int fan_in);
+  /*
+   * fill the data according to initmethod, i.e., random/gaussian/fixed value
+   */
+  virtual void Init(int v=0);
+  void ShareData(shared_ptr<Param> other){  // record other as owner and share its data blob
+    owner_=other->id();
+    // compare whole shapes: std::equal over data_'s dims alone over-reads a shorter shape
+    CHECK(data_.shape()==other->data_.shape());
+    data_.ShareData(other->data_);
+  }
+  float learning_rate_multiplier() {
+    return proto_.learning_rate_multiplier();
+  }
+  float weight_decay_multiplier() {
+    return proto_.weight_decay_multiplier();
+  }
+  /*
+  const int split_threshold(){
+    return proto_.split_threshold();
+  }
+  */
+  /**
+   * if the Param shares data with others, then point to the owner.
+   * otherwise points to itself.
+   */
+  const int owner() const{
+    return owner_;
+  }
+  const std::string& name() {
+    return proto_.name();
+  }
+
+  int id() const{
+    return proto_.id();
+  }
+  void set_id(int id){
+    proto_.set_id(id);
+  }
+
+  int version() const {
+    return proto_.version(); // TODO store version in data blob
+  }
+  void set_version(int v) {
+    proto_.set_version(v); // TODO read version from data blob
+  }
+  /**
+   * @return num of floats.
+   */
+  int size() const {
+    return data_.count();
+  }
+  /**
+   * Return the blob holding the content (values) of this parameter
+   */
+  const Blob<float> &data() {
+    return data_;
+  }
+  Blob<float> *mutable_data() {
+    return &data_;
+  }
+  /**
+   * Return gradient of this parameter
+   */
+  const Blob<float> &grad() {
+    return grad_;
+  }
+  Blob<float> *mutable_grad() {
+    return &grad_;
+  }
+
+  const Blob<float> &history() {
+    return history_;
+  }
+  Blob<float> *mutable_history() {
+    return &history_;
+  }
+
+  float* mutable_cpu_data(){
+    return data_.mutable_cpu_data();
+  }
+  float* mutable_cpu_grad(){
+    return grad_.mutable_cpu_data();
+  }
+  float* mutable_cpu_history(){
+    return history_.mutable_cpu_data();
+  }
+ protected:
+  /**
+   * name of the parameter used to share weights between neuralnets
+   */
+  std::string name_;
+  //! content, gradient, history gradient of this parameter
+  Blob<float> data_, grad_, history_;
+  int owner_;
+
+  ParamProto proto_;
+  int fan_in_;
+};
+/**
+ * Sync with server by randomly sampling some parameters for every sync.
+class RandomSyncParam: public Param{
+ public:
+  virtual zmsg_t* HandleSyncMsg(zmsg_t** msg);
+  virtual zmsg_t *GenSyncMsgFromWorker(float sample_ratio);
+  virtual void ParseSyncMsgFromPS(zmsg_t** msg);
+  virtual void Setup(const ParamProto& proto, const vector<int>& shape, int fan_in);
+  virtual void Init();
+
+  float* mutable_cpu_snapshot(){
+    return snapshot_.mutable_cpu_data();
+  }
+  const float* cpu_snapshot(){
+    return snapshot_.cpu_data();
+  }
+
+ protected:
+  const vector<int> RandomSample(int seed, int m, int n);
+
+
+  Blob<float> snapshot_;
+};
+ */
+/**
+ * Sync with server by elastic SGD see http://arxiv.org/abs/1412.6651.
+class ElasticParam: public Param{
+ public:
+  virtual zmsg_t* HandleSyncMsg(zmsg_t** msg);
+  virtual zmsg_t *GenSyncMsgFromWorker(float moving_rate);
+  virtual void ParseSyncMsgFromPS(zmsg_t** msg);
+};
+ */
+
+
+} // namespace singa
+
+#endif // INCLUDE_UTILS_PARAM_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/singleton.h
----------------------------------------------------------------------
diff --git a/include/utils/singleton.h b/include/utils/singleton.h
new file mode 100644
index 0000000..2e2bdfb
--- /dev/null
+++ b/include/utils/singleton.h
@@ -0,0 +1,49 @@
+#ifndef INCLUDE_UTILS_SINGLETON_H_
+#define INCLUDE_UTILS_SINGLETON_H_
+
+/**
+ * Lazily created, program-wide instance of T; the instance is never deleted.
+ * NOTE: the lazy initialization is not synchronized, so the first call to
+ * Instance() must not race with another call from a different thread.
+ */
+template<typename T>
+class Singleton {
+ public:
+  static T* Instance() {
+    if (data_==nullptr) {
+      data_ = new T();
+    }
+    return data_;
+  }
+ private:
+  static T* data_;
+};
+
+template<typename T> T* Singleton<T>::data_ = nullptr;
+
+
+/**
+ * Singleton initiated with argument.
+ * Instance() returns nullptr until Instance(x) has been called once; the
+ * argument of any later Instance(x) call is ignored. Like Singleton, the
+ * lazy initialization is not synchronized and the instance is never deleted.
+ */
+template<typename T, typename X=int>
+class ASingleton {
+ public:
+  static T* Instance(){
+    return data_;
+  }
+  static T* Instance(X x) {
+    if (data_==nullptr) {
+      data_ = new T(x);
+    }
+    return data_;
+  }
+ private:
+  static T* data_;
+};
+
+template<typename T, typename X> T* ASingleton<T,X>::data_ = nullptr;
+
+#endif // INCLUDE_UTILS_SINGLETON_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/updater.h
----------------------------------------------------------------------
diff --git a/include/utils/updater.h b/include/utils/updater.h
new file mode 100644
index 0000000..2a6dd43
--- /dev/null
+++ b/include/utils/updater.h
@@ -0,0 +1,83 @@
+#ifndef INCLUDE_UTILS_UPDATER_H_
+#define INCLUDE_UTILS_UPDATER_H_
+#include "proto/model.pb.h"
+#include "utils/param.h"
+
+namespace singa{
+/**
+ * Updater for Param.
+ * Base class of the update rules declared below; Init() stores the
+ * UpdaterProto config and subclasses implement Update().
+ */
+class Updater{
+ public:
+  // Updaters are used polymorphically (see the subclasses below); a virtual
+  // destructor makes deleting a subclass through Updater* well defined.
+  virtual ~Updater() = default;
+  virtual void Init(const UpdaterProto &proto){
+    proto_=proto;
+  }
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f)=0;
+
+  float GetLearningRate(int step);
+ protected:
+  UpdaterProto proto_;
+};
+class SGDUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float momentum_;
+  float weight_decay_;
+};
+class NesterovUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float momentum_;
+  float weight_decay_;
+};
+class AdaGradUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float delta_;
+  float weight_decay_;
+};
+
+class RMSPropUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float delta_;
+  float rho_;
+  float weight_decay_;
+};
+
+/*
+class AdaDeltaUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float rho_;
+  float delta_;
+  float weight_decay_;
+};
+*/
+}  // namespace singa
+
+#endif // INCLUDE_UTILS_UPDATER_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/script/node.sh
----------------------------------------------------------------------
diff --git a/script/node.sh b/script/node.sh
new file mode 100755
index 0000000..74e0d8a
--- /dev/null
+++ b/script/node.sh
@@ -0,0 +1,78 @@
+#!/bin/bash
+if [[ $# -lt 2 || ! -f $2 ]]
+then
+  echo "Usage: process/folder management"
+  echo "[cat, create, delete, kill, ls, ps, reset, scp, ssh] hostfile [args]"
+  echo " cat hostfile file--- cat the file on every node in hostfile"
+  echo " create hostfile folder--- create the folder on every node in hostfile"
+  echo " delete hostfile folder--- delete the folder on every node in hostfile"
+  echo " kill hostfile job_name--- kill the job on every node in hostfile"
+  echo " ls hostfile folder--- list the folder on every node in hostfile"
+  echo " ps hostfile job_name--- ps aux|grep job_name on every node in hostfile"
+  echo " reset hostfile folder--- delete and create the folder on every node in hostfile"
+  echo " scp hostfile local_dir [remote_dir]--- copy the local_dir to remote_dir on every node in hostfile, if remote_dir is omitted, remote_dir=local_dir"
+  echo " ssh hostfile--- test whether the nodes in hostfile are alive"
+  echo "each line in hostfile is a node name followed by a space and other fields"
+  exit 1
+fi
+
+# NOTE(review): ssh_options is defined but not passed to ssh/scp below.
+ssh_options="-oStrictHostKeyChecking=no \
+-oUserKnownHostsFile=/dev/null \
+-oLogLevel=quiet"
+
+hosts=(`cat $2 |cut -d ' ' -f 1`)
+
+for i in ${hosts[@]}
+do
+  if [ $1 == "cat" ]
+  then
+    cmd="cat $3"
+  elif [ $1 == "create" ]
+  then
+    cmd="mkdir -p $3"
+  elif [ $1 == "delete" ]
+  then
+    cmd="rm -rf $3"
+  elif [ $1 == "reset" ]
+  then
+    # own branch: an elif chain runs only its first match, so folding reset into create/delete never deleted
+    cmd="rm -rf $3; mkdir -p $3"
+  elif [ $1 == "kill" ]
+  then
+    cmd="ps ax|pgrep $3 |xargs kill"
+  elif [ $1 == "ls" ]
+  then
+    cmd="ls -l $3"
+  elif [ $1 == "scp" ]
+  then
+    local_dir=$3
+    remote_dir=$3
+    if [ $# -eq 4 ]
+    then
+      remote_dir=$4
+    fi
+    r=''
+    if [[ -d $3 ]]
+    then
+      r='-r'
+    fi
+    echo "scp $r $local_dir $i:$remote_dir"
+    scp $r $local_dir $i:$remote_dir
+  elif [ $1 == "ssh" ]
+  then
+    cmd="exit"
+  elif [ $1 == "ps" ]
+  then
+    cmd="ps ax|pgrep $3"
+  else
+    echo "Incorrect commands:" $1
+    # otherwise the ssh below would run with an empty/stale cmd (interactive login)
+    exit 1
+  fi
+  if [ $1 != "scp" ]
+  then
+    echo $cmd
+    ssh $i "$cmd"
+  fi
+done
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
new file mode 100644
index 0000000..80f2304
--- /dev/null
+++ b/src/communication/msg.cc
@@ -0,0 +1,5 @@
+#include "communication/msg.h"
+
+namespace singa {
+} /* singa */
+
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
new file mode 100644
index 0000000..279d758
--- /dev/null
+++ b/src/communication/socket.cc
@@ -0,0 +1,124 @@
+#include "communication/socket.h"
+
+namespace singa {
+// NOTE(review): no code in this file calls zpoller_destroy on this poller_; confirm the header does.
+Poller::Poller(){
+  poller_=zpoller_new(NULL);
+}
+
+void Poller::Add(Socket* socket){
+  zsock_t* zsock=static_cast<zsock_t*>(socket->InternalID());
+  zpoller_add(poller_, zsock);
+  zsock2Socket_[zsock]=socket;
+}
+
+Socket* Poller::Wait(int timeout){
+  zsock_t* sock=static_cast<zsock_t*>(zpoller_wait(poller_, timeout));
+  if(sock!=NULL)
+    return zsock2Socket_[sock];
+  else return nullptr;
+}
+
+Dealer::Dealer(int id):id_(id){
+  dealer_=zsock_new(ZMQ_DEALER);
+  CHECK_NOTNULL(dealer_);
+  poller_=zpoller_new(dealer_);
+}
+
+int Dealer::Connect(string endpoint){
+  if(endpoint.length())
+    CHECK_EQ(zsock_connect(dealer_,endpoint.c_str()),0);
+  return 1;
+}
+int Dealer::Send(Msg *msg){
+  zmsg_t* zmsg=msg->DumpToZmsg();
+  zmsg_send(&zmsg, dealer_);
+  delete msg;
+  return 1;
+}
+
+Msg* Dealer::Receive(){
+  zmsg_t* zmsg=zmsg_recv(dealer_);
+  if(zmsg==NULL)
+    return nullptr;
+  Msg* msg=new Msg();
+  msg->ParseFromZmsg(zmsg);
+  return msg;
+}
+Dealer::~Dealer(){
+  zpoller_destroy(&poller_);  // created in the constructor; release it before its socket
+  zsock_destroy(&dealer_);
+}
+
+Router::Router(int bufsize){
+  nBufmsg_=0;
+  bufsize_=bufsize;
+  router_=zsock_new(ZMQ_ROUTER);
+  CHECK_NOTNULL(router_);
+  poller_=zpoller_new(router_);
+}
+int Router::Bind(string endpoint){
+  if(endpoint.length())
+    CHECK_EQ(zsock_bind(router_, endpoint.c_str()),0);
+  return 1;
+}
+
+int Router::Send(Msg *msg){
+  zmsg_t* zmsg=msg->DumpToZmsg();
+  int dstid=msg->dst();
+  if(id2addr_.find(dstid)!=id2addr_.end()){
+    // the connection has already been set up
+    zframe_t* addr=zframe_dup(id2addr_[dstid]);
+    zmsg_prepend(zmsg, &addr);
+    zmsg_send(&zmsg, router_);
+  }else{
+    // the connection is not ready, buffer the message
+    if(bufmsg_.size()==0)
+      nBufmsg_=0;
+    bufmsg_[dstid].push_back(zmsg);
+    nBufmsg_++;
+    CHECK_LE(nBufmsg_, bufsize_);
+  }
+  delete msg;
+  return 1;
+}
+
+Msg* Router::Receive(){
+  zmsg_t* zmsg=zmsg_recv(router_);
+  if(zmsg==NULL)
+    return nullptr;
+  zframe_t* dealer=zmsg_pop(zmsg);
+  Msg* msg=new Msg();
+  msg->ParseFromZmsg(zmsg);
+  if (id2addr_.find(msg->src())==id2addr_.end()){
+    // new connection, store the sender's identifier and send buffered messages
+    // for it
+    id2addr_[msg->src()]=dealer;
+    auto buffered=bufmsg_.find(msg->src());
+    if(buffered!=bufmsg_.end()){
+      for(auto& it: buffered->second){
+        zframe_t* addr=zframe_dup(dealer);
+        zmsg_prepend(it, &addr);
+        zmsg_send(&it, router_);
+      }
+      // flushed messages must no longer count against bufsize_ in Send()
+      nBufmsg_-=static_cast<int>(buffered->second.size());
+      bufmsg_.erase(buffered);
+    }
+  }
+  else
+    zframe_destroy(&dealer);
+  return msg;
+}
+
+Router::~Router(){
+  zpoller_destroy(&poller_);  // created in the constructor; release it before its socket
+  zsock_destroy(&router_);
+  for(auto& it: id2addr_)
+    zframe_destroy(&it.second);
+  for(auto& it: bufmsg_){
+    for(auto *msg: it.second)
+      zmsg_destroy(&msg);
+  }
+}
+} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
new file mode 100644
index 0000000..89306d8
--- /dev/null
+++ b/src/main.cc
@@ -0,0 +1,49 @@
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "trainer/trainer.h"
+
+/**
+ * \file main.cc is the main entry of SINGA, like the driver program for Hadoop.
+ *
+ * 1. Users register their own implemented classes, e.g., layer, updater, etc.
+ * 2. Users prepare the google protobuf object for the model configuration and
+ * the cluster configuration.
+ * 3. Users call trainer to start the training.
+ *
+ * TODO
+ * 1. Add the resume function to continue training from a previously stopped
+ * point.
+ * 2. Add helper functions for users to configure their model and cluster
+ * easily, e.g., AddLayer(layer_type, source_layers, meta_data).
+ */
+
+DEFINE_int32(procsID, 0, "Global process ID");
+DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
+DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
+
+/**
+ * Register layers, and other customizable classes.
+ *
+ * If users want to use their own implemented classes, they should register
+ * them here. Refer to the Worker::RegisterDefaultClasses()
+ */
+void RegisterClasses(const singa::ModelProto& proto){
+}
+
+int main(int argc, char **argv) {
+  // TODO set log dir
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  singa::ClusterProto cluster;
+  singa::ReadProtoFromTextFile(FLAGS_cluster.c_str(), &cluster);
+  singa::ModelProto model;
+  singa::ReadProtoFromTextFile(FLAGS_model.c_str(), &model);
+  LOG(INFO)<<"The cluster config is\n"<<cluster.DebugString();
+  LOG(INFO)<<"The model config is\n"<<model.DebugString();
+
+  RegisterClasses(model);
+  singa::Trainer trainer;
+  trainer.Start(model, cluster, FLAGS_procsID);
+  return 0;
+}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/base_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/base_layer.cc b/src/neuralnet/base_layer.cc
new file mode 100644
index 0000000..50fc396
--- /dev/null
+++ b/src/neuralnet/base_layer.cc
@@ -0,0 +1,202 @@
+#include <opencv2/highgui/highgui.hpp>
+#include <opencv2/imgproc/imgproc.hpp>
+#include <cblas.h>
+#include <math.h>
+#include <cfloat>
+#include "neuralnet/base_layer.h"
+namespace singa {
+/*****************************************************************************
+ * Implementation for Layer
+ *****************************************************************************/
+void Layer::Init(const LayerProto &proto) {
+  layer_proto_=proto;
+}
+
+void Layer::Init(const Layer& other, const vector<int>& shape){
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+  layer_proto_=other.layer_proto_;
+}
+void Layer::Setup(){
+  Setup(layer_proto_, srclayers_);
+}
+void Layer::SetupAfterPartition(){
+  vector<int> shape=data_.shape();
+  SetupAfterPartition(layer_proto_, shape, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+  // compare whole shapes: std::equal over `shape` could over-read a shorter data_.shape()
+  CHECK(shape==data_.shape())<<name()
+    <<IntVecToString(shape)<<"--"<<IntVecToString(data_.shape());
+}
+void Layer::ComputeFeature(bool training){
+  ComputeFeature(training, srclayers_);
+}
+void Layer::ComputeGradient(){
+  ComputeGradient(srclayers_);
+}
+
+void Layer::ToProto(LayerProto *proto, bool copyData) {
+}
+void BridgeSrcLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+}
+void BridgeSrcLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+void BridgeSrcLayer::ComputeFeature(bool training,
+    const vector<SLayer>& srclayers){
+  if(training)
+    ready_=false;
+  else
+    ready_=true;
+}
+void BridgeSrcLayer::ComputeGradient(const vector<SLayer>& srclayers){
+
+}
+void BridgeDstLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+}
+void BridgeDstLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+void BridgeDstLayer::ComputeFeature(bool training,
+    const vector<SLayer>& srclayers){
+  if(training)
+    ready_=true;
+  else
+    ready_=false;
+}
+void BridgeDstLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){
+
+}
+
+/*******************************
+ * Implementation for ConcateLayer
+ *******************************/
+void ConcateLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  // signed on purpose: with size_t the CHECK_GE below could never fail
+  int concate_dim=proto.concate_param().concate_dimension();
+  CHECK_GE(concate_dim,0);
+  CHECK_GT(srclayers.size(),1);
+  vector<int> shape=srclayers[0]->data(this).shape();
+  CHECK_LT(static_cast<size_t>(concate_dim), shape.size());
+  for(size_t i=1;i<srclayers.size();i++){
+    const vector<int>& srcshape=srclayers[i]->data(this).shape();
+    // every source must have the same rank and agree on all dims but concate_dim
+    CHECK_EQ(srcshape.size(), shape.size());
+    for(size_t j=0;j<shape.size();j++)
+      if(j==static_cast<size_t>(concate_dim))
+        shape[j]+=srcshape[j];
+      else
+        CHECK_EQ(shape[j], srcshape[j]);
+  }
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+}
+
+void ConcateLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+//  LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+void ConcateLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){}
+
+void ConcateLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}
+/*****************************************************************************
+ * Implementation for SliceLayer
+ *****************************************************************************/
+void SliceLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  int slice_dim=proto.slice_param().slice_dimension();
+  int slice_num=proto.slice_param().slice_num();
+  CHECK_GE(slice_dim,0);
+  CHECK_EQ(static_cast<size_t>(slice_num), dstlayers_.size());
+  CHECK(!srclayers.empty());
+  data_.Reshape(srclayers[0]->data(this).shape());
+  CHECK_LT(static_cast<size_t>(slice_dim), data_.shape().size());
+  grad_.ReshapeLike(data_);
+  datavec_.resize(slice_num);
+  gradvec_.resize(slice_num);
+  //LOG(ERROR)<<"slice dim "<<slice_dim<<" slice num "<<slice_num;
+  for(int i=0;i<slice_num;i++){
+    vector<int> newshape(data_.shape());
+    // the last slice also takes the remainder of the sliced dimension
+    newshape[slice_dim]=newshape[slice_dim]/slice_num+
+      ((i==slice_num-1)?newshape[slice_dim]%slice_num:0);
+    datavec_[i].Reshape(newshape);
+    gradvec_[i].Reshape(newshape);
+    //LOG(ERROR)<<"slice "<<IntVecToString(newshape);
+  }
+}
+
+void SliceLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+
+int SliceLayer::SliceID(const Layer* layer) const {
+  CHECK(layer!= nullptr);
+  for(size_t i=0;i<datavec_.size();i++){
+    //LOG(ERROR)<<"get slice "<<IntVecToString(shapes_[i]);
+    if(dstlayers_[i].get() == layer)
+      return static_cast<int>(i);
+  }
+  CHECK(false);
+  return -1;
+}
+
+const Blob<float>& SliceLayer::data(const Layer* layer) const {
+  if(layer==nullptr)
+    return data_;
+  return datavec_[SliceID(layer)];
+}
+const Blob<float>& SliceLayer::grad(const Layer* layer) const {
+  if(layer==nullptr)
+    return grad_;
+  return gradvec_[SliceID(layer)];
+}
+Blob<float>* SliceLayer::mutable_data(const Layer* layer) {
+  if(layer==nullptr)
+    return &data_;
+  return &datavec_[SliceID(layer)];
+}
+Blob<float>* SliceLayer::mutable_grad(const Layer* layer){
+  if(layer==nullptr)
+    return &grad_;
+  return &gradvec_[SliceID(layer)];
+}
+void SliceLayer::ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers){}
+void SliceLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}
+
+void SplitLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.Reshape(srclayers[0]->data(this).shape());
+}
+
+void SplitLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+void SplitLayer::ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers){
+
+}
+void SplitLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){
+
+}
+
+} // namespace singa
+
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc
new file mode 100644
index 0000000..d45bcc0
--- /dev/null
+++ b/src/neuralnet/layer.cc
@@ -0,0 +1,783 @@
+#include <glog/logging.h>
+#include <memory>
+#include <algorithm>
+#include <opencv2/highgui/highgui.hpp>
+#include <opencv2/imgproc/imgproc.hpp>
+#include "mshadow/tensor.h"
+#include "mshadow/cxxnet_op.h"
+#include "neuralnet/layer.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+
+using namespace mshadow;
+using namespace mshadow::expr;
+
+namespace singa {
+
+/************ Implementation for ConvolutionLayer*************************/
+void ConvolutionLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  ConvolutionProto conv_param=proto.convolution_param();
+  kernel_=conv_param.kernel();
+  CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
+  pad_=conv_param.pad();
+  stride_=conv_param.stride();
+  CHECK_GT(stride_, 0) << "Stride must be positive.";  // used as a divisor below
+  num_filters_=conv_param.num_filters();
+  const vector<int>& srcshape=srclayers[0]->data(this).shape();
+  int dim=srcshape.size();
+  CHECK_GT(dim, 2);
+  width_=srcshape[dim-1];
+  height_=srcshape[dim-2];
+  if(dim>3)
+    channels_=srcshape[dim-3];
+  else  // dim==3 (guaranteed by the CHECK above): a single channel
+    channels_=1;
+  batchsize_=srcshape[0];
+  CHECK(height_ + 2 * pad_ >= kernel_ && width_ + 2 * pad_ >= kernel_) << "Kernel larger than padded input.";
+  conv_height_=(height_ + 2 * pad_ - kernel_) / stride_ + 1;
+  conv_width_= (width_ + 2 * pad_ - kernel_) / stride_ + 1;
+  col_height_=channels_*kernel_*kernel_;
+  col_width_=conv_height_*conv_width_;
+  vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+  col_data_.Reshape(vector<int>{col_height_, col_width_});
+  col_grad_.Reshape(vector<int>{col_height_, col_width_});
+
+  Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
+  weight_=shared_ptr<Param>(factory->Create("Param"));
+  weight_->Setup(proto.param(0), vector<int>{num_filters_, col_height_}, col_height_);
+  bias_=shared_ptr<Param>(factory->Create("Param"));
+  bias_->Setup(proto.param(1), vector<int>{num_filters_},0);
+}
+
+void
ConvolutionLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + LayerProto newproto(proto); + ConvolutionProto *conv_param=newproto.mutable_convolution_param(); + conv_param->set_num_filters(shape[1]); + Setup(newproto, srclayers); +} + +void ConvolutionLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), + Shape4(batchsize_, channels_, height_, width_)); + Tensor<cpu, 3> data(data_.mutable_cpu_data(), + Shape3(batchsize_, num_filters_, conv_height_* conv_width_)); + Tensor<cpu, 2> col(col_data_.mutable_cpu_data(), + Shape2(col_height_, col_width_)); + Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), + Shape2(num_filters_, col_height_)); + Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), + Shape1(num_filters_)); + + for(int n=0;n<batchsize_;n++){ + if(pad_>0) + col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_); + else + col=unpack_patch2col(src[n], kernel_, stride_); + data[n]=dot(weight, col); + } + data+=broadcast<1>(bias, data.shape); +} + +void ConvolutionLayer::ComputeGradient(const vector<SLayer>& srclayers) { + Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), + Shape4(batchsize_, channels_, height_, width_)); + Tensor<cpu, 2> col(col_data_.mutable_cpu_data(), + Shape2(col_height_, col_width_)); + Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), + Shape2(num_filters_, col_height_)); + + Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this); + Tensor<cpu, 4> gsrc(Shape4(batchsize_, channels_, height_, width_)); + if(gsrcblob!=nullptr) + gsrc.dptr=gsrcblob->mutable_cpu_data(); + Tensor<cpu, 3> grad(grad_.mutable_cpu_data(), + Shape3(batchsize_, num_filters_, conv_height_* conv_width_)); + Tensor<cpu, 2> gcol(col_grad_.mutable_cpu_data(), + Shape2(col_height_, col_width_)); + Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), + Shape2(num_filters_, col_height_)); + 
Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), + Shape1(num_filters_)); + + gweight=0.0f; + gbias=sumall_except_dim<1>(grad); + Shape<3> padshape(gsrc.shape.SubShape()); + padshape[0]+=2*pad_;padshape[1]+=2*pad_; + Shape<2> imgshape=Shape2(height_, width_); + for(int n=0;n<batchsize_;n++){ + if(pad_>0) + col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_); + else + col=unpack_patch2col(src[n], kernel_, stride_); + gweight+=dot(grad[n], col.T()); + + if(gsrcblob!=nullptr){ + gcol=dot(weight.T(), grad[n]); + gsrc[n]=crop(pack_col2patch(gcol, padshape, kernel_, stride_), imgshape); + } + } +} + +/****************** Implementation for DropoutLayer ***********************/ +void DropoutLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(*srclayers[0]->mutable_grad(this)); + mask_.Reshape(srclayers[0]->data(this).shape()); + pdrop_=proto.dropout_param().dropout_ratio(); + unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); + ASingleton<Random<cpu>>::Instance(seed); +} + +void DropoutLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + Setup(proto, srclayers); +} + +void DropoutLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers) { + // check training + if(!training){ + data_.CopyFrom(srclayers[0]->data()); + return; + } + float pkeep=1-pdrop_; + Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count())); + mask = F<op::threshold>(ASingleton<Random<cpu>>::Instance()\ + ->uniform(mask.shape), pkeep ) * (1.0f/pkeep); + Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); + Blob<float>* srcblob=srclayers[0]->mutable_data(); + Tensor<cpu, 1> src(srcblob->mutable_cpu_data(), Shape1(srcblob->count())); + data=src*mask; +} + +void DropoutLayer::ComputeGradient(const vector<SLayer>& srclayers) { + Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), 
Shape1(data_.count())); + Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count())); + Blob<float>* gsrcblob=srclayers[0]->mutable_grad(); + Tensor<cpu, 1> gsrc(gsrcblob->mutable_cpu_data(), Shape1(gsrcblob->count())); + gsrc=grad*mask; +} +/**************** Implementation for InnerProductLayer********************/ +void InnerProductLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),1); + const auto& src=srclayers[0]->data(this); + batchsize_=src.shape()[0]; + vdim_=src.count()/batchsize_; + hdim_=proto.inner_product_param().num_output(); + data_.Reshape(vector<int>{batchsize_, hdim_}); + grad_.ReshapeLike(data_); + Factory<Param>* factory=Singleton<Factory<Param>>::Instance(); + weight_=shared_ptr<Param>(factory->Create("Param")); + bias_=shared_ptr<Param>(factory->Create("Param")); + weight_->Setup(proto.param(0), vector<int>{vdim_, hdim_}, vdim_*hdim_); + bias_->Setup(proto.param(1), vector<int>{hdim_},0); +} +void InnerProductLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + LayerProto newproto(proto); + InnerProductProto * innerproto=newproto.mutable_inner_product_param(); + innerproto->set_num_output(shape[1]); + Setup(newproto, srclayers); +} + +void InnerProductLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers) { + Tensor<cpu, 2> data(data_.mutable_cpu_data(), Shape2(batchsize_,hdim_)); + CHECK_EQ(srclayers[0]->data().count(), batchsize_*vdim_); + Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(), + Shape2(batchsize_,vdim_)); + Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_)); + Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), Shape1(hdim_)); + data=dot(src, weight); + // repmat: repeat bias vector into batchsize rows + data+=repmat(bias, batchsize_); +} + +void InnerProductLayer::ComputeGradient(const vector<SLayer>& srclayers) { + Tensor<cpu, 2> 
src(srclayers[0]->mutable_data()->mutable_cpu_data(), + Shape2(batchsize_,vdim_)); + Tensor<cpu, 2> grad(grad_.mutable_cpu_data(),Shape2(batchsize_,hdim_)); + Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_)); + Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), Shape2(vdim_,hdim_)); + Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), Shape1(hdim_)); + + gbias=sum_rows(grad); + gweight=dot(src.T(), grad); + if(srclayers[0]->mutable_grad(this)!=nullptr){ + Tensor<cpu, 2> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), + Shape2(batchsize_,vdim_)); + gsrc=dot(grad, weight.T()); + } +} +/***************************************************************************** + * Implementation for LabelLayer + *****************************************************************************/ +void LabelLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),1); + int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize(); + data_.Reshape(vector<int>{batchsize}); +} + +void LabelLayer::ParseRecords(bool training, const vector<Record>& records, Blob<float>* blob){ + LOG_IF(ERROR, records.size()==0)<<"Empty records to parse"; + float *label= blob->mutable_cpu_data() ; + int rid=0; + for(const Record& record: records){ + label[rid++]=record.image().label(); + CHECK_LT(record.image().label(),10); + } + CHECK_EQ(rid, blob->shape()[0]); +} + + +/*********************LMDBDataLayer**********************************/ +void LMDBDataLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + if(random_skip_){ + int nskip=rand()%random_skip_; + int n=0; + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_FIRST), MDB_SUCCESS); + while (mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_NEXT) == MDB_SUCCESS) + n++; + LOG(INFO)<<"Random Skip "<<nskip<<" records of total "<<n<<"records"; + // We have reached the end. Restart from the first. 
+ CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_FIRST), MDB_SUCCESS); + for(int i=0;i<nskip;i++){ + if (mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_NEXT) != MDB_SUCCESS) { + // We have reached the end. Restart from the first. + DLOG(INFO) << "Restarting data prefetching from start."; + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_FIRST), MDB_SUCCESS); + } + } + random_skip_=0; + } + Datum datum; + for(auto& record: records_){ + SingleLabelImageRecord* image=record.mutable_image(); + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS); + datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size); + ConvertDatumToSingleLableImageRecord(datum, image); + if (mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_NEXT) != MDB_SUCCESS) { + // We have reached the end. Restart from the first. + DLOG(INFO) << "Restarting data prefetching from start."; + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, + &mdb_value_, MDB_FIRST), MDB_SUCCESS); + } + } +} + +void LMDBDataLayer::ConvertDatumToSingleLableImageRecord(const Datum& datum, + SingleLabelImageRecord* record){ + record->set_label(datum.label()); + record->clear_shape(); + if(datum.has_channels()) + record->add_shape(datum.channels()); + if(datum.has_height()) + record->add_shape(datum.height()); + if(datum.has_width()) + record->add_shape(datum.width()); + if(datum.has_data()) + record->set_pixel(datum.data()); + if(datum.float_data_size()){ + record->clear_data(); + for(float x: datum.float_data()) + record->add_data(x); + } +} + +void LMDBDataLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(mdb_env_create(&mdb_env_), MDB_SUCCESS) << "mdb_env_create failed"; + CHECK_EQ(mdb_env_set_mapsize(mdb_env_, 1099511627776), MDB_SUCCESS); // 1TB + CHECK_EQ(mdb_env_open(mdb_env_, + proto.data_param().path().c_str(), + MDB_RDONLY, 0664), MDB_SUCCESS) << "cannot open lmdb " + << 
proto.data_param().path(); + CHECK_EQ(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn_), MDB_SUCCESS) + << "mdb_txn_begin failed"; + CHECK_EQ(mdb_open(mdb_txn_, NULL, 0, &mdb_dbi_), MDB_SUCCESS) + << "mdb_open failed"; + CHECK_EQ(mdb_cursor_open(mdb_txn_, mdb_dbi_, &mdb_cursor_), MDB_SUCCESS) + << "mdb_cursor_open failed"; + LOG(INFO) << "Opening lmdb " << proto.data_param().path(); + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_FIRST), + MDB_SUCCESS) << "mdb_cursor_get failed"; + + if (mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT) + != MDB_SUCCESS) { + CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, + MDB_FIRST), MDB_SUCCESS); + } + Datum datum; + datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size); + SingleLabelImageRecord* record=sample_.mutable_image(); + ConvertDatumToSingleLableImageRecord(datum, record); + + batchsize_=proto.data_param().batchsize(); + records_.resize(batchsize_); + random_skip_=proto.data_param().random_skip(); +} + +/***************** Implementation for LRNLayer *************************/ +void LRNLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),1); + lsize_ = proto.lrn_param().local_size(); + CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for Localvol"; + knorm_=proto.lrn_param().knorm(); + alpha_ = proto.lrn_param().alpha(); + beta_ = proto.lrn_param().beta(); + + const vector<int>& s=srclayers[0]->data(this).shape(); + data_.Reshape(s); + grad_.Reshape(s); + norm_.Reshape(s); + batchsize_=s[0]; + channels_=s[1]; + height_=s[2]; + width_=s[3]; +} + +void LRNLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + Setup(proto, srclayers); +} + +void LRNLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + const float salpha = alpha_ / lsize_; + Shape<4> s=Shape4(batchsize_,channels_, height_, width_); + Tensor<cpu, 4> 
src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s); + Tensor<cpu, 4> data(data_.mutable_cpu_data(), s); + Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s); + // stores normalizer without power + norm= chpool<red::sum>( F<op::square>(src) , lsize_ ) * salpha + knorm_; + data = src * F<op::power>(norm, -beta_ ); +} + +void LRNLayer::ComputeGradient(const vector<SLayer>& srclayers) { + const float salpha = alpha_ / lsize_; + Shape<4> s=Shape4(batchsize_,channels_, height_, width_); + Tensor<cpu, 4> src(srclayers[0]->mutable_data()->mutable_cpu_data(), s); + Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s); + Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s); + Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), s); + + gsrc = grad * F<op::power>( norm, -beta_ ); + gsrc += ( - 2.0f * beta_ * salpha ) * chpool<red::sum>( + grad * src * F<op::power>( norm, -beta_-1.0f ), lsize_ ) * src; +} + +/**************** Implementation for MnistImageLayer******************/ + +void MnistImageLayer::ParseRecords(bool training, const vector<Record>& records, + Blob<float>* blob){ + LOG_IF(ERROR, records.size()==0)<<"Empty records to parse"; + int ndim=records.at(0).image().shape_size(); + int inputsize =records.at(0).image().shape(ndim-1); + + float* dptr=blob->mutable_cpu_data(); + for(const Record& record: records){ + // copy from record to cv::Mat + cv::Mat input(inputsize, inputsize, CV_32FC1); + const SingleLabelImageRecord& imagerecord=record.image(); + if(imagerecord.pixel().size()){ + string pixel=imagerecord.pixel(); + for(int i=0,k=0;i<inputsize;i++) + for(int j=0;j<inputsize;j++) + // NOTE!!! must cast pixel to uint8_t then to float!!! 
waste a lot of + // time to debug this + input.at<float>(i,j)=static_cast<float>(static_cast<uint8_t>(pixel[k++])); + }else{ + for(int i=0,k=0;i<inputsize;i++) + for(int j=0;j<inputsize;j++) + input.at<float>(i,j)=imagerecord.data(k++); + } + int size=blob->shape()[1]; + /* + cv::Mat resizeMat=input; + // affine transform, scaling, rotation and shearing + if(gamma_){ + float r1=rand_real()*2-1; + float r2=rand_real()*2-1; + int h=static_cast<int>(inputsize*(1.+r1*gamma_/100.0)); + int w=static_cast<int>(inputsize*(1.+r2*gamma_/100.0)); + cv::resize(input, resizeMat, cv::Size(h,w)); + } + cv::Mat betaMat=resizeMat; + cv::Mat warpmat(2,3, CV_32FC1); + warpmat.at<float>(0,0)=1.0; + warpmat.at<float>(0,1)=0.0; + warpmat.at<float>(0,2)=0.0; + warpmat.at<float>(1,0)=0.0; + warpmat.at<float>(1,1)=1.0; + warpmat.at<float>(1,2)=0.0; + + if(beta_){ + float r=rand_real()*2-1; + if(rand() % 2){ // rotation + cv::Point center(resizeMat.rows/2, resizeMat.cols/2); + warpmat=cv::getRotationMatrix2D(center, r*beta_, 1.0); + }else{ + //shearing + warpmat.at<float>(0,1)=r*beta_/90; + if(imagerecord.label()==1 ||imagerecord.label()==7) + warpmat.at<float>(0,1)/=2.0; + } + } + cv::warpAffine(resizeMat, betaMat, warpmat, cv::Size(size, size)); + */ + + for(int i=0;i<size;i++){ + for(int j=0;j<size;j++){ + *dptr=input.at<float>(i,j)/norm_a_-norm_b_; + dptr++; + } + } + } + CHECK_EQ(dptr, blob->mutable_cpu_data()+blob->count()); +} +void MnistImageLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),1); + int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize(); + Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample(); + kernel_=proto.mnist_param().kernel(); + sigma_=proto.mnist_param().sigma(); + alpha_=proto.mnist_param().alpha(); + beta_=proto.mnist_param().beta(); + gamma_=proto.mnist_param().gamma(); + resize_=proto.mnist_param().resize(); + norm_a_=proto.mnist_param().norm_a(); + 
norm_b_=proto.mnist_param().norm_b(); + elastic_freq_=proto.mnist_param().elastic_freq(); + + int ndim=sample.image().shape_size(); + CHECK_GE(ndim,2); + if(resize_) + data_.Reshape(vector<int>{batchsize, resize_, resize_}); + else{ + int s=sample.image().shape(ndim-1); + CHECK_EQ(s,sample.image().shape(ndim-2)); + data_.Reshape(vector<int>{batchsize, s, s }); + } +} + +/******************** Implementation for PoolingLayer******************/ +void PoolingLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),1); + PoolingProto pool_param = proto.pooling_param(); + kernel_=pool_param.kernel(); + stride_=pool_param.stride(); + CHECK_LT(pad_, kernel_); + pool_=proto.pooling_param().pool(); + CHECK(pool_ == PoolingProto_PoolMethod_AVE + || pool_ == PoolingProto_PoolMethod_MAX) + << "Padding implemented only for average and max pooling."; + + const auto& srcshape=srclayers[0]->data(this).shape(); + int dim=srcshape.size(); + CHECK_GT(dim,2); + width_ = srcshape[dim-1]; + height_ = srcshape[dim-2]; + if(dim>3) + channels_ = srcshape[dim-3]; + else + channels_=1; + batchsize_=srcshape[0]; + pooled_height_ = static_cast<int>((height_ - kernel_) / stride_) + 1; + pooled_width_ = static_cast<int>(( width_ - kernel_) / stride_) + 1; + data_.Reshape(vector<int>{batchsize_, channels_, pooled_height_, pooled_width_}); + grad_.ReshapeLike(data_); +} + +void PoolingLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + Setup(proto, srclayers); +} + +void PoolingLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), + Shape4(batchsize_, channels_, height_, width_)); + Tensor<cpu, 4> data(data_.mutable_cpu_data(), + Shape4(batchsize_, channels_, pooled_height_, pooled_width_)); + if(pool_ == PoolingProto_PoolMethod_MAX) + data=pool<red::maximum>(src, kernel_, stride_); + else if(pool_ 
== PoolingProto_PoolMethod_AVE) + data=pool<red::sum>(src, kernel_, stride_) + *(1.0f/(kernel_*kernel_)); +} + +/* + * partition only on num/channel dim + * assume grad and data have the same paritition + */ +void PoolingLayer::ComputeGradient(const vector<SLayer>& srclayers) { + Shape<4> s1= Shape4(batchsize_, channels_, height_, width_); + Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),s1); + Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),s1); + Shape<4> s2= Shape4(batchsize_, channels_, pooled_height_, pooled_width_); + Tensor<cpu, 4> data(data_.mutable_cpu_data(), s2); + Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s2); + if(pool_ == PoolingProto_PoolMethod_MAX) + gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_); + else if(pool_ == PoolingProto_PoolMethod_AVE) + gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_) + *(1.0f/(kernel_*kernel_)); +} + +/***************** Implementation for ReLULayer *****************************/ + +void ReLULayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + data_.ReshapeLike(srclayers[0]->data()); + grad_.ReshapeLike(*(srclayers[0]->mutable_grad())); +} + +void ReLULayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + Setup(proto, srclayers); +} + +void ReLULayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); + Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), + Shape1(data_.count())); + data=F<op::relu>(src); +} + +void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) { + Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count())); + Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); + Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), + Shape1(data_.count())); + gsrc=F<op::relu_grad>(data)*grad; +} 
+ +/*************** Implementation for RGBImageLayer *************************/ + +void RGBImageLayer::ParseRecords(bool training, const vector<Record>& records, + Blob<float>* blob){ + LOG_IF(ERROR, records.size()==0)<<"Empty records to parse"; + const vector<int>& s=blob->shape(); + Tensor<cpu, 4> images(blob->mutable_cpu_data(), Shape4(s[0],s[1],s[2],s[3])); + const SingleLabelImageRecord& r=records.at(0).image(); + Tensor<cpu, 3> raw_image(Shape3(r.shape(0),r.shape(1),r.shape(2))); + AllocSpace(raw_image); + Tensor<cpu, 3> croped_image(Shape3(s[1],s[2],s[3])); + if(cropsize_) + AllocSpace(croped_image); + //CHECK(std::equal(croped_image.shape(), raw_image.shape()); + int rid=0; + const float* meandptr=mean_.cpu_data(); + for(const Record& record: records){ + auto image=images[rid]; + bool do_crop=cropsize_>0&&training; + bool do_mirror=mirror_&&rand()%2&&training; + float* dptr=nullptr; + if(do_crop||do_mirror) + dptr=raw_image.dptr; + else + dptr=image.dptr; + if(record.image().pixel().size()){ + string pixel=record.image().pixel(); + for(size_t i=0;i<pixel.size();i++) + dptr[i]=static_cast<float>(static_cast<uint8_t>(pixel[i])); + }else { + memcpy(dptr, record.image().data().data(), + sizeof(float)*record.image().data_size()); + } + for(int i=0;i<mean_.count();i++) + dptr[i]-=meandptr[i]; + + if(do_crop){ + int hoff=rand()%(r.shape(1)-cropsize_); + int woff=rand()%(r.shape(2)-cropsize_); + Shape<2> cropshape=Shape2(cropsize_, cropsize_); + if(do_mirror){ + croped_image=crop(raw_image, cropshape, hoff, woff); + image=mirror(croped_image); + }else{ + image=crop(raw_image, cropshape, hoff, woff); + } + }else if(do_mirror){ + image=mirror(raw_image); + } + rid++; + } + if(scale_) + images=images*scale_; + + FreeSpace(raw_image); + if(cropsize_) + FreeSpace(croped_image); +} +void RGBImageLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),1); + scale_=proto.rgbimage_param().scale(); + 
cropsize_=proto.rgbimage_param().cropsize(); + mirror_=proto.rgbimage_param().mirror(); + int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize(); + Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample(); + vector<int> shape; + shape.push_back(batchsize); + for(int x: sample.image().shape()) + shape.push_back(x); + CHECK_EQ(shape.size(),4); + if(cropsize_){ + shape[2]=cropsize_; + shape[3]=cropsize_; + } + data_.Reshape(shape); + mean_.Reshape({shape[1],shape[2],shape[3]}); + if(proto.rgbimage_param().has_meanfile()){ + BlobProto tmp; + ReadProtoFromBinaryFile(proto.rgbimage_param().meanfile().c_str(), &tmp); + CHECK_EQ(mean_.count(), tmp.data_size()); + memcpy(mean_.mutable_cpu_data(), tmp.data().data(), sizeof(float)*tmp.data_size()); + }else{ + memset(mean_.mutable_cpu_data(),0,sizeof(float)*mean_.count()); + } +} + +/***************Implementation for ShardDataLayer**************************/ +void ShardDataLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + if(random_skip_){ + int nskip=rand()%random_skip_; + LOG(INFO)<<"Random Skip "<<nskip<<" records, there are "<<shard_->Count() + <<" records in total"; + string key; + for(int i=0;i<nskip;i++){ + shard_->Next(&key, &sample_); + } + random_skip_=0; + } + for(auto& record: records_){ + string key; + shard_->Next(&key, &record); + } +} + +void ShardDataLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + shard_= std::make_shared<DataShard>(proto.data_param().path(), + DataShard::kRead); + string key; + shard_->Next(&key, &sample_); + batchsize_=proto.data_param().batchsize(); + + records_.resize(batchsize_); + random_skip_=proto.data_param().random_skip(); +} +/*******************Implementation of TanLayer***************************/ +void TanhLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(srclayers[0]->grad(this)); +} + +void 
TanhLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + Setup(proto, srclayers); +} + + +void TanhLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){ + Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); + Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), + Shape1(data_.count())); + data=F<op::stanh>(src); +} + +void TanhLayer::ComputeGradient(const vector<SLayer>& srclayers) { + Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); + Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count())); + Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), + Shape1(data_.count())); + gsrc=F<op::stanh_grad>(data)*grad; +} +/********** * Implementation for SoftmaxLossLayer*************************/ +void SoftmaxLossLayer::Setup(const LayerProto& proto, + const vector<SLayer>& srclayers){ + CHECK_EQ(srclayers.size(),2); + data_.Reshape(srclayers[0]->data(this).shape()); + batchsize_=data_.shape()[0]; + dim_=data_.count()/batchsize_; + topk_=proto.softmaxloss_param().topk(); + metric_.Reshape(vector<int>{2}); + scale_=proto.softmaxloss_param().scale(); +} +void SoftmaxLossLayer::SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){ + Setup(proto, srclayers); +} +void SoftmaxLossLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers) { + Shape<2> s=Shape2(batchsize_, dim_); + Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s); + Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(), s); + Softmax(prob, src); + const float* label=srclayers[1]->data().cpu_data(); + const float* probptr=prob.dptr; + float loss=0, precision=0; + for(int n=0;n<batchsize_;n++){ + int ilabel=static_cast<int>(label[n]); + CHECK_LT(ilabel,10); + CHECK_GE(ilabel,0); + float prob_of_truth=probptr[ilabel]; + loss-=log(std::max(prob_of_truth, FLT_MIN)); + 
vector<std::pair<float, int> > probvec;
+    for (int j = 0; j < dim_; ++j) {
+      probvec.push_back(std::make_pair(probptr[j], j));
+    }
+    std::partial_sort(
+        probvec.begin(), probvec.begin() + topk_,
+        probvec.end(), std::greater<std::pair<float, int> >());
+    // check if true label is in top k predictions
+    for (int k = 0; k < topk_; k++) {
+      if (probvec[k].second == static_cast<int>(label[n])) {
+        precision++;
+        break;
+      }
+    }
+    probptr+=dim_;
+  }
+  CHECK_EQ(probptr, prob.dptr+prob.shape.Size());
+  float *metric=metric_.mutable_cpu_data();
+  metric[0]=loss*scale_/(1.0f*batchsize_);
+  metric[1]=precision*scale_/(1.0f*batchsize_);
+}
+
+void SoftmaxLossLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  const float* label=srclayers[1]->data().cpu_data();
+  Blob<float>* gsrcblob=srclayers[0]->mutable_grad();
+  gsrcblob->CopyFrom(data_);
+  float* gsrcptr=gsrcblob->mutable_cpu_data();
+  for(int n=0;n<batchsize_;n++){
+    gsrcptr[n*dim_+static_cast<int>(label[n])]-=1.0f;
+  }
+  Tensor<cpu, 1> gsrc(gsrcptr, Shape1(gsrcblob->count()));
+  gsrc*=scale_/(1.0f*batchsize_);
+}
+
+} // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
new file mode 100644
index 0000000..0bca26e
--- /dev/null
+++ b/src/neuralnet/neuralnet.cc
@@ -0,0 +1,401 @@
+#include <algorithm>
+#include <queue>
+
+#include "neuralnet/neuralnet.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+#include "utils/graph.h"
+
+
+namespace singa {
+#define CreateLayer(id) CreateInstance(id, Layer)  // creator for Layer subclass `id` (CreateInstance presumably from utils/factory.h)
+
+void NeuralNet::RegisterLayers(){  // register every built-in layer type with the Layer factory, keyed by its type string
+  Factory<Layer>* factory=Singleton<Factory<Layer>>::Instance();
+  factory->Register("kConvolution", CreateLayer(ConvolutionLayer));
+  factory->Register("kConcate", CreateLayer(ConcateLayer));
+  factory->Register("kDropout", CreateLayer(DropoutLayer));
+  factory->Register("kInnerProduct", CreateLayer(InnerProductLayer));
+  factory->Register("kRGBImage", CreateLayer(RGBImageLayer));
+  factory->Register("kLabel", CreateLayer(LabelLayer));
+  factory->Register("kLMDBData", CreateLayer(LMDBDataLayer));
+  factory->Register("kLRN", CreateLayer(LRNLayer));
+  factory->Register("kMnistImage", CreateLayer(MnistImageLayer));
+  factory->Register("kBridgeDst", CreateLayer(BridgeDstLayer));
+  factory->Register("kBridgeSrc", CreateLayer(BridgeSrcLayer));
+  factory->Register("kPooling", CreateLayer(PoolingLayer));
+  factory->Register("kReLU", CreateLayer(ReLULayer));
+  factory->Register("kShardData", CreateLayer(ShardDataLayer));
+  factory->Register("kSlice", CreateLayer(SliceLayer));
+  factory->Register("kSoftmaxLoss", CreateLayer(SoftmaxLossLayer));
+  factory->Register("kSplit", CreateLayer(SplitLayer));
+  factory->Register("kTanh", CreateLayer(TanhLayer));
+}
+shared_ptr<NeuralNet> NeuralNet::SetupNeuralNet(const NetProto& np, Phase phase){  // copy np without layers excluded for `phase`, then build the net
+  NetProto proto;
+  proto.set_partition_type(np.partition_type());
+  // exclude layers if necessary
+  for(auto& layer:np.layer()){
+    bool include=true;
+    for(int x: layer.exclude()){
+      if(x==phase)
+        include=false;
+    }
+    if(include){
+      LayerProto* lp=proto.add_layer();
+      lp->CopyFrom(layer);
+    }
+  }
+  LOG(INFO)<<"NeuralNet config is "<<proto.DebugString();
+  shared_ptr<NeuralNet> net(new NeuralNet(proto));
+  return net;
+}
+NeuralNet::NeuralNet(NetProto net_proto, int group_size) {  // build the net, partition it when group_size>1, then number all params
+  group_size_=group_size;
+  for(int i=0;i<net_proto.layer_size();i++){
+    LayerProto * layer_proto=net_proto.mutable_layer(i);
+    if(!layer_proto->has_partition_type())  // layers inherit the net-level partition type by default
+      layer_proto->set_partition_type(net_proto.partition_type());
+  }
+
+  LOG(INFO)<<"Construct Neural Net...";
+  ConstructNeuralNet(net_proto);
+  if(group_size_>1)
+    PartitionNeuralNet();
+  for(const auto& layer: layers_){
+    DLOG(INFO)<<layer->name();
+  }
+  // assign id for params;
+  int paramid=0;
+  for(auto& layer: layers_){
+    for(shared_ptr<Param> p: layer->GetParams()){
+      params_.push_back(p);
+      p->set_id(paramid++);
+    }
+  }
+
+  LOG(INFO)<<"Neural Net constructed";
+}
+
+void NeuralNet::ConstructNeuralNet(const NetProto& net_proto){  // one layer per proto, created, connected and set up in topological order
+  // construct graph, one node for one layer, identified by layer name
+  map<string, LayerProto> protos;
+  for (auto &layer_proto : net_proto.layer()){
+    graph_.AddNode(layer_proto.name());
+    protos[layer_proto.name()]=layer_proto;
+  }
+  for (auto &layer_proto : net_proto.layer())
+    if(layer_proto.srclayers_size())
+      for(const string& src: layer_proto.srclayers())
+        graph_.AddEdge(src, layer_proto.name());
+
+  // topology sort
+  graph_.Sort();
+  //DLOG(INFO)<<"pure graph without partition\n"<< graph_.ToString();
+
+  auto* factory=Singleton<Factory<Layer>>::Instance();
+  // create Layers according to topology order
+  for(SNode node: graph_.nodes()){
+    shared_ptr<Layer> layer(factory->Create(protos[node->name()].type()));
+    layer->Init(protos[node->name()]);
+    name2layer_[node->name()]=layer;
+    layers_.push_back(layer);
+  }
+
+  // connect Layers.
+  for(SNode node: graph_.nodes()){
+    auto layer=name2layer_[node->name()];
+    for(SNode dst: node->dstnodes())
+      layer->AddDstLayer(name2layer_[dst->name()]);
+    for(SNode src: node->srcnodes())
+      layer->AddSrcLayer(name2layer_[src->name()]);
+  }
+  // setup layer properties, e.g., shapes
+  for(auto& layer: layers_){
+    layer->Setup();
+  }
+  LOG(INFO)<<"network graph witout partition\n"<<ToString();
+}
+
+void NeuralNet::PartitionNeuralNet(){  // rebuild layers_/name2layer_ from the partitioned graph
+  graph_=CreatePartitonedGraph(layers_, name2layer_);
+  //DLOG(ERROR)<<"pure graph after partition\n"<<graph_.ToString();
+  map<string, shared_ptr<Layer>> name2layer(name2layer_);  // pre-partition layers, looked up by origin name below
+  name2layer_.clear();
+  layers_.clear();
+  int gsize=group_size_;
+  auto* factory=Singleton<Factory<Layer>>::Instance();
+  // create Layers according to topology order
+  for(SNode node: graph_.nodes()){
+    LayerProto proto;
+    proto.set_name(node->name());
+    proto.set_locationid(node->val().locationid);
+    proto.set_partitionid(node->val().partitionid);
+    const string& origin=node->val().origin;
+    if (origin=="kSlice"){
+      proto.set_type(origin);
+      SliceProto *slice=proto.mutable_slice_param();
+      slice->set_slice_dimension(node->val().slice_dimension);
+      slice->set_slice_num(node->dstnodes().size());
+    }else if(origin== "kConcate"){
+      proto.set_type(origin);
+      ConcateProto *concate=proto.mutable_concate_param();
+      concate->set_concate_dimension(node->val().concate_dimension);
+      concate->set_concate_num(node->srcnodes().size());
+    }else if(origin=="kSplit"){
+      proto.set_type(origin);
+      SplitProto *split=proto.mutable_split_param();
+      split->set_num_splits(node->dstnodes().size());
+    }else if(origin=="kBridgeSrc" || origin== "kBridgeDst"){
+      proto.set_type(origin);
+    }else{
+      CHECK(name2layer.find(node->val().origin)!=name2layer.end())  // search the pre-partition copy; name2layer_ was cleared above
+        <<"Unkown origin for node "<<node->val().origin;
+    }
+    shared_ptr<Layer> newlayer;
+    if(proto.has_type()){
+      // layers added due to partition
+      shared_ptr<Layer> layer(factory->Create(proto.type()));
+      layer->Init(proto);
+      newlayer=layer;
+    }else{
+      // partitioned layers from origin neuralnet
+      auto oldlayer=name2layer.at(node->val().origin);
+      vector<int> shape=oldlayer->shape(nullptr);
+      if(oldlayer->partition_type()==kNone){
+        newlayer=oldlayer;
+      } else{
+        int pdim=oldlayer->partition_dimension();
+        shape[pdim]=shape[pdim]/gsize+  // the last partition also takes the remainder
+          ((node->val().partitionid==gsize-1)?shape[pdim]%gsize:0);
+        shared_ptr<Layer> layer(factory->Create(oldlayer->type()));
+        layer->Init(*oldlayer, shape);
+        layer->set_name(node->name());
+        newlayer=layer;
+      }
+    }
+    layers_.push_back(newlayer);
+    name2layer_[node->name()]=newlayer;
+  }
+
+  // connect Layers.
+  for(SNode node: graph_.nodes()){
+    auto layer=name2layer_[node->name()];
+    layer->ClearDstLayers();
+    for(SNode dst: node->dstnodes())
+      layer->AddDstLayer(name2layer_[dst->name()]);
+    layer->ClearSrcLayers();
+    for(SNode src: node->srcnodes())
+      layer->AddSrcLayer(name2layer_[src->name()]);
+  }
+
+  LOG(INFO)<<"Adjacency matrix\n"<<ToAdjacency();
+
+  // set up layers after partition; a non-empty shape must survive SetupAfterPartition unchanged
+  for(const shared_ptr<Layer>& layer: layers_){
+    const vector<int> shape=layer->shape(nullptr);  // copy, in case shape() returns a reference into the layer
+    layer->SetupAfterPartition();
+    const vector<int>& newshape=layer->shape(nullptr);
+    if(shape.size())
+      CHECK(shape.size()<=newshape.size()&&std::equal(shape.begin(),shape.end(),newshape.begin()));
+  }
+
+  LOG(INFO)<<"network graph after partition layers\n"<<ToString();
+}
+
+Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
+    const map<string, shared_ptr<Layer>>& name2layer){
+  Graph graph;  // result: partitioned nodes plus inserted slice/concate/split/bridge nodes
+  // partition origin nodes/layers
+  map<string, vector<SNode>> layer2nodes; //from name of original layer to nodes
+  int gsize=group_size_;
+  for(const auto& layer: layers){
+    vector<SNode> nodes;
+    if(layer->partition_type()==kDataPartition||
+        layer->partition_type()==kLayerPartition){
+      char suffix[16];  // fits "%02d" of any int; the old [4] overflowed for i>=1000
+      for(int i=0;i<gsize;i++){
+        snprintf(suffix, sizeof(suffix), "%02d", i);
+        // differentiate partitions
+        string nodename=layer->name()+"-"+string(suffix);
+        // partition i goes to location i (assumes LayerInfo{origin, locationid, partitionid, ...} order)
+        auto node=graph.AddNode(nodename, LayerInfo{layer->name(),i, i,-1,-1});
+        nodes.push_back(node);
+      }
+    }else if(layer->partition_type()==kNone){
+      auto node=graph.AddNode(layer->name(),
+          LayerInfo{layer->name(), layer->locationid(), 0,-1,-1});
+      nodes.push_back(node);
+    }else{
+      LOG(FATAL)<<"Unknown partition type "<<layer->partition_type();
+    }
+    layer2nodes[layer->name()]=nodes;
+  }
+
+
+  // connect nodes, nodes for ConcateLayer and SliceLayer are added.
+  for(const shared_ptr<Layer>& layer: layers){
+    string name=layer->name();
+    PartitionType type=layer->partition_type();
+    const vector<SNode>& nodes=layer2nodes.at(name);
+    for(int srcid=0;srcid<layer->srclayers_size();srcid++){
+      shared_ptr<Layer> srclayer=layer->srclayers()[srcid];
+      string srcname=srclayer->name();
+      const vector<SNode>& srcnodes=layer2nodes.at(srcname);
+      PartitionType srctype=srclayer->partition_type();
+      ConnectionType connection=layer->connection_type(srcid);
+      if(srctype==kNone){
+        CHECK_EQ(srcnodes.size(),1)
+          <<"local layer "<<srcname<<" should not be partitioned";
+        SNode srcnode=srcnodes[0];
+        if(type==kDataPartition||(type==kLayerPartition&&connection==kOneToOne)){
+          LayerInfo info=srcnode->val();
+          info.slice_dimension=name2layer.at(name)->partition_dimension();
+          graph.InsertSliceNode(srcnode, nodes, info);
+        } else if(type==kNone){
+          CHECK_EQ(nodes.size(),1)
+            <<"local layer "<<name<<" should not be nodeed";
+          graph.AddEdge(srcnode, nodes[0]);
+        } else { // type==kLayerPartition&&connection==kOneToAll
+          graph.InsertSplitNode(srcnode, nodes);
+        }
+      }else if((type==kNone
+            &&(srctype==kDataPartition||srctype==kLayerPartition))
+          ||(type==kLayerPartition&&connection==kOneToAll&&
+            (srctype==kDataPartition||srctype==kLayerPartition))){
+        // copy/concate the whole srclayer for every dst partition
+        for(SNode node:nodes){
+          LayerInfo info=node->val();
+          info.concate_dimension=name2layer.at(srcname)->partition_dimension();
+          CHECK_GE(info.concate_dimension,0);
+          graph.InsertConcateNode(srcnodes, node, info);
+        }
+      }else if((srctype==kLayerPartition&&type==kDataPartition)
+          || (srctype==kDataPartition&&type==kLayerPartition)){
+        // the most complex scenario: slice every src partition, concate the slices into every dst partition
+        vector<SNode> slicenodes;
+        for(SNode srcnode: srcnodes){
+          LayerInfo info=srcnode->val();
+          info.slice_dimension=name2layer.at(name)->partition_dimension();
+          slicenodes.push_back(graph.InsertSliceNode(srcnode, nodes,
+              info, false));
+        }
+        for(SNode node: nodes){
+          LayerInfo info=node->val();
+          info.concate_dimension=name2layer.at(srcname)->partition_dimension();
+          CHECK_GE(info.concate_dimension,0);
+          graph.InsertConcateNode(slicenodes, node, info);
+        }
+      }else if((srctype==kDataPartition&&type==kDataPartition)||
+          (srctype==kLayerPartition&&type==kLayerPartition&&
+           layer->connection_type(srcid)==kOneToOne)){
+        CHECK_EQ(srcnodes.size(), nodes.size());
+        for(size_t i=0;i<srcnodes.size();i++){
+          graph.AddEdge(srcnodes[i], nodes[i]);
+        }
+      }
+    }
+  }
+  // must do topology sort, because we have added new nodes.
+  graph.Sort();
+  //LOG(ERROR)<<graph.ToString();
+
+  // add node for split layer
+  bool data_node=true;  // skips the first node in topological order (presumably the data layer)
+  vector<SNode> oldnodes=graph.nodes();
+  for(SNode node: oldnodes){
+    if(node->dstnodes_size()>1&&node->val().origin!="kSlice"
+        &&node->val().origin!="kSplit"&&!data_node){
+      vector<SNode> dstnodes=node->dstnodes();
+      for(SNode dst: dstnodes)
+        graph.RemoveEdge(node, dst);
+      graph.InsertSplitNode(node, dstnodes);
+    }
+    data_node=false;
+  }
+
+  // add bridge
+  oldnodes=graph.nodes();
+  for(SNode node: oldnodes){
+    vector<SNode> dstnodes=node->dstnodes();
+    for(size_t i=0;i<dstnodes.size();i++){
+      SNode dstnode=dstnodes.at(i);
+      if(node->val().locationid!=dstnode->val().locationid){
+        graph.RemoveEdge(node, dstnode);
+        graph.InsertBridgeNode(node, dstnode);
+      }
+    }
+  }
+  graph.Sort();
+  return graph;
+}
+
+std::string NeuralNet::ToString(){  // delegates to Graph::ToString with a layer-name -> shape map
+  map<string, string> info;
+  for(const auto& layer: layers_){
+    // key: layer name; value: the layer's shape rendered by IntVecToString
+    info[layer->name()]=IntVecToString(layer->shape(nullptr));
+  }
+  return graph_.ToString(info);
+}
+
+std::string NeuralNet::ToAdjacency(){  // one line per layer: "name: dst1, dst2, "
+  string disp="";
+  for(auto& layer: layers_){
+    disp+=layer->name()+": ";
+    for(const auto& dst: layer->dstlayers())
+      disp+=dst->name()+", ";
+    disp+="\n";
+  }
+  return disp;
+}
+
+
+void NeuralNet::ToProto(NetProto *proto, bool copyData) {  // NOTE(review): only clears proto's layers; copyData is unused
+  proto->clear_layer();
+}
+
+string NeuralNet::DebugInfo(){  // L1 norms of layer data (forward), grads (backward) and params
+  string ret;
+  char display[4096];
+  for(auto& layer: layers_){
+    if(!layer->is_datalayer()){
+      snprintf(display, sizeof(display), "Forward layer %10s data norm1 %13.9f\n",
+          layer->name().c_str(), layer->data().asum_data());
+      ret+=string(display);
+    }
+  }
+  for (auto it = layers_.rbegin(); it != layers_.rend(); it++){
+    const shared_ptr<Layer>& layer=*it;
+    if(!(layer->is_datalayer()||layer->is_losslayer()||layer->is_parserlayer())){
+      snprintf(display, sizeof(display), "Backward layer %10s grad norm1 %13.9f\n",
+          layer->name().c_str(), layer->grad().asum_data());
+      ret+=string(display);
+    }
+  }
+  for(auto& layer: layers_){
+    for(const auto& param: layer->GetParams()){
+      snprintf(display, sizeof(display), "Layer %10s, param id %2d, name %10s, "
+          "value norm1 %13.9f, grad norm1 %13.9f\n",
+          layer->name().c_str(), param->id(), param->name().c_str(),
+          param->data().asum_data(), param->grad().asum_data());
+      ret+=string(display);
+    }
+  }
+  return ret;
+}
+void NeuralNet::ShareParams(shared_ptr<NeuralNet> other, int flag){  // params of same-named layers share data with other's; flag is unused
+  for(auto& layer: layers_){
+    auto otherlayer=other->name2layer(layer->name());
+    if(otherlayer!=nullptr){
+      const auto& otherparams=otherlayer->GetParams();
+      const auto& params=layer->GetParams();
+      CHECK_EQ(params.size(), otherparams.size());
+      for(size_t i=0;i<params.size();i++){
+        params[i]->ShareData(otherparams[i]);
+      }
+    }
+  }
+}
+
+} // namespace singa
