SINGA-21 Code Review-2

Rebased onto the latest master. This pull request should be at the very front.
Tested with mnist and cifar10, with different cluster settings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/7d39f881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/7d39f881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/7d39f881

Branch: refs/heads/master
Commit: 7d39f8813d057565224402e230afacb98c8c366b
Parents: cfde471
Author: wang wei <[email protected]>
Authored: Wed Jun 24 21:29:39 2015 +0800
Committer: wang wei <[email protected]>
Committed: Wed Jun 24 21:29:39 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/cluster.conf   |  1 +
 examples/mnist/Makefile.example | 22 ++++++++++++++++++++++
 examples/mnist/cluster.conf     |  4 +++-
 examples/mnist/model.conf       |  2 +-
 include/trainer/trainer.h       |  2 +-
 include/utils/common.h          |  9 +++------
 src/proto/cluster.proto         | 23 -----------------------
 src/trainer/trainer.cc          | 30 ++++++++++++++++--------------
 src/trainer/worker.cc           | 13 ++++++-------
 src/utils/cluster.cc            |  1 +
 src/utils/common.cc             | 35 +++++++++++++++++++++++++++++++++++
 11 files changed, 89 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/cifar10/cluster.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/cluster.conf b/examples/cifar10/cluster.conf
index 97c64fd..e7e3400 100644
--- a/examples/cifar10/cluster.conf
+++ b/examples/cifar10/cluster.conf
@@ -3,4 +3,5 @@ nserver_groups: 1
 nservers_per_group: 1
 nworkers_per_group: 1
 nworkers_per_procs: 1
+nservers_per_procs: 1
 workspace: "examples/cifar10/"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/mnist/Makefile.example
----------------------------------------------------------------------
diff --git a/examples/mnist/Makefile.example b/examples/mnist/Makefile.example
new file mode 100644
index 0000000..9016887
--- /dev/null
+++ b/examples/mnist/Makefile.example
@@ -0,0 +1,22 @@
+libs :=singa glog protobuf
+
+.PHONY: all download create
+
+download: mnist
+
+mnist:
+       wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
+       wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
+       wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
+       wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
+       gunzip train-images-idx3-ubyte.gz && gunzip train-labels-idx1-ubyte.gz
+       gunzip t10k-images-idx3-ubyte.gz && gunzip t10k-labels-idx1-ubyte.gz
+
+create:
+       $(CXX) create_shard.cc -std=c++11 -lsinga -lprotobuf -lglog 
-I../../include \
+               -L../../.libs/ -Wl,-unresolved-symbols=ignore-in-shared-libs 
-Wl,-rpath=../../.libs/ \
+               -o create_shard.bin
+       mkdir mnist_train_shard
+       mkdir mnist_test_shard
+       ./create_shard.bin train-images-idx3-ubyte train-labels-idx1-ubyte 
mnist_train_shard
+       ./create_shard.bin t10k-images-idx3-ubyte t10k-labels-idx1-ubyte 
mnist_test_shard

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/mnist/cluster.conf
----------------------------------------------------------------------
diff --git a/examples/mnist/cluster.conf b/examples/mnist/cluster.conf
index 6b8a8e6..ff25b8c 100644
--- a/examples/mnist/cluster.conf
+++ b/examples/mnist/cluster.conf
@@ -2,4 +2,6 @@ nworker_groups: 1
 nserver_groups: 1
 nservers_per_group: 1
 nworkers_per_group: 1
-workspace: "examples/cifar10/"
+nservers_per_procs: 1
+nworkers_per_procs: 1
+workspace: "examples/mnist/"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/examples/mnist/model.conf
----------------------------------------------------------------------
diff --git a/examples/mnist/model.conf b/examples/mnist/model.conf
index 3786c4f..cd113db 100644
--- a/examples/mnist/model.conf
+++ b/examples/mnist/model.conf
@@ -1,5 +1,5 @@
 name: "deep-big-simple-mlp"
