SINGA-70 Refactor API of Layer, Worker, Server and Driver

For the Layer class
* Setup, ComputeFeature and ComputeGradient are updated to accept one argument that represents all source layers.
* DebugString() is changed to ToString() for displaying debug info and other info. For example, performance values can be aggregated in the ComputeFeature function and then converted into a string in ToString().
* The srclayer and dstlayer fields are removed.
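To make the new Layer contract concrete, below is a minimal sketch (not part of this commit) of a user-defined layer written against the refactored API. The method signatures follow the changes in this diff; the class name MyLossLayer, the loss_/num_ members and the placeholder bodies are illustrative only, loosely modeled on the rnnlm LossLayer updated further down.

    #include <string>
    #include <vector>
    #include "neuralnet/layer.h"

    class MyLossLayer : public singa::Layer {
     public:
      // Setup/ComputeFeature/ComputeGradient now receive the source layers
      // explicitly instead of reading the removed srclayer/dstlayer fields.
      void Setup(const singa::LayerProto& conf,
                 const std::vector<singa::Layer*>& srclayers) override {
        singa::Layer::Setup(conf, srclayers);
        data_.ReshapeLike(srclayers.at(0)->data(this));
      }
      void ComputeFeature(int flag,
                          const std::vector<singa::Layer*>& srclayers) override {
        // Aggregate performance values here instead of filling a Metric* argument.
        loss_ += 0.f;  // placeholder for the real loss computation
        num_ += 1;
      }
      void ComputeGradient(int flag,
                           const std::vector<singa::Layer*>& srclayers) override {
        // gradients w.r.t. params and source layers would be computed here
      }
      // ToString() replaces DebugString(); each worker collects and reports it.
      const std::string ToString(bool debug, int flag) override {
        return num_ == 0 ? std::string("loss = n/a")
                         : "loss = " + std::to_string(loss_ / num_);
      }

     private:
      float loss_ = 0.f;
      int num_ = 0;
    };

Training with such a layer would be launched through the renamed Driver entry point, driver.Train(resume, job_conf), as shown in the examples/rnnlm/main.cc change below.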
For the Worker class
* The Report function is removed. Performance is now collected via Layer::ToString() and reported by each worker.

The Trainer class is renamed to Stub
* Only the Run() function and the message handling/generation functions remain. Functions for creating servers and workers are moved into Driver.

For the Driver class
* The Submit function is renamed to Train.
* Functions for creating workers and servers are added.

All files under the trainer folder are moved out so that they sit directly under src/ or include/.

Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/321ef96a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/321ef96a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/321ef96a

Branch: refs/heads/master
Commit: 321ef96a6e90ce7c70fae7e05b446c3dd38a3fef
Parents: ab984da
Author: Wei Wang <[email protected]>
Authored: Fri Sep 25 23:31:53 2015 +0800
Committer: Wei Wang <[email protected]>
Committed: Sat Sep 26 23:37:36 2015 +0800

----------------------------------------------------------------------
 .gitignore | 4 +-
 Makefile.am | 27 +-
 examples/rbm/autoencoder.conf | 8 +-
 examples/rbm/rbm2.conf | 2 +-
 examples/rbm/rbm3.conf | 2 +-
 examples/rbm/rbm4.conf | 2 +-
 examples/rnnlm/job.conf | 6 +-
 examples/rnnlm/main.cc | 2 +-
 examples/rnnlm/rnnlm.cc | 121 ++++---
 examples/rnnlm/rnnlm.h | 34 ++-
 include/comm/msg.h | 238 +++++++++++++++
 include/comm/socket.h | 174 +++++++++++
 include/communication/msg.h | 238 ---------------
 include/communication/socket.h | 174 -----------
 include/driver.h | 66 ++++-
 include/neuralnet/connection_layer.h | 47 +--
 include/neuralnet/input_layer.h | 48 ++-
 include/neuralnet/layer.h | 212 ++++++++------
 include/neuralnet/loss_layer.h | 26 +-
 include/neuralnet/neuralnet.h | 33 ++-
 include/neuralnet/neuron_layer.h | 80 ++---
 include/server.h | 133 +++++++++
 include/singa.h | 9 +-
 include/stub.h | 109 +++++++
 include/trainer/server.h | 132 --------
 include/trainer/trainer.h | 163 -----------
 include/trainer/worker.h | 258 ----------------
 include/utils/param.h | 38 ++-
 include/worker.h | 311 ++++++++++++++++++++
 src/comm/msg.cc | 215 ++++++++++++++
 src/comm/socket.cc | 180 ++++++++++++
 src/communication/msg.cc | 215 --------------
 src/communication/socket.cc | 180 ------------
 src/driver.cc | 203 ++++++++++++-
 src/main.cc | 22 +-
 src/neuralnet/connection_layer.cc | 66 +++--
 src/neuralnet/input_layer.cc | 75 ++---
 src/neuralnet/layer.cc | 19 +-
 src/neuralnet/loss_layer.cc | 68 +++--
 src/neuralnet/neuralnet.cc | 21 +-
 src/neuralnet/neuron_layer.cc | 285 +++++++++---------
 src/proto/job.proto | 14 +-
 src/server.cc | 269 +++++++++++++++++
 src/stub.cc | 285 ++++++++++++++++++
 src/trainer/server.cc | 263 -----------------
 src/trainer/trainer.cc | 469 ------------------------------
 src/trainer/worker.cc | 411 --------------------------
 src/utils/cluster.cc | 6 +-
 src/utils/common.cc | 8 +-
 src/utils/param.cc | 54 +++-
 src/worker.cc | 410 ++++++++++++++++++++++++++
 51 files changed, 3330 insertions(+), 3105 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 841b0d6..7ac9bc2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,10 +42,10 @@ stamp-h1
 *.status
 config.h
 Makefile
-thirdparty/*
 config/*
 config.h.in
 configure
 aclocal.m4
 Makefile.in
-!thirdpary/install.sh +thirdparty/* +!thirdparty/install.sh http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/Makefile.am ---------------------------------------------------------------------- diff --git a/Makefile.am b/Makefile.am index 3f68e29..00aacdd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -8,7 +8,7 @@ DEFAULT_FLAGS = -Wall -pthread -fPIC -std=c++11 -Wno-unknown-pragmas \ $(MSHADOW_FLAGS) -DCPU_ONLY=1 -funroll-loops -DTHREADED CFLAGS = $(DEBUG) -CXXFLAGS = $(DEBUG) +CXXFLAGS = $(DEBUG) AC_CXXFLAGS = $(DEBUG) INCLUDES = -I$(top_srcdir)/include @@ -35,9 +35,9 @@ SINGA_SRCS := src/driver.cc \ src/utils/updater.cc \ src/utils/data_shard.cc \ src/utils/blob.cc \ - src/trainer/server.cc \ - src/trainer/worker.cc \ - src/trainer/trainer.cc \ + src/server.cc \ + src/worker.cc \ + src/stub.cc \ src/neuralnet/layer.cc \ src/neuralnet/connection_layer.cc \ src/neuralnet/input_layer.cc \ @@ -45,8 +45,8 @@ SINGA_SRCS := src/driver.cc \ src/neuralnet/neuron_layer.cc \ src/neuralnet/output_layer.cc \ src/neuralnet/neuralnet.cc \ - src/communication/socket.cc \ - src/communication/msg.cc + src/comm/socket.cc \ + src/comm/msg.cc SINGA_HDRS := include/singa.h \ include/utils/cluster.h \ @@ -60,9 +60,9 @@ SINGA_HDRS := include/singa.h \ include/utils/blob.h \ include/utils/updater.h \ include/utils/tinydir.h \ - include/trainer/server.h \ - include/trainer/worker.h \ - include/trainer/trainer.h \ + include/server.h \ + include/worker.h \ + include/stub.h \ include/neuralnet/layer.h \ include/neuralnet/connection_layer.h \ include/neuralnet/input_layer.h \ @@ -78,8 +78,8 @@ SINGA_HDRS := include/singa.h \ include/mshadow/cxxnet_op.h \ include/mshadow/tensor_base.h \ include/mshadow/tensor_random.h \ - include/communication/msg.h \ - include/communication/socket.h + include/comm/msg.h \ + include/comm/socket.h GTEST_SRCS := include/gtest/gtest-all.cc GTEST_HRDS := include/gtest/gtest.h @@ -108,10 +108,9 @@ endif libsinga_la_LDFLAGS = -I./include - #bin_PROGRAMS = singa singa_SOURCES = src/main.cc -singa_CXXFLAGS = $(DEFAULT_FLAGS) -MMD +singa_CXXFLAGS = $(DEFAULT_FLAGS) -MMD singa_LDFLAGS = -I./include \ -lsinga \ -lglog \ @@ -146,7 +145,7 @@ libgtest_la_LDFLAGS = -I./include #bin_PROGRAMS += singatest singatest_SOURCES = $(GTEST_HDRS) $(TEST_SRCS) -singatest_CXXFLAGS = $(DEFAULT_FLAGS) +singatest_CXXFLAGS = $(DEFAULT_FLAGS) singatest_LDFLAGS = -I./include \ -lsinga \ -lglog \ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/autoencoder.conf ---------------------------------------------------------------------- diff --git a/examples/rbm/autoencoder.conf b/examples/rbm/autoencoder.conf index 29f7729..c818c6e 100644 --- a/examples/rbm/autoencoder.conf +++ b/examples/rbm/autoencoder.conf @@ -3,10 +3,10 @@ train_steps: 12200 test_steps:100 test_freq:1000 disp_freq:100 -checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0.bin" -checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0.bin" -checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0.bin" -checkpoint_path: "examples/rbm/rbm4/checkpoint/step6000-worker0.bin" +checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0" +checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0" +checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0" +checkpoint_path: "examples/rbm/rbm4/checkpoint/step6000-worker0" train_one_batch{ alg: kBP } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/rbm2.conf 
---------------------------------------------------------------------- diff --git a/examples/rbm/rbm2.conf b/examples/rbm/rbm2.conf index 8a16e0f..52dc698 100644 --- a/examples/rbm/rbm2.conf +++ b/examples/rbm/rbm2.conf @@ -6,7 +6,7 @@ disp_freq: 100 train_one_batch{ alg: kCD } -checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0.bin" +checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0" updater{ type: kSGD momentum: 0.8 http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/rbm3.conf ---------------------------------------------------------------------- diff --git a/examples/rbm/rbm3.conf b/examples/rbm/rbm3.conf index 75848d6..354fb3b 100644 --- a/examples/rbm/rbm3.conf +++ b/examples/rbm/rbm3.conf @@ -6,7 +6,7 @@ disp_freq: 100 train_one_batch{ alg: kCD } -checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0.bin" +checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0" updater{ type: kSGD http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/rbm4.conf ---------------------------------------------------------------------- diff --git a/examples/rbm/rbm4.conf b/examples/rbm/rbm4.conf index 2b83afb..ebf39fa 100644 --- a/examples/rbm/rbm4.conf +++ b/examples/rbm/rbm4.conf @@ -6,7 +6,7 @@ disp_freq: 100 train_one_batch{ alg: kCD } -checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0.bin" +checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0" updater{ type: kSGD momentum: 0.8 http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/job.conf ---------------------------------------------------------------------- diff --git a/examples/rnnlm/job.conf b/examples/rnnlm/job.conf index db96e84..021692f 100644 --- a/examples/rnnlm/job.conf +++ b/examples/rnnlm/job.conf @@ -2,8 +2,8 @@ name: "rnnlm" #To scan the training file (81350) 10 times train_steps:81350 #To scan the validation file (6828) once -valid_steps:683 -valid_freq:8135 +validate_steps:683 +validate_freq:8135 #disp_freq is specific to training disp_freq:8135 train_one_batch { @@ -36,7 +36,7 @@ layer { path: "examples/rnnlm/train_shard" max_window: 10 } - exclude: kValidation + exclude: kVal } layer { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/main.cc ---------------------------------------------------------------------- diff --git a/examples/rnnlm/main.cc b/examples/rnnlm/main.cc index 87db06a..ea1dcdd 100644 --- a/examples/rnnlm/main.cc +++ b/examples/rnnlm/main.cc @@ -40,6 +40,6 @@ int main(int argc, char **argv) { singa::JobProto jobConf = driver.job_conf(); - driver.Submit(resume, jobConf); + driver.Train(resume, jobConf); return 0; } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/rnnlm.cc ---------------------------------------------------------------------- diff --git a/examples/rnnlm/rnnlm.cc b/examples/rnnlm/rnnlm.cc index 0ad6dcd..c086972 100644 --- a/examples/rnnlm/rnnlm.cc +++ b/examples/rnnlm/rnnlm.cc @@ -57,19 +57,19 @@ DataLayer::~DataLayer() { shard_ = nullptr; } -void DataLayer::Setup(const LayerProto& proto, int npartitions) { - RNNLayer::Setup(proto, npartitions); +void DataLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { + RNNLayer::Setup(conf, srclayers); shard_ = new singa::DataShard( - proto.GetExtension(data_conf).path(), + conf.GetExtension(data_conf).path(), singa::DataShard::kRead); string key; - max_window_ = proto.GetExtension(data_conf).max_window(); + max_window_ = 
conf.GetExtension(data_conf).max_window(); records_.resize(max_window_ + 1); // resize to # of records in data layer window_ = 0; shard_->Next(&key, &records_[window_]); } -void DataLayer::ComputeFeature(int flag, Metric *perf) { +void DataLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { CHECK(records_.size() <= shard_->Count()); records_[0] = records_[window_]; window_ = max_window_; @@ -88,17 +88,18 @@ void DataLayer::ComputeFeature(int flag, Metric *perf) { } /*******LabelLayer**************/ -void LabelLayer::Setup(const LayerProto& proto, int npartitions) { - RNNLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - int max_window = dynamic_cast<DataLayer*>(srclayers_[0])->max_window(); +void LabelLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + RNNLayer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + int max_window = dynamic_cast<DataLayer*>(srclayers[0])->max_window(); data_.Reshape(vector<int>{max_window, 4}); } -void LabelLayer::ComputeFeature(int flag, Metric *perf) { - const auto& records = dynamic_cast<DataLayer*>(srclayers_[0])->records(); +void LabelLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { + const auto& records = dynamic_cast<DataLayer*>(srclayers[0])->records(); float *label = data_.mutable_cpu_data(); - window_ = dynamic_cast<RNNLayer*>(srclayers_[0])->window(); + window_ = dynamic_cast<RNNLayer*>(srclayers[0])->window(); for (int i = 0; i < window_; i++) { WordRecord wordrecord = records[i + 1].GetExtension(word); label[4 * i + 0] = wordrecord.class_start(); @@ -113,20 +114,21 @@ EmbeddingLayer::~EmbeddingLayer() { delete embed_; } -void EmbeddingLayer::Setup(const LayerProto& proto, int npartitions) { - RNNLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - int max_window = dynamic_cast<DataLayer*>(srclayers_[0])->max_window(); - word_dim_ = proto.GetExtension(embedding_conf).word_dim(); +void EmbeddingLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + RNNLayer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + int max_window = dynamic_cast<DataLayer*>(srclayers[0])->max_window(); + word_dim_ = conf.GetExtension(embedding_conf).word_dim(); data_.Reshape(vector<int>{max_window, word_dim_}); grad_.ReshapeLike(data_); - vocab_size_ = proto.GetExtension(embedding_conf).vocab_size(); - embed_ = Param::Create(proto.param(0)); + vocab_size_ = conf.GetExtension(embedding_conf).vocab_size(); + embed_ = Param::Create(conf.param(0)); embed_->Setup(vector<int>{vocab_size_, word_dim_}); } -void EmbeddingLayer::ComputeFeature(int flag, Metric* perf) { - auto datalayer = dynamic_cast<DataLayer*>(srclayers_[0]); +void EmbeddingLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { + auto datalayer = dynamic_cast<DataLayer*>(srclayers[0]); window_ = datalayer->window(); auto records = datalayer->records(); auto words = RTensor2(&data_); @@ -140,10 +142,11 @@ void EmbeddingLayer::ComputeFeature(int flag, Metric* perf) { } } -void EmbeddingLayer::ComputeGradient(int flag, Metric* perf) { +void EmbeddingLayer::ComputeGradient(int flag, + const vector<Layer*>& srclayers) { auto grad = RTensor2(&grad_); auto gembed = RTensor2(embed_->mutable_grad()); - auto datalayer = dynamic_cast<DataLayer*>(srclayers_[0]); + auto datalayer = dynamic_cast<DataLayer*>(srclayers[0]); auto records = datalayer->records(); gembed = 0; for (int t = 0; t < window_; t++) { @@ -156,22 +159,23 @@ HiddenLayer::~HiddenLayer() { delete weight_; } -void 
HiddenLayer::Setup(const LayerProto& proto, int npartitions) { - RNNLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - const auto& innerproductData = srclayers_[0]->data(this); - data_.ReshapeLike(srclayers_[0]->data(this)); - grad_.ReshapeLike(srclayers_[0]->grad(this)); +void HiddenLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + RNNLayer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + const auto& innerproductData = srclayers[0]->data(this); + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(srclayers[0]->grad(this)); int word_dim = data_.shape()[1]; - weight_ = Param::Create(proto.param(0)); + weight_ = Param::Create(conf.param(0)); weight_->Setup(std::vector<int>{word_dim, word_dim}); } // hid[t] = sigmoid(hid[t-1] * W + src[t]) -void HiddenLayer::ComputeFeature(int flag, Metric* perf) { - window_ = dynamic_cast<RNNLayer*>(srclayers_[0])->window(); +void HiddenLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { + window_ = dynamic_cast<RNNLayer*>(srclayers[0])->window(); auto data = RTensor2(&data_); - auto src = RTensor2(srclayers_[0]->mutable_data(this)); + auto src = RTensor2(srclayers[0]->mutable_data(this)); auto weight = RTensor2(weight_->mutable_data()); for (int t = 0; t < window_; t++) { // Skip the 1st component if (t == 0) { @@ -184,12 +188,12 @@ void HiddenLayer::ComputeFeature(int flag, Metric* perf) { } } -void HiddenLayer::ComputeGradient(int flag, Metric* perf) { +void HiddenLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto data = RTensor2(&data_); auto grad = RTensor2(&grad_); auto weight = RTensor2(weight_->mutable_data()); auto gweight = RTensor2(weight_->mutable_grad()); - auto gsrc = RTensor2(srclayers_[0]->mutable_grad(this)); + auto gsrc = RTensor2(srclayers[0]->mutable_grad(this)); gweight = 0; TensorContainer<cpu, 1> tmp(Shape1(data_.shape()[1])); // Check!! 
@@ -210,30 +214,30 @@ LossLayer::~LossLayer() { delete class_weight_; } -void LossLayer::Setup(const LayerProto& proto, int npartitions) { - RNNLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 2); - const auto& src = srclayers_[0]->data(this); +void LossLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { + RNNLayer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 2); + const auto& src = srclayers[0]->data(this); int max_window = src.shape()[0]; int vdim = src.count() / max_window; // Dimension of input - int vocab_size = proto.GetExtension(loss_conf).vocab_size(); - int nclass = proto.GetExtension(loss_conf).nclass(); - word_weight_ = Param::Create(proto.param(0)); + int vocab_size = conf.GetExtension(loss_conf).vocab_size(); + int nclass = conf.GetExtension(loss_conf).nclass(); + word_weight_ = Param::Create(conf.param(0)); word_weight_->Setup(vector<int>{vocab_size, vdim}); - class_weight_ = Param::Create(proto.param(1)); + class_weight_ = Param::Create(conf.param(1)); class_weight_->Setup(vector<int>{nclass, vdim}); pword_.resize(max_window); pclass_.Reshape(vector<int>{max_window, nclass}); } -void LossLayer::ComputeFeature(int flag, Metric* perf) { - window_ = dynamic_cast<RNNLayer*>(srclayers_[0])->window(); +void LossLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { + window_ = dynamic_cast<RNNLayer*>(srclayers[0])->window(); auto pclass = RTensor2(&pclass_); - auto src = RTensor2(srclayers_[0]->mutable_data(this)); + auto src = RTensor2(srclayers[0]->mutable_data(this)); auto word_weight = RTensor2(word_weight_->mutable_data()); auto class_weight = RTensor2(class_weight_->mutable_data()); - const float * label = srclayers_[1]->data(this).cpu_data(); + const float * label = srclayers[1]->data(this).cpu_data(); float loss = 0.f, ppl = 0.f; for (int t = 0; t < window_; t++) { @@ -254,24 +258,21 @@ void LossLayer::ComputeFeature(int flag, Metric* perf) { int cid = static_cast<int>(label[t * 4 + 3]); CHECK_GT(end, wid); CHECK_GE(wid, start); - loss += -log(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN)); - ppl += log10(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN)); + loss_ += -log(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN)); + ppl_ += log10(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN)); } - - perf->Add("loss", loss, window_); - // users can compute the PPL value by 10^(ppl before exp) - perf->Add("ppl before exp", ppl, window_); + num_ += window_; } -void LossLayer::ComputeGradient(int flag, Metric* perf) { +void LossLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto pclass = RTensor2(&pclass_); - auto src = RTensor2(srclayers_[0]->mutable_data(this)); - auto gsrc = RTensor2(srclayers_[0]->mutable_grad(this)); + auto src = RTensor2(srclayers[0]->mutable_data(this)); + auto gsrc = RTensor2(srclayers[0]->mutable_grad(this)); auto word_weight = RTensor2(word_weight_->mutable_data()); auto gword_weight = RTensor2(word_weight_->mutable_grad()); auto class_weight = RTensor2(class_weight_->mutable_data()); auto gclass_weight = RTensor2(class_weight_->mutable_grad()); - const float * label = srclayers_[1]->data(this).cpu_data(); + const float * label = srclayers[1]->data(this).cpu_data(); gclass_weight = 0; gword_weight = 0; for (int t = 0; t < window_; t++) { @@ -299,4 +300,10 @@ void LossLayer::ComputeGradient(int flag, Metric* perf) { gsrc[t] += dot(pclass[t], class_weight); } } + +const std::string LossLayer::ToString(bool debug, int flag) { + float loss = loss_ / num_; + 
float ppl = exp10(- ppl_ / num_); + return "loss = " + std::to_string(loss) + ", ppl = " + std::to_string(ppl); +} } // end of namespace rnnlm http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/rnnlm.h ---------------------------------------------------------------------- diff --git a/examples/rnnlm/rnnlm.h b/examples/rnnlm/rnnlm.h index b848fa4..ad0918e 100644 --- a/examples/rnnlm/rnnlm.h +++ b/examples/rnnlm/rnnlm.h @@ -25,6 +25,7 @@ #include "./rnnlm.pb.h" namespace rnnlm { +using std::vector; using singa::LayerProto; using singa::Layer; using singa::Param; @@ -57,8 +58,8 @@ class RNNLayer : virtual public Layer { class DataLayer : public RNNLayer, public singa::DataLayer { public: ~DataLayer(); - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; int max_window() const { return max_window_; } @@ -75,9 +76,9 @@ class DataLayer : public RNNLayer, public singa::DataLayer { */ class LabelLayer : public RNNLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; - void ComputeGradient(int flag, Metric* perf) override {} + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} }; @@ -88,9 +89,9 @@ class LabelLayer : public RNNLayer { class EmbeddingLayer : public RNNLayer { public: ~EmbeddingLayer(); - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; const std::vector<Param*> GetParams() const override { std::vector<Param*> params{embed_}; return params; @@ -111,9 +112,10 @@ class EmbeddingLayer : public RNNLayer { class HiddenLayer : public RNNLayer { public: ~HiddenLayer(); - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; + const std::vector<Param*> GetParams() const override { std::vector<Param*> params{weight_}; return params; @@ -132,9 +134,11 @@ class HiddenLayer : public RNNLayer { class LossLayer : public RNNLayer { public: ~LossLayer(); - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; + + const std::string ToString(bool debug, int flag) override; const std::vector<Param*> GetParams() const override { std::vector<Param*> 
params{word_weight_, class_weight_}; return params; @@ -144,6 +148,8 @@ class LossLayer : public RNNLayer { std::vector<Blob<float>> pword_; Blob<float> pclass_; Param* word_weight_, *class_weight_; + float loss_, ppl_; + int num_; }; } // namespace rnnlm #endif // EXAMPLES_RNNLM_RNNLM_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/comm/msg.h ---------------------------------------------------------------------- diff --git a/include/comm/msg.h b/include/comm/msg.h new file mode 100644 index 0000000..50a9b81 --- /dev/null +++ b/include/comm/msg.h @@ -0,0 +1,238 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#ifndef SINGA_COMM_MSG_H_ +#define SINGA_COMM_MSG_H_ + +// TODO(wangwei): make it a compiler argument +#define USE_ZMQ + +#include <utility> +#ifdef USE_ZMQ +#include <czmq.h> +#endif + +namespace singa { +/** + * Wrapper to generate message address + * @param grp worker/server group id + * @param id_or_proc worker/server id or procs id + * @param type msg type + */ +inline int Addr(int grp, int id_or_proc, int type) { + return (grp << 16) | (id_or_proc << 8) | type; +} + +/** + * Parse group id from addr. + * + * @return group id + */ +inline int AddrGrp(int addr) { + return addr >> 16; +} + +/** + * Parse worker/server id from addr. + * + * @return id + */ +inline int AddrID(int addr) { + static const int mask = (1 << 8) - 1; + return (addr >> 8) & mask; +} + +/** + * Parse worker/server procs from addr. + * + * @return procs id + */ +inline int AddrProc(int addr) { + return AddrID(addr); +} + +/** + * Parse msg type from addr + * @return msg type + */ +inline int AddrType(int addr) { + static const int mask = (1 << 8) -1; + return addr & mask; +} + +/** + * Msg used to transfer Param info (gradient or value), feature blob, etc + * between workers, stubs and servers. + * + * Each msg has a source addr and dest addr identified by a unique integer. + * It is also associated with a target field (value and version) for ease of + * getting some meta info (e.g., parameter id) from the msg. + * + * Other data is added into the message as frames. + */ +class Msg { + public: + ~Msg(); + Msg(); + /** + * Construct the msg providing source and destination addr. + */ + Msg(int src, int dst); + /** + * Copy constructor. + */ + Msg(const Msg& msg); + /** + * Swap the src/dst addr + */ + void SwapAddr(); + /** + * Add a frame (a chunck of bytes) into the message + */ + void AddFrame(const void* addr, int nBytes); + /** + * @return num of bytes of the current frame. + */ + int FrameSize(); + /** + * @return the pointer to the current frame data. 
+ */ + void* FrameData(); + /** + * @return the data of the current frame as c string + */ + char* FrameStr(); + /** + * Move the cursor to the first frame. + */ + void FirstFrame(); + /** + * Move the cursor to the last frame. + */ + void LastFrame(); + /** + * Move the cursor to the next frame + * @return true if the next frame is not NULL; otherwise false + */ + bool NextFrame(); + /** + * Add a 'format' frame to the msg (like CZMQ's zsock_send). + * + * The format is a string that defines the type of each field. + * The format can contain any of these characters, each corresponding to + * one or two arguments: + * i = int (signed) + * 1 = uint8_t + * 2 = uint16_t + * 4 = uint32_t + * 8 = uint64_t + * p = void * (sends the pointer value, only meaningful over inproc) + * s = char** + * + * Returns size of the added content. + */ + int AddFormatFrame(const char *format, ...); + /** + * Parse the current frame added using AddFormatFrame(const char*, ...). + * + * The format is a string that defines the type of each field. + * The format can contain any of these characters, each corresponding to + * one or two arguments: + * i = int (signed) + * 1 = uint8_t + * 2 = uint16_t + * 4 = uint32_t + * 8 = uint64_t + * p = void * (sends the pointer value, only meaningful over inproc) + * s = char** + * + * Returns size of the parsed content. + */ + int ParseFormatFrame(const char* format, ...); + +#ifdef USE_ZMQ + void ParseFromZmsg(zmsg_t* msg); + zmsg_t* DumpToZmsg(); +#endif + + /** + * @return msg size in terms of bytes, ignore meta info. + */ + int size() const; + /** + * Set source addr. + * @param addr unique identify one worker/server/stub in the current job + */ + inline void set_src(int addr) { src_ = addr; } + /** + * @return source addr. + */ + inline int src() const { return src_; } + /** + * Set destination addr. + * @param addr unique identify one worker/server/stub in the current job + */ + inline void set_dst(int addr) { dst_ = addr; } + /** + * @return dst addr. + */ + inline int dst() const { return dst_; } + /** + * Set msg type, e.g., kPut, kGet, kUpdate, kRequest + */ + inline void set_type(int type) { type_ = type; } + /** + * @return msg type. + */ + inline int type() const { return type_; } + /** + * Set msg target. + * + * One msg has a target to identify some entity in worker/server/stub. + * The target is associated with a version, e.g., Param version. + */ + inline void set_trgt(int val, int version) { + trgt_val_ = val; + trgt_version_ = version; + } + inline int trgt_val() const { return trgt_val_; } + inline int trgt_version() const { return trgt_version_; } + + protected: + int src_ = 0; + int dst_ = 0; + int type_ = 0; + int trgt_val_ = 0; + int trgt_version_ = 0; +#ifdef USE_ZMQ + zmsg_t* msg_ = nullptr; + zframe_t *frame_ = nullptr; +#endif +}; + +inline void DeleteMsg(Msg** msg) { + delete *msg; + *msg = nullptr; +} + +} // namespace singa + +#endif // SINGA_COMM_MSG_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/comm/socket.h ---------------------------------------------------------------------- diff --git a/include/comm/socket.h b/include/comm/socket.h new file mode 100644 index 0000000..f2ffb4d --- /dev/null +++ b/include/comm/socket.h @@ -0,0 +1,174 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#ifndef SINGA_COMM_SOCKET_H_ +#define SINGA_COMM_SOCKET_H_ + +#ifdef USE_ZMQ +#include <czmq.h> +#endif +#include <map> +#include <string> +#include <vector> +#include "comm/msg.h" + +namespace singa { + +const std::string kInprocRouterEndpoint = "inproc://router"; + +class SocketInterface { + public: + virtual ~SocketInterface() {} + /** + * Send a message to connected socket(s), non-blocking. The message + * will be deallocated after sending, thus should not be used after + * calling Send(); + * + * @param msg The message to be sent + * @return 1 for success queuing the message for sending, 0 for failure + */ + virtual int Send(Msg** msg) = 0; + /** + * Receive a message from any connected socket. + * + * @return a message pointer if success; nullptr if failure + */ + virtual Msg* Receive() = 0; + /** + * @return Identifier of the implementation dependent socket. E.g., zsock_t* + * for ZeroMQ implementation and rank for MPI implementation. + */ + virtual void* InternalID() const = 0; +}; + +class Poller { + public: + Poller(); + explicit Poller(SocketInterface* socket); + /** + * Add a socket for polling; Multiple sockets can be polled together by + * adding them into the same poller. + */ + void Add(SocketInterface* socket); + /** + * Poll for all sockets added into this poller. + * @param timeout Stop after this number of mseconds + * @return pointer To the socket if it has one message in the receiving + * queue; nullptr if no message in any sockets, + */ + SocketInterface* Wait(int duration); + + /** + * @return true if the poller is terminated due to process interupt + */ + virtual bool Terminated(); + + protected: +#ifdef USE_ZMQ + zpoller_t *poller_; + std::map<zsock_t*, SocketInterface*> zsock2Socket_; +#endif +}; + +class Dealer : public SocketInterface { + public: + /* + * @param id Local dealer ID within a procs if the dealer is from worker or + * server thread, starts from 1 (0 is used by the router); or the connected + * remote procs ID for inter-process dealers from the stub thread. + */ + Dealer(); + explicit Dealer(int id); + ~Dealer() override; + /** + * Setup the connection with the router. + * + * @param endpoint Identifier of the router. For intra-process + * connection, the endpoint follows the format of ZeroMQ, i.e., + * starting with "inproc://"; in Singa, since each process has one + * router, hence we can fix the endpoint to be "inproc://router" for + * intra-process. For inter-process, the endpoint follows ZeroMQ's + * format, i.e., IP:port, where IP is the connected process. 
+ * @return 1 connection sets up successfully; 0 otherwise + */ + int Connect(const std::string& endpoint); + int Send(Msg** msg) override; + Msg* Receive() override; + void* InternalID() const override; + + protected: + int id_ = -1; +#ifdef USE_ZMQ + zsock_t* dealer_ = nullptr; + zpoller_t* poller_ = nullptr; +#endif +}; + +class Router : public SocketInterface { + public: + Router(); + /** + * There is only one router per procs, hence its local id is 0 and is not set + * explicitly. + * + * @param bufsize Buffer at most this number of messages + */ + explicit Router(int bufsize); + ~Router() override; + /** + * Setup the connection with dealers. + * + * It automatically binds to the endpoint for intra-process communication, + * i.e., "inproc://router". + * + * @param endpoint The identifier for the Dealer socket in other process + * to connect. It has the format IP:Port, where IP is the host machine. + * If endpoint is empty, it means that all connections are + * intra-process connection. + * @return number of connected dealers. + */ + int Bind(const std::string& endpoint); + /** + * If the destination socket has not connected yet, buffer this the message. + */ + int Send(Msg** msg) override; + Msg* Receive() override; + void* InternalID() const override; + + protected: + int nBufmsg_ = 0; + int bufsize_ = 100; +#ifdef USE_ZMQ + zsock_t* router_ = nullptr; + zpoller_t* poller_ = nullptr; + std::map<int, zframe_t*> id2addr_; + std::map<int, std::vector<zmsg_t*>> bufmsg_; +#endif +}; + +#ifdef USE_MPI +// TODO(wangsheng): add intra-process communication using shared queue +std::vector<SafeQueue*> MPIQueues; +#endif + +} // namespace singa + +#endif // SINGA_COMM_SOCKET_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/communication/msg.h ---------------------------------------------------------------------- diff --git a/include/communication/msg.h b/include/communication/msg.h deleted file mode 100644 index 217d89a..0000000 --- a/include/communication/msg.h +++ /dev/null @@ -1,238 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#ifndef SINGA_COMMUNICATION_MSG_H_ -#define SINGA_COMMUNICATION_MSG_H_ - -// TODO(wangwei): make it a compiler argument -#define USE_ZMQ - -#include <utility> -#ifdef USE_ZMQ -#include <czmq.h> -#endif - -namespace singa { -/** - * Wrapper to generate message address - * @param grp worker/server group id - * @param id_or_proc worker/server id or procs id - * @param type msg type - */ -inline int Addr(int grp, int id_or_proc, int type) { - return (grp << 16) | (id_or_proc << 8) | type; -} - -/** - * Parse group id from addr. 
- * - * @return group id - */ -inline int AddrGrp(int addr) { - return addr >> 16; -} - -/** - * Parse worker/server id from addr. - * - * @return id - */ -inline int AddrID(int addr) { - static const int mask = (1 << 8) - 1; - return (addr >> 8) & mask; -} - -/** - * Parse worker/server procs from addr. - * - * @return procs id - */ -inline int AddrProc(int addr) { - return AddrID(addr); -} - -/** - * Parse msg type from addr - * @return msg type - */ -inline int AddrType(int addr) { - static const int mask = (1 << 8) -1; - return addr & mask; -} - -/** - * Msg used to transfer Param info (gradient or value), feature blob, etc - * between workers, stubs and servers. - * - * Each msg has a source addr and dest addr identified by a unique integer. - * It is also associated with a target field (value and version) for ease of - * getting some meta info (e.g., parameter id) from the msg. - * - * Other data is added into the message as frames. - */ -class Msg { - public: - ~Msg(); - Msg(); - /** - * Construct the msg providing source and destination addr. - */ - Msg(int src, int dst); - /** - * Copy constructor. - */ - Msg(const Msg& msg); - /** - * Swap the src/dst addr - */ - void SwapAddr(); - /** - * Add a frame (a chunck of bytes) into the message - */ - void AddFrame(const void* addr, int nBytes); - /** - * @return num of bytes of the current frame. - */ - int FrameSize(); - /** - * @return the pointer to the current frame data. - */ - void* FrameData(); - /** - * @return the data of the current frame as c string - */ - char* FrameStr(); - /** - * Move the cursor to the first frame. - */ - void FirstFrame(); - /** - * Move the cursor to the last frame. - */ - void LastFrame(); - /** - * Move the cursor to the next frame - * @return true if the next frame is not NULL; otherwise false - */ - bool NextFrame(); - /** - * Add a 'format' frame to the msg (like CZMQ's zsock_send). - * - * The format is a string that defines the type of each field. - * The format can contain any of these characters, each corresponding to - * one or two arguments: - * i = int (signed) - * 1 = uint8_t - * 2 = uint16_t - * 4 = uint32_t - * 8 = uint64_t - * p = void * (sends the pointer value, only meaningful over inproc) - * s = char** - * - * Returns size of the added content. - */ - int AddFormatFrame(const char *format, ...); - /** - * Parse the current frame added using AddFormatFrame(const char*, ...). - * - * The format is a string that defines the type of each field. - * The format can contain any of these characters, each corresponding to - * one or two arguments: - * i = int (signed) - * 1 = uint8_t - * 2 = uint16_t - * 4 = uint32_t - * 8 = uint64_t - * p = void * (sends the pointer value, only meaningful over inproc) - * s = char** - * - * Returns size of the parsed content. - */ - int ParseFormatFrame(const char* format, ...); - -#ifdef USE_ZMQ - void ParseFromZmsg(zmsg_t* msg); - zmsg_t* DumpToZmsg(); -#endif - - /** - * @return msg size in terms of bytes, ignore meta info. - */ - int size() const; - /** - * Set source addr. - * @param addr unique identify one worker/server/stub in the current job - */ - inline void set_src(int addr) { src_ = addr; } - /** - * @return source addr. - */ - inline int src() const { return src_; } - /** - * Set destination addr. - * @param addr unique identify one worker/server/stub in the current job - */ - inline void set_dst(int addr) { dst_ = addr; } - /** - * @return dst addr. 
- */ - inline int dst() const { return dst_; } - /** - * Set msg type, e.g., kPut, kGet, kUpdate, kRequest - */ - inline void set_type(int type) { type_ = type; } - /** - * @return msg type. - */ - inline int type() const { return type_; } - /** - * Set msg target. - * - * One msg has a target to identify some entity in worker/server/stub. - * The target is associated with a version, e.g., Param version. - */ - inline void set_trgt(int val, int version) { - trgt_val_ = val; - trgt_version_ = version; - } - inline int trgt_val() const { return trgt_val_; } - inline int trgt_version() const { return trgt_version_; } - - protected: - int src_ = 0; - int dst_ = 0; - int type_ = 0; - int trgt_val_ = 0; - int trgt_version_ = 0; -#ifdef USE_ZMQ - zmsg_t* msg_ = nullptr; - zframe_t *frame_ = nullptr; -#endif -}; - -inline void DeleteMsg(Msg** msg) { - delete *msg; - *msg = nullptr; -} - -} // namespace singa - -#endif // SINGA_COMMUNICATION_MSG_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/communication/socket.h ---------------------------------------------------------------------- diff --git a/include/communication/socket.h b/include/communication/socket.h deleted file mode 100644 index 3590577..0000000 --- a/include/communication/socket.h +++ /dev/null @@ -1,174 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#ifndef SINGA_COMMUNICATION_SOCKET_H_ -#define SINGA_COMMUNICATION_SOCKET_H_ - -#ifdef USE_ZMQ -#include <czmq.h> -#endif -#include <map> -#include <string> -#include <vector> -#include "communication/msg.h" - -namespace singa { - -const std::string kInprocRouterEndpoint = "inproc://router"; - -class SocketInterface { - public: - virtual ~SocketInterface() {} - /** - * Send a message to connected socket(s), non-blocking. The message - * will be deallocated after sending, thus should not be used after - * calling Send(); - * - * @param msg The message to be sent - * @return 1 for success queuing the message for sending, 0 for failure - */ - virtual int Send(Msg** msg) = 0; - /** - * Receive a message from any connected socket. - * - * @return a message pointer if success; nullptr if failure - */ - virtual Msg* Receive() = 0; - /** - * @return Identifier of the implementation dependent socket. E.g., zsock_t* - * for ZeroMQ implementation and rank for MPI implementation. - */ - virtual void* InternalID() const = 0; -}; - -class Poller { - public: - Poller(); - explicit Poller(SocketInterface* socket); - /** - * Add a socket for polling; Multiple sockets can be polled together by - * adding them into the same poller. 
- */ - void Add(SocketInterface* socket); - /** - * Poll for all sockets added into this poller. - * @param timeout Stop after this number of mseconds - * @return pointer To the socket if it has one message in the receiving - * queue; nullptr if no message in any sockets, - */ - SocketInterface* Wait(int duration); - - /** - * @return true if the poller is terminated due to process interupt - */ - virtual bool Terminated(); - - protected: -#ifdef USE_ZMQ - zpoller_t *poller_; - std::map<zsock_t*, SocketInterface*> zsock2Socket_; -#endif -}; - -class Dealer : public SocketInterface { - public: - /* - * @param id Local dealer ID within a procs if the dealer is from worker or - * server thread, starts from 1 (0 is used by the router); or the connected - * remote procs ID for inter-process dealers from the stub thread. - */ - Dealer(); - explicit Dealer(int id); - ~Dealer() override; - /** - * Setup the connection with the router. - * - * @param endpoint Identifier of the router. For intra-process - * connection, the endpoint follows the format of ZeroMQ, i.e., - * starting with "inproc://"; in Singa, since each process has one - * router, hence we can fix the endpoint to be "inproc://router" for - * intra-process. For inter-process, the endpoint follows ZeroMQ's - * format, i.e., IP:port, where IP is the connected process. - * @return 1 connection sets up successfully; 0 otherwise - */ - int Connect(const std::string& endpoint); - int Send(Msg** msg) override; - Msg* Receive() override; - void* InternalID() const override; - - protected: - int id_ = -1; -#ifdef USE_ZMQ - zsock_t* dealer_ = nullptr; - zpoller_t* poller_ = nullptr; -#endif -}; - -class Router : public SocketInterface { - public: - Router(); - /** - * There is only one router per procs, hence its local id is 0 and is not set - * explicitly. - * - * @param bufsize Buffer at most this number of messages - */ - explicit Router(int bufsize); - ~Router() override; - /** - * Setup the connection with dealers. - * - * It automatically binds to the endpoint for intra-process communication, - * i.e., "inproc://router". - * - * @param endpoint The identifier for the Dealer socket in other process - * to connect. It has the format IP:Port, where IP is the host machine. - * If endpoint is empty, it means that all connections are - * intra-process connection. - * @return number of connected dealers. - */ - int Bind(const std::string& endpoint); - /** - * If the destination socket has not connected yet, buffer this the message. 
- */ - int Send(Msg** msg) override; - Msg* Receive() override; - void* InternalID() const override; - - protected: - int nBufmsg_ = 0; - int bufsize_ = 100; -#ifdef USE_ZMQ - zsock_t* router_ = nullptr; - zpoller_t* poller_ = nullptr; - std::map<int, zframe_t*> id2addr_; - std::map<int, std::vector<zmsg_t*>> bufmsg_; -#endif -}; - -#ifdef USE_MPI -// TODO(wangsheng): add intra-process communication using shared queue -std::vector<SafeQueue*> MPIQueues; -#endif - -} // namespace singa - -#endif // SINGA_COMMUNICATION_SOCKET_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/driver.h ---------------------------------------------------------------------- diff --git a/include/driver.h b/include/driver.h index b33c7cc..9ae4b27 100644 --- a/include/driver.h +++ b/include/driver.h @@ -22,6 +22,7 @@ #ifndef SINGA_DRIVER_H_ #define SINGA_DRIVER_H_ +#include <vector> #include "proto/job.pb.h" #include "proto/singa.pb.h" #include "utils/factory.h" @@ -29,20 +30,70 @@ #include "utils/singleton.h" #include "utils/updater.h" #include "neuralnet/layer.h" -#include "trainer/worker.h" +#include "./worker.h" +#include "./server.h" namespace singa { - +using std::vector; class Driver { public: /** - * Init SINGA, including init glog, parse job id and job conf from cmd line, - * and register built-in layer, worker, updater, param subclasses. + * Init SINGA + * - init glog + * - parse job id and job conf from cmd line + * - register built-in layer, worker, updater, param subclasses. * * May be used for MPI init if it is used for message passing. */ void Init(int argc, char** argv); /** + * Update job configuration and call Train(const JobProto&) to start the + * training. + * + * It sets up the logging path and checkpoing files (if resume), and checks + * the existence of the workspace folder . + * + * @param[in] resume if true resume the training from the latest checkpoint + * files. + * @param[in] job_conf job configuration. + */ + void Train(bool resume, const JobProto& job_conf); + /** + * Create workers and servers to conduct the training. + * + * @param[in] job_conf job configuration with all necessary fields set (e.g., + * by Train(bool, const JobProto&). + */ + void Train(const JobProto& job_conf); + /** + * Setting the checkpoint field of the job configuration to resume training. + * + * The checkpoint folder will be searched to get the files for the latest + * checkpoint, which will be added into the checkpoint field. The workers + * would then load the values of params from the checkpoint files. + * + * @param job_conf job configuration + */ + void SetupForResume(JobProto* job_conf); + /** + * Create server instances. + * + * @param[in] job_conf job configuration. + * @param[in] net training neural network. + * @return server instances + */ + const vector<Server*> CreateServers(const JobProto& job_conf, NeuralNet* net); + /** + * Create workers instances. + * @param[in] job_conf job configuration. + * @param[in] net training neural network. + * @return worker instances + */ + const vector<Worker*> CreateWorkers(const JobProto& job_conf, NeuralNet* net); + + + /*********** Subclasses registers *************************/ + /** * Register a Layer subclass. * * @param type layer type ID. If called to register built-in subclasses, @@ -103,12 +154,7 @@ class Driver { template<typename Subclass, typename Type> int RegisterParamGenerator(const Type& type); - /** - * Submit the job configuration for starting the job. - * @param resume resume from last checkpoint if true. 
- * @param job job configuration - */ - void Submit(bool resume, const JobProto& job); + /****************** Access function ********************/ /** * @return job ID which is generated by zookeeper and passed in by the * launching script. http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/connection_layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/connection_layer.h b/include/neuralnet/connection_layer.h index 75f399c..1976fb9 100644 --- a/include/neuralnet/connection_layer.h +++ b/include/neuralnet/connection_layer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,12 +56,12 @@ class BridgeLayer : virtual public ConnectionLayer { */ class BridgeDstLayer : public BridgeLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric* perf) override { + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override { // reset ready_ for next iteration. ready_ = false; } - void ComputeGradient(int flag, Metric* perf) override {} + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} bool is_bridgedstlayer() const { return true; } @@ -73,25 +73,32 @@ class BridgeDstLayer : public BridgeLayer { */ class BridgeSrcLayer : public BridgeLayer { public: - void ComputeFeature(int flag, Metric* perf) override {} - void ComputeGradient(int flag, Metric* perf) override { + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override { + CHECK_GE(srclayers.size(), 1); + srclayer_ = srclayers.at(0); + } + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {} + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override { ready_ = false; } const Blob<float>& data(const Layer* from) const override { - return srclayers_[0]->data(this); + return srclayer_->data(this); } Blob<float>* mutable_data(const Layer* from) override { - return srclayers_[0]->mutable_data(this); + return srclayer_->mutable_data(this); } const Blob<float>& grad(const Layer* from) const override { - return srclayers_[0]->grad(this); + return srclayer_->grad(this); } Blob<float>* mutable_grad(const Layer* from) override { - return srclayers_[0]->mutable_grad(this); + return srclayer_->mutable_grad(this); } bool is_bridgesrclayer() const override { return true; } + + private: + Layer* srclayer_; }; @@ -103,9 +110,9 @@ class BridgeSrcLayer : public BridgeLayer { */ class ConcateLayer : public ConnectionLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric* perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; }; /** @@ -116,9 +123,9 @@ class ConcateLayer : public ConnectionLayer { */ class SliceLayer : public ConnectionLayer { public: - void 
Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; private: std::vector<Blob<float>> datavec_; @@ -136,9 +143,9 @@ class SliceLayer : public ConnectionLayer { */ class SplitLayer : public ConnectionLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric* perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; protected: Blob<float> grads_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/input_layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/input_layer.h b/include/neuralnet/input_layer.h index 709912d..b5f2dd4 100644 --- a/include/neuralnet/input_layer.h +++ b/include/neuralnet/input_layer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,8 +32,8 @@ * * The feature loading phase can be implemented using a single layer or * separated into DataLayer (for loading features as records) and ParserLayer - * (for parsing features from records). SINGA has provided some built-in layers - * for DataLayer and ParserLayer. + * (for parsing features from records). SINGA has provided some subclasses of + * DataLayer and ParserLayer. * * Data prefetching can be implemented as a sub-class of InputLayer. * SINGA provides a built-in PrefetchLayer which embeds DataLayer and @@ -41,20 +41,15 @@ */ namespace singa { /** - * Base layer for reading records from local Shard, HDFS, lmdb, etc. + * Base layer for reading ::Record from local Shard, HDFS, lmdb, etc. 
*/ class DataLayer: virtual public InputLayer { public: - void ComputeGradient(int flag, Metric* perf) override {} - Blob<float>* mutable_data(const Layer* layer) override { - return nullptr; - } - Blob<float>* mutable_grad(const Layer* layer) override { - return nullptr; - } + Blob<float>* mutable_data(const Layer* layer) override { return nullptr; } ConnectionType dst_layer_connection() const override { return kOneToMany; } + inline int batchsize() const { return batchsize_; } virtual const Record& sample() const { return sample_; @@ -81,8 +76,8 @@ class ShardDataLayer : public DataLayer { public: ~ShardDataLayer(); - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric *perf) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; private: DataShard* shard_; @@ -94,9 +89,9 @@ class LMDBDataLayer : public DataLayer { public: ~LMDBDataLayer(); - void Setup(const LayerProto& proto, int npartitions) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void OpenLMDB(const std::string& path); - void ComputeFeature(int flag, Metric *perf) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; void ConvertCaffeDatumToRecord(const CaffeDatum& datum, SingleLabelImageRecord* record); @@ -114,8 +109,8 @@ class LMDBDataLayer : public DataLayer { */ class ParserLayer : public InputLayer { public: - void ComputeFeature(int flag, Metric* perf) override; - void ComputeGradient(int flag, Metric* perf) override {} + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} ConnectionType dst_layer_connection() const override { return kOneToMany; } @@ -124,13 +119,6 @@ class ParserLayer : public InputLayer { */ virtual void ParseRecords(int flag, const std::vector<Record>& records, Blob<float>* blob) = 0; - Blob<float>* mutable_grad(const Layer* layer) override { - return nullptr; - } - const Blob<float>& grad(const Layer* from) const override { - CHECK(false) << "Parser layer has not gradient blob"; - return grad_; - } }; /** @@ -138,7 +126,7 @@ class ParserLayer : public InputLayer { */ class LabelLayer : public ParserLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ParseRecords(int flag, const std::vector<Record>& records, Blob<float>* blob) override; }; @@ -148,7 +136,7 @@ class LabelLayer : public ParserLayer { */ class MnistLayer : public ParserLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ParseRecords(int flag, const std::vector<Record>& records, Blob<float>* blob) override; @@ -161,7 +149,7 @@ class MnistLayer : public ParserLayer { */ class RGBImageLayer : public ParserLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; + void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ParseRecords(int flag, const std::vector<Record>& records, Blob<float>* blob) override; @@ -181,8 +169,8 @@ class RGBImageLayer : public ParserLayer { class PrefetchLayer : public Layer { public: ~PrefetchLayer(); - void ComputeFeature(int flag, Metric* perf) override; - void ComputeGradient(int flag, Metric* perf) 
override {} + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} protected: std::thread thread_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/layer.h b/include/neuralnet/layer.h index 05377b1..bf83163 100644 --- a/include/neuralnet/layer.h +++ b/include/neuralnet/layer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,16 +33,22 @@ #include "utils/param.h" namespace singa { - +using std::vector; /** * Base layer class. * - * Children should implement at least + * Subclasses should implement at least * Layer::ComputeFeature() and Layer::ComputGradient() - * functions for contrastive-divergence/back-propagation algorithm. + * functions in accordance with the NeuralNet::TrainOneBatch function. */ class Layer { public: + /** + * Create a sub-layer instance based on proto.type(); + * + * @param proto configuration of the layer instance. + * @return pointer to the newly created layer instance. + */ static Layer* Create(const LayerProto& proto); Layer() {} @@ -50,49 +56,51 @@ class Layer { /** * Setup layer properties. * - * Setup the shapes for data and parameters, also setup some properties - * based on the layer configuration and connected layers. + * Setup members e.g., shapes of Param objects based on the layer + * configuration and connected layers. + * It should check the partition setting when setup the properties. * - * @param proto layer configuration. - * @param npartitions num of total partitions of the original layer. This - * layer should be setup as one partition. + * @param conf layer configuration. + * @param srclayers source layers that connect to this layer. */ - virtual void Setup(const LayerProto& proto, int npartitions = 1) { - CHECK_GE(npartitions, 1); - layer_proto_ = proto; + virtual void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { + layer_conf_ = conf; } /** * Compute features of this layer based on connected layers. * - * @param perf pointer to Metric obj for collect and aggregate performance + * @param[in] flag set by the TrainOneBatch function, e.g., to indicate the + * running phase (kForward|kTrain, kForward|kTest, etc). + * @param[in] srclayers source layers that connect to this layer. */ - virtual void ComputeFeature(int flag, Metric* perf) = 0; + virtual void ComputeFeature(int flag, const vector<Layer*>& srclayers) = 0; /** - * Compute gradients for parameters and connected layers. - * @param flag used to get the calling phase, e.g., forward of training - * (kForward | kTrain) - * @param flag used to get the calling phase, e.g., forward of training + * Compute gradients for parameters associated with this layer. + * It may also compute the gradients of the loss w.r.t the source layers. + * + * \copydetails ComputeFeature(). 
*/ - virtual void ComputeGradient(int flag, Metric* perf) = 0; + virtual void ComputeGradient(int flag, const vector<Layer*>& srclayers) = 0; /** - * Layers that have paramters must override this function. - * @param flag used to get the calling phase, e.g., forward of training - * (kForward | kTrain) - * @return parameters associated with this layer + * Layers that have paramters must override this function to return all Param + * objects associated with this layer. + * + * @return parameters associated with this layer. */ virtual const std::vector<Param*> GetParams() const { return std::vector<Param*> {}; } /** - * Return the connection type between one neuron of this layer and - * its source layer. + * Return the connection type between one neuron of this layer and its source + * layer. + * * Currently support two connection types: kOneToOne, and kOneToAll. - * kOneToOne indicates the neuron depends on only one neuron from src layer. - * kOneToAll indicates the neuron depends on all neurons from src layer. + * - kOneToOne indicates the neuron depends on only one neuron from src layer. + * - kOneToAll indicates the neuron depends on all neurons from src layer. * TODO(wangwei) support kOneToMany. * - * @param k index of source layer (current only support k = 0. - * @param connection type. + * @param[in] k index of source layer, current only support k = 0. + * @return connection type. */ virtual ConnectionType src_neuron_connection(int k) const { // CHECK_LT(k, srclayers_.size()); @@ -102,89 +110,101 @@ class Layer { * Return the connection type of this layer and all dst layers. * * Currently support two connection types: kOneToOne, and kOneToMany. - * kOneToOne indicates the users implement the ComputeFeature and - * ComputeGradient function considering only one dest layer. In this case, + * - kOneToOne indicates the users implement the ComputeFeature and + * ComputeGradient function considering only one dst layer. In this case, * a SplitLayer will be added automatically to connect this layer with all * dest layer. - * kOneToMany indicates the users has already considered multiple dest layers - * in the implementation. + * - kOneToMany indicates this layer has already considered multiple dst + * layers in the implementation. + * * @return connection type default is kOneToOne. */ virtual ConnectionType dst_layer_connection() const { return kOneToOne; } /** - * For print debug info about each layer, e.g., norm of feature vector, - * norm of parameters. + * To display layer info, e.g., aggreated loss/accuracy, or norm of feature + * vector and norm of parameters. * - * @param step training/test/validation step - * @param flag used to get the calling phase, e.g., forward of training - * (kForward | kTrain) - * @return debug info about this layer. + * @param[in] debug whether print the debug info + * @param[in] flag used to get the calling phase, e.g., forward of training + * (kForward | kTrain). + * @return info string about this layer, which is printed into the log. */ - virtual const std::string DebugString(int step, int flag); + virtual const std::string ToString(bool debug, int flag); /** - * @return partition dimension of this layer. - * -1 for no partition; - * 0 for partition the mini-batch into sub-mini-batch. - * 1 for partition the layer feature vector into sub-vector. + * @return partition dimension of this layer, + * - -1 for no partition. + * - 0 for partition on the data dimension, i.e., partitioning the mini-batch + * into sub-mini-batches. 
+ * - 1 for partition this layer on feature dimension, i.e., the feature + * vector of each instance is partitioned into sub-vectors. */ inline int partition_dim() const { - CHECK_LE(layer_proto_.partition_dim(), 1); - return layer_proto_.partition_dim(); + CHECK_LE(layer_conf_.partition_dim(), 1); + return layer_conf_.partition_dim(); } - inline int partition_id() const { return layer_proto_.partition_id(); } - inline int type() const { return layer_proto_.type(); } /** - * Return name of this layer + * @return the partition ID (i.e., the worker ID to whom is layer is + * dispatched) of this layer, which is a sublayer partitioned from the + * original layer. + */ + inline int partition_id() const { return layer_conf_.partition_id(); } + /** + * @return total number of partitions (i.e., sub-layers) of the original + * layer of this layer. + */ + inline int num_partitions() const { return layer_conf_.num_partitions(); } + /** + * @return the type of this layer, only valid for built-in layer (types). */ - inline const std::string &name() const { return layer_proto_.name(); } + inline LayerType type() const { return layer_conf_.type(); } /** - * @return name of src data blob, used by prefetch layer to locate the data - * blob in parser layers; The default value is "unknown"; If the - * src layer is the prefetch layer and there are more than one parser layers, - * this value be set. - const std::string &datablob() const { - return layer_proto_.datablob(); + * @return user-defined layer type. + */ + inline const std::string& user_type() const { + return layer_conf_.user_type(); } + /** + * Return name of this layer */ + inline const std::string& name() const { return layer_conf_.name(); } /** - * @return a const ref for Blob storing neuron values of this layer for BP + * @param[in] from pointer to one of the dst layer. For some layers, they have + * more than one data Blob. In this case, this argument identifies the layer + * that is requesting the data Blob. + * @return a const ref for Blob storing feature values of this layer. */ virtual const Blob<float>& data(const Layer* from) const { return data_; } + /** + * @see data(). + * @return the pointer to the Blob storing feature values of this layer. + */ virtual Blob<float>* mutable_data(const Layer* from) { return &data_; } + /** + * @see data(). + * @return the const ref of the Blob for the gradient of this layer, mainly + * used in BP algorithm. + */ virtual const Blob<float>& grad(const Layer* from) const { return grad_; } /** - * @return a pointer to storing neuron grads of this layer for BP + * @see data(). + * @return a pointer to the Blob storing gradients of this layer, mainly + * used in BP algorithm. 
*/ virtual Blob<float>* mutable_grad(const Layer* from) { return &grad_; } - /** - * return LayerS that connected to this layer - */ - inline const std::vector<Layer*> srclayers() const { return srclayers_; } - /** - * return LayerS that this layer connected to - */ - inline const std::vector<Layer*> dstlayers() const { return dstlayers_; } - inline int srclayers_size() const { return srclayers_.size(); } - inline int dstlayers_size() const { return dstlayers_.size(); } - inline void clear_dstlayers() { dstlayers_.clear(); } - inline void clear_srclayers() { srclayers_.clear(); } - inline void add_srclayer(Layer* src) { srclayers_.push_back(src); } - inline void add_dstlayer(Layer* dst) { dstlayers_.push_back(dst); } protected: - LayerProto layer_proto_; + LayerProto layer_conf_; Blob<float> data_, grad_; - std::vector<Layer*> srclayers_, dstlayers_; }; /** @@ -199,29 +219,59 @@ class ConnectionLayer : virtual public Layer { * parsing records. */ class InputLayer : virtual public Layer { - // defined as a layer category + public: + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} + Blob<float>* mutable_grad(const Layer* layer) override { + // LOG(FATAL) << "Loss layer has no gradient blob"; + return nullptr; + } + const Blob<float>& grad(const Layer* from) const override { + // LOG(FATAL) << "Loss layer has no gradient blob"; + return grad_; + } }; +/** + * Base layer for calculating loss and doing BackPropagation. + */ +class LossLayer : virtual public Layer { + public: + const std::string ToString(bool debug, int flag) override; + Blob<float>* mutable_grad(const Layer* layer) override { + LOG(FATAL) << "Loss layer has no gradient blob"; + return nullptr; + } + const Blob<float>& grad(const Layer* from) const override { + LOG(FATAL) << "Loss layer has no gradient blob"; + return grad_; + } + protected: + Metric metric_; +}; + +/** + * Base layer for feature transformation, e.g., ConvolutionLayer, PoolingLayer, + * etc. + */ class NeuronLayer : virtual public Layer { // defined as a layer category }; /** - * Base layer for calculating loss and other metrics, e.g., precison. + * Base layer for collecting features into disk file, HTTP stream, etc. */ -class LossLayer : virtual public Layer { +class OutpuLayer : virtual public Layer { public: + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} Blob<float>* mutable_grad(const Layer* layer) override { + LOG(FATAL) << "Loss layer has no gradient blob"; return nullptr; } const Blob<float>& grad(const Layer* from) const override { LOG(FATAL) << "Loss layer has no gradient blob"; return grad_; } - - protected: - Blob<float> metric_; }; } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/loss_layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/loss_layer.h b/include/neuralnet/loss_layer.h index 3af0b46..a48a8e7 100644 --- a/include/neuralnet/loss_layer.h +++ b/include/neuralnet/loss_layer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,30 +22,36 @@ #ifndef SINGA_NEURALNET_LOSS_LAYER_H_ #define SINGA_NEURALNET_LOSS_LAYER_H_ +#include <vector> #include "neuralnet/layer.h" /** - * \file this file includes the declarations of layers that inherit the base + * @file this file includes the declarations of layers that inherit the base * LossLayer for measuring the objective training loss. */ namespace singa { +using std::vector; /** - * Squared Euclidean loss as 0.5 ||predict - ground_truth||^2. + * Squared Euclidean loss as @f$0.5 ||p - t||^2@f$, where p is for prediction + * t is for ground truth. */ class EuclideanLossLayer : public LossLayer { public: - void ComputeFeature(int flag, Metric* perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; }; /** - * Cross-entropy loss applied to the probabilities after Softmax. + * Cross-entropy loss applied to the probabilities computed from Softmax. + * @f$ L_i = -log P_{t_i}, t_i\in [0, C] @f$ is the label for the i-th object, + * C is the total number of classes. */ class SoftmaxLossLayer : public LossLayer { public: - void Setup(const LayerProto& proto, int npartitions) override; - void ComputeFeature(int flag, Metric* perf) override; - void ComputeGradient(int flag, Metric* perf) override; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; /** * softmax is not recommendeded for partition because it requires the whole http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/neuralnet.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h index 693fe19..a202f44 100644 --- a/include/neuralnet/neuralnet.h +++ b/include/neuralnet/neuralnet.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ #include <string> #include <vector> +#include <unordered_map> #include "neuralnet/layer.h" #include "proto/job.pb.h" @@ -31,7 +32,6 @@ #include "utils/graph.h" namespace singa { - /** * The neural network is constructed from user configurations in NetProto. * @@ -60,23 +60,27 @@ class NeuralNet { * @param netproto neural net config * @param npartitions num of partitions. 1 for no partitioning. 
*/ - NeuralNet(NetProto netproto, int npartitions); + NeuralNet(NetProto net_conf, int num_partitions); ~NeuralNet(); /** * To display the adjacency layers - */ std::string ToAdjacency(); + */ /** * Share memory of parameter values from other neuralnet */ void ShareParamsFrom(NeuralNet* other); - inline const std::vector<Layer*>& layers() { return layers_; } + inline const std::vector<Layer*>& layers() const { return layers_; } inline const std::vector<Param*>& params() const { return params_; } inline Layer* name2layer(std::string name) const { - if (name2layer_.find(name) != name2layer_.end()) - return name2layer_.at(name); - else - return nullptr; + CHECK(name2layer_.find(name) != name2layer_.end()) + << "No layer with name " << name; + return name2layer_.at(name); + } + inline const std::vector<Layer*>& srclayers(const Layer* layer) const { + CHECK(src_map_.find(layer) != src_map_.end()) + << "layer (" << layer->name() << " ) has no source layers"; + return src_map_.at(layer); } inline Param* paramid2param(int id) const { return paramid2param_.at(id); } @@ -90,11 +94,11 @@ class NeuralNet { * @npartitions * @return neural net graph */ - Graph* CreateGraph(const NetProto& netproto, int npartitions); + Graph* CreateGraph(const NetProto& netproto, int num_partitions); /** * Create neural net from graph, one layer per node. */ - void CreateNetFromGraph(Graph* graph, int npartitions); + void CreateNetFromGraph(Graph* graph, int num_partitions); /** * prepare data structures, e.g., params_, layers_, etc. */ @@ -104,8 +108,9 @@ class NeuralNet { std::vector<Layer*> layers_; std::vector<Param*> params_; - std::map<std::string, Layer*> name2layer_; - std::map<int, Param*> paramid2param_; + std::unordered_map<std::string, Layer*> name2layer_; + std::unordered_map<int, Param*> paramid2param_; + std::unordered_map<const Layer*, std::vector<Layer*>> src_map_; }; } // namespace singa
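To make the interface change above concrete, the sketch below shows what a user-defined layer could look like against the refactored API: Setup, ComputeFeature and ComputeGradient now receive the source layers explicitly instead of a Metric*, and connectivity no longer lives in the Layer base class. ScaleLayer is hypothetical (it simply doubles its input) and is not part of this commit; the Blob calls (ReshapeLike, cpu_data, mutable_cpu_data, count) are assumed to follow SINGA's Caffe-style Blob and may differ slightly in the actual code base.

#include <vector>
#include "neuralnet/layer.h"

namespace singa {
// Hypothetical layer that scales its input by 2, shown only to illustrate
// the new Setup/ComputeFeature/ComputeGradient signatures.
class ScaleLayer : public NeuronLayer {
 public:
  void Setup(const LayerProto& conf, const std::vector<Layer*>& srclayers) override {
    Layer::Setup(conf, srclayers);          // stores the layer configuration
    CHECK_EQ(srclayers.size(), 1u);         // this layer expects a single source
    // Feature and gradient Blobs mirror the shape of the source layer's data.
    data_.ReshapeLike(srclayers[0]->data(this));   // assumed Caffe-style Blob API
    grad_.ReshapeLike(srclayers[0]->data(this));
  }
  void ComputeFeature(int flag, const std::vector<Layer*>& srclayers) override {
    const float* src = srclayers[0]->data(this).cpu_data();
    float* dst = data_.mutable_cpu_data();
    for (int i = 0; i < data_.count(); ++i)
      dst[i] = 2.0f * src[i];
  }
  void ComputeGradient(int flag, const std::vector<Layer*>& srclayers) override {
    // Chain rule for y = 2x: pass the scaled gradient back to the source layer.
    const float* gy = grad_.cpu_data();
    float* gx = srclayers[0]->mutable_grad(this)->mutable_cpu_data();
    for (int i = 0; i < grad_.count(); ++i)
      gx[i] = 2.0f * gy[i];
  }
};
}  // namespace singa

Passing srclayers as an argument keeps Layer free of connectivity state, which is what allows the srclayers_/dstlayers_ members and their accessors to be dropped from the base class.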

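With srclayers_/dstlayers_ removed from Layer, connectivity is now resolved through NeuralNet::srclayers(). A minimal sketch of how a caller might drive a forward pass under the new API; the real traversal sits in the worker's TrainOneBatch code, so ForwardPass and the kForward | kTrain flag combination here are purely illustrative.

#include "neuralnet/neuralnet.h"

namespace singa {
// Illustrative forward pass: the net, not the layer, owns the connectivity
// information, so source layers are looked up per layer from the net.
void ForwardPass(NeuralNet* net) {
  for (Layer* layer : net->layers()) {
    layer->ComputeFeature(kForward | kTrain, net->srclayers(layer));
    // Per-layer info (e.g., aggregated loss) is now reported via ToString().
    LOG(INFO) << layer->ToString(false /* debug */, kForward | kTrain);
  }
}
}  // namespace singa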