SINGA-21 Code review 5
Review worker.h and worker.cc:
- format code
- change shared_ptr to raw pointer for NeuralNet objects;
  all NeuralNet objects are now managed by the trainer, which passes
  raw pointers to the workers and releases them in its destructor
  (see the ownership sketch below)
Others:
- remove the virtual keyword from the Server class (server.h)
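For context, a minimal, hypothetical sketch of the ownership model this change adopts. The class and member names echo the diff (NeuralNet, Worker::Setup, nets_), but the bodies and the SetupWorker helper are simplified placeholders, not the actual SINGA implementation: the trainer creates every net, records the raw pointer, hands non-owning pointers to workers, and deletes everything in its destructor.

#include <vector>

// Simplified stand-in for singa::NeuralNet.
class NeuralNet {};

// Worker keeps non-owning pointers; it never deletes the nets.
class Worker {
 public:
  void Setup(NeuralNet* train_net, NeuralNet* valid_net, NeuralNet* test_net) {
    train_net_ = train_net;
    validation_net_ = valid_net;
    test_net_ = test_net;
  }

 private:
  NeuralNet* train_net_ = nullptr;       // borrowed from the trainer
  NeuralNet* validation_net_ = nullptr;  // borrowed from the trainer
  NeuralNet* test_net_ = nullptr;        // borrowed from the trainer
};

// Trainer (the stub) is the single owner of every NeuralNet it creates.
class Trainer {
 public:
  ~Trainer() {
    for (NeuralNet* net : nets_)  // release everything at shutdown
      delete net;
  }

  void SetupWorker(Worker* worker) {
    NeuralNet* train_net = new NeuralNet();      // create and track ownership
    nets_.push_back(train_net);
    worker->Setup(train_net, nullptr, nullptr);  // hand out raw pointers only
  }

 private:
  std::vector<NeuralNet*> nets_;  // all nets, deleted in the destructor
};

int main() {
  Trainer trainer;
  Worker worker;
  trainer.SetupWorker(&worker);
  return 0;  // ~Trainer deletes the nets; Worker does not
}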
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f50d293f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f50d293f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f50d293f
Branch: refs/heads/master
Commit: f50d293ff550d5b8ccace9f7e6992865474d0d29
Parents: d3e1fca
Author: wang sheng <[email protected]>
Authored: Wed Sep 23 13:15:42 2015 +0800
Committer: wang sheng <[email protected]>
Committed: Wed Sep 23 13:28:28 2015 +0800
----------------------------------------------------------------------
include/neuralnet/neuralnet.h | 10 +-
include/trainer/server.h | 9 +-
include/trainer/trainer.h | 12 +-
include/trainer/worker.h | 129 ++++++++++++---------
src/neuralnet/neuralnet.cc | 8 +-
src/trainer/server.cc | 3 +-
src/trainer/trainer.cc | 24 ++--
src/trainer/worker.cc | 229 ++++++++++++++++---------------------
8 files changed, 208 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/neuralnet/neuralnet.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h
index 06fd977..693fe19 100644
--- a/include/neuralnet/neuralnet.h
+++ b/include/neuralnet/neuralnet.h
@@ -22,8 +22,6 @@
#ifndef SINGA_NEURALNET_NEURALNET_H_
#define SINGA_NEURALNET_NEURALNET_H_
-#include <map>
-#include <memory>
#include <string>
#include <vector>
@@ -52,10 +50,10 @@ class NeuralNet {
* @param net_conf proto for the neural network
* @param phase test/training/validation
* @param npartitions num of partitions, do partitioning if num > 1
- * @return shared pointer to a neural net
+ * @return pointer to a neural net
*/
- static std::shared_ptr<NeuralNet> Create(const NetProto& net_conf,
- Phase phase, int npartitions);
+ static NeuralNet* Create(const NetProto& net_conf, Phase phase,
+ int npartitions);
/**
* construct the net structure from protocol buffer.
@@ -71,7 +69,7 @@ class NeuralNet {
/**
* Share memory of parameter values from other neuralnet
*/
- void ShareParamsFrom(std::shared_ptr<NeuralNet> other);
+ void ShareParamsFrom(NeuralNet* other);
inline const std::vector<Layer*>& layers() { return layers_; }
inline const std::vector<Param*>& params() const { return params_; }
inline Layer* name2layer(std::string name) const {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
index 3f3539a..84b3a41 100644
--- a/include/trainer/server.h
+++ b/include/trainer/server.h
@@ -22,7 +22,6 @@
#ifndef SINGA_TRAINER_SERVER_H_
#define SINGA_TRAINER_SERVER_H_
-#include <memory>
#include <unordered_map>
#include <vector>
#include "communication/socket.h"
@@ -45,7 +44,7 @@ namespace singa {
class Server {
public:
Server(int group_id, int server_id);
- virtual ~Server();
+ ~Server();
void Setup(const UpdaterProto& proto, const std::vector<int>& slice2group,
const std::vector<int>& slice2server);
void Run();
@@ -59,7 +58,7 @@ class Server {
* @return the orignal message or a response message which contains the values
* of the Param with the request version.
*/
- virtual Msg* HandleGet(Msg** msg);
+ Msg* HandleGet(Msg** msg);
/**
* Process Update request.
*
@@ -88,7 +87,7 @@ class Server {
* @return the original message or response message. If we don't want to
* acknowledge the put request, then return nullptr.
*/
- virtual Msg* HandlePut(Msg **msg);
+ Msg* HandlePut(Msg **msg);
/**
* Handle sync request from other server groups.
*
@@ -100,7 +99,7 @@ class Server {
* @param msg request msg containing the parameter updates
* @return response msg that contains the fresh parameter values.
*/
- virtual Msg* HandleSyncRequest(Msg** msg);
+ Msg* HandleSyncRequest(Msg** msg);
/**
* Handle sync response.
*
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 0b03dea..d3d332f 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -21,8 +21,10 @@
#ifndef INCLUDE_TRAINER_TRAINER_H_
#define INCLUDE_TRAINER_TRAINER_H_
-#include <unordered_map>
+
#include <queue>
+#include <vector>
+#include <unordered_map>
#include "proto/job.pb.h"
#include "proto/singa.pb.h"
#include "utils/param.h"
@@ -34,13 +36,15 @@
#include "communication/socket.h"
namespace singa {
+
+using std::vector;
+
/**
* Every running process has a training object which launches one or more
* worker (and server) threads.
*
* The main thread runs a loop to forward messages between workers and servers.
*/
-
class Trainer{
public:
~Trainer();
@@ -82,7 +86,7 @@ class Trainer{
* @param jobConf
* @return worker instances
*/
- vector<Worker*> CreateWorkers(int nthread, const JobProto& jobConf);
+ vector<Worker*> CreateWorkers(const JobProto& jobConf);
/**
* Setup workers and servers.
@@ -158,6 +162,8 @@ class Trainer{
std::unordered_map<int, ParamEntry*> worker_shard_;
//!< map from slice to the server that updates it
vector<int> slice2server_;
+ //stub will destroy all neuralnets in the end
+ vector<NeuralNet*> nets_;
};
} /* singa */
#endif // INCLUDE_TRAINER_TRAINER_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
index 679435c..66439ec 100644
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@ -21,19 +21,16 @@
#ifndef SINGA_TRAINER_WORKER_H_
#define SINGA_TRAINER_WORKER_H_
+
+#include <string>
+#include "communication/socket.h"
#include "neuralnet/neuralnet.h"
#include "proto/job.pb.h"
-#include "communication/socket.h"
namespace singa {
-using std::map;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-
//!< sleep 5 milliseconds if the Param is not updated to the expected version
-const int kCollectSleepTime=5;
+const int kCollectSleepTime = 5;
/**
* The Worker class which runs the training algorithm.
* The first worker group will initialize parameters of the Net,
@@ -53,19 +50,13 @@ class Worker {
* @param grp_id global worker group ID
* @param id worker ID within the group
*/
- virtual void Init(int thread_id, int grp_id, int id);
+ virtual void Init(int grp_id, int id);
virtual ~Worker();
/**
* Setup members
*/
- void Setup(const JobProto& job, shared_ptr<NeuralNet> train_net,
- shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net);
- /**
- * Main function of Worker.
- *
- * Train the neuralnet step by step, test/validation is done periodically.
- */
- void Run();
+ void Setup(const JobProto& job, NeuralNet* train_net, NeuralNet* valid_net,
+ NeuralNet* test_net);
/**
* Init all local params (i.e., params from layers resident in this worker).
*
@@ -78,12 +69,17 @@ class Worker {
* train for a couple of steps to warmup the params before put
* them to servers (warmup of JobProto controls this).
*
- * If the owner param is availabel from checkpoint file, then its
+ * If the owner param is available from checkpoint file, then its
* values are parsed from the checkpoint file instead of randomly initialized.
* For params who do not have checkpoints, randomly init them.
*/
void InitLocalParams();
-
+ /**
+ * Main function of Worker.
+ *
+ * Train the neuralnet step by step, test/validation is done periodically.
+ */
+ void Run();
/**
* Checkpoint all params owned by the worker from the first group onto disk.
* The serialization is done using BlobProtos which includes the name, version
@@ -93,31 +89,30 @@ class Worker {
* @param step training step of this worker
* @param net the training net whose params will be dumped.
*/
- void Checkpoint(int step, shared_ptr<NeuralNet> net);
+ void Checkpoint(int step, NeuralNet* net);
/**
* Test the perforance of the learned model on validation or test dataset.
* Test is done by the first group.
* @param net, neural network
*/
- void Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net);
+ void Test(int nsteps, Phase phase, NeuralNet* net);
/**
* Train one mini-batch.
* Test/Validation is done before training.
*/
- virtual void TrainOneBatch(int step, Metric* perf)=0;
+ virtual void TrainOneBatch(int step, Metric* perf) = 0;
/**
* Test/validate one mini-batch.
*/
- virtual void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
- Metric* perf)=0;
+ virtual void TestOneBatch(int step, Phase phase, NeuralNet* net,
+ Metric* perf) = 0;
/**
* Report performance to the stub.
*
* @param prefix display prefix, e.g., 'Train', 'Test'
* @param perf
*/
- void Report(const string& prefix, const Metric & perf);
-
+ void Report(const std::string& prefix, const Metric & perf);
/**
* Put Param to server.
* @param param
@@ -148,80 +143,101 @@ class Worker {
/**
* Call Collect for every param of net
*/
- int CollectAll(shared_ptr<NeuralNet> net, int step);
+ int CollectAll(NeuralNet* net, int step);
/**
* Receive blobs from other workers due to model partitions.
*/
- void ReceiveBlobs(
- bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net);
+ void ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
/**
* Send blobs to other workers due to model partitions.
*/
- void SendBlobs(
- bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net);
-
+ void SendBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
/**
* Check is it time to display training info, e.g., loss and precison.
*/
- inline bool DisplayNow(int step) const;
+ inline bool DisplayNow(int step) const {
+ return job_conf_.disp_freq() > 0
+ && step >= job_conf_.disp_after()
+ && ((step - job_conf_.disp_after()) % job_conf_.disp_freq() == 0);
+ }
/**
* Check is it time to display training info, e.g., loss and precison.
*/
- inline bool DisplayDebugInfo(int step) const;
+ inline bool DisplayDebugInfo(int step) const {
+ return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0;
+ }
/**
* Check is it time to stop
*/
- inline bool StopNow(int step) const;
+ inline bool StopNow(int step) const {
+ return step >= job_conf_.train_steps();
+ }
/**
* Check is it time to do checkpoint.
*/
- inline bool CheckpointNow(int step) const;
+ inline bool CheckpointNow(int step) const {
+ return grp_id_ == 0
+ && job_conf_.checkpoint_freq() > 0
+ && step >= job_conf_.checkpoint_after()
+ && ((step - job_conf_.checkpoint_after())
+ % job_conf_.checkpoint_freq() == 0);
+ }
/**
* Check is it time to do test.
* @param step the ::Train() has been called this num times.
*/
- inline bool TestNow(int step) const;
+ inline bool TestNow(int step) const {
+ return grp_id_ == 0
+ && job_conf_.test_freq() > 0
+ && job_conf_.test_steps() > 0
+ && step >= job_conf_.test_after()
+ && ((step - job_conf_.test_after()) % job_conf_.test_freq() == 0);
+ }
/**
* Check is it time to do validation.
* @param step the ::Train() has been called step times.
*/
- inline bool ValidateNow(int step) const;
-
+ inline bool ValidateNow(int step) const {
+ return grp_id_ == 0
+ && job_conf_.valid_freq() > 0
+ && job_conf_.valid_steps() > 0
+ && step >= job_conf_.valid_after()
+ && ((step - job_conf_.valid_after()) % job_conf_.valid_freq() == 0);
+ }
/**
* @return group ID
*/
- int grp_id() const { return grp_id_;}
-
+ int grp_id() const { return grp_id_; }
/**
* @reutrn worker ID within the worker group.
*/
- int id() const { return id_;}
+ int id() const { return id_; }
protected:
- int thread_id_, grp_id_, id_;
- int step_;
+ int grp_id_ = -1, id_ = -1;
+ int step_ = 0;
JobProto job_conf_;
- shared_ptr<NeuralNet> train_net_, test_net_, validation_net_;
- Dealer* layer_dealer_, *dealer_;
+ NeuralNet* train_net_ = nullptr;
+ NeuralNet* test_net_ = nullptr;
+ NeuralNet* validation_net_ = nullptr;
+ Dealer* layer_dealer_ = nullptr;
+ Dealer* dealer_ = nullptr;
};
-class BPWorker: public Worker{
+class BPWorker: public Worker {
public:
- ~BPWorker(){}
- void Init(int thread_id, int grp_id, int id) override;
void TrainOneBatch(int step, Metric* perf) override;
- void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
- Metric* perf) override;
-
- void Forward(int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf);
- void Backward(int step, shared_ptr<NeuralNet> net);
+ void TestOneBatch(int step, Phase phase, NeuralNet* net, Metric* perf)
+ override;
+ void Forward(int step, Phase phase, NeuralNet* net, Metric* perf);
+ void Backward(int step, NeuralNet* net);
};
-class CDWorker: public Worker{
+class CDWorker: public Worker {
public:
void TrainOneBatch(int step, Metric* perf) override;
- void TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net,
- Metric* perf) override;
+ void TestOneBatch(int step, Phase phase, NeuralNet* net, Metric* perf)
+ override;
};
inline int BlobTrgt(int grp, int layer) {
@@ -236,6 +252,7 @@ inline int BlobLayer(int blob_trgt) {
static int mask = (1 << 16) -1;
return blob_trgt & mask;
}
+
} // namespace singa
#endif // SINGA_TRAINER_WORKER_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 286d273..200824a 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -28,11 +28,10 @@
namespace singa {
using std::map;
-using std::shared_ptr;
using std::string;
using std::vector;
-shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
+NeuralNet* NeuralNet::Create(const NetProto& net_conf, Phase phase,
int npartitions) {
NetProto conf;
conf.CopyFrom(net_conf);
@@ -76,8 +75,7 @@ shared_ptr<NeuralNet> NeuralNet::Create(const NetProto& net_conf, Phase phase,
}
LOG(INFO) << "NeuralNet config is\n" << conf.DebugString();
// TODO(wangwei) create net based on net type, e.g., directed, undirected, etc
- auto net = std::make_shared<NeuralNet>(conf, npartitions);
- return net;
+ return new NeuralNet(conf, npartitions);
}
NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
@@ -107,7 +105,7 @@ std::string NeuralNet::ToAdjacency() {
return disp;
}
-void NeuralNet::ShareParamsFrom(shared_ptr<NeuralNet> other) {
+void NeuralNet::ShareParamsFrom(NeuralNet* other) {
for (auto& layer : layers_) {
auto otherlayer = other->name2layer(layer->name());
if (otherlayer != nullptr) {
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 29f6a68..5e74c1b 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -19,11 +19,12 @@
*
*************************************************************/
+#include "trainer/server.h"
+
#include <thread>
#include <chrono>
#include "mshadow/tensor.h"
#include "proto/common.pb.h"
-#include "trainer/server.h"
#include "utils/param.h"
#include "utils/singleton.h"
#include "utils/factory.h"
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index c928d91..8a0589e 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -38,11 +38,13 @@ using std::vector;
using std::map;
using std::queue;
using namespace std::chrono;
-using std::make_shared;
+using std::string;
/***********************Trainer****************************/
Trainer::~Trainer() {
delete router_;
+ for (NeuralNet* p : nets_)
+ delete p;
}
const vector<int> SliceParams(const vector<Param*>& params) {
@@ -92,30 +94,35 @@ void Trainer::SetupWorkerServer(
int grp_size = cluster->nworkers_per_group();
const auto& net_conf = job_conf.neuralnet();
auto net = NeuralNet::Create(net_conf, kTrain, grp_size);
+ nets_.push_back(net);
// MUST do SliceParam before share param/net with others
auto slices = SliceParams(net->params());
- std::unordered_map<int, shared_ptr<NeuralNet>> grp_net;
+ std::unordered_map<int, NeuralNet*> grp_net;
int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
for (auto worker : workers) {
int grp_id = worker->grp_id();
int worker_id = worker->id();
- shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr;
+ NeuralNet* test_net = nullptr;
+ NeuralNet* valid_net = nullptr;
if (grp_net.find(grp_id) == grp_net.end()) {
if (grp_id == first_grp) {
// test are performed only by the first group now. TODO update.
if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp
test_net->ShareParamsFrom(net);
+ nets_.push_back(test_net);
}
// validation are performed only by the first group. TODO update.
if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
valid_net = NeuralNet::Create(net_conf, kValidation, 1);
valid_net->ShareParamsFrom(net);
+ nets_.push_back(valid_net);
}
grp_net[grp_id] = net;
} else {
grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
+ nets_.push_back(grp_net[grp_id]);
if(cluster->share_memory())
grp_net[grp_id]->ShareParamsFrom(net);
}
@@ -131,7 +138,7 @@ void Trainer::SetupWorkerServer(
}
}
LOG(INFO) << "grp " << worker->grp_id() << ", worker "
- << worker->id() << " net " << grp_net[grp_id].get();
+ << worker->id() << " net " << grp_net[grp_id];
worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
}
@@ -168,7 +175,7 @@ vector<Server*> Trainer::CreateServers(const JobProto& job) {
}
-vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
+vector<Worker*> Trainer::CreateWorkers(const JobProto& job) {
auto cluster=Cluster::Get();
vector<Worker*> workers;
if(!cluster->has_worker())
@@ -180,7 +187,7 @@ vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
for (int gid = gstart; gid < gend; gid++) {
for (int wid = wstart; wid < wend; wid++) {
auto *worker = Worker::Create(job);
- worker->Init(nthreads++,gid, wid);
+ worker->Init(gid, wid);
workers.push_back(worker);
}
}
@@ -241,13 +248,12 @@ void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
// register endpoint to zookeeper
cluster->Register(getpid(), hostip + ":" + std::to_string(port));
- int nthreads = 1;
- const vector<Worker*> workers = CreateWorkers(nthreads, *job);
- nthreads += workers.size();
+ const vector<Worker*> workers = CreateWorkers(*job);
const vector<Server*> servers = CreateServers(*job);
SetupWorkerServer(*job, workers, servers);
#ifdef USE_MPI
+ int nthreads = workers.size() + servers.size();
for (int i = 0; i < nthreads; i++)
MPIQueues.push_back(make_shared<SafeQueue>());
#endif
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f50d293f/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 23382e3..70859de 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -19,18 +19,19 @@
*
*************************************************************/
+#include "trainer/worker.h"
+
#include <glog/logging.h>
-#include <thread>
#include <chrono>
#include <thread>
#include <typeinfo>
-#include "utils/singleton.h"
#include "utils/cluster.h"
#include "utils/factory.h"
-#include "trainer/worker.h"
+#include "utils/singleton.h"
namespace singa {
-using std::thread;
+
+using std::string;
Worker* Worker::Create(const JobProto& proto) {
auto factory = Singleton<Factory<singa::Worker>>::Instance();
@@ -43,22 +44,12 @@ Worker* Worker::Create(const JobProto& proto) {
return worker;
}
-void Worker::Init(int thread_id, int grp_id, int id) {
- thread_id_ = thread_id;
+void Worker::Init(int grp_id, int id) {
grp_id_ = grp_id;
id_ = id;
layer_dealer_ = dealer_ = nullptr;
}
-void Worker::Setup(
- const JobProto& job, shared_ptr<NeuralNet> train_net,
- shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net) {
- job_conf_.CopyFrom(job);
- train_net_ = train_net;
- validation_net_ = valid_net;
- test_net_ = test_net;
-}
-
Worker::~Worker() {
if (layer_dealer_)
delete layer_dealer_;
@@ -66,17 +57,25 @@ Worker::~Worker() {
delete dealer_;
}
+void Worker::Setup(const JobProto& job, NeuralNet* train_net,
+ NeuralNet* valid_net, NeuralNet* test_net) {
+ job_conf_.CopyFrom(job);
+ train_net_ = train_net;
+ validation_net_ = valid_net;
+ test_net_ = test_net;
+}
+
void Worker::InitLocalParams() {
// for each server grp, its first subscriber worker grp does the param init
if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) {
// extract params that should be initialized by this worker
// must gen a name for each param if the user doesn't config it
std::unordered_map<string, Param*> name2param;
- for (auto layer: train_net_->layers()){
+ for (auto layer : train_net_->layers()) {
if (layer->partition_id() == id_) {
for (auto param : layer->GetParams()) {
// only owners fill the memory of parameter values.
- if(param->owner() == param->id()) {
+ if (param->owner() == param->id()) {
CHECK(name2param.find(param->name()) == name2param.end());
name2param[param->name()] = param;
}
@@ -94,7 +93,7 @@ void Worker::InitLocalParams() {
if (name2param.find(bps.name(i)) != name2param.end()) {
name2param.at(bps.name(i))->FromProto(bps.blob(i));
// if load from pre-training params, reset version to start step
- if(job_conf_.reset_param_version())
+ if (job_conf_.reset_param_version())
name2param.at(bps.name(i))->set_version(job_conf_.step());
else // if resume training, use the same version as last checkpoint
name2param.at(bps.name(i))->set_version(bps.version(i));
@@ -130,30 +129,6 @@ void Worker::InitLocalParams() {
}
}
-void Worker::Checkpoint(int step, shared_ptr<NeuralNet> net) {
- if (grp_id_ == 0) {
- BlobProtos bps;
- for (auto layer: net->layers()){
- if (layer->partition_id() == id_) {
- for (auto param : layer->GetParams()) {
- // only owners fill the memory of parameter values.
- if(param->owner() == param->id()) {
- auto *blob = bps.add_blob();
- param->ToProto(blob);
- bps.add_version(param->version());
- bps.add_name(param->name());
- }
- }
- }
- }
- char buf[256];
- snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin",
- Cluster::Get()->checkpoint_folder().c_str(), step, id_);
- LOG(INFO) << "checkpoint to " << buf;
- WriteProtoToBinaryFile(bps, buf);
- }
-}
-
void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) {
dealer->Connect(kInprocRouterEndpoint);
Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub));
@@ -166,13 +141,15 @@ void Worker::Run() {
auto cluster = Cluster::Get();
int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
- dealer_ = new Dealer(2*thread_id_);
+ // TODO(wangsh): provide a unique sock id from cluster
+ dealer_ = new Dealer(0);
ConnectStub(grp_id_, id_, dealer_, kWorkerParam);
for (auto layer : train_net_->layers()) {
if (layer->partition_id() == id_) {
if (typeid(layer) == typeid(BridgeDstLayer)
|| typeid(layer) == typeid(BridgeSrcLayer)) {
- layer_dealer_ = new Dealer(2*thread_id_+1);
+ // TODO(wangsh): provide a unique socket id from cluster
+ layer_dealer_ = new Dealer(1);
ConnectStub(grp_id_, id_, layer_dealer_, kWorkerLayer);
break;
}
@@ -184,16 +161,15 @@ void Worker::Run() {
Metric perf;
while (!StopNow(step_)) {
if (ValidateNow(step_) && validation_net_ != nullptr) {
- //LOG(ERROR)<<"Validation at step "<<step;
+ // LOG(ERROR)<<"Validation at step "<<step;
CollectAll(validation_net_, step_);
Test(job_conf_.valid_steps(), kValidation, validation_net_);
}
if (TestNow(step_) && test_net_ != nullptr) {
- //LOG(ERROR)<<"Test at step "<<step;
+ // LOG(ERROR)<<"Test at step "<<step;
CollectAll(test_net_, step_);
Test(job_conf_.test_steps(), kTest, test_net_);
}
-
if (CheckpointNow(step_)) {
CollectAll(train_net_, step_);
Checkpoint(step_, train_net_);
@@ -210,21 +186,41 @@ void Worker::Run() {
// save the model
Checkpoint(step_, train_net_);
-
// clean up
cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
// notify the stub on worker stop
- Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1,-1, kStub));
+ Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
msg->set_type(kStop);
dealer_->Send(&msg); // use param dealer to send the stop msg
-
LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
}
-
+void Worker::Checkpoint(int step, NeuralNet* net) {
+ if (grp_id_ == 0) {
+ BlobProtos bps;
+ for (auto layer : net->layers()) {
+ if (layer->partition_id() == id_) {
+ for (auto param : layer->GetParams()) {
+ // only owners fill the memory of parameter values.
+ if (param->owner() == param->id()) {
+ auto *blob = bps.add_blob();
+ param->ToProto(blob);
+ bps.add_version(param->version());
+ bps.add_name(param->name());
+ }
+ }
+ }
+ }
+ char buf[256];
+ snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin",
+ Cluster::Get()->checkpoint_folder().c_str(), step, id_);
+ LOG(INFO) << "checkpoint to " << buf;
+ WriteProtoToBinaryFile(bps, buf);
+ }
+}
int Worker::Put(Param* param, int step) {
- Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+ Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
msg->set_trgt(ParamTrgt(param->owner(), 0), step);
msg->set_type(kPut);
dealer_->Send(&msg);
@@ -234,7 +230,7 @@ int Worker::Put(Param* param, int step) {
int Worker::Get(Param* param, int step) {
if (param->version() >= step)
return 1;
- Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+ Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
msg->set_trgt(ParamTrgt(param->owner(), 0), step);
msg->set_type(kGet);
dealer_->Send(&msg);
@@ -243,28 +239,31 @@ int Worker::Get(Param* param, int step) {
int Worker::Update(Param* param, int step) {
param->set_local_version(param->version());
- Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+ Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
msg->set_trgt(ParamTrgt(param->owner(), 0), step);
msg->set_type(kUpdate);
dealer_->Send(&msg);
return 1;
}
-int Worker::CollectAll(shared_ptr<NeuralNet> net, int step) {
+int Worker::CollectAll(NeuralNet* net, int step) {
auto& layers = net->layers();
- for (auto& layer : layers){
- if (layer->partition_id() == id_)
- for (Param* p: layer->GetParams()) {
+ for (auto& layer : layers) {
+ if (layer->partition_id() == id_) {
+ for (Param* p : layer->GetParams()) {
Collect(p, step);
}
+ }
}
return 1;
}
+
int Worker::Collect(Param* param, int step) {
while (param->version() <= param->local_version())
std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime));
return 1;
}
+
void Worker::Report(const string& prefix, const Metric & perf) {
Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
msg->set_trgt(0, step_);
@@ -275,8 +274,8 @@ void Worker::Report(const string& prefix, const Metric & perf) {
dealer_->Send(&msg);
}
-void Worker::ReceiveBlobs(
- bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net) {
+void Worker::ReceiveBlobs(bool data, bool grad, BridgeLayer* layer,
+ NeuralNet* net) {
while (!layer->ready()) {
auto msg = layer_dealer_->Receive();
CHECK_EQ(AddrGrp(msg->src()), grp_id_);
@@ -290,19 +289,19 @@ void Worker::ReceiveBlobs(
}
}
-void Worker::SendBlobs(
- bool data, bool grad, BridgeLayer* layer, shared_ptr<NeuralNet> net) {
- auto dst=layer->dstlayers().at(0);
- Msg *msg=new Msg();
+void Worker::SendBlobs(bool data, bool grad, BridgeLayer* layer,
+ NeuralNet* net) {
+ auto dst = layer->dstlayers().at(0);
+ Msg *msg = new Msg();
msg->set_src(Addr(grp_id_, id_, kWorkerLayer));
msg->set_dst(Addr(grp_id_, dst->partition_id(), kWorkerLayer));
msg->AddFrame(dst->name().c_str(), dst->name().length());
- auto const & blob=layer->data(nullptr);
- msg->AddFrame(blob.cpu_data(), blob.count()*sizeof(float));
+ auto const & blob = layer->data(nullptr);
+ msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));
layer_dealer_->Send(&msg);
}
-void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) {
+void Worker::Test(int nsteps, Phase phase, NeuralNet* net) {
Metric perf;
for (int step = 0; step < nsteps; step++)
TestOneBatch(step, phase, net, &perf);
@@ -312,96 +311,63 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) {
Report("Test", perf);
}
-bool Worker::DisplayNow(int step) const {
- return (job_conf_.disp_freq() > 0
- && step >= job_conf_.disp_after()
- && ((step - job_conf_.disp_after())
- % job_conf_.disp_freq() == 0));
-}
-
-bool Worker::DisplayDebugInfo(int step) const {
- return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0;
-}
-bool Worker::StopNow(int step) const {
- return step >= job_conf_.train_steps();
-}
-bool Worker::CheckpointNow(int step) const {
- return (grp_id_ == 0
- && job_conf_.checkpoint_freq() > 0
- && step >= job_conf_.checkpoint_after()
- && ((step - job_conf_.checkpoint_after())
- % job_conf_.checkpoint_freq() == 0));
-}
-bool Worker::TestNow(const int step) const {
- return (grp_id_ == 0
- && job_conf_.test_freq() > 0
- && job_conf_.test_steps() > 0
- && step >= job_conf_.test_after()
- && ((step - job_conf_.test_after())
- % job_conf_.test_freq() == 0));
-}
-bool Worker::ValidateNow(const int step) const {
- return (grp_id_ == 0
- && job_conf_.valid_freq() > 0
- && job_conf_.valid_steps() > 0
- && step >= job_conf_.valid_after()
- && ((step - job_conf_.valid_after())
- % job_conf_.valid_freq() == 0));
+/****************************BPWorker**********************************/
+void BPWorker::TrainOneBatch(int step, Metric* perf) {
+ Forward(step, kTrain, train_net_, perf);
+ Backward(step, train_net_);
}
-
-/****************************BPWorker**********************************/
-void BPWorker::Init(int thread_id, int group_id, int worker_id) {
- Worker::Init(thread_id, group_id, worker_id);
+void BPWorker::TestOneBatch(int step, Phase phase, NeuralNet* net,
+ Metric* perf) {
+ Forward(step, phase, net, perf);
}
-void BPWorker::Forward(
- int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf) {
+void BPWorker::Forward(int step, Phase phase, NeuralNet* net, Metric* perf) {
for (auto& layer : net->layers()) {
if (layer->partition_id() == id_) {
- if (typeid(*layer) == typeid(BridgeDstLayer)) // recv data from other workers
- ReceiveBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
+ // TODO(wangwei): enable this for model partition
+ // recv data from other workers
+ // if (typeid(*layer) == typeid(BridgeDstLayer))
+ // ReceiveBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
if (phase == kTrain) {
- for (Param* p : layer->GetParams()) { // wait until param is updated
+ // wait until param is updated
+ for (Param* p : layer->GetParams()) {
Collect(p, step);
}
}
layer->ComputeFeature(phase | kForward, perf);
- if (typeid(*layer) == typeid(BridgeSrcLayer)) // send data to other workers
- SendBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
+ // TODO(wangwei): enable this for model partition
+ // send data to other workers
+ // if (typeid(*layer) == typeid(BridgeSrcLayer))
+ // SendBlobs(true, false, dynamic_cast<BridgeLayer*>(layer), net);
if (DisplayDebugInfo(step))
LOG(INFO) << layer->DebugString(step, phase | kForward);
}
}
}
-void BPWorker::Backward(int step, shared_ptr<NeuralNet> net) {
- auto& layers=net->layers();
- for (auto it = layers.rbegin(); it != layers.rend(); it++){
+void BPWorker::Backward(int step, NeuralNet* net) {
+ auto& layers = net->layers();
+ for (auto it = layers.rbegin(); it != layers.rend(); it++) {
Layer* layer = *it;
if (layer->partition_id() == id_) {
- // if (typeid(layer) == typeid(BridgeSrcLayer)) // send data to other workers
- // ReceiveBlobs(false, true, layer, net);
+ // TODO(wangwei): enable this for model partition
+ // send data to other workers
+ // if (typeid(layer) == typeid(BridgeSrcLayer))
+ // ReceiveBlobs(false, true, layer, net);
layer->ComputeGradient(kTrain | kBackward, nullptr);
if (DisplayDebugInfo(step))
LOG(INFO) << layer->DebugString(step, kTrain | kBackward);
for (Param* p : layer->GetParams())
Update(p, step);
- if (typeid(layer) == typeid(BridgeDstLayer)) // recv data from other workers
- SendBlobs(false, true, dynamic_cast<BridgeDstLayer*>(layer), net);
+ // TODO(wangwei): enable this for model partition
+ // recv data from other workers
+ // if (typeid(layer) == typeid(BridgeDstLayer))
+ // SendBlobs(false, true, dynamic_cast<BridgeDstLayer*>(layer), net);
}
}
}
-void BPWorker::TrainOneBatch(int step, Metric* perf) {
- Forward(step, kTrain, train_net_, perf);
- Backward(step, train_net_);
-}
-
-void BPWorker::TestOneBatch(int step, Phase phase,
- shared_ptr<NeuralNet> net, Metric* perf) {
- Forward(step, phase, net, perf);
-}
/****************************CDWorker**********************************/
void CDWorker::TrainOneBatch(int step, Metric* perf) {
const auto& layers = train_net_->layers();
@@ -432,8 +398,8 @@ void CDWorker::TrainOneBatch(int step, Metric* perf) {
}
}
-void CDWorker::TestOneBatch(int step, Phase phase,
- shared_ptr<NeuralNet> net, Metric* perf) {
+void CDWorker::TestOneBatch(int step, Phase phase, NeuralNet* net,
+ Metric* perf) {
auto& layers = net->layers();
for (auto *layer : layers)
layer->ComputeFeature(kPositive, perf);
@@ -441,4 +407,5 @@ void CDWorker::TestOneBatch(int step, Phase phase,
if (typeid(*layer) == typeid(RBMVisLayer))
layer->ComputeFeature(kNegative | kTest, perf);
}
+
} // namespace singa