SINGA-21 Code review 5

review worker.h, worker.cc
 - format code
 - change shared_ptr to raw ptr for neuralnet objects
    all neuralnet objects are now managed by the trainer:
    the trainer passes raw pointers to workers and releases them in its destructor

others
 - remove unnecessary virtual keywords from the Server class (server.h)
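
For reference, below is a minimal sketch of the ownership model this change moves to. The classes are simplified stand-ins, not the actual SINGA API: the trainer records every NeuralNet it creates (cf. nets_ in trainer.h) and deletes them in its destructor, while workers only hold non-owning raw pointers handed to them via Setup().

#include <vector>

class NeuralNet { /* simplified stand-in for singa::NeuralNet */ };

class Worker {
 public:
  // non-owning pointers; lifetime is guaranteed by the Trainer
  void Setup(NeuralNet* train_net, NeuralNet* valid_net, NeuralNet* test_net) {
    train_net_ = train_net;
    validation_net_ = valid_net;
    test_net_ = test_net;
  }
 private:
  NeuralNet* train_net_ = nullptr;
  NeuralNet* validation_net_ = nullptr;
  NeuralNet* test_net_ = nullptr;
};

class Trainer {
 public:
  ~Trainer() {
    // the stub destroys all neural nets it created, exactly once
    for (NeuralNet* net : nets_) delete net;
  }
  NeuralNet* CreateNet() {
    NeuralNet* net = new NeuralNet();
    nets_.push_back(net);  // record ownership so the destructor can free it
    return net;
  }
 private:
  std::vector<NeuralNet*> nets_;
};

int main() {
  Trainer trainer;
  Worker worker;
  NeuralNet* net = trainer.CreateNet();
  worker.Setup(net, nullptr, nullptr);  // worker borrows; trainer still owns
  return 0;  // nets_ released when trainer goes out of scope
}
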


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f50d293f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f50d293f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f50d293f

Branch: refs/heads/master
Commit: f50d293ff550d5b8ccace9f7e6992865474d0d29
Parents: d3e1fca
Author: wang sheng <[email protected]>
Authored: Wed Sep 23 13:15:42 2015 +0800
Committer: wang sheng <[email protected]>
Committed: Wed Sep 23 13:28:28 2015 +0800

----------------------------------------------------------------------
 include/neuralnet/neuralnet.h |  10 +-
 include/trainer/server.h      |   9 +-
 include/trainer/trainer.h     |  12 +-
 include/trainer/worker.h      | 129 ++++++++++++---------
 src/neuralnet/neuralnet.cc    |   8 +-
 src/trainer/server.cc         |   3 +-
 src/trainer/trainer.cc        |  24 ++--
 src/trainer/worker.cc         | 229 ++++++++++++++++---------------------
 8 files changed, 208 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/neuralnet/neuralnet.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h
index 06fd977..693fe19 100644
--- a/include/neuralnet/neuralnet.h
+++ b/include/neuralnet/neuralnet.h
@@ -22,8 +22,6 @@
 #ifndef SINGA_NEURALNET_NEURALNET_H_
 #define SINGA_NEURALNET_NEURALNET_H_
 
-#include <map>
-#include <memory>
 #include <string>
 #include <vector>
 
@@ -52,10 +50,10 @@ class NeuralNet {
    * @param net_conf proto for the neural network
    * @param phase test/training/validation
    * @param npartitions num of partitions, do partitioning if num > 1
-   * @return shared pointer to a neural net
+   * @return pointer to a neural net
    */
-  static std::shared_ptr<NeuralNet> Create(const NetProto& net_conf,
-                                           Phase phase, int npartitions);
+  static NeuralNet* Create(const NetProto& net_conf, Phase phase,
+                           int npartitions);
 
   /**
    * construct the net structure from protocol buffer.
@@ -71,7 +69,7 @@ class NeuralNet {
   /**
    * Share memory of parameter values from other neuralnet
    */
-  void ShareParamsFrom(std::shared_ptr<NeuralNet> other);
+  void ShareParamsFrom(NeuralNet* other);
   inline const std::vector<Layer*>& layers() { return layers_; }
   inline const std::vector<Param*>& params() const { return params_; }
   inline Layer* name2layer(std::string name) const {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index 3f3539a..84b3a41 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -22,7 +22,6 @@
 #ifndef SINGA_TRAINER_SERVER_H_
 #define SINGA_TRAINER_SERVER_H_
 
-#include <memory>
 #include <unordered_map>
 #include <vector>
 #include "communication/socket.h"
@@ -45,7 +44,7 @@ namespace singa {
 class Server {
  public:
   Server(int group_id, int server_id);
-  virtual ~Server();
+  ~Server();
   void Setup(const UpdaterProto& proto, const std::vector<int>& slice2group,
              const std::vector<int>& slice2server);
   void Run();
@@ -59,7 +58,7 @@ class Server {
   * @return the orignal message or a response message which contains the values
    * of the Param with the request version.
    */
-  virtual Msg* HandleGet(Msg** msg);
+  Msg* HandleGet(Msg** msg);
   /**
    * Process Update request.
    *
@@ -88,7 +87,7 @@ class Server {
    * @return the original message or response message. If we don't want to
    * acknowledge the put request, then return nullptr.
    */
-  virtual Msg* HandlePut(Msg **msg);
+  Msg* HandlePut(Msg **msg);
   /**
    * Handle sync request from other server groups.
    *
@@ -100,7 +99,7 @@ class Server {
    * @param msg request msg containing the parameter updates
    * @return response msg that contains the fresh parameter values.
    */
-  virtual Msg* HandleSyncRequest(Msg** msg);
+  Msg* HandleSyncRequest(Msg** msg);
   /**
    * Handle sync response.
    *

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 0b03dea..d3d332f 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -21,8 +21,10 @@
 
 #ifndef INCLUDE_TRAINER_TRAINER_H_
 #define INCLUDE_TRAINER_TRAINER_H_
-#include <unordered_map>
+
 #include <queue>
+#include <vector>
+#include <unordered_map>
 #include "proto/job.pb.h"
 #include "proto/singa.pb.h"
 #include "utils/param.h"
@@ -34,13 +36,15 @@
 #include "communication/socket.h"
 
 namespace singa {
+
+using std::vector;
+  
 /**
  * Every running process has a training object which launches one or more
  * worker (and server) threads.
  *
  * The main thread runs a loop to forward messages between workers and servers.
  */
-
 class Trainer{
  public:
   ~Trainer();
@@ -82,7 +86,7 @@ class Trainer{
    * @param jobConf
    * @return worker instances
    */
-  vector<Worker*> CreateWorkers(int nthread, const JobProto& jobConf);
+  vector<Worker*> CreateWorkers(const JobProto& jobConf);
 
   /**
    * Setup workers and servers.
@@ -158,6 +162,8 @@ class Trainer{
   std::unordered_map<int, ParamEntry*> worker_shard_;
   //!< map from slice to the server that updates it
   vector<int> slice2server_;
+  //stub will destroy all neuralnets in the end
+  vector<NeuralNet*> nets_;
 };
 } /* singa */
 #endif // INCLUDE_TRAINER_TRAINER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 679435c..66439ec 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -21,19 +21,16 @@
 
 #ifndef SINGA_TRAINER_WORKER_H_
 #define SINGA_TRAINER_WORKER_H_
+
+#include <string>
+#include "communication/socket.h"
 #include "neuralnet/neuralnet.h"
 #include "proto/job.pb.h"
-#include "communication/socket.h"
 
 namespace singa {
 
-using std::map;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-  
 //!< sleep 5 milliseconds if the Param is not updated to the expected version
-const int kCollectSleepTime=5;
+const int kCollectSleepTime = 5;
 /**
  * The Worker class which runs the training algorithm.
  * The first worker group will initialize parameters of the Net,
@@ -53,19 +50,13 @@ class Worker {
    * @param grp_id global worker group ID
    * @param id worker ID within the group
    */
-  virtual void Init(int thread_id, int grp_id, int id);
+  virtual void Init(int grp_id, int id);
   virtual ~Worker();
   /**
    * Setup members
    */
-  void Setup(const JobProto& job, shared_ptr<NeuralNet> train_net,
-      shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net);
-  /**
-    * Main function of Worker.
-    *
-    * Train the neuralnet step by step, test/validation is done periodically.
-    */
-  void Run();
+  void Setup(const JobProto& job, NeuralNet* train_net, NeuralNet* valid_net,
+             NeuralNet* test_net);
   /**
    * Init all local params (i.e., params from layers resident in this worker).
    *
@@ -78,12 +69,17 @@ class Worker {
    * train for a couple of steps to warmup the params before put
    * them to servers (warmup of JobProto controls this).
    *
-   * If the owner param is availabel from checkpoint file, then its
+   * If the owner param is available from checkpoint file, then its
   * values are parsed from the checkpoint file instead of randomly initialized.
    * For params who do not have checkpoints, randomly init them.
    */
   void InitLocalParams();
-
+  /**
+    * Main function of Worker.
+    *
+    * Train the neuralnet step by step, test/validation is done periodically.
+    */
+  void Run();
   /**
    * Checkpoint all params owned by the worker from the first group onto disk.
   * The serialization is done using BlobProtos which includes the name, version
@@ -93,31 +89,30 @@ class Worker {
    * @param step training step of this worker
    * @param net the training net whose params will be dumped.
    */
-  void Checkpoint(int step, shared_ptr<NeuralNet> net);
+  void Checkpoint(int step, NeuralNet* net);
   /**
     * Test the perforance of the learned model on validation or test dataset.
     * Test is done by the first group.
     * @param net, neural network
     */
-  void Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net);
+  void Test(int nsteps, Phase phase, NeuralNet* net);
   /**
     * Train one mini-batch.
     * Test/Validation is done before training.
     */
-  virtual void TrainOneBatch(int step, Metric* perf)=0;
+  virtual void TrainOneBatch(int step, Metric* perf) = 0;
   /**
    * Test/validate one mini-batch.
    */
-  virtual void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
-      Metric* perf)=0;
+  virtual void TestOneBatch(int step, Phase phase, NeuralNet* net,
+                            Metric* perf) = 0;
   /**
    * Report performance to the stub.
    *
    * @param prefix display prefix, e.g., 'Train', 'Test'
    * @param perf
    */
-  void Report(const string& prefix, const Metric & perf);
-
+  void Report(const std::string& prefix, const Metric & perf);
   /**
    * Put Param to server.
    * @param param
@@ -148,80 +143,101 @@ class Worker {
   /**
    * Call Collect for every param of net
    */
-  int CollectAll(shared_ptr<NeuralNet> net, int step);
+  int CollectAll(NeuralNet* net, int step);
   /**
    * Receive blobs from other workers due to model partitions.
    */
-  void ReceiveBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net);
+  void ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
   /**
    * Send blobs to other workers due to model partitions.
    */
-  void SendBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net);
-
+  void SendBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
   /**
    * Check is it time to display training info, e.g., loss and precison.
    */
-  inline bool DisplayNow(int step) const;
+  inline bool DisplayNow(int step) const {
+    return job_conf_.disp_freq() > 0
+           && step >= job_conf_.disp_after()
+           && ((step - job_conf_.disp_after()) % job_conf_.disp_freq() == 0);
+  }
   /**
    * Check is it time to display training info, e.g., loss and precison.
    */
-  inline bool DisplayDebugInfo(int step) const;
+  inline bool DisplayDebugInfo(int step) const {
+    return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0;
+  }
   /**
    * Check is it time to stop
    */
-  inline bool StopNow(int step) const;
+  inline bool StopNow(int step) const {
+    return step >= job_conf_.train_steps();
+  }
   /**
    * Check is it time to do checkpoint.
    */
-  inline bool CheckpointNow(int step) const;
+  inline bool CheckpointNow(int step) const {
+    return grp_id_ == 0
+           && job_conf_.checkpoint_freq() > 0
+           && step >= job_conf_.checkpoint_after()
+           && ((step - job_conf_.checkpoint_after())
+              % job_conf_.checkpoint_freq() == 0);
+  }
   /**
    * Check is it time to do test.
    * @param step the ::Train() has been called this num times.
    */
-  inline bool TestNow(int step) const;
+  inline bool TestNow(int step) const {
+    return grp_id_ == 0
+           && job_conf_.test_freq() > 0
+           && job_conf_.test_steps() > 0
+           && step >= job_conf_.test_after()
+           && ((step - job_conf_.test_after()) % job_conf_.test_freq() == 0);
+  }
   /**
    * Check is it time to do validation.
    * @param step the ::Train() has been called step times.
    */
-  inline bool ValidateNow(int step) const;
-
+  inline bool ValidateNow(int step) const {
+    return grp_id_ == 0
+           && job_conf_.valid_freq() > 0
+           && job_conf_.valid_steps() > 0
+           && step >= job_conf_.valid_after()
+           && ((step - job_conf_.valid_after()) % job_conf_.valid_freq() == 0);
+  }
   /**
    * @return group ID
    */
-  int grp_id() const { return grp_id_;}
-
+  int grp_id() const { return grp_id_; }
   /**
    * @reutrn worker ID within the worker group.
    */
-  int id() const { return id_;}
+  int id() const { return id_; }
 
  protected:
-  int thread_id_, grp_id_, id_;
-  int step_;
+  int grp_id_ = -1, id_ = -1;
+  int step_ = 0;
   JobProto job_conf_;
-  shared_ptr<NeuralNet> train_net_, test_net_, validation_net_;
-  Dealer* layer_dealer_, *dealer_;
+  NeuralNet* train_net_ = nullptr;
+  NeuralNet* test_net_ = nullptr;
+  NeuralNet* validation_net_ = nullptr;
+  Dealer* layer_dealer_ = nullptr;
+  Dealer* dealer_ = nullptr;
 };
 
-class BPWorker: public Worker{
+class BPWorker: public Worker {
  public:
-  ~BPWorker(){}
-  void Init(int thread_id, int grp_id, int id) override;
   void TrainOneBatch(int step, Metric* perf) override;
-  void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
-      Metric* perf) override;
-
-  void Forward(int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf);
-  void Backward(int step, shared_ptr<NeuralNet> net);
+  void TestOneBatch(int step, Phase phase, NeuralNet* net, Metric* perf)
+      override;
+  void Forward(int step, Phase phase, NeuralNet* net, Metric* perf);
+  void Backward(int step, NeuralNet* net);
 };
 
-class CDWorker: public Worker{
+class CDWorker: public Worker {
  public:
   void TrainOneBatch(int step, Metric* perf) override;
-  void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
-      Metric* perf) override;
+  void TestOneBatch(int step, Phase phase, NeuralNet* net, Metric* perf)
+      override;
 };
 
 inline int BlobTrgt(int grp, int layer) {
@@ -236,6 +252,7 @@ inline int BlobLayer(int blob_trgt) {
   static int mask = (1 << 16) -1;
   return blob_trgt & mask;
 }
+
 }  // namespace singa
 
 #endif  // SINGA_TRAINER_WORKER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 286d273..200824a 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -28,11 +28,10 @@
 namespace singa {
 
 using std::map;
-using std::shared_ptr;
 using std::string;
 using std::vector;
 
-shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
+NeuralNet* NeuralNet::Create(const NetProto& net_conf, Phase phase,
                                         int npartitions) {
   NetProto conf;
   conf.CopyFrom(net_conf);
@@ -76,8 +75,7 @@ shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
   }
   LOG(INFO) << "NeuralNet config is\n" << conf.DebugString();
   // TODO(wangwei) create net based on net type, e.g., directed, undirected, etc
-  auto net = std::make_shared<NeuralNet>(conf, npartitions);
-  return net;
+  return new NeuralNet(conf, npartitions);
 }
 
 NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
@@ -107,7 +105,7 @@ std::string NeuralNet::ToAdjacency() {
   return disp;
 }
 
-void NeuralNet::ShareParamsFrom(shared_ptr<NeuralNet> other) {
+void NeuralNet::ShareParamsFrom(NeuralNet* other) {
   for (auto& layer : layers_) {
     auto otherlayer = other->name2layer(layer->name());
     if (otherlayer != nullptr) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 29f6a68..5e74c1b 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -19,11 +19,12 @@
 *
 *************************************************************/
 
+#include "trainer/server.h"
+
 #include <thread>
 #include <chrono>
 #include "mshadow/tensor.h"
 #include "proto/common.pb.h"
-#include "trainer/server.h"
 #include "utils/param.h"
 #include "utils/singleton.h"
 #include "utils/factory.h"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index c928d91..8a0589e 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -38,11 +38,13 @@ using std::vector;
 using std::map;
 using std::queue;
 using namespace std::chrono;
-using std::make_shared;
+using std::string;
 
 /***********************Trainer****************************/
 Trainer::~Trainer() {
   delete router_;
+  for (NeuralNet* p : nets_)
+    delete p;
 }
 
 const vector<int> SliceParams(const vector<Param*>& params) {
@@ -92,30 +94,35 @@ void Trainer::SetupWorkerServer(
   int grp_size = cluster->nworkers_per_group();
   const auto& net_conf = job_conf.neuralnet();
   auto net = NeuralNet::Create(net_conf, kTrain, grp_size);
+  nets_.push_back(net);
   // MUST do SliceParam before share param/net with others
   auto slices = SliceParams(net->params());
 
-  std::unordered_map<int, shared_ptr<NeuralNet>> grp_net;
+  std::unordered_map<int, NeuralNet*> grp_net;
   int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
   for (auto worker : workers) {
     int grp_id = worker->grp_id();
     int worker_id = worker->id();
-    shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr;
+    NeuralNet* test_net = nullptr;
+    NeuralNet* valid_net = nullptr;
     if (grp_net.find(grp_id) == grp_net.end()) {
       if (grp_id == first_grp) {
         //  test are performed only by the first group now. TODO update.
         if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
          test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp
           test_net->ShareParamsFrom(net);
+          nets_.push_back(test_net);
         }
         //  validation are performed only by the first group. TODO update.
         if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
           valid_net = NeuralNet::Create(net_conf, kValidation, 1);
           valid_net->ShareParamsFrom(net);
+          nets_.push_back(valid_net);
         }
         grp_net[grp_id] = net;
       } else {
         grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
+        nets_.push_back(grp_net[grp_id]);
         if(cluster->share_memory())
           grp_net[grp_id]->ShareParamsFrom(net);
       }
@@ -131,7 +138,7 @@ void Trainer::SetupWorkerServer(
       }
     }
     LOG(INFO) << "grp " << worker->grp_id() << ", worker "
-      << worker->id() << " net " << grp_net[grp_id].get();
+              << worker->id() << " net " << grp_net[grp_id];
     worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
   }
 
@@ -168,7 +175,7 @@ vector<Server*> Trainer::CreateServers(const JobProto& job) {
 }
 
 
-vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
+vector<Worker*> Trainer::CreateWorkers(const JobProto& job) {
   auto cluster=Cluster::Get();
   vector<Worker*> workers;
   if(!cluster->has_worker())
@@ -180,7 +187,7 @@ vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
   for (int gid = gstart; gid < gend; gid++) {
     for (int wid = wstart; wid < wend; wid++) {
       auto *worker = Worker::Create(job);
-      worker->Init(nthreads++,gid, wid);
+      worker->Init(gid, wid);
       workers.push_back(worker);
     }
   }
@@ -241,13 +248,12 @@ void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
   // register endpoint to zookeeper
   cluster->Register(getpid(), hostip + ":" + std::to_string(port));
 
-  int nthreads = 1;
-  const vector<Worker*> workers = CreateWorkers(nthreads, *job);
-  nthreads += workers.size();
+  const vector<Worker*> workers = CreateWorkers(*job);
   const vector<Server*> servers = CreateServers(*job);
   SetupWorkerServer(*job, workers, servers);
 
 #ifdef USE_MPI
+  int nthreads = workers.size() + servers.size();
   for (int i = 0; i < nthreads; i++)
     MPIQueues.push_back(make_shared<SafeQueue>());
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 23382e3..70859de 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -19,18 +19,19 @@
 *
 *************************************************************/
 
+#include "trainer/worker.h"
+
 #include <glog/logging.h>
-#include <thread>
 #include <chrono>
 #include <thread>
 #include <typeinfo>
-#include "utils/singleton.h"
 #include "utils/cluster.h"
 #include "utils/factory.h"
-#include "trainer/worker.h"
+#include "utils/singleton.h"
 
 namespace singa {
-using std::thread;
+
+using std::string;
 
 Worker* Worker::Create(const JobProto& proto) {
   auto factory = Singleton<Factory<singa::Worker>>::Instance();
@@ -43,22 +44,12 @@ Worker* Worker::Create(const JobProto& proto) {
   return worker;
 }
 
-void Worker::Init(int thread_id, int grp_id, int id) {
-  thread_id_ = thread_id;
+void Worker::Init(int grp_id, int id) {
   grp_id_ = grp_id;
   id_ = id;
   layer_dealer_ = dealer_ = nullptr;
 }
 
-void Worker::Setup(
-    const JobProto& job, shared_ptr<NeuralNet> train_net,
-    shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net) {
-  job_conf_.CopyFrom(job);
-  train_net_ = train_net;
-  validation_net_ = valid_net;
-  test_net_ = test_net;
-}
-
 Worker::~Worker() {
   if (layer_dealer_)
     delete layer_dealer_;
@@ -66,17 +57,25 @@ Worker::~Worker() {
     delete dealer_;
 }
 
+void Worker::Setup(const JobProto& job, NeuralNet* train_net,
+                   NeuralNet* valid_net, NeuralNet* test_net) {
+  job_conf_.CopyFrom(job);
+  train_net_ = train_net;
+  validation_net_ = valid_net;
+  test_net_ = test_net;
+}
+
 void Worker::InitLocalParams() {
   // for each server grp, its first subscriber worker grp does the param init
   if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) {
     // extract params that should be initialized by this worker
     // must gen a name for each param if the user doesn't config it
     std::unordered_map<string, Param*> name2param;
-    for (auto layer: train_net_->layers()){
+    for (auto layer : train_net_->layers()) {
       if (layer->partition_id() == id_) {
         for (auto param : layer->GetParams()) {
           // only owners fill the memory of parameter values.
-          if(param->owner() == param->id()) {
+          if (param->owner() == param->id()) {
             CHECK(name2param.find(param->name()) == name2param.end());
             name2param[param->name()] = param;
           }
@@ -94,7 +93,7 @@ void Worker::InitLocalParams() {
         if (name2param.find(bps.name(i)) != name2param.end()) {
           name2param.at(bps.name(i))->FromProto(bps.blob(i));
           //  if load from pre-training params, reset version to start step
-          if(job_conf_.reset_param_version())
+          if (job_conf_.reset_param_version())
             name2param.at(bps.name(i))->set_version(job_conf_.step());
           else  // if resume training, use the same version as last checkpoint
             name2param.at(bps.name(i))->set_version(bps.version(i));
@@ -130,30 +129,6 @@ void Worker::InitLocalParams() {
   }
 }
 
-void Worker::Checkpoint(int step, shared_ptr<NeuralNet> net) {
-  if (grp_id_ == 0) {
-    BlobProtos bps;
-    for (auto layer: net->layers()){
-      if (layer->partition_id() == id_) {
-        for (auto param : layer->GetParams()) {
-          // only owners fill the memory of parameter values.
-          if(param->owner() == param->id()) {
-            auto *blob = bps.add_blob();
-            param->ToProto(blob);
-            bps.add_version(param->version());
-            bps.add_name(param->name());
-          }
-        }
-      }
-    }
-    char buf[256];
-    snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin",
-         Cluster::Get()->checkpoint_folder().c_str(), step, id_);
-    LOG(INFO) << "checkpoint to " << buf;
-    WriteProtoToBinaryFile(bps, buf);
-  }
-}
-
 void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) {
   dealer->Connect(kInprocRouterEndpoint);
   Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub));
@@ -166,13 +141,15 @@ void Worker::Run() {
   auto cluster = Cluster::Get();
   int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
   CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
-  dealer_ = new Dealer(2*thread_id_);
+  // TODO(wangsh): provide a unique sock id from cluster
+  dealer_ = new Dealer(0);
   ConnectStub(grp_id_, id_, dealer_, kWorkerParam);
   for (auto layer : train_net_->layers()) {
     if (layer->partition_id() == id_) {
       if (typeid(layer) == typeid(BridgeDstLayer)
           || typeid(layer) == typeid(BridgeSrcLayer)) {
-        layer_dealer_ = new Dealer(2*thread_id_+1);
+        // TODO(wangsh): provide a unique socket id from cluster
+        layer_dealer_ = new Dealer(1);
         ConnectStub(grp_id_, id_, layer_dealer_, kWorkerLayer);
         break;
       }
@@ -184,16 +161,15 @@ void Worker::Run() {
   Metric perf;
   while (!StopNow(step_)) {
     if (ValidateNow(step_) && validation_net_ != nullptr) {
-      //LOG(ERROR)<<"Validation at step "<<step;
+      // LOG(ERROR)<<"Validation at step "<<step;
       CollectAll(validation_net_, step_);
       Test(job_conf_.valid_steps(), kValidation, validation_net_);
     }
     if (TestNow(step_) && test_net_ != nullptr) {
-      //LOG(ERROR)<<"Test at step "<<step;
+      // LOG(ERROR)<<"Test at step "<<step;
       CollectAll(test_net_, step_);
       Test(job_conf_.test_steps(), kTest, test_net_);
     }
-
     if (CheckpointNow(step_)) {
       CollectAll(train_net_, step_);
       Checkpoint(step_, train_net_);
@@ -210,21 +186,41 @@ void Worker::Run() {
 
   // save the model
   Checkpoint(step_, train_net_);
-
   // clean up
   cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
   // notify the stub on worker stop
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1,-1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_type(kStop);
   dealer_->Send(&msg);  // use param dealer to send the stop msg
-
   LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
 }
 
-
+void Worker::Checkpoint(int step, NeuralNet* net) {
+  if (grp_id_ == 0) {
+    BlobProtos bps;
+    for (auto layer : net->layers()) {
+      if (layer->partition_id() == id_) {
+        for (auto param : layer->GetParams()) {
+          // only owners fill the memory of parameter values.
+          if (param->owner() == param->id()) {
+            auto *blob = bps.add_blob();
+            param->ToProto(blob);
+            bps.add_version(param->version());
+            bps.add_name(param->name());
+          }
+        }
+      }
+    }
+    char buf[256];
+    snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin",
+             Cluster::Get()->checkpoint_folder().c_str(), step, id_);
+    LOG(INFO) << "checkpoint to " << buf;
+    WriteProtoToBinaryFile(bps, buf);
+  }
+}
 
 int Worker::Put(Param* param, int step) {
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(ParamTrgt(param->owner(), 0), step);
   msg->set_type(kPut);
   dealer_->Send(&msg);
@@ -234,7 +230,7 @@ int Worker::Put(Param* param, int step) {
 int Worker::Get(Param* param, int step) {
   if (param->version() >= step)
     return 1;
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(ParamTrgt(param->owner(), 0), step);
   msg->set_type(kGet);
   dealer_->Send(&msg);
@@ -243,28 +239,31 @@ int Worker::Get(Param* param, int step) {
 
 int Worker::Update(Param* param, int step) {
   param->set_local_version(param->version());
-  Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(ParamTrgt(param->owner(), 0), step);
   msg->set_type(kUpdate);
   dealer_->Send(&msg);
   return 1;
 }
 
-int Worker::CollectAll(shared_ptr<NeuralNet> net, int step) {
+int Worker::CollectAll(NeuralNet* net, int step) {
   auto& layers = net->layers();
-  for (auto& layer : layers){
-    if (layer->partition_id() == id_)
-      for (Param* p: layer->GetParams()) {
+  for (auto& layer : layers) {
+    if (layer->partition_id() == id_) {
+      for (Param* p : layer->GetParams()) {
         Collect(p, step);
       }
+    }
   }
   return 1;
 }
+
 int Worker::Collect(Param* param, int step) {
   while (param->version() <= param->local_version())
     std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime));
   return 1;
 }
+
 void Worker::Report(const string& prefix, const Metric & perf) {
   Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
   msg->set_trgt(0, step_);
@@ -275,8 +274,8 @@ void Worker::Report(const string& prefix, const Metric & perf) {
   dealer_->Send(&msg);
 }
 
-void Worker::ReceiveBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net) {
+void Worker::ReceiveBlobs(bool data, bool grad, BridgeLayer* layer,
+                          NeuralNet* net) {
   while (!layer->ready()) {
     auto msg = layer_dealer_->Receive();
     CHECK_EQ(AddrGrp(msg->src()), grp_id_);
@@ -290,19 +289,19 @@ void Worker::ReceiveBlobs(
   }
 }
 
-void Worker::SendBlobs(
-    bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net) {
-  auto dst=layer->dstlayers().at(0);
-  Msg *msg=new Msg();
+void Worker::SendBlobs(bool data, bool grad, BridgeLayer* layer,
+                       NeuralNet* net) {
+  auto dst = layer->dstlayers().at(0);
+  Msg *msg = new Msg();
   msg->set_src(Addr(grp_id_, id_, kWorkerLayer));
   msg->set_dst(Addr(grp_id_, dst->partition_id(), kWorkerLayer));
   msg->AddFrame(dst->name().c_str(), dst->name().length());
-  auto const & blob=layer->data(nullptr);
-  msg->AddFrame(blob.cpu_data(), blob.count()*sizeof(float));
+  auto const & blob = layer->data(nullptr);
+  msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));
   layer_dealer_->Send(&msg);
 }
 
-void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) {
+void Worker::Test(int nsteps, Phase phase, NeuralNet* net) {
   Metric perf;
   for (int step = 0; step < nsteps; step++)
     TestOneBatch(step, phase, net, &perf);
@@ -312,96 +311,63 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) {
     Report("Test", perf);
 }
 
-bool Worker::DisplayNow(int step) const {
-  return (job_conf_.disp_freq() > 0
-      && step >= job_conf_.disp_after()
-      && ((step - job_conf_.disp_after())
-        % job_conf_.disp_freq() == 0));
-}
-
-bool Worker::DisplayDebugInfo(int step) const {
-  return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0;
-}
-bool Worker::StopNow(int step) const {
-  return step >= job_conf_.train_steps();
-}
-bool Worker::CheckpointNow(int step) const {
-  return (grp_id_ == 0
-      && job_conf_.checkpoint_freq() > 0
-      && step >= job_conf_.checkpoint_after()
-      && ((step - job_conf_.checkpoint_after())
-        % job_conf_.checkpoint_freq() == 0));
-}
-bool Worker::TestNow(const int step) const {
-  return (grp_id_ == 0
-      && job_conf_.test_freq() > 0
-      && job_conf_.test_steps() > 0
-      && step >= job_conf_.test_after()
-      && ((step - job_conf_.test_after())
-        % job_conf_.test_freq() == 0));
-}
-bool Worker::ValidateNow(const int step) const {
-  return (grp_id_ == 0
-      && job_conf_.valid_freq() > 0
-      && job_conf_.valid_steps() > 0
-      && step >= job_conf_.valid_after()
-      && ((step - job_conf_.valid_after())
-        % job_conf_.valid_freq() == 0));
+/****************************BPWorker**********************************/
+void BPWorker::TrainOneBatch(int step, Metric* perf) {
+  Forward(step, kTrain, train_net_, perf);
+  Backward(step, train_net_);
 }
 
-
-/****************************BPWorker**********************************/
-void BPWorker::Init(int thread_id, int group_id, int worker_id) {
-  Worker::Init(thread_id, group_id, worker_id);
+void BPWorker::TestOneBatch(int step, Phase phase, NeuralNet* net,
+                            Metric* perf) {
+  Forward(step, phase, net, perf);
 }
 
-void BPWorker::Forward(
-    int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf) {
+void BPWorker::Forward(int step, Phase phase, NeuralNet* net, Metric* perf) {
   for (auto& layer : net->layers()) {
     if (layer->partition_id() == id_) {
-      if (typeid(*layer) == typeid(BridgeDstLayer))  // recv data from other workers
-        ReceiveBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
+      // TODO(wangwei): enable this for model partition
+      // recv data from other workers
+      // if (typeid(*layer) == typeid(BridgeDstLayer))
+      //   ReceiveBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
       if (phase == kTrain) {
-        for (Param* p : layer->GetParams()) {  // wait until param is updated
+        // wait until param is updated
+        for (Param* p : layer->GetParams()) {
           Collect(p, step);
         }
       }
       layer->ComputeFeature(phase | kForward, perf);
-      if (typeid(*layer) == typeid(BridgeSrcLayer))  // send data to other workers
-        SendBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
+      // TODO(wangwei): enable this for model partition
+      // send data to other workers
+      // if (typeid(*layer) == typeid(BridgeSrcLayer))
+      //   SendBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
       if (DisplayDebugInfo(step))
         LOG(INFO) << layer->DebugString(step, phase | kForward);
     }
   }
 }
 
-void BPWorker::Backward(int step, shared_ptr<NeuralNet> net) {
-  auto& layers=net->layers();
-  for (auto it = layers.rbegin(); it != layers.rend(); it++){
+void BPWorker::Backward(int step, NeuralNet* net) {
+  auto& layers = net->layers();
+  for (auto it = layers.rbegin(); it != layers.rend(); it++) {
     Layer* layer = *it;
     if (layer->partition_id() == id_) {
-      // if (typeid(layer) == typeid(BridgeSrcLayer))  // send data to other workers
-      // ReceiveBlobs(false, true, layer, net);
+      // TODO(wangwei): enable this for model partition
+      // send data to other workers
+      // if (typeid(layer) == typeid(BridgeSrcLayer))
+      //   ReceiveBlobs(false, true, layer, net);
       layer->ComputeGradient(kTrain | kBackward, nullptr);
       if (DisplayDebugInfo(step))
         LOG(INFO) << layer->DebugString(step, kTrain | kBackward);
       for (Param* p : layer->GetParams())
         Update(p, step);
-      if (typeid(layer) == typeid(BridgeDstLayer))  // recv data from other workers
-        SendBlobs(false, true, dynamic_cast<BridgeDstLayer*>(layer), net);
+      // TODO(wangwei): enable this for model partition
+      // recv data from other workers
+      // if (typeid(layer) == typeid(BridgeDstLayer))
+      //   SendBlobs(false, true, dynamic_cast<BridgeDstLayer*>(layer), net);
     }
   }
 }
 
-void BPWorker::TrainOneBatch(int step, Metric* perf) {
-  Forward(step, kTrain, train_net_, perf);
-  Backward(step, train_net_);
-}
-
-void BPWorker::TestOneBatch(int step, Phase phase,
-    shared_ptr<NeuralNet> net, Metric* perf) {
-  Forward(step, phase, net, perf);
-}
 /****************************CDWorker**********************************/
 void CDWorker::TrainOneBatch(int step, Metric* perf) {
   const auto& layers = train_net_->layers();
@@ -432,8 +398,8 @@ void CDWorker::TrainOneBatch(int step, Metric* perf) {
   }
 }
 
-void CDWorker::TestOneBatch(int step, Phase phase,
-    shared_ptr<NeuralNet> net, Metric* perf) {
+void CDWorker::TestOneBatch(int step, Phase phase, NeuralNet* net,
+                            Metric* perf) {
   auto& layers = net->layers();
   for (auto *layer : layers)
     layer->ComputeFeature(kPositive, perf);
@@ -441,4 +407,5 @@ void CDWorker::TestOneBatch(int step, Phase phase,
     if (typeid(*layer) == typeid(RBMVisLayer))
       layer->ComputeFeature(kNegative | kTest, perf);
 }
+
 }  // namespace singa