-train_steps: 10000
+train_steps: 1000
 test_steps:10
 test_frequency:60
 display_frequency:30

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 6e93f80..ed93374 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -97,7 +97,7 @@ class Trainer{
  protected:
 
   vector<shared_ptr<Server>> CreateServers(int nthread, const ModelProto& 
mproto,
-      const vector<int> slices, vector<HandleContext>* ctx);
+      const vector<int> slices, vector<HandleContext*>* ctx);
   vector<shared_ptr<Worker>> CreateWorkers(int nthread,
       const ModelProto& mproto, vector<int> *slice_size);
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
index 6444962..aca35ec 100644
--- a/include/utils/common.h
+++ b/include/utils/common.h
@@ -9,6 +9,9 @@
 
 namespace singa {
 
+std::string IntVecToString(const std::vector<int>& vec) ;
+std::string VStringPrintf(std::string fmt, va_list l) ;
+std::string StringPrintf(std::string fmt, ...) ;
 void ReadProtoFromTextFile(const char* filename,
                            google::protobuf::Message* proto);
 void WriteProtoToTextFile(const google::protobuf::Message& proto,
@@ -17,13 +20,7 @@ void ReadProtoFromBinaryFile(const char* filename,
                              google::protobuf::Message* proto);
 void WriteProtoToBinaryFile(const google::protobuf::Message& proto,
                             const char* filename);
-std::string IntVecToString(const std::vector<int>& vec);
-std::string StringPrintf(std::string fmt, ...);
-inline float rand_real() {
-  return static_cast<float>(rand()) / (RAND_MAX + 1.0f);
-}
 
-<<<<<<< HEAD
 /*
 inline void Sleep(int millisec=1){
   std::this_thread::sleep_for(std::chrono::milliseconds(millisec));

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index c2f941f..4f7e661 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -47,26 +47,3 @@ message ServerTopology {
   // neighbor group id
        repeated int32 neighbor = 3;
 }
-enum MsgType{
-  kGet=0;
-  kPut=1;
-  kSync=2;
-  kUpdate=3;
-  kSyncRequest=4;
-  kSyncResponse=5;
-  kStop=6;
-  kData=7;
-  kRGet=8;
-  kRUpdate=9;
-  kConnect=10;
-  kMetric=11;
-};
-
-enum EntityType{
-  kWorkerParam=0;
-  kWorkerLayer=1;
-  kServer=2;
-  kStub=3;
-  kRuntime=4;
-};
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 2a89de2..6c08a3a 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -31,7 +31,9 @@ void HandleWorkerFinish(void * ctx){
 
 const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
     const vector<shared_ptr<Param>>& params){
-  CHECK_GT(num,0);
+  std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
+  if (num==0)
+    return paramid2slices;
   vector<int> param_size;
   int avg=0;
   for(const auto& x:params){
@@ -43,7 +45,6 @@ const std::unordered_map<int, vector<std::pair<int, int>>> 
SliceParams(int num,
   LOG(INFO)<<"Slicer, param avg="<<avg<<", diff= "<<diff;
 
   int capacity=avg, sliceid=0, nbox=0;
-  std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
   for(auto& param: params){
     if(param->id()!=param->owner())
       continue;
@@ -115,7 +116,7 @@ const vector<int> PartitionSlice(int num, const 
vector<int>& slices){
 vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
     const ModelProto & mproto,
     const vector<int> slices,
-    vector<HandleContext>* ctx){
+    vector<HandleContext*>* ctx){
   auto cluster=Cluster::Get();
   vector<shared_ptr<Server>> servers;
   if(!cluster->has_server())
@@ -137,10 +138,10 @@ vector<shared_ptr<Server>> Trainer::CreateServers(int 
nthreads,
       auto server=make_shared<Server>(nthreads++, gid, sid);
       server->Setup(mproto.updater(), server_shard_, slice2group);
       servers.push_back(server);
-      HandleContext hc{dealer, gid, sid};
+      auto *hc=new HandleContext{dealer, gid, sid};
       ctx->push_back(hc);
-      CHECK(cluster->runtime()->sWatchSGroup(gid, sid, HandleWorkerFinish,
-            &(ctx->back())));
+      CHECK(cluster->runtime()->WatchSGroup(gid, sid, HandleWorkerFinish,
+            ctx->back()));
     }
   }
   return servers;
@@ -174,12 +175,12 @@ vector<shared_ptr<Worker>> Trainer::CreateWorkers(int 
nthreads,
   auto net=NeuralNet::SetupNeuralNet(mproto.neuralnet(), kTrain,
       cluster->nworkers_per_group());
   int lcm=LeastCommonMultiple(cluster->nserver_groups(), 
cluster->nservers_per_group());
-  auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
-  for(auto param: net->params()){
-    if(param->id()==param->owner())
-      for(auto entry: paramid2slices[param->id()])
-        slice_size->push_back(entry.second);
-  }
+    auto paramid2slices=SliceParams(lcm, net->params()); // sliceid, size
+    for(auto param: net->params()){
+      if(param->id()==param->owner())
+        for(auto entry: paramid2slices[param->id()])
+          slice_size->push_back(entry.second);
+    }
 
   for(int gid=gstart;gid<gend;gid++){
     shared_ptr<NeuralNet> train_net, test_net, validation_net;
@@ -257,10 +258,11 @@ void Trainer::Start(const ModelProto& mproto, const 
ClusterProto& cproto,
   // create workers
   vector<int> slices;
   vector<shared_ptr<Worker>> workers=CreateWorkers(nthreads, mproto, &slices);
-  slice2server_=PartitionSlice(cluster->nservers_per_group(), slices);
+  if(cluster->nserver_groups()&&cluster->nservers_per_group())
+    slice2server_=PartitionSlice(cluster->nservers_per_group(), slices);
   nthreads+=workers.size();
   // create servers
-  vector<HandleContext> ctx;
+  vector<HandleContext*> ctx;
   vector<shared_ptr<Server>> servers=CreateServers(nthreads, mproto, slices,
       &ctx);
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 17ff323..788e77c 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -19,10 +19,7 @@ void Worker::Setup(const ModelProto& model,
   train_net_=train_net;
   modelproto_=model;
   auto cluster=Cluster::Get();
-  if(cluster->nserver_groups()&&cluster->server_update()){
-    int sgid=group_id_/cluster->nworker_groups_per_server_group();
-    CHECK(cluster->runtime()->JoinSGroup(group_id_, worker_id_, sgid));
-  }else{
+  if(!(cluster->nserver_groups()&&cluster->server_update())){
     updater_=shared_ptr<Updater>(Singleton<Factory<Updater>>::Instance()
         ->Create("Updater"));
     updater_->Init(model.updater());
@@ -33,7 +30,7 @@ void Worker::ConnectStub(shared_ptr<Dealer> dealer, 
EntityType type){
   if(updater_==nullptr){
     auto cluster=Cluster::Get();
     int sgid=group_id_/cluster->nworker_groups_per_server_group();
-    CHECK(cluster->runtime()->wJoinSGroup(group_id_, worker_id_, sgid));
+    CHECK(cluster->runtime()->JoinSGroup(group_id_, worker_id_, sgid));
   }
 
   dealer->Connect(kInprocRouterEndpoint);
@@ -93,8 +90,10 @@ void Worker::Run(){
 
 void Worker::Stop(){
   auto cluster=Cluster::Get();
-  int sgid=group_id_/cluster->nworker_groups_per_server_group();
-  cluster->runtime()->LeaveSGroup(group_id_, worker_id_, sgid);
+  if(updater_ == nullptr){
+    int sgid=group_id_/cluster->nworker_groups_per_server_group();
+    cluster->runtime()->LeaveSGroup(group_id_, worker_id_, sgid);
+  }
   Msg* msg=new Msg();
   msg->set_src(group_id_, worker_id_, kWorkerParam);
   msg->set_dst(-1,-1, kStub);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index c344627..347b98e 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -3,6 +3,7 @@
 #include <fstream>
 #include "utils/cluster.h"
 #include "proto/cluster.pb.h"
+#include "proto/common.pb.h"
 #include <sys/stat.h>
 #include <sys/types.h>
 namespace singa {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/7d39f881/src/utils/common.cc
----------------------------------------------------------------------
diff --git a/src/utils/common.cc b/src/utils/common.cc
index 7cb217e..ed94856 100644
--- a/src/utils/common.cc
+++ b/src/utils/common.cc
@@ -18,6 +18,41 @@ using google::protobuf::io::ZeroCopyInputStream;
 using google::protobuf::Message;
 
 const int kBufLen = 1024;
+std::string IntVecToString(const vector<int>& vec) {
+  string disp="(";
+  for(int x: vec)
+    disp+=std::to_string(x)+", ";
+  return disp+")";
+}
+/**
+ *  * Formatted string.
+ *   */
+string VStringPrintf(string fmt, va_list l) {
+  char buffer[32768];
+  vsnprintf(buffer, 32768, fmt.c_str(), l);
+  return string(buffer);
+}
+
+/**
+ *  * Formatted string.
+ *   */
+string StringPrintf(string fmt, ...) {
+  va_list l;
+  va_start(l, fmt); //fmt.AsString().c_str());
+  string result = VStringPrintf(fmt, l);
+  va_end(l);
+  return result;
+}
+
+void Debug() {
+  int i = 0;
+  char hostname[256];
+  gethostname(hostname, sizeof(hostname));
+  printf("PID %d on %s ready for attach\n", getpid(), hostname);
+  fflush(stdout);
+  while (0 == i)
+    sleep(5);
+}
 
 // the proto related functions are from Caffe.
 void ReadProtoFromTextFile(const char* filename, Message* proto) {

Reply via email to