http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
new file mode 100644
index 0000000..993c153
--- /dev/null
+++ b/include/utils/common.h
@@ -0,0 +1,51 @@
+#ifndef INCLUDE_UTILS_COMMON_H_
+#define INCLUDE_UTILS_COMMON_H_
+#pragma once
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+#include <google/protobuf/message.h>
+#include <stdarg.h>
+#include <thread>         // std::this_thread::sleep_for
+#include <chrono>
+#include <string>
+#include <vector>
+#include <mutex>
+#include <queue>
+#include <sys/stat.h>
+#include <map>
+
+using std::vector;
+using std::string;
+using std::map;
+using google::protobuf::Message;
+
+#ifndef GFLAGS_GFLAGS_H_
+namespace gflags = google;
+#endif  // GFLAGS_GFLAGS_H_
+
+
+namespace singa {
+
// Parse a text-format protobuf message from the file at `filename` into *proto.
void ReadProtoFromTextFile(const char* filename, Message* proto) ;
// Serialize `proto` in text format to the file at `filename`.
void WriteProtoToTextFile(const Message& proto, const char* filename) ;
// Parse a binary-format protobuf message from the file at `filename` into *proto.
void ReadProtoFromBinaryFile(const char* filename, Message* proto) ;
// Serialize `proto` in binary format to the file at `filename`.
void WriteProtoToBinaryFile(const Message& proto, const char* filename);

// Render an int vector as a string, e.g. for logging blob shapes.
// NOTE(review): exact format is defined in common.cc -- not visible here.
std::string IntVecToString(const vector<int>& vec) ;
// printf-style formatting that returns a std::string (fmt is printf-compatible).
string StringPrintf(string fmt, ...) ;
// Debugging hook; NOTE(review): purpose not visible from this header.
void Debug() ;
// Return true iff `name` refers to an existing filesystem path (stat succeeds).
inline bool check_exists(const std::string& name) {
  struct stat info;
  const int rc = stat(name.c_str(), &info);
  return rc == 0;
}
+
// Block the calling thread for `millisec` milliseconds (default 1 ms).
inline void Sleep(int millisec=1){
  const auto span = std::chrono::milliseconds(millisec);
  std::this_thread::sleep_for(span);
}
+
// Uniform pseudo-random float in [0, 1), driven by rand().
// NOTE(review): rand() is not thread-safe and is unseeded here.
inline float rand_real(){
  const float denom = RAND_MAX + 1.0f;
  return static_cast<float>(rand()) / denom;
}
+
+} /* singa */
+#endif  // INCLUDE_UTILS_COMMON_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/data_shard.h
----------------------------------------------------------------------
diff --git a/include/utils/data_shard.h b/include/utils/data_shard.h
new file mode 100644
index 0000000..2ebade9
--- /dev/null
+++ b/include/utils/data_shard.h
@@ -0,0 +1,145 @@
+#ifndef INCLUDE_UTILS_SHARD_H_
+#define INCLUDE_UTILS_SHARD_H_
+
+#include <google/protobuf/message.h>
+#include <fstream>
+#include <string>
+#include <unordered_set>
+
+
+using google::protobuf::Message;
+
+namespace singa {
+
/**
 * Data shard stores training/validation/test tuples.
 * Every worker node should have a training shard (validation/test shard
 * is optional). The shard file for training is
 * singa::Cluster::workspace()/train/shard.dat; the shard file for validation
 * is singa::Cluster::workspace()/validation/shard.dat; similar path for test.
 *
 * shard.dat consists of a set of unordered tuples. Each tuple is
 * encoded as [key_len key record_len val] (key_len and record_len are of type
 * uint32, which indicate the bytes of key and record respectively).
 *
 * When a Shard object is created, it will remove the last key if the record
 * size and key size do not match, because the last write of that tuple crashed.
 *
 * TODO
 * 1. split one shard into multiple shards.
 * 2. add threading to prefetch and parse records
 *
 */
class DataShard {
 public:
  // Open modes for the shard file.
  enum {
    //!< read-only mode, used in training
    kRead=0,
    //!< write mode used in creating shard (will overwrite previous one)
    kCreate=1,
    //!< append mode, e.g. used when previous creating crashes
    kAppend=2
  };

 public:
  /**
   * Init the shard object.
   * @param folder shard folder (path excluding shard.dat) on worker node
   * @param mode shard open mode: DataShard::kRead, kCreate or kAppend
   * @param capacity buffer this many bytes for every disk op (read or write),
   * default is 100MB
   */
  DataShard(std::string folder, char mode, int capacity=104857600);
  ~DataShard();

  /**
   * Read the next tuple from the shard.
   * @param key out-param receiving the tuple key
   * @param val record of type Message
   * @return true on success, false otherwise, e.g., the tuple was not
   * inserted completely.
   */
  bool Next(std::string *key, Message* val);
  /**
   * Read the next tuple from the shard.
   * @param key out-param receiving the tuple key
   * @param val record of type string
   * @return true on success, false otherwise, e.g., the tuple was not
   * inserted completely.
   */
  bool Next(std::string *key, std::string* val);

  /**
   * Append one tuple to the shard.
   * @param key e.g., image path
   * @param tuple record value serialized from a Message
   * @return true on success, false otherwise, e.g., the key was inserted
   * before (duplicates are rejected via keys_).
   */
  bool Insert(const std::string& key, const Message& tuple);
  /**
   * Append one tuple to the shard.
   * @param key e.g., image path
   * @param tuple record value as a raw string
   * @return true on success, false otherwise, e.g., the key was inserted
   * before (duplicates are rejected via keys_).
   */
  bool Insert(const std::string& key, const std::string& tuple);
  /**
   * Move the read pointer to the head of the shard file.
   * Used for repeated reading.
   */
  void SeekToFirst();
  /**
   * Flush buffered data to disk.
   * Used only for kCreate or kAppend.
   */
  void Flush() ;
  /**
   * Iterate through all tuples to get the num of all tuples.
   * @return num of tuples
   */
  const int Count();
  /**
   * @return path to shard file (by value)
   */
  const std::string path(){
    return path_;
  }

 protected:
  /**
   * Read the next key and prepare buffer for reading the value.
   * @param key out-param receiving the key
   * @return length (i.e., bytes) of value field.
   */
  int Next(std::string *key);
  /**
   * Setup the disk pointer to the right position for append in case that
   * the previous write crashed.
   * @param path shard path.
   * @return offset (end pos) of the last successfully written record.
   */
  int PrepareForAppend(std::string path);
  /**
   * Read data from disk if the current data in the buffer is not a full field.
   * @param size size of the next field.
   */
  bool PrepareNextField(int size);

 private:
  // one of kRead/kCreate/kAppend
  char mode_;
  // full path to the shard.dat file
  std::string path_;
  // either ifstream or ofstream semantics depending on mode_
  std::fstream fdat_;
  // to avoid replicated records (rejects duplicate keys)
  std::unordered_set<std::string> keys_;
  // internal buffer
  char* buf_;
  // offset inside the buf_
  int offset_;
  // allocated bytes for the buf_
  int capacity_;
  // bytes in buf_, used in reading
  int bufsize_;
};
+} /* singa */
+#endif  // INCLUDE_UTILS_SHARD_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/factory.h
----------------------------------------------------------------------
diff --git a/include/utils/factory.h b/include/utils/factory.h
new file mode 100644
index 0000000..c8fef32
--- /dev/null
+++ b/include/utils/factory.h
@@ -0,0 +1,57 @@
+#ifndef INCLUDE_UTILS_FACTORY_H_
+#define INCLUDE_UTILS_FACTORY_H_
+#include <glog/logging.h>
+
+#include <functional>
+#include <utility>
+#include <map>
/**
 * Macro that creates a lambda which instantiates a subclass instance and
 * returns a pointer to the base class.
 */
+#define CreateInstance(SubClass, BaseClass) \
+  [](void)->BaseClass* {return new SubClass();}
+
/**
 * Factory template to generate a class (or sub-class) object based on id.
 * 1. Register a class-creation function that generates an object
 *    for a given id.
 * 2. Call Create() to invoke the creation function and return
 *    a pointer to the base class.
 */
+
template<typename T>
class Factory{
 public:
  /**
   * Register a function that creates a user-defined (sub)class instance.
   * This function is called by the REGISTER_FACTORY macro.
   * @param id identifier of the creating function/class
   * @param func a function that creates an instance of T (or a subclass)
   */
  void Register(const std::string& id, std::function<T*(void)> func);
  /**
   * Create an instance by the identifier it was registered under.
   * Aborts (CHECK failure) if no creation function is registered for id.
   * @param id the identifier of the class to be created
   * @return caller-owned pointer to the newly created instance
   */
  T *Create(const std::string& id);

 private:
  //<! Map that stores the registered creation functions
  std::map<std::string, std::function<T*(void)>> str2func_;
};

template<typename T>
void Factory<T>::Register(const std::string& id,
                          std::function<T*(void)> func) {
  str2func_[id] = func;
}

template<typename T>
T *Factory<T>::Create(const std::string& id) {
  // single lookup instead of the original find() + operator[] pair
  auto it = str2func_.find(id);
  CHECK(it != str2func_.end())
      << "The creation function for " << id << " has not been registered";
  return it->second();
}
+#endif // INCLUDE_UTILS_FACTORY_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/graph.h
----------------------------------------------------------------------
diff --git a/include/utils/graph.h b/include/utils/graph.h
new file mode 100644
index 0000000..ca582b5
--- /dev/null
+++ b/include/utils/graph.h
@@ -0,0 +1,150 @@
+#ifndef INCLUDE_UTILS_GRAPH_H_
+#define INCLUDE_UTILS_GRAPH_H_
+#include <glog/logging.h>
+#include <vector>
+#include <string>
+#include <map>
+#include <stack>
+#include <memory>
+
+using std::vector;
+using std::string;
+using std::map;
+using std::pair;
+using std::shared_ptr;
+using std::make_shared;
+
+
// Per-node payload describing how a layer is placed/partitioned in the graph.
typedef struct _LayerInfo{
  // origin identifies the origin of this node, i.e., the corresponding layer
  string origin;
  int locationid;// location id
  int partitionid;
  int slice_dimension;
  int concate_dimension;
}LayerInfo;
// V is the value/payload type attached to every graph Node.
typedef LayerInfo V;


class Node;
// Nodes are shared between the graph and adjacency lists via shared_ptr.
typedef shared_ptr<Node> SNode;
+
+class Node{
+ public:
+  typedef shared_ptr<Node> SNode;
+  Node(string name): name_(name){}
+  Node(string name, const V& v):
+    name_(name), val_(v){}
+
+  void AddDstNode(SNode dstnode){
+    dstnodes_.push_back(dstnode);
+  }
+  void AddSrcNode(SNode srcnode){
+    srcnodes_.push_back(srcnode);
+  }
+
+  void RemoveDstNode(SNode dst){
+    auto iter=dstnodes_.begin();
+    while((*iter)->name_!=dst->name_&&iter!=dstnodes_.end()) iter++;
+    CHECK((*iter)->name_==dst->name_);
+    dstnodes_.erase(iter);
+  }
+  void RemoveSrcNode(SNode src){
+    auto iter=srcnodes_.begin();
+    while((*iter)->name_!=src->name_&&iter!=srcnodes_.end()) iter++;
+    CHECK((*iter)->name_==src->name_);
+    srcnodes_.erase(iter);
+  }
+  const string& name() const {return name_;}
+  const V& val() const {return val_;}
+  const SNode srcnodes(int k) const {return srcnodes_[k]; }
+  const SNode dstnodes(int k) const {return dstnodes_[k]; }
+  const vector<SNode>& srcnodes() const {return srcnodes_; }
+  const vector<SNode>& dstnodes() const {return dstnodes_; }
+  int  dstnodes_size() const {return dstnodes_.size(); }
+  int  srcnodes_size() const {return srcnodes_.size(); }
+
+ private:
+  string name_;
+  vector<SNode> srcnodes_;
+  vector<SNode> dstnodes_;
+
+  V val_;
+    // properties
+  string color_, weight_, shape_;
+};
+
+
/**
 * For partitioning the neuralnet and displaying the neuralnet structure.
 *
 * NOTE(review): AddNode returns a reference into nodes_; a later push_back
 * can invalidate it -- callers should copy the returned SNode.
 */
