http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/connection_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/connection_layer.cc b/src/neuralnet/connection_layer.cc index 1ba2d95..acf243d 100644 --- a/src/neuralnet/connection_layer.cc +++ b/src/neuralnet/connection_layer.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,17 +24,24 @@ namespace singa { using std::vector; - +/********* Implementation for BridgeDstLayer **************/ +void BridgeDstLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + Layer::Setup(proto, srclayers); + CHECK_EQ(srclayers.size(), 1); + data_.Reshape(srclayers[0]->data(this).shape()); + grad_.ReshapeLike(data_); +} /************* Implementation for ConcateLayer ***********/ -void ConcateLayer::Setup(const LayerProto& proto, int npartitions) { - // CHECK_EQ(npartitions, 1); - Layer::Setup(proto, npartitions); - size_t concate_dim = proto.concate_conf().concate_dim(); +void ConcateLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + size_t concate_dim = conf.concate_conf().concate_dim(); CHECK_GE(concate_dim, 0); - CHECK_GT(srclayers_.size(), 1); - vector<int> shape = srclayers_[0]->data(this).shape(); - for (size_t i = 1; i < srclayers_.size(); i++) { - const vector<int>& srcshape = srclayers_[i]->data(this).shape(); + CHECK_GT(srclayers.size(), 1); + vector<int> shape = srclayers[0]->data(this).shape(); + for (size_t i = 1; i < srclayers.size(); i++) { + const vector<int>& srcshape = srclayers[i]->data(this).shape(); for (size_t j = 0; j < shape.size(); j++) if (j == concate_dim) shape[j] += srcshape[j]; @@ -45,23 +52,24 @@ void ConcateLayer::Setup(const LayerProto& proto, int npartitions) { grad_.Reshape(shape); } -void ConcateLayer::ComputeFeature(int flag, Metric *perf) { +void ConcateLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { LOG(FATAL) << "Not implemented for Concate Layer"; } -void ConcateLayer::ComputeGradient(int flag, Metric* perf) { +void ConcateLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { LOG(FATAL) << "Not implemented for Concate Layer"; } /************* Implementation for SliceLayer****************/ -void SliceLayer::Setup(const LayerProto& proto, int npartitions) { +void SliceLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { /* - Layer::Setup(proto, npartitions); - slice_dim_ = proto.slice_conf().slice_dim(); + Layer::Setup(conf, npartitions); + slice_dim_ = conf.slice_conf().slice_dim(); slice_num_ = npartitions; CHECK_GE(slice_dim_, 0); CHECK_EQ(slice_num_, dstlayers_.size()); - data_.Reshape(srclayers_[0]->data(this).shape()); + data_.Reshape(srclayers[0]->data(this).shape()); grad_.ReshapeLike(data_); datavec_.resize(slice_num_); gradvec_.resize(slice_num_); @@ -79,11 +87,11 @@ void SliceLayer::Setup(const LayerProto& proto, int npartitions) { LOG(FATAL) << "Not implemented"; } -void SliceLayer::ComputeFeature(int flag, Metric *perf) { +void SliceLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { /* - 
CHECK_EQ(srclayers_.size(), 1); + CHECK_EQ(srclayers.size(), 1); if (slice_dim_ == 0) { - const auto& blob = srclayers_.at(0)->data(this); + const auto& blob = srclayers.at(0)->data(this); int size = blob.count() / slice_num_; for (int i = 0; i < slice_num_; i++) { float* dst = datavec_[i].mutable_cpu_data(); @@ -95,7 +103,7 @@ void SliceLayer::ComputeFeature(int flag, Metric *perf) { LOG(FATAL) << "Not implemented"; } -void SliceLayer::ComputeGradient(int flag, Metric* perf) { +void SliceLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { LOG(FATAL) << "Not implemented"; } @@ -112,19 +120,19 @@ int SliceLayer::SliceID(const Layer* layer) const { }*/ /************* Implementation for SplitLayer****************/ -void SplitLayer::Setup(const LayerProto& proto, int npartitions) { - // CHECK_EQ(npartitions, 1); - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - data_.Reshape(srclayers_[0]->data(this).shape()); - grad_.Reshape(srclayers_[0]->data(this).shape()); +void SplitLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + data_.Reshape(srclayers[0]->data(this).shape()); + grad_.Reshape(srclayers[0]->data(this).shape()); } -void SplitLayer::ComputeFeature(int flag, Metric *perf) { +void SplitLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { LOG(FATAL) << "Not implemented"; } -void SplitLayer::ComputeGradient(int flag, Metric* perf) { +void SplitLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { LOG(FATAL) << "Not implemented"; } } // namespace singa
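The pattern above, which repeats through the rest of this commit, replaces Setup(const LayerProto&, int npartitions) and Compute{Feature,Gradient}(int, Metric*) with overloads that receive the source layers explicitly. A minimal sketch of a custom layer written against the new API (the IdentityLayer below is hypothetical; Layer, LayerProto, and Blob are assumed to come from SINGA's neuralnet headers, and CHECK_EQ from the glog macros used elsewhere in this patch):

#include <vector>
#include "neuralnet/layer.h"

namespace singa {
// Hypothetical pass-through layer. Source layers now arrive as an argument
// instead of being read from the removed srclayers_ member, and the old
// npartitions parameter is gone.
class IdentityLayer : public Layer {
 public:
  void Setup(const LayerProto& conf,
             const std::vector<Layer*>& srclayers) override {
    Layer::Setup(conf, srclayers);
    CHECK_EQ(srclayers.size(), 1);
    data_.ReshapeLike(srclayers[0]->data(this));
    grad_.ReshapeLike(data_);
  }
  void ComputeFeature(int flag,
                      const std::vector<Layer*>& srclayers) override {
    data_.CopyFrom(srclayers[0]->data(this));  // forward the input unchanged
  }
  void ComputeGradient(int flag,
                       const std::vector<Layer*>& srclayers) override {
    srclayers[0]->mutable_grad(this)->CopyFrom(grad_);  // pass gradient back
  }
};
}  // namespace singa

The partition count, previously passed in as npartitions, now travels inside the layer configuration (see the num_partitions field added to LayerProto in the job.proto diff below).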
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/input_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/input_layer.cc b/src/neuralnet/input_layer.cc index a608ba4..f89369c 100644 --- a/src/neuralnet/input_layer.cc +++ b/src/neuralnet/input_layer.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,9 +34,9 @@ using std::string; using std::vector; /************* Implementation for ParserLayer ***********/ -void ParserLayer::ComputeFeature(int flag, Metric *perf) { - CHECK_EQ(srclayers_.size(), 1); - auto datalayer = dynamic_cast<DataLayer*>(*srclayers_.begin()); +void ParserLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { + CHECK_EQ(srclayers.size(), 1); + auto datalayer = dynamic_cast<DataLayer*>(*srclayers.begin()); ParseRecords(flag, datalayer->records(), &data_); } @@ -48,8 +48,9 @@ LMDBDataLayer::~LMDBDataLayer() { mdb_cursor_ = nullptr; } -void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); +void LMDBDataLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + Layer::Setup(proto, srclayers); OpenLMDB(proto.lmdbdata_conf().path()); CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT), MDB_SUCCESS); @@ -62,7 +63,7 @@ void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) { ConvertCaffeDatumToRecord(datum, record); batchsize_ = proto.lmdbdata_conf().batchsize(); if (partition_dim() == 0) - batchsize_ /= npartitions; + batchsize_ /= proto.num_partitions(); records_.resize(batchsize_); random_skip_ = proto.lmdbdata_conf().random_skip(); } @@ -83,9 +84,9 @@ void LMDBDataLayer::OpenLMDB(const std::string& path) { MDB_SUCCESS) << "mdb_cursor_get failed"; } -void LMDBDataLayer::ComputeFeature(int flag, Metric* perf) { +void LMDBDataLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { if (mdb_cursor_ == nullptr) - OpenLMDB(layer_proto_.lmdbdata_conf().path()); + OpenLMDB(layer_conf_.lmdbdata_conf().path()); if (random_skip_) { int nskip = rand() % random_skip_; int n = 0; @@ -155,8 +156,9 @@ ShardDataLayer::~ShardDataLayer() { shard_ = nullptr; } -void ShardDataLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); +void ShardDataLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + Layer::Setup(proto, srclayers); shard_ = new DataShard(proto.sharddata_conf().path(), DataShard::kRead); string key; shard_->Next(&key, &sample_); @@ -164,14 +166,14 @@ void ShardDataLayer::Setup(const LayerProto& proto, int npartitions) { shard_ = nullptr; batchsize_ = proto.sharddata_conf().batchsize(); if (partition_dim() == 0) - batchsize_ /= npartitions; + batchsize_ /= proto.num_partitions(); records_.resize(batchsize_); random_skip_ = proto.sharddata_conf().random_skip(); } -void ShardDataLayer::ComputeFeature(int flag, Metric* perf) { +void ShardDataLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { if (shard_ == nullptr) - shard_ = new DataShard(layer_proto_.sharddata_conf().path(), + shard_ = new 
DataShard(layer_conf_.sharddata_conf().path(), DataShard::kRead); if (random_skip_) { int nskip = rand() % random_skip_; @@ -193,15 +195,16 @@ void ShardDataLayer::ComputeFeature(int flag, Metric* perf) { } /********* Implementation for LabelLayer **************/ -void LabelLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - int batchsize = dynamic_cast<DataLayer*>(srclayers_[0])->batchsize(); +void LabelLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + Layer::Setup(proto, srclayers); + CHECK_EQ(srclayers.size(), 1); + int batchsize = dynamic_cast<DataLayer*>(srclayers[0])->batchsize(); data_.Reshape(vector<int>{batchsize}); } void LabelLayer::ParseRecords(int flag, const vector<Record>& records, - Blob<float>* blob) { + Blob<float>* blob) { int rid = 0; float *label = blob->mutable_cpu_data(); for (const Record& record : records) { @@ -212,8 +215,8 @@ void LabelLayer::ParseRecords(int flag, const vector<Record>& records, } /**************** Implementation for MnistLayer ******************/ -void MnistLayer::ParseRecords(int flag, - const vector<Record>& records, Blob<float>* blob) { +void MnistLayer::ParseRecords(int flag, const vector<Record>& records, + Blob<float>* blob) { LOG_IF(ERROR, records.size() == 0) << "Empty records to parse"; int ndim = records.at(0).image().shape_size(); int inputsize = records.at(0).image().shape(ndim-1); @@ -246,11 +249,12 @@ void MnistLayer::ParseRecords(int flag, CHECK_EQ(dptr, blob->mutable_cpu_data() + blob->count()); } -void MnistLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - int batchsize = dynamic_cast<DataLayer*>(srclayers_[0])->batchsize(); - Record sample = dynamic_cast<DataLayer*>(srclayers_[0])->sample(); +void MnistLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + Layer::Setup(proto, srclayers); + CHECK_EQ(srclayers.size(), 1); + int batchsize = dynamic_cast<DataLayer*>(srclayers[0])->batchsize(); + Record sample = dynamic_cast<DataLayer*>(srclayers[0])->sample(); norm_a_ = proto.mnist_conf().norm_a(); norm_b_ = proto.mnist_conf().norm_b(); int ndim = sample.image().shape_size(); @@ -261,8 +265,8 @@ void MnistLayer::Setup(const LayerProto& proto, int npartitions) { } /*************** Implementation for RGBImageLayer *************************/ -void RGBImageLayer::ParseRecords(int flag, - const vector<Record>& records, Blob<float>* blob) { +void RGBImageLayer::ParseRecords(int flag, const vector<Record>& records, + Blob<float>* blob) { const vector<int>& s = blob->shape(); Tensor<cpu, 4> images(data_.mutable_cpu_data(), Shape4(s[0], s[1], s[2], s[3])); @@ -315,14 +319,15 @@ void RGBImageLayer::ParseRecords(int flag, FreeSpace(croped_image); } -void RGBImageLayer::Setup(const LayerProto& proto, int npartitions) { - ParserLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); +void RGBImageLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + ParserLayer::Setup(proto, srclayers); + CHECK_EQ(srclayers.size(), 1); scale_ = proto.rgbimage_conf().scale(); cropsize_ = proto.rgbimage_conf().cropsize(); mirror_ = proto.rgbimage_conf().mirror(); - int batchsize = dynamic_cast<DataLayer*>(srclayers_[0])->batchsize(); - Record sample = dynamic_cast<DataLayer*>(srclayers_[0])->sample(); + int batchsize = dynamic_cast<DataLayer*>(srclayers[0])->batchsize(); + Record sample = 
dynamic_cast<DataLayer*>(srclayers[0])->sample(); vector<int> shape; shape.push_back(batchsize); for (int x : sample.image().shape()) { @@ -361,7 +366,7 @@ PrefetchLayer::~PrefetchLayer() { } -void PrefetchLayer::ComputeFeature(int flag, Metric* perf) { +void PrefetchLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { LOG(FATAL) << "Not implemented"; } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc index d818533..e229045 100644 --- a/src/neuralnet/layer.cc +++ b/src/neuralnet/layer.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,7 +42,9 @@ Layer* Layer::Create(const LayerProto& proto) { return layer; } -const string Layer::DebugString(int step, int flag) { +const std::string Layer::ToString(bool debug, int flag) { + if (!debug) + return ""; string ret = StringPrintf("Layer %10s ", name().c_str()); if ((flag & kForward) == kForward && data_.count() !=0) { ret += StringPrintf("data norm1 %13.9f", data_.asum_data()); @@ -60,4 +62,15 @@ const string Layer::DebugString(int step, int flag) { } return ret; } + +const std::string LossLayer::ToString(bool debug, int flag) { + std::string disp; + if (debug) { + disp = Layer::ToString(debug, flag); + } else { + disp = metric_.ToLogString(); + metric_.Reset(); + } + return disp; +} } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/loss_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/loss_layer.cc b/src/neuralnet/loss_layer.cc index d8fd92b..b5447f6 100644 --- a/src/neuralnet/loss_layer.cc +++ b/src/neuralnet/loss_layer.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,50 +40,59 @@ using std::string; using std::vector; /********** * Implementation for EuclideanLossLayer*************************/ -void EuclideanLossLayer::ComputeFeature(int flag, Metric* perf) { - int count = srclayers_[0]->data(this).count(); - CHECK_EQ(count, srclayers_[1]->data(this).count()); - const float* reconstruct_dptr = srclayers_[0]->data(this).cpu_data(); - const float* input_dptr = srclayers_[1]->data(this).cpu_data(); +void EuclideanLossLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + CHECK_EQ(srclayers.size(), 2); + Layer::Setup(conf, srclayers); +} + +void EuclideanLossLayer::ComputeFeature(int flag, + const vector<Layer*>& srclayers) { + int count = srclayers[0]->data(this).count(); + CHECK_EQ(count, srclayers[1]->data(this).count()); + const float* reconstruct_dptr = srclayers[0]->data(this).cpu_data(); + const float* input_dptr = srclayers[1]->data(this).cpu_data(); float loss = 0; for (int i = 0; i < count; i++) { loss += (input_dptr[i] - reconstruct_dptr[i]) * (input_dptr[i] - reconstruct_dptr[i]); } - perf->Add("loss", loss / srclayers_[0]->data(this).shape()[0]); + metric_.Add("loss", loss / srclayers[0]->data(this).shape()[0]); } -void EuclideanLossLayer::ComputeGradient(int flag, Metric* perf) { - int count = srclayers_[0]->data(this).count(); - CHECK_EQ(count, srclayers_[1]->data(this).count()); - const float* reconstruct_dptr = srclayers_[0]->data(this).cpu_data(); - const float* input_dptr = srclayers_[1]->data(this).cpu_data(); - Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this); + +void EuclideanLossLayer::ComputeGradient(int flag, + const vector<Layer*>& srclayers) { + int count = srclayers[0]->data(this).count(); + CHECK_EQ(count, srclayers[1]->data(this).count()); + const float* reconstruct_dptr = srclayers[0]->data(this).cpu_data(); + const float* input_dptr = srclayers[1]->data(this).cpu_data(); + Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this); float* gsrcptr = gsrcblob->mutable_cpu_data(); for (int i = 0; i < count; i++) { gsrcptr[i] = reconstruct_dptr[i]-input_dptr[i]; } Tensor<cpu, 1> gsrc(gsrcptr, Shape1(gsrcblob->count())); - gsrc /= srclayers_[0]->data(this).shape()[0]; + gsrc /= srclayers[0]->data(this).shape()[0]; } - /********** * Implementation for SoftmaxLossLayer*************************/ -void SoftmaxLossLayer::Setup(const LayerProto& proto, int npartitions) { - LossLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 2); - data_.Reshape(srclayers_[0]->data(this).shape()); +void SoftmaxLossLayer::Setup(const LayerProto& proto, + const vector<Layer*>& srclayers) { + CHECK_EQ(srclayers.size(), 2); + LossLayer::Setup(proto, srclayers); + data_.Reshape(srclayers[0]->data(this).shape()); batchsize_ = data_.shape()[0]; dim_ = data_.count() / batchsize_; topk_ = proto.softmaxloss_conf().topk(); - metric_.Reshape(vector<int>{2}); scale_ = proto.softmaxloss_conf().scale(); } -void SoftmaxLossLayer::ComputeFeature(int flag, Metric* perf) { +void SoftmaxLossLayer::ComputeFeature(int flag, + const vector<Layer*>& srclayers) { Shape<2> s = Shape2(batchsize_, dim_); Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s); - Tensor<cpu, 2> src(srclayers_[0]->mutable_data(this)->mutable_cpu_data(), s); + Tensor<cpu, 2> 
src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s); Softmax(prob, src); - const float* label = srclayers_[1]->data(this).cpu_data(); + const float* label = srclayers[1]->data(this).cpu_data(); const float* probptr = prob.dptr; float loss = 0, precision = 0; for (int n = 0; n < batchsize_; n++) { @@ -108,13 +117,14 @@ void SoftmaxLossLayer::ComputeFeature(int flag, Metric* perf) { probptr += dim_; } CHECK_EQ(probptr, prob.dptr + prob.shape.Size()); - perf->Add("loss", loss * scale_ / (1.0f * batchsize_)); - perf->Add("accuracy", precision * scale_ / (1.0f * batchsize_)); + metric_.Add("loss", loss * scale_ / (1.0f * batchsize_)); + metric_.Add("accuracy", precision * scale_ / (1.0f * batchsize_)); } -void SoftmaxLossLayer::ComputeGradient(int flag, Metric* perf) { - const float* label = srclayers_[1]->data(this).cpu_data(); - Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this); +void SoftmaxLossLayer::ComputeGradient(int flag, + const vector<Layer*>& srclayers) { + const float* label = srclayers[1]->data(this).cpu_data(); + Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this); gsrcblob->CopyFrom(data_); float* gsrcptr = gsrcblob->mutable_cpu_data(); for (int n = 0; n < batchsize_; n++) { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/neuralnet.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc index 775a5a7..ec23c23 100644 --- a/src/neuralnet/neuralnet.cc +++ b/src/neuralnet/neuralnet.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -94,6 +94,7 @@ NeuralNet::~NeuralNet() { delete layer; } +/* std::string NeuralNet::ToAdjacency() { string disp = ""; for (auto& layer : layers_) { @@ -104,6 +105,7 @@ std::string NeuralNet::ToAdjacency() { } return disp; } +*/ void NeuralNet::ShareParamsFrom(NeuralNet* other) { for (auto& layer : layers_) { @@ -215,6 +217,7 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) { // differentiate partitions string nodename = layer.name() + "@" + string(suffix); proto->set_partition_id(i); + proto->set_num_partitions(npartitions); proto->set_name(nodename); auto node = new Node(nodename, layer.name(), i, proto); graph->AddNode(node); @@ -321,21 +324,19 @@ void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) { } // connect layers for (Node* node : graph->nodes()) { - auto layer = name2layer_[node->name]; - layer->clear_dstlayers(); - for (Node* dst : node->dstnodes) - layer->add_dstlayer(name2layer_[dst->name]); - layer->clear_srclayers(); + auto layer = name2layer(node->name); + src_map_[layer] = vector<Layer*>{}; for (Node* src : node->srcnodes) - layer->add_srclayer(name2layer_[src->name]); + src_map_[layer].push_back(name2layer(src->name)); } + // setup layers int paramid = 0; map<string, string> layerinfo; map<string, vector<Layer*>> share_param_layers; for (Node* node : graph->nodes()) { - auto layer = name2layer_[node->name]; - layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions); + auto layer = name2layer(node->name); + layer->Setup(*(static_cast<LayerProto*>(node->proto)), 
srclayers(layer)); LOG(INFO) << "constructing graph: " << layer->name(); layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape()); string param_name = "$"; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/neuron_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuron_layer.cc b/src/neuralnet/neuron_layer.cc index 9e7831a..4e3acf0 100644 --- a/src/neuralnet/neuron_layer.cc +++ b/src/neuralnet/neuron_layer.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -73,17 +73,19 @@ ConvolutionLayer::~ConvolutionLayer() { delete weight_; delete bias_; } -void ConvolutionLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - ConvolutionProto conv_conf = proto.convolution_conf(); +void ConvolutionLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + CHECK_EQ(srclayers.size(), 1); + Layer::Setup(conf, srclayers); + ConvolutionProto conv_conf = conf.convolution_conf(); kernel_ = conv_conf.kernel(); CHECK_GT(kernel_, 0) << "Filter size cannot be zero."; pad_ = conv_conf.pad(); stride_ = conv_conf.stride(); num_filters_ = conv_conf.num_filters(); if (partition_dim() > 0) - num_filters_ /= npartitions; - const vector<int>& srcshape = srclayers_[0]->data(this).shape(); + num_filters_ /= srclayers.at(0)->num_partitions(); + const vector<int>& srcshape = srclayers[0]->data(this).shape(); int dim = srcshape.size(); CHECK_GT(dim, 2); width_ = srcshape[dim - 1]; @@ -102,14 +104,15 @@ void ConvolutionLayer::Setup(const LayerProto& proto, int npartitions) { grad_.Reshape(shape); col_data_.Reshape(vector<int>{col_height_, col_width_}); col_grad_.Reshape(vector<int>{col_height_, col_width_}); - weight_ = Param::Create(proto.param(0)); - bias_ = Param::Create(proto.param(1)); + weight_ = Param::Create(conf.param(0)); + bias_ = Param::Create(conf.param(1)); weight_->Setup(vector<int>{num_filters_, col_height_}); bias_->Setup(vector<int>{num_filters_}); } -void ConvolutionLayer::ComputeFeature(int flag, Metric* perf) { - auto src = Tensor4(srclayers_[0]->mutable_data(this)); +void ConvolutionLayer::ComputeFeature(int flag, + const vector<Layer*>& srclayers) { + auto src = Tensor4(srclayers[0]->mutable_data(this)); auto data = Tensor3(&data_); auto col = Tensor2(&col_data_); auto weight = Tensor2(weight_->mutable_data()); @@ -124,15 +127,16 @@ void ConvolutionLayer::ComputeFeature(int flag, Metric* perf) { data += expr::broadcast<1>(bias, data.shape); } -void ConvolutionLayer::ComputeGradient(int flag, Metric* perf) { - auto src = Tensor4(srclayers_[0]->mutable_data(this)); +void ConvolutionLayer::ComputeGradient(int flag, + const vector<Layer*>& srclayers) { + auto src = Tensor4(srclayers[0]->mutable_data(this)); auto col = Tensor2(&col_data_); auto weight = Tensor2(weight_->mutable_data()); auto grad = Tensor3(&grad_); auto gcol = Tensor2(&col_grad_); auto gweight = Tensor2(weight_->mutable_grad()); auto gbias = Tensor1(bias_->mutable_grad()); - Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this); + Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this); Tensor<cpu, 4> 
gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_)); if (gsrcblob != nullptr) gsrc.dptr = gsrcblob->mutable_cpu_data(); @@ -157,8 +161,9 @@ void ConvolutionLayer::ComputeGradient(int flag, Metric* perf) { } /******************* Implementation for CConvolutionLayer *********/ -void CConvolutionLayer::ComputeFeature(int flag, Metric* perf) { - auto src = Tensor4(srclayers_[0]->mutable_data(this)); +void CConvolutionLayer::ComputeFeature(int flag, + const vector<Layer*>& srclayers) { + auto src = Tensor4(srclayers[0]->mutable_data(this)); auto data = Tensor3(&data_); auto col = Tensor2(&col_data_); auto weight = Tensor2(weight_->mutable_data()); @@ -172,8 +177,9 @@ void CConvolutionLayer::ComputeFeature(int flag, Metric* perf) { data += expr::broadcast<1>(bias, data.shape); } -void CConvolutionLayer::ComputeGradient(int flag, Metric* perf) { - auto src = Tensor4(srclayers_[0]->mutable_data(this)); +void CConvolutionLayer::ComputeGradient(int flag, + const vector<Layer*>& srclayers) { + auto src = Tensor4(srclayers[0]->mutable_data(this)); auto col = Tensor2(&col_data_); auto weight = Tensor2(weight_->mutable_data()); @@ -182,7 +188,7 @@ void CConvolutionLayer::ComputeGradient(int flag, Metric* perf) { auto gweight = Tensor2(weight_->mutable_grad()); auto gbias = Tensor1(bias_->mutable_grad()); gweight = 0.f; - Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this); + Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this); Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_)); if (gsrcblob != nullptr) gsrc.dptr = gsrcblob->mutable_cpu_data(); @@ -200,18 +206,19 @@ void CConvolutionLayer::ComputeGradient(int flag, Metric* perf) { } /****************** Implementation for DropoutLayer ***********************/ -void DropoutLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - data_.ReshapeLike(srclayers_[0]->data(this)); - grad_.ReshapeLike(*srclayers_[0]->mutable_grad(this)); - mask_.Reshape(srclayers_[0]->data(this).shape()); - pdrop_ = proto.dropout_conf().dropout_ratio(); +void DropoutLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(*srclayers[0]->mutable_grad(this)); + mask_.Reshape(srclayers[0]->data(this).shape()); + pdrop_ = conf.dropout_conf().dropout_ratio(); } -void DropoutLayer::ComputeFeature(int flag, Metric* perf) { +void DropoutLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { // check training if ((flag & kTrain) != kTrain) { - data_.CopyFrom(srclayers_[0]->data(this)); + data_.CopyFrom(srclayers[0]->data(this)); return; } float pkeep = 1 - pdrop_; @@ -219,14 +226,14 @@ void DropoutLayer::ComputeFeature(int flag, Metric* perf) { mask = expr::F<op::threshold>(TSingleton<Random<cpu>>::Instance() \ ->uniform(mask.shape), pkeep) * (1.0f/pkeep); auto data = Tensor1(&data_); - auto src = Tensor1(srclayers_[0]->mutable_data(this)); + auto src = Tensor1(srclayers[0]->mutable_data(this)); data = src * mask; } -void DropoutLayer::ComputeGradient(int flag, Metric* perf) { +void DropoutLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto mask = Tensor1(&mask_); auto grad = Tensor1(&grad_); - auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); + auto gsrc = Tensor1(srclayers[0]->mutable_grad(this)); gsrc = grad * mask; } @@ -251,11 +258,10 @@ Blob<float>* RBMLayer::Sample(int flag) { return (flag & kPositive) == kPositive || first_gibbs_ ? 
&sample_ : &neg_sample_; } -void RBMLayer::Setup(const LayerProto& proto, int npartitions) { - CHECK_EQ(npartitions, 1); // TODO(wangwei) test for npartitions > 1 - Layer::Setup(proto, npartitions); - hdim_ = proto.rbm_conf().hdim(); - gaussian_ = proto.rbm_conf().gaussian(); +void RBMLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + hdim_ = conf.rbm_conf().hdim(); + gaussian_ = conf.rbm_conf().gaussian(); first_gibbs_ = true; } /**************** Implementation for RBMVisLayer********************/ @@ -264,32 +270,33 @@ RBMVisLayer::~RBMVisLayer() { delete bias_; } -void RBMVisLayer::Setup(const LayerProto& proto, int npartitions) { - RBMLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 2); +void RBMVisLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + CHECK_EQ(srclayers.size(), 2); + RBMLayer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 2); hid_layer_ = nullptr; - for (auto src : srclayers_) { - for (auto dst : src->srclayers()) { - if (dst->name() == name()) { - CHECK(hid_layer_ == nullptr); - hid_layer_ = static_cast<RBMHidLayer*>(src); - } + for (auto src : srclayers) { + if (typeid(*src) == typeid(RBMHidLayer)) { + // note the hid layer may not have been set up yet. + CHECK(hid_layer_ == nullptr); + hid_layer_ = dynamic_cast<RBMHidLayer*>(src); } } - input_layer_ = srclayers_[0] != hid_layer_ ? srclayers_[0]: srclayers_[1]; + input_layer_ = srclayers[0] != hid_layer_ ? srclayers[0]: srclayers[1]; const auto& src = input_layer_->data(this); batchsize_ = src.shape()[0]; data_.ReshapeLike(src); neg_data_.ReshapeLike(data_); neg_sample_.ReshapeLike(data_); vdim_ = src.count() / batchsize_; - weight_ = Param::Create(proto.param(0)); + weight_ = Param::Create(conf.param(0)); weight_ ->Setup(vector<int>{hdim_, vdim_}); - bias_ = Param::Create(proto.param(1)); + bias_ = Param::Create(conf.param(1)); bias_->Setup(vector<int>{vdim_}); } -void RBMVisLayer::ComputeFeature(int flag, Metric* perf) { +void RBMVisLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { if ((flag & kPositive) == kPositive) { data_.CopyFrom(input_layer_->data(this), true); first_gibbs_ = true; @@ -308,13 +315,13 @@ void RBMVisLayer::ComputeFeature(int flag, Metric* perf) { for (int i = 0; i < data_.count(); i++) { err += (dptr[i] - rcns[i]) * (dptr[i] - rcns[i]); } - perf->Add("Squared Error", err / batchsize_); + metric_.Add("Squared Error", err / batchsize_); } first_gibbs_ = false; } } -void RBMVisLayer::ComputeGradient(int flag, Metric* perf) { +void RBMVisLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto vis_pos = Tensor2(&data_); auto vis_neg = Tensor2(&neg_data_); auto hid_pos = Tensor2(hid_layer_->mutable_data(this)); @@ -336,25 +343,25 @@ RBMHidLayer::~RBMHidLayer() { delete bias_; } -void RBMHidLayer::Setup(const LayerProto& proto, - int npartitions) { - RBMLayer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - const auto& src_data = srclayers_[0]->data(this); +void RBMHidLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + RBMLayer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + const auto& src_data = srclayers[0]->data(this); batchsize_ = src_data.shape()[0]; vdim_ = src_data.count() / batchsize_; data_.Reshape(vector<int>{batchsize_, hdim_}); neg_data_.ReshapeLike(data_); sample_.ReshapeLike(data_); neg_sample_.ReshapeLike(data_); - weight_ = Param::Create(proto.param(0)); + weight_ = Param::Create(conf.param(0));
weight_->Setup(vector<int>{hdim_, vdim_}); - bias_ = Param::Create(proto.param(1)); + bias_ = Param::Create(conf.param(1)); bias_->Setup(vector<int>{hdim_}); - vis_layer_ = static_cast<RBMVisLayer*> (srclayers_[0]); + vis_layer_ = dynamic_cast<RBMVisLayer*> (srclayers[0]); } -void RBMHidLayer::ComputeFeature(int flag, Metric* perf) { +void RBMHidLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { auto weight = Tensor2(weight_->mutable_data()); auto bias = Tensor1(bias_->mutable_data()); @@ -376,7 +383,7 @@ void RBMHidLayer::ComputeFeature(int flag, Metric* perf) { data = expr::F<op::sigmoid>(data); } -void RBMHidLayer::ComputeGradient(int flag, Metric* perf) { +void RBMHidLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto hid_pos = Tensor2(&data_); auto hid_neg = Tensor2(&neg_data_); auto gbias = Tensor1(bias_->mutable_grad()); @@ -390,20 +397,21 @@ InnerProductLayer::~InnerProductLayer() { delete bias_; } -void InnerProductLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - const auto& src = srclayers_[0]->data(this); +void InnerProductLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + const auto& src = srclayers[0]->data(this); batchsize_ = src.shape()[0]; vdim_ = src.count() / batchsize_; - hdim_ = layer_proto_.innerproduct_conf().num_output(); - transpose_ = proto.innerproduct_conf().transpose(); + hdim_ = layer_conf_.innerproduct_conf().num_output(); + transpose_ = conf.innerproduct_conf().transpose(); if (partition_dim() > 0) - hdim_ /= npartitions; + hdim_ /= srclayers.at(0)->num_partitions(); data_.Reshape(vector<int>{batchsize_, hdim_}); grad_.ReshapeLike(data_); - weight_ = Param::Create(proto.param(0)); - bias_ = Param::Create(proto.param(1)); + weight_ = Param::Create(conf.param(0)); + bias_ = Param::Create(conf.param(1)); if (transpose_) weight_->Setup(vector<int>{vdim_, hdim_}); else @@ -411,9 +419,10 @@ void InnerProductLayer::Setup(const LayerProto& proto, int npartitions) { bias_->Setup(vector<int>{hdim_}); } -void InnerProductLayer::ComputeFeature(int flag, Metric* perf) { +void InnerProductLayer::ComputeFeature(int flag, + const vector<Layer*>& srclayers) { auto data = Tensor2(&data_); - auto src = Tensor2(srclayers_[0]->mutable_data(this)); + auto src = Tensor2(srclayers[0]->mutable_data(this)); auto weight = Tensor2(weight_->mutable_data()); auto bias = Tensor1(bias_->mutable_data()); if (transpose_) @@ -424,8 +433,9 @@ void InnerProductLayer::ComputeFeature(int flag, Metric* perf) { data += expr::repmat(bias, batchsize_); } -void InnerProductLayer::ComputeGradient(int flag, Metric* perf) { - auto src = Tensor2(srclayers_[0]->mutable_data(this)); +void InnerProductLayer::ComputeGradient(int flag, + const vector<Layer*>& srclayers) { + auto src = Tensor2(srclayers[0]->mutable_data(this)); auto grad = Tensor2(&grad_); auto weight = Tensor2(weight_->mutable_data()); auto gweight = Tensor2(weight_->mutable_grad()); @@ -436,8 +446,8 @@ void InnerProductLayer::ComputeGradient(int flag, Metric* perf) { gweight = dot(src.T(), grad); else gweight = dot(grad.T(), src); - if (srclayers_[0]->mutable_grad(this) != nullptr) { - auto gsrc = Tensor2(srclayers_[0]->mutable_grad(this)); + if (srclayers[0]->mutable_grad(this) != nullptr) { + auto gsrc = Tensor2(srclayers[0]->mutable_grad(this)); if (transpose_) gsrc = dot(grad, weight.T()); else @@ -445,15 +455,15 @@ void 
InnerProductLayer::ComputeGradient(int flag, Metric* perf) { } } /***************** Implementation for LRNLayer *************************/ -void LRNLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - lsize_ = proto.lrn_conf().local_size(); +void LRNLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + lsize_ = conf.lrn_conf().local_size(); CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for Localvol"; - knorm_ = proto.lrn_conf().knorm(); - alpha_ = proto.lrn_conf().alpha(); - beta_ = proto.lrn_conf().beta(); - const vector<int>& s = srclayers_[0]->data(this).shape(); + knorm_ = conf.lrn_conf().knorm(); + alpha_ = conf.lrn_conf().alpha(); + beta_ = conf.lrn_conf().beta(); + const vector<int>& s = srclayers[0]->data(this).shape(); data_.Reshape(s); grad_.Reshape(s); norm_.Reshape(s); @@ -463,9 +473,9 @@ void LRNLayer::Setup(const LayerProto& proto, int npartitions) { width_ = s[3]; } -void LRNLayer::ComputeFeature(int flag, Metric* perf) { +void LRNLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { const float salpha = alpha_ / lsize_; - auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto src = Tensor4(srclayers[0]->mutable_data(this)); auto data = Tensor4(&data_); auto norm = Tensor4(&norm_); // stores normalizer without power @@ -474,12 +484,12 @@ void LRNLayer::ComputeFeature(int flag, Metric* perf) { data = src * expr::F<op::power>(norm, -beta_); } -void LRNLayer::ComputeGradient(int flag, Metric* perf) { +void LRNLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { const float salpha = alpha_ / lsize_; - auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto src = Tensor4(srclayers[0]->mutable_data(this)); auto norm = Tensor4(&norm_); auto grad = Tensor4(&grad_); - auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this)); + auto gsrc = Tensor4(srclayers[0]->mutable_grad(this)); gsrc = grad * expr::F<op::power>(norm, -beta_); gsrc += (- 2.0f * beta_ * salpha) * expr::chpool<red::sum>( @@ -487,18 +497,19 @@ void LRNLayer::ComputeGradient(int flag, Metric* perf) { } /******************** Implementation for PoolingLayer******************/ -void PoolingLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - PoolingProto pool_conf = proto.pooling_conf(); +void PoolingLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + CHECK_EQ(srclayers.size(), 1); + PoolingProto pool_conf = conf.pooling_conf(); kernel_ = pool_conf.kernel(); stride_ = pool_conf.stride(); CHECK_LT(pad_, kernel_); - pool_ = proto.pooling_conf().pool(); + pool_ = conf.pooling_conf().pool(); CHECK(pool_ == PoolingProto_PoolMethod_AVG || pool_ == PoolingProto_PoolMethod_MAX) << "Padding implemented only for average and max pooling."; - const auto& srcshape = srclayers_[0]->data(this).shape(); + const auto& srcshape = srclayers[0]->data(this).shape(); int dim = srcshape.size(); CHECK_GT(dim, 2); width_ = srcshape[dim - 1]; @@ -515,8 +526,8 @@ void PoolingLayer::Setup(const LayerProto& proto, int npartitions) { grad_.ReshapeLike(data_); } -void PoolingLayer::ComputeFeature(int flag, Metric* perf) { - auto src = Tensor4(srclayers_[0]->mutable_data(this)); +void PoolingLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { + auto src = Tensor4(srclayers[0]->mutable_data(this)); 
auto data = Tensor4(&data_); if (pool_ == PoolingProto_PoolMethod_MAX) data = expr::pool<red::maximum>(src, kernel_, stride_); @@ -529,9 +540,9 @@ void PoolingLayer::ComputeFeature(int flag, Metric* perf) { * partition only on num/channel dim * assume grad and data have the same partition */ -void PoolingLayer::ComputeGradient(int flag, Metric* perf) { - auto src = Tensor4(srclayers_[0]->mutable_data(this)); - auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this)); +void PoolingLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { + auto src = Tensor4(srclayers[0]->mutable_data(this)); + auto gsrc = Tensor4(srclayers[0]->mutable_grad(this)); auto data = Tensor4(&data_); auto grad = Tensor4(&grad_); if (pool_ == PoolingProto_PoolMethod_MAX) @@ -543,101 +554,99 @@ void PoolingLayer::ComputeGradient(int flag, Metric* perf) { /***************** Implementation of CPoolingLayer ***************/ -void CPoolingLayer::Setup(const LayerProto& proto, int npartitions) { - PoolingLayer::Setup(proto, npartitions); +void CPoolingLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + PoolingLayer::Setup(conf, srclayers); if (pool_ == PoolingProto_PoolMethod_MAX) mask_.ReshapeLike(data_); } -void CPoolingLayer::ComputeFeature(int flag, Metric* perf) { +void CPoolingLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { if (pool_ == PoolingProto_PoolMethod_MAX) - ForwardMaxPooling(srclayers_[0]->mutable_data(this)->mutable_cpu_data(), + ForwardMaxPooling(srclayers[0]->mutable_data(this)->mutable_cpu_data(), batchsize_, channels_, height_, width_, kernel_, kernel_, pad_, pad_, stride_, stride_, data_.mutable_cpu_data(), mask_.mutable_cpu_data()); else if (pool_ == PoolingProto_PoolMethod_AVG) - ForwardAvgPooling(srclayers_[0]->mutable_data(this)->mutable_cpu_data(), + ForwardAvgPooling(srclayers[0]->mutable_data(this)->mutable_cpu_data(), batchsize_, channels_, height_, width_, kernel_, kernel_, pad_, pad_, stride_, stride_, data_.mutable_cpu_data()); else LOG(FATAL) << "unknown pooling method"; } -void CPoolingLayer::ComputeGradient(int flag, Metric* perf) { +void CPoolingLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { if (pool_ == PoolingProto_PoolMethod_MAX) BackwardMaxPooling(grad_.cpu_data(), mask_.cpu_data(), batchsize_, channels_, height_, width_, kernel_, kernel_, pad_, pad_, - stride_, stride_,srclayers_[0]->mutable_grad(this)->mutable_cpu_data()); + stride_, stride_, srclayers[0]->mutable_grad(this)->mutable_cpu_data()); else if (pool_ == PoolingProto_PoolMethod_AVG) BackwardAvgPooling(grad_.cpu_data(), batchsize_, channels_, height_, width_, kernel_, kernel_, pad_, pad_, - stride_, stride_,srclayers_[0]->mutable_grad(this)->mutable_cpu_data()); + stride_, stride_, srclayers[0]->mutable_grad(this)->mutable_cpu_data()); else LOG(FATAL) << "unknown pooling method"; } /***************** Implementation for ReLULayer *****************************/ -void ReLULayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - data_.ReshapeLike(srclayers_[0]->data(this)); - grad_.ReshapeLike(*(srclayers_[0]->mutable_grad(this))); +void ReLULayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(*(srclayers[0]->mutable_grad(this))); } -void ReLULayer::ComputeFeature(int flag, Metric* perf) { +void ReLULayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { auto data = Tensor1(&data_); -
auto src = Tensor1(srclayers_[0]->mutable_data(this)); + auto src = Tensor1(srclayers[0]->mutable_data(this)); data = expr::F<op::relu>(src); } -void ReLULayer::ComputeGradient(int flag, Metric* perf) { +void ReLULayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto data = Tensor1(&data_); auto grad = Tensor1(&grad_); - auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); + auto gsrc = Tensor1(srclayers[0]->mutable_grad(this)); gsrc = expr::F<op::relu_grad>(data)*grad; } /*******************Implementation of SigmoidLayer***************************/ -void SigmoidLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - data_.ReshapeLike(srclayers_[0]->data(this)); - grad_.ReshapeLike(srclayers_[0]->grad(this)); +void SigmoidLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(srclayers[0]->grad(this)); } -void SigmoidLayer::ComputeFeature(int flag, Metric* perf) { +void SigmoidLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { auto data = Tensor1(&data_); - auto src = Tensor1(srclayers_[0]->mutable_data(this)); + auto src = Tensor1(srclayers[0]->mutable_data(this)); data = expr::F<op::sigmoid>(src); } -void SigmoidLayer::ComputeGradient(int flag, Metric* perf) { +void SigmoidLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto data = Tensor1(&data_); auto grad = Tensor1(&grad_); - auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); + auto gsrc = Tensor1(srclayers[0]->mutable_grad(this)); gsrc = expr::F<op::sigmoid_grad>(data) * grad; } /*******************Implementation of TanLayer***************************/ -void STanhLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - data_.ReshapeLike(srclayers_[0]->data(this)); - grad_.ReshapeLike(srclayers_[0]->grad(this)); +void STanhLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + Layer::Setup(conf, srclayers); + data_.ReshapeLike(srclayers[0]->data(this)); + grad_.ReshapeLike(srclayers[0]->grad(this)); } -void STanhLayer::ComputeFeature(int flag, Metric* perf) { +void STanhLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { auto data = Tensor1(&data_); - auto src = Tensor1(srclayers_[0]->mutable_data(this)); + auto src = Tensor1(srclayers[0]->mutable_data(this)); data = expr::F<op::stanh>(src); } -void STanhLayer::ComputeGradient(int flag, Metric* perf) { +void STanhLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { auto data = Tensor1(&data_); auto grad = Tensor1(&grad_); - auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); + auto gsrc = Tensor1(srclayers[0]->mutable_grad(this)); gsrc = expr::F<op::stanh_grad>(data) * grad; } -/********* Implementation for BridgeDstLayer **************/ -void BridgeDstLayer::Setup(const LayerProto& proto, int npartitions) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - data_.Reshape(srclayers_[0]->data(this).shape()); - grad_.ReshapeLike(data_); -} + } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/proto/job.proto ---------------------------------------------------------------------- diff --git a/src/proto/job.proto b/src/proto/job.proto index dc202d9..950f785 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file 
except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -59,9 +59,9 @@ message JobProto { // TODO(wangwei): set -1 for test forever optional int32 test_steps = 21 [default = 0]; // frequency of validation, e.g., do validation every 100 training steps - optional int32 valid_freq = 25 [default = 0]; + optional int32 validate_freq = 25 [default = 0]; // total num of steps for validating all validation data - optional int32 valid_steps = 26 [default = 0]; + optional int32 validate_steps = 26 [default = 0]; // frequency of checkpoint optional int32 checkpoint_freq = 30 [default = 0]; @@ -83,7 +83,7 @@ message JobProto { // start test after this num steps optional int32 test_after = 82 [default = 0]; // start validation after this num steps - optional int32 valid_after = 83 [default = 0]; + optional int32 validate_after = 83 [default = 0]; // for internal use // users typically do not touch following fields @@ -224,6 +224,8 @@ message LayerProto { optional int32 partition_dim = 60 [default = -1]; // names of parameters shared from other layers optional int32 partition_id = 90 [default = 0]; + // num of partitions for this layer + optional int32 num_partitions = 91 [default = 1]; extensions 101 to 200; } @@ -571,7 +573,7 @@ enum PartitionType { enum Phase { kUnknown = 0; kTrain = 1; - kValidation = 2; + kVal = 2; kTest = 4; // positive phase for contrastive divergence algorithm kPositive = 8; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/server.cc ---------------------------------------------------------------------- diff --git a/src/server.cc b/src/server.cc new file mode 100644 index 0000000..3e0f4cb --- /dev/null +++ b/src/server.cc @@ -0,0 +1,269 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License.
+* +*************************************************************/ + +#include "./server.h" + +#include <thread> +#include <chrono> +#include "mshadow/tensor.h" +#include "proto/common.pb.h" +#include "utils/param.h" +#include "utils/singleton.h" +#include "utils/factory.h" +#include "utils/cluster.h" + +namespace singa { + +using namespace mshadow; +using std::vector; + +Server::Server(int group_id, int server_id, + const JobProto& job_conf, + const vector<int>& slice2group, + const vector<int>& slice2server) { + grp_id_ = group_id; + id_ = server_id; + updater_ = Updater::Create(job_conf.updater()); + slice2group_ = slice2group; + slice2server_ = slice2server; +} + +Server::~Server() { + delete updater_; + // free Params (i.e., slices) in server shard + for (auto entry : shard_) + for (auto param : entry.second->shares) + delete param; +} + +void Stop(void* running) { + *static_cast<bool *>(running) = false; +} + +void Server::Run() { + LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") starts"; + auto cluster = Cluster::Get(); + if (cluster->nserver_groups()) { + CHECK_GT(slice2group_.size(), 0); + if (cluster->nservers_per_group()) { + CHECK_GT(slice2server_.size(), 0); + } + } + n_updates_.resize(slice2group_.size(), 0); + n_pending_sync_.resize(slice2group_.size(), 0); + last_sync_.resize(slice2group_.size()); + + // TODO(wangsh): give each dealer a unique id + auto dealer = new Dealer(0); + CHECK(dealer->Connect(kInprocRouterEndpoint)); + Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub)); + ping->set_type(kConnect); + dealer->Send(&ping); + + bool running = true; + CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running)); + Poller poll(dealer); + // start recv loop and process requests + while (running) { + // must use poller here; otherwise Receive() gets stuck after workers stop. + auto* sock = poll.Wait(cluster->poll_time()); + if (poll.Terminated()) { + LOG(ERROR) << "Connection broken!"; + exit(0); + } else if (sock == nullptr) { + continue; + } + Msg* msg = dealer->Receive(); + if (msg == nullptr) break; // interrupted + Msg* response = nullptr; + int type = msg->type(); + int slice_id = SliceID(msg->trgt_val()); + if (type == kPut) { + response = HandlePut(&msg); + } else if (shard_.find(slice_id) == shard_.end()) { + // TODO(wangsh): buffer the msg instead, and process it after the + // corresponding put request is done + // delay the processing by re-queue the msg. May sleep for a while?
+ response = msg; + } else { + switch (type) { + case kGet: + response = HandleGet(&msg); + break; + case kUpdate: + for (auto reply : HandleUpdate(&msg)) + dealer->Send(&reply); + break; + case kSyncRequest: + response = HandleSyncRequest(&msg); + break; + case kSyncResponse: + HandleSyncResponse(&msg); + break; + default: + LOG(ERROR) << "Unknown message type: " << type; + break; + } + } + if (response != nullptr) + dealer->Send(&response); + } + + // send stop msg to stub + Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub)); + msg->set_type(kStop); + dealer->Send(&msg); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops"; + delete dealer; +} + +Msg* Server::HandlePut(Msg **msg) { + int version = (*msg)->trgt_version(); + int slice_id = SliceID((*msg)->trgt_val()); + if (shard_.find(slice_id) != shard_.end()) + LOG(FATAL) << "Param (" << slice_id << ") is put more than once"; + + // TODO(wangwei) replace hard coded param type 0 + auto param = Singleton<Factory<Param>>::Instance()->Create(0); + auto response = param->HandlePutMsg(msg, true); + // parse num of shares of this param from a worker group + int num_shares = 1; + if ((*msg)->NextFrame()) + (*msg)->ParseFormatFrame("i", &num_shares); + DeleteMsg(msg); + shard_[slice_id] = new ParamEntry(num_shares, param); + // must set version after HandlePutMsg which allocates the memory + param->set_version(version); + param->set_local_version(version); + param->set_id(slice_id); + // allocate blob for param sync between groups. + if (slice2group_[slice_id] != grp_id_) { + last_sync_[slice_id].ReshapeLike(param->data()); + last_sync_[slice_id].CopyFrom(param->data()); + } + LOG(INFO) << "server (group = " << grp_id_ << ", id = " << id_ + <<") put slice=" << slice_id << " size=" << param->size(); + return response; +} + +Msg* Server::HandleGet(Msg **msg) { + int val = (*msg)->trgt_val(); + auto param = shard_.at(SliceID(val))->shares.at(0); + // re-queue the request if the param is not updated to the required version + if (param->version() < (*msg)->trgt_version()) { + return *msg; + } else { + // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first(); + auto reply = param->HandleGetMsg(msg, false); + reply->set_trgt(val, param->version()); + return reply; + } +} + +const vector<Msg*> Server::HandleUpdate(Msg **msg) { + vector<Msg*> ret; + int sliceid = SliceID((*msg)->trgt_val()); + auto entry = shard_.at(sliceid); + buffer_requests_[sliceid].push_back(*msg); + int num_update; + (*msg)->LastFrame(); + (*msg)->ParseFormatFrame("i", &num_update); + (*msg)->FirstFrame(); + entry->num_update += num_update; + // LOG(ERROR) << "update "<< sliceid << " from " << AddrGrp((*msg)->src()) + // << ", " << num_update << " total " << entry->num_total; + // do update until recv gradients from all shares of this param/slice + if (entry->num_update >= entry->num_total) { + CHECK_EQ(entry->num_update, entry->num_total); + auto& request = buffer_requests_.at(sliceid); + int step = (*msg)->trgt_version(); + int trgt_val = (*msg)->trgt_val(); + auto param = entry->shares.at(0); + // extract and aggregate gradients + param->ParseUpdateMsgs(request); + updater_->Update(step, param, 1.0f / entry->num_total); + param->set_local_version(param->local_version() + 1); + // response to all shares of this param + for (auto response : param->GenUpdateResponseMsgs(&request, false)) { + response->set_trgt(trgt_val, param->local_version()); + 
ret.push_back(response); + } + entry->num_update = 0; + n_updates_[sliceid]++; + // sync with master group after at least sync_freq local updates + // the last check is to avoid sending msg to stopped servers + if (slice2group_[sliceid] != grp_id_ + && n_updates_[sliceid] >= Cluster::Get()->sync_freq() + && n_pending_sync_[sliceid] <= Cluster::Get()->sync_freq()) { + auto shape = Shape1(param->size()); + Tensor<cpu, 1> tmp(last_sync_[sliceid].mutable_cpu_data(), shape); + Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape); + tmp = cur - tmp; + int addr = Addr(slice2group_[sliceid], slice2server_[sliceid], kServer); + Msg* sync = new Msg(Addr(grp_id_, id_, kServer), addr); + sync->set_type(kSyncRequest); + sync->set_trgt(trgt_val, param->local_version()); + sync->AddFrame(tmp.dptr, param->size() * sizeof(float)); + Copy(tmp, cur); + ret.push_back(sync); + n_updates_[sliceid] = 0; + n_pending_sync_[sliceid]++; + } + } + // message already pushed to buffer, just need to reset the pointer + *msg = nullptr; + return ret; +} + +Msg* Server::HandleSyncRequest(Msg **msg) { + Msg* msgg = *msg; + int slice = SliceID(msgg->trgt_val()); + auto param = shard_.at(slice)->shares.at(0); + auto shape = Shape1(param->size()); + CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float)); + Tensor<cpu, 1> inc(static_cast<float*>(msgg->FrameData()), shape); + Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape); + // recv sync msg on the slice I am maintaining + cur += inc; + msgg->SwapAddr(); + msgg->set_type(kSyncResponse); + // copy the fresh param value into the response msg + Copy(inc, cur); + return msgg; +} + +// recv sync msg on slice mastered by others +void Server::HandleSyncResponse(Msg **msg) { + Msg* msgg = *msg; + int slice = SliceID(msgg->trgt_val()); + auto param = shard_.at(slice)->shares.at(0); + auto shape = Shape1(param->size()); + Tensor<cpu, 1> prev(last_sync_[param->id()].mutable_cpu_data(), shape); + Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape); + Tensor<cpu, 1> master(static_cast<float*>(msgg->FrameData()), shape); + cur += master - prev; // cur = master + (cur - prev); + Copy(prev, cur); + DeleteMsg(msg); + n_pending_sync_[slice]--; +} + +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/stub.cc ---------------------------------------------------------------------- diff --git a/src/stub.cc b/src/stub.cc new file mode 100644 index 0000000..7b439e5 --- /dev/null +++ b/src/stub.cc @@ -0,0 +1,285 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/stub.cc
----------------------------------------------------------------------
diff --git a/src/stub.cc b/src/stub.cc
new file mode 100644
index 0000000..7b439e5
--- /dev/null
+++ b/src/stub.cc
@@ -0,0 +1,285 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "./stub.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+#include <map>
+#include <thread>
+#include <set>
+#include "mshadow/tensor.h"
+#include "proto/common.pb.h"
+#include "utils/cluster.h"
+#include "utils/common.h"
+#include "utils/tinydir.h"
+
+namespace singa {
+
+using std::vector;
+using std::string;
+
+/***********************Stub****************************/
+Stub::~Stub() {
+  delete router_;
+}
+void Stub::Setup() {
+  router_ = new Router();
+  router_->Bind(kInprocRouterEndpoint);
+  auto cluster = Cluster::Get();
+  const string hostip = cluster->hostip();
+  int port = router_->Bind("tcp://" + hostip + ":*");
+  endpoint_ = hostip + ":" + std::to_string(port);
+}
+/**
+ * Get a hash id for a Param object from a group.
+ *
+ * Simply multiply the group id by a large prime number 997 (assuming there
+ * are no more than 997 worker groups) and add the owner param id.
+ */
+inline int Hash(int grp_id, int param_id) {
+  return grp_id * 997 + param_id;
+}
+const std::unordered_map<int, ParamEntry*> CreateParamShard(
+    const vector<Worker*>& workers) {
+  std::unordered_map<int, ParamEntry*> shard;
+  // grp id -> net
+  std::unordered_map<int, NeuralNet*> grp2net;
+  // grp id -> worker id range
+  std::unordered_map<int, std::pair<int, int>> grp2workers;
+  for (auto worker : workers) {
+    int grp = worker->grp_id(), id = worker->id();
+    if (grp2net.find(grp) == grp2net.end()) {
+      grp2net[grp] = worker->train_net();
+      grp2workers[grp] = std::make_pair(id, id + 1);
+    } else {
+      CHECK_EQ(grp2net[grp], worker->train_net());
+      int start = grp2workers[grp].first, end = grp2workers[grp].second;
+      if (start > id) start = id;
+      if (end < id + 1) end = id + 1;
+      grp2workers[grp] = std::make_pair(start, end);
+    }
+  }
+
+  for (const auto entry : grp2net) {
+    int grp = entry.first;
+    int wstart = grp2workers[grp].first, wend = grp2workers[grp].second;
+    for (auto layer : entry.second->layers()) {
+      int partition = layer->partition_id();
+      bool local = partition >= wstart && partition < wend;
+      for (auto param : layer->GetParams()) {
+        int hash = Hash(grp, param->owner());
+        if (shard.find(hash) == shard.end())
+          shard[hash] = new ParamEntry();
+        shard[hash]->AddParam(local, param);
+      }
+    }
+  }
+  return shard;
+}
+
+void Stub::Run(const vector<int>& slice2server,
+               const vector<Worker*>& workers, const vector<Server*>& servers) {
+  slice2server_ = slice2server;
+  int nworkers = workers.size(), nservers = servers.size();
+  auto cluster = Cluster::Get();
+  int procs_id = cluster->procs_id();
+  LOG(INFO) << "Stub in process " << procs_id << " starts";
+  auto shard = CreateParamShard(workers);
+  std::map<int, Dealer*> inter_dealers;  // for sending msg to other procs
+  std::queue<Msg*> msg_queue;
+  while (true) {
+    Msg* msg = nullptr;
+    if (msg_queue.empty()) {
+      msg = router_->Receive();
+    } else {
+      msg = msg_queue.front();
+      msg_queue.pop();
+    }
+    int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
+    if (flag == kStub && (AddrProc(dst) == procs_id || AddrGrp(dst) == -1)) {
+      // the following statements are ordered!
+      if (type == kConnect) {
+        DeleteMsg(&msg);
+      } else if (type == kStop) {
+        int src_flag = AddrType(msg->src());
+        if (src_flag == kServer) nservers--;
+        else if (src_flag == kWorkerParam) nworkers--;
+        DeleteMsg(&msg);
+        if (nworkers == 0 && nservers == 0) break;
+      } else {
+        int grp;
+        int paramid = ParamID(msg->trgt_val());
+        ParamEntry *entry = nullptr;
+        switch (type) {
+          case kUpdate:
+            grp = AddrGrp(msg->src());
+            entry = shard.at(Hash(grp, paramid));
+            for (auto update_msg : HandleUpdateRequest(entry, &msg))
+              msg_queue.push(update_msg);
+            break;
+          case kRUpdate:
+            grp = AddrGrp(msg->dst());
+            entry = shard.at(Hash(grp, paramid));
+            HandleUpdateResponse(entry, &msg);
+            break;
+          case kGet:
+            grp = AddrGrp(msg->src());
+            entry = shard.at(Hash(grp, paramid));
+            for (auto get_msg : HandleGetRequest(entry, &msg))
+              msg_queue.push(get_msg);
+            break;
+          case kRGet:
+            grp = AddrGrp(msg->dst());
+            entry = shard.at(Hash(grp, paramid));
+            HandleGetResponse(entry, &msg);
+            break;
+          case kPut:
+            grp = AddrGrp(msg->src());
+            entry = shard.at(Hash(grp, paramid));
+            for (auto put_msg : HandlePutRequest(entry, &msg))
+              msg_queue.push(put_msg);
+            break;
+          default:
+            LOG(ERROR) << "Unknown message type: " << type;
+            break;
+        }
+      }
+    } else {
+      int dst_procs = AddrProc(dst);
+      if (flag != kStub)
+        dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag);
+      if (dst_procs != procs_id) {
+        if (inter_dealers.find(dst_procs) == inter_dealers.end())
+          inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
+        inter_dealers[dst_procs]->Send(&msg);
+      } else {
+        router_->Send(&msg);
+      }
+    }
+  }
+  LOG(ERROR) << "Stub in process " << procs_id << " stops";
+  for (auto& entry : inter_dealers)
+    delete entry.second;
+}
+
+Dealer* Stub::CreateInterProcsDealer(int dst_procs) {
+  // forward to other procs
+  auto cluster = Cluster::Get();
+  auto dealer = new Dealer();
+  while (cluster->endpoint(dst_procs) == "") {
+    // kCollectSleepTime));
+    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+    LOG(ERROR) << "waiting for procs " << dst_procs << " to register";
+  }
+  dealer->Connect("tcp://" + cluster->endpoint(dst_procs));
+  return dealer;
+}
+
+void Stub::GenMsgs(int type, int version, ParamEntry* entry, Msg* msg,
+                   vector<Msg*> *ret) {
+  int procs_id = Cluster::Get()->procs_id();
+  int src_grp = AddrGrp(msg->src());
+  int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
+  auto param = entry->shares.at(0);
+  for (int idx = 0; idx < param->num_slices(); idx++) {
+    int slice_id = param->slice_start() + idx;
+    int server = slice2server_[slice_id];
+    int dst_procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
+    Msg* new_msg = nullptr;
+    if (type == kPut) {
+      CHECK_GT(entry->num_total, 0);
+      new_msg = param->GenPutMsg(dst_procs != procs_id, idx);
+      new_msg->AddFormatFrame("i", entry->num_total);
+    } else if (type == kGet) {
+      new_msg = param->GenGetMsg(dst_procs != procs_id, idx);
+    } else if (type == kUpdate) {
+      new_msg = param->GenUpdateMsg(dst_procs != procs_id, idx);
+      new_msg->AddFormatFrame("i", entry->num_local);
+    } else {
+      LOG(FATAL) << "Wrong type";
+    }
+    new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version);
+    new_msg->set_src(Addr(src_grp, procs_id, kStub));
+    new_msg->set_dst(Addr(dst_grp, server, kServer));
+    ret->push_back(new_msg);
+  }
+}
+
+const vector<Msg*> Stub::HandleGetRequest(ParamEntry* entry, Msg** msg) {
+  vector<Msg*> ret;
+  int version = (*msg)->trgt_version();
+  if (version > entry->next_version) {
+    entry->next_version = version;
+    GenMsgs(kGet, version, entry, *msg, &ret);
+  }
+  DeleteMsg(msg);
+  return ret;
+}
+
+const vector<Msg*> Stub::HandleUpdateRequest(ParamEntry *entry, Msg** msg) {
+  vector<Msg*> ret;
+  entry->num_update++;
+  if (entry->num_update >= entry->num_local) {
+    // average local gradient
+    if (entry->num_local > 1) {
+      auto it = entry->shares.begin();
+      auto shape = mshadow::Shape1((*it)->size());
+      mshadow::Tensor<mshadow::cpu, 1> sum((*it)->mutable_cpu_grad(), shape);
+      for (++it; it != entry->shares.end(); it++) {
+        mshadow::Tensor<mshadow::cpu, 1> grad((*it)->mutable_cpu_grad(), shape);
+        sum += grad;
+      }
+    }
+    int step = (*msg)->trgt_version();
+    GenMsgs(kUpdate, step, entry, *msg, &ret);
+    entry->num_update = 0;
+  }
+  DeleteMsg(msg);
+  return ret;
+}
+
+const vector<Msg*> Stub::HandlePutRequest(ParamEntry* entry, Msg** msg) {
+  vector<Msg*> ret;
+  int version = (*msg)->trgt_version();
+  GenMsgs(kPut, version, entry, *msg, &ret);
+  DeleteMsg(msg);
+  return ret;
+}
+
+void Stub::HandleGetResponse(ParamEntry* entry, Msg** msg) {
+  int version = (*msg)->trgt_version();
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto param = entry->shares.at(0);
+  if (param->ParseGetResponseMsg(*msg, sliceid - param->slice_start()))
+    param->set_version(version);
+  DeleteMsg(msg);
+}
+
+void Stub::HandleUpdateResponse(ParamEntry* entry, Msg** msg) {
+  int version = (*msg)->trgt_version();
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto param = entry->shares.at(0);
+  if (param->ParseUpdateResponseMsg(*msg, sliceid - param->slice_start()))
+    param->set_version(version);
+  DeleteMsg(msg);
+}
+} // namespace singa
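The stub shards Param objects by `Hash(grp_id, param_id) = grp_id * 997 + param_id`, so every (worker group, owner param) pair gets its own ParamEntry. A small sketch of that mapping and of where it stops being collision-free (illustration only, not code from this commit):

#include <cstdio>

inline int Hash(int grp_id, int param_id) {
  return grp_id * 997 + param_id;
}

int main() {
  std::printf("%d\n", Hash(0, 3));   // 3: param 3 owned by group 0
  std::printf("%d\n", Hash(2, 3));   // 1997: same param id, another group
  // keys collide once owner param ids reach 997:
  std::printf("%d %d\n", Hash(0, 997), Hash(1, 0));  // both print 997
  return 0;
}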
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
deleted file mode 100644
index 5e74c1b..0000000
--- a/src/trainer/server.cc
+++ /dev/null
@@ -1,263 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "trainer/server.h"
-
-#include <thread>
-#include <chrono>
-#include "mshadow/tensor.h"
-#include "proto/common.pb.h"
-#include "utils/param.h"
-#include "utils/singleton.h"
-#include "utils/factory.h"
-#include "utils/cluster.h"
-
-namespace singa {
-
-using namespace mshadow;
-using std::vector;
-
-Server::Server(int group_id, int server_id) {
-  grp_id_ = group_id;
-  id_ = server_id;
-}
-
-void Server::Setup(const UpdaterProto& proto, const vector<int>& slice2group,
-                   const vector<int>& slice2server) {
-  updater_ = Updater::Create(proto);
-  slice2group_ = slice2group;
-  slice2server_ = slice2server;
-  n_updates_.resize(slice2group_.size(), 0);
-  n_pending_sync_.resize(slice2group_.size(), 0);
-  last_sync_.resize(slice2group_.size());
-}
-
-Server::~Server() {
-  delete updater_;
-  // free Params (i.e., slices) in server shard
-  for (auto entry : shard_)
-    for (auto param : entry.second->shares)
-      delete param;
-}
-
-void Stop(void* running) {
-  *static_cast<bool *>(running) = false;
-}
-
-void Server::Run() {
-  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") start";
-  // TODO(wangsh): give each dealer a unique id
-  auto dealer = new Dealer(0);
-  CHECK(dealer->Connect(kInprocRouterEndpoint));
-  Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  ping->set_type(kConnect);
-  dealer->Send(&ping);
-
-  auto cluster = Cluster::Get();
-  bool running = true;
-  CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
-  Poller poll(dealer);
-  // start recv loop and process requests
-  while (running) {
-    // must use poller here; otherwise Receive() gets stuck after workers stop.
-    auto* sock = poll.Wait(cluster->poll_time());
-    if (poll.Terminated()) {
-      LOG(ERROR) << "Connection broken!";
-      exit(0);
-    } else if (sock == nullptr) {
-      continue;
-    }
-    Msg* msg = dealer->Receive();
-    if (msg == nullptr) break;  // interrupted
-    Msg* response = nullptr;
-    int type = msg->type();
-    int slice_id = SliceID(msg->trgt_val());
-    if (type == kPut) {
-      response = HandlePut(&msg);
-    } else if (shard_.find(slice_id) == shard_.end()) {
-      // TODO(wangsh): buffer the msg instead, and process it after the
-      // corresponding put request is done
-      // delay the processing by re-queue the msg. May sleep for a while?
-      response = msg;
-    } else {
-      switch (type) {
-        case kGet:
-          response = HandleGet(&msg);
-          break;
-        case kUpdate:
-          for (auto reply : HandleUpdate(&msg))
-            dealer->Send(&reply);
-          break;
-        case kSyncRequest:
-          response = HandleSyncRequest(&msg);
-          break;
-        case kSyncResponse:
-          HandleSyncResponse(&msg);
-          break;
-        default:
-          LOG(ERROR) << "Unknown message type: " << type;
-          break;
-      }
-    }
-    if (response != nullptr)
-      dealer->Send(&response);
-  }
-
-  // send stop msg to stub
-  Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  msg->set_type(kStop);
-  dealer->Send(&msg);
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
-  delete dealer;
-}
-
-Msg* Server::HandlePut(Msg **msg) {
-  int version = (*msg)->trgt_version();
-  int slice_id = SliceID((*msg)->trgt_val());
-  if (shard_.find(slice_id) != shard_.end())
-    LOG(FATAL) << "Param (" << slice_id << ") is put more than once";
-
-  // TODO(wangwei) replace hard coded param type 0
-  auto param = Singleton<Factory<Param>>::Instance()->Create(0);
-  auto response = param->HandlePutMsg(msg, true);
-  // parse num of shares of this param from a worker group
-  int num_shares = 1;
-  if ((*msg)->NextFrame())
-    (*msg)->ParseFormatFrame("i", &num_shares);
-  DeleteMsg(msg);
-  shard_[slice_id] = new ParamEntry(num_shares, param);
-  // must set version after HandlePutMsg which allocates the memory
-  param->set_version(version);
-  param->set_local_version(version);
-  param->set_id(slice_id);
-  // allocate blob for param sync between groups.
-  if (slice2group_[slice_id] != grp_id_) {
-    last_sync_[slice_id].ReshapeLike(param->data());
-    last_sync_[slice_id].CopyFrom(param->data());
-  }
-  LOG(INFO) << "server (group = " << grp_id_ << ", id = " << id_
-            << ") put slice=" << slice_id << " size=" << param->size();
-  return response;
-}
-
-Msg* Server::HandleGet(Msg **msg) {
-  int val = (*msg)->trgt_val();
-  auto param = shard_.at(SliceID(val))->shares.at(0);
-  // re-queue the request if the param is not updated to the required version
-  if (param->version() < (*msg)->trgt_version()) {
-    return *msg;
-  } else {
-    // LOG(ERROR) << "get " << slice << " from " << (*msg)->src_first();
-    auto reply = param->HandleGetMsg(msg, false);
-    reply->set_trgt(val, param->version());
-    return reply;
-  }
-}
-
-const vector<Msg*> Server::HandleUpdate(Msg **msg) {
-  vector<Msg*> ret;
-  int sliceid = SliceID((*msg)->trgt_val());
-  auto entry = shard_.at(sliceid);
-  buffer_requests_[sliceid].push_back(*msg);
-  int num_update;
-  (*msg)->LastFrame();
-  (*msg)->ParseFormatFrame("i", &num_update);
-  (*msg)->FirstFrame();
-  entry->num_update += num_update;
-  // LOG(ERROR) << "update " << sliceid << " from " << AddrGrp((*msg)->src())
-  //            << ", " << num_update << " total " << entry->num_total;
-  // do update until recv gradients from all shares of this param/slice
-  if (entry->num_update >= entry->num_total) {
-    CHECK_EQ(entry->num_update, entry->num_total);
-    auto& request = buffer_requests_.at(sliceid);
-    int step = (*msg)->trgt_version();
-    int trgt_val = (*msg)->trgt_val();
-    auto param = entry->shares.at(0);
-    // extract and aggregate gradients
-    param->ParseUpdateMsgs(request);
-    updater_->Update(step, param, 1.0f / entry->num_total);
-    param->set_local_version(param->local_version() + 1);
-    // response to all shares of this param
-    for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
-      response->set_trgt(trgt_val, param->local_version());
-      ret.push_back(response);
-    }
-    entry->num_update = 0;
-    n_updates_[sliceid]++;
-    // sync with master group after at least sync_freq local updates
-    // the last check is to avoid sending msg to stopped servers
-    if (slice2group_[sliceid] != grp_id_
-        && n_updates_[sliceid] >= Cluster::Get()->sync_freq()
-        && n_pending_sync_[sliceid] <= Cluster::Get()->sync_freq()) {
-      auto shape = Shape1(param->size());
-      Tensor<cpu, 1> tmp(last_sync_[sliceid].mutable_cpu_data(), shape);
-      Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-      tmp = cur - tmp;
-      int addr = Addr(slice2group_[sliceid], slice2server_[sliceid], kServer);
-      Msg* sync = new Msg(Addr(grp_id_, id_, kServer), addr);
-      sync->set_type(kSyncRequest);
-      sync->set_trgt(trgt_val, param->local_version());
-      sync->AddFrame(tmp.dptr, param->size() * sizeof(float));
-      Copy(tmp, cur);
-      ret.push_back(sync);
-      n_updates_[sliceid] = 0;
-      n_pending_sync_[sliceid]++;
-    }
-  }
-  // message already pushed to buffer, just need to reset the pointer
-  *msg = nullptr;
-  return ret;
-}
-
-Msg* Server::HandleSyncRequest(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_.at(slice)->shares.at(0);
-  auto shape = Shape1(param->size());
-  CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float));
-  Tensor<cpu, 1> inc(static_cast<float*>(msgg->FrameData()), shape);
-  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  // recv sync msg on the slice I am maintaining
-  cur += inc;
-  msgg->SwapAddr();
-  msgg->set_type(kSyncResponse);
-  // copy the fresh param value into the response msg
-  Copy(inc, cur);
-  return msgg;
-}
-
-// recv sync msg on slice mastered by others
-void Server::HandleSyncResponse(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_.at(slice)->shares.at(0);
-  auto shape = Shape1(param->size());
-  Tensor<cpu, 1> prev(last_sync_[param->id()].mutable_cpu_data(), shape);
-  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  Tensor<cpu, 1> master(static_cast<float*>(msgg->FrameData()), shape);
-  cur += master - prev;  // cur = master + (cur - prev);
-  Copy(prev, cur);
-  DeleteMsg(msg);
-  n_pending_sync_[slice]--;
-}
-
-} // namespace singa
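Taken together, the new stub.cc and the relocated server code aggregate gradients in two stages: Stub::HandleUpdateRequest sums the gradients of all shares living in one process before forwarding a single kUpdate message, and Server::HandleUpdate passes `1.0f / num_total` to the updater, so the aggregated sum is effectively averaged over every share. A scalar sketch of that sum-then-scale pipeline (illustration only; share counts and gradient values are invented):

#include <cassert>
#include <vector>

int main() {
  // gradients from num_total = 4 shares of one Param, two per process
  std::vector<float> proc_a = {1.0f, 3.0f};
  std::vector<float> proc_b = {5.0f, 7.0f};

  // each stub sums its local shares (the mshadow loop in HandleUpdateRequest)
  float sum_a = proc_a[0] + proc_a[1];
  float sum_b = proc_b[0] + proc_b[1];

  // the server parses all buffered requests (ParseUpdateMsgs) and calls the
  // updater with grad_scale = 1.0f / num_total
  int num_total = 4;
  float grad = (sum_a + sum_b) / num_total;

  assert(grad == (1.0f + 3.0f + 5.0f + 7.0f) / 4.0f);  // the mean, 4.0
  return 0;
}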
