SINGA-7
Add support for shared memory hogwild training in single process (node).
hogwild training is enabled by setting share_memory to true (i.e., worker 
groups share memory space for parameter values) in cluster.conf.
Depending on the configurations of cluster.conf, there are two places to do the 
update:
1. if sever_update is true (default) and there is at least one server group, 
workers send gradients to servers
which conduct the update based on sending worker's running iteration.
2. otherwise, workers update parameters locally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/40a86abe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/40a86abe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/40a86abe

Branch: refs/heads/master
Commit: 40a86abebeb923ef39a19cb3f03c445305c3eddc
Parents: 806826e
Author: wang wei <[email protected]>
Authored: Sat Jun 13 11:38:42 2015 +0800
Committer: wang wei <[email protected]>
Committed: Sat Jun 13 11:38:42 2015 +0800

----------------------------------------------------------------------
 include/communication/msg.h |   6 +-
 include/trainer/trainer.h   |   7 +-
 include/trainer/worker.h    |  57 +-----
 include/utils/cluster.h     |   8 +
 include/utils/param.h       |  50 ++---
 src/proto/cluster.pb.h      |  72 +++++++-
 src/proto/cluster.proto     |   5 +
 src/trainer/server.cc       |   7 +-
 src/trainer/trainer.cc      |  47 ++---
 src/trainer/worker.cc       | 112 ++++++-----
 src/utils/param.cc          | 390 ++++++++-------------------------------
 11 files changed, 276 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
index 21ac78e..61cbd01 100644
--- a/include/communication/msg.h
+++ b/include/communication/msg.h
@@ -51,7 +51,6 @@ class BaseMsg{
     */
   virtual bool next_frame()=0;
 };
-
 // TODO make it a compiler argument
 #define USE_ZMQ
 
@@ -186,6 +185,11 @@ class Msg : public BaseMsg{
   zframe_t *frame_;
 };
 #endif
+inline void DeleteMsg(Msg** msg){
+  delete *msg;
+  *msg=nullptr;
+}
+
 
 } /* singa */
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 37d7106..18250af 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -54,14 +54,11 @@ class Trainer{
       *  otherwise
       * @param owner the procs id of the worker who ownes this Param object
       */
-    void AddParam(shared_ptr<Param> p, int local, int owner){
+    void AddParam(shared_ptr<Param> p, bool local){
       num_local+=local;
       num_total+=1;
-      if(owner>-1)
-        owner_procs=owner;
-      if(local>0){
+      if(local)
         shares.push_back(p);
-      }
     }
     int num_update, next_version; //!< all counters are atomic
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 09262e6..7481ebd 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -28,7 +28,6 @@ class Worker {
     validation_net_=val_net;
   }
 
-
   void Stop();
   int Put(shared_ptr<Param> param, int step);
   int Get(shared_ptr<Param> param, int step);
@@ -50,14 +49,13 @@ class Worker {
   /**
    * Test/validate one mini-batch.
    */
-  virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase 
phase)=0;
+  virtual void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> 
net)=0;
   /**
     * Test the perforance of the learned model on validation or test dataset.
     * Test is done by the first group.
     * @param net, neural network
-    * @param phase kValidation or kTest.
     */
-  void Test(shared_ptr<NeuralNet> net, int nsteps, const string &prefix);
+  void Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net);
 
   /**
     * Main function of Worker.
@@ -66,12 +64,6 @@ class Worker {
     */
   virtual void Run();
 
-
-  /**
-   * Pull data from layers resident on other nodes due to Model Partition.
-  void Pull(zsock_t* pull, shared_ptr<NeuralNet> net);
-   */
-
   /**
    * Check is it time to display training info, e.g., loss and precison.
    */
@@ -130,63 +122,30 @@ class Worker {
           % modelproto_.validation_frequency() == 0));
   }
 
-
-  /**
-   * start training from scratch.
-   * setup training/test/validation neuralnets, then call Run().
-  void Start(ModelProto model);
-   */
   /**
    * TODO Resume from snapshot
   void Resume();
    */
   void ReceiveBlobs(shared_ptr<NeuralNet> net);
   void SendBlob();
+  void ConnectStub(shared_ptr<Dealer> dealer);
  protected:
   int thread_id_, group_id_, worker_id_;
   int step_;
   ModelProto modelproto_;
   shared_ptr<NeuralNet> train_net_, test_net_, validation_net_;
-  shared_ptr<Dealer> layer_dealer_, param_dealer_;
-  Poller layer_poller_, param_poller_;
+  shared_ptr<Dealer> layer_dealer_, dealer_;
   shared_ptr<Updater> updater_;
 };
 
 class BPWorker: public Worker{
  public:
+  BPWorker(int thread_id, int group_id, int worker_id);
   ~BPWorker(){}
-  BPWorker(int thread_id, int group_id, int worker_id):Worker(thread_id, 
group_id, worker_id){}
   virtual void TrainOneBatch(int step);
-  virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase);
-  void Forward(shared_ptr<NeuralNet> net, int step, bool training);
-  void Backward(shared_ptr<NeuralNet> net, int step);
-    /**
-   * Profiling the time cost of training one batch.
-  string TimerInfo(){
-    char buf[1024];
-    float ticks=ticks_*1000;
-    float tf=tForward_/ticks, tb=tBackward_/ticks,
-          td=tSyncData_/ticks, tp=tSyncParam_/ticks;
-    float total=tf+tb+td+tp;
-    sprintf(buf,
-        "Total\t%6.2f\tforward\t%6.2f\tbackward\t%6.2f\t"
-        // syncdata\t%6.2f\tsyncparam\t%6.2f\n"
-        , total,tf,tb);
-    float gensync=Param::worker_gen_sync/ticks;
-    float handlesync=Param::worker_handle_sync/ticks;
-    sprintf(buf+strlen(buf),
-        "worker_gen_sync\t%6.2f\tworker_handle_sync\t%6.2f\n",
-        gensync, handlesync);
-    Param::worker_gen_sync=0;
-    Param::worker_handle_sync=0;
-    tForward_=0;
-    tBackward_=0;
-    tSyncData_=0;
-    tSyncData_=0;
-    ticks_=0;
-    return string(buf);
-  }
-   */
+  virtual void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net);
+  void Forward(int step, Phase phase, shared_ptr<NeuralNet> net);
+  void Backward(int step, shared_ptr<NeuralNet> net);
 };
 }  // namespace singa
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index d7ac365..fcdb241 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -102,6 +102,14 @@ class Cluster {
     return cluster_.server_timeout();
   }
 