class Graph{
 public:
  Graph(){}
  // Topologically sort nodes_ in place (defined in the .cc file).
  void Sort();
  // Create a node with payload `origin`, index it by name, and return it.
  const SNode& AddNode(string name, V origin){
    nodes_.push_back(make_shared<Node>(name, origin));
    name2node_[name]=nodes_.back();
    return nodes_.back();
  }
  // Create a node with a default payload, index it by name, and return it.
  const SNode& AddNode(string name){
    nodes_.push_back(make_shared<Node>(name));
    name2node_[name]=nodes_.back();
    return nodes_.back();
  }

  // Add a directed edge srcnode -> dstnode (updates both adjacency lists).
  void AddEdge(SNode srcnode, SNode dstnode){
    srcnode->AddDstNode(dstnode);
    dstnode->AddSrcNode(srcnode);
  }

  // Add a directed edge by node names; CHECK-fails if either name is unknown.
  void AddEdge(const string& src, const string& dst){
    CHECK(name2node_.find(src)!=name2node_.end())<<"can't find src node "<<src;
    CHECK(name2node_.find(dst)!=name2node_.end())<<"can't find dst node "<<dst;

    SNode srcnode=name2node_[src], dstnode=name2node_[dst];
    AddEdge(srcnode, dstnode);
  }

  // Remove the edge src -> dst by name; CHECK-fails if either name is unknown.
  void RemoveEdge(const string &src, const string& dst){
    CHECK(name2node_.find(src)!=name2node_.end())<<"can't find src node "<<src;
    CHECK(name2node_.find(dst)!=name2node_.end())<<"can't find dst node "<<dst;

    SNode srcnode=name2node_[src], dstnode=name2node_[dst];
    RemoveEdge(srcnode, dstnode);
  }

  // Remove the edge src -> dst (updates both adjacency lists).
  void RemoveEdge(SNode src, SNode dst){
    src->RemoveDstNode(dst);
    dst->RemoveSrcNode(src);
  }

  const vector<SNode>& nodes() const{
    return nodes_;
  };

  // Look up a node by name; CHECK-fails if absent.
  const SNode& node(string name) const{
    CHECK(name2node_.find(name)!= name2node_.end())
      <<"can't find dst node "<<name;
    return name2node_.at(name);
  }

  const string ToString() const;
  const string ToString(const map<string, string>& info) const ;

  bool Check() const;

  // Structural-edit helpers used when partitioning the net; declared here,
  // defined in the corresponding .cc file.
  SNode InsertSliceNode(SNode srcnode, const vector<SNode>& dstnodes,
      const V& info, bool connect_dst=true);
  SNode InsertConcateNode(const vector<SNode>&srcnodes, SNode dstnode,
      const V& info);
  SNode InsertSplitNode(SNode srcnode, const vector<SNode>& dstnodes);
  std::pair<SNode, SNode> InsertBridgeNode(SNode srcnode, SNode dstnode);
  // DFS helper used by Sort().
  void topology_sort_inner(SNode node, map<string, bool> *visited,
    std::stack<string> *stack);

 private:
  vector<SNode> nodes_;
  map<string, SNode> name2node_;
};
+#endif // INCLUDE_UTILS_GRAPH_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
new file mode 100644
index 0000000..907ef8c
--- /dev/null
+++ b/include/utils/param.h
@@ -0,0 +1,172 @@
+#ifndef INCLUDE_UTILS_PARAM_H_
+#define INCLUDE_UTILS_PARAM_H_
+#include <vector>
+#include <string>
+#include <map>
+#include <functional>
+#include "proto/model.pb.h"
+#include "utils/blob.h"
+#include "communication/msg.h"
// Base parameter class.
+namespace singa {
class Param {
 public:
  Param();
  virtual ~Param();

  // Generate messages for parameter get/put/update/sync requests.
  // NOTE(review): semantics of `arg` are not visible from this header --
  // see the implementations in the .cc file.
  virtual Msg* GenGetMsg(void* arg=nullptr);
  virtual Msg* GenPutMsg(void* arg=nullptr);
  virtual Msg* GenUpdateMsg(void* arg=nullptr);
  virtual Msg* GenSyncMsg(void* arg=nullptr);

  // Server-side handlers for the corresponding request messages.
  virtual Msg* HandleGetMsg(Msg** msg);
  virtual Msg* HandlePutMsg(Msg** msg);
  virtual int ParseUpdateMsg(Msg** msg);
  virtual Msg* GenUpdateResponseMsg(void* arg=nullptr);
  virtual Msg* HandleSyncMsg(Msg** msg);

  // Worker-side parsing of server responses.
  virtual int ParseGetResponseMsg(Msg** msg);
  virtual int ParsePutResponseMsg(Msg** msg);
  virtual int ParseUpdateResponseMsg(Msg** msg);
  virtual int ParseSyncResponseMsg(Msg** msg);

  /**
   * Setup param shape.
   * @param proto param configuration
   * @param shape shape of the data blob
   * @param fan_in presumably used to scale random initialization -- see Init()
   */
  virtual void Setup(const ParamProto& proto, const std::vector<int>& shape, int fan_in);
  /*
   * fill the data according to initmethod, i.e., random/gaussian/fixed value
   */
  virtual void Init(int v=0);
  // Share the data blob of `other`; records other's id as this param's owner.
  // Shapes must match (CHECK-ed). NOTE(review): relies on a using-declaration
  // for shared_ptr being in scope.
  void ShareData(shared_ptr<Param> other){
    owner_=other->id();
    CHECK(std::equal(data_.shape().begin(), data_.shape().end(),
          other->data_.shape().begin()));
    data_.ShareData(other->data_);
  }
  float learning_rate_multiplier() {
    return proto_.learning_rate_multiplier();
  }
  float weight_decay_multiplier() {
    return proto_.weight_decay_multiplier();
  }
  /*
  const int split_threshold(){
    return proto_.split_threshold();
  }
  */
  /**
   * if the Param shares data with others, then return the owner's id,
   * otherwise this param's own id (set by ShareData()).
   */
  const int owner() const{
    return owner_;
  }
  const std::string& name() {
    return proto_.name();
  }

  int id() const{
    return proto_.id();
  }
  void set_id(int id){
    proto_.set_id(id);
  }

  int version() const {
    return proto_.version(); // TODO store version in data blob
  }
  void set_version(int v) {
    proto_.set_version(v); // TODO read version from data blob
  }
   /**
    * @return num of floats in the data blob.
    */
  int size() const {
    return data_.count();
  }
  /**
   * Return const reference to the content blob of this parameter
   */
  const Blob<float> &data() {
    return data_;
  }
  Blob<float> *mutable_data() {
    return &data_;
  }
  /**
   * Return gradient blob of this parameter
   */
  const Blob<float> &grad() {
    return grad_;
  }
  Blob<float> *mutable_grad() {
    return &grad_;
  }

  // History (e.g. momentum) blob accessors.
  const Blob<float> &history() {
    return history_;
  }
  Blob<float> *mutable_history() {
    return &history_;
  }

  // Raw CPU pointers into the respective blobs.
  float* mutable_cpu_data(){
    return data_.mutable_cpu_data();
  }
  float* mutable_cpu_grad(){
    return grad_.mutable_cpu_data();
  }
  float* mutable_cpu_history(){
    return history_.mutable_cpu_data();
  }
 protected:
  /**
   * name of the parameter used to share weights between neuralnets
   */
  std::string name_;
  //! content, gradient, history gradient of this parameter
  Blob<float> data_, grad_, history_;
  // id of the Param whose data blob this one shares (see owner())
  int owner_;

  ParamProto proto_;
  int fan_in_;
};
+/**
+ * Sync with server by randomly sampling some parameters for every sync.
+class RandomSyncParam: public Param{
+ public:
+  virtual zmsg_t* HandleSyncMsg(zmsg_t** msg);
+  virtual zmsg_t *GenSyncMsgFromWorker(float sample_ratio);
+  virtual void ParseSyncMsgFromPS(zmsg_t** msg);
+  virtual void Setup(const ParamProto& proto, const vector<int>& shape, int 
fan_in);
+  virtual void Init();
+
+  float* mutable_cpu_snapshot(){
+    return snapshot_.mutable_cpu_data();
+  }
+  const float* cpu_snapshot(){
+    return snapshot_.cpu_data();
+  }
+
+ protected:
+  const vector<int> RandomSample(int seed, int m, int n);
+
+
+  Blob<float> snapshot_;
+};
+ */
+/**
+ * Sync with server by elastic SGD see http://arxiv.org/abs/1412.6651.
+class ElasticParam: public Param{
+ public:
+  virtual zmsg_t* HandleSyncMsg(zmsg_t** msg);
+  virtual zmsg_t *GenSyncMsgFromWorker(float moving_rate);
+  virtual void ParseSyncMsgFromPS(zmsg_t** msg);
+};
+ */
+
+
+}  // namespace singa
+
+#endif  // INCLUDE_UTILS_PARAM_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/singleton.h
----------------------------------------------------------------------
diff --git a/include/utils/singleton.h b/include/utils/singleton.h
new file mode 100644
index 0000000..2e2bdfb
--- /dev/null
+++ b/include/utils/singleton.h
@@ -0,0 +1,41 @@
+#ifndef INCLUDE_UTILS_SINGLETON_H_
+#define INCLUDE_UTILS_SINGLETON_H_
+
/**
 * Lazily-constructed process-wide singleton.
 *
 * Uses a function-local static pointer: C++11 guarantees its initializer runs
 * exactly once even under concurrent first calls, fixing the data race of the
 * original unsynchronized check-then-new on a class-level static. As before,
 * the instance is intentionally never deleted (lives until process exit).
 */
template<typename T>
class Singleton {
 public:
  static T* Instance() {
    static T* data = new T();  // thread-safe one-time initialization
    return data;
  }
};
+
+
+/**
+ * Singleton initiated with argument
+ */
+template<typename T, typename X=int>
+class ASingleton {
+ public:
+  static T* Instance(){
+    return data_;
+  }
+  static T* Instance(X x) {
+    if (data_==nullptr) {
+      data_ = new T(x);
+    }
+    return data_;
+  }
+ private:
+  static T* data_;
+};
+
+template<typename T, typename X> T* ASingleton<T,X>::data_ = nullptr;
+
+#endif // INCLUDE_UTILS_SINGLETON_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/updater.h
----------------------------------------------------------------------
diff --git a/include/utils/updater.h b/include/utils/updater.h
new file mode 100644
index 0000000..2a6dd43
--- /dev/null
+++ b/include/utils/updater.h
@@ -0,0 +1,78 @@
+#ifndef INCLUDE_UTILS_UPDATER_H_
+#define INCLUDE_UTILS_UPDATER_H_
+#include "proto/model.pb.h"
+#include "utils/param.h"
+
namespace singa{
/**
 * Base class for Param updaters, which apply gradient-based update rules.
 */
class Updater{
 public:
  // Keep the configuration proto; subclasses cache the fields they need.
  virtual void Init(const UpdaterProto &proto){
    proto_=proto;
  }
  /**
   * Apply one update step to `param` in place.
   * @param step current training step (used for learning-rate schedules)
   * @param param parameter to update
   * @param grad_scale scaling factor applied to the gradient
   */
  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f)=0;

