SINGA-21 Code Review 2: rebase onto the latest master. This pull request should be merged ahead of the other pending pull requests. Tested with mnist and cifar10 under several different cluster settings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/7d39f881 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/7d39f881 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/7d39f881 Branch: refs/heads/master Commit: 7d39f8813d057565224402e230afacb98c8c366b Parents: cfde471 Author: wang wei <[email protected]> Authored: Wed Jun 24 21:29:39 2015 +0800 Committer: wang wei <[email protected]> Committed: Wed Jun 24 21:29:39 2015 +0800 ---------------------------------------------------------------------- examples/cifar10/cluster.conf | 1 + examples/mnist/Makefile.example | 22 ++++++++++++++++++++++ examples/mnist/cluster.conf | 4 +++- examples/mnist/model.conf | 2 +- include/trainer/trainer.h | 2 +- include/utils/common.h | 9 +++------ src/proto/cluster.proto | 23 ----------------------- src/trainer/trainer.cc | 30 ++++++++++++++++-------------- src/trainer/worker.cc | 13 ++++++------- src/utils/cluster.cc | 1 + src/utils/common.cc | 35 +++++++++++++++++++++++++++++++++++ 11 files changed, 89 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/cifar10/cluster.conf ---------------------------------------------------------------------- diff --git a/examples/cifar10/cluster.conf b/examples/cifar10/cluster.conf index 97c64fd..e7e3400 100644 --- a/examples/cifar10/cluster.conf +++ b/examples/cifar10/cluster.conf @@ -3,4 +3,5 @@ nserver_groups: 1 nservers_per_group: 1 nworkers_per_group: 1 nworkers_per_procs: 1 +nservers_per_procs: 1 workspace: "examples/cifar10/" http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/mnist/Makefile.example ---------------------------------------------------------------------- diff --git a/examples/mnist/Makefile.example b/examples/mnist/Makefile.example new file mode 100644 index 
0000000..9016887 --- /dev/null +++ b/examples/mnist/Makefile.example @@ -0,0 +1,22 @@ +libs :=singa glog protobuf + +.PHONY: all download create + +download: mnist + +mnist: + wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz + wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz + wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz + wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz + gunzip train-images-idx3-ubyte.gz && gunzip train-labels-idx1-ubyte.gz + gunzip t10k-images-idx3-ubyte.gz && gunzip t10k-labels-idx1-ubyte.gz + +create: + $(CXX) create_shard.cc -std=c++11 -lsinga -lprotobuf -lglog -I../../include \ + -L../../.libs/ -Wl,-unresolved-symbols=ignore-in-shared-libs -Wl,-rpath=../../.libs/ \ + -o create_shard.bin + mkdir mnist_train_shard + mkdir mnist_test_shard + ./create_shard.bin train-images-idx3-ubyte train-labels-idx1-ubyte mnist_train_shard + ./create_shard.bin t10k-images-idx3-ubyte t10k-labels-idx1-ubyte mnist_test_shard http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/mnist/cluster.conf ---------------------------------------------------------------------- diff --git a/examples/mnist/cluster.conf b/examples/mnist/cluster.conf index 6b8a8e6..ff25b8c 100644 --- a/examples/mnist/cluster.conf +++ b/examples/mnist/cluster.conf @@ -2,4 +2,6 @@ nworker_groups: 1 nserver_groups: 1 nservers_per_group: 1 nworkers_per_group: 1 -workspace: "examples/cifar10/" +nservers_per_procs: 1 +nworkers_per_procs: 1 +workspace: "examples/mnist/" http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/mnist/model.conf ---------------------------------------------------------------------- diff --git a/examples/mnist/model.conf b/examples/mnist/model.conf index 3786c4f..cd113db 100644 --- a/examples/mnist/model.conf +++ b/examples/mnist/model.conf @@ -1,5 +1,5 @@ name: "deep-big-simple-mlp" -train_steps: 10000 +train_steps: 1000 test_steps:10 test_frequency:60 
display_frequency:30 http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h index 6e93f80..ed93374 100644 --- a/include/trainer/trainer.h +++ b/include/trainer/trainer.h @@ -97,7 +97,7 @@ class Trainer{ protected: vector<shared_ptr<Server>> CreateServers(int nthread, const ModelProto& mproto, - const vector<int> slices, vector<HandleContext>* ctx); + const vector<int> slices, vector<HandleContext*>* ctx); vector<shared_ptr<Worker>> CreateWorkers(int nthread, const ModelProto& mproto, vector<int> *slice_size); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/include/utils/common.h ---------------------------------------------------------------------- diff --git a/include/utils/common.h b/include/utils/common.h index 6444962..aca35ec 100644 --- a/include/utils/common.h +++ b/include/utils/common.h @@ -9,6 +9,9 @@ namespace singa { +std::string IntVecToString(const std::vector<int>& vec) ; +std::string VStringPrintf(std::string fmt, va_list l) ; +std::string StringPrintf(std::string fmt, ...) 
; void ReadProtoFromTextFile(const char* filename, google::protobuf::Message* proto); void WriteProtoToTextFile(const google::protobuf::Message& proto, @@ -17,13 +20,7 @@ void ReadProtoFromBinaryFile(const char* filename, google::protobuf::Message* proto); void WriteProtoToBinaryFile(const google::protobuf::Message& proto, const char* filename); -std::string IntVecToString(const std::vector<int>& vec); -std::string StringPrintf(std::string fmt, ...); -inline float rand_real() { - return static_cast<float>(rand()) / (RAND_MAX + 1.0f); -} -<<<<<<< HEAD /* inline void Sleep(int millisec=1){ std::this_thread::sleep_for(std::chrono::milliseconds(millisec)); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/proto/cluster.proto ---------------------------------------------------------------------- diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto index c2f941f..4f7e661 100644 --- a/src/proto/cluster.proto +++ b/src/proto/cluster.proto @@ -47,26 +47,3 @@ message ServerTopology { // neighbor group id repeated int32 neighbor = 3; } -enum MsgType{ - kGet=0; - kPut=1; - kSync=2; - kUpdate=3; - kSyncRequest=4; - kSyncResponse=5; - kStop=6; - kData=7; - kRGet=8; - kRUpdate=9; - kConnect=10; - kMetric=11; -}; - -enum EntityType{ - kWorkerParam=0; - kWorkerLayer=1; - kServer=2; - kStub=3; - kRuntime=4; -}; - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index 2a89de2..6c08a3a 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -31,7 +31,9 @@ void HandleWorkerFinish(void * ctx){ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, const vector<shared_ptr<Param>>& params){ - CHECK_GT(num,0); + std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; + if (num==0) + return paramid2slices; vector<int> param_size; int 
avg=0; for(const auto& x:params){ @@ -43,7 +45,6 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, LOG(INFO)<<"Slicer, param avg="<<avg<<", diff= "<<diff; int capacity=avg, sliceid=0, nbox=0; - std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; for(auto& param: params){ if(param->id()!=param->owner()) continue; @@ -115,7 +116,7 @@ const vector<int> PartitionSlice(int num, const vector<int>& slices){ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads, const ModelProto & mproto, const vector<int> slices, - vector<HandleContext>* ctx){ + vector<HandleContext*>* ctx){ auto cluster=Cluster::Get(); vector<shared_ptr<Server>> servers; if(!cluster->has_server()) @@ -137,10 +138,10 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads, auto server=make_shared<Server>(nthreads++, gid, sid); server->Setup(mproto.updater(), server_shard_, slice2group); servers.push_back(server); - HandleContext hc{dealer, gid, sid}; + auto *hc=new HandleContext{dealer, gid, sid}; ctx->push_back(hc); - CHECK(cluster->runtime()->sWatchSGroup(gid, sid, HandleWorkerFinish, - &(ctx->back()))); + CHECK(cluster->runtime()->WatchSGroup(gid, sid, HandleWorkerFinish, + ctx->back())); } } return servers; @@ -174,12 +175,12 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int nthreads, auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain, cluster->nworkers_per_group()); int lcm=LeastCommonMultiple(cluster->nserver_groups(), cluster->nservers_per_group()); - auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size - for(auto param: net->params()){ - if(param->id()==param->owner()) - for(auto entry: paramid2slices[param->id()]) - slice_size->push_back(entry.second); - } + auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size + for(auto param: net->params()){ + if(param->id()==param->owner()) + for(auto entry: paramid2slices[param->id()]) + slice_size->push_back(entry.second); + } for(int 
gid=gstart;gid<gend;gid++){ shared_ptr<NeuralNet> train_net, test_net, validation_net; @@ -257,10 +258,11 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, // create workers vector<int> slices; vector<shared_ptr<Worker>> workers=CreateWorkers(nthreads, mproto, &slices); - slice2server_=PartitionSlice(cluster->nservers_per_group(), slices); + if(cluster->nserver_groups()&&cluster->nservers_per_group()) + slice2server_=PartitionSlice(cluster->nservers_per_group(), slices); nthreads+=workers.size(); // create servers - vector<HandleContext> ctx; + vector<HandleContext*> ctx; vector<shared_ptr<Server>> servers=CreateServers(nthreads, mproto, slices, &ctx); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/trainer/worker.cc ---------------------------------------------------------------------- diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc index 17ff323..788e77c 100644 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@ -19,10 +19,7 @@ void Worker::Setup(const ModelProto& model, train_net_=train_net; modelproto_=model; auto cluster=Cluster::Get(); - if(cluster->nserver_groups()&&cluster->server_update()){ - int sgid=group_id_/cluster->nworker_groups_per_server_group(); - CHECK(cluster->runtime()->JoinSGroup(group_id_, worker_id_, sgid)); - }else{ + if(!(cluster->nserver_groups()&&cluster->server_update())){ updater_=shared_ptr<Updater>(Singleton<Factory<Updater>>::Instance() ->Create("Updater")); updater_->Init(model.updater()); @@ -33,7 +30,7 @@ void Worker::ConnectStub(shared_ptr<Dealer> dealer, EntityType type){ if(updater_==nullptr){ auto cluster=Cluster::Get(); int sgid=group_id_/cluster->nworker_groups_per_server_group(); - CHECK(cluster->runtime()->wJoinSGroup(group_id_, worker_id_, sgid)); + CHECK(cluster->runtime()->JoinSGroup(group_id_, worker_id_, sgid)); } dealer->Connect(kInprocRouterEndpoint); @@ -93,8 +90,10 @@ void Worker::Run(){ void Worker::Stop(){ auto cluster=Cluster::Get(); - int 
sgid=group_id_/cluster->nworker_groups_per_server_group(); - cluster->runtime()->LeaveSGroup(group_id_, worker_id_, sgid); + if(updater_ == nullptr){ + int sgid=group_id_/cluster->nworker_groups_per_server_group(); + cluster->runtime()->LeaveSGroup(group_id_, worker_id_, sgid); + } Msg* msg=new Msg(); msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1,-1, kStub); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/utils/cluster.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc index c344627..347b98e 100644 --- a/src/utils/cluster.cc +++ b/src/utils/cluster.cc @@ -3,6 +3,7 @@ #include <fstream> #include "utils/cluster.h" #include "proto/cluster.pb.h" +#include "proto/common.pb.h" #include <sys/stat.h> #include <sys/types.h> namespace singa { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/utils/common.cc ---------------------------------------------------------------------- diff --git a/src/utils/common.cc b/src/utils/common.cc index 7cb217e..ed94856 100644 --- a/src/utils/common.cc +++ b/src/utils/common.cc @@ -18,6 +18,41 @@ using google::protobuf::io::ZeroCopyInputStream; using google::protobuf::Message; const int kBufLen = 1024; +std::string IntVecToString(const vector<int>& vec) { + string disp="("; + for(int x: vec) + disp+=std::to_string(x)+", "; + return disp+")"; +} +/** + * * Formatted string. + * */ +string VStringPrintf(string fmt, va_list l) { + char buffer[32768]; + vsnprintf(buffer, 32768, fmt.c_str(), l); + return string(buffer); +} + +/** + * * Formatted string. + * */ +string StringPrintf(string fmt, ...) 
{ + va_list l; + va_start(l, fmt); //fmt.AsString().c_str()); + string result = VStringPrintf(fmt, l); + va_end(l); + return result; +} + +void Debug() { + int i = 0; + char hostname[256]; + gethostname(hostname, sizeof(hostname)); + printf("PID %d on %s ready for attach\n", getpid(), hostname); + fflush(stdout); + while (0 == i) + sleep(5); +} // the proto related functions are from Caffe. void ReadProtoFromTextFile(const char* filename, Message* proto) {
