SINGA-41: Support single-node, single-GPU training. Rebased onto version 0.1.0 and tested with the cifar example. The -G compilation option degrades performance; GPU speed is currently only similar to CConvolution+CPooling on CPU, so the speed still needs to be optimized.
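For context, the user-visible change in the cifar example is switching the convolution/pooling layer types from the CPU-specific implementations to the generic ones that dispatch to the GPU build. A condensed excerpt of the resulting job.conf is shown here (the full hunks appear in the diff below; only fields visible in those hunks are included, and restoring kCConvolution/kCPooling should switch the net back to the CPU-only layers):

    layer {
      name: "conv1"
      type: kConvolution     # was kCConvolution (CPU-specific)
      srclayers: "rgb"
      convolution_conf {
        num_filters: 32
        # kernel/pad/stride unchanged from the existing cifar10 config
      }
    }
    layer {
      name: "pool1"
      type: kPooling         # was kCPooling
      srclayers: "conv1"
      pooling_conf {
        pool: MAX
      }
    }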
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/9dbdfd68 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/9dbdfd68 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/9dbdfd68 Branch: refs/heads/gpu Commit: 9dbdfd68695e8d3a30cafc14d941a36ea0bf55d6 Parents: 1770377 Author: Wei Wang <[email protected]> Authored: Tue Sep 29 11:18:33 2015 +0800 Committer: Wei Wang <[email protected]> Committed: Tue Sep 29 11:23:21 2015 +0800 ---------------------------------------------------------------------- examples/cifar10/job.conf | 12 +- include/utils/blob.h | 14 +- src/neuralnet/connection_layer.cc | 6 +- src/neuralnet/neuron_layer.cu | 203 ++++++------- src/trainer/server.cc | 256 ---------------- src/trainer/trainer.cc | 521 --------------------------------- src/utils/blob.cc | 5 +- 7 files changed, 120 insertions(+), 897 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/examples/cifar10/job.conf ---------------------------------------------------------------------- diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf index 343d969..91688a4 100644 --- a/examples/cifar10/job.conf +++ b/examples/cifar10/job.conf @@ -57,7 +57,7 @@ neuralnet { layer { name: "conv1" - type: kCConvolution + type: kConvolution srclayers: "rgb" convolution_conf { num_filters: 32 @@ -84,7 +84,7 @@ neuralnet { layer { name: "pool1" - type: kCPooling + type: kPooling srclayers: "conv1" pooling_conf { pool: MAX @@ -109,7 +109,7 @@ neuralnet { } layer { name: "conv2" - type: kCConvolution + type: kConvolution srclayers: "norm1" convolution_conf { num_filters: 32 @@ -140,7 +140,7 @@ neuralnet { } layer { name: "pool2" - type: kCPooling + type: kPooling srclayers: "relu2" pooling_conf { pool: AVG @@ -160,7 +160,7 @@ neuralnet { } layer { name: "conv3" - type: kCConvolution + type: kConvolution srclayers: "norm2" convolution_conf { num_filters: 64 @@ -190,7 +190,7 @@ neuralnet { } layer { name: "pool3" - type: kCPooling + type: kPooling srclayers: "relu3" pooling_conf { pool: AVG http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/include/utils/blob.h ---------------------------------------------------------------------- diff --git a/include/utils/blob.h b/include/utils/blob.h index 903845d..754abbe 100644 --- a/include/utils/blob.h +++ b/include/utils/blob.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -184,11 +184,11 @@ class Blob { } inline Dtype* mutable_xpu_data() { CHECK(data_); - #ifndef CPU_ONLY - return static_cast<Dtype*>(data_->mutable_gpu_data()); - #else - return static_cast<Dtype*>(data_->mutable_cpu_data()); - #endif + #ifndef CPU_ONLY + return static_cast<Dtype*>(data_->mutable_gpu_data()); + #else + return static_cast<Dtype*>(data_->mutable_cpu_data()); + #endif } /// @brief Compute the sum of absolute values (L1 norm) of the data. 
Dtype asum_data() const; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/neuralnet/connection_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/connection_layer.cc b/src/neuralnet/connection_layer.cc index 750a511..acf243d 100644 --- a/src/neuralnet/connection_layer.cc +++ b/src/neuralnet/connection_layer.cc @@ -27,9 +27,9 @@ using std::vector; /********* Implementation for BridgeDstLayer **************/ void BridgeDstLayer::Setup(const LayerProto& proto, const vector<Layer*>& srclayers) { - Layer::Setup(proto, npartitions); - CHECK_EQ(srclayers_.size(), 1); - data_.Reshape(srclayers_[0]->data(this).shape()); + Layer::Setup(proto, srclayers); + CHECK_EQ(srclayers.size(), 1); + data_.Reshape(srclayers[0]->data(this).shape()); grad_.ReshapeLike(data_); } /************* Implementation for ConcateLayer ***********/ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/neuralnet/neuron_layer.cu ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuron_layer.cu b/src/neuralnet/neuron_layer.cu index affb02f..a93ceb9 100644 --- a/src/neuralnet/neuron_layer.cu +++ b/src/neuralnet/neuron_layer.cu @@ -23,118 +23,120 @@ #include <glog/logging.h> #include <algorithm> +#include <string> +#include <vector> #include "utils/singleton.h" #include "mshadow/tensor.h" #include "mshadow/cxxnet_op.h" namespace singa { - using namespace mshadow; - using namespace mshadow::expr; - using mshadow::cpu; - using mshadow::xpu; - - using mshadow::Shape; - using mshadow::Shape1; - using mshadow::Shape2; - using mshadow::Shape3; - using mshadow::Shape4; - using mshadow::Tensor; - - using std::string; - using std::vector; - - inline Tensor<cpu, 4> Tensor4CPU(Blob<float>* blob) { - const vector<int>& shape = blob->shape(); - Tensor<cpu, 4> tensor(blob->mutable_cpu_data(), - Shape4(shape[0], shape[1], shape[2], shape[3])); - return tensor; - } +using namespace mshadow; +using namespace mshadow::expr; +using mshadow::cpu; +using mshadow::xpu; - inline Tensor<cpu, 3> Tensor3CPU(Blob<float>* blob) { - const vector<int>& shape = blob->shape(); - Tensor<cpu, 3> tensor(blob->mutable_cpu_data(), - Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1])); - return tensor; - } +using mshadow::Shape; +using mshadow::Shape1; +using mshadow::Shape2; +using mshadow::Shape3; +using mshadow::Shape4; +using mshadow::Tensor; - inline Tensor<cpu, 2> Tensor2CPU(Blob<float>* blob) { - const vector<int>& shape = blob->shape(); - Tensor<cpu, 2> tensor(blob->mutable_cpu_data(), - Shape2(shape[0], blob->count() / shape[0])); - return tensor; - } +using std::string; +using std::vector; - inline Tensor<cpu, 1> Tensor1CPU(Blob<float>* blob) { - Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count())); - return tensor; - } +inline Tensor<cpu, 4> Tensor4CPU(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<cpu, 4> tensor(blob->mutable_cpu_data(), + Shape4(shape[0], shape[1], shape[2], shape[3])); + return tensor; +} - inline Tensor<xpu, 4> Tensor4(Blob<float>* blob) { - const vector<int>& shape = blob->shape(); - Tensor<xpu, 4> tensor(blob->mutable_xpu_data(), - Shape4(shape[0], shape[1], shape[2], shape[3])); - return tensor; - } +inline Tensor<cpu, 3> Tensor3CPU(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<cpu, 3> tensor(blob->mutable_cpu_data(), + Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1])); + return tensor; +} 
- inline Tensor<xpu, 3> Tensor3(Blob<float>* blob){ - const vector<int>& shape = blob->shape(); - Tensor<xpu, 3> tensor(blob->mutable_xpu_data(), - Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1])); - return tensor; - } - inline Tensor<xpu, 2> Tensor2(Blob<float>* blob){ - const vector<int>& shape = blob->shape(); - Tensor<xpu, 2> tensor(blob->mutable_xpu_data(), - Shape2(shape[0], blob->count() / shape[0])); - return tensor; - } - inline Tensor<xpu, 1> Tensor1(Blob<float>* blob){ - Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count())); - return tensor; - } +inline Tensor<cpu, 2> Tensor2CPU(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<cpu, 2> tensor(blob->mutable_cpu_data(), + Shape2(shape[0], blob->count() / shape[0])); + return tensor; +} - /************ Implementation for ConvolutionLayer*************************/ - ConvolutionLayer::~ConvolutionLayer() { - delete weight_; - delete bias_; - } - void ConvolutionLayer::Setup(const LayerProto& conf, - const vector<Layer*>& srclayers) { - CHECK_EQ(srclayers.size(), 1); - Layer::Setup(conf, srclayers); - ConvolutionProto conv_conf = conf.convolution_conf(); - kernel_ = conv_conf.kernel(); - CHECK_GT(kernel_, 0) << "Filter size cannot be zero."; - pad_ = conv_conf.pad(); - stride_ = conv_conf.stride(); - num_filters_ = conv_conf.num_filters(); - if (partition_dim() > 0) - num_filters_ /= srclayers.at(0)->num_partitions(); - const vector<int>& srcshape = srclayers[0]->data(this).shape(); - int dim = srcshape.size(); - CHECK_GT(dim, 2); - width_ = srcshape[dim - 1]; - height_ = srcshape[dim - 2]; - if (dim > 3) - channels_ = srcshape[dim - 3]; - else if (dim > 2) - channels_ = 1; - batchsize_ = srcshape[0]; - conv_height_ = (height_ + 2 * pad_ - kernel_) / stride_ + 1; - conv_width_ = (width_ + 2 * pad_ - kernel_) / stride_ + 1; - col_height_ = channels_ * kernel_ * kernel_; - col_width_ = conv_height_ * conv_width_; - vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_}; - data_.Reshape(shape); - grad_.Reshape(shape); - col_data_.Reshape(vector<int>{col_height_, col_width_}); - col_grad_.Reshape(vector<int>{col_height_, col_width_}); - weight_ = Param::Create(conf.param(0)); - bias_ = Param::Create(conf.param(1)); - weight_->Setup(vector<int>{num_filters_, col_height_}); - bias_->Setup(vector<int>{num_filters_}); - } +inline Tensor<cpu, 1> Tensor1CPU(Blob<float>* blob) { + Tensor<cpu, 1> tensor(blob->mutable_cpu_data(), Shape1(blob->count())); + return tensor; +} + +inline Tensor<xpu, 4> Tensor4(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<xpu, 4> tensor(blob->mutable_xpu_data(), + Shape4(shape[0], shape[1], shape[2], shape[3])); + return tensor; +} + +inline Tensor<xpu, 3> Tensor3(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<xpu, 3> tensor(blob->mutable_xpu_data(), + Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1])); + return tensor; +} +inline Tensor<xpu, 2> Tensor2(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<xpu, 2> tensor(blob->mutable_xpu_data(), + Shape2(shape[0], blob->count() / shape[0])); + return tensor; +} +inline Tensor<xpu, 1> Tensor1(Blob<float>* blob) { + Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count())); + return tensor; +} + +/************ Implementation for ConvolutionLayer*************************/ +ConvolutionLayer::~ConvolutionLayer() { + delete weight_; + delete bias_; +} +void ConvolutionLayer::Setup(const LayerProto& 
conf, + const vector<Layer*>& srclayers) { + CHECK_EQ(srclayers.size(), 1); + Layer::Setup(conf, srclayers); + ConvolutionProto conv_conf = conf.convolution_conf(); + kernel_ = conv_conf.kernel(); + CHECK_GT(kernel_, 0) << "Filter size cannot be zero."; + pad_ = conv_conf.pad(); + stride_ = conv_conf.stride(); + num_filters_ = conv_conf.num_filters(); + if (partition_dim() > 0) + num_filters_ /= srclayers.at(0)->num_partitions(); + const vector<int>& srcshape = srclayers[0]->data(this).shape(); + int dim = srcshape.size(); + CHECK_GT(dim, 2); + width_ = srcshape[dim - 1]; + height_ = srcshape[dim - 2]; + if (dim > 3) + channels_ = srcshape[dim - 3]; + else if (dim > 2) + channels_ = 1; + batchsize_ = srcshape[0]; + conv_height_ = (height_ + 2 * pad_ - kernel_) / stride_ + 1; + conv_width_ = (width_ + 2 * pad_ - kernel_) / stride_ + 1; + col_height_ = channels_ * kernel_ * kernel_; + col_width_ = conv_height_ * conv_width_; + vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_}; + data_.Reshape(shape); + grad_.Reshape(shape); + col_data_.Reshape(vector<int>{col_height_, col_width_}); + col_grad_.Reshape(vector<int>{col_height_, col_width_}); + weight_ = Param::Create(conf.param(0)); + bias_ = Param::Create(conf.param(1)); + weight_->Setup(vector<int>{num_filters_, col_height_}); + bias_->Setup(vector<int>{num_filters_}); +} void ConvolutionLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { @@ -184,7 +186,6 @@ void ConvolutionLayer::ComputeGradient(int flag, imgshp); } } - // weight_->mutable_data()->mutable_cpu_data(); } /******************* Implementation for CConvolutionLayer *********/ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/trainer/server.cc ---------------------------------------------------------------------- diff --git a/src/trainer/server.cc b/src/trainer/server.cc deleted file mode 100644 index f5a0560..0000000 --- a/src/trainer/server.cc +++ /dev/null @@ -1,256 +0,0 @@ -#include <thread> -#include <chrono> -#include "mshadow/tensor.h" -#include "trainer/server.h" -#include "utils/param.h" -#include "utils/singleton.h" -#include "utils/factory.h" -#include "utils/cluster.h" -#include "proto/common.pb.h" - -namespace singa { - -using namespace mshadow; -using std::vector; - -Server::Server(int thread_id,int group_id, int server_id): - thread_id_(thread_id),grp_id_(group_id), id_(server_id){ -} - -void Server::Setup(const UpdaterProto& proto, - std::unordered_map<int, ParamEntry*>* shard, - const vector<int>& slice2group) { - updater_ = Updater::Create(proto); - shard_ = shard; - slice2group_ = slice2group; -} - -Server::~Server() { - delete updater_; -} - -void Stop(void * running) { - *static_cast<bool *>(running) = false; -} - -void Server::Run() { - LOG(ERROR) << "Server (group = " << grp_id_ <<", id = " << id_ << ") start"; - auto dealer = new Dealer(2*thread_id_); - CHECK(dealer->Connect(kInprocRouterEndpoint)); - Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub)); - ping->set_type(kConnect); - dealer->Send(&ping); - - auto cluster = Cluster::Get(); - bool running = true; - CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running)); - - int nserver_grps = cluster->nserver_groups(); - vector<Param*> master_params; - size_t syncEntry=0; - Poller poll(dealer); - // start recv loop and process requests - while (running) { - auto *sock = poll.Wait(cluster->poll_time()); - if (poll.Terminated()) { - LOG(ERROR) << "Connection broken!"; - exit(0); - } else if (sock == nullptr) { - continue; 
- } - Msg* msg=dealer->Receive(); - if (msg==nullptr) break; - Msg* response=nullptr; - int type=msg->type(); - int slice_id = SliceID(msg->trgt_val()); - if (type == kPut) { - response = HandlePut(&msg); - if(slice2group_[slice_id] == grp_id_) - master_params.push_back(shard_->at(slice_id)->shares.at(0)); - } else { - if (shard_->find(slice_id) == shard_->end()) { - // delay the processing by re-queue the msg. - response = msg; - } else if (type == kSyncReminder) { - DeleteMsg(&msg); - if(syncEntry >= master_params.size()) - continue; - auto param = master_params.at(syncEntry); - // control the frequency of synchronization - // currently sync is triggerred only when the slice is updated - // by local worker or other workers for at least nserver_groups times. - // TODO may optimize the trigger condition. - if (abs(param->local_version() - param->version()) >= nserver_grps) { - for (auto msg : GenSyncMsgs(param)) - dealer->Send(&msg); - syncEntry = (syncEntry+1) % master_params.size(); - } - } else { - switch (type) { - case kGet: - response = HandleGet(&msg); - break; - case kUpdate: - for (auto reply : HandleUpdate(&msg)) - dealer->Send(&reply); - break; - case kSyncRequest: - response = HandleSyncRequest(&msg); - break; - default: - LOG(ERROR)<<"Unknown message type "<<type; - break; - } - } - } - if (response != nullptr) - dealer->Send(&response); - } - - // send stop msg to stub - Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub)); - msg->set_type(kStop); - dealer->Send(&msg); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - - LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops"; - delete dealer; -} - -const vector<Msg*> Server::GenSyncMsgs(Param* param) { - vector<Msg*> ret; - // TODO replace the argument (0,0) to sync a chunk instead of a slice - auto msg = param->GenSyncMsg(0, 0); - auto cluster = Cluster::Get(); - for (int i = 0; i < cluster->nserver_groups(); i++) { - if (i != grp_id_) { - Msg* tmp = msg; - if (i < cluster->nserver_groups() - 1) - tmp = new Msg(*msg); - // assume only one server per group, TODO generalize it - tmp->set_dst(Addr(i, 0, kServer)); - tmp->set_src(Addr(grp_id_, id_, kServer)); - ret.push_back(tmp); - param->set_version(param->local_version()); - //LOG(ERROR)<<"sync slice="<<param->id()<<" to procs "<<i; - } - } - return ret; -} - -Msg* Server::HandlePut(Msg **msg) { - int version = (*msg)->trgt_version(); - int slice_id = SliceID((*msg)->trgt_val()); - if (shard_->find(slice_id) != shard_->end()) - LOG(FATAL) << "Param (" << slice_id << ") is put more than once"; - - // TODO(wangwei) replace hard coded param type 0 - auto param = Singleton<Factory<Param>>::Instance()->Create(0); - auto response = param->HandlePutMsg(msg, true); - // parse num of shares of this param from a worker group - int num_shares = 1; - if ((*msg)->NextFrame()) - (*msg)->ParseFormatFrame("i", &num_shares); - DeleteMsg(msg); - (*shard_)[slice_id] = new ParamEntry(num_shares, param); - // must set version after HandlePutMsg which allocates the memory - param->set_version(version); - param->set_local_version(version); - param->set_id(slice_id); - //LOG(ERROR)<<"put norm "<<param->data().asum_data()<<", "<<pid; - // allocate blob for param sync between groups. 
- if (Cluster::Get()->nserver_groups() > 1 && slice2group_[slice_id] != grp_id_) { - last_data_[slice_id] = std::make_shared<Blob<float>>(); - last_data_[slice_id]->ReshapeLike(param->data()); - last_data_[slice_id]->CopyFrom(param->data()); - } - LOG(INFO)<<"server (group = " << grp_id_ << ", id = " << id_ <<") put slice=" - << slice_id << " size=" << param->size(); - return response; -} - -Msg* Server::HandleGet(Msg **msg) { - int val = (*msg)->trgt_val(); - auto param = shard_->at(SliceID(val))->shares.at(0); - // re-queue the request if the param is not updated to the required version - if(param->version()<(*msg)->trgt_version()) - return *msg; - else { - // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first(); - auto reply = param->HandleGetMsg(msg, false); - reply->set_trgt(val, param->version()); - return reply; - } -} - -const vector<Msg*> Server::HandleUpdate(Msg **msg) { - vector<Msg*> ret; - int sliceid = SliceID((*msg)->trgt_val()); - auto entry = shard_->at(sliceid); - buffer_requests_[sliceid].push_back(*msg); - int num_update; - (*msg)->LastFrame(); - (*msg)->ParseFormatFrame("i", &num_update); - (*msg)->FirstFrame(); - entry->num_update += num_update; - // LOG(ERROR) << "update "<<sliceid<< " from "<<(*msg)->src_second() - // << ", " << num_update << " total " << entry->num_total; - // do update until recv gradients from all shares of this param/slice - if (entry->num_update >= entry->num_total) { - CHECK_EQ(entry->num_update, entry->num_total); - auto& request = buffer_requests_.at(sliceid); - int step = (*msg)->trgt_version(); - auto param = entry->shares.at(0); - // extract and aggregate gradients - param->ParseUpdateMsgs(request); - updater_->Update(step, param, 1.0f / entry->num_total); - param->set_local_version(param->local_version() + 1); - // response to all shares of this param - for (auto response : param->GenUpdateResponseMsgs(&request, false)) { - response->set_trgt((*msg)->trgt_val(), param->local_version()); - ret.push_back(response); - } - entry->num_update = 0; - } - *msg = nullptr; - return ret; -} - -Msg* Server::HandleSyncRequest(Msg **msg) { - Msg* msgg = *msg; - int slice = SliceID(msgg->trgt_val()); - auto param = shard_->at(slice)->shares.at(0); - Msg* response=nullptr; - auto shape=Shape1(param->size()); - CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float)); - Tensor<cpu, 1> tmp(static_cast<float*>(msgg->FrameData()), shape); - Tensor<cpu, 1> cur(param->mutable_data()->mutable_cpu_data(), shape); - //LOG(ERROR)<<"Recv sync for "<<param->id(); - if (slice2group_[slice] == grp_id_) { - // recv sync msg on slice I am mastering - cur+=tmp; - param->set_local_version(param->local_version()+1); - } else { // recv sync msg on slice mastered by others - TensorContainer<cpu, 1> diff(shape); - Tensor<cpu, 1> prev(last_data_[param->id()]->mutable_cpu_data(), shape); - diff=cur-prev; - msgg->NextFrame(); - int bandwidth; - msgg->ParseFormatFrame("i", &bandwidth); - if (bandwidth > 0) { - // send back my updates to the server group mastering this param - response=new Msg(msgg->dst(), msgg->src()); - response->set_type(kSyncRequest); - response->set_trgt(param->id(), param->version()); - response->AddFrame(diff.dptr, param->size()*sizeof(float)); - prev=diff+tmp; - Copy(cur, prev); - } else { // no bandwidth, aggregate my updates for next sync - Copy(prev, tmp); - cur=tmp+diff; - } - } - DeleteMsg(msg); - return response; -} -} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/trainer/trainer.cc 
---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc deleted file mode 100644 index c62e0d1..0000000 --- a/src/trainer/trainer.cc +++ /dev/null @@ -1,521 +0,0 @@ -#include <thread> -#include <vector> -#include <map> -#include <chrono> -#include <glog/logging.h> -#include "utils/tinydir.h" -#include <unistd.h> -#include "utils/cluster.h" -#include "utils/common.h" -#include "proto/common.pb.h" -#include "trainer/trainer.h" - - -namespace singa { -using std::vector; -using std::map; -using std::queue; -using namespace std::chrono; -using std::make_shared; - -/***********************Trainer****************************/ -Trainer::~Trainer() { - // free Params (i.e., slices) in server shard - for (auto entry : server_shard_) - for (auto param : entry.second->shares) - delete param; - delete router_; -} - -const vector<int> SliceParams(const vector<Param*>& params) { - // for load-balance among servers in a group and among server groups - int nserver_grps = Cluster::Get()->nserver_groups(); - int nservers_per_grp = Cluster::Get()->nservers_per_group(); - int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); - - // collect sizes of unique Params - std::vector<int> paramsize; - for (auto param : params) - if (param->id() == param->owner()) - paramsize.push_back(param->size()); - // slice into lcm pieces to achieve good load-balance for both intra-group - // partition (among servers in a group) and inter-group partition (each group - // is assgined a sub-set of slices) - auto param_slice = Slice(lcm, paramsize); - // construct map from Param ID to its slices <slice id, len> - std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; - vector<int> slices; - auto it = param_slice.begin(); - int slice_id = 0; - for (auto param : params) { - if (param->id() == param->owner()) { - for (int len : *it) { - slices.push_back(len); - paramid2slices[param->id()].push_back(std::make_pair(slice_id++, len)); - } - it++; - } - } - // add slice info for every Param - for (auto param : params) - for (auto entry : paramid2slices[param->owner()]) { - param->AddSlice(entry.first, entry.second); - LOG(INFO) << "param id " << param->id() << " owner=" << param->owner() - << ": " << entry.first << ", " << entry.second; - } - return slices; -} - -void Trainer::SetupWorkerServer( - const JobProto& job_conf, - const vector<Worker*>& workers, - const vector<Server*>& servers) { - auto cluster = Cluster::Get(); - int grp_size = cluster->nworkers_per_group(); - const auto& net_conf = job_conf.neuralnet(); - auto net = NeuralNet::Create(net_conf, kTrain, grp_size); - // MUST do SliceParam before share param/net with others - auto slices = SliceParams(net->params()); - - std::unordered_map<int, shared_ptr<NeuralNet>> grp_net; - int first_grp = workers.size() ? workers.at(0)->grp_id() : -1; - for (auto worker : workers) { - int grp_id = worker->grp_id(); - int worker_id = worker->id(); - shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr; - if (grp_net.find(grp_id) == grp_net.end()) { - if (grp_id == first_grp) { - // test are performed only by the first group now. TODO update. - if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) { - test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp - test_net->ShareParamsFrom(net); - } - // validation are performed only by the first group. TODO update. 
- if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) { - valid_net = NeuralNet::Create(net_conf, kValidation, 1); - valid_net->ShareParamsFrom(net); - } - grp_net[grp_id] = net; - } else { - grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size); - if(cluster->share_memory()) - grp_net[grp_id]->ShareParamsFrom(net); - } - for (auto layer : grp_net[grp_id]->layers()) { - bool local = layer->partition_id() >= workers.front()->id() - && layer->partition_id() <= workers.back()->id(); - for (auto param : layer->GetParams()) { - int hash = Hash(grp_id, param->owner()); - if (worker_shard_.find(hash) == worker_shard_.end()) - worker_shard_[hash] = new ParamEntry(); - worker_shard_[hash]->AddParam(local, param); - } - } - } - LOG(INFO) << "grp " << worker->grp_id() << ", worker " - << worker->id() << " net " << grp_net[grp_id].get(); - worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net); - } - - // partition among server groups, each group maintains one sub-set for sync - auto slice2group = PartitionSlices(cluster->nserver_groups(), slices); - for (auto server : servers) - server->Setup(job_conf.updater(), &server_shard_, slice2group); - // partition within one server group, each server updates for one sub-set - slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices); -} - -vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) { - auto cluster = Cluster::Get(); - vector<Server*> servers; - if (!cluster->has_server()) - return servers; - - int pid = cluster->procs_id(); - // if true, server procs (logical) id starts after worker procs - if (cluster->server_worker_separate()) - pid -= cluster->nworker_procs(); - int procs_size = cluster->nservers_per_procs(); - int grp_size = cluster->nservers_per_group(); - int gid = pid * procs_size / grp_size; - int start = pid * procs_size % grp_size; - int end = start + procs_size; - for (int sid = start; sid < end; sid++) { - auto server = new Server(nthreads++, gid, sid); - servers.push_back(server); - } - return servers; -} - -vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) { - auto cluster=Cluster::Get(); - vector<Worker*> workers; - if(!cluster->has_worker()) - return workers; - int pid = cluster->procs_id(); - int grp_size = cluster->nworkers_per_group(); - int procs_size = cluster->nworkers_per_procs(); - int gstart, gend, wstart, wend; - if (grp_size >= procs_size) { - // all workers in this procs are from the same group - gstart = pid * procs_size / grp_size; - gend = gstart + 1; - wstart = pid * procs_size % grp_size; - wend = wstart + procs_size; - } else { - // there are multiple (complete) groups in this procs. 
- CHECK_EQ(procs_size % grp_size, 0); - int groups_per_procs = procs_size / grp_size; - gstart = pid * groups_per_procs; - gend = (pid+1) * groups_per_procs; - wstart = 0; - wend = grp_size; - } - for (int gid = gstart; gid < gend; gid++) { - for (int wid = wstart; wid < wend; wid++) { - auto *worker = Worker::Create(job); - worker->Init(nthreads++,gid, wid); - workers.push_back(worker); - } - } - return workers; -} - -void Trainer::Resume(JobProto* jobConf) { - tinydir_dir dir; - string folder = Cluster::Get()->checkpoint_folder(); - tinydir_open(&dir, folder.c_str()); - int latest_step = 0; - // there would be multi checkpoint files (from diff workers) for one step - vector<string> ck_files; - // iterate all files to get the files for the last checkpoint - while (dir.has_next) { - tinydir_file file; - tinydir_readfile(&dir, &file); - tinydir_next(&dir); - char* ch = strstr(file.name, "step"); - if (ch == nullptr) { - if (file.name[0] != '.') - LOG(INFO) << "Irregular file in checkpoint folder: " << file.name; - continue; - } - - LOG(INFO) << "Add checkpoint file for resume: " << ch; - int step = atoi(ch+4); - if (step == latest_step) { - ck_files.push_back(file.name); - } else if(step > latest_step) { - latest_step = step; - ck_files.clear(); - ck_files.push_back(string(file.name)); - } - } - - if (latest_step > 0) { - jobConf->set_step(latest_step); - if (!jobConf->has_reset_param_version()) - jobConf->set_reset_param_version(false); - jobConf->clear_checkpoint_path(); - for (auto ck_file : ck_files) - jobConf->add_checkpoint_path(folder + "/" + ck_file); - } - tinydir_close(&dir); -} - -void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) { - // register job to zookeeper at the beginning - auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster()); - if (resume) - Resume(job); - - router_ = new Router(); - router_->Bind(kInprocRouterEndpoint); - const string hostip = cluster->hostip(); - int port = router_->Bind("tcp://" + hostip + ":*"); - // register endpoint to zookeeper - cluster->Register(getpid(), hostip + ":" + std::to_string(port)); - - int nthreads = 1; - const vector<Worker*> workers = CreateWorkers(nthreads, *job); - nthreads += workers.size(); - const vector<Server*> servers = CreateServers(nthreads, *job); - SetupWorkerServer(*job, workers, servers); - -#ifdef USE_MPI - for (int i = 0; i < nthreads; i++) - MPIQueues.push_back(make_shared<SafeQueue>()); -#endif - vector<std::thread> threads; - for(auto server : servers) - threads.push_back(std::thread(&Server::Run, server)); - for(auto worker : workers) - threads.push_back(std::thread(&Worker::Run, worker)); - Run(workers, servers); - for(auto& thread : threads) - thread.join(); - for(auto server : servers) - delete server; - for(auto worker : workers) - delete worker; -} - -inline int bandwidth(int bytes, system_clock::time_point start) { - auto now=system_clock::now(); - auto duration=duration_cast<std::chrono::milliseconds> (now - start); - return static_cast<int>(bytes*1000.f/duration.count()); -} - -void Trainer::Run( - const vector<Worker*>& workers, - const vector<Server*>& servers) { - int nworkers = workers.size(), nservers = servers.size(); - auto cluster = Cluster::Get(); - procs_id_ = cluster->procs_id(); - LOG(INFO) << "Stub in process " << procs_id_ << " starts"; - - // for sync among server groups - auto start = std::chrono::system_clock::now(); - float trans_size = 0.f; // total size of msg transferred since start time - int sync_server_id = 0; - int max_bandwidth = 
cluster->bandwidth(); - int nserver_grps = cluster->nserver_groups(); - - map<int, Dealer*> inter_dealers; // for sending msg to other procs - - std::queue<Msg*> msg_queue; - Poller poll(router_); - bool stop=false; - while (!stop || !msg_queue.empty()) { - if (msg_queue.empty()) { - // if the poll time is large, then the poller may not expire - // if it is small, then many reminder messages will be sent which may - // slow done the process of other request. TODO tune it. - auto *sock = poll.Wait(cluster->poll_time()); - if (poll.Terminated()) { - LOG(ERROR) << "Connection broken!"; - exit(0); - } else if (sock == nullptr) { - if (nserver_grps > 1 && bandwidth(trans_size, start) < max_bandwidth) { - Msg* msg = GenSyncReminderMsg(sync_server_id, servers); - router_->Send(&msg) ; - sync_server_id = (sync_server_id + 1) % nservers; - } - continue; - } - Msg* msg = router_->Receive(); - msg_queue.push(msg); - } - Msg* msg = msg_queue.front(); - msg_queue.pop(); - int type = msg->type(), dst = msg->dst(), flag = AddrType(dst); - if (flag == kStub && (AddrProc(dst) == procs_id_ || AddrGrp(dst) == -1)) { - if (type == kConnect) { - DeleteMsg(&msg); - } else if (type == kMetric) { - DisplayMetric(&msg); - } else if (type == kStop) { - int src_flag = AddrType(msg->src()); - if (src_flag == kServer) nservers--; - else if (src_flag == kWorkerParam) nworkers--; - DeleteMsg(&msg); - if (nworkers == 0 && nservers == 0) break; - } else if (nserver_grps > 0) { - HandleLocalMsg(&msg_queue, &msg); - } else { - DeleteMsg(&msg); - } - } else { - int dst_procs = AddrProc(dst); - if (flag != kStub) - dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag); - if (dst_procs != procs_id_) { - if (bandwidth(trans_size, start) <= cluster->bandwidth()) { - start = std::chrono::system_clock::now(); - trans_size = 0; - } - trans_size += msg->size(); - - if (inter_dealers.find(dst_procs) == inter_dealers.end()) - inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs); - inter_dealers[dst_procs]->Send(&msg); - } else { - if (type == kSyncRequest) - msg->AddFormatFrame("i", max_bandwidth - bandwidth(trans_size, start)); - router_->Send(&msg); - } - } - } - LOG(ERROR) << "Stub in process " << procs_id_ << " stops"; - for (auto& entry : inter_dealers) - delete entry.second; -} - -Msg* Trainer::GenSyncReminderMsg(int server, const vector<Server*>& servers ) { - Msg* msg = new Msg(); - msg->set_src(Addr(-1,-1, kStub)); - msg->set_dst(Addr(servers[server]->grp_id(), servers[server]->id(), kServer)); - msg->set_type(kSyncReminder); - return msg; -} - -void Trainer::DisplayMetric(Msg** msg) { - Msg* msgg = *msg; - // only display metrics from the first group - if (AddrGrp(msgg->src()) == 0) { - int step = msgg->trgt_version(); - char prefix[128]; - msgg->ParseFormatFrame("s", prefix); - CHECK(msgg->NextFrame()); - const string perf(static_cast<char*>(msgg->FrameData()), msgg->FrameSize()); - Metric cur(perf); - LOG(ERROR) << prefix << " step-" << step <<", " << cur.ToLogString(); - } - DeleteMsg(msg); -} - -Dealer* Trainer::CreateInterProcsDealer(int dst_procs) { - // forward to other procs - auto cluster = Cluster::Get(); - auto dealer = new Dealer(); - while(cluster->endpoint(dst_procs)=="") { - //kCollectSleepTime)); - std::this_thread::sleep_for(std::chrono::milliseconds(3000)); - LOG(ERROR)<<"waiting for procs "<< dst_procs<<" to register"; - } - dealer->Connect("tcp://"+cluster->endpoint(dst_procs)); - return dealer; -} - -void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) { - Msg* msgg = *msg; - 
int paramid = ParamID(msgg->trgt_val()); - int type = msgg->type(); - int grp; - ParamEntry *entry = nullptr; - switch (type) { // TODO process other requests, e.g. RESTful - case kUpdate: - grp = AddrGrp(msgg->src()); - entry = worker_shard_.at(Hash(grp, paramid)); - for(auto update_msg : HandleUpdate(entry, msg)) - msg_queue->push(update_msg); - break; - case kRUpdate: - grp = AddrGrp(msgg->dst()); - entry = worker_shard_.at(Hash(grp, paramid)); - HandleUpdateResponse(entry, msg); - break; - case kGet: - grp = AddrGrp(msgg->src()); - entry = worker_shard_.at(Hash(grp, paramid)); - for(auto get_msg : HandleGet(entry, msg)) - msg_queue->push(get_msg); - break; - case kRGet: - grp = AddrGrp(msgg->dst()); - entry = worker_shard_.at(Hash(grp, paramid)); - HandleGetResponse(entry, msg); - break; - case kPut: - grp = AddrGrp(msgg->src()); - entry = worker_shard_.at(Hash(grp, paramid)); - for(auto put_msg : HandlePut(entry, msg)) - msg_queue->push(put_msg); - break; - default: - LOG(ERROR)<<"Unknow message type:"<<type; - break; - } -} - -void Trainer::GenMsgs(int type, int version, ParamEntry* entry, - Msg* msg, vector<Msg*> *ret) { - int src_grp = AddrGrp(msg->src()); - int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group(); - auto param=entry->shares.at(0); - for (int idx = 0 ; idx < param->num_slices(); idx++) { - int slice_id =param->slice_start() + idx; - int server = slice2server_[slice_id]; - int procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer); - Msg* new_msg = nullptr; - if (type == kPut) { - CHECK_GT(entry->num_total, 0); - new_msg = param->GenPutMsg(procs != procs_id_, idx); - // new_msg = param->GenPutMsg(true, idx); - new_msg->AddFormatFrame("i", entry->num_total); - } else if (type == kGet) { - new_msg = param->GenGetMsg(procs != procs_id_, idx); - // new_msg = param->GenGetMsg(true, idx); - } else if (type == kUpdate) { - new_msg = param->GenUpdateMsg(procs != procs_id_, idx); - // new_msg = param->GenUpdateMsg(true, idx); - new_msg->AddFormatFrame("i", entry->num_local); - } else { - LOG(FATAL) << "Wrong type"; - } - new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version); - new_msg->set_src(Addr(src_grp, procs_id_, kStub)); - new_msg->set_dst(Addr(dst_grp, server, kServer)); - ret->push_back(new_msg); - } -} - -const vector<Msg*> Trainer::HandleGet(ParamEntry* entry, Msg** msg) { - vector<Msg*> ret; - int version = (*msg)->trgt_version(); - if (version > entry->next_version) { - entry->next_version = version; - GenMsgs(kGet, version, entry, *msg, &ret); - } - DeleteMsg(msg); - return ret; -} - -const vector<Msg*> Trainer::HandleUpdate(ParamEntry *entry, Msg** msg) { - vector<Msg*> ret; - entry->num_update++; - if (entry->num_update >= entry->num_local) { - // average local gradient - if (entry->num_local > 1) { - auto it = entry->shares.begin(); - float* sum = (*it)->mutable_grad()->mutable_cpu_data(); - for (++it; it != entry->shares.end(); it++) { - float* grad = (*it)->mutable_grad()->mutable_cpu_data(); - for (int i = 0; i < (*it)->size(); i++) { - sum[i] += grad[i]; - } - } - } - int step = (*msg)->trgt_version(); - GenMsgs(kUpdate, step, entry, *msg, &ret); - entry->num_update = 0; - } - DeleteMsg(msg); - return ret; -} - -const vector<Msg*> Trainer::HandlePut(ParamEntry* entry, Msg** msg) { - vector<Msg*> ret; - int version = (*msg)->trgt_version(); - GenMsgs(kPut, version, entry, *msg, &ret); - DeleteMsg(msg); - return ret; -} - -void Trainer::HandleGetResponse(ParamEntry* entry, Msg** msg) { - int version = (*msg)->trgt_version(); 
- int sliceid = SliceID((*msg)->trgt_val()); - auto param = entry->shares.at(0); - if (param->ParseGetResponseMsg(*msg, sliceid-param->slice_start())) - param->set_version(version); - DeleteMsg(msg); -} - -void Trainer::HandleUpdateResponse(ParamEntry* entry, Msg** msg) { - int version = (*msg)->trgt_version(); - int sliceid = SliceID((*msg)->trgt_val()); - auto param = entry->shares.at(0); - if (param->ParseUpdateResponseMsg(*msg, sliceid-param->slice_start())) - param->set_version(version); - DeleteMsg(msg); -} -} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/9dbdfd68/src/utils/blob.cc ---------------------------------------------------------------------- diff --git a/src/utils/blob.cc b/src/utils/blob.cc index b27f7db..4a9d681 100644 --- a/src/utils/blob.cc +++ b/src/utils/blob.cc @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -* +* * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -192,7 +192,6 @@ void SyncedMemory::to_gpu() { case UNINITIALIZED: CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_)); //CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N)); - // CUDA_CHECK(cudaMemset(gpu_ptr_, 0, size_)); head_ = HEAD_AT_GPU; break;
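For readers tracing the blob.h and neuron_layer.cu hunks above, the single-GPU path rests on two pieces: Blob::mutable_xpu_data(), which resolves to device memory unless the code is built with CPU_ONLY, and the Tensor<xpu, N> helpers, which let the same mshadow expressions compile for either device. The following is a minimal, illustrative sketch of how a layer uses them, not a verbatim excerpt: the dot/weight expression is only an example mshadow op, and the surrounding Setup/SyncedMemory plumbing is elided.

    // Inside Blob<Dtype>::mutable_xpu_data() (include/utils/blob.h): one call
    // site, two builds -- GPU memory by default, host memory under CPU_ONLY.
    #ifndef CPU_ONLY
      return static_cast<Dtype*>(data_->mutable_gpu_data());
    #else
      return static_cast<Dtype*>(data_->mutable_cpu_data());
    #endif

    // Inside a layer's ComputeFeature (src/neuralnet/neuron_layer.cu): wrap the
    // blobs in xpu tensors once, then write device-agnostic mshadow expressions.
    Tensor<xpu, 2> data = Tensor2(&data_);
    Tensor<xpu, 2> weight = Tensor2(weight_->mutable_data());
    Tensor<xpu, 2> src = Tensor2(srclayers[0]->mutable_data(this));
    data = dot(src, weight.T());  // runs on the GPU unless built with CPU_ONLY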