  // Learning rate for `step`; the schedule comes from proto_.
  float GetLearningRate(int step);
 protected:
  UpdaterProto proto_;
};
// SGD updater; fields suggest momentum + weight decay (rule defined in .cc).
class SGDUpdater : public Updater{
 public:
  virtual void Init(const UpdaterProto& proto);
  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);

 protected:
  float base_lr_;
  float momentum_;
  float weight_decay_;
};
// Nesterov-momentum updater (rule defined in .cc).
class NesterovUpdater : public Updater{
 public:
  virtual void Init(const UpdaterProto& proto);
  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);

 protected:
  float base_lr_;
  float momentum_;
  float weight_decay_;
};
// AdaGrad updater; delta_ is presumably the numerical-stability term --
// confirm against the .cc implementation.
class AdaGradUpdater : public Updater{
 public:
  virtual void Init(const UpdaterProto& proto);
  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);

 protected:
  float base_lr_;
  float delta_;
  float weight_decay_;
};

// RMSProp updater; rho_ is presumably the moving-average decay -- confirm
// against the .cc implementation.
class RMSPropUpdater : public Updater{
 public:
  virtual void Init(const UpdaterProto& proto);
  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);

 protected:
  float base_lr_;
  float delta_;
  float rho_;
  float weight_decay_;
};

/*
class AdaDeltaUpdater : public Updater{
 public:
  virtual void Init(const UpdaterProto& proto);
  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);

 protected:
  float rho_;
  float delta_;
  float weight_decay_;
};
*/
}  // namespace singa
+
+#endif // INCLUDE_UTILS_UPDATER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/script/node.sh
----------------------------------------------------------------------
diff --git a/script/node.sh b/script/node.sh
new file mode 100755
index 0000000..74e0d8a
--- /dev/null
+++ b/script/node.sh
@@ -0,0 +1,71 @@
#!/bin/bash
# Cluster helper: run a file/folder/process operation on every node listed in
# a hostfile (the first whitespace-separated field of each line is the host).
# BUGFIX: use arithmetic comparison; [[ $# < 2 ]] compared lexicographically.
if (( $# < 2 )) || [[ ! -f $2 ]]
then
  echo "Usage: process/folder management"
  echo "[cat, create, delete, kill, ls, ps, reset, scp, ssh] hostfile [args]"
  echo "   cat hostfile file--- cat the file on every node in hostfile"
  echo "   create hostfile folder--- create the folder on every node in hostfile"
  echo "   delete hostfile folder--- delete the folder on every node in hostfile"
  echo "   kill hostfile job_name---  kill the job on every node in hostfile"
  echo "   ls hostfile folder--- list the folder on every node in hostfile"
  echo "   ps hostfile job_name---  ps aux|grep job_name on every node in hostfile"
  echo "   reset hostfile folder--- delete and create the folder on every node in hostfile"
  echo "   scp hostfile local_dir [remote_dir]--- copy the local_dir to remote_dir on every node in hostfile, if remote_dir is omitted, remote_dir=local_dir"
  echo "   ssh hostfile--- test whether the nodes in hostfile are alive"
  echo "each line in hostfile is a node name followed by a space and other fields"
  exit 1
fi

# NOTE(review): ssh_options is defined but never passed to ssh/scp below --
# confirm whether it should be.
ssh_options="-oStrictHostKeyChecking=no \
-oUserKnownHostsFile=/dev/null \
-oLogLevel=quiet"

hosts=($(cut -d ' ' -f 1 "$2"))

for i in "${hosts[@]}"
do
  if [ "$1" == "cat" ]
  then
    cmd="cat $3"
  elif [ "$1" == "create" ]
  then
    cmd="mkdir -p $3"
  elif [ "$1" == "delete" ]
  then
    cmd="rm -rf $3"
  elif [ "$1" == "reset" ]
  then
    # BUGFIX: "reset" previously matched the create branch first in the elif
    # chain and never deleted; now it deletes then re-creates, matching the
    # usage text above.
    cmd="rm -rf $3 && mkdir -p $3"
  elif [ "$1" == "kill" ]
  then
    cmd="ps ax|pgrep $3 |xargs kill"
  elif [ "$1" == "ls" ]
  then
    cmd="ls -l $3"
  elif [ "$1" == "scp" ]
  then
    local_dir=$3
    remote_dir=$3
    if [ $# -eq 4 ]
    then
      remote_dir=$4
    fi
    r=''
    if [[ -d $3 ]]
    then
      r='-r'  # recursive copy for directories
    fi
    echo "scp $r $local_dir $i:$remote_dir"
    scp $r "$local_dir" "$i:$remote_dir"
  elif [ "$1" == "ssh" ]
  then
    cmd="exit"
  else
    if [ "$1" == "ps" ]
    then
      cmd="ps ax|pgrep $3"
    else
      # BUGFIX: previously fell through and ran ssh with a stale/unset cmd.
      echo "Incorrect commands:" "$1"
      exit 1
    fi
  fi
  if [ "$1" != "scp" ]
  then
    echo "$cmd"
    ssh $i $cmd
  fi
done

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
new file mode 100644
index 0000000..80f2304
--- /dev/null
+++ b/src/communication/msg.cc
@@ -0,0 +1,5 @@
#include "communication/msg.h"

// Msg currently has no out-of-line definitions; this translation unit only
// anchors the namespace so the build has a .cc for the header.
namespace singa {
} /* singa */


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
new file mode 100644
index 0000000..279d758
--- /dev/null
+++ b/src/communication/socket.cc
@@ -0,0 +1,118 @@
+#include "communication/socket.h"
+
+namespace singa {
+Poller::Poller(){
+  poller_=zpoller_new(NULL);
+}
+
+void Poller::Add(Socket* socket){
+  zsock_t* zsock=static_cast<zsock_t*>(socket->InternalID());
+  zpoller_add(poller_, zsock);
+  zsock2Socket_[zsock]=socket;
+}
+
+Socket* Poller::Wait(int timeout){
+  zsock_t* sock=(zsock_t*)zpoller_wait(poller_, timeout);
+  if(sock!=NULL)
+    return zsock2Socket_[sock];
+  else return nullptr;
+}
+
+Dealer::Dealer(int id):id_(id){
+  dealer_=zsock_new(ZMQ_DEALER);
+  CHECK_NOTNULL(dealer_);
+  poller_=zpoller_new(dealer_);
+}
+
+int Dealer::Connect(string endpoint){
+  if(endpoint.length())
+    CHECK_EQ(zsock_connect(dealer_,endpoint.c_str()),0);
+  return 1;
+}
+int Dealer::Send(Msg *msg){
+  zmsg_t* zmsg=(static_cast<Msg*>(msg))->DumpToZmsg();
+  zmsg_send(&zmsg, dealer_);
+  delete msg;
+  return 1;
+}
+
+Msg* Dealer::Receive(){
+  zmsg_t* zmsg=zmsg_recv(dealer_);
+  if(zmsg==NULL)
+    return nullptr;
+  Msg* msg=new Msg();
+  msg->ParseFromZmsg(zmsg);
+  return msg;
+}
+Dealer::~Dealer(){
+  zsock_destroy(&dealer_);
+}
+
+Router::Router(int bufsize){
+  nBufmsg_=0;
+  bufsize_=bufsize;
+  router_=zsock_new(ZMQ_ROUTER);
+  CHECK_NOTNULL(router_);
+  poller_=zpoller_new(router_);
+}
+int Router::Bind(string endpoint){
+  if(endpoint.length())
+    CHECK_EQ(zsock_bind(router_, endpoint.c_str()),0);
+  return 1;
+}
+
+int Router::Send(Msg *msg){
+  zmsg_t* zmsg=static_cast<Msg*>(msg)->DumpToZmsg();
+  int dstid=static_cast<Msg*>(msg)->dst();
+  if(id2addr_.find(dstid)!=id2addr_.end()){
+    // the connection has already been set up
+    zframe_t* addr=zframe_dup(id2addr_[dstid]);
+    zmsg_prepend(zmsg, &addr);
+    zmsg_send(&zmsg, router_);
+  }else{
+    // the connection is not ready, buffer the message
+    if(bufmsg_.size()==0)
+      nBufmsg_=0;
+    bufmsg_[dstid].push_back(zmsg);
+    nBufmsg_++;
+    CHECK_LE(nBufmsg_, bufsize_);
+  }
+  delete msg;
+  return 1;
+}
+
+Msg* Router::Receive(){
+  zmsg_t* zmsg=zmsg_recv(router_);
+  if(zmsg==NULL)
+    return nullptr;
+  zframe_t* dealer=zmsg_pop(zmsg);
+  Msg* msg=new Msg();
+  msg->ParseFromZmsg(zmsg);
+  if (id2addr_.find(msg->src())==id2addr_.end()){
+    // new connection, store the sender's identfier and send buffered messages
+    // for it
+    id2addr_[msg->src()]=dealer;
+    if(bufmsg_.find(msg->src())!=bufmsg_.end()){
+      for(auto& it: bufmsg_.at(msg->src())){
+        zframe_t* addr=zframe_dup(dealer);
+        zmsg_prepend(it, &addr);
+        zmsg_send(&it, router_);
+      }
+      bufmsg_.erase(msg->src());
+    }
+  }
+  else
+    zframe_destroy(&dealer);
+  return msg;
+}
+
+Router::~Router(){
+  zsock_destroy(&router_);
+  for(auto it: id2addr_)
+    zframe_destroy(&it.second);
+  for(auto it: bufmsg_){
+    for(auto *msg: it.second)
+      zmsg_destroy(&msg);
+  }
+}
+} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
new file mode 100644
index 0000000..89306d8
--- /dev/null
+++ b/src/main.cc
@@ -0,0 +1,49 @@
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "trainer/trainer.h"
+
+/**
+ * \file main.cc is the main entry of SINGA, like the driver program for 
Hadoop.
+ *
+ * 1. Users register their own implemented classes, e.g., layer, updater, etc.
+ * 2. Users prepare the google protobuf object for the model configuration and
+ * the cluster configuration.
+ * 3. Users call trainer to start the training.
+ *
+ * TODO
+ * 1. Add the resume function to continue training from a previously stopped
+ * point.
+ * 2. Add helper functions for users to configure their model and cluster
+ * easily, e.g., AddLayer(layer_type, source_layers, meta_data).
+ */
+
+DEFINE_int32(procsID, 0, "Global process ID");
+DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
+DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
+
+/**
+ * Register layers, and other customizable classes.
+ *
+ * If users want to use their own implemented classes, they should register
+ * them here. Refer to the Worker::RegisterDefaultClasses()
+ */
// Hook for users to register their own implemented classes (layers, updaters,
// etc.) before training starts; intentionally empty in the default driver.
void RegisterClasses(const singa::ModelProto& proto){
}
+
int main(int argc, char **argv) {
  // TODO set log dir
  google::InitGoogleLogging(argv[0]);
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  // Load cluster and model configuration from the text-format protobuf files
  // given by --cluster and --model.
  singa::ClusterProto cluster;
  singa::ReadProtoFromTextFile(FLAGS_cluster.c_str(), &cluster);
  singa::ModelProto model;
  singa::ReadProtoFromTextFile(FLAGS_model.c_str(), &model);
  LOG(INFO)<<"The cluster config is\n"<<cluster.DebugString();
  LOG(INFO)<<"The model config is\n"<<model.DebugString();

  RegisterClasses(model);
  // Start training for this process (identified by --procsID).
  singa::Trainer trainer;
  trainer.Start(model, cluster, FLAGS_procsID);
  return 0;
}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/base_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/base_layer.cc b/src/neuralnet/base_layer.cc
new file mode 100644
index 0000000..50fc396
--- /dev/null
+++ b/src/neuralnet/base_layer.cc
@@ -0,0 +1,194 @@
+#include <opencv2/highgui/highgui.hpp>
+#include <opencv2/imgproc/imgproc.hpp>
+#include <cblas.h>
+#include <math.h>
+#include <cfloat>
+#include "neuralnet/base_layer.h"
+namespace singa {
/*****************************************************************************
 * Implementation for Layer
 *****************************************************************************/
// Store the configuration only; blob shapes are decided later in Setup().
void Layer::Init(const LayerProto &proto) {
  layer_proto_=proto;
}

// Initialize as a (partitioned) copy of `other`, but with the given shape
// for the feature and gradient blobs.
void Layer::Init(const Layer& other, const vector<int>& shape){
  data_.Reshape(shape);
  grad_.Reshape(shape);
  layer_proto_=other.layer_proto_;
}
// Convenience overloads forwarding to the virtual versions with the stored
// configuration and source layers.
void Layer::Setup(){
  Setup(layer_proto_, srclayers_);
}
void Layer::SetupAfterPartition(){
  vector<int> shape=data_.shape();
  SetupAfterPartition(layer_proto_, shape, srclayers_);
  // re-running setup must not change the shape chosen by the partitioner
  CHECK(std::equal(shape.begin(), shape.end(), data_.shape().begin()))<<name()
    <<IntVecToString(shape)<<"--"<<IntVecToString(data_.shape());
}
void Layer::ComputeFeature(bool training){
  ComputeFeature(training, srclayers_);
}
void Layer::ComputeGradient(){
  ComputeGradient(srclayers_);
}

// NOTE(review): serialization is not implemented yet; proto/copyData unused.
void Layer::ToProto(LayerProto *proto, bool copyData) {
}
// BridgeSrcLayer mirrors its single source layer's shape; it is the sending
// end of a bridge that crosses a partition boundary.
void BridgeSrcLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  data_.Reshape(srclayers[0]->data(this).shape());
  grad_.ReshapeLike(data_);
}
void BridgeSrcLayer::SetupAfterPartition(){
  // source shapes already reflect the partitioning; just redo normal setup
  Setup(layer_proto_, srclayers_);
}
+
+void BridgeSrcLayer::ComputeFeature(bool training,
+    const vector<SLayer>& srclayers){
+  if(training)
+    ready_=false;
+  else
+    ready_=true;
+}
+void BridgeSrcLayer::ComputeGradient(const vector<SLayer>& srclayers){
+
+}
// BridgeDstLayer mirrors its single source layer's shape; it is the
// receiving end of a bridge that crosses a partition boundary.
void BridgeDstLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  data_.Reshape(srclayers[0]->data(this).shape());
  grad_.ReshapeLike(data_);
}
void BridgeDstLayer::SetupAfterPartition(){
  // source shapes already reflect the partitioning; just redo normal setup
  Setup(layer_proto_, srclayers_);
}
+
+void BridgeDstLayer::ComputeFeature(bool training,
+    const vector<SLayer>& srclayers){
+  if(training)
+    ready_=true;
+  else
+    ready_=false;
+}
+void BridgeDstLayer::ComputeGradient(const vector<shared_ptr<Layer>>& 
srclayers){
+
+}
+
+/*******************************
+ * Implementation for ConcateLayer
+ *******************************/
+void ConcateLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  size_t concate_dim=proto.concate_param().concate_dimension();
+  CHECK_GE(concate_dim,0);
+  CHECK_GT(srclayers.size(),1);
+  vector<int> shape=srclayers[0]->data(this).shape();
+  for(size_t i=1;i<srclayers.size();i++){
+    const vector<int>& srcshape=srclayers[i]->data(this).shape();
+    for(size_t j=0;j<shape.size();j++)
+      if(j==concate_dim)
+        shape[j]+=srcshape[j];
+      else
+        CHECK_EQ(shape[j], srcshape[j]);
+  }
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+}
+
void ConcateLayer::SetupAfterPartition(){
  // source shapes already reflect the partitioning; just redo normal setup
  Setup(layer_proto_, srclayers_);
}

// NOTE(review): concatenation forward/backward passes are not implemented
// yet; these are intentional stubs.
void ConcateLayer::ComputeFeature(bool training, const vector<SLayer>&
    srclayers){}

void ConcateLayer::ComputeGradient(const vector<shared_ptr<Layer>>&
    srclayers){}
/*****************************************************************************
 * Implementation for SliceLayer
 *****************************************************************************/
// Split the source blob into slice_num pieces along slice_dimension, one
// piece per destination layer.  The last slice absorbs the remainder when
// the dimension does not divide evenly.
void SliceLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  int slice_dim=proto.slice_param().slice_dimension();
  int slice_num=proto.slice_param().slice_num();
  CHECK_GE(slice_dim,0);
  CHECK_EQ(slice_num, dstlayers_.size());
  data_.Reshape(srclayers[0]->data(this).shape());
  grad_.ReshapeLike(data_);
  datavec_.resize(slice_num);
  gradvec_.resize(slice_num);
  for(int i=0;i<slice_num;i++){
    vector<int> newshape(data_.shape());
    // even split; the last slice additionally takes the remainder
    newshape[slice_dim]=newshape[slice_dim]/slice_num+
      ((i==slice_num-1)?newshape[slice_dim]%slice_num:0);
    datavec_[i].Reshape(newshape);
    gradvec_[i].Reshape(newshape);
  }
}

