Repository: incubator-singa Updated Branches: refs/heads/master 539fcee56 -> 4dee7b9cd
SINGA-54 Refactor job configuration to move fields in ModelProto out Tested with mnist and cifar examples. Four components are necessary for submitting a job, namely, neuralnet, alg, updater and cluster. The configuration is now consistent with the MM paper. Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/1b574f3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/1b574f3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/1b574f3c Branch: refs/heads/master Commit: 1b574f3c10f23fa80926471c3efa752d062d4301 Parents: 539fcee Author: Wei Wang <[email protected]> Authored: Fri Aug 14 16:25:10 2015 +0800 Committer: Wei Wang <[email protected]> Committed: Fri Aug 14 16:25:10 2015 +0800 ---------------------------------------------------------------------- examples/cifar10/job.conf | 49 +++--- examples/mnist/conv.conf | 295 +++++++++++++++++------------------- examples/mnist/job.conf | 47 +++--- include/neuralnet/base_layer.h | 2 +- include/singa.h | 7 +- include/trainer/trainer.h | 23 ++- include/trainer/worker.h | 8 +- src/main.cc | 2 - src/neuralnet/base_layer.cc | 10 +- src/neuralnet/layer.cc | 6 +- src/neuralnet/neuralnet.cc | 1 - src/proto/common.proto | 7 - src/proto/job.proto | 181 +++++++++++----------- src/trainer/trainer.cc | 53 ++++--- src/trainer/worker.cc | 75 +++++---- src/utils/param.cc | 24 +-- 16 files changed, 380 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/examples/cifar10/job.conf ---------------------------------------------------------------------- diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf index f7829b8..89afca9 100644 --- a/examples/cifar10/job.conf +++ b/examples/cifar10/job.conf @@ -1,30 +1,23 @@ -cluster { - nworker_groups: 1 - nserver_groups: 1 - workspace: "examples/cifar10" +name: "cifar10-convnet" +train_steps: 1000 +test_steps: 100 +test_freq:300 +disp_freq:30 +alg: kBP +updater{ + weight_decay:0.004 + lr_change: kFixedStep + type: kSGD + fixedstep_conf:{ + step:0 + step:60000 + step:65000 + step_lr:0.001 + step_lr:0.0001 + step_lr:0.00001 + } } - -model { - name: "cifar10-convnet" - train_steps: 1000 - test_steps: 100 - test_frequency:300 - display_frequency:30 - alg: kBackPropagation - updater{ - weight_decay:0.004 - lr_change: kFixedStep - type: kSGD - fixedstep_conf:{ - step:0 - step:60000 - step:65000 - step_lr:0.001 - step_lr:0.0001 - step_lr:0.00001 - } - } - neuralnet { +neuralnet { layer{ name: "data" type: kShardData @@ -226,4 +219,8 @@ model { srclayers: "label" } } +cluster { + nworker_groups: 1 + nserver_groups: 1 + workspace: "examples/cifar10" } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/examples/mnist/conv.conf ---------------------------------------------------------------------- diff --git a/examples/mnist/conv.conf b/examples/mnist/conv.conf index d463cd9..fce1418 100644 --- a/examples/mnist/conv.conf +++ b/examples/mnist/conv.conf @@ -1,186 +1,177 @@ -cluster { - nworker_groups: 1 - nserver_groups: 1 - nservers_per_group: 1 - nworkers_per_group: 1 - nservers_per_procs: 1 - nworkers_per_procs: 1 - workspace: "examples/mnist" -} -model { -name: "mnist-conv" +name: "conv" train_steps: 10000 test_steps:100 -test_frequency:500 -display_frequency:50 -debug: false +test_freq:500 +disp_freq:50 +alg: kBP +debug: true updater{ - base_learning_rate:0.01 + 
base_lr:0.01 momentum:0.9 weight_decay:0.0005 - gamma:0.0001 - pow:0.75 - learning_rate_change_method:kInverse + lr_change: kInverse + type: kSGD + inverse_conf { + gamma:0.0001 + pow:0.75 + } } neuralnet { -layer { - name: "data" - type: "kLMDBData" - data_param { - path: "/home/wangwei/program/singa/examples/mnist/mnist_train_lmdb" - batchsize: 64 + layer { + name: "data" + type: kShardData + sharddata_conf { + path: "examples/mnist/mnist_train_shard" + batchsize: 64 + } + exclude: kTest } - exclude: kTest -} -layer { - name: "data" - type: "kLMDBData" - data_param { - path: "/home/wangwei/program/singa/examples/mnist/mnist_test_lmdb" - batchsize: 100 + layer { + name: "data" + type: kShardData + sharddata_conf { + path: "examples/mnist/mnist_test_shard" + batchsize: 100 + } + exclude: kTrain } - exclude: kTrain -} -layer{ - name:"mnist" - type: "kMnistImage" - srclayers: "data" - mnist_param { -# sigma: 6 -# alpha: 38 -# gamma: 15 -# kernel: 21 -# elastic_freq:100 -# beta:15 -# resize: 29 - norm_a:255 + layer{ + name:"mnist" + type: kMnist + srclayers: "data" + mnist_conf { + norm_a:255 + norm_b:0 + } } -} - -layer{ - name: "label" - type: "kLabel" - srclayers: "data" -} -layer { - name: "conv1" - type: "kConvolution" - srclayers: "mnist" - convolution_param { - num_filters: 20 - kernel: 5 - stride: 1 + layer{ + name: "label" + type: kLabel + srclayers: "data" } - param{ - name: "weight" - init_method:kUniformSqrtFanIn - learning_rate_multiplier:1.0 - } - param{ - name: "bias" - init_method: kConstant - learning_rate_multiplier:2.0 - value:0 + layer { + name: "conv1" + type: kConvolution + srclayers: "mnist" + convolution_conf { + num_filters: 20 + kernel: 5 + stride: 1 } -} -layer { - name: "pool1" - type: "kPooling" - srclayers: "conv1" - pooling_param { - pool: MAX - kernel: 2 - stride: 2 - } -} -layer { - name: "conv2" - type: "kConvolution" - srclayers: "pool1" - convolution_param { - num_filters: 50 - kernel: 5 - stride: 1 + param{ + name: "w1" + init_method:kUniformSqrtFanIn + learning_rate_multiplier:1.0 + } + param{ + name: "b1" + init_method: kConstant + learning_rate_multiplier:2.0 + value:0 + } } - param{ - name: "weight" - init_method:kUniformSqrtFanIn - learning_rate_multiplier:1.0 + layer { + name: "pool1" + type: kPooling + srclayers: "conv1" + pooling_conf { + pool: MAX + kernel: 2 + stride: 2 } - param{ - name: "bias" - init_method: kConstant - learning_rate_multiplier:2.0 - value:0 - } -} -layer { - name: "pool2" - type: "kPooling" - srclayers: "conv2" - pooling_param { - pool: MAX - kernel: 2 - stride: 2 } -} -layer { - name: "ip1" - type: "kInnerProduct" - srclayers:"pool2" - inner_product_param { - num_output: 500 + layer { + name: "conv2" + type: kConvolution + srclayers: "pool1" + convolution_conf { + num_filters: 50 + kernel: 5 + stride: 1 + } + param{ + name: "w2" + init_method:kUniformSqrtFanIn + learning_rate_multiplier:1.0 + } + param{ + name: "b2" + init_method: kConstant + learning_rate_multiplier:2.0 + value:0 + } } - param{ - name: "weight" - init_method:kUniformSqrtFanIn - learning_rate_multiplier:1.0 + layer { + name: "pool2" + type: kPooling + srclayers: "conv2" + pooling_conf { + pool: MAX + kernel: 2 + stride: 2 } - param{ - name: "bias" - init_method: kConstant - learning_rate_multiplier:2.0 - value:0 } + layer { + name: "ip1" + type: kInnerProduct + srclayers:"pool2" + innerproduct_conf { + num_output: 500 + } + param{ + name: "w3" + init_method:kUniformSqrtFanIn + learning_rate_multiplier:1.0 + } + param{ + name: "b3" + init_method: kConstant + 
learning_rate_multiplier:2.0 + value:0 + } -} - -layer { - name: "relu1" - type: "kReLU" - srclayers:"ip1" -} + } -layer { - name: "ip2" - type: "kInnerProduct" - srclayers:"relu1" - inner_product_param { - num_output: 10 + layer { + name: "relu1" + type: kReLU + srclayers:"ip1" } - param{ - name: "weight" + + layer { + name: "ip2" + type: kInnerProduct + srclayers:"relu1" + innerproduct_conf { + num_output: 10 + } + param { + name: "w4" init_method:kUniformSqrtFanIn learning_rate_multiplier:1 } - param{ - name: "bias" + param { + name: "b4" init_method: kConstant learning_rate_multiplier:2 value:0 } -} -layer{ - name: "loss" - type:"kSoftmaxLoss" - softmaxloss_param{ - topk:1 } - srclayers:"ip2" - srclayers:"label" -} + layer{ + name: "loss" + type: kSoftmaxLoss + softmaxloss_conf{ + topk:1 + } + srclayers:"ip2" + srclayers:"label" + } } +cluster { + nworker_groups: 1 + nserver_groups: 1 + workspace: "examples/mnist" } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/examples/mnist/job.conf ---------------------------------------------------------------------- diff --git a/examples/mnist/job.conf b/examples/mnist/job.conf index 5d1445d..34fbca2 100644 --- a/examples/mnist/job.conf +++ b/examples/mnist/job.conf @@ -1,26 +1,20 @@ -cluster { - nworker_groups: 1 - nserver_groups: 1 - workspace: "examples/mnist" -} -model { - name: "deep-big-simple-mlp" - train_steps: 1000 - test_steps:10 - test_frequency:60 - display_frequency:30 - alg: kBackPropagation - updater{ - base_lr: 0.001 - lr_change: kStep - type: kSGD - step_conf{ - change_freq: 60 - gamma: 0.997 - } +name: "mlp" +train_steps: 1000 +test_steps:10 +test_freq:60 +disp_freq:10 +alg: kBP +updater{ + base_lr: 0.001 + lr_change: kStep + type: kSGD + step_conf{ + change_freq: 60 + gamma: 0.997 } +} - neuralnet { +neuralnet { layer { name: "data" type: kShardData @@ -46,13 +40,6 @@ model { type: kMnist srclayers: "data" mnist_conf { -# sigma: 6 -# alpha: 38 -# gamma: 15 -# kernel: 21 -# elastic_freq:100 -# beta:15 -# resize: 29 norm_a: 127.5 norm_b: 1 } @@ -228,4 +215,8 @@ model { srclayers:"label" } } +cluster { + nworker_groups: 1 + nserver_groups: 1 + workspace: "examples/mnist" } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/include/neuralnet/base_layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/base_layer.h b/include/neuralnet/base_layer.h index ca63da0..25df95f 100644 --- a/include/neuralnet/base_layer.h +++ b/include/neuralnet/base_layer.h @@ -133,10 +133,10 @@ class Layer { * blob in parser layers; The default value is "unknown"; If the * src layer is the prefetch layer and there are more than one parser layers, * this value be set. 
- */ const std::string &datablob() const { return layer_proto_.datablob(); } + */ /** * @return a const ref for Blob storing neuron values of this layer for BP */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/include/singa.h ---------------------------------------------------------------------- diff --git a/include/singa.h b/include/singa.h index c8984e5..82df64b 100644 --- a/include/singa.h +++ b/include/singa.h @@ -24,13 +24,16 @@ void SubmitJob(int job, bool resume, const JobProto& jobConf) { ReadProtoFromTextFile(FLAGS_singa_conf.c_str(), &singaConf); if (singaConf.has_log_dir()) SetupLog(singaConf.log_dir(), - std::to_string(job) + "-" + jobConf.model().name()); + std::to_string(job) + "-" + jobConf.name()); if (jobConf.num_openblas_threads() != 1) LOG(WARNING) << "openblas is set with " << jobConf.num_openblas_threads() << " threads"; openblas_set_num_threads(jobConf.num_openblas_threads()); + JobProto proto; + proto.CopyFrom(jobConf); + proto.set_id(job); Trainer trainer; - trainer.Start(job, resume, jobConf, singaConf); + trainer.Start(resume, singaConf, &proto); } } // namespace singa #endif // SINGA_SINGA_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h index 911a4c4..1d28de6 100644 --- a/include/trainer/trainer.h +++ b/include/trainer/trainer.h @@ -27,14 +27,11 @@ class Trainer{ * Entrance function which construct the workers and servers, and luanch * one thread per worker/server. * - * @param job job ID * @param resume if true resume the training from the latest checkpoint files - * @param jobConf job configuration, including cluster and model configuration * @param singaConf global singa configuration including zookeeper and - * log dir setting. + * @param jobConf job configuration, including cluster and model configuration */ - void Start(int job, bool resume, - const JobProto& jobConf, const SingaProto& singaConf); + void Start(bool resume, const SingaProto& singaConf, JobProto* jobConf); protected: /** @@ -44,27 +41,27 @@ class Trainer{ * checkpoint, which will be added into the checkpoint field. The workers * would then load the values of params from the checkpoint files. * - * @param modelConf model configuration + * @param jobConf job configuration */ - void Resume(ModelProto* modelConf); + void Resume(JobProto* jobConf); /** * Create server instances. * @param nthread total num of threads in current procs which is used to * assign each thread a local thread ID. The number of workers is extracted * from Cluster - * @param modelConf + * @param jobConf * @return server instances */ - vector<Server*> CreateServers(int nthread, const ModelProto& modelConf); + vector<Server*> CreateServers(int nthread, const JobProto& jobConf); /** * Create workers instances. * @param nthread total num of threads in current procs which is used to * assign each thread a local thread ID. The number of workers is extracted * from Cluster - * @param modelConf + * @param jobConf * @return worker instances */ - vector<Worker*> CreateWorkers(int nthread, const ModelProto& modelConf); + vector<Worker*> CreateWorkers(int nthread, const JobProto& jobConf); /** * Setup workers and servers. 
@@ -77,7 +74,7 @@ class Trainer{ * @param servers */ void SetupWorkerServer( - const ModelProto& modelConf, + const JobProto& jobConf, const vector<Worker*>& workers, const vector<Server*>& servers); @@ -91,7 +88,7 @@ class Trainer{ * For other base classes, use its base class name (string) as the key and the * implementation class as the value, e.g., <"Updater" SGDUpdater>. */ - void RegisterDefaultClasses(const singa::ModelProto& proto); + void RegisterDefaultClasses(); /** * Generate msg to trigger synchronization with other server groups. * http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/include/trainer/worker.h ---------------------------------------------------------------------- diff --git a/include/trainer/worker.h b/include/trainer/worker.h index 025bcc1..0557ee2 100644 --- a/include/trainer/worker.h +++ b/include/trainer/worker.h @@ -31,7 +31,7 @@ class Worker { /** * Setup members */ - void Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net, + void Setup(const JobProto& job, shared_ptr<NeuralNet> train_net, shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net); /** * Main function of Worker. @@ -49,7 +49,7 @@ class Worker { * If the training starts from scrath, the params are initialzed using random * distributions, e.g., Gaussian distribution. After that, the worker may * train for a couple of steps to warmup the params before put - * them to servers (warmup of ModelProto controls this). + * them to servers (warmup of JobProto controls this). * * If the owner param is availabel from checkpoint file, then its * values are parsed from the checkpoint file instead of randomly initialized. @@ -62,7 +62,7 @@ class Worker { * The serialization is done using BlobProtos which includes the name, version * and values of each Param. * Different worker would generate different checkpoint files. The file path - * is <workspace>/checkpoint-<modelname>-step<step>-worker<worker_id>.bin + * is <workspace>/checkpoint-<jobname>-step<step>-worker<worker_id>.bin * @param step training step of this worker * @param net the training net whose params will be dumped. 
*/ @@ -173,7 +173,7 @@ class Worker { protected: int thread_id_, grp_id_, id_; int step_; - ModelProto modelproto_; + JobProto job_conf_; shared_ptr<NeuralNet> train_net_, test_net_, validation_net_; Dealer* layer_dealer_, *dealer_; Updater* updater_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/main.cc ---------------------------------------------------------------------- diff --git a/src/main.cc b/src/main.cc index d95e405..00b75ff 100644 --- a/src/main.cc +++ b/src/main.cc @@ -33,8 +33,6 @@ int main(int argc, char **argv) { singa::JobProto jobConf; std::string job_file = FLAGS_conf; singa::ReadProtoFromTextFile(job_file.c_str(), &jobConf); - CHECK(jobConf.has_cluster()); - CHECK(jobConf.has_model()); RegisterClasses(); singa::SubmitJob(FLAGS_job, FLAGS_resume, jobConf); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/neuralnet/base_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/base_layer.cc b/src/neuralnet/base_layer.cc index 57163e9..695104e 100644 --- a/src/neuralnet/base_layer.cc +++ b/src/neuralnet/base_layer.cc @@ -125,17 +125,19 @@ void PrefetchLayer::Setup(const LayerProto& proto, int npartitions) { } const Blob<float>& PrefetchLayer::data(const Layer* from, Phase phase) const { - if(from!=nullptr){ - return datablobs_.at(from->datablob()); - }else{ + LOG(FATAL) << " needs update"; + if(from != nullptr) { + return datablobs_.at(""); + } else { //CHECK_EQ(datablobs_.size(),1); return datablobs_.begin()->second; } } Blob<float>* PrefetchLayer::mutable_data(const Layer* from, Phase phase) { + LOG(FATAL) << " needs update"; if(from!=nullptr){ - return &(datablobs_.at(from->datablob())); + return &(datablobs_.at("")); }else{ //CHECK_EQ(datablobs_.size(),1); return &(datablobs_.begin()->second); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/neuralnet/layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc index 314bb14..c1fce00 100644 --- a/src/neuralnet/layer.cc +++ b/src/neuralnet/layer.cc @@ -439,7 +439,7 @@ void MnistLayer::ParseRecords(Phase phase, LOG_IF(ERROR, records.size()==0)<<"Empty records to parse"; int ndim=records.at(0).image().shape_size(); int inputsize =records.at(0).image().shape(ndim-1); - CHECK_EQ(inputsize, blob->shape()[1]); + CHECK_EQ(inputsize, blob->shape()[2]); float* dptr=blob->mutable_cpu_data(); for(const Record& record: records){ @@ -485,11 +485,11 @@ void MnistLayer::Setup(const LayerProto& proto, int npartitions) { int ndim=sample.image().shape_size(); CHECK_GE(ndim,2); if(resize_) - data_.Reshape(vector<int>{batchsize, resize_, resize_}); + data_.Reshape(vector<int>{batchsize, 1, resize_, resize_}); else{ int s=sample.image().shape(ndim-1); CHECK_EQ(s,sample.image().shape(ndim-2)); - data_.Reshape(vector<int>{batchsize, s, s }); + data_.Reshape(vector<int>{batchsize, 1, s, s }); } } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/neuralnet/neuralnet.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc index 83f8c36..4732a36 100644 --- a/src/neuralnet/neuralnet.cc +++ b/src/neuralnet/neuralnet.cc @@ -88,7 +88,6 @@ shared_ptr<NeuralNet> NeuralNet::Create( param->set_share_from(from); } - for (auto layer : net_conf.layer()) LOG(INFO) << "NeuralNet config is\n" << conf.DebugString(); // TODO(wangwei) create net based on 
net type, e.g., directed, undirected, etc http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/proto/common.proto ---------------------------------------------------------------------- diff --git a/src/proto/common.proto b/src/proto/common.proto index d8be479..3b6efb3 100644 --- a/src/proto/common.proto +++ b/src/proto/common.proto @@ -24,13 +24,6 @@ enum EntityType { kRuntime = 4; }; -enum ShareOption { - kValueOnly = 0; - kWhole = 1; -}; - - - enum ConnectionType { kOneToOne = 0; kOneToAll = 1; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/proto/job.proto ---------------------------------------------------------------------- diff --git a/src/proto/job.proto b/src/proto/job.proto index 200197f..a67d330 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -1,9 +1,73 @@ package singa; +enum TrainOneBatchAlg { + // Back-propagation algorithm for feed-forward models, e.g., CNN, and RNN + kBP = 1; + // Contrastive Divergence algorithm for RBM, DBM etc. + kCD = 2; +} message JobProto { - required ClusterProto cluster = 1; - required ModelProto model = 2; - optional int32 num_openblas_threads = 3 [default = 1]; + // job name, e.g., "cifar10-dcnn", "mnist-mlp" + required string name = 1; + // neural net consits of a set of connected layers + required NetProto neuralnet = 3; + // algorithms calculating gradients for one mini-batch/iteration + required TrainOneBatchAlg alg = 5; + // configuration of SGD updater, including learning rate, etc. + required UpdaterProto updater = 7; + // cluster toplogy conf + required ClusterProto cluster = 9; + + // for setting CD fields + optional CDProto cd_conf = 12; + + // total num of steps for training + required int32 train_steps = 16; + // frequency of displaying training info + optional int32 disp_freq = 17 [default = 0]; + + // frequency of test, e.g., do test every 100 training steps + optional int32 test_freq = 20 [default = 0]; + // total num of steps for testing all test data; todo set -1 for test forever + optional int32 test_steps = 21 [default = 0]; + // frequency of validation, e.g., do validation every 100 training steps + optional int32 valid_freq = 25 [default = 0]; + // total num of steps for validating all validation data + optional int32 valid_steps = 26 [default = 0]; + // frequency of checkpoint + optional int32 checkpoint_freq = 30 [default = 0]; + + // for loading checkpoint files to init parameters + repeated string checkpoint_path = 60; + // send parameters to servers after training for this num of steps + optional int32 warmup_steps = 61 [default = 0]; + // display debug info + optional bool debug = 62 [default = false]; + // reset the version of params loaded from checkpoint file to step + optional bool reset_param_version = 63 [default = true]; + // set num of threads used by openblas + optional int32 num_openblas_threads = 64 [default = 1]; + + // start checkpoint after this num steps + optional int32 checkpoint_after = 80 [default = 0]; + // start display after this num steps + optional int32 disp_after = 81[default = 0]; + // start test after this num steps + optional int32 test_after = 82 [default = 0]; + // start validation after this num steps + optional int32 valid_after = 83 [default = 0]; + + // used by SINGA; uses typically do not touch these fields + optional bool resume = 90 [default = false]; + // last snapshot step + optional int32 step = 91 [default = 0]; + // job id allocated by zookeeper + optional int32 id = 92 [default = -1]; +} + +message CDProto { + //number 
of steps for gibbs sampling + optional int32 pcd_k = 1 [default = 1]; } message ClusterProto { @@ -13,24 +77,23 @@ message ClusterProto { optional int32 nservers_per_group = 4 [default = 1]; optional int32 nworkers_per_procs = 5 [default = 1]; optional int32 nservers_per_procs = 6 [default = 1]; + // local workspace for checkpoint files and vis files + required string workspace = 10; // servers and workers in different processes? - optional bool server_worker_separate = 11 [default = false]; + optional bool server_worker_separate = 20 [default = false]; // port number is used by ZeroMQ - optional int32 start_port = 13 [default = 6723]; - // local workspace, train/val/test shards, checkpoint files - required string workspace = 14; - - // conduct updates at server side; otherwise do it at worker side - optional bool server_update = 40 [default = true]; + optional int32 start_port = 60 [default = 6723]; + // conduct updates at server side; otherwise do it at worker side + optional bool server_update = 61 [default = true]; // share memory space between worker groups in one procs - optional bool share_memory = 41 [default = true]; + optional bool share_memory = 62 [default = true]; // bandwidth of ethernet, Bytes per second, default is 1 Gbps - optional int32 bandwidth=50 [default=134217728]; + optional int32 bandwidth=80 [default=134217728]; // poll time in milliseconds - optional int32 poll_time=51 [default =100]; + optional int32 poll_time=81 [default =100]; } @@ -47,67 +110,14 @@ enum Phase { kLoss = 7; } -message ModelProto { - // model name, e.g., "cifar10-dcnn", "mnist-mlp" - required string name = 1; - // frequency of displaying training info - required int32 display_frequency = 3 ; - // total num of steps for training - required int32 train_steps = 5; - // configuration of SGD updater, including learning rate, etc. 
- required UpdaterProto updater = 7; - enum GradCalcAlg { - // BP algorithm for feed-forward models, e.g., CNN, MLP, RNN - kBackPropagation = 1; - // CD algorithm for RBM, DBM etc., models - kContrastiveDivergence = 2; - } - // gradient calculation algorithm - required GradCalcAlg alg = 8 [default = kBackPropagation]; - required NetProto neuralnet = 9; - - // total num of steps for validation - optional int32 validation_steps = 30 [default = 0]; - // total num of steps for test - optional int32 test_steps = 31 [default = 0]; - // frequency of validation - optional int32 validation_frequency = 32; - // frequency of test - optional int32 test_frequency = 33 [default = 0]; - // frequency of checkpoint - optional int32 checkpoint_frequency = 34 [default = 0]; - // send parameters to servers after training for this num of steps - optional int32 warmup_steps = 35 [default = 0]; - // checkpoint path - optional bool resume = 36 [default = false]; - - // start display after this num steps - optional int32 display_after = 60[default = 0]; - // start checkpoint after this num steps - optional int32 checkpoint_after = 61 [default = 0]; - // start test after this num steps - optional int32 test_after = 62 [default = 0]; -// start validation after this num steps - optional int32 validation_after = 63 [default = 0]; - // last snapshot step - optional int32 step = 64 [default = 0]; - // display debug info - optional bool debug = 65 [default = false]; - // checkpoint files - repeated string checkpoint = 66; - // reset the version of params loaded from checkpoint file to step - optional bool reset_param_version = 67 [default = true]; - //number of steps for gibbs sampling - optional int32 pcd_k=69 [default=15]; -} - message NetProto { repeated LayerProto layer = 1; // partitioning type for parallelism - optional int32 partition_dim = 2 [default = 0]; + optional int32 partition_dim = 20 [default = 0]; } -// weight matrix should be defined before bias vector +// weight matrix should be defined before bias vector; +// todo separate conf for diff init method message ParamProto { enum InitMethod { // fix the values of all parameters a constant in the value field @@ -131,7 +141,9 @@ message ParamProto { // <a href="http://deeplearning.net/tutorial/mlp.html"> Theano MLP</a> kUniformSqrtFanInOut = 6; } - optional InitMethod init_method = 1 [default = kGaussian]; + // used for identifying the same params from diff models and display deug info + optional string name = 1 [default = ""]; + optional InitMethod init_method = 2 [default = kGaussian]; // constant init optional float value = 5 [default = 1]; // for uniform sampling @@ -144,20 +156,18 @@ message ParamProto { optional float learning_rate_multiplier = 15 [default = 1]; // multiplied on the global weight decay. 
optional float weight_decay_multiplier = 16 [default = 1]; - // partition dimension, -1 for no partition - optional int32 partition_dim = 30; - // usually, the program will infer the param shape - repeated int32 shape = 31; - // used for identifying the same params from diff models and display deug info - optional string name = 61 [default = ""]; - // name of the owner param from which this param shares the values - optional string share_from = 62; + + // name of the owner param from which this param shares the values + optional string share_from = 60; + // used interally - optional int32 id = 63; - // parameter slice limit (Google Protobuf also has size limit) - optional int32 split_threshold = 64 [default = 5000000]; + optional int32 id = 90; // used internally - optional int32 owner = 65 [default = -1]; + optional int32 owner = 91 [default = -1]; + // partition dimension, -1 for no partition + optional int32 partition_dim = 92; + // usually, the program will infer the param shape + repeated int32 shape = 93; } enum PartitionType{ @@ -241,12 +251,9 @@ message LayerProto { // overrides the partition dimension for neural net - optional int32 partition_dim =59 [default = -1]; - optional string datablob = 58 [default = "unknow"]; - + optional int32 partition_dim =60 [default = -1]; // names of parameters shared from other layers - repeated string share_param = 60; - optional int32 partition_id = 62 [default = 0]; + optional int32 partition_id = 90 [default = 0]; } message RGBImageProto { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index f348ff6..699fc30 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -28,7 +28,7 @@ Trainer::~Trainer() { delete router_; } -void Trainer::RegisterDefaultClasses(const singa::ModelProto& model_conf) { +void Trainer::RegisterDefaultClasses() { // register all implemented layers singa::NeuralNet::RegisterLayers(); auto param_factory = Singleton<Factory<singa::Param>>::Instance(); @@ -77,12 +77,12 @@ const vector<int> SliceParams(const vector<Param*>& params) { } void Trainer::SetupWorkerServer( - const ModelProto& model_conf, + const JobProto& job_conf, const vector<Worker*>& workers, const vector<Server*>& servers) { auto cluster = Cluster::Get(); int grp_size = cluster->nworkers_per_group(); - const auto& net_conf = model_conf.neuralnet(); + const auto& net_conf = job_conf.neuralnet(); auto net = NeuralNet::Create(net_conf, kTrain, grp_size); // MUST do SliceParam before share param/net with others auto slices = SliceParams(net->params()); @@ -96,12 +96,12 @@ void Trainer::SetupWorkerServer( if (grp_net.find(grp_id) == grp_net.end()) { if (grp_id == first_grp) { // test are performed only by the first group now. TODO update. - if (first_grp == 0 && model_conf.test_steps() && worker_id == 0) { + if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) { test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp test_net->ShareParamsFrom(net); } // validation are performed only by the first group. TODO update. 
- if (first_grp == 0 && model_conf.validation_steps() && worker_id == 0) { + if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) { valid_net = NeuralNet::Create(net_conf, kValidation, 1); valid_net->ShareParamsFrom(net); } @@ -124,18 +124,18 @@ void Trainer::SetupWorkerServer( } LOG(INFO) << "grp " << worker->grp_id() << ", worker " << worker->id() << " net " << grp_net[grp_id].get(); - worker->Setup(model_conf, grp_net[grp_id], valid_net, test_net); + worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net); } // partition among server groups, each group maintains one sub-set for sync auto slice2group = PartitionSlices(cluster->nserver_groups(), slices); for (auto server : servers) - server->Setup(model_conf.updater(), &server_shard_, slice2group); + server->Setup(job_conf.updater(), &server_shard_, slice2group); // partition within one server group, each server updates for one sub-set slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices); } -vector<Server*> Trainer::CreateServers(int nthreads, const ModelProto& mconf) { +vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) { auto cluster = Cluster::Get(); vector<Server*> servers; if (!cluster->has_server()) @@ -157,7 +157,7 @@ vector<Server*> Trainer::CreateServers(int nthreads, const ModelProto& mconf) { return servers; } -vector<Worker*> Trainer::CreateWorkers(int nthreads, const ModelProto& mconf){ +vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) { auto cluster=Cluster::Get(); vector<Worker*> workers; if(!cluster->has_worker()) @@ -184,18 +184,19 @@ vector<Worker*> Trainer::CreateWorkers(int nthreads, const ModelProto& mconf){ for (int gid = gstart; gid < gend; gid++) { for (int wid = wstart; wid < wend; wid++) { Worker* worker=nullptr; - if (mconf.alg() == ModelProto_GradCalcAlg_kBackPropagation) + if (job.alg() == TrainOneBatchAlg::kBP) worker = new BPWorker(nthreads++,gid, wid); - else { + else if (job.alg() == TrainOneBatchAlg::kCD) worker=new CDWorker(nthreads++,gid, wid); - } + else + LOG(FATAL) << "unknown alg for trainonebatch func " << job.alg(); workers.push_back(worker); } } return workers; } -void Trainer::Resume(ModelProto* modelConf) { +void Trainer::Resume(JobProto* jobConf) { tinydir_dir dir; string folder = Cluster::Get()->checkpoint_folder(); tinydir_open(&dir, folder.c_str()); @@ -226,24 +227,22 @@ void Trainer::Resume(ModelProto* modelConf) { } if (latest_step > 0) { - modelConf->set_step(latest_step); - if (!modelConf->has_reset_param_version()) - modelConf->set_reset_param_version(false); - modelConf->clear_checkpoint(); + jobConf->set_step(latest_step); + if (!jobConf->has_reset_param_version()) + jobConf->set_reset_param_version(false); + jobConf->clear_checkpoint_path(); for (auto ck_file : ck_files) - modelConf->add_checkpoint(folder + "/" + ck_file); + jobConf->add_checkpoint_path(folder + "/" + ck_file); } tinydir_close(&dir); } -void Trainer::Start(int job, bool resume, - const JobProto& jobConf, const SingaProto& singaConf) { +void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) { // register job to zookeeper at the beginning - auto cluster = Cluster::Get(job, singaConf, jobConf.cluster()); - ModelProto model = jobConf.model(); - RegisterDefaultClasses(model); + auto cluster = Cluster::Get(job->id(), singaConf, job->cluster()); + RegisterDefaultClasses(); if (resume) - Resume(&model); + Resume(job); router_ = new Router(); router_->Bind(kInprocRouterEndpoint); @@ -253,10 +252,10 @@ void 
Trainer::Start(int job, bool resume, cluster->Register(getpid(), hostip + ":" + std::to_string(port)); int nthreads = 1; - const vector<Worker*> workers = CreateWorkers(nthreads, model); + const vector<Worker*> workers = CreateWorkers(nthreads, *job); nthreads += workers.size(); - const vector<Server*> servers = CreateServers(nthreads, model); - SetupWorkerServer(model, workers, servers); + const vector<Server*> servers = CreateServers(nthreads, *job); + SetupWorkerServer(*job, workers, servers); #ifdef USE_MPI for (int i = 0; i < nthreads; i++) http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/trainer/worker.cc ---------------------------------------------------------------------- diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc index 36ba8de..b6f9d44 100644 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@ -16,23 +16,16 @@ Worker::Worker(int thread_id, int grp_id, int id): } void Worker::Setup( - const ModelProto& model, shared_ptr<NeuralNet> train_net, + const JobProto& job, shared_ptr<NeuralNet> train_net, shared_ptr<NeuralNet> valid_net, shared_ptr<NeuralNet> test_net) { - modelproto_.CopyFrom(model); + job_conf_.CopyFrom(job); train_net_ = train_net; validation_net_ = valid_net; test_net_ = test_net; auto cluster = Cluster::Get(); - // if no server or user requires worker to do param update - if (!(cluster->nserver_groups() && cluster->server_update())) { - updater_ = Singleton<Factory<Updater>>::Instance()->Create("Updater"); - updater_->Init(model.updater()); - } } Worker::~Worker() { - if (updater_ != nullptr) - delete updater_; if (layer_dealer_) delete layer_dealer_; if (dealer_) @@ -59,7 +52,7 @@ void Worker::InitLocalParams() { // load from checkpoints. get param blob based on param name. // the param from previous checkpoint files will be overwritten by // the param with the same name in later checkpoint files. 
- for (const auto checkpoint : modelproto_.checkpoint()) { + for (const auto checkpoint : job_conf_.checkpoint_path()) { LOG(INFO) << "Load from checkpoint file " << checkpoint; BlobProtos bps; ReadProtoFromBinaryFile(checkpoint.c_str(), &bps); @@ -67,8 +60,8 @@ void Worker::InitLocalParams() { if (name2param.find(bps.name(i)) != name2param.end()) { name2param.at(bps.name(i))->FromProto(bps.blob(i)); // if load from pre-training params, reset version to start step - if(modelproto_.reset_param_version()) - name2param.at(bps.name(i))->set_version(modelproto_.step()); + if(job_conf_.reset_param_version()) + name2param.at(bps.name(i))->set_version(job_conf_.step()); else // if resume training, use the same version as last checkpoint name2param.at(bps.name(i))->set_version(bps.version(i)); } @@ -77,15 +70,15 @@ void Worker::InitLocalParams() { // init other params who do not have checkpoint version for (auto entry : name2param) if (entry.second->version() < 0) { - entry.second->InitValues(modelproto_.step()); - if (!modelproto_.reset_param_version()) + entry.second->InitValues(job_conf_.step()); + if (!job_conf_.reset_param_version()) LOG(ERROR) << "better reset version of params from checkpoints " << "to the same as other newly initialized params!"; } Metric perf; // warmup training before put params to servers - for (; step_ < modelproto_.warmup_steps(); step_++) + for (; step_ < job_conf_.warmup_steps(); step_++) TrainOneBatch(step_, &perf); for (auto layer : train_net_->layers()) { if (layer->partition_id() == id_) @@ -99,7 +92,7 @@ void Worker::InitLocalParams() { for (auto layer : train_net_->layers()) { if (layer->partition_id() == id_) for (auto param : layer->GetParams()) - Get(param, modelproto_.warmup_steps()); + Get(param, job_conf_.warmup_steps()); } } @@ -153,25 +146,25 @@ void Worker::Run() { } } - step_ = modelproto_.step(); + step_ = job_conf_.step(); InitLocalParams(); Metric perf; while (!StopNow(step_)) { if (ValidateNow(step_)) { //LOG(ERROR)<<"Validation at step "<<step; CollectAll(validation_net_, step_); - Test(modelproto_.validation_steps(), kValidation, validation_net_); + Test(job_conf_.valid_steps(), kValidation, validation_net_); } if (TestNow(step_)) { //LOG(ERROR)<<"Test at step "<<step; CollectAll(test_net_, step_); - Test(modelproto_.test_steps(), kTest, test_net_); + Test(job_conf_.test_steps(), kTest, test_net_); } if (CheckpointNow(step_)) { CollectAll(train_net_, step_); Checkpoint(step_, train_net_); - modelproto_.set_step(step_); + job_conf_.set_step(step_); } TrainOneBatch(step_, &perf); // LOG(ERROR) << "Train " << step_; @@ -296,40 +289,40 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net) { } bool Worker::DisplayNow(int step) const { - return (modelproto_.display_frequency() > 0 - && step >= modelproto_.display_after() - && ((step - modelproto_.display_after()) - % modelproto_.display_frequency() == 0)); + return (job_conf_.disp_freq() > 0 + && step >= job_conf_.disp_after() + && ((step - job_conf_.disp_after()) + % job_conf_.disp_freq() == 0)); } bool Worker::DisplayDebugInfo(int step) const { - return DisplayNow(step) && modelproto_.debug() && grp_id_ == 0; + return DisplayNow(step) && job_conf_.debug() && grp_id_ == 0; } bool Worker::StopNow(int step) const { - return step >= modelproto_.train_steps(); + return step >= job_conf_.train_steps(); } bool Worker::CheckpointNow(int step) const { return (grp_id_ == 0 - && modelproto_.checkpoint_frequency() > 0 - && step >= modelproto_.checkpoint_after() - && ((step - 
modelproto_.checkpoint_after()) - % modelproto_.checkpoint_frequency() == 0)); + && job_conf_.checkpoint_freq() > 0 + && step >= job_conf_.checkpoint_after() + && ((step - job_conf_.checkpoint_after()) + % job_conf_.checkpoint_freq() == 0)); } bool Worker::TestNow(const int step) const { return (grp_id_ == 0 - && modelproto_.test_frequency() > 0 - && modelproto_.test_steps() > 0 - && step >= modelproto_.test_after() - && ((step - modelproto_.test_after()) - % modelproto_.test_frequency() == 0)); + && job_conf_.test_freq() > 0 + && job_conf_.test_steps() > 0 + && step >= job_conf_.test_after() + && ((step - job_conf_.test_after()) + % job_conf_.test_freq() == 0)); } bool Worker::ValidateNow(const int step) const { return (grp_id_ == 0 - && modelproto_.validation_frequency() > 0 - && modelproto_.validation_steps() > 0 - && step >= modelproto_.validation_after() - && ((step - modelproto_.validation_after()) - % modelproto_.validation_frequency() == 0)); + && job_conf_.valid_freq() > 0 + && job_conf_.valid_steps() > 0 + && step >= job_conf_.valid_after() + && ((step - job_conf_.valid_after()) + % job_conf_.valid_freq() == 0)); } @@ -406,7 +399,7 @@ void CDWorker::NegativePhase(int step, shared_ptr<NeuralNet> net, Metric* perf) { // for negative phase, gibbs sampling only concerns RBM bottom and top layer auto& layers = net->layers(); - for (int i = 0; i < modelproto_.pcd_k(); i++) { + for (int i = 0; i < job_conf_.cd_conf().pcd_k(); i++) { for (auto& layer : layers) { if (layer->is_vislayer() || layer->is_hidlayer()) layer->ComputeFeature(kNegative, perf); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1b574f3c/src/utils/param.cc ---------------------------------------------------------------------- diff --git a/src/utils/param.cc b/src/utils/param.cc index 8c0b440..2f43f66 100644 --- a/src/utils/param.cc +++ b/src/utils/param.cc @@ -44,35 +44,35 @@ void Param::InitValues(int version){ auto random=TSingleton<Random<cpu>>::Instance(); switch (proto_.init_method()) { case ParamProto::kConstant: - data=proto_.value(); + data = proto_.value(); break; case ParamProto::kUniform: random->SampleUniform(data, proto_.low(), proto_.high()); - if(proto_.value()) - data*= proto_.value(); + if(proto_.value() != 1) + data *= proto_.value(); break; - /* case ParamProto::kUniformSqrtFanIn: - CHECK_GT(fan_in_,0); random->SampleUniform(data, proto_.low(), proto_.high()); - if(proto_.value()) - data*= proto_.value()/ sqrt(fan_in_ / 3.0f); + // only valid for param matrix with dim 1 for fan in + LOG(ERROR) << "init fan in"; + CHECK_EQ(data_->shape().size(), 2); + data *= proto_.value() / sqrt(data_->shape().at(1) / 3.0f); + LOG(ERROR) << "end fan in"; break; - */ case ParamProto::kUniformSqrtFanInOut: random->SampleUniform(data, proto_.low(), proto_.high()); if(proto_.value()) - data*= proto_.value()/ sqrt(data_->shape()[0] +data_->shape()[1]); + data *= proto_.value()/ sqrt(data_->shape()[0] +data_->shape()[1]); break; case ParamProto::kGaussian: random->SampleGaussian(data, proto_.mean(), proto_.std()); - if(proto_.value()) - data*= proto_.value(); + if(proto_.value() != 1) + data *= proto_.value(); break; case ParamProto::kGaussainSqrtFanIn: random->SampleGaussian(data, proto_.mean(), proto_.std()); if(proto_.value()) - data*= proto_.value()/ sqrt(data_->shape()[0]); + data *= proto_.value()/ sqrt(data_->shape()[0]); break; default: LOG(ERROR) << "Illegal parameter init method ";
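
With this refactoring the job configuration is flat: the fields that used to sit under model { ... } now live at the top level of job.conf, and JobProto marks name, neuralnet, alg, updater, cluster and train_steps as required. A minimal skeleton consistent with the examples above (layer definitions elided, job name purely illustrative) looks roughly like:

  name: "my-job"            # illustrative name
  train_steps: 1000
  alg: kBP                  # kCD for models trained with contrastive divergence
  updater {
    type: kSGD
    base_lr: 0.001
    lr_change: kStep
    step_conf { change_freq: 60 gamma: 0.997 }
  }
  neuralnet {
    # layer { ... }  -- one block per layer, as in examples/mnist/job.conf
  }
  cluster {
    nworker_groups: 1
    nserver_groups: 1
    workspace: "examples/mnist"
  }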

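Checkpointing follows the same pattern: checkpoint_freq, checkpoint_after, checkpoint_path, step and reset_param_version are now top-level JobProto fields, and Trainer::Resume fills step and checkpoint_path automatically when a job is resumed. A sketch of the checkpoint-related lines that could be added to a job.conf; the file name is hypothetical, following the <workspace>/checkpoint-<jobname>-step<step>-worker<worker_id>.bin pattern documented in worker.h:

  checkpoint_freq: 500      # dump params every 500 training steps
  checkpoint_after: 500     # but only after the first 500 steps
  # load params from an earlier run (repeated field; params in later files
  # override those with the same name loaded from earlier files)
  checkpoint_path: "examples/mnist/checkpoint-mlp-step1000-worker0.bin"
  # reset_param_version (default true) restarts loaded params at `step`;
  # set it to false to keep the versions stored in the checkpoint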