http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/neuralnet/layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc index db13824..b40b676 100644 --- a/src/neuralnet/layer.cc +++ b/src/neuralnet/layer.cc @@ -13,18 +13,47 @@ using namespace mshadow; using namespace mshadow::expr; namespace singa { +inline Tensor<cpu, 4> Tensor4(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<cpu, 4> tensor(blob->mutable_cpu_data(), + Shape4(shape[0], shape[1], shape[2], shape[3])); + return tensor; +} + +inline Tensor<cpu, 3> Tensor3(Blob<float>* blob){ + const vector<int>& shape = blob->shape(); + Tensor<cpu, 3> tensor(blob->mutable_cpu_data(), + Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1])); + return tensor; +} +inline Tensor<cpu, 2> Tensor2(Blob<float>* blob){ + const vector<int>& shape = blob->shape(); + Tensor<cpu, 2> tensor(blob->mutable_cpu_data(), + Shape2(shape[0], blob->count() / shape[0])); + return tensor; +} +inline Tensor<cpu, 1> Tensor1(Blob<float>* blob){ + Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count())); + return tensor; +} /************ Implementation for ConvProductLayer*************************/ -void ConvolutionLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); +ConvolutionLayer::~ConvolutionLayer() { + delete weight_; + delete bias_; +} +void ConvolutionLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); ConvolutionProto conv_conf=proto.convolution_conf(); kernel_=conv_conf.kernel(); CHECK_GT(kernel_, 0) << "Filter size cannot be zero."; pad_=conv_conf.pad(); stride_=conv_conf.stride(); num_filters_=conv_conf.num_filters(); - const vector<int>& srcshape=srclayers[0]->data(this).shape(); + if(partition_dim() > 0) + num_filters_ /= npartitions; + + const vector<int>& srcshape=srclayers_[0]->data(this).shape(); int dim=srcshape.size(); CHECK_GT(dim, 2); width_=srcshape[dim-1]; @@ -45,32 +74,18 @@ void ConvolutionLayer::Setup(const LayerProto& proto, col_grad_.Reshape(vector<int>{col_height_, col_width_}); Factory<Param>* factory=Singleton<Factory<Param>>::Instance(); - weight_=shared_ptr<Param>(factory->Create("Param")); + weight_ = factory->Create("Param"); weight_->Setup(proto.param(0), vector<int>{num_filters_, col_height_}); - bias_=shared_ptr<Param>(factory->Create("Param")); + bias_ = factory->Create("Param"); bias_->Setup(proto.param(1), vector<int>{num_filters_}); } -void ConvolutionLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - LayerProto newproto(proto); - ConvolutionProto *conv_conf=newproto.mutable_convolution_conf(); - conv_conf->set_num_filters(shape[1]); - Setup(newproto, srclayers); -} - -void ConvolutionLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ - Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape4(batchsize_, channels_, height_, width_)); - Tensor<cpu, 3> data(data_.mutable_cpu_data(), - Shape3(batchsize_, num_filters_, conv_height_* conv_width_)); - Tensor<cpu, 2> col(col_data_.mutable_cpu_data(), - Shape2(col_height_, col_width_)); - Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), - Shape2(num_filters_, col_height_)); - Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), - Shape1(num_filters_)); +void ConvolutionLayer::ComputeFeature(Phase phase, Metric* perf){ + auto src = 
Tensor4(srclayers_[0]->mutable_data(this)); + auto data = Tensor3(&data_); + auto col = Tensor2(&col_data_); + auto weight = Tensor2(weight_->mutable_data()); + auto bias = Tensor1(bias_->mutable_data()); for(int n=0;n<batchsize_;n++){ if(pad_>0) @@ -82,144 +97,126 @@ void ConvolutionLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclaye data+=broadcast<1>(bias, data.shape); } -void ConvolutionLayer::ComputeGradient(const vector<SLayer>& srclayers) { - Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape4(batchsize_, channels_, height_, width_)); - Tensor<cpu, 2> col(col_data_.mutable_cpu_data(), - Shape2(col_height_, col_width_)); - Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), - Shape2(num_filters_, col_height_)); +void ConvolutionLayer::ComputeGradient(Phase phase) { + auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto col = Tensor2(&col_data_); + auto weight = Tensor2(weight_->mutable_data()); + + auto grad = Tensor3(&grad_); + auto gcol = Tensor2(&col_grad_); + auto gweight = Tensor2(weight_->mutable_grad()); + auto gbias = Tensor1(bias_->mutable_grad()); - Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this); + Blob<float>* gsrcblob=srclayers_[0]->mutable_grad(this); Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_)); if(gsrcblob!=nullptr) gsrc.dptr=gsrcblob->mutable_cpu_data(); - Tensor<cpu, 3> grad(grad_.mutable_cpu_data(), - Shape3(batchsize_, num_filters_, conv_height_* conv_width_)); - Tensor<cpu, 2> gcol(col_grad_.mutable_cpu_data(), - Shape2(col_height_, col_width_)); - Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), - Shape2(num_filters_, col_height_)); - Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), - Shape1(num_filters_)); - - gweight=0.0f; gbias=sumall_except_dim<1>(grad); - Shape<3> padshape(gsrc.shape.SubShape()); - padshape[0]+=2*pad_;padshape[1]+=2*pad_; - Shape<2> imgshape=Shape2(height_, width_); + + gweight = 0.0f; + Shape<3> padshp(gsrc.shape.SubShape()); + padshp[0] += 2 * pad_; + padshp[1] += 2 * pad_; + Shape<2> imgshp = Shape2(height_, width_); for(int n=0;n<batchsize_;n++){ if(pad_>0) col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_); else col=unpack_patch2col(src[n], kernel_, stride_); - gweight+=dot(grad[n], col.T()); + gweight += dot(grad[n], col.T()); if(gsrcblob!=nullptr){ - gcol=dot(weight.T(), grad[n]); - gsrc[n]=crop(pack_col2patch(gcol, padshape, kernel_, stride_), imgshape); + gcol = dot(weight.T(), grad[n]); + gsrc[n] = crop(pack_col2patch(gcol, padshp, kernel_, stride_), imgshp); } } } /****************** Implementation for DropoutLayer ***********************/ -void DropoutLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - data_.ReshapeLike(srclayers[0]->data(this)); - grad_.ReshapeLike(*srclayers[0]->mutable_grad(this)); - mask_.Reshape(srclayers[0]->data(this).shape()); - pdrop_=proto.dropout_conf().dropout_ratio(); -} - -void DropoutLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - Setup(proto, srclayers); +void DropoutLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); + data_.ReshapeLike(srclayers_[0]->data(this)); + grad_.ReshapeLike(*srclayers_[0]->mutable_grad(this)); + mask_.Reshape(srclayers_[0]->data(this).shape()); + pdrop_ = proto.dropout_conf().dropout_ratio(); } -void DropoutLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers) { +void DropoutLayer::ComputeFeature(Phase phase, Metric* perf) { 
// check training - if(phase!= kTrain){//!training){ - data_.CopyFrom(srclayers[0]->data(this)); + if(phase != kTrain){//!training){ + data_.CopyFrom(srclayers_[0]->data(this)); return; } float pkeep=1-pdrop_; - Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count())); + auto mask = Tensor1(&mask_); mask = F<op::threshold>(TSingleton<Random<cpu>>::Instance()\ ->uniform(mask.shape), pkeep ) * (1.0f/pkeep); - Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); - Blob<float>* srcblob=srclayers[0]->mutable_data(this); - Tensor<cpu, 1> src(srcblob->mutable_cpu_data(), Shape1(srcblob->count())); - data=src*mask; -} - -void DropoutLayer::ComputeGradient(const vector<SLayer>& srclayers) { - Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(data_.count())); - Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count())); - Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this); - Tensor<cpu, 1> gsrc(gsrcblob->mutable_cpu_data(), Shape1(gsrcblob->count())); - gsrc=grad*mask; -} -/**************** Implementation for InnerProductLayer********************/ -void InnerProductLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); - const auto& src=srclayers[0]->data(this); + auto data = Tensor1(&data_); + auto src = Tensor1(srclayers_[0]->mutable_data(this)); + data = src * mask; +} + +void DropoutLayer::ComputeGradient(Phase phase) { + auto mask = Tensor1(&mask_); + auto grad = Tensor1(&grad_); + auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); + gsrc = grad * mask; +} + +/*********** Implementation for InnerProductLayer**********/ +InnerProductLayer::~InnerProductLayer() { + delete weight_; + delete bias_; +} +void InnerProductLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(), 1); + const auto& src=srclayers_[0]->data(this); batchsize_=src.shape()[0]; vdim_=src.count()/batchsize_; hdim_=proto.innerproduct_conf().num_output(); + if(partition_dim()>0) + hdim_ /= npartitions; data_.Reshape(vector<int>{batchsize_, hdim_}); grad_.ReshapeLike(data_); Factory<Param>* factory=Singleton<Factory<Param>>::Instance(); - weight_=shared_ptr<Param>(factory->Create("Param")); - bias_=shared_ptr<Param>(factory->Create("Param")); + weight_ = factory->Create("Param"); + bias_ = factory->Create("Param"); weight_->Setup(proto.param(0), vector<int>{vdim_, hdim_}); bias_->Setup(proto.param(1), vector<int>{hdim_}); } -void InnerProductLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - LayerProto newproto(proto); - InnerProductProto * innerproto=newproto.mutable_innerproduct_conf(); - innerproto->set_num_output(shape[1]); - Setup(newproto, srclayers); -} - -void InnerProductLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers) { - Tensor<cpu, 2> data(data_.mutable_cpu_data(), Shape2(batchsize_,hdim_)); - CHECK_EQ(srclayers[0]->data(this).count(), batchsize_*vdim_); - Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape2(batchsize_,vdim_)); - Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_)); - Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), Shape1(hdim_)); + +void InnerProductLayer::ComputeFeature(Phase phase, Metric* perf) { + auto data = Tensor2(&data_); + auto src = Tensor2(srclayers_[0]->mutable_data(this)); + auto weight = Tensor2(weight_->mutable_data()); + auto bias = Tensor1(bias_->mutable_data()); data=dot(src, weight); // 
repmat: repeat bias vector into batchsize rows data+=repmat(bias, batchsize_); } -void InnerProductLayer::ComputeGradient(const vector<SLayer>& srclayers) { - Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape2(batchsize_,vdim_)); - Tensor<cpu, 2> grad(grad_.mutable_cpu_data(),Shape2(batchsize_,hdim_)); - Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_)); - Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), Shape2(vdim_,hdim_)); - Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), Shape1(hdim_)); +void InnerProductLayer::ComputeGradient(Phase phas) { + auto src = Tensor2(srclayers_[0]->mutable_data(this)); + auto grad = Tensor2(&grad_); + auto weight = Tensor2(weight_->mutable_data()); + auto gweight = Tensor2(weight_->mutable_grad()); + auto gbias = Tensor1(bias_->mutable_grad()); gbias=sum_rows(grad); gweight=dot(src.T(), grad); - if(srclayers[0]->mutable_grad(this)!=nullptr){ - Tensor<cpu, 2> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), - Shape2(batchsize_,vdim_)); + if(srclayers_[0]->mutable_grad(this)!=nullptr){ + auto gsrc = Tensor2(srclayers_[0]->mutable_grad(this)); gsrc=dot(grad, weight.T()); } } /***************************************************************************** * Implementation for LabelLayer *****************************************************************************/ -void LabelLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); - int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize(); +void LabelLayer::Setup(const LayerProto& proto, int npartitions){ + Layer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(),1); + int batchsize=static_cast<DataLayer*>(srclayers_[0])->batchsize(); data_.Reshape(vector<int>{batchsize}); } @@ -236,7 +233,7 @@ void LabelLayer::ParseRecords(Phase phase, const vector<Record>& records, /*********************LMDBDataLayer**********************************/ -void LMDBDataLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ +void LMDBDataLayer::ComputeFeature(Phase phase, Metric* perf){ if(random_skip_){ int nskip=rand()%random_skip_; int n=0; @@ -296,8 +293,8 @@ void LMDBDataLayer::ConvertDatumToSingleLableImageRecord(const Datum& datum, } } -void LMDBDataLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ +void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); CHECK_EQ(mdb_env_create(&mdb_env_), MDB_SUCCESS) << "mdb_env_create failed"; CHECK_EQ(mdb_env_set_mapsize(mdb_env_, 1099511627776), MDB_SUCCESS); // 1TB CHECK_EQ(mdb_env_open(mdb_env_, @@ -325,21 +322,23 @@ void LMDBDataLayer::Setup(const LayerProto& proto, ConvertDatumToSingleLableImageRecord(datum, record); batchsize_=batchsize(); + if(partition_dim() == 0) + batchsize_ /= npartitions; records_.resize(batchsize_); random_skip_=proto.lmdbdata_conf().random_skip(); } /***************** Implementation for LRNLayer *************************/ -void LRNLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); +void LRNLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(),1); lsize_ = proto.lrn_conf().local_size(); CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for Localvol"; knorm_=proto.lrn_conf().knorm(); alpha_ = proto.lrn_conf().alpha(); beta_ = proto.lrn_conf().beta(); - const vector<int>& s=srclayers[0]->data(this).shape(); + const 
vector<int>& s=srclayers_[0]->data(this).shape(); data_.Reshape(s); grad_.Reshape(s); norm_.Reshape(s); @@ -349,30 +348,22 @@ void LRNLayer::Setup(const LayerProto& proto, width_=s[3]; } -void LRNLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - Setup(proto, srclayers); -} - -void LRNLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ +void LRNLayer::ComputeFeature(Phase phase, Metric* perf) { const float salpha = alpha_ / lsize_; - Shape<4> s=Shape4(batchsize_,channels_, height_, width_); - Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s); - Tensor<cpu, 4> data(data_.mutable_cpu_data(), s); - Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s); + auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto data = Tensor4(&data_); + auto norm = Tensor4(&norm_); // stores normalizer without power norm= chpool<red::sum>( F<op::square>(src) , lsize_ ) * salpha + knorm_; data = src * F<op::power>(norm, -beta_ ); } -void LRNLayer::ComputeGradient(const vector<SLayer>& srclayers) { +void LRNLayer::ComputeGradient(Phase phase) { const float salpha = alpha_ / lsize_; - Shape<4> s=Shape4(batchsize_,channels_, height_, width_); - Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s); - Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s); - Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s); - Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), s); + auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto norm = Tensor4(&norm_); + auto grad = Tensor4(&grad_); + auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this)); gsrc = grad * F<op::power>( norm, -beta_ ); gsrc += ( - 2.0f * beta_ * salpha ) * chpool<red::sum>( @@ -448,11 +439,11 @@ void MnistLayer::ParseRecords(Phase phase, } CHECK_EQ(dptr, blob->mutable_cpu_data()+blob->count()); } -void MnistLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); - int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize(); - Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample(); +void MnistLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(),1); + int batchsize=static_cast<DataLayer*>(srclayers_[0])->batchsize(); + Record sample=static_cast<DataLayer*>(srclayers_[0])->sample(); kernel_=proto.mnist_conf().kernel(); sigma_=proto.mnist_conf().sigma(); alpha_=proto.mnist_conf().alpha(); @@ -475,9 +466,9 @@ void MnistLayer::Setup(const LayerProto& proto, } /******************** Implementation for PoolingLayer******************/ -void PoolingLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); +void PoolingLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(),1); PoolingProto pool_conf = proto.pooling_conf(); kernel_=pool_conf.kernel(); stride_=pool_conf.stride(); @@ -487,7 +478,7 @@ void PoolingLayer::Setup(const LayerProto& proto, || pool_ == PoolingProto_PoolMethod_MAX) << "Padding implemented only for average and max pooling."; - const auto& srcshape=srclayers[0]->data(this).shape(); + const auto& srcshape=srclayers_[0]->data(this).shape(); int dim=srcshape.size(); CHECK_GT(dim,2); width_ = srcshape[dim-1]; @@ -503,68 +494,49 @@ void PoolingLayer::Setup(const LayerProto& proto, grad_.ReshapeLike(data_); } -void 
PoolingLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - Setup(proto, srclayers); -} - -void PoolingLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ - Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape4(batchsize_, channels_, height_, width_)); - Tensor<cpu, 4> data(data_.mutable_cpu_data(), - Shape4(batchsize_, channels_, pooled_height_, pooled_width_)); +void PoolingLayer::ComputeFeature(Phase phase, Metric* perf) { + auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto data = Tensor4(&data_); if(pool_ == PoolingProto_PoolMethod_MAX) data=pool<red::maximum>(src, kernel_, stride_); else if(pool_ == PoolingProto_PoolMethod_AVE) - data=pool<red::sum>(src, kernel_, stride_) - *(1.0f/(kernel_*kernel_)); + data=pool<red::sum>(src, kernel_, stride_) *(1.0f/(kernel_*kernel_)); } /* * partition only on num/channel dim * assume grad and data have the same paritition */ -void PoolingLayer::ComputeGradient(const vector<SLayer>& srclayers) { - Shape<4> s1= Shape4(batchsize_, channels_, height_, width_); - Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),s1); - Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),s1); - Shape<4> s2= Shape4(batchsize_, channels_, pooled_height_, pooled_width_); - Tensor<cpu, 4> data(data_.mutable_cpu_data(), s2); - Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s2); +void PoolingLayer::ComputeGradient(Phase phase) { + auto src = Tensor4(srclayers_[0]->mutable_data(this)); + auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this)); + auto data = Tensor4(&data_); + auto grad = Tensor4(&grad_); if(pool_ == PoolingProto_PoolMethod_MAX) - gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_); + gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_); else if(pool_ == PoolingProto_PoolMethod_AVE) - gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_) - *(1.0f/(kernel_*kernel_)); + gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_) + *(1.0f/(kernel_*kernel_)); } /***************** Implementation for ReLULayer *****************************/ -void ReLULayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - data_.ReshapeLike(srclayers[0]->data(this)); - grad_.ReshapeLike(*(srclayers[0]->mutable_grad(this))); -} - -void ReLULayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - Setup(proto, srclayers); +void ReLULayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); + data_.ReshapeLike(srclayers_[0]->data(this)); + grad_.ReshapeLike(*(srclayers_[0]->mutable_grad(this))); } -void ReLULayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ - Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); - Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape1(data_.count())); +void ReLULayer::ComputeFeature(Phase phase, Metric* perf) { + auto data = Tensor1(&data_); + auto src = Tensor1(srclayers_[0]->mutable_data(this)); data=F<op::relu>(src); } -void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) { - Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count())); - Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); - Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), - Shape1(data_.count())); +void ReLULayer::ComputeGradient(Phase phase) { + auto data = 
Tensor1(&data_); + auto grad = Tensor1(&grad_); + auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); gsrc=F<op::relu_grad>(data)*grad; } @@ -573,7 +545,7 @@ void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) { void RGBImageLayer::ParseRecords(Phase phase, const vector<Record>& records, Blob<float>* blob){ const vector<int>& s=blob->shape(); - Tensor<cpu, 4> images(data_.mutable_cpu_data(), Shape4(s[0],s[1],s[2],s[3])); + auto images = Tensor4(&data_); const SingleLabelImageRecord& r=records.at(0).image(); Tensor<cpu, 3> raw_image(Shape3(r.shape(0),r.shape(1),r.shape(2))); AllocSpace(raw_image); @@ -625,14 +597,14 @@ void RGBImageLayer::ParseRecords(Phase phase, if(cropsize_) FreeSpace(croped_image); } -void RGBImageLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),1); +void RGBImageLayer::Setup(const LayerProto& proto, int npartitions) { + ParserLayer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(),1); scale_=proto.rgbimage_conf().scale(); cropsize_=proto.rgbimage_conf().cropsize(); mirror_=proto.rgbimage_conf().mirror(); - int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize(); - Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample(); + int batchsize=static_cast<DataLayer*>(srclayers_[0])->batchsize(); + Record sample=static_cast<DataLayer*>(srclayers_[0])->sample(); vector<int> shape; shape.push_back(batchsize); for(int x: sample.image().shape()){ @@ -663,7 +635,7 @@ void RGBImageLayer::Setup(const LayerProto& proto, } /***************Implementation for ShardDataLayer**************************/ -void ShardDataLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ +void ShardDataLayer::ComputeFeature(Phase phase, Metric* perf){ if(random_skip_){ int nskip=rand()%random_skip_; LOG(INFO)<<"Random Skip "<<nskip<<" records, there are "<<shard_->Count() @@ -683,67 +655,55 @@ void ShardDataLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers } } -void ShardDataLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ +void ShardDataLayer::Setup(const LayerProto& proto, int npartitions) { + Layer::Setup(proto, npartitions); shard_= std::make_shared<DataShard>(proto.sharddata_conf().path(), DataShard::kRead); string key; shard_->Next(&key, &sample_); batchsize_=proto.sharddata_conf().batchsize(); + if(partition_dim() == 0) + batchsize_ /= npartitions; records_.resize(batchsize_); random_skip_=proto.sharddata_conf().random_skip(); } /*******************Implementation of TanLayer***************************/ -void TanhLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - data_.ReshapeLike(srclayers[0]->data(this)); - grad_.ReshapeLike(srclayers[0]->grad(this)); +void TanhLayer::Setup(const LayerProto& proto, int npartitions){ + Layer::Setup(proto, npartitions); + data_.ReshapeLike(srclayers_[0]->data(this)); + grad_.ReshapeLike(srclayers_[0]->grad(this)); } -void TanhLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - Setup(proto, srclayers); -} - - -void TanhLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers){ - Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); - Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), - Shape1(data_.count())); +void TanhLayer::ComputeFeature(Phase phase, Metric* perf) { + auto data = Tensor1(&data_); + auto src = Tensor1(srclayers_[0]->mutable_data(this)); 
data=F<op::stanh>(src); } -void TanhLayer::ComputeGradient(const vector<SLayer>& srclayers) { - Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count())); - Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count())); - Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), - Shape1(data_.count())); +void TanhLayer::ComputeGradient(Phase phase) { + auto data = Tensor1(&data_); + auto grad = Tensor1(&grad_); + auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this)); gsrc=F<op::stanh_grad>(data)*grad; } /********** * Implementation for SoftmaxLossLayer*************************/ -void SoftmaxLossLayer::Setup(const LayerProto& proto, - const vector<SLayer>& srclayers){ - CHECK_EQ(srclayers.size(),2); - data_.Reshape(srclayers[0]->data(this).shape()); +void SoftmaxLossLayer::Setup(const LayerProto& proto, int npartitions) { + LossLayer::Setup(proto, npartitions); + CHECK_EQ(srclayers_.size(),2); + data_.Reshape(srclayers_[0]->data(this).shape()); batchsize_=data_.shape()[0]; dim_=data_.count()/batchsize_; topk_=proto.softmaxloss_conf().topk(); metric_.Reshape(vector<int>{2}); scale_=proto.softmaxloss_conf().scale(); } -void SoftmaxLossLayer::SetupAfterPartition(const LayerProto& proto, - const vector<int> &shape, - const vector<SLayer>& srclayers){ - Setup(proto, srclayers); -} -void SoftmaxLossLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclayers) { +void SoftmaxLossLayer::ComputeFeature(Phase phase, Metric* perf) { Shape<2> s=Shape2(batchsize_, dim_); Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s); - Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s); + Tensor<cpu, 2> src(srclayers_[0]->mutable_data(this)->mutable_cpu_data(), s); Softmax(prob, src); - const float* label=srclayers[1]->data(this).cpu_data(); + const float* label=srclayers_[1]->data(this).cpu_data(); const float* probptr=prob.dptr; float loss=0, precision=0; for(int n=0;n<batchsize_;n++){ @@ -769,14 +729,13 @@ void SoftmaxLossLayer::ComputeFeature(Phase phase, const vector<SLayer>& srclaye probptr+=dim_; } CHECK_EQ(probptr, prob.dptr+prob.shape.Size()); - float *metric=metric_.mutable_cpu_data(); - metric[0]=loss*scale_/(1.0f*batchsize_); - metric[1]=precision*scale_/(1.0f*batchsize_); + perf->Add("loss", loss*scale_/(1.0f*batchsize_)); + perf->Add("accuracy", precision*scale_/(1.0f*batchsize_)); } -void SoftmaxLossLayer::ComputeGradient(const vector<SLayer>& srclayers) { - const float* label=srclayers[1]->data(this).cpu_data(); - Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this); +void SoftmaxLossLayer::ComputeGradient(Phase phase) { + const float* label=srclayers_[1]->data(this).cpu_data(); + Blob<float>* gsrcblob=srclayers_[0]->mutable_grad(this); gsrcblob->CopyFrom(data_); float* gsrcptr=gsrcblob->mutable_cpu_data(); for(int n=0;n<batchsize_;n++){
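Taken together, the layer.cc changes above follow a single refactoring pattern: the per-layer Setup(proto, srclayers)/SetupAfterPartition pair collapses into one Setup(proto, npartitions) that first invokes the base Layer::Setup and reads its sources from the srclayers_ member, ComputeFeature gains a Metric* for reporting (see SoftmaxLossLayer's perf->Add calls), ComputeGradient takes only the Phase, Params become raw pointers released in the layer destructor, and the repeated Tensor<cpu, N>(blob->mutable_cpu_data(), ShapeN(...)) boilerplate is replaced by the new Tensor4/Tensor3/Tensor2/Tensor1 helpers. A minimal sketch of a hypothetical element-wise layer written against that interface (the class name ScaleLayer and its constant factor are illustrative only, not part of this patch):

// Hypothetical layer, sketched against the refactored interface above.
class ScaleLayer : public Layer {
 public:
  void Setup(const LayerProto& proto, int npartitions) override {
    Layer::Setup(proto, npartitions);               // base class records proto/partition info
    CHECK_EQ(srclayers_.size(), 1);                 // source layers are now the srclayers_ member
    data_.ReshapeLike(srclayers_[0]->data(this));
    grad_.ReshapeLike(data_);
  }
  void ComputeFeature(Phase phase, Metric* perf) override {
    auto src = Tensor1(srclayers_[0]->mutable_data(this));  // helper replaces the Shape1 boilerplate
    auto data = Tensor1(&data_);
    data = src * 2.0f;                              // arbitrary element-wise computation
    // a loss/metric layer would report values via perf->Add("name", value)
  }
  void ComputeGradient(Phase phase) override {
    auto grad = Tensor1(&grad_);
    auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
    gsrc = grad * 2.0f;
  }
};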
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/neuralnet/neuralnet.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc index 2240499..6d82734 100644 --- a/src/neuralnet/neuralnet.cc +++ b/src/neuralnet/neuralnet.cc @@ -1,22 +1,19 @@ #include <algorithm> #include <queue> -#include "proto/model.pb.h" #include "neuralnet/neuralnet.h" #include "utils/singleton.h" -#include "utils/factory.h" -#include "utils/graph.h" -#include "utils/cluster.h" namespace singa { #define LayerT(x) LayerProto_LayerType_k##x #define RegisterLayer(factory, id) \ - factory->Register(LayerProto_LayerType_k##id,\ + factory->Register(LayerProto_LayerType_k##id, \ CreateInstance(id##Layer, Layer)) -void NeuralNet::RegisterLayers(){ - Factory<Layer>* factory=Singleton<Factory<Layer>>::Instance(); +void NeuralNet::RegisterLayers() { + Factory<Layer>* factory = Singleton<Factory<Layer>>::Instance(); + // FooLayer's type is kFoo, register using Foo RegisterLayer(factory, BridgeDst); RegisterLayer(factory, BridgeSrc); RegisterLayer(factory, Convolution); @@ -37,402 +34,329 @@ void NeuralNet::RegisterLayers(){ RegisterLayer(factory, Split); RegisterLayer(factory, Tanh); } -shared_ptr<NeuralNet> NeuralNet::SetupNeuralNet(const NetProto& np, Phase phase, - int group_size){ + +shared_ptr<NeuralNet> NeuralNet::Create( + const NetProto& conf, + Phase phase, + int npartitions) { NetProto proto; - proto.set_partition_type(np.partition_type()); - // exclude layers if necessary - for(auto& layer:np.layer()){ - bool include=true; - for(int x: layer.exclude()){ - if(x==phase) - include=false; + proto.CopyFrom(conf); + proto.clear_layer(); + // exclude layers according to phase + for (const auto& layer : conf.layer()) { + bool include = true; + for (auto x : layer.exclude()) { + if (x == phase) + include = false; } - if(include){ - LayerProto* lp=proto.add_layer(); + if (include) { + LayerProto* lp = proto.add_layer(); lp->CopyFrom(layer); + // using net partition if layer partition is not set + if (!lp->has_partition_dim()) + lp->set_partition_dim(proto.partition_dim()); } } - LOG(INFO)<<"NeuralNet config is "<<proto.DebugString(); - return make_shared<NeuralNet>(proto, group_size); -} -NeuralNet::NeuralNet(NetProto net_proto, int group_size) { - group_size_=group_size; - for(int i=0;i<net_proto.layer_size();i++){ - LayerProto * layer_proto=net_proto.mutable_layer(i); - if(!layer_proto->has_partition_type()) - layer_proto->set_partition_type(net_proto.partition_type()); - } + LOG(INFO) << "NeuralNet config is\n" << proto.DebugString(); - LOG(INFO)<<"Construct Neural Net..."; - ConstructNeuralNet(net_proto); - { - string vis_folder=Cluster::Get()->vis_folder(); - std::ofstream fout(vis_folder+"/nopartition.json", std::ofstream::out); - fout<<ToString(); - fout.flush(); - fout.close(); - } - if(group_size_>1){ - PartitionNeuralNet(); - string vis_folder=Cluster::Get()->vis_folder(); - std::ofstream fout(vis_folder+"/partition.json", std::ofstream::out); - fout<<ToString(); - fout.flush(); - fout.close(); - } - for(auto layer: layers_){ - DLOG(INFO)<<layer->name(); - } - for(auto& layer: layers_){ - for(shared_ptr<Param> p: layer->GetParams()){ - params_.push_back(p); - } - } - LOG(INFO)<<"Neural Net constructed"; - // init all data members to avoid conflicts from multi-thread access - losslayers(); - paramid2param(0); - datalayers(); - parserlayers(); + // TODO(wangwei) create net based on net type, e.g., directed, 
undirected, etc + auto net = std::make_shared<NeuralNet>(proto, npartitions); + return net; } -void NeuralNet::ConstructNeuralNet(const NetProto& net_proto){ - // construct graph, one node for one layer, identified by layer name - map<string, LayerProto> protos; - for (auto &layer_proto : net_proto.layer()){ - graph_.AddNode(layer_proto.name()); - protos[layer_proto.name()]=layer_proto; - } - for (auto &layer_proto : net_proto.layer()) - if(layer_proto.srclayers_size()) - for(const string& src: layer_proto.srclayers()) - graph_.AddEdge(src, layer_proto.name()); +NeuralNet::~NeuralNet() { + for (auto layer : layers_) + delete layer; +} - // topology sort - graph_.Sort(); - //LOG(ERROR)<<"pure graph without partition\n"<< graph_.ToString(); +NeuralNet::NeuralNet(NetProto netproto, int npartitions) { + LOG(INFO) << "Constructing Neural Net..."; + auto graph = CreateGraph(netproto, npartitions); + CreateNetFromGraph(graph, npartitions); + PrepareDataStructures(); + for (Node* node : graph->nodes()) + delete static_cast<LayerProto*>(node->proto); + delete graph; + LOG(INFO) << "Neural net constructed"; +} - auto* factory=Singleton<Factory<Layer>>::Instance(); - // create Layers according to topology order - for(SNode node: graph_.nodes()){ - shared_ptr<Layer> layer(factory->Create(protos[node->name()].type())); - layer->Init(protos[node->name()]); - name2layer_[node->name()]=layer; +void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) { + auto* factory = Singleton<Factory<Layer>>::Instance(); + // create one layer per node + for (Node* node : graph->nodes()) { + auto layer = factory->Create(static_cast<LayerProto*>(node->proto)->type()); layers_.push_back(layer); + name2layer_[node->name] = layer; } - - // connect Layers. - for(SNode node: graph_.nodes()){ - auto layer=name2layer_[node->name()]; - for(SNode dst: node->dstnodes()) - layer->AddDstLayer(name2layer_[dst->name()]); - for(SNode src: node->srcnodes()) - layer->AddSrcLayer(name2layer_[src->name()]); + // connect layers + for (Node* node : graph->nodes()) { + auto layer = name2layer_[node->name]; + layer->clear_dstlayers(); + for (Node* dst : node->dstnodes) + layer->add_dstlayer(name2layer_[dst->name]); + layer->clear_srclayers(); + for (Node* src : node->srcnodes) + layer->add_srclayer(name2layer_[src->name]); } - // setup layer properties, e.g., shapes - int paramid=0; - for(auto& layer: layers_){ - layer->Setup(); - for(auto param: layer->GetParams()) - param->set_id(paramid++); + // setup layers + int paramid = 0; + map<string, string> layerinfo; + map<string, vector<Layer*>> share_param_layers; + for (Node* node : graph->nodes()) { + auto layer = name2layer_[node->name]; + layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions); + layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape()); + for (auto param : layer->GetParams()) + param->set_id(paramid++); + if (layer->partition_dim() == 0) + share_param_layers[node->origin].push_back(layer); } - LOG(INFO)<<"network graph witout partition\n"<<ToString(); -} - -void NeuralNet::PartitionNeuralNet(){ - graph_=CreatePartitonedGraph(layers_, name2layer_); - //DLOG(ERROR)<<"pure graph after partition\n"<<graph_.ToString(); - map<string, shared_ptr<Layer>> name2layer(name2layer_); - map<string, vector<shared_ptr<Layer>>> share_conf_layers; - name2layer_.clear(); - layers_.clear(); - int gsize=group_size_; - auto* factory=Singleton<Factory<Layer>>::Instance(); - // create Layers according to topology order - for(SNode node: graph_.nodes()){ - 
LayerProto proto; - proto.set_name(node->name()); - proto.set_partitionid(node->val().partitionid); - string origin=node->val().origin; - if (origin=="kSlice"){ - proto.set_type(LayerT(Slice)); - SliceProto *slice=proto.mutable_slice_conf(); - slice->set_slice_dimension(node->val().slice_dimension); - slice->set_slice_num(node->dstnodes().size()); - }else if(origin== "kConcate"){ - proto.set_type(LayerT(Concate)); - ConcateProto *concate=proto.mutable_concate_conf(); - concate->set_concate_dimension(node->val().concate_dimension); - concate->set_concate_num(node->srcnodes().size()); - }else if(origin=="kSplit"){ - proto.set_type(LayerT(Split)); - SplitProto *split=proto.mutable_split_conf(); - split->set_num_splits(node->dstnodes().size()); - }else if(origin=="kBridgeSrc"){ - proto.set_type(LayerT(BridgeSrc)); - }else if(origin =="kBridgeDst"){ - proto.set_type(LayerT(BridgeDst)); - }else{ - CHECK(name2layer.find(node->val().origin)!=name2layer_.end()) - <<"Unkown origin for node "<<node->val().origin; + LOG(INFO) << "Neural net structure\n" << graph->ToJson(layerinfo); + // share Params for layers generated from the same origin layer + for (auto & entry : share_param_layers) { + auto owner = entry.second.begin(); + auto owner_params = (*owner)->GetParams(); + for (auto it = owner + 1; it != entry.second.end(); it++) { + auto params = (*it)->GetParams(); + CHECK_EQ(params.size(), owner_params.size()); + for (size_t i = 0; i < params.size(); i++) + params.at(i)->ShareData(owner_params.at(i)); } - shared_ptr<Layer> newlayer; - if(proto.has_type()){ - // layers added due to partition - shared_ptr<Layer> layer(factory->Create(proto.type())); - layer->Init(proto); - newlayer=layer; - }else{ - // partitioned layers from origin neuralnet - auto oldlayer=name2layer.at(node->val().origin); - vector<int> shape=oldlayer->shape(nullptr); - if(oldlayer->partition_type()==kNone){ - newlayer=oldlayer; - } else{ - int pdim=oldlayer->partition_dimension(); - shape[pdim]=shape[pdim]/gsize+ - ((node->val().partitionid==gsize-1)?shape[pdim]%gsize:0); - shared_ptr<Layer> layer(factory->Create(oldlayer->type())); - layer->Init(*oldlayer, shape); - layer->set_name(node->name()); - newlayer=layer; - if(oldlayer->partition_type()==kDataPartition) - share_conf_layers[node->val().origin].push_back(newlayer); - } - newlayer->set_partitionid(node->val().partitionid); - } - layers_.push_back(newlayer); - name2layer_[node->name()]=newlayer; } +} - // connect Layers. 
- for(SNode node: graph_.nodes()){ - auto layer=name2layer_[node->name()]; - layer->ClearDstLayers(); - for(SNode dst: node->dstnodes()) - layer->AddDstLayer(name2layer_[dst->name()]); - layer->ClearSrcLayers(); - for(SNode src: node->srcnodes()) - layer->AddSrcLayer(name2layer_[src->name()]); - } +// add a node for SliceLayer between srcnode and dstnodes +Node* SliceNode(Graph* graph, Node* srcnode, + const vector<Node*>& dstnodes, bool connect_dst) { + string name = srcnode->name + "<"; + LayerProto *proto = new LayerProto(); + proto->set_name(name); + proto->set_type(LayerProto_LayerType_kSlice); + proto->set_partition_id( + static_cast<LayerProto*>(srcnode->proto)->partition_id()); + auto conf = proto->mutable_slice_conf(); + conf->set_slice_dim( + static_cast<LayerProto*>(dstnodes[0]->proto)->partition_dim()); + Node* node = new Node(name, "##" + name, proto->partition_id(), proto); + graph->AddNode(node); + graph->AddEdge(srcnode, node); + if (connect_dst) + for (Node* dst : dstnodes) + graph->AddEdge(node, dst); + return node; +} - LOG(INFO)<<"Adjacency matrix\n"<<ToAdjacency(); +// add a node for ConcateLayer between srcnodes and dstnode +Node* ConcateNodes(Graph* graph, const vector<Node*>& srcnodes, Node* dstnode) { + string name = ">" + dstnode->name; + LayerProto *proto = new LayerProto(); + proto->set_name(name); + proto->set_type(LayerProto_LayerType_kConcate); + proto->set_partition_id( + static_cast<LayerProto*>(dstnode->proto)->partition_id()); + auto conf = proto->mutable_concate_conf(); + conf->set_concate_dim( + static_cast<LayerProto*>(srcnodes[0]->proto)->partition_dim()); + Node* node = new Node(name, "##" + name, proto->partition_id(), proto); + graph->AddNode(node); + graph->AddEdge(node, dstnode); + for (Node* src : srcnodes) + graph->AddEdge(src, node); + return node; +} - // set up layers after - int paramid=0; - for(shared_ptr<Layer> layer: layers_){ - const vector<int>& shape=layer->shape(nullptr); - layer->SetupAfterPartition(); - for(auto param: layer->GetParams()) - param->set_id(paramid++); - const vector<int>& newshape=layer->shape(nullptr); - if(shape.size()) - CHECK(std::equal(shape.begin(),shape.end(),newshape.begin())); - } +// add a node for SplitLayer between srcnode and dstnodes +Node* SplitNode(Graph* graph, Node* srcnode, const vector<Node*>& dstnodes) { + string name = srcnode->name + "+"; + LayerProto *proto = new LayerProto(); + proto->set_name(name); + proto->set_type(LayerProto_LayerType_kSplit); + proto->set_partition_id( + static_cast<LayerProto*>(srcnode->proto)->partition_id()); + Node* node = new Node(name, "##" + name, proto->partition_id(), proto); + graph->AddNode(node); + graph->AddEdge(srcnode, node); + for (Node* dst : dstnodes) + graph->AddEdge(node, dst); + return node; +} - // share Params for layers generated from the same origin layer due to - // data partition - for(auto & entry: share_conf_layers){ - auto layers= entry.second; - auto owner=layers.begin(); - auto owner_confs=(*owner)->GetParams(); - for(auto it=owner+1; it!=layers.end();it++){ - auto params=(*it)->GetParams(); - CHECK_EQ(params.size(), owner_confs.size()); - for(size_t i=0;i<params.size();i++) - params.at(i)->ShareData(owner_confs.at(i)); - } - } - LOG(INFO)<<"network graph after partition layers\n"<<ToString(); +// add a pair of nodes for BridgeSrcLayer and BridgeDstLayer between srcnode +// and dstnode +void BridgeNodes(Graph* graph, Node* srcnode, Node* dstnode) { + string sname = srcnode->name + ":-"; + LayerProto *sproto = new LayerProto(); + 
sproto->set_name(sname); + sproto->set_type(LayerProto_LayerType_kBridgeSrc); + sproto->set_partition_id( + static_cast<LayerProto*>(srcnode->proto)->partition_id()); + auto sbridge = new Node(sname, "##" + sname, sproto->partition_id(), sproto); + string dname = "-:" + dstnode->name; + LayerProto *dproto = new LayerProto(); + dproto->set_name(dname); + dproto->set_type(LayerProto_LayerType_kBridgeDst); + dproto->set_partition_id( + static_cast<LayerProto*>(dstnode->proto)->partition_id()); + auto dbridge = new Node(dname, "##" + dname, dproto->partition_id(), dproto); + graph->AddNode(sbridge); + graph->AddNode(dbridge); + graph->AddEdge(srcnode, sbridge); + graph->AddEdge(sbridge, dbridge); + graph->AddEdge(dbridge, dstnode); } -Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers, - const map<string, shared_ptr<Layer>>& name2layer){ - Graph graph; - // partition origin nodes/layers - map<string, vector<SNode>> layer2nodes; //from name of original layer to nodes - int gsize=group_size_; - for(const auto& layer: layers){ - vector<SNode> nodes; - if(layer->partition_type()==kDataPartition|| - layer->partition_type()==kLayerPartition){ +Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) { + Graph *graph = new Graph(); + // from name of original layer to nodes + map<string, vector<Node*>> name2nodes; + map<string, const LayerProto*> name2proto; + for (const auto& layer : netproto.layer()) { + vector<Node*> nodes; + int pdim = layer.partition_dim(); + if (pdim == 0 || pdim == 1) { char suffix[4]; - for(int i=0;i<gsize;i++){ - sprintf(suffix, "%02d", i); + for (int i = 0; i < npartitions; i++) { + LayerProto *proto = new LayerProto(layer); + snprintf(suffix, sizeof(suffix), "%02d", i); // differentiate partitions - string nodename=layer->name()+"@"+string(suffix); - auto node=graph.AddNode(nodename, LayerInfo{layer->name(), i,-1,-1}); + string nodename = layer.name() + "@" + string(suffix); + proto->set_partition_id(i); + auto node = new Node(nodename, layer.name(), i, proto); + graph->AddNode(node); nodes.push_back(node); } - }else if(layer->partition_type()==kNone){ - auto node=graph.AddNode(layer->name(), - LayerInfo{layer->name(), 0,-1,-1}); + } else if (pdim == -1) { + LayerProto *proto = new LayerProto(layer); + auto node = new Node(layer.name(), layer.name(), 0, proto); + graph->AddNode(node); nodes.push_back(node); - }else{ - LOG(FATAL)<<"Unknown partition type "<<layer->partition_type(); + } else { + LOG(FATAL) << "Cannot partition layer (" << layer.name() <<") on dim: " + << layer.partition_dim(); } - layer2nodes[layer->name()]=nodes; + name2nodes[layer.name()] = nodes; + name2proto[layer.name()] = &layer; } - // connect nodes, nodes for ConcateLayer and SliceLayer are added. 
- for(shared_ptr<Layer> layer: layers){ - string name=layer->name(); - PartitionType type=layer->partition_type(); - const vector<SNode>& nodes=layer2nodes.at(name); - for(int srcid=0;srcid<layer->srclayers_size();srcid++){ - shared_ptr<Layer> srclayer=layer->srclayers()[srcid]; - string srcname=srclayer->name(); - const vector<SNode> srcnodes=layer2nodes.at(srcname); - PartitionType srctype=srclayer->partition_type(); - ConnectionType connection=layer->connection_type(srcid); - if(srctype==kNone){ - CHECK_EQ(srcnodes.size(),1) - <<"local layer "<<srcname<<" should not be partitioned"; - SNode srcnode=srcnodes[0]; - if(type==kDataPartition||(type==kLayerPartition&&connection==kOneToOne)){ - LayerInfo info=srcnode->val(); - info.slice_dimension=name2layer.at(name)->partition_dimension(); - graph.InsertSliceNode(srcnode, nodes, info); - } else if(type==kNone){ - CHECK_EQ(nodes.size(),1) - <<"local layer "<<name<<" should not be nodeed"; - graph.AddEdge(srcnode, nodes[0]); - } else { // type==kLayerPartition&&connection==kOneToAll - graph.InsertSplitNode(srcnode, nodes); - } - }else if((type==kNone - &&(srctype==kDataPartition||srctype==kLayerPartition)) - ||(type==kLayerPartition&&connection==kOneToAll&& - (srctype==kDataPartition||srctype==kLayerPartition))){ + // connect nodes, nodes for ConcateLayer, SliceLayer and SplitLayer are added. + auto* factory = Singleton<Factory<Layer>>::Instance(); + for (const auto& layerproto : netproto.layer()) { + string name = layerproto.name(); + int pdim = layerproto.partition_dim(); + const vector<Node*>& nodes = name2nodes.at(name); + for (auto srcname : layerproto.srclayers()) { + const vector<Node*>& srcnodes = name2nodes.at(srcname); + // TODO(wangwei): consider the type of each connection + auto *layer = factory->Create(layerproto.type()); + ConnectionType connection = layer->src_neuron_connection(0); + delete layer; + int src_pdim = name2proto[srcname]->partition_dim(); + // no partition of src layer + if (src_pdim == -1) { + Node* srcnode = srcnodes[0]; + if (pdim == 0 || (pdim == 1 && connection == kOneToOne)) + SliceNode(graph, srcnode, nodes, true); + else if (pdim == -1) + graph->AddEdge(srcnode, nodes[0]); + else // type==kLayerPartition&&connection==kOneToAll + SplitNode(graph, srcnode, nodes); + } else if ((pdim == -1 && (src_pdim == 0 || src_pdim == 1)) + ||(pdim == 1 && connection == kOneToAll && src_pdim == 0)) { // copy/concate the whole srclayer for every dst partition - for(SNode node:nodes){ - LayerInfo info=node->val(); - info.concate_dimension=name2layer.at(srcname)->partition_dimension(); - CHECK_GE(info.concate_dimension,0); - graph.InsertConcateNode(srcnodes, node, info); - } - }else if((srctype==kLayerPartition&&type==kDataPartition) - || (srctype==kDataPartition&&type==kLayerPartition)){ + for (Node* node : nodes) + ConcateNodes(graph, srcnodes, node); + } else if ((src_pdim == 1 && pdim == 0) || (src_pdim == 0 && pdim == 1)) { // the most complext scenario - vector<SNode> slicenodes; - for(SNode srcnode: srcnodes){ - LayerInfo info=srcnode->val(); - info.slice_dimension=name2layer.at(name)->partition_dimension(); - slicenodes.push_back(graph.InsertSliceNode(srcnode, nodes, - info, false)); - } - for(SNode node: nodes){ - LayerInfo info=node->val(); - info.concate_dimension=name2layer.at(srcname)->partition_dimension(); - CHECK_GE(info.concate_dimension,0); - graph.InsertConcateNode(slicenodes, node, info); - } - }else if((srctype==kDataPartition&&type==kDataPartition)|| - (srctype==kLayerPartition&&type==kLayerPartition&& - 
layer->connection_type(srcid)==kOneToOne)){ + vector<Node*> nodes; + for (Node* srcnode : srcnodes) + nodes.push_back(SliceNode(graph, srcnode, nodes, false)); + for (Node* node : nodes) + ConcateNodes(graph, nodes, node); + } else if ((src_pdim == 0 && pdim == 0)|| + (src_pdim == 1 && pdim == 1 && connection == kOneToOne)) { CHECK_EQ(srcnodes.size(), nodes.size()); - for(size_t i=0;i<srcnodes.size();i++){ - graph.AddEdge(srcnodes[i], nodes[i]); - } + for (size_t i = 0; i < srcnodes.size(); i++) + graph->AddEdge(srcnodes[i], nodes[i]); } } } // must do topology sort, because we have added new nodes. - graph.Sort(); - //LOG(ERROR)<<graph.ToString(); + graph->Sort(); - // add node for split layer - bool data_node=true; - vector<SNode> oldnodes=graph.nodes(); - for(SNode node: oldnodes){ - if(node->dstnodes_size()>1&&node->val().origin!="kSlice" - &&node->val().origin!="kSplit"&&!data_node){ - vector<SNode> dstnodes=node->dstnodes(); - for(SNode dst: dstnodes) - graph.RemoveEdge(node, dst); - graph.InsertSplitNode(node, dstnodes); + // add nodes for SplitLayer + vector<Node*> oldnodes = graph->nodes(); + for (Node* node : oldnodes) { + auto layer = factory->Create(static_cast<LayerProto*>(node->proto)->type()); + if (node->dstnodes.size() > 1 + && layer->dst_layer_connection() == kOneToOne) { + vector<Node*> dstnodes = node->dstnodes; + for (Node* dst : dstnodes) + graph->RemoveEdge(node, dst); + SplitNode(graph, node, dstnodes); } - data_node=false; + delete layer; } - // add bridge - oldnodes=graph.nodes(); - for(SNode node: oldnodes){ - vector<SNode> dstnodes=node->dstnodes(); - for(size_t i=0;i<dstnodes.size();i++){ - SNode dstnode=dstnodes.at(i); - if(node->val().partitionid!=dstnode->val().partitionid){ - graph.RemoveEdge(node, dstnode); - graph.InsertBridgeNode(node, dstnode); + // add nodes for bridge layers + for (Node* node : oldnodes) { + vector<Node*> dstnodes = node->dstnodes; + auto pid1 = static_cast<LayerProto*>(node->proto)->partition_id(); + for (size_t i = 0; i < dstnodes.size(); i++) { + Node* dstnode = dstnodes.at(i); + auto pid2 = static_cast<LayerProto*>(node->proto)->partition_id(); + if (pid1 != pid2) { + graph->RemoveEdge(node, dstnode); + BridgeNodes(graph, node, dstnode); } } } - graph.Sort(); + graph->Sort(); + DLOG(INFO) << "Pure graph structure\n" << graph->ToJson(); return graph; } -std::string NeuralNet::ToString(){ - map<string, string> info; - for(auto layer: layers_){ - info[layer->name()]=IntVecToString(layer->shape(nullptr)); - } - return graph_.ToString(info); -} - -std::string NeuralNet::ToAdjacency(){ - string disp=""; - for(auto& layer: layers_){ - disp+=layer->name()+": "; - for(const auto& dst: layer->dstlayers()) - disp+=dst->name()+", "; - disp+="\n"; - } - return disp; -} - -void NeuralNet::ToProto(NetProto *proto, bool copyData) { - proto->clear_layer(); -} +void NeuralNet::PrepareDataStructures() { + parserlayers_.clear(); + losslayers_.clear(); + datalayers_.clear(); + params_.clear(); + paramid2param_.clear(); + name2layer_.clear(); -string NeuralNet::DebugInfo(){ - string ret; - char display[4096]; - for(auto& layer: layers_){ - if(!layer->is_datalayer()){ - sprintf(display, "Forward layer %10s data norm1 %13.9f\n", - layer->name().c_str(), layer->data(nullptr).asum_data()); - ret+=string(display); - } - } - for (auto it = layers_.rbegin(); it != layers_.rend(); it++){ - shared_ptr<Layer> layer=*it; - if(!(layer->is_datalayer()||layer->is_losslayer()||layer->is_parserlayer())){ - sprintf(display, "Backward layer %10s grad norm1 %13.9f\n", - 
layer->name().c_str(), layer->grad(nullptr).asum_data()); - ret+=string(display); + for (auto& layer : layers_) { + name2layer_[layer->name()] = layer; + if (layer->is_parserlayer()) + parserlayers_.push_back(static_cast<ParserLayer*>(layer)); + if (layer->is_losslayer()) + losslayers_.push_back(static_cast<LossLayer*>(layer)); + if (layer->is_datalayer()) + datalayers_.push_back(static_cast<DataLayer*>(layer)); + for (Param* p : layer->GetParams()) { + paramid2param_[p->id()] = p; + params_.push_back(p); } } - for(auto& layer: layers_){ - for(auto param: layer->GetParams()){ - sprintf(display, "Layer %10s, param id %2d, name %10s,\ - value norm1 %13.9f, grad norm1 %13.9f\n", - layer->name().c_str(), param->id(), param->name().c_str(), - param->data().asum_data(), param->grad().asum_data()); - ret+=string(display); - } +} +std::string NeuralNet::ToAdjacency() { + string disp = ""; + for (auto& layer : layers_) { + disp += layer->name()+": "; + for (const auto& dst : layer->dstlayers()) + disp += dst->name()+", "; + disp += "\n"; } - return ret; + return disp; } -void NeuralNet::ShareParams(shared_ptr<NeuralNet> other, int flag){ - for(auto& layer: layers_){ - auto otherlayer=other->name2layer(layer->name()); - if(otherlayer!=nullptr){ - const auto& otherparams=otherlayer->GetParams(); - const auto& params=layer->GetParams(); + +void NeuralNet::ShareParams(shared_ptr<NeuralNet> other) { + for (auto& layer : layers_) { + auto otherlayer = other->name2layer(layer->name()); + if (otherlayer != nullptr) { + const auto& otherparams = otherlayer->GetParams(); + const auto& params = layer->GetParams(); CHECK_EQ(params.size(), otherparams.size()); - for(size_t i=0;i<params.size();i++){ + for (size_t i = 0; i < params.size(); i++) { params[i]->ShareData(otherparams[i]); } } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/proto/common.proto ---------------------------------------------------------------------- diff --git a/src/proto/common.proto b/src/proto/common.proto index 70b743c..256206c 100644 --- a/src/proto/common.proto +++ b/src/proto/common.proto @@ -38,6 +38,7 @@ message BlobProtos { enum ConnectionType { kOneToOne = 0; kOneToAll = 1; + kOneToMany = 2; } // to import caffe's lmdb dataset @@ -79,3 +80,9 @@ message SingleLabelImageRecord { optional bytes pixel = 3; repeated float data = 4; } + +message MetricProto { + repeated string name =1; + repeated int32 count = 2; + repeated float val = 3; +} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/proto/model.proto ---------------------------------------------------------------------- diff --git a/src/proto/model.proto b/src/proto/model.proto index 4256491..a8de5d5 100644 --- a/src/proto/model.proto +++ b/src/proto/model.proto @@ -7,6 +7,8 @@ enum Phase { kPositive = 3; // negative phase for contrastive divergence algorithm kNegative = 4; + kForward = 5; + kBackward = 6; } message ModelProto { @@ -58,7 +60,7 @@ message ModelProto { message NetProto { repeated LayerProto layer = 1; // partitioning type for parallelism - optional PartitionType partition_type = 3 [default = kNone]; + optional int32 partition_dim = 2 [default = -1]; } // weight matrix should be defined before bias vector @@ -99,7 +101,7 @@ message ParamProto { // multiplied on the global weight decay. 
optional float weight_decay_multiplier = 16 [default = 1]; // partition dimension, -1 for no partition - optional int32 partition_dim = 30 [default = -1]; + optional int32 partition_dim = 30; // usually, the program will infer the param shape repeated int32 shape = 31; @@ -185,15 +187,15 @@ message LayerProto { optional SplitProto split_conf = 42; // configuration for tanh layer optional TanhProto tanh_conf = 43; - // partition type which overrides the partition type for neural net - optional PartitionType partition_type = 59; + + + // overrides the partition dimension for neural net + optional int32 partition_dim =59 [default = -1]; optional string datablob = 58 [default = "unknow"]; // names of parameters shared from other layers repeated string share_param = 60; - // TODO(wangwei): make location ID an array - optional int32 locationid = 61 [default = 0]; - optional int32 partitionid = 62 [default = 0]; + optional int32 partition_id = 62 [default = 0]; } message RGBImageProto { @@ -246,9 +248,7 @@ message ConvolutionProto { message ConcateProto { // on which dimension, starts from 0 - required int32 concate_dimension = 1; - // concatenate offset - optional int32 concate_num = 30; + required int32 concate_dim = 1; } message DataProto { @@ -328,8 +328,7 @@ message PoolingProto { } message SliceProto{ - required int32 slice_dimension=1; - required int32 slice_num=2; + required int32 slice_dim = 1; } message ReLUProto { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/trainer/server.cc ---------------------------------------------------------------------- diff --git a/src/trainer/server.cc b/src/trainer/server.cc index 42d6a79..cbb0ee1 100644 --- a/src/trainer/server.cc +++ b/src/trainer/server.cc @@ -36,7 +36,7 @@ void Server::Run(){ ping->add_frame("PING", 4); ping->set_type(kConnect); dealer_->Send(&ping); - vector<shared_ptr<Param>> master_params; + vector<Param*> master_params; size_t syncEntry=0; //start recv loop and process requests while (true){ @@ -121,13 +121,13 @@ void Server::Run(){ Msg* Server::HandlePut(Msg **msg){ int version=(*msg)->trgt_third(); int pid=(*msg)->trgt_second(); - shared_ptr<Param> param=nullptr; + Param* param=nullptr; if(shard_->find(pid)!=shard_->end()){ LOG(ERROR)<<"Param ("<<pid<<") is put more than once"; param=shard_->at(pid); }else{ auto factory=Singleton<Factory<Param>>::Instance(); - param=shared_ptr<Param>(factory ->Create("Param")); + param=factory ->Create("Param"); (*shard_)[pid]=param; } auto response=param->HandlePutMsg(msg); @@ -147,7 +147,7 @@ Msg* Server::HandlePut(Msg **msg){ return response; } -Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){ +Msg* Server::HandleGet(Param* param, Msg **msg){ if(param->version()<(*msg)->trgt_third()) return *msg; else{ @@ -158,7 +158,7 @@ Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){ } } -Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) { +Msg* Server::HandleUpdate(Param* param, Msg **msg) { auto* tmp=static_cast<Msg*>((*msg)->CopyAddr()); tmp->SwapAddr(); int paramid=(*msg)->trgt_first(); @@ -174,7 +174,7 @@ Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) { return response; } -Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){ +Msg* Server::HandleSyncRequest(Param* param, Msg **msg){ Msg* response=nullptr; auto shape=Shape1(param->size()); CHECK_EQ((*msg)->frame_size(), param->size()*sizeof(float)); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/trainer/trainer.cc 
---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index ce135cc..f4e52a6 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -7,14 +7,15 @@ #include "proto/common.pb.h" #include "trainer/trainer.h" #include "mshadow/tensor.h" + +namespace singa { using std::vector; using std::map; using namespace std::chrono; +using std::make_shared; typedef std::chrono::milliseconds TimeT; -namespace singa { - void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){ // register all layers appearing in the neural net singa::NeuralNet::RegisterLayers(); @@ -33,8 +34,8 @@ void HandleWorkerFinish(void * ctx){ hctx->dealer->Send(&msg); } -const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, - const vector<shared_ptr<Param>>& params){ +const std::unordered_map<int, vector<std::pair<int, int>>> +SliceParams(int num, const vector<Param*>& params){ std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; if (num==0) return paramid2slices; @@ -114,15 +115,15 @@ const vector<int> PartitionSlice(int num, const vector<int>& slices){ previd=slice2box[i]; } else disp+=" "+std::to_string(slices[i]); - LOG(INFO)<<"partition slice (av ="<<avg<<", num="<<num<<"):"<<disp; + LOG(INFO)<<"partition slice (avg ="<<avg<<", num="<<num<<"):"<<disp; return slice2box; } -vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads, +vector<Server*> Trainer::CreateServers(int nthreads, const ModelProto & mproto, const vector<int> slices, vector<HandleContext*>* ctx){ auto cluster=Cluster::Get(); - vector<shared_ptr<Server>> servers; + vector<Server*> servers; if(!cluster->has_server()) return servers; @@ -139,7 +140,7 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads, auto dealer=make_shared<Dealer>(); dealer->Connect(kInprocRouterEndpoint); for(int sid=start;sid<end;sid++){ - auto server=make_shared<Server>(nthreads++, gid, sid); + auto server=new Server(nthreads++, gid, sid); server->Setup(mproto.updater(), server_shard_, slice2group); servers.push_back(server); auto *hc=new HandleContext{dealer, gid, sid}; @@ -151,20 +152,20 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads, return servers; } -vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads, +vector<Worker*> Trainer::CreateWorkers(int nthreads, const ModelProto& mproto, vector<int> *slice_size){ auto cluster=Cluster::Get(); - auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain, + auto net=NeuralNet::Create(mproto.neuralnet(), kTrain, cluster->nworkers_per_group()); int lcm=LeastCommonMultiple(cluster->nserver_groups(), cluster->nservers_per_group()); auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size for(auto param: net->params()){ - if(param->id()==param->owner()) + if(param->id() == param->owner()) for(auto entry: paramid2slices[param->id()]) slice_size->push_back(entry.second); } - vector<shared_ptr<Worker>> workers; + vector<Worker*> workers; if(!cluster->has_worker()) return workers; //LOG(ERROR)<<net->ToString(); @@ -191,33 +192,33 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads, if(gid==gstart) train_net=net; else{ - train_net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain, + train_net=NeuralNet::Create(mproto.neuralnet(), kTrain, cluster->nworkers_per_group()); // the train net for other groups may share parameter values from the // first group if(cluster->share_memory()) - train_net->ShareParams(net, kValueOnly); + 
train_net->ShareParams(net); } if(gid==0){ // validation and test are performed only by the first group if(mproto.test_steps()){ - test_net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTest, + test_net=NeuralNet::Create(mproto.neuralnet(), kTest, cluster->nworkers_per_group()); if(test_net!=nullptr) - test_net->ShareParams(train_net, kValueOnly); + test_net->ShareParams(train_net); } if(mproto.validation_steps()){ - validation_net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kValidation, + validation_net=NeuralNet::Create(mproto.neuralnet(), kValidation, cluster->nworkers_per_group()); if(validation_net!=nullptr) - validation_net->ShareParams(train_net, kValueOnly); + validation_net->ShareParams(train_net); } } // create ServerShard for the workers auto shard=make_shared<WorkerShard>(); worker_shards_[gid]=shard; for(auto layer: train_net->layers()){ - int procsid=cluster->ProcsIDOf(gid, layer->partitionid(), kWorkerLayer); + int procsid=cluster->ProcsIDOf(gid, layer->partition_id(), kWorkerLayer); bool local=procsid==cluster->procs_id(); for(auto param: layer->GetParams()){ for(auto entry :paramid2slices[param->owner()]){ @@ -232,9 +233,9 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads, } } for(int wid=wstart;wid<wend;wid++){ - shared_ptr<Worker> worker=nullptr; + Worker* worker=nullptr; if(mproto.alg()==ModelProto_GradCalcAlg_kBackPropagation) - worker=make_shared<BPWorker>(nthreads++,gid, wid); + worker = new BPWorker(nthreads++,gid, wid); else{ // TODO add CDWorker } @@ -267,13 +268,13 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, int nthreads=1; // create workers vector<int> slices; - vector<shared_ptr<Worker>> workers=CreateWorkers(nthreads, mproto, &slices); + vector<Worker*> workers=CreateWorkers(nthreads, mproto, &slices); if(cluster->nserver_groups()&&cluster->nservers_per_group()) slice2server_=PartitionSlice(cluster->nservers_per_group(), slices); nthreads+=workers.size(); // create servers vector<HandleContext*> ctx; - vector<shared_ptr<Server>> servers=CreateServers(nthreads, mproto, slices, + vector<Server*> servers=CreateServers(nthreads, mproto, slices, &ctx); #ifdef USE_MPI @@ -283,14 +284,18 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, #endif vector<std::thread> threads; for(auto server: servers) - threads.push_back(std::thread(&Server::Run,server.get())); + threads.push_back(std::thread(&Server::Run,server)); for(auto worker: workers) - threads.push_back(std::thread(&Worker::Run,worker.get())); + threads.push_back(std::thread(&Worker::Run,worker)); Run(workers, servers); for(auto& thread: threads) thread.join(); for(auto x: ctx) delete x; + for(auto x : servers) + delete x; + for(auto x : workers) + delete x; } inline int bandwidth(int bytes, system_clock::time_point start){ @@ -299,8 +304,8 @@ inline int bandwidth(int bytes, system_clock::time_point start){ return static_cast<int>(bytes*1000.f/duration.count()); } -void Trainer::Run(const vector<shared_ptr<Worker>>& workers, - const vector<shared_ptr<Server>>& servers){ +void Trainer::Run(const vector<Worker*>& workers, + const vector<Server*>& servers){ auto cluster=Cluster::Get(); procs_id_=cluster->procs_id(); LOG(INFO)<<"Stub in process "<<procs_id_<<" starts"; @@ -364,8 +369,8 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers, string prefix((char*)msg->frame_data(), msg->frame_size()); msg->next_frame(); Metric cur; - cur.ParseString(string((char*)msg->frame_data(), msg->frame_size())); - LOG(ERROR)<<prefix<<" step-" 
<<step<<", "<<cur.ToString(); + cur.ParseFrom(string((char*)msg->frame_data(), msg->frame_size())); + LOG(ERROR)<<prefix<<" step-" <<step<<", "<<cur.ToLogString(); } DeleteMsg(&msg); }else if(cluster->nserver_groups()>0){ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/trainer/worker.cc ---------------------------------------------------------------------- diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc index a92ba2c..80a6283 100644 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@ -8,8 +8,10 @@ #include "utils/factory.h" #include "trainer/worker.h" #include "proto/model.pb.h" -using std::thread; namespace singa { +using std::thread; +using std::make_shared; + Worker::Worker(int thread_id, int group_id, int worker_id): thread_id_(thread_id), group_id_(group_id), worker_id_(worker_id){ } @@ -52,7 +54,7 @@ void Worker::Run(){ dealer_=make_shared<Dealer>(2*thread_id_); ConnectStub(dealer_, kWorkerParam); for(auto layer: train_net_->layers()) - if(layer->partitionid()==worker_id_) + if(layer->partition_id()==worker_id_) if(layer->is_bridgedstlayer()||layer->is_bridgesrclayer()){ layer_dealer_=make_shared<Dealer>(2*thread_id_+1); ConnectStub(layer_dealer_, kWorkerLayer); @@ -61,7 +63,7 @@ void Worker::Run(){ step_=modelproto_.step(); // init params for(auto layer: train_net_->layers()){ - if(layer->partitionid()==worker_id_) + if(layer->partition_id()==worker_id_) for(auto param: layer->GetParams()){ // only owners fill the memory of parameter values. // others share the memory with owners hence do not need to put/get. @@ -79,7 +81,7 @@ void Worker::Run(){ for(step_=0;step_<modelproto_.warmup_steps();step_++) RunOneBatch(step_, &perf); for(auto layer: train_net_->layers()){ - if(layer->partitionid()==worker_id_) + if(layer->partition_id()==worker_id_) for(auto param: layer->GetParams()) if(param->owner()==param->id()) Put(param, step_); @@ -107,7 +109,7 @@ void Worker::Stop(){ msg->set_type(kStop); dealer_->Send(&msg); // use param dealer to send the stop msg } -int Worker::Put(shared_ptr<Param> param, int step){ +int Worker::Put(Param* param, int step){ Msg* msg=new Msg(); msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1, -1, kStub); @@ -116,7 +118,7 @@ int Worker::Put(shared_ptr<Param> param, int step){ dealer_->Send(&msg); return 1; } -int Worker::Get(shared_ptr<Param> param, int step){ +int Worker::Get(Param* param, int step){ Msg* msg=new Msg(); msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1, -1, kStub); @@ -125,7 +127,7 @@ int Worker::Get(shared_ptr<Param> param, int step){ dealer_->Send(&msg); return 1; } -int Worker::Update(shared_ptr<Param> param, int step){ +int Worker::Update(Param* param, int step){ param->set_local_version(param->version()); if(updater_){ updater_->Update(step, param); @@ -144,30 +146,29 @@ int Worker::Update(shared_ptr<Param> param, int step){ int Worker::CollectAll(shared_ptr<NeuralNet> net, int step){ auto& layers=net->layers(); for(auto& layer: layers){ - if(layer->partitionid()==worker_id_) - for(shared_ptr<Param> p: layer->GetParams()){ + if(layer->partition_id()==worker_id_) + for(Param* p: layer->GetParams()){ Collect(p, step); } } return 1; } -int Worker::Collect(shared_ptr<Param> param, int step){ +int Worker::Collect(Param* param, int step){ while(param->version()<=param->local_version()){ std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime)); } return 1; } -const void Worker::DisplayPerformance(const Metric & perf, const string& prefix){ +void 
Worker::DisplayPerformance(const string& prefix, const Metric & perf) { Msg* msg=new Msg(); msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1,-1, kStub); msg->set_type(kMetric); msg->set_trgt(step_,0,0); - const string disp=perf.ToString(); msg->add_frame(prefix.c_str(), prefix.length()); + const string disp = perf.ToString(); msg->add_frame(disp.c_str(), disp.length()); dealer_->Send(&msg); - //LOG(ERROR)<<prefix<<" "<<perf.ToString(); } void Worker::RunOneBatch(int step, Metric* perf){ @@ -184,10 +185,8 @@ void Worker::RunOneBatch(int step, Metric* perf){ TrainOneBatch(step, perf); //LOG(ERROR)<<"Train "<<step; if(perf!=nullptr){ - perf->Inc(); if(DisplayNow(step)){ - //perf->Avg(); - DisplayPerformance(*perf, "Train"); + DisplayPerformance("Train", *perf); perf->Reset(); } } @@ -208,13 +207,12 @@ void Worker::Test(int nsteps, Phase phase, shared_ptr<NeuralNet> net){ Metric perf; for(int step=0;step<nsteps;step++){ TestOneBatch(step, phase, net, &perf); - perf.Inc(); } //perf.Avg(); if(phase==kValidation) - DisplayPerformance(perf, "Validation"); + DisplayPerformance("Validation", perf); else if (phase==kTest) - DisplayPerformance(perf, "Test"); + DisplayPerformance("Test", perf); } /****************************BPWorker**********************************/ @@ -223,19 +221,20 @@ BPWorker::BPWorker(int thread_id, int group_id, int worker_id): Worker(thread_id, group_id, worker_id){ } -void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net){ +void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net, + Metric* perf){ auto& layers=net->layers(); for(auto& layer: layers){ - if(layer->partitionid()==worker_id_){ + if(layer->partition_id()==worker_id_){ if(layer->is_bridgedstlayer()){ - auto* dst=static_cast<BridgeDstLayer*>(layer.get()); + auto* dst=static_cast<BridgeDstLayer*>(layer); while(!dst->ready()){ auto msg=layer_dealer_->Receive(); CHECK_EQ(msg->src_first(), group_id_); string name((char*)msg->frame_data(), msg->frame_size()); auto tmp=net->name2layer(name); CHECK(tmp->is_bridgedstlayer()); - auto* dstlayer=static_cast<BridgeDstLayer*>(tmp.get()); + auto* dstlayer=static_cast<BridgeDstLayer*>(tmp); auto data=dstlayer->mutable_data(nullptr); msg->next_frame(); memcpy(data->mutable_cpu_data(), msg->frame_data(), msg->frame_size()); @@ -244,28 +243,25 @@ void BPWorker::Forward(int step, Phase phase, shared_ptr<NeuralNet> net){ } } if(phase==kTrain){ - for(shared_ptr<Param> p: layer->GetParams()){ + for(Param* p: layer->GetParams()){ Collect(p, step); } } //clock_t s=clock(); - layer->ComputeFeature(phase); + layer->ComputeFeature(phase, perf); //LOG(ERROR)<<layer->name()<<":"<<(clock()-s)*1.0/CLOCKS_PER_SEC; if(layer->is_bridgesrclayer()){ auto dst=layer->dstlayers().at(0); Msg *msg=new Msg(); msg->set_src(group_id_, worker_id_, kWorkerLayer); - msg->set_dst(group_id_, dst->partitionid(), kWorkerLayer); + msg->set_dst(group_id_, dst->partition_id(), kWorkerLayer); msg->add_frame(dst->name().c_str(), dst->name().length()); auto const & blob=layer->data(nullptr); msg->add_frame(blob.cpu_data(), blob.count()*sizeof(float)); layer_dealer_->Send(&msg); } - if(phase==kTrain&&DisplayDebugInfo(step) - &&layer->mutable_data(nullptr)!=nullptr){ - LOG(INFO)<<StringPrintf("Forward layer %10s data norm1 %13.9f", - layer->name().c_str(), layer->data(nullptr).asum_data()); - } + if(phase == kTrain && DisplayDebugInfo(step)) + LOG(INFO) << layer->DebugString(step, kForward); } } } @@ -273,25 +269,17 @@ void BPWorker::Forward(int step, Phase phase, 
shared_ptr<NeuralNet> net){ void BPWorker::Backward(int step, shared_ptr<NeuralNet> net){ auto& layers=net->layers(); for (auto it = layers.rbegin(); it != layers.rend(); it++){ - shared_ptr<Layer> layer=*it; - if(layer->partitionid()==worker_id_){ + Layer* layer=*it; + if(layer->partition_id()==worker_id_){ if(layer->is_bridgesrclayer()){ //auto* src=static_cast<BridgeSrcLayer*>(layer.get()); // receive grad blobs } - layer->ComputeGradient(); - if(layer->mutable_grad(nullptr)!=nullptr&&DisplayDebugInfo(step)){ - LOG(INFO)<<StringPrintf("Backward layer %10s grad norm1 %13.9f\t", - layer->name().c_str(), layer->grad(nullptr).asum_data()); - for(shared_ptr<Param> p: layer->GetParams()) - LOG(INFO)<<StringPrintf("param id %2d, name %10s,\ - value norm1 %13.9f, grad norm1 %13.9f", - p->id(), p->name().c_str(), - p->data().asum_data(), p->grad().asum_data()); - } - for(shared_ptr<Param> p: layer->GetParams()){ + layer->ComputeGradient(kTrain); + if(DisplayDebugInfo(step)) + LOG(INFO) << layer->DebugString(step, kBackward); + for(Param* p: layer->GetParams()) Update(p, step); - } if(layer->is_bridgedstlayer()){ // send grad blobs } @@ -300,38 +288,14 @@ void BPWorker::Backward(int step, shared_ptr<NeuralNet> net){ } void BPWorker::TrainOneBatch(int step, Metric* perf){ - Forward(step, kTrain, train_net_); + Forward(step, kTrain, train_net_, perf); Backward(step, train_net_); auto losslayers=train_net_->losslayers(); - for(auto layer: losslayers){ - if(layer->partitionid()==worker_id_){ - const float * ptr=layer->metric().cpu_data(); - /* - for(int j=0;j<layer->metric().count();j++) - perf->AddMetric(std::to_string(j)+"#"+layer->name(), ptr[j]); - */ - // hard code display info - perf->AddMetric(std::to_string(0)+"#loss", ptr[0]); - perf->AddMetric(std::to_string(1)+"#accuracy", ptr[1]); - } - } } -void BPWorker::TestOneBatch(int step, Phase phase, shared_ptr<NeuralNet> net, Metric* perf){ - Forward(step, phase, net); - const auto& losslayers=net->losslayers(); - for(auto layer: losslayers){ - if(layer->partitionid()==worker_id_){ - const float * ptr=layer->metric().cpu_data(); - /* - for(int j=0;j<layer->metric().count();j++) - perf.AddMetric(std::to_string(j)+"#"+layer->name(), ptr[j]); - */ - // hard code display info - perf->AddMetric(std::to_string(0)+"#loss", ptr[0]); - perf->AddMetric(std::to_string(1)+"#accuracy", ptr[1]); - } - } +void BPWorker::TestOneBatch(int step, Phase phase, + shared_ptr<NeuralNet> net, Metric* perf){ + Forward(step, phase, net, perf); } } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9a6e09fa/src/utils/common.cc ---------------------------------------------------------------------- diff --git a/src/utils/common.cc b/src/utils/common.cc index 67b4486..11a19f8 100644 --- a/src/utils/common.cc +++ b/src/utils/common.cc @@ -160,4 +160,52 @@ void SetupLog(const std::string& log_dir, const std::string& model) { google::SetLogDestination(google::FATAL, fatal.c_str()); } +void Metric::Add(const string& name, float value) { + if(entry_.find(name) == entry_.end()) + entry_[name] = std::make_pair(1, value); + else{ + auto& e = entry_.at(name); + e.first += 1; + e.second += value; + } +} + +void Metric::Reset() { + for(auto e : entry_) { + e.second.first = 0; + e.second.second = 0; + } +} +const string Metric::ToLogString() const{ + string ret; + size_t k = 0; + for(auto e : entry_) { + ret += e.first + " : " ; + ret += std::to_string(e.second.second / e.second.first); + if(++k < entry_.size()) + ret += ", "; + } + return ret; +} + +const 
string Metric::ToString() const{ + MetricProto proto; + for(auto e : entry_) { + proto.add_name(e.first); + proto.add_count(e.second.first); + proto.add_val(e.second.second); + } + string ret; + proto.SerializeToString(&ret); + return ret; +} + +void Metric::ParseFrom(const string& msg) { + MetricProto proto; + proto.ParseFromString(msg); + Reset(); + for(int i = 0; i < proto.name_size(); i++) { + entry_[proto.name(i)] = std::make_pair(proto.count(i), proto.val(i)); + } +} } // namespace singa
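For context, the refactored Metric above keeps one (count, sum) pair per named value and reports the running average; the sketch below is a minimal standalone mirror of that bookkeeping (plain std::map, no MetricProto serialization), written for illustration only — it is not the SINGA class, and the demo values are made up.

// Standalone sketch of the Metric bookkeeping shown in src/utils/common.cc:
// each named entry holds a (count, sum) pair; ToLogString reports sum/count.
#include <map>
#include <string>
#include <utility>
#include <iostream>

class MetricSketch {
 public:
  void Add(const std::string& name, float value) {
    auto it = entry_.find(name);
    if (it == entry_.end())
      entry_[name] = std::make_pair(1, value);   // first observation
    else {
      it->second.first += 1;                     // count
      it->second.second += value;                // running sum
    }
  }
  void Reset() {
    for (auto& e : entry_) {                     // iterate by reference so the
      e.second.first = 0;                        // stored counters are
      e.second.second = 0.f;                     // actually cleared
    }
  }
  std::string ToLogString() const {
    std::string ret;
    size_t k = 0;
    for (const auto& e : entry_) {
      ret += e.first + " : " + std::to_string(e.second.second / e.second.first);
      if (++k < entry_.size()) ret += ", ";
    }
    return ret;
  }
 private:
  std::map<std::string, std::pair<int, float>> entry_;  // name -> (count, sum)
};

int main() {
  MetricSketch perf;
  perf.Add("loss", 2.0f);       // e.g. accumulated once per mini-batch
  perf.Add("loss", 1.0f);
  perf.Add("accuracy", 0.5f);
  std::cout << "Train step-10, " << perf.ToLogString() << std::endl;
  // prints per-name averages, e.g. "accuracy : 0.500000, loss : 1.500000"
  perf.Reset();
  return 0;
}

In the actual change these entries are additionally serialized through the new MetricProto (name/count/val) by ToString()/ParseFrom(), which is how workers ship their performance counters to the stub for logging.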

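A related piece of the trainer.cc change is PartitionSlice, which maps parameter slices onto server groups so each group holds roughly total/num of the data. The exact balancing rule is not fully visible in the hunk above, so the following is only an illustrative greedy sketch under that assumption; GreedySliceToBox and its inputs are hypothetical names, not the SINGA implementation.

// Hypothetical greedy assignment of parameter slices to `num` servers,
// assuming the goal suggested by PartitionSlice's log line: keep each
// server's load close to avg = total_size / num.
#include <vector>
#include <numeric>
#include <iostream>

std::vector<int> GreedySliceToBox(int num, const std::vector<int>& slices) {
  std::vector<int> slice2box(slices.size(), 0);
  if (num <= 0 || slices.empty()) return slice2box;
  int total = std::accumulate(slices.begin(), slices.end(), 0);
  float avg = static_cast<float>(total) / num;
  int box = 0, load = 0;
  for (size_t i = 0; i < slices.size(); ++i) {
    // advance to the next box once the current one reaches the average load,
    // but never run past the last box
    if (load >= avg && box + 1 < num) {
      ++box;
      load = 0;
    }
    slice2box[i] = box;
    load += slices[i];
  }
  return slice2box;
}

int main() {
  // e.g. four weight/bias slices distributed over two server groups
  std::vector<int> slices{4000, 100, 3900, 200};
  for (int box : GreedySliceToBox(2, slices))
    std::cout << box << " ";
  std::cout << std::endl;  // prints "0 0 1 1" for this input
  return 0;
}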