void SliceLayer::SetupAfterPartition(){
  // source shapes already reflect the partitioning; just redo normal setup
  Setup(layer_proto_, srclayers_);
}
+
+
+int SliceLayer::SliceID(const Layer* layer) const {
+  CHECK(layer!= nullptr);
+  for(size_t i=0;i<datavec_.size();i++){
+    //LOG(ERROR)<<"get slice "<<IntVecToString(shapes_[i]);
+    if(dstlayers_[i].get() == layer)
+      return i;
+  }
+  CHECK(false);
+  return -1;
+}
+
// Blob accessors: when queried by a destination layer, return that layer's
// slice; when queried with nullptr, return the whole (unsliced) blob.
const Blob<float>& SliceLayer::data(const Layer* layer) const {
  if(layer==nullptr)
    return data_;
  return datavec_[SliceID(layer)];
}
const Blob<float>& SliceLayer::grad(const Layer* layer) const {
  if(layer==nullptr)
    return grad_;
  return gradvec_[SliceID(layer)];
}
Blob<float>* SliceLayer::mutable_data(const Layer* layer) {
  if(layer==nullptr)
    return &data_;
  return &datavec_[SliceID(layer)];
}
Blob<float>* SliceLayer::mutable_grad(const Layer* layer){
  if(layer==nullptr)
    return &grad_;
  return &gradvec_[SliceID(layer)];
}
// NOTE(review): slicing forward/backward passes are not implemented yet;
// these are intentional stubs.
void SliceLayer::ComputeFeature(bool training, const
    vector<shared_ptr<Layer>>& srclayers){}
void SliceLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}
+
// SplitLayer replicates one source blob to multiple destination layers;
// its own blobs mirror the source shape.
void SplitLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  data_.Reshape(srclayers[0]->data(this).shape());
  grad_.Reshape(srclayers[0]->data(this).shape());
}

void SplitLayer::SetupAfterPartition(){
  // source shapes already reflect the partitioning; just redo normal setup
  Setup(layer_proto_, srclayers_);
}
// NOTE(review): split forward/backward passes are not implemented yet;
// these are intentional stubs.
void SplitLayer::ComputeFeature(bool training, const
    vector<shared_ptr<Layer>>& srclayers){

}
void SplitLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){

}
+
+}  // namespace singa
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc
new file mode 100644
index 0000000..d45bcc0
--- /dev/null
+++ b/src/neuralnet/layer.cc
@@ -0,0 +1,781 @@
+#include <glog/logging.h>
+#include <memory>
+#include <algorithm>
+#include <opencv2/highgui/highgui.hpp>
+#include <opencv2/imgproc/imgproc.hpp>
+#include "mshadow/tensor.h"
+#include "mshadow/cxxnet_op.h"
+#include "neuralnet/layer.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+
+using namespace mshadow;
+using namespace mshadow::expr;
+
+namespace singa {
+
+/************ Implementation for ConvProductLayer*************************/
+void ConvolutionLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  ConvolutionProto conv_param=proto.convolution_param();
+  kernel_=conv_param.kernel();
+  CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
+  pad_=conv_param.pad();
+  stride_=conv_param.stride();
+  num_filters_=conv_param.num_filters();
+  const vector<int>& srcshape=srclayers[0]->data(this).shape();
+  int dim=srcshape.size();
+  CHECK_GT(dim, 2);
+  width_=srcshape[dim-1];
+  height_=srcshape[dim-2];
+  if(dim>3)
+    channels_=srcshape[dim-3];
+  else if(dim>2)
+    channels_=1;
+  batchsize_=srcshape[0];
+  conv_height_=(height_ + 2 * pad_ - kernel_) / stride_ + 1;
+  conv_width_= (width_ + 2 * pad_ - kernel_) / stride_ + 1;
+  col_height_=channels_*kernel_*kernel_;
+  col_width_=conv_height_*conv_width_;
+  vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+  col_data_.Reshape(vector<int>{col_height_, col_width_});
+  col_grad_.Reshape(vector<int>{col_height_, col_width_});
+
+  Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
+  weight_=shared_ptr<Param>(factory->Create("Param"));
+  weight_->Setup(proto.param(0), vector<int>{num_filters_, col_height_}, 
col_height_);
+  bias_=shared_ptr<Param>(factory->Create("Param"));
+  bias_->Setup(proto.param(1), vector<int>{num_filters_},0);
+}
+
// After partitioning on the filter dimension, shape[1] is this partition's
// filter count; rebuild with an adjusted copy of the proto.
void ConvolutionLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  LayerProto newproto(proto);
  ConvolutionProto *conv_param=newproto.mutable_convolution_param();
  conv_param->set_num_filters(shape[1]);
  Setup(newproto, srclayers);
}

// Forward pass: per image, unfold input patches into a column matrix
// (im2col), multiply by the filter matrix, then add the broadcast bias.
void ConvolutionLayer::ComputeFeature(bool training, const vector<SLayer>&
    srclayers){
  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
      Shape4(batchsize_, channels_, height_, width_));
  Tensor<cpu, 3> data(data_.mutable_cpu_data(),
      Shape3(batchsize_, num_filters_, conv_height_* conv_width_));
  Tensor<cpu, 2> col(col_data_.mutable_cpu_data(),
      Shape2(col_height_, col_width_));
  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(),
      Shape2(num_filters_, col_height_));
  Tensor<cpu, 1> bias(bias_->mutable_cpu_data(),
      Shape1(num_filters_));

  for(int n=0;n<batchsize_;n++){
    if(pad_>0)
      col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_);
    else
      col=unpack_patch2col(src[n], kernel_, stride_);
    data[n]=dot(weight, col);
  }
  data+=broadcast<1>(bias, data.shape);
}