+  const bool server_update() const {
+    return cluster_.server_update();
+  }
+
+  const bool share_memory() const {
+    return cluster_.share_memory();
+  }
+
   /**
    * bandwidth MB/s
   float bandwidth() const {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
index 529c349..e55480b 100644
--- a/include/utils/param.h
+++ b/include/utils/param.h
@@ -11,19 +11,20 @@
 namespace singa {
 class Param {
  public:
-  Param():data_(nullptr){}
+  Param();
   virtual ~Param(){};
 
-  virtual Msg* GenGetMsg(void* arg=nullptr);
-  virtual Msg* GenPutMsg(void* arg=nullptr);
-  virtual Msg* GenUpdateMsg(void* arg=nullptr);
-  virtual Msg* GenSyncMsg(void* arg=nullptr);
+  virtual Msg* GenGetMsg(bool copy, int v=-1);
+  virtual Msg* GenPutMsg(bool copy, int v=-1);
+  virtual Msg* GenUpdateMsg(bool copy, int v=-1);
+  virtual Msg* GenSyncMsg(bool copy, int v=-1);
 
   virtual Msg* HandleGetMsg(Msg** msg);
   virtual Msg* HandlePutMsg(Msg** msg);
-  virtual int ParseUpdateMsg(Msg** msg);
-  virtual Msg* GenUpdateResponseMsg(void* arg=nullptr);
   virtual Msg* HandleSyncMsg(Msg** msg);
+  virtual const std::pair<bool, int> ParseUpdateMsg(Msg** msg);
+  virtual Msg* GenUpdateResponseMsg(bool copy, int v=-1);
+
 
   virtual int ParseGetResponseMsg(Msg** msg);
   virtual int ParsePutResponseMsg(Msg** msg);
@@ -74,12 +75,27 @@ class Param {
     proto_.set_owner(id);
   }
 
+  /**
+   * return the version of the parameter value shared by multiple workers
+   */
   int version() const {
-    return data_->version(); // TODO store version in data blob
+    return data_->version();
   }
+
   void set_version(int v) {
     data_->set_version(v); // TODO read version from data blob
   }
+
+  /**
+   * return the version of the parameter value local to a worker
+   */
+  int local_version() const {
+    return local_version_;
+  }
+
+  void set_local_version(int v){
+    local_version_=v;
+  }
    /**
     * @return num of floats.
     */
