Repository: incubator-singa Updated Branches: refs/heads/master 96bedb226 -> 189261f0e
SINGA-12 Supprt Checkpoint and Restore The checkpoint is done in the Worker class and controlled by two model configuration fields: checkpoint_after and checkpoint_frequency. Only do checkpoint for the Params owning the param values from the first group. The name, version and values of one Param are dumped onto disk (the path is <workspace>/checkpoint/step<training step>-worker<worker id>.bin). It is possible that the snapshot is separated into mutiple files because the neural net is partitioned into multi workers. The checkpoint files can be used: application 1: to restore (resume) the training by setting the command line argument -resume = true. The Resume function of the Trainer will find the files for the latest snapshot and add them to the model.conf's checkpoint filed. It also set the model config's step field to the snapshot step (extracted from file name). application 2: as the pretraining result of another model. Users have to config the new model's checkpoint field to add the paths of all checkpoint files of the same snapshot. The Worker's InitLocalParam will init Param's from checkpoint files if available. Otherwise it randomly init them using user configured init method. It matches the Param objects based on name. If the Param is not configured with a name, NeuralNet class will automatically create one for it based on the name of the layer to which the Param belongs. In this case, for application 2, users have to either configure the names of the new model params carefully to match the names of params from the pre-trained model for application 1, the worker-server topology cannot be changed. Restore for params which are partitioned due to model partitioning is not supported. Because if the pre-training is done using 2 workers, while the new model is trained with 3 workers, then the same original param is partitioned in different ways and hence cannot be matched. Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/729a5c48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/729a5c48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/729a5c48 Branch: refs/heads/master Commit: 729a5c48a0142ffad3265b84a69642997167fae9 Parents: 96bedb2 Author: wang wei <[email protected]> Authored: Sat Jul 18 23:18:03 2015 +0800 Committer: wang wei <[email protected]> Committed: Sun Jul 19 00:15:02 2015 +0800 ---------------------------------------------------------------------- bin/singa-run.sh | 6 +- examples/cifar10/cluster-dist.conf | 8 --- examples/cifar10/model.conf | 97 +++++++++++++++++---------------- include/neuralnet/layer.h | 5 +- include/trainer/trainer.h | 19 +++++-- include/trainer/worker.h | 18 ++++-- include/utils/blob.h | 5 +- include/utils/cluster.h | 8 ++- include/utils/param.h | 22 +++++--- src/main.cc | 11 ++-- src/neuralnet/layer.cc | 34 ++++++------ src/neuralnet/neuralnet.cc | 9 ++- src/proto/common.proto | 25 ++++++--- src/proto/model.proto | 14 +++-- src/trainer/trainer.cc | 53 +++++++++++++++--- src/trainer/worker.cc | 77 +++++++++++++++++++++----- src/utils/blob.cc | 23 ++++---- src/utils/cluster.cc | 2 + src/utils/param.cc | 6 ++ 19 files changed, 288 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/bin/singa-run.sh ---------------------------------------------------------------------- diff --git a/bin/singa-run.sh b/bin/singa-run.sh index 7e3e00d..37e7c98 100755 --- a/bin/singa-run.sh +++ b/bin/singa-run.sh @@ -25,7 +25,7 @@ usage="Usage: \n \ (single node): singa-run.sh -cluster=YOUR_CONF_FILE -model=YOUR_CONF_FILE \n \ - (distributed): singa-run.sh -conf=YOUR_CONF_DIR \ + (distributed): singa-run.sh -conf=YOUR_CONF_DIR \ (the directory should contain cluster.conf/model.conf/hostfile)" #if [ $# -le 0 ] || [ $# -ge 3 ] ; then @@ -51,7 +51,7 @@ fi if [ $valid_args = false ] ; then echo -e $usage - exit 1 + exit 1 fi # get singa-base @@ -89,7 +89,7 @@ elif [ $# = 1 ] ; then -oUserKnownHostsFile=/dev/null \ -oLogLevel=quiet" hosts=(`cat $host_path |cut -d ' ' -f 1`) - cmd="./singa -cluster=$conf_path/cluster.conf -model=$conf_path/model.conf" + cmd="./singa -cluster=$conf_path/cluster.conf -model=$conf_path/model.conf -resume=true" ssh_cmd="cd $BASE; "$cmd for i in ${hosts[@]} ; do if [ $i = localhost ] ; then http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/examples/cifar10/cluster-dist.conf ---------------------------------------------------------------------- diff --git a/examples/cifar10/cluster-dist.conf b/examples/cifar10/cluster-dist.conf deleted file mode 100644 index 1a4e2c2..0000000 --- a/examples/cifar10/cluster-dist.conf +++ /dev/null @@ -1,8 +0,0 @@ -nworker_groups: 2 -nserver_groups: 2 -nservers_per_group: 1 -nworkers_per_group: 1 -nworkers_per_procs: 1 -workspace: "examples/cifar10/" -hostfile: "examples/cifar10/hostfile" -poll_time: 100 http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/examples/cifar10/model.conf ---------------------------------------------------------------------- diff --git a/examples/cifar10/model.conf b/examples/cifar10/model.conf index eff7dbc..f0e677c 100644 --- a/examples/cifar10/model.conf +++ b/examples/cifar10/model.conf @@ -1,8 +1,9 @@ name: "cifar10-convnet" train_steps: 1000 -test_steps:100 +test_steps: 100 test_frequency:300 display_frequency:30 +#checkpoint: "examples/cifar10/checkpoint/step2200-worker0.bin" alg: kBackPropagation updater{ weight_decay:0.004 @@ -61,18 +62,18 @@ layer { stride: 1 pad:2 } - param{ - name: "weight" - init_method:kGaussian - std:0.0001 - learning_rate_multiplier:1.0 - } - param{ - name: "bias" - init_method: kConstant - learning_rate_multiplier:2.0 - value:0 - } + param { + name: "w1" + init_method:kGaussian + std:0.0001 + learning_rate_multiplier:1.0 + } + param { + name: "b1" + init_method: kConstant + learning_rate_multiplier:2.0 + value:0 + } } layer { @@ -111,18 +112,18 @@ layer { stride: 1 pad:2 } - param{ - name: "weight" - init_method:kGaussian - std:0.01 - learning_rate_multiplier:1.0 - } - param{ - name: "bias" - init_method: kConstant - learning_rate_multiplier:2.0 - value:0 - } + param { + name: "w2" + init_method:kGaussian + std:0.01 + learning_rate_multiplier:1.0 + } + param { + name: "b2" + init_method: kConstant + learning_rate_multiplier:2.0 + value:0 + } } layer { name: "relu2" @@ -160,16 +161,16 @@ layer { stride: 1 pad:2 } - param{ - name: "weight" - init_method:kGaussian - std:0.01 - } - param{ - name: "bias" - init_method: kConstant - value:0 - } + param { + name: "w3" + init_method:kGaussian + std:0.01 + } + param { + name: "b3" + init_method: kConstant + value:0 + } } layer { name: "relu3" @@ -193,19 +194,19 @@ layer { innerproduct_conf { num_output: 10 } - param{ - name: "weight" - init_method:kGaussian - std:0.01 - learning_rate_multiplier:1.0 - weight_decay_multiplier:250 - } - param{ - name: "bias" - init_method: kConstant - learning_rate_multiplier:2.0 - weight_decay_multiplier:0 - value:0 + param { + name: "w4" + init_method:kGaussian + std:0.01 + learning_rate_multiplier:1.0 + weight_decay_multiplier:250 + } + param { + name: "b4" + init_method: kConstant + learning_rate_multiplier:2.0 + weight_decay_multiplier:0 + value:0 } } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/include/neuralnet/layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/layer.h b/include/neuralnet/layer.h index b678e63..9a79e3f 100644 --- a/include/neuralnet/layer.h +++ b/include/neuralnet/layer.h @@ -243,8 +243,9 @@ class LMDBDataLayer: public DataLayer{ void Setup(const LayerProto& proto, int npartitions) override; void ComputeFeature(Phase phase, Metric *perf) override; - void ConvertDatumToSingleLableImageRecord(const Datum& datum, - SingleLabelImageRecord* record); + void ConvertCaffeDatumToRecord(const CaffeDatum& datum, + SingleLabelImageRecord* record); + private: MDB_env* mdb_env_; MDB_dbi mdb_dbi_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h index 50526ae..6c2b7c6 100644 --- a/include/trainer/trainer.h +++ b/include/trainer/trainer.h @@ -28,18 +28,27 @@ class Trainer{ * one thread per worker/server. TODO rename variables about cluster config, * job config, etc. * + * @param resume if true resume the training from the latest checkpoint files + * @param job job ID * @param mconf model configuration * @param globalconf global singa configuration * @param cconf cluster configuration - * @param job job ID */ - void Start(const ModelProto& mconf, const GlobalProto& globalconf, - const ClusterProto& cconf, const int job); + void Start(bool resume, int job, ModelProto& mconf, + const GlobalProto& gconf, const ClusterProto& cconf); - // TODO add Resume() function to continue training from a previously stopped - // point. protected: /** + * Setting the checkpoint field of model configuration to resume training. + * + * The checkpoint folder will be searched to get the files for the latest + * checkpoint, which will be added into the checkpoint field. The workers + * would then load the values of params from the checkpoint files. + * + * @param model_conf model configuration + */ + void Resume(ModelProto& model_conf); + /** * Create server instances. * @param nthread total num of threads in current procs which is used to * assign each thread a local thread ID. The number of workers is extracted http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/include/trainer/worker.h ---------------------------------------------------------------------- diff --git a/include/trainer/worker.h b/include/trainer/worker.h index ad04c1b..9bda99c 100644 --- a/include/trainer/worker.h +++ b/include/trainer/worker.h @@ -40,10 +40,6 @@ class Worker { */ void Run(); /** - * Resume from snapshot - */ - void Resume(); - /** * Init all local params (i.e., params from layers resident in this worker). * * If the param is owned by the worker, then init it and put it to servers. @@ -55,10 +51,22 @@ class Worker { * train for a couple of steps to warmup the params before put * them to servers (warmup of ModelProto controls this). * - * TODO(wangwei) If the worker is resumed from checkpoint, the owner param's + * If the owner param is availabel from checkpoint file, then its * values are parsed from the checkpoint file instead of randomly initialized. + * For params who do not have checkpoints, randomly init them. */ void InitLocalParams(); + + /** + * Checkpoint all params owned by the worker from the first group onto disk. + * The serialization is done using BlobProtos which includes the name, version + * and values of each Param. + * Different worker would generate different checkpoint files. The file path + * is <workspace>/checkpoint-<modelname>-step<step>-worker<worker_id>.bin + * @param step training step of this worker + * @param net the training net whose params will be dumped. + */ + void Checkpoint(int step, shared_ptr<NeuralNet> net); /** * Test the perforance of the learned model on validation or test dataset. * Test is done by the first group. http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/include/utils/blob.h ---------------------------------------------------------------------- diff --git a/include/utils/blob.h b/include/utils/blob.h index dd01dcb..9ff1d47 100644 --- a/include/utils/blob.h +++ b/include/utils/blob.h @@ -42,7 +42,6 @@ #define INCLUDE_UTILS_BLOB_ #include <memory> #include <vector> -//#include <atomic> #include <glog/logging.h> #include "proto/common.pb.h" using std::shared_ptr; @@ -144,9 +143,7 @@ class Blob { const Dtype* gpu_data() const; Dtype* mutable_cpu_data(); Dtype* mutable_gpu_data(); - /* - void FromProto(const BlobProto& proto); - */ + void FromProto(const singa::BlobProto& proto); void ToProto(singa::BlobProto* proto) const; /// @brief Compute the sum of absolute values (L1 norm) of the data. http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/include/utils/cluster.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster.h b/include/utils/cluster.h index 4b87da0..570377f 100644 --- a/include/utils/cluster.h +++ b/include/utils/cluster.h @@ -24,7 +24,7 @@ namespace singa { class Cluster { public: static shared_ptr<Cluster> Get(); - static shared_ptr<Cluster> Get(const GlobalProto& global, + static shared_ptr<Cluster> Get(const GlobalProto& global, const ClusterProto& cluster, int procs_id=0); const int nserver_groups()const{ return cluster_.nserver_groups(); } @@ -81,9 +81,13 @@ class Cluster { const string endpoint(const int procs_id) const; const string workspace() {return cluster_.workspace();} - const string vis_folder(){ + + const string vis_folder() const { return cluster_.workspace()+"/visualization"; } + const string checkpoint_folder() const { + return cluster_.workspace()+"/checkpoint"; + } const int stub_timeout() const { return cluster_.stub_timeout(); } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/include/utils/param.h ---------------------------------------------------------------------- diff --git a/include/utils/param.h b/include/utils/param.h index ed30d40..eaa7084 100644 --- a/include/utils/param.h +++ b/include/utils/param.h @@ -68,6 +68,9 @@ class Param { const std::string& name() { return proto_.name(); } + void set_name(const std::string& name) { + proto_.set_name(name); + } /** * If it shares data from others, then owner is the id of that Param, * otherwise it is itself's id. @@ -133,13 +136,13 @@ class Param { Blob<float> *mutable_grad() { return &grad_; } - float* mutable_cpu_data(){ + float* mutable_cpu_data() { return data_->mutable_cpu_data(); } - float* mutable_cpu_grad(){ + float* mutable_cpu_grad() { return grad_.mutable_cpu_data(); } - float* mutable_cpu_history(){ + float* mutable_cpu_history() { return history_.mutable_cpu_data(); } @@ -161,6 +164,14 @@ class Param { * @param size num of floats for this slice */ void AddSlice(int slice_id, int size); + /** + * Init param values from checkpoint blob. + */ + void FromProto(const BlobProto& blob); + /** + * Dump param values to blob. + */ + void ToProto(BlobProto* blob); /**********************Msg related functions***************************/ /** @@ -258,11 +269,6 @@ class Param { void ParseResponseMsg(Msg* msg, int slice_idx); protected: - - /** - * name of the parameter used to share wights between neuralnets - */ - std::string name_; int local_version_; //!< the ID of the first slice int slice_start_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/main.cc ---------------------------------------------------------------------- diff --git a/src/main.cc b/src/main.cc index e6b7368..a72a2db 100644 --- a/src/main.cc +++ b/src/main.cc @@ -14,13 +14,14 @@ * 3. Users call trainer to start the training. * * TODO - * 1. Add the resume function to continue training from a previously stopped - * point. - * 2. Add helper functions for users to configure their model and cluster + * 1. Add helper functions for users to configure their model and cluster * easily, e.g., AddLayer(layer_type, source_layers, meta_data). */ -DEFINE_int32(procsID, -1, "Global process ID"); +// Job ID is not used now, TODO passing job id from singa-run script and +// re-organize ClusterProto, GlobalProto and ModelProto. +DEFINE_int32(job, -1, "Job ID"); // not used now +DEFINE_bool(resume, false, "resume from checkpoint"); DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file"); DEFINE_string(model, "examples/mnist/conv.conf", "Model config file"); DEFINE_string(global, "conf/singa.conf", "Global config file"); @@ -53,6 +54,6 @@ int main(int argc, char **argv) { RegisterClasses(model); singa::Trainer trainer; - trainer.Start(model, global, cluster, FLAGS_procsID); + trainer.Start(FLAGS_resume, FLAGS_job, model, global, cluster); return 0; } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/neuralnet/layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc index 0a1c665..e965888 100644 --- a/src/neuralnet/layer.cc +++ b/src/neuralnet/layer.cc @@ -257,13 +257,13 @@ void LMDBDataLayer::ComputeFeature(Phase phase, Metric* perf){ } random_skip_=0; } - Datum datum; + CaffeDatum datum; for(auto& record: records_){ SingleLabelImageRecord* image=record.mutable_image(); CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS); datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size); - ConvertDatumToSingleLableImageRecord(datum, image); + ConvertCaffeDatumToRecord(datum, image); if (mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT) != MDB_SUCCESS) { // We have reached the end. Restart from the first. @@ -274,7 +274,7 @@ void LMDBDataLayer::ComputeFeature(Phase phase, Metric* perf){ } } -void LMDBDataLayer::ConvertDatumToSingleLableImageRecord(const Datum& datum, +void LMDBDataLayer::ConvertCaffeDatumToRecord(const CaffeDatum& datum, SingleLabelImageRecord* record){ record->set_label(datum.label()); record->clear_shape(); @@ -316,10 +316,10 @@ void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) { CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_FIRST), MDB_SUCCESS); } - Datum datum; + CaffeDatum datum; datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size); SingleLabelImageRecord* record=sample_.mutable_image(); - ConvertDatumToSingleLableImageRecord(datum, record); + ConvertCaffeDatumToRecord(datum, record); batchsize_=batchsize(); if(partition_dim() == 0) @@ -618,18 +618,20 @@ void RGBImageLayer::Setup(const LayerProto& proto, int npartitions) { data_.Reshape(shape); mean_.Reshape({shape[1],shape[2],shape[3]}); if(proto.rgbimage_conf().has_meanfile()){ - if(proto.rgbimage_conf().meanfile().find("binaryproto")!=string::npos){ - BlobProto tmp; - ReadProtoFromBinaryFile(proto.rgbimage_conf().meanfile().c_str(), &tmp); - CHECK_EQ(mean_.count(), tmp.data_size()); - memcpy(mean_.mutable_cpu_data(), tmp.data().data(), sizeof(float)*tmp.data_size()); - }else{ - SingleLabelImageRecord tmp; - ReadProtoFromBinaryFile(proto.rgbimage_conf().meanfile().c_str(), &tmp); - CHECK_EQ(mean_.count(), tmp.data_size()); - memcpy(mean_.mutable_cpu_data(), tmp.data().data(), sizeof(float)*tmp.data_size()); + if(proto.rgbimage_conf().meanfile().find("binaryproto") != string::npos) { + CaffeBlob mean; + ReadProtoFromBinaryFile(proto.rgbimage_conf().meanfile().c_str(), &mean); + CHECK_EQ(mean_.count(), mean.data_size()); + memcpy(mean_.mutable_cpu_data(), mean.data().data(), + sizeof(float)*mean.data_size()); + } else { + SingleLabelImageRecord mean; + ReadProtoFromBinaryFile(proto.rgbimage_conf().meanfile().c_str(), &mean); + CHECK_EQ(mean_.count(), mean.data_size()); + memcpy(mean_.mutable_cpu_data(), mean.data().data(), + sizeof(float)*mean.data_size()); } - }else{ + } else { memset(mean_.mutable_cpu_data(),0,sizeof(float)*mean_.count()); } } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/neuralnet/neuralnet.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc index 25609ed..1097c0b 100644 --- a/src/neuralnet/neuralnet.cc +++ b/src/neuralnet/neuralnet.cc @@ -107,8 +107,15 @@ void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) { auto layer = name2layer_[node->name]; layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions); layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape()); - for (auto param : layer->GetParams()) + string param_name = "$"; + for (auto param : layer->GetParams()) { param->set_id(paramid++); + // if user does not name the param, then name it based on layer name. + if (param->name() == "") { + param->set_name(layer->name() + param_name); + param_name += "$"; + } + } if (layer->partition_dim() == 0) share_param_layers[node->origin].push_back(layer); } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/proto/common.proto ---------------------------------------------------------------------- diff --git a/src/proto/common.proto b/src/proto/common.proto index 256206c..d8be479 100644 --- a/src/proto/common.proto +++ b/src/proto/common.proto @@ -29,11 +29,7 @@ enum ShareOption { kWhole = 1; }; -message BlobProtos { - repeated BlobProto blobs = 1; - repeated int32 ids = 2; - repeated string names = 3; -} + enum ConnectionType { kOneToOne = 0; @@ -42,7 +38,7 @@ enum ConnectionType { } // to import caffe's lmdb dataset -message Datum { +message CaffeDatum { optional int32 channels = 1; optional int32 height = 2; optional int32 width = 3; @@ -55,7 +51,8 @@ message Datum { optional bool encoded = 7 [default = false]; } -message BlobProto { +// to import caffe's blob, e.g., image mean +message CaffeBlob { optional int32 num = 1 [default = 0]; optional int32 channels = 2 [default = 0]; optional int32 height = 3 [default = 0]; @@ -64,6 +61,18 @@ message BlobProto { repeated float diff = 6 [packed = true]; } +message BlobProto { + repeated int32 shape = 1; + repeated float data = 2 [packed = true]; +} + +message BlobProtos { + repeated int32 id = 2; + repeated int32 version = 3; + repeated string name = 4; + repeated BlobProto blob = 5; +} + message Record { enum Type { // each record contains image raw feature and its label. @@ -78,7 +87,7 @@ message SingleLabelImageRecord { repeated int32 shape = 1; optional int32 label = 2; optional bytes pixel = 3; - repeated float data = 4; + repeated float data = 4 [packed = true]; } message MetricProto { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/proto/model.proto ---------------------------------------------------------------------- diff --git a/src/proto/model.proto b/src/proto/model.proto index 5b22b3f..e6bd834 100644 --- a/src/proto/model.proto +++ b/src/proto/model.proto @@ -46,17 +46,21 @@ message ModelProto { optional bool resume = 36 [default = false]; // start display after this num steps - optional int32 display_after_steps = 60[default = 0]; + optional int32 display_after = 60[default = 0]; // start checkpoint after this num steps - optional int32 checkpoint_after_steps = 61 [default = 0]; + optional int32 checkpoint_after = 61 [default = 0]; // start test after this num steps - optional int32 test_after_steps = 62 [default = 0]; + optional int32 test_after = 62 [default = 0]; // start validation after this num steps - optional int32 validation_after_steps = 63 [default = 0]; + optional int32 validation_after = 63 [default = 0]; // last snapshot step optional int32 step = 64 [default = 0]; // display debug info optional bool debug = 65 [default = false]; + // checkpoint files + repeated string checkpoint = 66; + // reset the version of params loaded from checkpoint file to step + optional bool reset_param_version = 67 [default = false]; } message NetProto { @@ -108,7 +112,7 @@ message ParamProto { repeated int32 shape = 31; // used for identifying the same params from diff models and display deug info - optional string name = 61 [default = "param"]; + optional string name = 61 [default = ""]; // used interally optional int32 id = 62; // parameter slice limit (Google Protobuf also has size limit) http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index 44c37ea..5d43e19 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -3,6 +3,7 @@ #include <map> #include <chrono> #include <glog/logging.h> +#include "utils/tinydir.h" #include "utils/cluster.h" #include "utils/common.h" #include "proto/common.pb.h" @@ -191,17 +192,51 @@ vector<Worker*> Trainer::CreateWorkers(int nthreads, const ModelProto& mconf){ return workers; } -void Trainer::Start(const ModelProto& mconf, const GlobalProto& gconf, - const ClusterProto& cconf, int job){ - RegisterDefaultClasses(mconf); +void Trainer::Resume(ModelProto& mconf) { + tinydir_dir dir; + string folder = Cluster::Get()->checkpoint_folder(); + tinydir_open(&dir, folder.c_str()); + int latest_step = 0; + // there would be multi checkpoint files (from diff workers) for one step + vector<char *> ck_files; + // iterate all files to get the files for the last checkpoint + while (dir.has_next) { + tinydir_file file; + tinydir_readfile(&dir, &file); + tinydir_next(&dir); + char* ch = strstr(file.name, "step"); + if (ch == nullptr && file.name[0] != '.' && file.name[1] != '.') { + LOG(INFO) << "Irregular file in checkpoint folder: " << file.name; + continue; + } - // register job to zookeeper - auto cluster=Cluster::Get(gconf, cconf, job); - if (mconf.resume()) { - // TODO(wangwei) resume from checkpoint - // load param slices to server_shard_ and reset running step of worker - // mproto.set_step(step); + LOG(ERROR) << ch; + int step = atoi(ch+4); + if (step == latest_step) { + ck_files.push_back(file.name); + } else if(step > latest_step) { + latest_step = step; + ck_files.clear(); + ck_files.push_back(file.name); + } + } + + if (latest_step > 0) { + mconf.set_step(latest_step); + for (auto ck_file : ck_files) + mconf.add_checkpoint(folder + "/" +string(ck_file)); } + tinydir_close(&dir); +} + +void Trainer::Start(bool resume, int job, ModelProto& mconf, + const GlobalProto& gconf, const ClusterProto& cconf) { + // register job to zookeeper at the beginning + auto cluster=Cluster::Get(gconf, cconf, job); + + RegisterDefaultClasses(mconf); + if (resume) + Resume(mconf); router_ = new Router(); router_->Bind(kInprocRouterEndpoint); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/trainer/worker.cc ---------------------------------------------------------------------- diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc index e1f2a41..b6ce0d5 100644 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@ -43,15 +43,37 @@ Worker::~Worker() { void Worker::InitLocalParams() { // for each server grp, its first subscriber worker grp does the param init if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) { + // extract params that should be initialized by this worker + // Must gen a name for each param if the user doesn't config it + std::unordered_map<string, Param*> name2param; for (auto layer: train_net_->layers()){ if (layer->partition_id() == id_) { for (auto param : layer->GetParams()) { // only owners fill the memory of parameter values. - if(param->owner() == param->id()) - param->InitValues(0); + if(param->owner() == param->id()) { + CHECK(name2param.find(param->name()) == name2param.end()); + name2param[param->name()] = param; + } } } } + // load from checkpoint. Get param blob based on param name + for (const auto checkpoint : modelproto_.checkpoint()) { + LOG(INFO) << "Load from checkpoint file " << checkpoint; + BlobProtos bps; + ReadProtoFromBinaryFile(checkpoint.c_str(), &bps); + for (int i = 0; i < bps.name_size(); i++) { + if (name2param.find(bps.name(i)) != name2param.end()) { + name2param.at(bps.name(i))->FromProto(bps.blob(i)); + name2param.at(bps.name(i))->set_version(bps.version(i)); + } + } + } + // init other params who do not have checkpoint version + for (auto entry : name2param) + if (entry.second->version() < 0 || modelproto_.reset_param_version()) + entry.second->InitValues(modelproto_.step()); + Metric perf; // warmup training before put params to servers for (; step_ < modelproto_.warmup_steps(); step_++) @@ -72,6 +94,30 @@ void Worker::InitLocalParams() { } } +void Worker::Checkpoint(int step, shared_ptr<NeuralNet> net) { + if (grp_id_ == 0) { + BlobProtos bps; + for (auto layer: net->layers()){ + if (layer->partition_id() == id_) { + for (auto param : layer->GetParams()) { + // only owners fill the memory of parameter values. + if(param->owner() == param->id()) { + auto *blob = bps.add_blob(); + param->ToProto(blob); + bps.add_version(param->version()); + bps.add_name(param->name()); + } + } + } + } + char buf[256]; + snprintf(buf, sizeof(buf), "%s/step%d-worker%d.bin", + Cluster::Get()->checkpoint_folder().c_str(), step, id_); + LOG(INFO) << "checkpoint to " << buf; + WriteProtoToBinaryFile(bps, buf); + } +} + void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) { dealer->Connect(kInprocRouterEndpoint); Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub)); @@ -112,6 +158,12 @@ void Worker::Run() { CollectAll(test_net_, step_); Test(modelproto_.test_steps(), kTest, test_net_); } + + if (CheckpointNow(step_)) { + CollectAll(train_net_, step_); + Checkpoint(step_, train_net_); + modelproto_.set_step(step_); + } TrainOneBatch(step_, &perf); // LOG(ERROR) << "Train " << step_; if (DisplayNow(step_)) { @@ -134,9 +186,7 @@ void Worker::Run() { LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stop"; } -void Worker::Resume() { - // TODO(wangwei) -} + int Worker::Put(Param* param, int step) { Msg* msg=new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); @@ -232,10 +282,11 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) { else if (phase == kTest) Report("Test", perf); } + bool Worker::DisplayNow(int step) const { return (modelproto_.display_frequency() > 0 - && step >= modelproto_.display_after_steps() - && ((step - modelproto_.display_after_steps()) + && step >= modelproto_.display_after() + && ((step - modelproto_.display_after()) % modelproto_.display_frequency() == 0)); } @@ -248,24 +299,24 @@ bool Worker::StopNow(int step) const { bool Worker::CheckpointNow(int step) const { return (grp_id_ == 0 && modelproto_.checkpoint_frequency() > 0 - && step >= modelproto_.checkpoint_after_steps() - && ((step - modelproto_.checkpoint_after_steps()) + && step >= modelproto_.checkpoint_after() + && ((step - modelproto_.checkpoint_after()) % modelproto_.checkpoint_frequency() == 0)); } bool Worker::TestNow(const int step) const { return (grp_id_ == 0 && modelproto_.test_frequency() > 0 && modelproto_.test_steps() > 0 - && step >= modelproto_.test_after_steps() - && ((step - modelproto_.test_after_steps()) + && step >= modelproto_.test_after() + && ((step - modelproto_.test_after()) % modelproto_.test_frequency() == 0)); } bool Worker::ValidateNow(const int step) const { return (grp_id_ == 0 && modelproto_.validation_frequency() > 0 && modelproto_.validation_steps() > 0 - && step >= modelproto_.validation_after_steps() - && ((step - modelproto_.validation_after_steps()) + && step >= modelproto_.validation_after() + && ((step - modelproto_.validation_after()) % modelproto_.validation_frequency() == 0)); } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/utils/blob.cc ---------------------------------------------------------------------- diff --git a/src/utils/blob.cc b/src/utils/blob.cc index ae4c1b0..14c772f 100644 --- a/src/utils/blob.cc +++ b/src/utils/blob.cc @@ -196,7 +196,7 @@ Blob<Dtype>::Blob(const vector<int>& shape) template <typename Dtype> void Blob<Dtype>::Reshape(const vector<int>& shape) { count_=1; - shape_=shape; + shape_ = shape; for(size_t i=0;i<shape.size();i++){ CHECK(shape[i]); count_*=shape[i]; @@ -297,27 +297,26 @@ void Blob<Dtype>::CopyFrom(const Blob& source, bool reshape) { sizeof(Dtype)*count_); } -/* template <typename Dtype> -void Blob<Dtype>::FromProto(const BlobProto& proto) { - Reshape(); +void Blob<Dtype>::FromProto(const singa::BlobProto& proto) { + vector<int> shape; + for (int s : proto.shape()) + shape.push_back(s); + int count = count_; + Reshape(shape); + if (count != count_) + LOG(WARNING) << "Blob is reshaped to diff size " << count << ":" << count_; // copy data Dtype* data_vec = mutable_cpu_data(); for (int i = 0; i < count_; ++i) { data_vec[i] = proto.data(i); } } -*/ template <typename Dtype> void Blob<Dtype>::ToProto(singa::BlobProto* proto) const { - proto->set_num(shape_[0]); - if(shape_.size()>1) - proto->set_channels(shape_[1]); - if(shape_.size()>2) - proto->set_height(shape_[2]); - if(shape_.size()>3) - proto->set_width(shape_[3]); + for (int s : shape_) + proto->add_shape(s); proto->clear_data(); const Dtype* data_vec = cpu_data(); for (int i = 0; i < count_; ++i) { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/utils/cluster.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc index 9c57c42..3019f14 100644 --- a/src/utils/cluster.cc +++ b/src/utils/cluster.cc @@ -75,6 +75,8 @@ const string Cluster::endpoint(int procsid) const { void Cluster::SetupFolders(const ClusterProto &cluster){ // create visulization folder mkdir(vis_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + // create checkpoint folder + mkdir(checkpoint_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); } shared_ptr<Cluster> Cluster::Get(const GlobalProto& global, const ClusterProto& cluster, http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/729a5c48/src/utils/param.cc ---------------------------------------------------------------------- diff --git a/src/utils/param.cc b/src/utils/param.cc index 8b1f113..69e3b09 100644 --- a/src/utils/param.cc +++ b/src/utils/param.cc @@ -80,6 +80,12 @@ void Param::InitValues(int version){ } set_version(version); } +void Param::FromProto(const BlobProto& blob) { + data_->FromProto(blob); +} +void Param::ToProto(BlobProto* blob) { + data_->ToProto(blob); +} /**************Message related functions********/ Msg* Param::GenPutMsg(bool copy, int idx) {