// Backward pass: accumulate filter/bias gradients over the batch and, when
// the source layer needs a gradient, propagate it back through col2im.
void ConvolutionLayer::ComputeGradient(const vector<SLayer>& srclayers) {
  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
      Shape4(batchsize_, channels_, height_, width_));
  Tensor<cpu, 2> col(col_data_.mutable_cpu_data(),
      Shape2(col_height_, col_width_));
  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(),
      Shape2(num_filters_, col_height_));

  // gsrcblob is null when no upstream gradient is required
  Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this);
  Tensor<cpu, 4> gsrc(Shape4(batchsize_, channels_, height_, width_));
  if(gsrcblob!=nullptr)
    gsrc.dptr=gsrcblob->mutable_cpu_data();
  Tensor<cpu, 3> grad(grad_.mutable_cpu_data(),
      Shape3(batchsize_, num_filters_, conv_height_* conv_width_));
  Tensor<cpu, 2> gcol(col_grad_.mutable_cpu_data(),
      Shape2(col_height_, col_width_));
  Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(),
      Shape2(num_filters_, col_height_));
  Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(),
      Shape1(num_filters_));

  gweight=0.0f;  // accumulated over the batch below
  gbias=sumall_except_dim<1>(grad);
  // shape of a padded single image, used to invert the im2col with padding
  Shape<3> padshape(gsrc.shape.SubShape());
  padshape[0]+=2*pad_;padshape[1]+=2*pad_;
  Shape<2> imgshape=Shape2(height_, width_);
  for(int n=0;n<batchsize_;n++){
    if(pad_>0)
      col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_);
    else
      col=unpack_patch2col(src[n], kernel_, stride_);
    gweight+=dot(grad[n], col.T());

    if(gsrcblob!=nullptr){
      gcol=dot(weight.T(), grad[n]);
      gsrc[n]=crop(pack_col2patch(gcol, padshape, kernel_, stride_), imgshape);
    }
  }
}
+
/****************** Implementation for DropoutLayer ***********************/
void DropoutLayer::Setup(const LayerProto& proto,
      const vector<SLayer>& srclayers){
  data_.ReshapeLike(srclayers[0]->data(this));
  grad_.ReshapeLike(*srclayers[0]->mutable_grad(this));
  mask_.Reshape(srclayers[0]->data(this).shape());
  pdrop_=proto.dropout_param().dropout_ratio();
  // seed the shared RNG singleton from the wall clock
  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
  ASingleton<Random<cpu>>::Instance(seed);
}

void DropoutLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  // dropout is elementwise; the partitioned shape needs no adjustment
  Setup(proto, srclayers);
}

// Forward: at test time just copy the input through.  During training, zero
// each unit with probability pdrop_ and scale survivors by 1/(1-pdrop_)
// ("inverted dropout", so no rescaling is needed at test time).
void DropoutLayer::ComputeFeature(bool training, const vector<SLayer>&
    srclayers) {
  // check training
  if(!training){
    data_.CopyFrom(srclayers[0]->data());
    return;
  }
  float pkeep=1-pdrop_;
  Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count()));
  // mask[i] = (uniform() < pkeep ? 1 : 0) / pkeep
  mask = F<op::threshold>(ASingleton<Random<cpu>>::Instance()\
      ->uniform(mask.shape), pkeep ) * (1.0f/pkeep);
  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
  Blob<float>* srcblob=srclayers[0]->mutable_data();
  Tensor<cpu, 1> src(srcblob->mutable_cpu_data(), Shape1(srcblob->count()));
  data=src*mask;
}

// Backward: route the gradient through the same mask as the forward pass.
void DropoutLayer::ComputeGradient(const vector<SLayer>& srclayers)  {
  Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(data_.count()));
  Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count()));
  Blob<float>* gsrcblob=srclayers[0]->mutable_grad();
  Tensor<cpu, 1> gsrc(gsrcblob->mutable_cpu_data(), Shape1(gsrcblob->count()));
  gsrc=grad*mask;
}
/**************** Implementation for InnerProductLayer********************/
// Fully-connected layer: y = x * W + b with x flattened to
// (batchsize, vdim) and W of shape (vdim, hdim).
void InnerProductLayer::Setup(const LayerProto& proto,
      const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  const auto& src=srclayers[0]->data(this);
  batchsize_=src.shape()[0];
  vdim_=src.count()/batchsize_;  // flatten all non-batch dimensions
  hdim_=proto.inner_product_param().num_output();
  data_.Reshape(vector<int>{batchsize_, hdim_});
  grad_.ReshapeLike(data_);
  Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
  weight_=shared_ptr<Param>(factory->Create("Param"));
  bias_=shared_ptr<Param>(factory->Create("Param"));
  weight_->Setup(proto.param(0), vector<int>{vdim_, hdim_}, vdim_*hdim_);
  bias_->Setup(proto.param(1), vector<int>{hdim_},0);
}
// After partitioning on the output dimension, shape[1] is this partition's
// number of outputs; rebuild with an adjusted copy of the proto.
void InnerProductLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  LayerProto newproto(proto);
  InnerProductProto * innerproto=newproto.mutable_inner_product_param();
  innerproto->set_num_output(shape[1]);
  Setup(newproto, srclayers);
}

void InnerProductLayer::ComputeFeature(bool training, const vector<SLayer>&
    srclayers) {
  Tensor<cpu, 2> data(data_.mutable_cpu_data(), Shape2(batchsize_,hdim_));
  CHECK_EQ(srclayers[0]->data().count(), batchsize_*vdim_);
  Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(),
      Shape2(batchsize_,vdim_));
  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_));
  Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), Shape1(hdim_));
  data=dot(src, weight);
  // repmat: repeat bias vector into batchsize rows
  data+=repmat(bias, batchsize_);
}

// Backward: gb = column sums of grad; gW = x^T * grad; gx = grad * W^T.
void InnerProductLayer::ComputeGradient(const vector<SLayer>& srclayers) {
  Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(),
      Shape2(batchsize_,vdim_));
  Tensor<cpu, 2> grad(grad_.mutable_cpu_data(),Shape2(batchsize_,hdim_));
  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_));
  Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), Shape2(vdim_,hdim_));
  Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), Shape1(hdim_));

  gbias=sum_rows(grad);
  gweight=dot(src.T(), grad);
  // skip the input gradient when the source layer does not need one
  if(srclayers[0]->mutable_grad(this)!=nullptr){
    Tensor<cpu, 2> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
        Shape2(batchsize_,vdim_));
    gsrc=dot(grad, weight.T());
  }
}
/*****************************************************************************
 * Implementation for LabelLayer
 *****************************************************************************/
// One float label per record in the batch.
void LabelLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
  data_.Reshape(vector<int>{batchsize});
}

// Copies every record's label into the label blob.
void LabelLayer::ParseRecords(bool training, const vector<Record>& records,
    Blob<float>* blob){
  LOG_IF(ERROR, records.size()==0)<<"Empty records to parse";
  float *label= blob->mutable_cpu_data() ;
  int rid=0;
  for(const Record& record: records){
    label[rid++]=record.image().label();
    // NOTE(review): hard-coded for 10-class datasets (MNIST/CIFAR10);
    // generalize this check before reusing with other datasets
    CHECK_LT(record.image().label(),10);
  }
  CHECK_EQ(rid, blob->shape()[0]);
}
+
+
/*********************LMDBDataLayer**********************************/
// Loads the next batch of records from the LMDB cursor.  On the first call
// it may randomly skip up to random_skip_ records so different runs start
// from different offsets.
void LMDBDataLayer::ComputeFeature(bool training, const vector<SLayer>&
    srclayers){
  if(random_skip_){
    int nskip=rand()%random_skip_;
    // full scan to count records; used for the log message only
    int n=0;
    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
          &mdb_value_, MDB_FIRST), MDB_SUCCESS);
    while (mdb_cursor_get(mdb_cursor_, &mdb_key_,
          &mdb_value_, MDB_NEXT) == MDB_SUCCESS)
      n++;
    LOG(INFO)<<"Random Skip "<<nskip<<" records of total "<<n<<"records";
    // We have reached the end. Restart from the first.
    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
          &mdb_value_, MDB_FIRST), MDB_SUCCESS);
    for(int i=0;i<nskip;i++){
      if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
            &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
        // We have reached the end. Restart from the first.
        DLOG(INFO) << "Restarting data prefetching from start.";
        CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
              &mdb_value_, MDB_FIRST), MDB_SUCCESS);
      }
    }
    random_skip_=0;  // skip only once, on the first batch
  }
  Datum datum;
  for(auto& record: records_){
    SingleLabelImageRecord* image=record.mutable_image();
    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
          &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
    datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size);
    ConvertDatumToSingleLableImageRecord(datum, image);
    if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
          &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
      // We have reached the end. Restart from the first.
      DLOG(INFO) << "Restarting data prefetching from start.";
      CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
            &mdb_value_, MDB_FIRST), MDB_SUCCESS);
    }
  }
}

// Translates a Caffe-style Datum into SINGA's SingleLabelImageRecord: label,
// shape (channels/height/width when present), raw bytes and/or float data.
void LMDBDataLayer::ConvertDatumToSingleLableImageRecord(const Datum& datum,
    SingleLabelImageRecord* record){
  record->set_label(datum.label());
  record->clear_shape();
  if(datum.has_channels())
    record->add_shape(datum.channels());
  if(datum.has_height())
    record->add_shape(datum.height());
  if(datum.has_width())
    record->add_shape(datum.width());
  if(datum.has_data())
    record->set_pixel(datum.data());
  if(datum.float_data_size()){
    record->clear_data();
    for(float x: datum.float_data())
      record->add_data(x);
  }
}

// Opens the LMDB environment read-only, positions the cursor and caches one
// sample record that parser layers use for shape inference.
void LMDBDataLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(mdb_env_create(&mdb_env_), MDB_SUCCESS) << "mdb_env_create failed";
  CHECK_EQ(mdb_env_set_mapsize(mdb_env_, 1099511627776), MDB_SUCCESS); // 1TB
  CHECK_EQ(mdb_env_open(mdb_env_,
        proto.data_param().path().c_str(),
        MDB_RDONLY, 0664), MDB_SUCCESS) << "cannot open lmdb "
    << proto.data_param().path();
  CHECK_EQ(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn_), MDB_SUCCESS)
    << "mdb_txn_begin failed";
  CHECK_EQ(mdb_open(mdb_txn_, NULL, 0, &mdb_dbi_), MDB_SUCCESS)
    << "mdb_open failed";
  CHECK_EQ(mdb_cursor_open(mdb_txn_, mdb_dbi_, &mdb_cursor_), MDB_SUCCESS)
    << "mdb_cursor_open failed";
  LOG(INFO) << "Opening lmdb " << proto.data_param().path();
  CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_FIRST),
      MDB_SUCCESS) << "mdb_cursor_get failed";

  // advance one record (wrapping if the DB has only one) before caching the
  // sample -- NOTE(review): confirm this one-record offset is intended
  if (mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT)
      != MDB_SUCCESS) {
    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_,
          MDB_FIRST), MDB_SUCCESS);
  }
  Datum datum;
  datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size);
  SingleLabelImageRecord* record=sample_.mutable_image();
  ConvertDatumToSingleLableImageRecord(datum, record);

  batchsize_=proto.data_param().batchsize();
  records_.resize(batchsize_);
  random_skip_=proto.data_param().random_skip();
}
+
/***************** Implementation for LRNLayer *************************/
// Local Response Normalization across channels:
//   b_i = a_i / (knorm + alpha/lsize * sum_{window} a_j^2)^beta
void LRNLayer::Setup(const LayerProto& proto,
      const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  lsize_ = proto.lrn_param().local_size();
  CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for Localvol";
  knorm_=proto.lrn_param().knorm();
  alpha_ = proto.lrn_param().alpha();
  beta_ = proto.lrn_param().beta();

  const vector<int>& s=srclayers[0]->data(this).shape();
  data_.Reshape(s);
  grad_.Reshape(s);
  norm_.Reshape(s);  // caches the normalizer for the backward pass
  batchsize_=s[0];
  channels_=s[1];
  height_=s[2];
  width_=s[3];
}

void LRNLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  Setup(proto, srclayers);
}

void LRNLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
  const float salpha = alpha_ / lsize_;
  Shape<4> s=Shape4(batchsize_,channels_, height_, width_);
  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s);
  Tensor<cpu, 4> data(data_.mutable_cpu_data(), s);
  Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s);
  // stores normalizer without power
  norm= chpool<red::sum>( F<op::square>(src) , lsize_ ) * salpha + knorm_;
  data = src * F<op::power>(norm, -beta_ );
}

// Backward pass uses the cached normalizer; the second term accounts for
// each input's contribution to its neighbours' normalizers.
void LRNLayer::ComputeGradient(const vector<SLayer>& srclayers) {
  const float salpha = alpha_ / lsize_;
  Shape<4> s=Shape4(batchsize_,channels_, height_, width_);
  Tensor<cpu, 4> src(srclayers[0]->mutable_data()->mutable_cpu_data(), s);
  Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s);
  Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s);
  Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), s);

  gsrc = grad * F<op::power>( norm, -beta_ );
  gsrc += ( - 2.0f * beta_ * salpha ) * chpool<red::sum>(
      grad * src * F<op::power>( norm, -beta_-1.0f ), lsize_ )  * src;
}
+
/**************** Implementation for MnistImageLayer******************/

// Converts raw MNIST records into normalized float images:
// out = pixel / norm_a_ - norm_b_.  Pixels may be stored either as raw
// bytes (pixel()) or as floats (data()).
void MnistImageLayer::ParseRecords(bool training, const vector<Record>&
    records, Blob<float>* blob){
  LOG_IF(ERROR, records.size()==0)<<"Empty records to parse";
  int ndim=records.at(0).image().shape_size();
  int inputsize =records.at(0).image().shape(ndim-1);

  float* dptr=blob->mutable_cpu_data();
  for(const Record& record: records){
    // copy from record to cv::Mat
    cv::Mat input(inputsize, inputsize, CV_32FC1);
    const SingleLabelImageRecord& imagerecord=record.image();
    if(imagerecord.pixel().size()){
      string pixel=imagerecord.pixel();
      for(int i=0,k=0;i<inputsize;i++)
        for(int j=0;j<inputsize;j++)
          // must cast pixel through uint8_t before float: a plain (signed)
          // char would map bytes >127 to negative values
          input.at<float>(i,j)=static_cast<float>(static_cast<uint8_t>(pixel[k++]));
    }else{
      for(int i=0,k=0;i<inputsize;i++)
        for(int j=0;j<inputsize;j++)
          input.at<float>(i,j)=imagerecord.data(k++);
    }
    int size=blob->shape()[1];
    /* Elastic/affine augmentation (scaling, rotation, shearing) is disabled
       for now; verify before re-enabling.
    cv::Mat resizeMat=input;
    // affine transform, scaling, rotation and shearing
    if(gamma_){
      float r1=rand_real()*2-1;
      float r2=rand_real()*2-1;
      int h=static_cast<int>(inputsize*(1.+r1*gamma_/100.0));
      int w=static_cast<int>(inputsize*(1.+r2*gamma_/100.0));
      cv::resize(input, resizeMat, cv::Size(h,w));
    }
    cv::Mat betaMat=resizeMat;
    cv::Mat warpmat(2,3, CV_32FC1);
    warpmat.at<float>(0,0)=1.0;
    warpmat.at<float>(0,1)=0.0;
    warpmat.at<float>(0,2)=0.0;
    warpmat.at<float>(1,0)=0.0;
    warpmat.at<float>(1,1)=1.0;
    warpmat.at<float>(1,2)=0.0;

    if(beta_){
      float r=rand_real()*2-1;
      if(rand() % 2){ // rotation
        cv::Point center(resizeMat.rows/2, resizeMat.cols/2);
        warpmat=cv::getRotationMatrix2D(center, r*beta_, 1.0);
      }else{
        //shearing
        warpmat.at<float>(0,1)=r*beta_/90;
        if(imagerecord.label()==1 ||imagerecord.label()==7)
          warpmat.at<float>(0,1)/=2.0;
      }
    }
    cv::warpAffine(resizeMat, betaMat, warpmat, cv::Size(size, size));
    */

    // normalize and write the (size x size) image into the output blob
    for(int i=0;i<size;i++){
      for(int j=0;j<size;j++){
        *dptr=input.at<float>(i,j)/norm_a_-norm_b_;
        dptr++;
      }
    }
  }
  CHECK_EQ(dptr, blob->mutable_cpu_data()+blob->count());
}
// Reads augmentation/normalization settings and derives the output shape
// from one sample record (optionally resized to resize_ x resize_).
void MnistImageLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
  Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample();
  kernel_=proto.mnist_param().kernel();
  sigma_=proto.mnist_param().sigma();
  alpha_=proto.mnist_param().alpha();
  beta_=proto.mnist_param().beta();
  gamma_=proto.mnist_param().gamma();
  resize_=proto.mnist_param().resize();
  norm_a_=proto.mnist_param().norm_a();
  norm_b_=proto.mnist_param().norm_b();
  elastic_freq_=proto.mnist_param().elastic_freq();

  int ndim=sample.image().shape_size();
  CHECK_GE(ndim,2);
  if(resize_)
    data_.Reshape(vector<int>{batchsize, resize_, resize_});
  else{
    // without resizing the input image must be square
    int s=sample.image().shape(ndim-1);
    CHECK_EQ(s,sample.image().shape(ndim-2));
    data_.Reshape(vector<int>{batchsize, s, s });
  }
}
+
/******************** Implementation for PoolingLayer******************/
void PoolingLayer::Setup(const LayerProto& proto,
      const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  PoolingProto pool_param = proto.pooling_param();
  kernel_=pool_param.kernel();
  stride_=pool_param.stride();
  // NOTE(review): pad_ is checked here but never assigned from pool_param in
  // this function -- confirm whether PoolingProto defines a pad field that
  // should be read before this check.
  CHECK_LT(pad_, kernel_);
  pool_=proto.pooling_param().pool();
  CHECK(pool_ == PoolingProto_PoolMethod_AVE
        || pool_ == PoolingProto_PoolMethod_MAX)
      << "Padding implemented only for average and max pooling.";

  const auto& srcshape=srclayers[0]->data(this).shape();
  int dim=srcshape.size();
  CHECK_GT(dim,2);
  width_ = srcshape[dim-1];
  height_ = srcshape[dim-2];
  if(dim>3)
    channels_ = srcshape[dim-3];
  else
    channels_=1;  // no explicit channel dim -> single channel
  batchsize_=srcshape[0];
  pooled_height_ = static_cast<int>((height_ - kernel_) / stride_) + 1;
  pooled_width_ = static_cast<int>(( width_ - kernel_) / stride_) + 1;
  data_.Reshape(vector<int>{batchsize_, channels_, pooled_height_,
      pooled_width_});
  grad_.ReshapeLike(data_);
}

void PoolingLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  Setup(proto, srclayers);
}

// Forward: max pooling takes the window maximum; average pooling sums the
// window and divides by the window area.
void PoolingLayer::ComputeFeature(bool training, const vector<SLayer>&
    srclayers){
  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
      Shape4(batchsize_, channels_, height_, width_));
  Tensor<cpu, 4> data(data_.mutable_cpu_data(),
      Shape4(batchsize_, channels_, pooled_height_, pooled_width_));
  if(pool_ == PoolingProto_PoolMethod_MAX)
    data=pool<red::maximum>(src, kernel_, stride_);
  else if(pool_ == PoolingProto_PoolMethod_AVE)
    data=pool<red::sum>(src, kernel_, stride_)
      *(1.0f/(kernel_*kernel_));
}

/*
 * partition only on num/channel dim
 * assume grad and data have the same partition
 */
void PoolingLayer::ComputeGradient(const vector<SLayer>& srclayers) {
  Shape<4> s1= Shape4(batchsize_, channels_, height_, width_);
  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),s1);
  Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),s1);
  Shape<4> s2= Shape4(batchsize_, channels_, pooled_height_, pooled_width_);
  Tensor<cpu, 4> data(data_.mutable_cpu_data(), s2);
  Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s2);
  if(pool_ == PoolingProto_PoolMethod_MAX)
      gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_);
  else if(pool_ == PoolingProto_PoolMethod_AVE)
      gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_)
        *(1.0f/(kernel_*kernel_));
}
+
/***************** Implementation for ReLULayer *****************************/

void ReLULayer::Setup(const LayerProto& proto,
      const vector<SLayer>& srclayers){
  data_.ReshapeLike(srclayers[0]->data());
  grad_.ReshapeLike(*(srclayers[0]->mutable_grad()));
}

void ReLULayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  // ReLU is elementwise; the partitioned shape needs no adjustment
  Setup(proto, srclayers);
}

// Elementwise max(0, x) over the flattened blob.
void ReLULayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
  Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
      Shape1(data_.count()));
  data=F<op::relu>(src);
}

// Backward: gradient flows only where the forward output was positive.
void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) {
  Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count()));
  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
  Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
      Shape1(data_.count()));
  gsrc=F<op::relu_grad>(data)*grad;
}
+
/*************** Implementation for RGBImageLayer *************************/

