Repository: incubator-singa Updated Branches: refs/heads/gpu [created] 9dbdfd686
SINGA-41:Support single node single GPU training Rebase to lastest SINGA master Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/2818605a Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/2818605a Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/2818605a Branch: refs/heads/gpu Commit: 2818605ac10a28af542d764156588d65737f18fd Parents: 2f66537 Author: seaok <[email protected]> Authored: Mon Sep 14 15:21:09 2015 +0800 Committer: Wei Wang <[email protected]> Committed: Tue Sep 29 10:15:04 2015 +0800 ---------------------------------------------------------------------- Makefile.gpu | 132 ++++++++++ include/utils/blob.h | 8 + include/utils/param.h | 4 + src/neuralnet/neuron_layer.cc | 82 ++++-- src/trainer/trainer.cc | 522 +++++++++++++++++++++++++++++++++++++ src/utils/blob.cc | 6 +- 6 files changed, 728 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/Makefile.gpu ---------------------------------------------------------------------- diff --git a/Makefile.gpu b/Makefile.gpu new file mode 100644 index 0000000..0a3bd74 --- /dev/null +++ b/Makefile.gpu @@ -0,0 +1,132 @@ +###################User Config Varaibles ############################# +# third-party library installation folder +HOME_DIR := /usr +#Cuda installation folder +CUDA_DIR := /usr/local/cuda +# Lib folder for system and external libs. You may need to change it. +LIBRARY_DIRS := $(HOME_DIR)/lib64 $(HOME_DIR)/lib $(HOME_DIR)/local/lib $(CUDA_DIR)/lib $(CUDA_DIR)/lib64 +# Header folder for system and external libs. You may need to change it. +INCLUDE_DIRS := $(HOME_DIR)/include ./include $(HOME_DIR)/local/include/zookeeper $(HOME_DIR)/local/include $(CUDA_DIR)/include +# g++ location, should support c++11, tested with 4.8.1 +CXX := g++ +CUCXX := nvcc + +######################Setting Varialbes####################################### +LIBRARIES := glog protobuf openblas zmq czmq zookeeper_mt + +# Add CUDA library +ifneq ($(CUDA_DIR),) + LIBRARIES := $(LIBRARIES) cublas cudart curand +endif + +LDFLAGS := $(foreach librarydir, $(LIBRARY_DIRS), -L$(librarydir))\ + $(foreach library, $(LIBRARIES), -l$(library)) +# Folder to store compiled files +BUILD_DIR := .libs +MSHADOW_FLAGS :=-DMSHADOW_USE_CUDA=1 -DMSHADOW_USE_CBLAS=1 -DMSHADOW_USE_MKL=0 +ZK_FLAGS :=-DTHREADED -fpermissive +CXXFLAGS := -O2 -msse3 -Wall -pthread -fPIC -std=c++11 -Wno-unknown-pragmas \ + $(MSHADOW_FLAGS) $(ZK_FLAGS)\ + -funroll-loops $(foreach includedir, $(INCLUDE_DIRS), -I$(includedir)) +CUCXXFLAGS := $(MSHADOW_FLAGS) -std=c++11 \ + $(foreach includedir, $(INCLUDE_DIRS), -I$(includedir)) + +# Add device compile option +ifeq ($(CUDA_DIR),) + MSHADOW_FLAGS := $(MSHADOW_FLAGS) -DCPU_ONLY + CXXFLAGS := $(CXXFLAGS) -DCPU_ONLY +endif + +# find user defined .proto file, and then compute the corresponding .h, .cc +# files, which cannot be found by shell find, because they haven't been +# generated currently +PROTOS := $(shell find src/proto/ -name "*.proto") +PROTO_SRCS :=$(PROTOS:.proto=.pb.cc) +PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h)) +PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o)) + +# each singa src file will generate a .o file +SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" -o -path "src/utils/tool.cc" \) \ + -prune -o \( -name "*.cc" -type f \) -print ) +SINGA_OBJS 
:= $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \ + $(PROTO_OBJS) ) +-include $(SINGA_OBJS:%.o=%.P) + +TEST_SRCS :=$(shell find src/test/ -maxdepth 1 -name "*.cc") +TEST_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(TEST_SRCS:.cc=.o))) +-include $(TEST_OBJS:%.o=%.P) + +TEST_CUDA_SRCS :=$(shell find src/test/ -maxdepth 1 -name "*.cu") +TEST_CUDA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(TEST_CUDA_SRCS:.cu=.o))) +-include $(TEST_CUDA_OBJS:%.o=%.P) + + +SINGA_CUDA_SRCS :=$(shell find src/neuralnet/ -maxdepth 1 -name "*.cu") +SINGA_CUDA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_CUDA_SRCS:.cu=.o))) +-include $(SINGA_CUDA_OBJS:%.o=%.P) + +GTEST_SRC := include/gtest/gtest-all.cc +GTEST_HDR := include/gtest/gtest.h +GTEST_LIB := $(BUILD_DIR)/libgtest.a + +OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) ) +CUOBJS := $(sort $(SINGA_CUDA_OBJS) $(TEST_CUDA_OBJS) ) + +########################Compilation Section################################### +.PHONY: singa test + +singa: $(PROTO_OBJS) $(SINGA_OBJS) $(SINGA_CUDA_OBJS) + $(CXX) -shared -o $(BUILD_DIR)/libsinga.so $(SINGA_OBJS) + $(CXX) $(SINGA_OBJS) $(SINGA_CUDA_OBJS) src/main.cc -o singa $(CXXFLAGS) $(LDFLAGS) + @echo + $(CXX) $(SINGA_OBJS) $(SINGA_CUDA_OBJS) src/utils/tool.cc -o singatool $(CXXFLAGS) $(LDFLAGS) + @echo + +loader: proto $(LOADER_OBJS) + $(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS) + @echo + +test: proto $(GTEST_LIB) $(TEST_OBJS) $(TEST_CUDA_OBJS) $(SINGA_OBJS) $(SINGA_CUDA_OBJS) + $(CXX) $(TEST_OBJS) $(TEST_CUDA_OBJS) include/gtest/gtest_main.cc $(GTEST_LIB) \ + $(SINGA_OBJS) $(SINGA_CUDA_OBJS) -o $(BUILD_DIR)/test $(CXXFLAGS) $(LDFLAGS) + @echo + +$(GTEST_LIB): $(GTEST_HDR) $(GTEST_SRC) + $(CXX) $(GTEST_SRC) -c -o $(BUILD_DIR)/gtest-all.o $(CXXFLAGS) + ar -rv $(GTEST_LIB) $(BUILD_DIR)/gtest-all.o + +# compile all files +$(OBJS):$(BUILD_DIR)/%.o : %.cc + @mkdir -p $(dir $@) + $(CXX) $< $(CXXFLAGS) -MMD -c -o $@ + cp $(BUILD_DIR)/$*.d $(BUILD_DIR)/$*.P; \ + sed -e 's/#.*//' -e 's/^[^:]*: *//' -e 's/ *\\$$//' \ + -e '/^$$/ d' -e 's/$$/ :/' < $(BUILD_DIR)/$*.d >> $(BUILD_DIR)/$*.P; \ + rm -f $*.d + +$(CUOBJS):$(BUILD_DIR)/%.o : %.cu + @mkdir -p $(dir $@) + $(CUCXX) $< -c -o $@ $(CUCXXFLAGS) + cp $(BUILD_DIR)/$*.d $(BUILD_DIR)/$*.P; \ + sed -e 's/#.*//' -e 's/^[^:]*: *//' -e 's/ *\\$$//' \ + -e '/^$$/ d' -e 's/$$/ :/' < $(BUILD_DIR)/$*.d >> $(BUILD_DIR)/$*.P; \ + rm -f $*.d + +proto: $(PROTO_OBJS) + +$(PROTO_SRCS): $(PROTOS) + protoc --proto_path=src/proto --cpp_out=src/proto $(PROTOS) + mkdir -p include/proto/ + cp src/proto/*.pb.h include/proto/ + mkdir -p tool/pb2/ + touch tool/pb2/__init__.py + protoc --proto_path=src/proto --python_out=tool/pb2/ $(PROTOS) + @echo + +clean: + rm -rf *.a *.so + rm -rf include/proto/* + rm -rf src/proto/*.pb.h src/proto/*.pb.cc + rm -rf tool/pb2/* + rm -rf $(BUILD_DIR) + @echo http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/include/utils/blob.h ---------------------------------------------------------------------- diff --git a/include/utils/blob.h b/include/utils/blob.h index 91db095..a7079de 100644 --- a/include/utils/blob.h +++ b/include/utils/blob.h @@ -181,6 +181,14 @@ class Blob { CHECK(data_); return static_cast<Dtype*>(data_->mutable_gpu_data()); } + inline Dtype* mutable_xpu_data() { + CHECK(data_); + #ifndef CPU_ONLY + return static_cast<Dtype*>(data_->mutable_gpu_data()); + #else + return static_cast<Dtype*>(data_->mutable_cpu_data()); + #endif + } /// @brief Compute the sum of absolute values (L1 norm) of the data. 
Dtype asum_data() const; Dtype sum_data() const; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/include/utils/param.h ---------------------------------------------------------------------- diff --git a/include/utils/param.h b/include/utils/param.h index f690438..9829334 100644 --- a/include/utils/param.h +++ b/include/utils/param.h @@ -214,6 +214,10 @@ class Param { inline float* mutable_cpu_data() { return data_->mutable_cpu_data(); } inline float* mutable_cpu_grad() { return grad_.mutable_cpu_data(); } inline float* mutable_cpu_history() { return history_.mutable_cpu_data(); } + + inline float* mutable_xpu_data() { return data_->mutable_xpu_data(); } + inline float* mutable_xpu_grad() { return grad_.mutable_xpu_data(); } + inline float* mutable_xpu_history() { return history_.mutable_xpu_data(); } /** * @return slice start ID */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/src/neuralnet/neuron_layer.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuron_layer.cc b/src/neuralnet/neuron_layer.cc index 4e3acf0..2d84fea 100644 --- a/src/neuralnet/neuron_layer.cc +++ b/src/neuralnet/neuron_layer.cc @@ -30,6 +30,7 @@ namespace singa { using namespace mshadow; +using namespace mshadow::expr; using mshadow::cpu; using mshadow::Shape; @@ -42,6 +43,12 @@ using mshadow::Tensor; using std::string; using std::vector; +#ifndef CPU_ONLY + #define xpu mshadow::gpu +#else + #define xpu mshadow::cpu +#endif + inline Tensor<cpu, 4> Tensor4(Blob<float>* blob) { const vector<int>& shape = blob->shape(); Tensor<cpu, 4> tensor(blob->mutable_cpu_data(), @@ -68,6 +75,30 @@ inline Tensor<cpu, 1> Tensor1(Blob<float>* blob) { return tensor; } +inline Tensor<xpu, 4> Tensor4XPU(Blob<float>* blob) { + const vector<int>& shape = blob->shape(); + Tensor<xpu, 4> tensor(blob->mutable_xpu_data(), + Shape4(shape[0], shape[1], shape[2], shape[3])); + return tensor; +} + +inline Tensor<xpu, 3> Tensor3XPU(Blob<float>* blob){ + const vector<int>& shape = blob->shape(); + Tensor<xpu, 3> tensor(blob->mutable_xpu_data(), + Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1])); + return tensor; +} +inline Tensor<xpu, 2> Tensor2XPU(Blob<float>* blob){ + const vector<int>& shape = blob->shape(); + Tensor<xpu, 2> tensor(blob->mutable_xpu_data(), + Shape2(shape[0], blob->count() / shape[0])); + return tensor; +} +inline Tensor<xpu, 1> Tensor1XPU(Blob<float>* blob){ + Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count())); + return tensor; +} + /************ Implementation for ConvolutionLayer*************************/ ConvolutionLayer::~ConvolutionLayer() { delete weight_; @@ -112,11 +143,11 @@ void ConvolutionLayer::Setup(const LayerProto& conf, void ConvolutionLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { - auto src = Tensor4(srclayers[0]->mutable_data(this)); - auto data = Tensor3(&data_); - auto col = Tensor2(&col_data_); - auto weight = Tensor2(weight_->mutable_data()); - auto bias = Tensor1(bias_->mutable_data()); + auto src = Tensor4XPU(srclayers_[0]->mutable_data(this)); + auto data = Tensor3XPU(&data_); + auto col = Tensor2XPU(&col_data_); + auto weight = Tensor2XPU(weight_->mutable_data()); + auto bias = Tensor1XPU(bias_->mutable_data()); for (int n = 0; n < batchsize_; n++) { if (pad_ > 0) col = expr::unpack_patch2col(pad(src[n], pad_), kernel_, stride_); @@ -129,17 +160,17 @@ void ConvolutionLayer::ComputeFeature(int flag, void ConvolutionLayer::ComputeGradient(int flag, const 
vector<Layer*>& srclayers) { - auto src = Tensor4(srclayers[0]->mutable_data(this)); - auto col = Tensor2(&col_data_); - auto weight = Tensor2(weight_->mutable_data()); - auto grad = Tensor3(&grad_); - auto gcol = Tensor2(&col_grad_); - auto gweight = Tensor2(weight_->mutable_grad()); - auto gbias = Tensor1(bias_->mutable_grad()); - Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this); - Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_)); + auto src = Tensor4XPU(srclayers_[0]->mutable_data(this)); + auto col = Tensor2XPU(&col_data_); + auto weight = Tensor2XPU(weight_->mutable_data()); + auto grad = Tensor3XPU(&grad_); + auto gcol = Tensor2XPU(&col_grad_); + auto gweight = Tensor2XPU(weight_->mutable_grad()); + auto gbias = Tensor1XPU(bias_->mutable_grad()); + Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this); + Tensor<xpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_)); if (gsrcblob != nullptr) - gsrc.dptr = gsrcblob->mutable_cpu_data(); + gsrc.dptr = gsrcblob->mutable_xpu_data(); gbias = expr::sumall_except_dim<1>(grad); gweight = 0.0f; Shape<3> padshp(gsrc.shape.SubShape()); @@ -158,6 +189,7 @@ void ConvolutionLayer::ComputeGradient(int flag, imgshp); } } + // weight_->mutable_data()->mutable_cpu_data(); } /******************* Implementation for CConvolutionLayer *********/ @@ -421,10 +453,10 @@ void InnerProductLayer::Setup(const LayerProto& conf, void InnerProductLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { - auto data = Tensor2(&data_); - auto src = Tensor2(srclayers[0]->mutable_data(this)); - auto weight = Tensor2(weight_->mutable_data()); - auto bias = Tensor1(bias_->mutable_data()); + auto data = Tensor2XPU(&data_); + auto src = Tensor2XPU(srclayers_[0]->mutable_data(this)); + auto weight = Tensor2XPU(weight_->mutable_data()); + auto bias = Tensor1XPU(bias_->mutable_data()); if (transpose_) data = dot(src, weight); else @@ -435,11 +467,11 @@ void InnerProductLayer::ComputeFeature(int flag, void InnerProductLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { - auto src = Tensor2(srclayers[0]->mutable_data(this)); - auto grad = Tensor2(&grad_); - auto weight = Tensor2(weight_->mutable_data()); - auto gweight = Tensor2(weight_->mutable_grad()); - auto gbias = Tensor1(bias_->mutable_grad()); + auto src = Tensor2XPU(srclayers_[0]->mutable_data(this)); + auto grad = Tensor2XPU(&grad_); + auto weight = Tensor2XPU(weight_->mutable_data()); + auto gweight = Tensor2XPU(weight_->mutable_grad()); + auto gbias = Tensor1XPU(bias_->mutable_grad()); gbias = expr::sum_rows(grad); if (transpose_) @@ -447,7 +479,7 @@ void InnerProductLayer::ComputeGradient(int flag, else gweight = dot(grad.T(), src); if (srclayers[0]->mutable_grad(this) != nullptr) { - auto gsrc = Tensor2(srclayers[0]->mutable_grad(this)); + auto gsrc = Tensor2XPU(srclayers_[0]->mutable_grad(this)); if (transpose_) gsrc = dot(grad, weight.T()); else http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc new file mode 100644 index 0000000..22b5757 --- /dev/null +++ b/src/trainer/trainer.cc @@ -0,0 +1,522 @@ +#include <thread> +#include <vector> +#include <map> +#include <chrono> +#include <glog/logging.h> +#include "utils/tinydir.h" +#include <unistd.h> +#include "utils/cluster.h" +#include "utils/common.h" +#include "proto/common.pb.h" +#include "trainer/trainer.h" +#include 
"mshadow/tensor.h" + + +namespace singa { +using std::vector; +using std::map; +using std::queue; +using namespace std::chrono; +using std::make_shared; + +/***********************Trainer****************************/ +Trainer::~Trainer() { + // free Params (i.e., slices) in server shard + for (auto entry : server_shard_) + for (auto param : entry.second->shares) + delete param; + delete router_; +} + +const vector<int> SliceParams(const vector<Param*>& params) { + // for load-balance among servers in a group and among server groups + int nserver_grps = Cluster::Get()->nserver_groups(); + int nservers_per_grp = Cluster::Get()->nservers_per_group(); + int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp); + + // collect sizes of unique Params + std::vector<int> paramsize; + for (auto param : params) + if (param->id() == param->owner()) + paramsize.push_back(param->size()); + // slice into lcm pieces to achieve good load-balance for both intra-group + // partition (among servers in a group) and inter-group partition (each group + // is assgined a sub-set of slices) + auto param_slice = Slice(lcm, paramsize); + // construct map from Param ID to its slices <slice id, len> + std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; + vector<int> slices; + auto it = param_slice.begin(); + int slice_id = 0; + for (auto param : params) { + if (param->id() == param->owner()) { + for (int len : *it) { + slices.push_back(len); + paramid2slices[param->id()].push_back(std::make_pair(slice_id++, len)); + } + it++; + } + } + // add slice info for every Param + for (auto param : params) + for (auto entry : paramid2slices[param->owner()]) { + param->AddSlice(entry.first, entry.second); + LOG(INFO) << "param id " << param->id() << " owner=" << param->owner() + << ": " << entry.first << ", " << entry.second; + } + return slices; +} + +void Trainer::SetupWorkerServer( + const JobProto& job_conf, + const vector<Worker*>& workers, + const vector<Server*>& servers) { + auto cluster = Cluster::Get(); + int grp_size = cluster->nworkers_per_group(); + const auto& net_conf = job_conf.neuralnet(); + auto net = NeuralNet::Create(net_conf, kTrain, grp_size); + // MUST do SliceParam before share param/net with others + auto slices = SliceParams(net->params()); + + std::unordered_map<int, shared_ptr<NeuralNet>> grp_net; + int first_grp = workers.size() ? workers.at(0)->grp_id() : -1; + for (auto worker : workers) { + int grp_id = worker->grp_id(); + int worker_id = worker->id(); + shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr; + if (grp_net.find(grp_id) == grp_net.end()) { + if (grp_id == first_grp) { + // test are performed only by the first group now. TODO update. + if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) { + test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for exp + test_net->ShareParamsFrom(net); + } + // validation are performed only by the first group. TODO update. 
+ if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) { + valid_net = NeuralNet::Create(net_conf, kValidation, 1); + valid_net->ShareParamsFrom(net); + } + grp_net[grp_id] = net; + } else { + grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size); + if(cluster->share_memory()) + grp_net[grp_id]->ShareParamsFrom(net); + } + for (auto layer : grp_net[grp_id]->layers()) { + bool local = layer->partition_id() >= workers.front()->id() + && layer->partition_id() <= workers.back()->id(); + for (auto param : layer->GetParams()) { + int hash = Hash(grp_id, param->owner()); + if (worker_shard_.find(hash) == worker_shard_.end()) + worker_shard_[hash] = new ParamEntry(); + worker_shard_[hash]->AddParam(local, param); + } + } + } + LOG(INFO) << "grp " << worker->grp_id() << ", worker " + << worker->id() << " net " << grp_net[grp_id].get(); + worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net); + } + + // partition among server groups, each group maintains one sub-set for sync + auto slice2group = PartitionSlices(cluster->nserver_groups(), slices); + for (auto server : servers) + server->Setup(job_conf.updater(), &server_shard_, slice2group); + // partition within one server group, each server updates for one sub-set + slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices); +} + +vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) { + auto cluster = Cluster::Get(); + vector<Server*> servers; + if (!cluster->has_server()) + return servers; + + int pid = cluster->procs_id(); + // if true, server procs (logical) id starts after worker procs + if (cluster->server_worker_separate()) + pid -= cluster->nworker_procs(); + int procs_size = cluster->nservers_per_procs(); + int grp_size = cluster->nservers_per_group(); + int gid = pid * procs_size / grp_size; + int start = pid * procs_size % grp_size; + int end = start + procs_size; + for (int sid = start; sid < end; sid++) { + auto server = new Server(nthreads++, gid, sid); + servers.push_back(server); + } + return servers; +} + +vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) { + auto cluster=Cluster::Get(); + vector<Worker*> workers; + if(!cluster->has_worker()) + return workers; + int pid = cluster->procs_id(); + int grp_size = cluster->nworkers_per_group(); + int procs_size = cluster->nworkers_per_procs(); + int gstart, gend, wstart, wend; + if (grp_size >= procs_size) { + // all workers in this procs are from the same group + gstart = pid * procs_size / grp_size; + gend = gstart + 1; + wstart = pid * procs_size % grp_size; + wend = wstart + procs_size; + } else { + // there are multiple (complete) groups in this procs. 
+ CHECK_EQ(procs_size % grp_size, 0); + int groups_per_procs = procs_size / grp_size; + gstart = pid * groups_per_procs; + gend = (pid+1) * groups_per_procs; + wstart = 0; + wend = grp_size; + } + for (int gid = gstart; gid < gend; gid++) { + for (int wid = wstart; wid < wend; wid++) { + auto *worker = Worker::Create(job); + worker->Init(nthreads++,gid, wid); + workers.push_back(worker); + } + } + return workers; +} + +void Trainer::Resume(JobProto* jobConf) { + tinydir_dir dir; + string folder = Cluster::Get()->checkpoint_folder(); + tinydir_open(&dir, folder.c_str()); + int latest_step = 0; + // there would be multi checkpoint files (from diff workers) for one step + vector<string> ck_files; + // iterate all files to get the files for the last checkpoint + while (dir.has_next) { + tinydir_file file; + tinydir_readfile(&dir, &file); + tinydir_next(&dir); + char* ch = strstr(file.name, "step"); + if (ch == nullptr) { + if (file.name[0] != '.') + LOG(INFO) << "Irregular file in checkpoint folder: " << file.name; + continue; + } + + LOG(INFO) << "Add checkpoint file for resume: " << ch; + int step = atoi(ch+4); + if (step == latest_step) { + ck_files.push_back(file.name); + } else if(step > latest_step) { + latest_step = step; + ck_files.clear(); + ck_files.push_back(string(file.name)); + } + } + + if (latest_step > 0) { + jobConf->set_step(latest_step); + if (!jobConf->has_reset_param_version()) + jobConf->set_reset_param_version(false); + jobConf->clear_checkpoint_path(); + for (auto ck_file : ck_files) + jobConf->add_checkpoint_path(folder + "/" + ck_file); + } + tinydir_close(&dir); +} + +void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) { + // register job to zookeeper at the beginning + auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster()); + if (resume) + Resume(job); + + router_ = new Router(); + router_->Bind(kInprocRouterEndpoint); + const string hostip = cluster->hostip(); + int port = router_->Bind("tcp://" + hostip + ":*"); + // register endpoint to zookeeper + cluster->Register(getpid(), hostip + ":" + std::to_string(port)); + + int nthreads = 1; + const vector<Worker*> workers = CreateWorkers(nthreads, *job); + nthreads += workers.size(); + const vector<Server*> servers = CreateServers(nthreads, *job); + SetupWorkerServer(*job, workers, servers); + +#ifdef USE_MPI + for (int i = 0; i < nthreads; i++) + MPIQueues.push_back(make_shared<SafeQueue>()); +#endif + vector<std::thread> threads; + for(auto server : servers) + threads.push_back(std::thread(&Server::Run, server)); + for(auto worker : workers) + threads.push_back(std::thread(&Worker::Run, worker)); + Run(workers, servers); + for(auto& thread : threads) + thread.join(); + for(auto server : servers) + delete server; + for(auto worker : workers) + delete worker; +} + +inline int bandwidth(int bytes, system_clock::time_point start) { + auto now=system_clock::now(); + auto duration=duration_cast<std::chrono::milliseconds> (now - start); + return static_cast<int>(bytes*1000.f/duration.count()); +} + +void Trainer::Run( + const vector<Worker*>& workers, + const vector<Server*>& servers) { + int nworkers = workers.size(), nservers = servers.size(); + auto cluster = Cluster::Get(); + procs_id_ = cluster->procs_id(); + LOG(INFO) << "Stub in process " << procs_id_ << " starts"; + + // for sync among server groups + auto start = std::chrono::system_clock::now(); + float trans_size = 0.f; // total size of msg transferred since start time + int sync_server_id = 0; + int max_bandwidth = 
cluster->bandwidth(); + int nserver_grps = cluster->nserver_groups(); + + map<int, Dealer*> inter_dealers; // for sending msg to other procs + + std::queue<Msg*> msg_queue; + Poller poll(router_); + bool stop=false; + while (!stop || !msg_queue.empty()) { + if (msg_queue.empty()) { + // if the poll time is large, then the poller may not expire + // if it is small, then many reminder messages will be sent which may + // slow done the process of other request. TODO tune it. + auto *sock = poll.Wait(cluster->poll_time()); + if (poll.Terminated()) { + LOG(ERROR) << "Connection broken!"; + exit(0); + } else if (sock == nullptr) { + if (nserver_grps > 1 && bandwidth(trans_size, start) < max_bandwidth) { + Msg* msg = GenSyncReminderMsg(sync_server_id, servers); + router_->Send(&msg) ; + sync_server_id = (sync_server_id + 1) % nservers; + } + continue; + } + Msg* msg = router_->Receive(); + msg_queue.push(msg); + } + Msg* msg = msg_queue.front(); + msg_queue.pop(); + int type = msg->type(), dst = msg->dst(), flag = AddrType(dst); + if (flag == kStub && (AddrProc(dst) == procs_id_ || AddrGrp(dst) == -1)) { + if (type == kConnect) { + DeleteMsg(&msg); + } else if (type == kMetric) { + DisplayMetric(&msg); + } else if (type == kStop) { + int src_flag = AddrType(msg->src()); + if (src_flag == kServer) nservers--; + else if (src_flag == kWorkerParam) nworkers--; + DeleteMsg(&msg); + if (nworkers == 0 && nservers == 0) break; + } else if (nserver_grps > 0) { + HandleLocalMsg(&msg_queue, &msg); + } else { + DeleteMsg(&msg); + } + } else { + int dst_procs = AddrProc(dst); + if (flag != kStub) + dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag); + if (dst_procs != procs_id_) { + if (bandwidth(trans_size, start) <= cluster->bandwidth()) { + start = std::chrono::system_clock::now(); + trans_size = 0; + } + trans_size += msg->size(); + + if (inter_dealers.find(dst_procs) == inter_dealers.end()) + inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs); + inter_dealers[dst_procs]->Send(&msg); + } else { + if (type == kSyncRequest) + msg->AddFormatFrame("i", max_bandwidth - bandwidth(trans_size, start)); + router_->Send(&msg); + } + } + } + LOG(ERROR) << "Stub in process " << procs_id_ << " stops"; + for (auto& entry : inter_dealers) + delete entry.second; +} + +Msg* Trainer::GenSyncReminderMsg(int server, const vector<Server*>& servers ) { + Msg* msg = new Msg(); + msg->set_src(Addr(-1,-1, kStub)); + msg->set_dst(Addr(servers[server]->grp_id(), servers[server]->id(), kServer)); + msg->set_type(kSyncReminder); + return msg; +} + +void Trainer::DisplayMetric(Msg** msg) { + Msg* msgg = *msg; + // only display metrics from the first group + if (AddrGrp(msgg->src()) == 0) { + int step = msgg->trgt_version(); + char prefix[128]; + msgg->ParseFormatFrame("s", prefix); + CHECK(msgg->NextFrame()); + const string perf(static_cast<char*>(msgg->FrameData()), msgg->FrameSize()); + Metric cur(perf); + LOG(ERROR) << prefix << " step-" << step <<", " << cur.ToLogString(); + } + DeleteMsg(msg); +} + +Dealer* Trainer::CreateInterProcsDealer(int dst_procs) { + // forward to other procs + auto cluster = Cluster::Get(); + auto dealer = new Dealer(); + while(cluster->endpoint(dst_procs)=="") { + //kCollectSleepTime)); + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + LOG(ERROR)<<"waiting for procs "<< dst_procs<<" to register"; + } + dealer->Connect("tcp://"+cluster->endpoint(dst_procs)); + return dealer; +} + +void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) { + Msg* msgg = *msg; + 
int paramid = ParamID(msgg->trgt_val()); + int type = msgg->type(); + int grp; + ParamEntry *entry = nullptr; + switch (type) { // TODO process other requests, e.g. RESTful + case kUpdate: + grp = AddrGrp(msgg->src()); + entry = worker_shard_.at(Hash(grp, paramid)); + for(auto update_msg : HandleUpdate(entry, msg)) + msg_queue->push(update_msg); + break; + case kRUpdate: + grp = AddrGrp(msgg->dst()); + entry = worker_shard_.at(Hash(grp, paramid)); + HandleUpdateResponse(entry, msg); + break; + case kGet: + grp = AddrGrp(msgg->src()); + entry = worker_shard_.at(Hash(grp, paramid)); + for(auto get_msg : HandleGet(entry, msg)) + msg_queue->push(get_msg); + break; + case kRGet: + grp = AddrGrp(msgg->dst()); + entry = worker_shard_.at(Hash(grp, paramid)); + HandleGetResponse(entry, msg); + break; + case kPut: + grp = AddrGrp(msgg->src()); + entry = worker_shard_.at(Hash(grp, paramid)); + for(auto put_msg : HandlePut(entry, msg)) + msg_queue->push(put_msg); + break; + default: + LOG(ERROR)<<"Unknow message type:"<<type; + break; + } +} + +void Trainer::GenMsgs(int type, int version, ParamEntry* entry, + Msg* msg, vector<Msg*> *ret) { + int src_grp = AddrGrp(msg->src()); + int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group(); + auto param=entry->shares.at(0); + for (int idx = 0 ; idx < param->num_slices(); idx++) { + int slice_id =param->slice_start() + idx; + int server = slice2server_[slice_id]; + int procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer); + Msg* new_msg = nullptr; + if (type == kPut) { + CHECK_GT(entry->num_total, 0); + //new_msg = param->GenPutMsg(procs != procs_id_, idx); + new_msg = param->GenPutMsg(true, idx); + new_msg->AddFormatFrame("i", entry->num_total); + } else if (type == kGet) { + //new_msg = param->GenGetMsg(procs != procs_id_, idx); + new_msg = param->GenGetMsg(true, idx); + } else if (type == kUpdate) { + //new_msg = param->GenUpdateMsg(procs != procs_id_, idx); + new_msg = param->GenUpdateMsg(true, idx); + new_msg->AddFormatFrame("i", entry->num_local); + } else { + LOG(FATAL) << "Wrong type"; + } + new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version); + new_msg->set_src(Addr(src_grp, procs_id_, kStub)); + new_msg->set_dst(Addr(dst_grp, server, kServer)); + ret->push_back(new_msg); + } +} + +const vector<Msg*> Trainer::HandleGet(ParamEntry* entry, Msg** msg) { + vector<Msg*> ret; + int version = (*msg)->trgt_version(); + if (version > entry->next_version) { + entry->next_version = version; + GenMsgs(kGet, version, entry, *msg, &ret); + } + DeleteMsg(msg); + return ret; +} + +const vector<Msg*> Trainer::HandleUpdate(ParamEntry *entry, Msg** msg) { + vector<Msg*> ret; + entry->num_update++; + if (entry->num_update >= entry->num_local) { + // average local gradient + if (entry->num_local > 1) { + auto it = entry->shares.begin(); + auto shape=mshadow::Shape1((*it)->size()); + mshadow::Tensor<mshadow::cpu,1> sum((*it)->mutable_cpu_grad(), shape); + for (++it; it != entry->shares.end(); it++) { + mshadow::Tensor<mshadow::cpu,1> grad((*it)->mutable_cpu_grad(), shape); + sum += grad; + } + sum /= entry->num_total; + } + int step = (*msg)->trgt_version(); + GenMsgs(kUpdate, step, entry, *msg, &ret); + entry->num_update = 0; + } + DeleteMsg(msg); + return ret; +} + +const vector<Msg*> Trainer::HandlePut(ParamEntry* entry, Msg** msg) { + vector<Msg*> ret; + int version = (*msg)->trgt_version(); + GenMsgs(kPut, version, entry, *msg, &ret); + DeleteMsg(msg); + return ret; +} + +void Trainer::HandleGetResponse(ParamEntry* entry, Msg** 
msg) { + int version = (*msg)->trgt_version(); + int sliceid = SliceID((*msg)->trgt_val()); + auto param = entry->shares.at(0); + if (param->ParseGetResponseMsg(*msg, sliceid-param->slice_start())) + param->set_version(version); + DeleteMsg(msg); +} + +void Trainer::HandleUpdateResponse(ParamEntry* entry, Msg** msg) { + int version = (*msg)->trgt_version(); + int sliceid = SliceID((*msg)->trgt_val()); + auto param = entry->shares.at(0); + if (param->ParseUpdateResponseMsg(*msg, sliceid-param->slice_start())) + param->set_version(version); + DeleteMsg(msg); +} +} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/src/utils/blob.cc ---------------------------------------------------------------------- diff --git a/src/utils/blob.cc b/src/utils/blob.cc index 24c7f54..ef582fa 100644 --- a/src/utils/blob.cc +++ b/src/utils/blob.cc @@ -61,9 +61,11 @@ #include "utils/blob.h" #include <cblas.h> +#include <cuda_runtime.h> #include <math.h> #include <utility> + #define NOT_IMPLEMENTED LOG(FATAL) << "Not implemented function" #define NO_GPU LOG(FATAL) << "CPU-only Mode: cannot make GPU call." // Instantiate a class with float and double specifications. @@ -187,7 +189,9 @@ void SyncedMemory::to_gpu() { switch (head_) { case UNINITIALIZED: CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_)); - CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N)); + //CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N)); + // + CUDA_CHECK(cudaMemset(gpu_ptr_, 0, size_)); head_ = HEAD_AT_GPU; break; case HEAD_AT_CPU:
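----------------------------------------------------------------------
The core of the GPU support above is a compile-time device switch: Blob gains mutable_xpu_data(), Param forwards it, and neuron_layer.cc aliases `xpu` to mshadow::gpu or mshadow::cpu depending on CPU_ONLY, so the same Tensor*XPU helpers build tensors over whichever memory the binary was compiled for. Below is a minimal, self-contained sketch of that pattern; SimpleBlob is a hypothetical stand-in and omits the host/device synchronization that SyncedMemory actually performs.

#include <cstdlib>
#ifndef CPU_ONLY
#include <cuda_runtime.h>
#endif

class SimpleBlob {  // hypothetical stand-in for singa::Blob<float>
 public:
  explicit SimpleBlob(size_t count) {
    cpu_ptr_ = static_cast<float*>(std::calloc(count, sizeof(float)));
#ifndef CPU_ONLY
    cudaMalloc(reinterpret_cast<void**>(&gpu_ptr_), count * sizeof(float));
#endif
  }
  ~SimpleBlob() {
    std::free(cpu_ptr_);
#ifndef CPU_ONLY
    cudaFree(gpu_ptr_);
#endif
  }
  // Same call site yields a device pointer in a CUDA build and a host pointer
  // when the build defines CPU_ONLY, mirroring Blob::mutable_xpu_data().
  float* mutable_xpu_data() {
#ifndef CPU_ONLY
    return gpu_ptr_;
#else
    return cpu_ptr_;
#endif
  }

 private:
  float* cpu_ptr_ = nullptr;
#ifndef CPU_ONLY
  float* gpu_ptr_ = nullptr;
#endif
};

int main() {
  SimpleBlob blob(256);
  float* p = blob.mutable_xpu_data();  // GPU memory, or host memory under -DCPU_ONLY
  return p == nullptr;
}

Because the choice is made by the preprocessor, Makefile.gpu only has to add -DCPU_ONLY when CUDA_DIR is empty and the layer code itself stays unchanged.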
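----------------------------------------------------------------------
SliceParams() in trainer.cc cuts the unique Params into lcm(nserver_groups, nservers_per_group) slices so the slices can be spread evenly both across server groups and across the servers inside one group. The toy illustration below shows only that arithmetic; the actual slice lengths come from SINGA's Slice() helper, which may balance across all Params jointly, and the numbers here are made up.

#include <cstdio>
#include <vector>

// Simplified stand-in for singa's LeastCommonMultiple().
static int Lcm(int a, int b) {
  int g = a;
  for (int x = b; x != 0;) { int t = g % x; g = x; x = t; }  // Euclid's gcd
  return a / g * b;
}

int main() {
  int nserver_grps = 2, nservers_per_grp = 3;
  int pieces = Lcm(nserver_grps, nservers_per_grp);  // 6

  std::vector<int> paramsize = {60000, 9000};  // sizes of two unique Params
  for (int size : paramsize)
    std::printf("param of %d -> %d slices of about %d\n",
                size, pieces, size / pieces);
  return 0;
}

With six slices, PartitionSlices() can later assign each of the two groups three slices for inter-group sync and each of the three servers in a group two slices for updates, which is the load-balance goal stated in the patch's comments.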
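----------------------------------------------------------------------
Trainer::CreateWorkers() maps a process id onto a range of (group id, worker id) pairs and keeps two cases apart: a worker group that spans several processes versus a process that hosts one or more whole groups. The same index arithmetic, lifted into a standalone sketch with made-up cluster numbers:

#include <cstdio>

int main() {
  int pid = 1;          // hypothetical process id
  int grp_size = 4;     // nworkers_per_group
  int procs_size = 2;   // nworkers_per_procs

  int gstart, gend, wstart, wend;
  if (grp_size >= procs_size) {
    // All workers in this process belong to the same group.
    gstart = pid * procs_size / grp_size;
    gend = gstart + 1;
    wstart = pid * procs_size % grp_size;
    wend = wstart + procs_size;
  } else {
    // This process hosts procs_size / grp_size complete groups.
    int groups_per_procs = procs_size / grp_size;
    gstart = pid * groups_per_procs;
    gend = (pid + 1) * groups_per_procs;
    wstart = 0;
    wend = grp_size;
  }
  // pid 1 with 4 workers/group and 2 workers/process owns workers 2..3 of group 0.
  std::printf("groups [%d,%d), workers [%d,%d)\n", gstart, gend, wstart, wend);
  return 0;
}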
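----------------------------------------------------------------------
The stub loop in Trainer::Run() throttles inter-group synchronization with the small bandwidth() helper: bytes sent since a reference time, divided by the elapsed milliseconds and scaled to bytes per second, compared against cluster->bandwidth(). A self-contained sketch of that calculation; the cap and message size are invented, and the zero-duration guard is added here rather than taken from the patch.

#include <chrono>
#include <cstdio>

using Clock = std::chrono::system_clock;

// Same arithmetic as bandwidth() in trainer.cc, in bytes per second.
static int EstimateBandwidth(int bytes, Clock::time_point start) {
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
      Clock::now() - start).count();
  if (ms == 0) return 0;  // assumption: treat an instantaneous sample as idle
  return static_cast<int>(bytes * 1000.f / ms);
}

int main() {
  auto start = Clock::now();
  int max_bandwidth = 1 << 20;  // hypothetical cap, 1 MiB/s
  int trans_size = 0;

  trans_size += 4096;  // pretend one 4 KiB message was just forwarded
  if (EstimateBandwidth(trans_size, start) < max_bandwidth)
    std::printf("below the cap: safe to send a kSyncReminder\n");
  return 0;
}

In the patch the reference time and byte counter are reset whenever the observed rate is back under the cap, so the estimate tracks the current burst rather than the whole run.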
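----------------------------------------------------------------------
Trainer::HandleUpdate() waits until every local replica of a Param has contributed a gradient, folds all replicas into the first one, and divides by the total number of contributors before forwarding a single kUpdate to the server. The patch does this with 1-D mshadow tensors over mutable_cpu_grad(); the plain-C++ sketch below shows only the averaging step, with made-up values.

#include <cstdio>
#include <vector>

int main() {
  // One gradient vector per local replica of the same Param.
  std::vector<std::vector<float>> grads = {{1.f, 2.f, 3.f}, {3.f, 2.f, 1.f}};
  int num_total = 2;  // workers contributing to this Param overall

  std::vector<float>& sum = grads[0];           // accumulate into the first share
  for (size_t k = 1; k < grads.size(); ++k)
    for (size_t i = 0; i < sum.size(); ++i)
      sum[i] += grads[k][i];
  for (float& v : sum) v /= num_total;          // average, as HandleUpdate does

  for (float v : sum) std::printf("%.2f ", v);  // prints: 2.00 2.00 2.00
  std::printf("\n");
  return 0;
}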
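----------------------------------------------------------------------
Finally, the blob.cc hunk adjusts SyncedMemory::to_gpu() so the freshly allocated device buffer is zeroed over its full allocation size (size_) rather than N. A minimal illustration of that allocate-and-zero sequence, with explicit error checks standing in for the CUDA_CHECK macro and an arbitrary buffer size:

#include <cuda_runtime.h>
#include <cstdio>

int main() {
  size_t size = 1024 * sizeof(float);  // plays the role of SyncedMemory::size_
  void* gpu_ptr = nullptr;

  // Zero exactly the number of bytes that were allocated.
  if (cudaMalloc(&gpu_ptr, size) != cudaSuccess ||
      cudaMemset(gpu_ptr, 0, size) != cudaSuccess) {
    std::fprintf(stderr, "CUDA allocation or zeroing failed\n");
    return 1;
  }

  cudaFree(gpu_ptr);
  return 0;
}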