@@ -131,6 +147,7 @@ class Param {
   Blob<float> grad_, history_;
   ParamProto proto_;
   int fan_in_;
+  int local_version_;
 };
 /**
  * To support the shared memory and distributed Hogwild algorithm.
@@ -141,23 +158,6 @@ class Param {
  * copy is avoided for intra-process communication.
  */
 class HogwildParam: public Param{
- public:
-  virtual Msg* GenGetMsg(void* arg=nullptr);
-  virtual Msg* GenPutMsg(void* arg=nullptr);
-  virtual Msg* GenUpdateMsg(void* arg=nullptr);
-  virtual Msg* GenSyncMsg(void* arg=nullptr);
-
-  virtual Msg* HandleGetMsg(Msg** msg);
-  virtual Msg* HandlePutMsg(Msg** msg);
-  virtual int ParseUpdateMsg(Msg** msg);
-  virtual Msg* GenUpdateResponseMsg(void* arg=nullptr);
-  virtual Msg* HandleSyncMsg(Msg** msg);
-
-  virtual int ParseGetResponseMsg(Msg** msg);
-  virtual int ParsePutResponseMsg(Msg** msg);
-  virtual int ParseUpdateResponseMsg(Msg** msg);
-  virtual int ParseSyncResponseMsg(Msg** msg);
-
 };
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/src/proto/cluster.pb.h
----------------------------------------------------------------------
diff --git a/src/proto/cluster.pb.h b/src/proto/cluster.pb.h
index 564be1f..82ee8b2 100644
--- a/src/proto/cluster.pb.h
+++ b/src/proto/cluster.pb.h
@@ -236,6 +236,20 @@ class ClusterProto : public ::google::protobuf::Message {
   inline ::google::protobuf::int32 server_timeout() const;
   inline void set_server_timeout(::google::protobuf::int32 value);
 
+  // optional bool server_update = 40 [default = true];
+  inline bool has_server_update() const;
+  inline void clear_server_update();
+  static const int kServerUpdateFieldNumber = 40;
+  inline bool server_update() const;
+  inline void set_server_update(bool value);
+
+  // optional bool share_memory = 41 [default = true];
+  inline bool has_share_memory() const;
+  inline void clear_share_memory();
+  static const int kShareMemoryFieldNumber = 41;
+  inline bool share_memory() const;
+  inline void set_share_memory(bool value);
+
   // @@protoc_insertion_point(class_scope:singa.ClusterProto)
  private:
   inline void set_has_nworker_groups();
@@ -270,6 +284,10 @@ class ClusterProto : public ::google::protobuf::Message {
   inline void clear_has_worker_timeout();
   inline void set_has_server_timeout();
   inline void clear_has_server_timeout();
+  inline void set_has_server_update();
+  inline void clear_has_server_update();
+  inline void set_has_share_memory();
+  inline void clear_has_share_memory();
 
   ::google::protobuf::UnknownFieldSet _unknown_fields_;
 
@@ -280,20 +298,22 @@ class ClusterProto : public ::google::protobuf::Message {
   ::google::protobuf::int32 nworkers_per_procs_;
   ::google::protobuf::int32 nservers_per_procs_;
   ::std::string* hostfile_;
-  bool server_worker_separate_;
   ::google::protobuf::int32 nprocs_;
+  ::google::protobuf::int32 start_port_;
   ::std::string* workspace_;
   ::std::string* log_dir_;
-  ::google::protobuf::int32 start_port_;
-  ::google::protobuf::int32 stub_timeout_;
   ::std::string* zookeeper_host_;
   static ::std::string* _default_zookeeper_host_;
   ::google::protobuf::RepeatedPtrField< ::singa::ServerTopology > 
server_group_;
+  ::google::protobuf::int32 stub_timeout_;
+  bool server_worker_separate_;
+  bool server_update_;
+  bool share_memory_;
   ::google::protobuf::int32 worker_timeout_;
   ::google::protobuf::int32 server_timeout_;
 
   mutable int _cached_size_;
-  ::google::protobuf::uint32 _has_bits_[(17 + 31) / 32];
+  ::google::protobuf::uint32 _has_bits_[(19 + 31) / 32];
 
   friend void  protobuf_AddDesc_cluster_2eproto();
   friend void protobuf_AssignDesc_cluster_2eproto();
@@ -983,6 +1003,50 @@ inline void 
ClusterProto::set_server_timeout(::google::protobuf::int32 value) {
   server_timeout_ = value;
 }
 
+// optional bool server_update = 40 [default = true];
+inline bool ClusterProto::has_server_update() const {
+  return (_has_bits_[0] & 0x00020000u) != 0;
+}
+inline void ClusterProto::set_has_server_update() {
+  _has_bits_[0] |= 0x00020000u;
+}
+inline void ClusterProto::clear_has_server_update() {
+  _has_bits_[0] &= ~0x00020000u;
+}
+inline void ClusterProto::clear_server_update() {
+  server_update_ = true;
+  clear_has_server_update();
+}
+inline bool ClusterProto::server_update() const {
+  return server_update_;
+}
+inline void ClusterProto::set_server_update(bool value) {
+  set_has_server_update();
+  server_update_ = value;
+}
+
+// optional bool share_memory = 41 [default = true];
+inline bool ClusterProto::has_share_memory() const {
+  return (_has_bits_[0] & 0x00040000u) != 0;
+}
+inline void ClusterProto::set_has_share_memory() {
+  _has_bits_[0] |= 0x00040000u;
+}
+inline void ClusterProto::clear_has_share_memory() {
+  _has_bits_[0] &= ~0x00040000u;
+}
+inline void ClusterProto::clear_share_memory() {
+  share_memory_ = true;
+  clear_has_share_memory();
+}
+inline bool ClusterProto::share_memory() const {
+  return share_memory_;
+}
+inline void ClusterProto::set_share_memory(bool value) {
+  set_has_share_memory();
+  share_memory_ = value;
+}
+
 // -------------------------------------------------------------------
 
 // ServerTopology

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index ec7ac7f..f79d7d4 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -36,6 +36,11 @@ message ClusterProto{
   optional int32 stub_timeout=30 [default=5000];
   optional int32 worker_timeout=31 [default=5000];
   optional int32 server_timeout=32 [default=5000];
+
+  // conduct updates at server side; otherwise do it at worker side
+  optional bool server_update=40 [default=true];
+  // share memory space between worker groups in one procs
+  optional bool share_memory=41 [default=true];
 }
 
 message ServerTopology{

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 5d530da..e0fcb48 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -110,11 +110,10 @@ Msg* Server::HandleGet(shared_ptr<Param> param, Msg 
**msg){
 Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) {
   //repsonse of the format: <identity><type: kData><paramId><param content>
   auto* tmp=static_cast<Msg*>((*msg)->CopyAddr());
-  int v=(*msg)->target_second()+1;
-  param->ParseUpdateMsg(msg);
-  updater_->Update(param->version(), param);
+  const std::pair<bool, int> copy_step=param->ParseUpdateMsg(msg);
+  updater_->Update(copy_step.second, param);
   param->set_version(param->version()+1);
-  auto response=param->GenUpdateResponseMsg(&v);
+  auto response=param->GenUpdateResponseMsg(copy_step.first, param->version());
   tmp->SwapAddr();
   response->SetAddr(tmp);
   delete tmp;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index dbf8a48..3d69249 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -65,7 +65,7 @@ void Trainer::Start(const ModelProto& mproto, const 
ClusterProto& cproto,
   vector<shared_ptr<Server>> servers;
   vector<HandleContext> ctx;
   int nthreads=1; // the first socket is the router
-  if(cluster->has_server()){
+  if(cluster->has_server()){ // todo move sever creation to a method
     int pid=cluster->procs_id();
     if(cluster->server_worker_separate())
       pid-=cluster->nworker_procs();
@@ -88,11 +88,10 @@ void Trainer::Start(const ModelProto& mproto, const 
ClusterProto& cproto,
       }
     }
   }
-
   // create workers
   vector<shared_ptr<Worker>> workers;
   std::map<int, shared_ptr<Trainer::ParamShard>> shards;
-  if(cluster->has_worker()){
+  if(cluster->has_worker()){ //move worker creation to a method
     auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
         cluster->nworkers_per_group());
     //LOG(ERROR)<<net->ToString();
@@ -123,7 +122,7 @@ void Trainer::Start(const ModelProto& mproto, const 
ClusterProto& cproto,
             cluster->nworkers_per_group());
         // the train net for other groups may share parameter values from the
         // first group
-        if(mproto.hogwild())
+        if(cluster->share_memory())
           train_net->ShareParams(net, kValueOnly);
       }
       if(gid==0){
@@ -146,13 +145,14 @@ void Trainer::Start(const ModelProto& mproto, const 
ClusterProto& cproto,
       shards[gid]=shard;
       for(auto layer: train_net->layers()){
         int procsid=ProcsIDOf(gid, layer->partitionid(),kWorkerParam);
-        int local=procsid==cluster->procs_id();
+        bool local=procsid==cluster->procs_id();
         for(auto param: layer->GetParams()){
-          int owner=param->owner()<0||param->owner()==param->id()?procsid:-1;
+          int owner_procs=param->owner()==param->id()?procsid:procs_id_;
           if(shard->find(param->owner())==shard->end())
-            (*shard)[param->owner()]=make_shared<ParamInfo>(param, local, 
owner);
+            (*shard)[param->owner()]=
+              make_shared<ParamInfo>(param, local, owner_procs);
           else
-            shard->at(param->owner())->AddParam(param, local, owner);
+            shard->at(param->owner())->AddParam(param, local);
         }
       }
       for(int wid=wstart;wid<wend;wid++){
@@ -229,7 +229,7 @@ void Trainer::Run(int nworkers, int nservers,
           }
           delete msg;
           msg=nullptr;
-        }else if(cluster->nserver_groups()>1){
+        }else if(cluster->nserver_groups()>0){
           int group_id=msg->src_first();
           int paramid=msg->target_first();
           auto entry=shards.at(group_id)->at(paramid);
@@ -328,11 +328,13 @@ Msg* Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** 
msg){
     }
   }else if(version>pi->next_version){
     pi->next_version=version;
-    reply=pi->shares.at(0)->GenGetMsg(&version);
     int gid=msgg->src_first(), pid=msgg->target_first();
+    int dstgroup=gid/Cluster::Get()->nworker_groups_per_server_group();
+    int dstid=Sharding(pid);
+    int dstprocs=ProcsIDOf(dstgroup, dstid, kServer);
+    reply=pi->shares.at(0)->GenGetMsg(dstprocs!=procs_id_);
     reply->set_src(procs_id_, gid, kStub);
-    reply->set_dst(gid/Cluster::Get()->nworker_groups_per_server_group(),
-        Sharding(pid), kServer);
+    reply->set_dst(dstgroup, dstid, kServer);
   }
   return reply;
 }
@@ -345,6 +347,7 @@ Msg* Trainer::HandleGetResponse(shared_ptr<ParamInfo>pi, 
Msg** msg){
 
 Msg* Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** msg){
   Msg* msgg=*msg, *update=nullptr;
+  int step= msgg->target_second();
   if(msgg->src_flag()==kStub){
     if(pi->num_update<pi->num_local)
       return *msg; //wait unitl local updates are ready
@@ -366,8 +369,7 @@ Msg* Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** 
msg){
     }
     agg/=pi->num_total;
     if(pi->num_local<pi->num_total){
-      int v=msgg->target_second();
-      update=pi->shares.at(0)->GenUpdateMsg(&v);
+      update=pi->shares.at(0)->GenUpdateMsg(pi->owner_procs!=procs_id_, step);
       int gid=msgg->src_first();
       update->set_src(procs_id_, gid,kStub);
       update->set_dst(pi->owner_procs, gid, kStub);
@@ -375,12 +377,13 @@ Msg* Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** 
msg){
     }
   }
   if(pi->num_update==pi->num_total){
-    int v=msgg->target_second();
-    update=pi->shares.at(0)->GenUpdateMsg(&v);
     int gid=msgg->src_first();
+    int dstgroup=gid/Cluster::Get()->nworker_groups_per_server_group();
+    int dstid=Sharding(msgg->target_first());
+    int dstprocs=ProcsIDOf(dstgroup, dstid, kServer);
+    update=pi->shares.at(0)->GenUpdateMsg(dstprocs!=procs_id_, step);
     update->set_src(procs_id_, gid, kStub);
-    update->set_dst(gid/Cluster::Get()->nworker_groups_per_server_group(),
-        Sharding((*msg)->target_first()), kServer);
+    update->set_dst(dstgroup, dstid, kServer);
     pi->num_update=0;
   }
   delete *msg;
@@ -395,12 +398,14 @@ int 
Trainer::HandleUpdateResponse(shared_ptr<Trainer::ParamInfo> pi, Msg** msg){
 
 Msg* Trainer::HandlePut(shared_ptr<Trainer::ParamInfo>pi, Msg** msg){
   CHECK_NE((*msg)->src_flag(), kStub);
-  Msg* put=pi->shares.at(0)->GenPutMsg();
   int gid=(*msg)->src_first();
   int id=(*msg)->target_first();
+  int dstgroup=gid/Cluster::Get()->nworker_groups_per_server_group();
+  int dstid=Sharding(id);
+  int dstprocs=ProcsIDOf(dstgroup, dstid, kServer);
+  Msg* put=pi->shares.at(0)->GenPutMsg(dstprocs!=procs_id_);
   put->set_src(procs_id_, gid , kStub);
-  put->set_dst(gid/Cluster::Get()->nworker_groups_per_server_group(),
-      Sharding(id), kServer);
+  put->set_dst(dstgroup, dstid, kServer);
   delete *msg;
   *msg=NULL;
   return put;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 3d400ee..1a2c265 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -12,7 +12,6 @@ using std::thread;
 namespace singa {
 Worker::Worker(int thread_id, int group_id, int worker_id):
   thread_id_(thread_id), group_id_(group_id), worker_id_(worker_id){
-
 }
 
 void Worker::Setup(const ModelProto& model,
@@ -20,72 +19,58 @@ void Worker::Setup(const ModelProto& model,
   train_net_=train_net;
   modelproto_=model;
   auto cluster=Cluster::Get();
-  int sgid=group_id_/cluster->nworker_groups_per_server_group();
-  CHECK(cluster->runtime()->wJoinSGroup(group_id_, worker_id_, sgid));
-  if(model.hogwild()){
+  if(cluster->nserver_groups()&&cluster->server_update()){
+    int sgid=group_id_/cluster->nworker_groups_per_server_group();
+    CHECK(cluster->runtime()->wJoinSGroup(group_id_, worker_id_, sgid));
+  }else{
     updater_=shared_ptr<Updater>(Singleton<Factory<Updater>>::Instance()
         ->Create("Updater"));
     updater_->Init(model.updater());
   }
 }
 
-void Worker::Run(){
-    param_dealer_=make_shared<Dealer>(2*thread_id_);
-  param_dealer_->Connect(kInprocRouterEndpoint);
-  param_poller_.Add(param_dealer_.get());
-  layer_dealer_=make_shared<Dealer>(2*thread_id_+1);
-  layer_dealer_->Connect(kInprocRouterEndpoint);
-
-  { // TODO remove waiting pong msg
+void Worker::ConnectStub(shared_ptr<Dealer> dealer){
+  dealer->Connect(kInprocRouterEndpoint);
   Msg* ping=new Msg();
   ping->set_src(group_id_, worker_id_, kWorkerParam);
   ping->set_dst(-1,-1,kStub);
   ping->set_type(kConnect);
   ping->add_frame("PING", 4);
-  param_dealer_->Send(&ping);
-  ping=param_dealer_->Receive();
+  dealer->Send(&ping);
+  ping=dealer->Receive();
   string pong((char*)ping->frame_data(), ping->frame_size());
   CHECK_STREQ("PONG", pong.c_str());
   delete ping;
-  }
+}
 
-  {
-  Msg* ping=new Msg();
-  ping->set_src(group_id_, worker_id_, kWorkerLayer);
-  ping->set_dst(-1,-1,kStub);
-  ping->set_type(kConnect);
-  ping->add_frame("PING", 4);
-  layer_dealer_->Send(&ping);
-  ping=layer_dealer_->Receive();
-  string pong((char*)ping->frame_data(), ping->frame_size());
-  CHECK_STREQ("PONG", pong.c_str());
-  delete ping;
-  }
+void Worker::Run(){
+  dealer_=make_shared<Dealer>(2*thread_id_);
+  ConnectStub(dealer_);
+  for(auto layer: train_net_->layers())
+    if(layer->partitionid()==worker_id_)
+      if(layer->is_bridgedstlayer()||layer->is_bridgesrclayer()){
+        layer_dealer_=make_shared<Dealer>(2*thread_id_+1);
+        ConnectStub(layer_dealer_);
+        break;
+      }
   step_=modelproto_.step();
-  //layer_dealer_=std::make_shared<Dealer>(thread_id_*2);
   // init params
   for(auto layer: train_net_->layers()){
-    //LOG(ERROR)<<layer->partitionid()<<" : "<<layer->name();
     if(layer->partitionid()==worker_id_)
       for(auto param: layer->GetParams()){
-        if(group_id_==0){
-          if(param->owner()==param->id()){
+        if(param->owner() == param->id()){
+          if(group_id_==0)
             param->Init(0);
-            Put(param, step_);
-          }else{
-            Get(param, 0);
-          }
-        }else{
-          Get(param, modelproto_.warmup_steps());
+          else
+            Get(param, modelproto_.warmup_steps());
         }
       }
   }
   Metric perf;
-  if(group_id_==0&&step_<modelproto_.warmup_steps()){
+  if(group_id_==0){
     for(step_=0;step_<modelproto_.warmup_steps();step_++)
       RunOneBatch(step_, &perf);
     for(auto layer: train_net_->layers()){
-      //LOG(ERROR)<<layer->partitionid()<<" : "<<layer->name();
       if(layer->partitionid()==worker_id_)
         for(auto param: layer->GetParams())
           if(param->owner()==param->id())
@@ -108,7 +93,7 @@ void Worker::Stop(){
   msg->set_src(group_id_, worker_id_, kWorkerParam);
   msg->set_dst(-1,-1, kStub);
   msg->set_type(kStop);
-  param_dealer_->Send(&msg);
+  dealer_->Send(&msg); // use param dealer to send the stop msg
 }
 int Worker::Put(shared_ptr<Param> param, int step){
   Msg* msg=new Msg();
@@ -116,7 +101,7 @@ int Worker::Put(shared_ptr<Param> param, int step){
   msg->set_dst(-1, -1, kStub);
   msg->set_type(kPut);
   msg->set_target(param->owner(), step);
-  param_dealer_->Send(&msg);
+  dealer_->Send(&msg);
   return 1;
 }
 int Worker::Get(shared_ptr<Param> param, int step){
@@ -125,10 +110,11 @@ int Worker::Get(shared_ptr<Param> param, int step){
   msg->set_dst(-1, -1, kStub);
   msg->set_type(kGet);
   msg->set_target(param->owner(), step);
-  param_dealer_->Send(&msg);
+  dealer_->Send(&msg);
   return 1;
 }
 int Worker::Update(shared_ptr<Param> param, int step){
+  param->set_local_version(param->version());
   if(updater_){
     updater_->Update(step, param);
     param->set_version(param->version()+1);
@@ -138,7 +124,7 @@ int Worker::Update(shared_ptr<Param> param, int step){
     msg->set_dst(-1, -1, kStub);
     msg->set_type(kUpdate);
     msg->set_target(param->owner(), step);
-    param_dealer_->Send(&msg);
+    dealer_->Send(&msg);
   }
   return 1;
 }
@@ -154,7 +140,7 @@ int Worker::CollectAll(shared_ptr<NeuralNet> net, int step){
   return 1;
 }
 int Worker::Collect(shared_ptr<Param> param, int step){
-  while(param->version()<step){
+  while(param->version()<=param->local_version()){
     std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime));
   }
   return 1;
@@ -168,7 +154,7 @@ const void Worker::DisplayPerformance(const Metric & perf, 
const string& prefix)
   const string disp=perf.ToString();
   msg->add_frame(prefix.c_str(), prefix.length());
   msg->add_frame(disp.c_str(), disp.length());
-  param_dealer_->Send(&msg);
+  dealer_->Send(&msg);
   //LOG(ERROR)<<prefix<<" "<<perf.ToString();
 }
 
@@ -176,12 +162,12 @@ void Worker::RunOneBatch(int step, Metric* perf){
   if(ValidateNow(step)){
     //LOG(ERROR)<<"Validation at step "<<step;
     CollectAll(validation_net_, step);
-    Test(validation_net_, modelproto_.validation_steps(), "Validation");
+    Test(modelproto_.validation_steps(),kValidation, validation_net_);
   }
   if(TestNow(step)){
     //LOG(ERROR)<<"Test at step "<<step;
     CollectAll(test_net_, step);
-    Test(test_net_, modelproto_.test_steps(), "Test");
+    Test(modelproto_.test_steps(), kTest, test_net_);
   }
   TrainOneBatch(step);
   //LOG(ERROR)<<"Train "<<step;
@@ -214,11 +200,11 @@ void Worker::ReceiveBlobs(shared_ptr<NeuralNet> net){
 void Worker::SendBlob(){
 }
 
-void Worker::Test(shared_ptr<NeuralNet> net, int nsteps, const string& prefix){
+void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net){
   const auto& losslayers=net->losslayers();
   Metric perf;
   for(int step=0;step<nsteps;step++){
-    TestOneBatch(net, step, kTest);
+    TestOneBatch(step, phase, net);
     for(auto layer: losslayers){
       if(layer->partitionid()==worker_id_){
         const float * ptr=layer->metric().cpu_data();
@@ -229,12 +215,19 @@ void Worker::Test(shared_ptr<NeuralNet> net, int nsteps, 
const string& prefix){
     perf.Inc();
   }
   perf.Avg();
-  DisplayPerformance(perf, prefix);
+  if(phase==kValidation)
+    DisplayPerformance(perf, "Validation");
+  else if (phase==kTest)
+    DisplayPerformance(perf, "Test");
 }
 
 /****************************BPWorker**********************************/
 
-void BPWorker::Forward(shared_ptr<NeuralNet> net, int step,  bool training){
+BPWorker::BPWorker(int thread_id, int group_id, int worker_id):
+  Worker(thread_id, group_id, worker_id){
+}
+
+void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net){
   auto& layers=net->layers();
   for(auto& layer: layers){
     if(layer->partitionid()==worker_id_){
@@ -254,13 +247,13 @@ void BPWorker::Forward(shared_ptr<NeuralNet> net, int 
step,  bool training){
           delete msg;
         }
       }
-      if(training){
+      if(phase==kTrain){
         for(shared_ptr<Param> p: layer->GetParams()){
           Collect(p, step);
         }
       }
       //clock_t s=clock();
-      layer->ComputeFeature(training);
+      layer->ComputeFeature(phase==kTrain);
       //LOG(ERROR)<<layer->name()<<":"<<(clock()-s)*1.0/CLOCKS_PER_SEC;
       if(layer->is_bridgesrclayer()){
         auto dst=layer->dstlayers().at(0);
@@ -272,7 +265,8 @@ void BPWorker::Forward(shared_ptr<NeuralNet> net, int step, 
 bool training){
         msg->add_frame(blob.cpu_data(), blob.count()*sizeof(float));
         layer_dealer_->Send(&msg);
       }
-      
if(training&&DisplayDebugInfo(step)&&layer->mutable_data(nullptr)!=nullptr){
+      if(phase==kTrain&&DisplayDebugInfo(step)
+          &&layer->mutable_data(nullptr)!=nullptr){
         LOG(INFO)<<StringPrintf("Forward layer  %10s data norm1 %13.9f",
             layer->name().c_str(), layer->data(nullptr).asum_data());
       }
@@ -280,7 +274,7 @@ void BPWorker::Forward(shared_ptr<NeuralNet> net, int step, 
 bool training){
   }
 }
 
-void BPWorker::Backward(shared_ptr<NeuralNet> net, int step){
+void BPWorker::Backward(int step, shared_ptr<NeuralNet> net){
   auto& layers=net->layers();
   for (auto it = layers.rbegin(); it != layers.rend(); it++){
     shared_ptr<Layer> layer=*it;
@@ -310,12 +304,12 @@ void BPWorker::Backward(shared_ptr<NeuralNet> net, int 
step){
 }
 
 void BPWorker::TrainOneBatch(int step){
-  Forward(train_net_, step, true);
-  Backward(train_net_, step);
+  Forward(step, kTrain, train_net_);
+  Backward(step, train_net_);
 }
 
-void BPWorker::TestOneBatch(shared_ptr<NeuralNet> net,int step, Phase phase){
-  Forward(net, step, false);
+void BPWorker::TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net){
+  Forward(step, phase, net);
 }
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/40a86abe/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index e616a1c..c109a88 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -10,115 +10,138 @@ using std::vector;
 using std::string;
 namespace singa {
 
-Msg* Param::GenPutMsg(void* arg){
+Param::Param():data_(nullptr), local_version_(-1){}
+
+Msg* Param::GenPutMsg(bool copy, int v){
+  Msg* msg=new Msg();
+  msg->set_type(kPut);
+  msg->set_target(owner(), version());
   char buf[128];
   sprintf(buf, "%d %f %f", size(),
       learning_rate_multiplier(), weight_decay_multiplier());
-  Msg* msg=new Msg();
-  msg->set_type(kPut);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
-  msg->set_target(owner(), v);
-  msg->add_frame(buf, strlen(buf));
-  msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  if(copy){
+    sprintf(buf+strlen(buf), " %p", nullptr);
+    msg->add_frame(buf, strlen(buf));
+    msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  }else{
+    //share the data blob which includes the blob version
+    sprintf(buf+strlen(buf), " %p", data_.get());
+    msg->add_frame(buf, strlen(buf));
+  }
        return msg;
 }
 
-Msg* Param::GenGetMsg(void* arg){
+Msg* Param::GenGetMsg(bool copy, int v){
   Msg* msg=new Msg();
   msg->set_type(kGet);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
-  msg->set_target(owner(), v);
+  msg->set_target(owner(), local_version()+1);
+  msg->add_frame(&copy, sizeof(bool));
   return msg;
 }
 
-Msg* Param::GenUpdateMsg(void* arg){
+Msg* Param::GenUpdateMsg(bool copy, int v){
   Msg* msg=new Msg();
   msg->set_type(kUpdate);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
   msg->set_target(owner(), v);
-  msg->add_frame(mutable_cpu_grad(), size()*sizeof(float));
+  msg->add_frame(&copy, sizeof(bool));
+  if(copy)
+    msg->add_frame(mutable_cpu_grad(), size()*sizeof(float));
+  else{ // to share values of grad blob
+    char buf[32]; sprintf(buf, "%p", &grad_);
+    msg->add_frame(buf, strlen(buf));
+    //LOG(ERROR)<<"param id="<<id()<<" ptr="<<buf;
+  }
   return msg;
 }
 
-Msg* Param::GenSyncMsg(void* arg){
+Msg* Param::GenSyncMsg(bool copy, int v){
   return nullptr;
 }
 
 Msg* Param::HandlePutMsg(Msg** msg){
   int size;
   float lr, wc;
-  sscanf(static_cast<char*>((*msg)->frame_data()), "%d %f %f",
-      &size, &lr, &wc);
+  void* ptr;
+  sscanf(static_cast<char*>((*msg)->frame_data()), "%d %f %f %p",
+      &size, &lr, &wc, &ptr);
   proto_.set_learning_rate_multiplier(lr);
   proto_.set_weight_decay_multiplier(wc);
-  CHECK((*msg)->next_frame());
   vector<int> shape{size};
-  data_=std::make_shared<Blob<float>>(shape);
-  data_->set_version((*msg)->target_second());
   grad_.Reshape(shape);
   history_.Reshape(shape);
-  CHECK_EQ(size* sizeof(float), (*msg)->frame_size());
-  memcpy(mutable_cpu_data(), (*msg)->frame_data(), size*sizeof(float));
-  delete (*msg);
-  *msg=nullptr;
+  data_=std::make_shared<Blob<float>>(shape);
+  if(ptr==nullptr){
+    data_->set_version((*msg)->target_second());
+    CHECK((*msg)->next_frame());
+    CHECK_EQ(size* sizeof(float), (*msg)->frame_size());
+    memcpy(mutable_cpu_data(), (*msg)->frame_data(), size*sizeof(float));
+  } else{
+    data_->ShareData(*static_cast<Blob<float>*>(ptr));
+  }
+  DeleteMsg(msg);
   return nullptr;
 }
 
 Msg* Param::HandleGetMsg(Msg** msg){
   if((*msg)->target_second()<=version()){
-    (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
+    bool* copy=static_cast<bool*>((*msg)->frame_data());
+    (*msg)->next_frame();
+    if(*copy)
+      (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
     (*msg)->SwapAddr();
     (*msg)->set_type(kRGet);
   }
   return *msg;
 }
 
-int Param::ParseUpdateMsg(Msg** msg){
-  CHECK((*msg)->frame_size());
-  memcpy(mutable_cpu_grad(), (*msg)->frame_data(),(*msg)->frame_size());
-  delete (*msg);
-  *msg=nullptr;
-  return 1;
+const std::pair<bool, int> Param::ParseUpdateMsg(Msg** msg){
+  int step=(*msg)->target_second();
+  bool* copy=static_cast<bool*>((*msg)->frame_data());
+  (*msg)->next_frame();
+  if(*copy){
+    CHECK((*msg)->frame_size());
+    memcpy(mutable_cpu_grad(), (*msg)->frame_data(),(*msg)->frame_size());
+  }else {// use the same data field of the grad blob
+    Blob<float>* ptr=nullptr;
+    sscanf(static_cast<char*>((*msg)->frame_data()), "%p", &ptr);
+    //LOG(ERROR)<<"id="<<id()<<" ptr="<<ptr;
+    grad_.ShareData(*ptr);
+  }
+  DeleteMsg(msg);
+  return std::make_pair(*copy, step);
 }
 
-Msg* Param::GenUpdateResponseMsg(void* arg){
+Msg* Param::GenUpdateResponseMsg(bool copy, int v){
   Msg* msg=new Msg();
   msg->set_type(kRUpdate);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
   msg->set_target(owner(), v);
-  msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  msg->add_frame(&copy, sizeof(bool));
+  if(copy)
+    msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
   return msg;
 }
 
 Msg* Param::HandleSyncMsg(Msg** msg){
-  delete *msg;
-  *msg=nullptr;
+  DeleteMsg(msg);
   return nullptr;
 }
 
 int Param::ParseSyncResponseMsg(Msg** msg){
-  delete *msg;
-  *msg=nullptr;
+  DeleteMsg(msg);
   return 1;
 }
 int Param::ParsePutResponseMsg(Msg **msg){
   return ParseSyncResponseMsg(msg);
 }
 int Param::ParseGetResponseMsg(Msg **msg){
-  CHECK((*msg)->frame_size());
-  memcpy(mutable_cpu_data(), (*msg)->frame_data(), (*msg)->frame_size());
-  // must be set after all other settings are done!
+  bool *copy=static_cast<bool*>((*msg)->frame_data());
+  (*msg)->next_frame();
+  if(*copy){
+    CHECK((*msg)->frame_size());
+    memcpy(mutable_cpu_data(), (*msg)->frame_data(), (*msg)->frame_size());
+  }  // must be set after all other settings are done!
   set_version((*msg)->target_second());
-  delete *msg;
-  *msg=nullptr;
+  DeleteMsg(msg);
   return 1;
 }
 int Param::ParseUpdateResponseMsg(Msg **msg){
@@ -174,271 +197,4 @@ void Param::Init(int v){
   }
   set_version(v);
 }
-
-/********************HogwildParam***************************/
-Msg* HogwildParam::GenPutMsg(void* arg){
-  char buf[128];
-  sprintf(buf, "%d %f %f %p", size(),
-      learning_rate_multiplier(), weight_decay_multiplier(), 
mutable_cpu_data());
-  Msg* msg=new Msg();
-  msg->set_type(kPut);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
-  msg->set_target(owner(), v);
-  msg->add_frame(buf, strlen(buf));
-       return msg;
-}
-
-Msg* HogwildParam::GenGetMsg(void* arg){
-  Msg* msg=new Msg();
-  msg->set_type(kGet);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
-  msg->set_target(owner(), v);
-  return msg;
-}
-
-Msg* HogwildParam::GenUpdateMsg(void* arg){
-  Msg* msg=new Msg();
-  msg->set_type(kUpdate);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
-  msg->set_target(owner(), v);
-  void* p=mutable_cpu_grad();
-  msg->add_frame(p, sizeof(void*));
-  return msg;
-}
-
-Msg* HogwildParam::GenSyncMsg(void* arg){
-  return nullptr;
-}
-
-Msg* HogwildParam::HandlePutMsg(Msg** msg){
-  int size;
-  float lr, wc;
-  sscanf(static_cast<char*>((*msg)->frame_data()), "%d %f %f",
-      &size, &lr, &wc);
-  proto_.set_learning_rate_multiplier(lr);
-  proto_.set_weight_decay_multiplier(wc);
-  CHECK((*msg)->next_frame());
-  vector<int> shape{size};
-  // set pointer
-  data_=std::make_shared<Blob<float>>(shape);
-  data_->set_version((*msg)->target_second());
-  grad_.Reshape(shape);
-  history_.Reshape(shape);
-  delete (*msg);
-  *msg=nullptr;
-  return nullptr;
-}
-
-Msg* HogwildParam::HandleGetMsg(Msg** msg){
-  if((*msg)->target_second()<=version()){
-    (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
-    (*msg)->SwapAddr();
-    (*msg)->set_type(kRGet);
-  }
-  return *msg;
-}
-
-int HogwildParam::ParseUpdateMsg(Msg** msg){
-  delete (*msg);
-  *msg=nullptr;
-  return 1;
-}
-
-Msg* HogwildParam::GenUpdateResponseMsg(void* arg){
-  Msg* msg=new Msg();
-  msg->set_type(kRUpdate);
-  int v=version();
-  if(arg!=nullptr)
-    v=*(int*)arg;
-  msg->set_target(owner(), v);
-  return msg;
-}
-
-Msg* HogwildParam::HandleSyncMsg(Msg** msg){
-  delete *msg;
-  *msg=nullptr;
-  return nullptr;
-}
-
-int HogwildParam::ParseSyncResponseMsg(Msg** msg){
-  delete *msg;
-  *msg=nullptr;
-  return 1;
-}
-int HogwildParam::ParsePutResponseMsg(Msg **msg){
-  return ParseSyncResponseMsg(msg);
-}
-int HogwildParam::ParseGetResponseMsg(Msg **msg){
-  // must be set after all other settings are done!
-  set_version((*msg)->target_second());
-  delete *msg;
-  *msg=nullptr;
-  return 1;
-}
-int HogwildParam::ParseUpdateResponseMsg(Msg **msg){
-  return ParseGetResponseMsg(msg);
-}
-
-/**************************RandomSyncParam********************************
-const vector<int> RandomSyncParam::RandomSample(int seed, int m, int n){
-  vector<int> samples(m);
-  std::mt19937 gen(seed);
-  std::uniform_real_distribution<float> dist(0.f,1.f);
-  for(int i=0,k=0;i<n&&k<m;i++)
-    if((m-k)*1.0f/(n-i)>dist(gen)){
-      samples[k++]=i;
-    }
-  return samples;
-}
-
-zmsg_t* RandomSyncParam::HandleSyncMsg(zmsg_t** msg){
-  int64_t start=zclock_mono();
-  char* control=zframe_strdup(zmsg_first(*msg));
-  int seed, count;
-  sscanf(control, "%d-%d", &seed,&count);
-  delete control;
-  zframe_t* syncframe=zmsg_next(*msg);
-  CHECK_EQ(zframe_size(syncframe), count*sizeof(float));
-  float* syncptr=(float*)zframe_data(syncframe);
-  float* dptr=data_.mutable_cpu_data();
-  int k=0;
-  if(count==data_.count()){
-    for(int idx=0;idx<count;idx++){
-      float x=dptr[idx];
-      dptr[idx]+=syncptr[k];
-      syncptr[k]=x;
-      k++;
-    }
-  }else{
-    for(int idx: RandomSample(seed, count, data_.count())){
-      float x=dptr[idx];
-      dptr[idx]+=syncptr[k];
-      syncptr[k]=x;
-      k++;
-    }
-  }
-  CHECK_EQ(k,count);
-  CHECK_EQ(zframe_size(syncframe), count*sizeof(float));
-  return *msg;
-}
-
-zmsg_t *RandomSyncParam::GenSyncMsgFromWorker(float sample_ratio){
-  int64_t start=zclock_mono();
-  zmsg_t* msg=zmsg_new();
-  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
-  int m=data_.count()*sample_ratio;
-  zmsg_addstrf(msg, "%u-%d", seed, m);
-  float* updateptr=new float[m];
-  float* dptr=data_.mutable_cpu_data();
-  float* sdptr=snapshot_.mutable_cpu_data();
-  int k=0;
-  if(m==data_.count()){
-    for(int idx=0;idx<m;idx++)
-      updateptr[k++]=dptr[idx]-sdptr[idx];
-  }else{
-    const vector<int> samples=RandomSample(seed, m, data_.count());
-    for(int idx:samples){
-      updateptr[k++]=dptr[idx]-sdptr[idx];
-    }
-  }
-  CHECK_EQ(k,m);
-  zframe_t* frame=zframe_new(updateptr, sizeof(float)*m);
-  zmsg_append(msg, &frame);
-  delete updateptr;
-  worker_gen_sync+=zclock_mono()-start;
-  return msg;
-}
-
-void RandomSyncParam::ParseSyncMsgFromPS(zmsg_t** msg){
-  int64_t start=zclock_mono();
-  //LOG(ERROR)<<"worker sync "<<id();
-  char* control=zmsg_popstr(*msg);
-  int seed, count;
-  sscanf(control, "%u-%d", &seed, &count);
-  //LOG(ERROR)<<"worker sync "<<id()<<" "<<control;
-  delete control;
-  zframe_t* psdataframe=zmsg_pop(*msg);
-  CHECK_EQ(zframe_size(psdataframe), count*sizeof(float));
-  float* psdptr=(float*)zframe_data(psdataframe);
-  float* dptr=data_.mutable_cpu_data();
-  float* sdptr=snapshot_.mutable_cpu_data();
-  int k=0;
-  if(count==data_.count()){
-    for(int idx=0;idx<count;idx++){
-      dptr[idx]+=psdptr[k++]-sdptr[idx];
-      sdptr[idx]=dptr[idx];
-    }
-  }else{
-    for(int idx: RandomSample(seed, count, data_.count())){
-      dptr[idx]+=psdptr[k++]-sdptr[idx];
-      sdptr[idx]=dptr[idx];
-    }
-  }
-  zframe_destroy(&psdataframe);
-  worker_handle_sync+=zclock_mono()-start;
-  zmsg_destroy(msg);
-}
-
-
-void RandomSyncParam::Setup(const ParamProto& proto, const vector<int>& shape,
-    int fan_in){
-  Param::Setup(proto, shape, fan_in);
-  snapshot_.Reshape(shape);
-}
-
-void RandomSyncParam::Init(){
-  Param::Init();
-  memcpy(snapshot_.mutable_cpu_data(), data_.mutable_cpu_data(),
-      sizeof(float)*data_.count());
-}
-*/
-
-/***************************ElasticParam************************************
-zmsg_t* ElasticParam::HandleSyncMsg(zmsg_t** msg){
-  int64_t start=zclock_mono();
-  char* control=zframe_strdup(zmsg_first(*msg));
-  float alpha;int count;
-  sscanf(control, "%f-%d", &alpha,&count);
-  delete control;
-  zframe_t* syncframe=zmsg_next(*msg);
-  CHECK_EQ(size(), count);
-  Tensor<cpu, 1> server(data_.mutable_cpu_data(), Shape1(count));
-  Tensor<cpu, 1> worker((float*)zframe_data(syncframe), Shape1(count));
-  worker=(worker-server)*alpha;
-  server+=worker;
-  return *msg;
-}
-
-zmsg_t *ElasticParam::GenSyncMsgFromWorker(float alpha){
-  int64_t start=zclock_mono();
-  zmsg_t* msg=zmsg_new();
-  zmsg_addstrf(msg, "%f-%d", alpha, size());
-  zmsg_addmem(msg, mutable_cpu_data(), sizeof(float)*size());
-  worker_gen_sync+=zclock_mono()-start;
-  return msg;
-}
-
-void ElasticParam::ParseSyncMsgFromPS(zmsg_t** msg){
-  int64_t start=zclock_mono();
-  //LOG(ERROR)<<"worker sync "<<id();
-  char* control=zmsg_popstr(*msg);
-  float alpha;int count;
-  sscanf(control, "%f-%d", &alpha, &count);
-  delete control;
-  zframe_t* frame=zmsg_pop(*msg);
-  CHECK_EQ(zframe_size(frame), count*sizeof(float));
-  Tensor<cpu, 1> diff((float*)zframe_data(frame), Shape1(count));
-  Tensor<cpu, 1> data(mutable_cpu_data(), Shape1(count));
-  data-=diff;
-  zframe_destroy(&frame);
-  zmsg_destroy(msg);
-  worker_handle_sync+=zclock_mono()-start;
-}
-*/
 }  // namespace singa


Reply via email to