// Decodes a batch of image records into the 4-D image blob, subtracting the
// per-pixel mean and (during training) applying optional random crop and
// horizontal mirror.
void RGBImageLayer::ParseRecords(bool training, const vector<Record>& records,
    Blob<float>* blob){
  LOG_IF(ERROR, records.size()==0)<<"Empty records to parse";
  const vector<int>& s=blob->shape();
  Tensor<cpu, 4> images(blob->mutable_cpu_data(), Shape4(s[0],s[1],s[2],s[3]));
  const SingleLabelImageRecord& r=records.at(0).image();
  Tensor<cpu, 3> raw_image(Shape3(r.shape(0),r.shape(1),r.shape(2)));
  AllocSpace(raw_image);  // scratch buffer reused for every record
  Tensor<cpu, 3> croped_image(Shape3(s[1],s[2],s[3]));
  if(cropsize_)
    AllocSpace(croped_image);
  int rid=0;
  const float* meandptr=mean_.cpu_data();
  for(const Record& record: records){
    auto image=images[rid];
    bool do_crop=cropsize_>0&&training;
    bool do_mirror=mirror_&&rand()%2&&training;
    // decode into the scratch buffer only when a crop/mirror will transform
    // it afterwards; otherwise decode straight into the output image
    float* dptr=nullptr;
    if(do_crop||do_mirror)
      dptr=raw_image.dptr;
    else
      dptr=image.dptr;
    if(record.image().pixel().size()){
      string pixel=record.image().pixel();
      // cast raw bytes through uint8_t to avoid negative char values
      for(size_t i=0;i<pixel.size();i++)
        dptr[i]=static_cast<float>(static_cast<uint8_t>(pixel[i]));
    }else {
      memcpy(dptr, record.image().data().data(),
          sizeof(float)*record.image().data_size());
    }
    // subtract the mean image
    for(int i=0;i<mean_.count();i++)
      dptr[i]-=meandptr[i];

    if(do_crop){
      // random crop offsets within the original image
      int hoff=rand()%(r.shape(1)-cropsize_);
      int woff=rand()%(r.shape(2)-cropsize_);
      Shape<2> cropshape=Shape2(cropsize_, cropsize_);
      if(do_mirror){
        croped_image=crop(raw_image, cropshape, hoff, woff);
        image=mirror(croped_image);
      }else{
        image=crop(raw_image, cropshape, hoff, woff);
      }
    }else if(do_mirror){
      image=mirror(raw_image);
    }
    rid++;
  }
  if(scale_)
    images=images*scale_;

  FreeSpace(raw_image);
  if(cropsize_)
    FreeSpace(croped_image);
}
// Sets up blob shapes from one sample record, applies optional cropping to
// the spatial dims, and loads (or zeroes) the per-pixel mean image.
void RGBImageLayer::Setup(const LayerProto& proto,
    const vector<SLayer>& srclayers){
  CHECK_EQ(srclayers.size(),1);
  scale_=proto.rgbimage_param().scale();
  cropsize_=proto.rgbimage_param().cropsize();
  mirror_=proto.rgbimage_param().mirror();
  int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
  Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample();
  vector<int> shape;
  shape.push_back(batchsize);
  for(int x: sample.image().shape())
    shape.push_back(x);
  CHECK_EQ(shape.size(),4);
  if(cropsize_){
    shape[2]=cropsize_;
    shape[3]=cropsize_;
  }
  data_.Reshape(shape);
  mean_.Reshape({shape[1],shape[2],shape[3]});
  if(proto.rgbimage_param().has_meanfile()){
    BlobProto tmp;
    ReadProtoFromBinaryFile(proto.rgbimage_param().meanfile().c_str(), &tmp);
    CHECK_EQ(mean_.count(), tmp.data_size());
    memcpy(mean_.mutable_cpu_data(), tmp.data().data(),
        sizeof(float)*tmp.data_size());
  }else{
    memset(mean_.mutable_cpu_data(),0,sizeof(float)*mean_.count());
  }
}
+
+/***************Implementation for ShardDataLayer**************************/
+void ShardDataLayer::ComputeFeature(bool training, const vector<SLayer>& 
srclayers){
+  if(random_skip_){
+    int nskip=rand()%random_skip_;
+    LOG(INFO)<<"Random Skip "<<nskip<<" records, there are "<<shard_->Count()
+      <<" records in total";
+    string key;
+    for(int i=0;i<nskip;i++){
+      shard_->Next(&key, &sample_);
+    }
+    random_skip_=0;
+  }
+  for(auto& record: records_){
+    string key;
+    shard_->Next(&key, &record);
+  }
+}
+
+void ShardDataLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  shard_= std::make_shared<DataShard>(proto.data_param().path(),
+      DataShard::kRead);
+  string key;
+  shard_->Next(&key, &sample_);
+  batchsize_=proto.data_param().batchsize();
+
+  records_.resize(batchsize_);
+  random_skip_=proto.data_param().random_skip();
+}
+/*******************Implementation of TanLayer***************************/
+void TanhLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(srclayers[0]->grad(this));
+}
+
// Re-run Setup once the net has been partitioned.  An element-wise layer
// needs no partition-specific handling, so the shape hint is ignored.
void TanhLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  Setup(proto, srclayers);
}
+
+
+void TanhLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
+      Shape1(data_.count()));
+  data=F<op::stanh>(src);
+}
+
+void TanhLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count()));
+  Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
+      Shape1(data_.count()));
+  gsrc=F<op::stanh_grad>(data)*grad;
+}
+/********** * Implementation for SoftmaxLossLayer*************************/
+void SoftmaxLossLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),2);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  batchsize_=data_.shape()[0];
+  dim_=data_.count()/batchsize_;
+  topk_=proto.softmaxloss_param().topk();
+  metric_.Reshape(vector<int>{2});
+  scale_=proto.softmaxloss_param().scale();
+}
// Re-run Setup once the net has been partitioned; the loss layer needs no
// partition-specific handling, so the shape hint is ignored.
void SoftmaxLossLayer::SetupAfterPartition(const LayerProto& proto,
      const vector<int> &shape,
      const vector<SLayer>& srclayers){
  Setup(proto, srclayers);
}
+void SoftmaxLossLayer::ComputeFeature(bool training, const vector<SLayer>& 
srclayers) {
+  Shape<2> s=Shape2(batchsize_, dim_);
+  Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s);
+  Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(), s);
+  Softmax(prob, src);
+  const float* label=srclayers[1]->data().cpu_data();
+  const float* probptr=prob.dptr;
+  float loss=0, precision=0;
+  for(int n=0;n<batchsize_;n++){
+    int ilabel=static_cast<int>(label[n]);
+    CHECK_LT(ilabel,10);
+    CHECK_GE(ilabel,0);
+    float prob_of_truth=probptr[ilabel];
+    loss-=log(std::max(prob_of_truth, FLT_MIN));
+    vector<std::pair<float, int> > probvec;
+    for (int j = 0; j < dim_; ++j) {
+      probvec.push_back(std::make_pair(probptr[j], j));
+    }
+    std::partial_sort(
+        probvec.begin(), probvec.begin() + topk_,
+        probvec.end(), std::greater<std::pair<float, int> >());
+    // check if true label is in top k predictions
+    for (int k = 0; k < topk_; k++) {
+      if (probvec[k].second == static_cast<int>(label[n])) {
+        precision++;
+        break;
+      }
+    }
+    probptr+=dim_;
+  }
+  CHECK_EQ(probptr, prob.dptr+prob.shape.Size());
+  float *metric=metric_.mutable_cpu_data();
+  metric[0]=loss*scale_/(1.0f*batchsize_);
+  metric[1]=precision*scale_/(1.0f*batchsize_);
+}
+
+void SoftmaxLossLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  const float* label=srclayers[1]->data().cpu_data();
+  Blob<float>* gsrcblob=srclayers[0]->mutable_grad();
+  gsrcblob->CopyFrom(data_);
+  float* gsrcptr=gsrcblob->mutable_cpu_data();
+  for(int n=0;n<batchsize_;n++){
+    gsrcptr[n*dim_+static_cast<int>(label[n])]-=1.0f;
+  }
+  Tensor<cpu, 1> gsrc(gsrcptr, Shape1(gsrcblob->count()));
+  gsrc*=scale_/(1.0f*batchsize_);
+}
+
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
new file mode 100644
index 0000000..0bca26e
--- /dev/null
+++ b/src/neuralnet/neuralnet.cc
@@ -0,0 +1,401 @@
+#include <algorithm>
+#include <queue>
+
+#include "neuralnet/neuralnet.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+#include "utils/graph.h"
+
+
+namespace singa {
+#define CreateLayer(id) CreateInstance(id, Layer)
+
+void NeuralNet::RegisterLayers(){
+  Factory<Layer>* factory=Singleton<Factory<Layer>>::Instance();
+  factory->Register("kConvolution", CreateLayer(ConvolutionLayer));
+  factory->Register("kConcate", CreateLayer(ConcateLayer));
+  factory->Register("kDropout", CreateLayer(DropoutLayer));
+  factory->Register("kInnerProduct", CreateLayer(InnerProductLayer));
+  factory->Register("kRGBImage", CreateLayer(RGBImageLayer));
+  factory->Register("kLabel", CreateLayer(LabelLayer));
+  factory->Register("kLMDBData", CreateLayer(LMDBDataLayer));
+  factory->Register("kLRN", CreateLayer(LRNLayer));
+  factory->Register("kMnistImage", CreateLayer(MnistImageLayer));
+  factory->Register("kBridgeDst", CreateLayer(BridgeDstLayer));
+  factory->Register("kBridgeSrc", CreateLayer(BridgeSrcLayer));
+  factory->Register("kPooling", CreateLayer(PoolingLayer));
+  factory->Register("kReLU", CreateLayer(ReLULayer));
+  factory->Register("kShardData", CreateLayer(ShardDataLayer));
+  factory->Register("kSlice", CreateLayer(SliceLayer));
+  factory->Register("kSoftmaxLoss", CreateLayer(SoftmaxLossLayer));
+  factory->Register("kSplit", CreateLayer(SplitLayer));
+  factory->Register("kTanh", CreateLayer(TanhLayer));
+}
+shared_ptr<NeuralNet> NeuralNet::SetupNeuralNet(const NetProto& np, Phase 
phase){
+  NetProto proto;
+  proto.set_partition_type(np.partition_type());
+  // exclude layers if necessary
+  for(auto& layer:np.layer()){
+    bool include=true;
+    for(int x: layer.exclude()){
+      if(x==phase)
+        include=false;
+    }
+    if(include){
+      LayerProto* lp=proto.add_layer();
+      lp->CopyFrom(layer);
+    }
+  }
+  LOG(INFO)<<"NeuralNet config is "<<proto.DebugString();
+  shared_ptr<NeuralNet> net(new NeuralNet(proto));
+  return net;
+}
+NeuralNet::NeuralNet(NetProto net_proto, int group_size) {
+  group_size_=group_size;
+  for(int i=0;i<net_proto.layer_size();i++){
+    LayerProto * layer_proto=net_proto.mutable_layer(i);
+    if(!layer_proto->has_partition_type())
+      layer_proto->set_partition_type(net_proto.partition_type());
+  }
+
+  LOG(INFO)<<"Construct Neural Net...";
+  ConstructNeuralNet(net_proto);
+  if(group_size_>1)
+    PartitionNeuralNet();
+  for(auto layer: layers_){
+    DLOG(INFO)<<layer->name();
+  }
+  // assign id for params;
+  int paramid=0;
+  for(auto& layer: layers_){
+    for(shared_ptr<Param> p: layer->GetParams()){
+      params_.push_back(p);
+      p->set_id(paramid++);
+    }
+  }
+
+  LOG(INFO)<<"Neural Net constructed";
+}
+
+void NeuralNet::ConstructNeuralNet(const NetProto& net_proto){
+  // construct graph, one node for one layer, identified by layer name
+  map<string, LayerProto> protos;
+  for (auto &layer_proto : net_proto.layer()){
+    graph_.AddNode(layer_proto.name());
+    protos[layer_proto.name()]=layer_proto;
+  }
+  for (auto &layer_proto : net_proto.layer())
+    if(layer_proto.srclayers_size())
+      for(const string& src: layer_proto.srclayers())
+        graph_.AddEdge(src, layer_proto.name());
+
+  // topology sort
+  graph_.Sort();
+  //DLOG(INFO)<<"pure graph without partition\n"<< graph_.ToString();
+
+  auto* factory=Singleton<Factory<Layer>>::Instance();
+  // create Layers according to topology order
+  for(SNode node: graph_.nodes()){
+    shared_ptr<Layer> layer(factory->Create(protos[node->name()].type()));
+    layer->Init(protos[node->name()]);
+    name2layer_[node->name()]=layer;
+    layers_.push_back(layer);
+  }
+
+  // connect Layers.
+  for(SNode node: graph_.nodes()){
+    auto layer=name2layer_[node->name()];
+    for(SNode dst: node->dstnodes())
+      layer->AddDstLayer(name2layer_[dst->name()]);
+    for(SNode src: node->srcnodes())
+      layer->AddSrcLayer(name2layer_[src->name()]);
+  }
+  // setup layer properties, e.g., shapes
+  for(auto& layer: layers_){
+      layer->Setup();
+  }
+  LOG(INFO)<<"network graph witout partition\n"<<ToString();
+}
+
+void NeuralNet::PartitionNeuralNet(){
+  graph_=CreatePartitonedGraph(layers_, name2layer_);
+  //DLOG(ERROR)<<"pure graph after partition\n"<<graph_.ToString();
+  map<string, shared_ptr<Layer>> name2layer(name2layer_);
+  name2layer_.clear();
+  layers_.clear();
+  int gsize=group_size_;
+  auto* factory=Singleton<Factory<Layer>>::Instance();
+  // create Layers according to topology order
+  for(SNode node: graph_.nodes()){
+    LayerProto proto;
+    proto.set_name(node->name());
+    proto.set_locationid(node->val().locationid);
+    proto.set_partitionid(node->val().partitionid);
+    const string& origin=node->val().origin;
+    if (origin=="kSlice"){
+      proto.set_type(origin);
+      SliceProto *slice=proto.mutable_slice_param();
+      slice->set_slice_dimension(node->val().slice_dimension);
+      slice->set_slice_num(node->dstnodes().size());
+    }else if(origin== "kConcate"){
+      proto.set_type(origin);
+      ConcateProto *concate=proto.mutable_concate_param();
+      concate->set_concate_dimension(node->val().concate_dimension);
+      concate->set_concate_num(node->srcnodes().size());
+    }else if(origin=="kSplit"){
+      proto.set_type(origin);
+      SplitProto *split=proto.mutable_split_param();
+      split->set_num_splits(node->dstnodes().size());
+    }else if(origin=="kBridgeSrc" || origin== "kBridgeDst"){
+      proto.set_type(origin);
+    }else{
+      CHECK(name2layer.find(node->val().origin)!=name2layer_.end())
+        <<"Unkown origin for node "<<node->val().origin;
+    }
+    shared_ptr<Layer> newlayer;
+    if(proto.has_type()){
+      // layers added due to partition
+      shared_ptr<Layer> layer(factory->Create(proto.type()));
+      layer->Init(proto);
+      newlayer=layer;
+    }else{
+      // partitioned layers from origin neuralnet
+      auto oldlayer=name2layer.at(node->val().origin);
+      vector<int> shape=oldlayer->shape(nullptr);
+      if(oldlayer->partition_type()==kNone){
+        newlayer=oldlayer;
+      } else{
+        int pdim=oldlayer->partition_dimension();
+        shape[pdim]=shape[pdim]/gsize+
+          ((node->val().partitionid==gsize-1)?shape[pdim]%gsize:0);
+        shared_ptr<Layer> layer(factory->Create(oldlayer->type()));
+        layer->Init(*oldlayer, shape);
+        layer->set_name(node->name());
+        newlayer=layer;
+      }
+    }
+    layers_.push_back(newlayer);
+    name2layer_[node->name()]=newlayer;
+  }
+
+  // connect Layers.
+  for(SNode node: graph_.nodes()){
+    auto layer=name2layer_[node->name()];
+    layer->ClearDstLayers();
+    for(SNode dst: node->dstnodes())
+      layer->AddDstLayer(name2layer_[dst->name()]);
+    layer->ClearSrcLayers();
+    for(SNode src: node->srcnodes())
+      layer->AddSrcLayer(name2layer_[src->name()]);
+  }
+
+  LOG(INFO)<<"Adjacency matrix\n"<<ToAdjacency();
+
+  // set up layers after
+  for(shared_ptr<Layer> layer: layers_){
+    const vector<int>& shape=layer->shape(nullptr);
+    layer->SetupAfterPartition();
+    const vector<int>& newshape=layer->shape(nullptr);
+    if(shape.size())
+      CHECK(std::equal(shape.begin(),shape.end(),newshape.begin()));
+  }
+
+  LOG(INFO)<<"network graph after partition layers\n"<<ToString();
+}
+
+Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
+    const map<string, shared_ptr<Layer>>& name2layer){
+  Graph graph;
+  // partition origin nodes/layers
+  map<string, vector<SNode>> layer2nodes; //from name of original layer to 
nodes
+  int gsize=group_size_;
+  for(const auto& layer: layers){
+    vector<SNode> nodes;
+    if(layer->partition_type()==kDataPartition||
+        layer->partition_type()==kLayerPartition){
+      char suffix[4];
+      for(int i=0;i<gsize;i++){
+        sprintf(suffix, "%02d", i);
+        // differentiate partitions
+        string nodename=layer->name()+"-"+string(suffix);
+        LayerInfo info;
+        auto node=graph.AddNode(nodename, LayerInfo{layer->name(),i, i,-1,-1});
+        nodes.push_back(node);
+      }
+    }else if(layer->partition_type()==kNone){
+      auto node=graph.AddNode(layer->name(),
+          LayerInfo{layer->name(), layer->locationid(), 0,-1,-1});
+      nodes.push_back(node);
+    }else{
+      LOG(FATAL)<<"Unknown partition type "<<layer->partition_type();
+    }
+    layer2nodes[layer->name()]=nodes;
+  }
+
+
+  // connect nodes, nodes for ConcateLayer and SliceLayer are added.
+  for(shared_ptr<Layer> layer: layers){
+    string name=layer->name();
+    PartitionType type=layer->partition_type();
+    const vector<SNode>& nodes=layer2nodes.at(name);
+    for(int srcid=0;srcid<layer->srclayers_size();srcid++){
+      shared_ptr<Layer> srclayer=layer->srclayers()[srcid];
+      string srcname=srclayer->name();
+      const vector<SNode> srcnodes=layer2nodes.at(srcname);
+      PartitionType srctype=srclayer->partition_type();
+      ConnectionType connection=layer->connection_type(srcid);
+      if(srctype==kNone){
+        CHECK_EQ(srcnodes.size(),1)
+          <<"local layer "<<srcname<<" should not be partitioned";
+        SNode srcnode=srcnodes[0];
+        
if(type==kDataPartition||(type==kLayerPartition&&connection==kOneToOne)){
+          LayerInfo info=srcnode->val();
+          info.slice_dimension=name2layer.at(name)->partition_dimension();
+          graph.InsertSliceNode(srcnode, nodes, info);
+        } else if(type==kNone){
+          CHECK_EQ(nodes.size(),1)
+            <<"local layer "<<name<<" should not be nodeed";
+          graph.AddEdge(srcnode, nodes[0]);
+        } else { // type==kLayerPartition&&connection==kOneToAll
+          graph.InsertSplitNode(srcnode, nodes);
+        }
+      }else if((type==kNone
+                &&(srctype==kDataPartition||srctype==kLayerPartition))
+               ||(type==kLayerPartition&&connection==kOneToAll&&
+                  (srctype==kDataPartition||srctype==kLayerPartition))){
+        // copy/concate the whole srclayer for every dst partition
+        for(SNode node:nodes){
+          LayerInfo info=node->val();
+          info.concate_dimension=name2layer.at(srcname)->partition_dimension();
+          CHECK_GE(info.concate_dimension,0);
+          graph.InsertConcateNode(srcnodes, node, info);
+        }
+      }else if((srctype==kLayerPartition&&type==kDataPartition)
+          || (srctype==kDataPartition&&type==kLayerPartition)){
+        // the most complext scenario
+        vector<SNode> slicenodes;
+        for(SNode srcnode: srcnodes){
+          LayerInfo info=srcnode->val();
+          info.slice_dimension=name2layer.at(name)->partition_dimension();
+          slicenodes.push_back(graph.InsertSliceNode(srcnode, nodes,
+              info, false));
+        }
+        for(SNode node: nodes){
+          LayerInfo info=node->val();
+          info.concate_dimension=name2layer.at(srcname)->partition_dimension();
+          CHECK_GE(info.concate_dimension,0);
+          graph.InsertConcateNode(slicenodes, node, info);
+        }
+      }else if((srctype==kDataPartition&&type==kDataPartition)||
+          (srctype==kLayerPartition&&type==kLayerPartition&&
+           layer->connection_type(srcid)==kOneToOne)){
+        CHECK_EQ(srcnodes.size(), nodes.size());
+        for(size_t i=0;i<srcnodes.size();i++){
+          graph.AddEdge(srcnodes[i], nodes[i]);
+        }
+      }
+    }
+  }
+  // must do topology sort, because we have added new nodes.
+  graph.Sort();
+  //LOG(ERROR)<<graph.ToString();
+
+  // add node for split layer
+  bool data_node=true;
+  vector<SNode> oldnodes=graph.nodes();
+  for(SNode node: oldnodes){
+    if(node->dstnodes_size()>1&&node->val().origin!="kSlice"
+        &&node->val().origin!="kSplit"&&!data_node){
+      vector<SNode> dstnodes=node->dstnodes();
+      for(SNode dst: dstnodes)
+        graph.RemoveEdge(node, dst);
+      graph.InsertSplitNode(node, dstnodes);
+    }
+    data_node=false;
+  }
+
+  // add bridge
+  oldnodes=graph.nodes();
+  for(SNode node: oldnodes){
+    vector<SNode> dstnodes=node->dstnodes();
+    for(size_t i=0;i<dstnodes.size();i++){
+      SNode dstnode=dstnodes.at(i);
+      if(node->val().locationid!=dstnode->val().locationid){
+        graph.RemoveEdge(node, dstnode);
+        graph.InsertBridgeNode(node, dstnode);
+      }
+    }
+  }
+  graph.Sort();
+  return graph;
+}
+
+std::string NeuralNet::ToString(){
+  map<string, string> info;
+  for(auto layer: layers_){
+    info[layer->name()]=IntVecToString(layer->shape(nullptr));
+    string type=layer->type();
+  }
+  return graph_.ToString(info);
+}
+
+std::string NeuralNet::ToAdjacency(){
+  string disp="";
+  for(auto& layer: layers_){
+    disp+=layer->name()+": ";
+    for(const auto& dst: layer->dstlayers())
+      disp+=dst->name()+", ";
+    disp+="\n";
+  }
+  return disp;
+}
+
+
// Serialize the net into a NetProto.
// NOTE(review): this only clears the layer list; writing the layers (and
// honoring copyData) appears unimplemented — confirm whether callers rely on
// this being a stub.
void NeuralNet::ToProto(NetProto *proto, bool copyData) {
  proto->clear_layer();
}
+
+string NeuralNet::DebugInfo(){
+  string ret;
+  char display[4096];
+  for(auto& layer: layers_){
+    if(!layer->is_datalayer()){
+      sprintf(display, "Forward layer  %10s data norm1 %13.9f\n",
+          layer->name().c_str(), layer->data().asum_data());
+      ret+=string(display);
+    }
+  }
+  for (auto it = layers_.rbegin(); it != layers_.rend(); it++){
+    shared_ptr<Layer> layer=*it;
+    
if(!(layer->is_datalayer()||layer->is_losslayer()||layer->is_parserlayer())){
+      sprintf(display, "Backward layer %10s grad norm1 %13.9f\n",
+          layer->name().c_str(), layer->grad().asum_data());
+      ret+=string(display);
+    }
+  }
+  for(auto& layer: layers_){
+    for(auto param: layer->GetParams()){
+      sprintf(display, "Layer %10s, param id %2d, name %10s,\
+          value norm1 %13.9f, grad norm1 %13.9f\n",
+          layer->name().c_str(), param->id(), param->name().c_str(),
+          param->data().asum_data(), param->grad().asum_data());
+      ret+=string(display);
+    }
+  }
+  return ret;
+}
+void NeuralNet::ShareParams(shared_ptr<NeuralNet> other, int flag){
+  for(auto& layer: layers_){
+    auto otherlayer=other->name2layer(layer->name());
+    if(otherlayer!=nullptr){
+      const auto& otherparams=otherlayer->GetParams();
+      const auto& params=layer->GetParams();
+      CHECK_EQ(params.size(), otherparams.size());
+      for(size_t i=0;i<params.size();i++){
+        params[i]->ShareData(otherparams[i]);
+      }
+    }
+  }
+}
+
+}  // namespace singa

Reply via email to