Repository: incubator-singa
Updated Branches:
  refs/heads/gpu [created] 9dbdfd686


SINGA-41: Support single node single GPU training

Rebase to latest SINGA master


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/2818605a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/2818605a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/2818605a

Branch: refs/heads/gpu
Commit: 2818605ac10a28af542d764156588d65737f18fd
Parents: 2f66537
Author: seaok <[email protected]>
Authored: Mon Sep 14 15:21:09 2015 +0800
Committer: Wei Wang <[email protected]>
Committed: Tue Sep 29 10:15:04 2015 +0800

----------------------------------------------------------------------
 Makefile.gpu                  | 132 ++++++++++
 include/utils/blob.h          |   8 +
 include/utils/param.h         |   4 +
 src/neuralnet/neuron_layer.cc |  82 ++++--
 src/trainer/trainer.cc        | 522 +++++++++++++++++++++++++++++++++++++
 src/utils/blob.cc             |   6 +-
 6 files changed, 728 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/Makefile.gpu
----------------------------------------------------------------------
diff --git a/Makefile.gpu b/Makefile.gpu
new file mode 100644
index 0000000..0a3bd74
--- /dev/null
+++ b/Makefile.gpu
@@ -0,0 +1,132 @@
+###################User Config Variables #############################
+# third-party library installation folder
+HOME_DIR := /usr
+#Cuda installation folder
+CUDA_DIR := /usr/local/cuda
+# Lib folder for system and external libs. You may need to change it.
+LIBRARY_DIRS := $(HOME_DIR)/lib64 $(HOME_DIR)/lib $(HOME_DIR)/local/lib 
$(CUDA_DIR)/lib $(CUDA_DIR)/lib64
+# Header folder for system and external libs. You may need to change it.
+INCLUDE_DIRS := $(HOME_DIR)/include ./include 
$(HOME_DIR)/local/include/zookeeper $(HOME_DIR)/local/include 
$(CUDA_DIR)/include
+# g++ location, should support c++11, tested with 4.8.1
+CXX := g++
+CUCXX := nvcc
+
+######################Setting Variables#######################################
+LIBRARIES := glog protobuf openblas zmq czmq zookeeper_mt
+
+# Add CUDA library
+ifneq ($(CUDA_DIR),)
+       LIBRARIES := $(LIBRARIES) cublas cudart curand
+endif
+
+LDFLAGS := $(foreach librarydir, $(LIBRARY_DIRS), -L$(librarydir))\
+       $(foreach library, $(LIBRARIES), -l$(library))
+# Folder to store compiled files
+BUILD_DIR := .libs
+MSHADOW_FLAGS :=-DMSHADOW_USE_CUDA=1 -DMSHADOW_USE_CBLAS=1 -DMSHADOW_USE_MKL=0
+ZK_FLAGS :=-DTHREADED -fpermissive
+CXXFLAGS := -O2 -msse3 -Wall -pthread -fPIC -std=c++11 -Wno-unknown-pragmas \
+       $(MSHADOW_FLAGS) $(ZK_FLAGS)\
+       -funroll-loops $(foreach includedir, $(INCLUDE_DIRS), -I$(includedir))
+CUCXXFLAGS := $(MSHADOW_FLAGS) -std=c++11 \
+       $(foreach includedir, $(INCLUDE_DIRS), -I$(includedir))
+
+# Add device compile option
+ifeq ($(CUDA_DIR),)
+       MSHADOW_FLAGS := $(MSHADOW_FLAGS) -DCPU_ONLY
+       CXXFLAGS := $(CXXFLAGS) -DCPU_ONLY
+endif
+
+# find user defined .proto file, and then compute the corresponding .h, .cc
+# files, which cannot be found by shell find, because they haven't been
+# generated currently
+PROTOS := $(shell find src/proto/ -name "*.proto")
+PROTO_SRCS :=$(PROTOS:.proto=.pb.cc)
+PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h))
+PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o))
+
+# each singa src file will generate a .o file
+SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" -o 
-path "src/utils/tool.cc" \) \
+       -prune -o \( -name "*.cc" -type f \) -print )
+SINGA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \
+       $(PROTO_OBJS) )
+-include $(SINGA_OBJS:%.o=%.P)
+
+TEST_SRCS :=$(shell find src/test/ -maxdepth 1 -name "*.cc")
+TEST_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(TEST_SRCS:.cc=.o)))
+-include $(TEST_OBJS:%.o=%.P)
+
+TEST_CUDA_SRCS :=$(shell find src/test/ -maxdepth 1 -name "*.cu")
+TEST_CUDA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(TEST_CUDA_SRCS:.cu=.o)))
+-include $(TEST_CUDA_OBJS:%.o=%.P)
+
+
+SINGA_CUDA_SRCS :=$(shell find src/neuralnet/ -maxdepth 1 -name "*.cu")
+SINGA_CUDA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, 
$(SINGA_CUDA_SRCS:.cu=.o)))
+-include $(SINGA_CUDA_OBJS:%.o=%.P)
+
+GTEST_SRC := include/gtest/gtest-all.cc
+GTEST_HDR := include/gtest/gtest.h
+GTEST_LIB := $(BUILD_DIR)/libgtest.a
+
+OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) )
+CUOBJS := $(sort $(SINGA_CUDA_OBJS) $(TEST_CUDA_OBJS) )
+
+########################Compilation Section###################################
+.PHONY: singa test
+
+singa: $(PROTO_OBJS) $(SINGA_OBJS) $(SINGA_CUDA_OBJS)
+       $(CXX) -shared -o $(BUILD_DIR)/libsinga.so $(SINGA_OBJS)
+       $(CXX) $(SINGA_OBJS) $(SINGA_CUDA_OBJS) src/main.cc -o singa 
$(CXXFLAGS) $(LDFLAGS)
+       @echo
+       $(CXX) $(SINGA_OBJS) $(SINGA_CUDA_OBJS) src/utils/tool.cc -o singatool 
$(CXXFLAGS) $(LDFLAGS)
+       @echo
+
+loader: proto $(LOADER_OBJS)
+       $(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS)
+       @echo
+
+test:  proto $(GTEST_LIB) $(TEST_OBJS) $(TEST_CUDA_OBJS) $(SINGA_OBJS) 
$(SINGA_CUDA_OBJS)
+       $(CXX) $(TEST_OBJS) $(TEST_CUDA_OBJS) include/gtest/gtest_main.cc 
$(GTEST_LIB) \
+               $(SINGA_OBJS) $(SINGA_CUDA_OBJS) -o $(BUILD_DIR)/test 
$(CXXFLAGS) $(LDFLAGS)
+       @echo
+
+$(GTEST_LIB): $(GTEST_HDR) $(GTEST_SRC)
+       $(CXX) $(GTEST_SRC) -c -o $(BUILD_DIR)/gtest-all.o $(CXXFLAGS)
+       ar -rv $(GTEST_LIB) $(BUILD_DIR)/gtest-all.o
+
+# compile all files
+$(OBJS):$(BUILD_DIR)/%.o : %.cc
+       @mkdir -p $(dir $@)
+       $(CXX) $<  $(CXXFLAGS) -MMD -c -o $@
+       cp $(BUILD_DIR)/$*.d $(BUILD_DIR)/$*.P; \
+       sed -e 's/#.*//' -e 's/^[^:]*: *//' -e 's/ *\\$$//' \
+               -e '/^$$/ d' -e 's/$$/ :/' < $(BUILD_DIR)/$*.d >> 
$(BUILD_DIR)/$*.P; \
+       rm -f $*.d
+
+$(CUOBJS):$(BUILD_DIR)/%.o : %.cu
+       @mkdir -p $(dir $@)
+       $(CUCXX) $< -c -o $@ $(CUCXXFLAGS)
+       cp $(BUILD_DIR)/$*.d $(BUILD_DIR)/$*.P; \
+       sed -e 's/#.*//' -e 's/^[^:]*: *//' -e 's/ *\\$$//' \
+       -e '/^$$/ d' -e 's/$$/ :/' < $(BUILD_DIR)/$*.d >> $(BUILD_DIR)/$*.P; \
+       rm -f $*.d
+
+proto: $(PROTO_OBJS)
+
+$(PROTO_SRCS): $(PROTOS)
+       protoc --proto_path=src/proto --cpp_out=src/proto $(PROTOS)
+       mkdir -p include/proto/
+       cp src/proto/*.pb.h include/proto/
+       mkdir -p tool/pb2/
+       touch tool/pb2/__init__.py
+       protoc --proto_path=src/proto --python_out=tool/pb2/ $(PROTOS)
+       @echo
+
+clean:
+       rm -rf *.a *.so
+       rm -rf include/proto/*
+       rm -rf src/proto/*.pb.h src/proto/*.pb.cc
+       rm -rf tool/pb2/*
+       rm -rf $(BUILD_DIR)
+       @echo

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/include/utils/blob.h
----------------------------------------------------------------------
diff --git a/include/utils/blob.h b/include/utils/blob.h
index 91db095..a7079de 100644
--- a/include/utils/blob.h
+++ b/include/utils/blob.h
@@ -181,6 +181,14 @@ class Blob {
     CHECK(data_);
     return static_cast<Dtype*>(data_->mutable_gpu_data());
   }
+  inline Dtype* mutable_xpu_data() {
+    CHECK(data_);
+       #ifndef CPU_ONLY
+               return static_cast<Dtype*>(data_->mutable_gpu_data());
+       #else
+           return static_cast<Dtype*>(data_->mutable_cpu_data());
+       #endif
+  }
   /// @brief Compute the sum of absolute values (L1 norm) of the data.
   Dtype asum_data() const;
   Dtype sum_data() const;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
index f690438..9829334 100644
--- a/include/utils/param.h
+++ b/include/utils/param.h
@@ -214,6 +214,10 @@ class Param {
   inline float* mutable_cpu_data() { return data_->mutable_cpu_data(); }
   inline float* mutable_cpu_grad() { return grad_.mutable_cpu_data(); }
   inline float* mutable_cpu_history() { return history_.mutable_cpu_data(); }
+
+  inline float* mutable_xpu_data() { return data_->mutable_xpu_data(); }
+  inline float* mutable_xpu_grad() { return grad_.mutable_xpu_data(); }
+  inline float* mutable_xpu_history() { return history_.mutable_xpu_data(); }
   /**
    * @return slice start ID
    */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/src/neuralnet/neuron_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuron_layer.cc b/src/neuralnet/neuron_layer.cc
index 4e3acf0..2d84fea 100644
--- a/src/neuralnet/neuron_layer.cc
+++ b/src/neuralnet/neuron_layer.cc
@@ -30,6 +30,7 @@
 namespace singa {
 
 using namespace mshadow;
+using namespace mshadow::expr;
 using mshadow::cpu;
 
 using mshadow::Shape;
@@ -42,6 +43,12 @@ using mshadow::Tensor;
 using std::string;
 using std::vector;
 
+#ifndef CPU_ONLY
+    #define xpu mshadow::gpu
+#else
+    #define xpu mshadow::cpu
+#endif
+
 inline Tensor<cpu, 4> Tensor4(Blob<float>* blob) {
   const vector<int>& shape = blob->shape();
   Tensor<cpu, 4> tensor(blob->mutable_cpu_data(),
@@ -68,6 +75,30 @@ inline Tensor<cpu, 1> Tensor1(Blob<float>* blob) {
   return tensor;
 }
 
+inline Tensor<xpu, 4> Tensor4XPU(Blob<float>* blob) {
+  const vector<int>& shape = blob->shape();
+  Tensor<xpu, 4> tensor(blob->mutable_xpu_data(),
+         Shape4(shape[0], shape[1], shape[2], shape[3]));
+  return tensor;
+}
+
+inline Tensor<xpu, 3> Tensor3XPU(Blob<float>* blob){
+  const vector<int>& shape = blob->shape();
+  Tensor<xpu, 3> tensor(blob->mutable_xpu_data(),
+         Shape3(shape[0], shape[1], blob->count() / shape[0] / shape[1]));
+  return tensor;
+}
+inline Tensor<xpu, 2> Tensor2XPU(Blob<float>* blob){
+  const vector<int>& shape = blob->shape();
+  Tensor<xpu, 2> tensor(blob->mutable_xpu_data(),
+         Shape2(shape[0], blob->count() / shape[0]));
+  return tensor;
+}
+inline Tensor<xpu, 1> Tensor1XPU(Blob<float>* blob){
+  Tensor<xpu, 1> tensor(blob->mutable_xpu_data(), Shape1(blob->count()));
+  return tensor;
+}
+
 /************ Implementation for ConvolutionLayer*************************/
 ConvolutionLayer::~ConvolutionLayer() {
   delete weight_;
@@ -112,11 +143,11 @@ void ConvolutionLayer::Setup(const LayerProto& conf,
 
 void ConvolutionLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
-  auto src = Tensor4(srclayers[0]->mutable_data(this));
-  auto data = Tensor3(&data_);
-  auto col = Tensor2(&col_data_);
-  auto weight = Tensor2(weight_->mutable_data());
-  auto bias = Tensor1(bias_->mutable_data());
+  auto src = Tensor4XPU(srclayers_[0]->mutable_data(this));
+  auto data = Tensor3XPU(&data_);
+  auto col = Tensor2XPU(&col_data_);
+  auto weight = Tensor2XPU(weight_->mutable_data());
+  auto bias = Tensor1XPU(bias_->mutable_data());
   for (int n = 0; n < batchsize_; n++) {
     if (pad_ > 0)
       col = expr::unpack_patch2col(pad(src[n], pad_), kernel_, stride_);
@@ -129,17 +160,17 @@ void ConvolutionLayer::ComputeFeature(int flag,
 
 void ConvolutionLayer::ComputeGradient(int flag,
     const vector<Layer*>& srclayers) {
-  auto src = Tensor4(srclayers[0]->mutable_data(this));
-  auto col = Tensor2(&col_data_);
-  auto weight = Tensor2(weight_->mutable_data());
-  auto grad = Tensor3(&grad_);
-  auto gcol = Tensor2(&col_grad_);
-  auto gweight = Tensor2(weight_->mutable_grad());
-  auto gbias = Tensor1(bias_->mutable_grad());
-  Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this);
-  Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_));
+  auto src = Tensor4XPU(srclayers_[0]->mutable_data(this));
+  auto col = Tensor2XPU(&col_data_);
+  auto weight = Tensor2XPU(weight_->mutable_data());
+  auto grad = Tensor3XPU(&grad_);
+  auto gcol = Tensor2XPU(&col_grad_);
+  auto gweight = Tensor2XPU(weight_->mutable_grad());
+  auto gbias = Tensor1XPU(bias_->mutable_grad());
+  Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this);
+  Tensor<xpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_));
   if (gsrcblob != nullptr)
-    gsrc.dptr = gsrcblob->mutable_cpu_data();
+    gsrc.dptr = gsrcblob->mutable_xpu_data();
   gbias = expr::sumall_except_dim<1>(grad);
   gweight = 0.0f;
   Shape<3> padshp(gsrc.shape.SubShape());
@@ -158,6 +189,7 @@ void ConvolutionLayer::ComputeGradient(int flag,
           imgshp);
     }
   }
+ // weight_->mutable_data()->mutable_cpu_data();
 }
 
 /******************* Implementation for CConvolutionLayer *********/
@@ -421,10 +453,10 @@ void InnerProductLayer::Setup(const LayerProto& conf,
 
 void InnerProductLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
-  auto data = Tensor2(&data_);
-  auto src = Tensor2(srclayers[0]->mutable_data(this));
-  auto weight = Tensor2(weight_->mutable_data());
-  auto bias = Tensor1(bias_->mutable_data());
+  auto data = Tensor2XPU(&data_);
+  auto src = Tensor2XPU(srclayers_[0]->mutable_data(this));
+  auto weight = Tensor2XPU(weight_->mutable_data());
+  auto bias = Tensor1XPU(bias_->mutable_data());
   if (transpose_)
     data = dot(src, weight);
   else
@@ -435,11 +467,11 @@ void InnerProductLayer::ComputeFeature(int flag,
 
 void InnerProductLayer::ComputeGradient(int flag,
     const vector<Layer*>& srclayers) {
-  auto src = Tensor2(srclayers[0]->mutable_data(this));
-  auto grad = Tensor2(&grad_);
-  auto weight = Tensor2(weight_->mutable_data());
-  auto gweight = Tensor2(weight_->mutable_grad());
-  auto gbias = Tensor1(bias_->mutable_grad());
+  auto src = Tensor2XPU(srclayers_[0]->mutable_data(this));
+  auto grad = Tensor2XPU(&grad_);
+  auto weight = Tensor2XPU(weight_->mutable_data());
+  auto gweight = Tensor2XPU(weight_->mutable_grad());
+  auto gbias = Tensor1XPU(bias_->mutable_grad());
 
   gbias = expr::sum_rows(grad);
   if (transpose_)
@@ -447,7 +479,7 @@ void InnerProductLayer::ComputeGradient(int flag,
   else
     gweight = dot(grad.T(), src);
   if (srclayers[0]->mutable_grad(this) != nullptr) {
-    auto gsrc = Tensor2(srclayers[0]->mutable_grad(this));
+    auto gsrc = Tensor2XPU(srclayers_[0]->mutable_grad(this));
     if (transpose_)
       gsrc = dot(grad, weight.T());
     else

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
new file mode 100644
index 0000000..22b5757
--- /dev/null
+++ b/src/trainer/trainer.cc
@@ -0,0 +1,522 @@
+#include <thread>
+#include <vector>
+#include <map>
+#include <chrono>
+#include <glog/logging.h>
+#include "utils/tinydir.h"
+#include <unistd.h>
+#include "utils/cluster.h"
+#include "utils/common.h"
+#include "proto/common.pb.h"
+#include "trainer/trainer.h"
+#include "mshadow/tensor.h"
+
+
+namespace singa {
+using std::vector;
+using std::map;
+using std::queue;
+using namespace std::chrono;
+using std::make_shared;
+
+/***********************Trainer****************************/
+Trainer::~Trainer() {
+  // free Params (i.e., slices) in server shard
+  for (auto entry : server_shard_)
+    for (auto param : entry.second->shares)
+      delete param;
+  delete router_;
+}
+
+const vector<int> SliceParams(const vector<Param*>& params) {
+  // for load-balance among servers in a group and among server groups
+  int nserver_grps = Cluster::Get()->nserver_groups();
+  int nservers_per_grp = Cluster::Get()->nservers_per_group();
+  int lcm = LeastCommonMultiple(nserver_grps, nservers_per_grp);
+
+  // collect sizes of unique Params
+  std::vector<int> paramsize;
+  for (auto param : params)
+    if (param->id() == param->owner())
+      paramsize.push_back(param->size());
+  // slice into lcm pieces to achieve good load-balance for both intra-group
+  // partition (among servers in a group) and inter-group partition (each group
+  // is assigned a sub-set of slices)
+  auto param_slice = Slice(lcm, paramsize);
+  // construct map from Param ID to its slices <slice id, len>
+  std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
+  vector<int> slices;
+  auto it = param_slice.begin();
+  int slice_id = 0;
+  for (auto param : params) {
+    if (param->id() == param->owner()) {
+      for (int len : *it) {
+        slices.push_back(len);
+        paramid2slices[param->id()].push_back(std::make_pair(slice_id++, len));
+      }
+      it++;
+    }
+  }
+  // add slice info for every Param
+  for (auto param : params)
+    for (auto entry : paramid2slices[param->owner()]) {
+      param->AddSlice(entry.first, entry.second);
+      LOG(INFO) << "param id " << param->id() << " owner=" << param->owner()
+        << ": " << entry.first << ", " << entry.second;
+    }
+  return slices;
+}
+
+void Trainer::SetupWorkerServer(
+    const JobProto& job_conf,
+    const vector<Worker*>& workers,
+    const vector<Server*>& servers) {
+  auto cluster = Cluster::Get();
+  int grp_size = cluster->nworkers_per_group();
+  const auto& net_conf = job_conf.neuralnet();
+  auto net = NeuralNet::Create(net_conf, kTrain, grp_size);
+  // MUST do SliceParam before share param/net with others
+  auto slices = SliceParams(net->params());
+
+  std::unordered_map<int, shared_ptr<NeuralNet>> grp_net;
+  int first_grp = workers.size() ? workers.at(0)->grp_id() : -1;
+  for (auto worker : workers) {
+    int grp_id = worker->grp_id();
+    int worker_id = worker->id();
+    shared_ptr<NeuralNet> test_net = nullptr, valid_net = nullptr;
+    if (grp_net.find(grp_id) == grp_net.end()) {
+      if (grp_id == first_grp) {
+        //  tests are performed only by the first group now. TODO update.
+        if (first_grp == 0 && job_conf.test_steps() && worker_id == 0) {
+          test_net = NeuralNet::Create(net_conf, kTest, 1); // hard code for 
exp
+          test_net->ShareParamsFrom(net);
+        }
+        //  validation is performed only by the first group. TODO update.
+        if (first_grp == 0 && job_conf.valid_steps() && worker_id == 0) {
+          valid_net = NeuralNet::Create(net_conf, kValidation, 1);
+          valid_net->ShareParamsFrom(net);
+        }
+        grp_net[grp_id] = net;
+      } else {
+        grp_net[grp_id] = NeuralNet::Create(net_conf, kTrain, grp_size);
+        if(cluster->share_memory())
+          grp_net[grp_id]->ShareParamsFrom(net);
+      }
+      for (auto layer : grp_net[grp_id]->layers()) {
+        bool local = layer->partition_id() >= workers.front()->id()
+          && layer->partition_id() <= workers.back()->id();
+        for (auto param : layer->GetParams()) {
+          int hash = Hash(grp_id, param->owner());
+          if (worker_shard_.find(hash) == worker_shard_.end())
+            worker_shard_[hash] = new ParamEntry();
+          worker_shard_[hash]->AddParam(local, param);
+        }
+      }
+    }
+    LOG(INFO) << "grp " << worker->grp_id() << ", worker "
+      << worker->id() << " net " << grp_net[grp_id].get();
+    worker->Setup(job_conf, grp_net[grp_id], valid_net, test_net);
+  }
+
+  //  partition among server groups, each group maintains one sub-set for sync
+  auto slice2group = PartitionSlices(cluster->nserver_groups(), slices);
+  for (auto server : servers)
+    server->Setup(job_conf.updater(), &server_shard_, slice2group);
+  //  partition within one server group, each server updates for one sub-set
+  slice2server_ = PartitionSlices(cluster->nservers_per_group(), slices);
+}
+
+vector<Server*> Trainer::CreateServers(int nthreads, const JobProto& job) {
+  auto cluster = Cluster::Get();
+  vector<Server*> servers;
+  if (!cluster->has_server())
+    return servers;
+
+  int pid = cluster->procs_id();
+  // if true, server procs (logical) id starts after worker procs
+  if (cluster->server_worker_separate())
+    pid -= cluster->nworker_procs();
+  int procs_size = cluster->nservers_per_procs();
+  int grp_size = cluster->nservers_per_group();
+  int gid = pid *  procs_size / grp_size;
+  int start = pid * procs_size % grp_size;
+  int end = start + procs_size;
+  for (int sid = start; sid < end; sid++) {
+    auto server = new Server(nthreads++, gid, sid);
+    servers.push_back(server);
+  }
+  return servers;
+}
+
+vector<Worker*> Trainer::CreateWorkers(int nthreads, const JobProto& job) {
+  auto cluster=Cluster::Get();
+  vector<Worker*> workers;
+  if(!cluster->has_worker())
+    return workers;
+  int pid = cluster->procs_id();
+  int grp_size = cluster->nworkers_per_group();
+  int procs_size = cluster->nworkers_per_procs();
+  int gstart, gend, wstart, wend;
+  if (grp_size >= procs_size) {
+    // all workers in this procs are from the same group
+    gstart = pid * procs_size / grp_size;
+    gend = gstart + 1;
+    wstart = pid * procs_size % grp_size;
+    wend = wstart + procs_size;
+  } else {
+    // there are multiple (complete) groups in this procs.
+    CHECK_EQ(procs_size % grp_size, 0);
+    int groups_per_procs = procs_size / grp_size;
+    gstart = pid * groups_per_procs;
+    gend = (pid+1) * groups_per_procs;
+    wstart = 0;
+    wend = grp_size;
+  }
+  for (int gid = gstart; gid < gend; gid++) {
+    for (int wid = wstart; wid < wend; wid++) {
+      auto *worker = Worker::Create(job);
+      worker->Init(nthreads++,gid, wid);
+      workers.push_back(worker);
+    }
+  }
+  return workers;
+}
+
+void Trainer::Resume(JobProto* jobConf) {
+  tinydir_dir dir;
+  string folder = Cluster::Get()->checkpoint_folder();
+  tinydir_open(&dir, folder.c_str());
+  int latest_step = 0;
+  // there may be multiple checkpoint files (from different workers) for one step
+  vector<string> ck_files;
+  // iterate all files to get the files for the last checkpoint
+  while (dir.has_next) {
+    tinydir_file file;
+    tinydir_readfile(&dir, &file);
+    tinydir_next(&dir);
+    char* ch = strstr(file.name, "step");
+    if (ch == nullptr) {
+      if (file.name[0] != '.')
+        LOG(INFO) << "Irregular file in checkpoint folder: " << file.name;
+      continue;
+    }
+
+    LOG(INFO) << "Add checkpoint file for resume: " << ch;
+    int step = atoi(ch+4);
+    if (step == latest_step) {
+      ck_files.push_back(file.name);
+    } else if(step > latest_step) {
+      latest_step = step;
+      ck_files.clear();
+      ck_files.push_back(string(file.name));
+    }
+  }
+
+  if (latest_step > 0) {
+    jobConf->set_step(latest_step);
+    if (!jobConf->has_reset_param_version())
+      jobConf->set_reset_param_version(false);
+    jobConf->clear_checkpoint_path();
+    for (auto ck_file : ck_files)
+      jobConf->add_checkpoint_path(folder + "/" + ck_file);
+  }
+  tinydir_close(&dir);
+}
+
+void Trainer::Start(bool resume, const SingaProto& singaConf, JobProto* job) {
+  // register job to zookeeper at the beginning
+  auto cluster = Cluster::Setup(job->id(), singaConf, job->cluster());
+  if (resume)
+    Resume(job);
+
+  router_ = new Router();
+  router_->Bind(kInprocRouterEndpoint);
+  const string hostip = cluster->hostip();
+  int port = router_->Bind("tcp://" + hostip + ":*");
+  // register endpoint to zookeeper
+  cluster->Register(getpid(), hostip + ":" + std::to_string(port));
+
+  int nthreads = 1;
+  const vector<Worker*> workers = CreateWorkers(nthreads, *job);
+  nthreads += workers.size();
+  const vector<Server*> servers = CreateServers(nthreads, *job);
+  SetupWorkerServer(*job, workers, servers);
+
+#ifdef USE_MPI
+  for (int i = 0; i < nthreads; i++)
+    MPIQueues.push_back(make_shared<SafeQueue>());
+#endif
+  vector<std::thread> threads;
+  for(auto server : servers)
+    threads.push_back(std::thread(&Server::Run, server));
+  for(auto worker : workers)
+    threads.push_back(std::thread(&Worker::Run, worker));
+  Run(workers, servers);
+  for(auto& thread : threads)
+    thread.join();
+  for(auto server : servers)
+    delete server;
+  for(auto worker : workers)
+    delete worker;
+}
+
+inline int bandwidth(int bytes, system_clock::time_point start) {
+  auto now=system_clock::now();
+  auto duration=duration_cast<std::chrono::milliseconds> (now - start);
+  return static_cast<int>(bytes*1000.f/duration.count());
+}
+
+void Trainer::Run(
+    const vector<Worker*>& workers,
+    const vector<Server*>& servers) {
+  int nworkers = workers.size(), nservers = servers.size();
+  auto cluster = Cluster::Get();
+  procs_id_ = cluster->procs_id();
+  LOG(INFO) << "Stub in process " << procs_id_ << " starts";
+
+  // for sync among server groups
+  auto start = std::chrono::system_clock::now();
+  float trans_size = 0.f;  // total size of msg transferred since start time
+  int sync_server_id = 0;
+  int max_bandwidth = cluster->bandwidth();
+  int nserver_grps = cluster->nserver_groups();
+
+  map<int, Dealer*> inter_dealers;  // for sending msg to other procs
+
+  std::queue<Msg*> msg_queue;
+  Poller poll(router_);
+  bool stop=false;
+  while (!stop || !msg_queue.empty()) {
+    if (msg_queue.empty()) {
+      // if the poll time is large, then the poller may not expire
+      // if it is small, then many reminder messages will be sent which may
+      // slow down the processing of other requests. TODO tune it.
+      auto *sock = poll.Wait(cluster->poll_time());
+      if (poll.Terminated()) {
+        LOG(ERROR) << "Connection broken!";
+        exit(0);
+      } else if (sock == nullptr) {
+        if (nserver_grps > 1 && bandwidth(trans_size, start) < max_bandwidth) {
+          Msg* msg = GenSyncReminderMsg(sync_server_id, servers);
+          router_->Send(&msg) ;
+          sync_server_id = (sync_server_id + 1) % nservers;
+        }
+        continue;
+      }
+      Msg* msg = router_->Receive();
+      msg_queue.push(msg);
+    }
+    Msg* msg = msg_queue.front();
+    msg_queue.pop();
+    int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
+    if (flag == kStub && (AddrProc(dst) == procs_id_ || AddrGrp(dst) == -1)) {
+      if (type == kConnect) {
+        DeleteMsg(&msg);
+      } else if (type == kMetric) {
+        DisplayMetric(&msg);
+      } else if (type == kStop) {
+        int src_flag = AddrType(msg->src());
+        if (src_flag == kServer) nservers--;
+        else if (src_flag == kWorkerParam) nworkers--;
+        DeleteMsg(&msg);
+        if (nworkers == 0 && nservers == 0) break;
+      } else if (nserver_grps > 0) {
+        HandleLocalMsg(&msg_queue, &msg);
+      } else {
+        DeleteMsg(&msg);
+      }
+    } else {
+      int dst_procs = AddrProc(dst);
+      if (flag != kStub)
+        dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag);
+      if (dst_procs != procs_id_) {
+        if (bandwidth(trans_size, start) <= cluster->bandwidth()) {
+          start = std::chrono::system_clock::now();
+          trans_size = 0;
+        }
+        trans_size += msg->size();
+
+        if (inter_dealers.find(dst_procs) == inter_dealers.end())
+          inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
+        inter_dealers[dst_procs]->Send(&msg);
+      } else {
+        if (type == kSyncRequest)
+          msg->AddFormatFrame("i", max_bandwidth - bandwidth(trans_size, 
start));
+        router_->Send(&msg);
+      }
+    }
+  }
+  LOG(ERROR) << "Stub in process " << procs_id_ << " stops";
+  for (auto& entry : inter_dealers)
+    delete entry.second;
+}
+
+Msg* Trainer::GenSyncReminderMsg(int server, const vector<Server*>& servers ) {
+  Msg* msg = new Msg();
+  msg->set_src(Addr(-1,-1, kStub));
+  msg->set_dst(Addr(servers[server]->grp_id(), servers[server]->id(), 
kServer));
+  msg->set_type(kSyncReminder);
+  return msg;
+}
+
+void Trainer::DisplayMetric(Msg** msg) {
+  Msg* msgg = *msg;
+  // only display metrics from the first group
+  if (AddrGrp(msgg->src()) == 0) {
+    int step = msgg->trgt_version();
+    char prefix[128];
+    msgg->ParseFormatFrame("s", prefix);
+    CHECK(msgg->NextFrame());
+    const string perf(static_cast<char*>(msgg->FrameData()), 
msgg->FrameSize());
+    Metric cur(perf);
+    LOG(ERROR) << prefix << " step-" << step <<", " << cur.ToLogString();
+  }
+  DeleteMsg(msg);
+}
+
+Dealer* Trainer::CreateInterProcsDealer(int dst_procs) {
+  // forward to other procs
+  auto cluster = Cluster::Get();
+  auto dealer = new Dealer();
+  while(cluster->endpoint(dst_procs)=="") {
+    //kCollectSleepTime));
+    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+    LOG(ERROR)<<"waiting for procs "<< dst_procs<<" to register";
+  }
+  dealer->Connect("tcp://"+cluster->endpoint(dst_procs));
+  return dealer;
+}
+
+void Trainer::HandleLocalMsg(queue<Msg*>* msg_queue, Msg** msg) {
+  Msg* msgg = *msg;
+  int paramid = ParamID(msgg->trgt_val());
+  int type = msgg->type();
+  int grp;
+  ParamEntry *entry = nullptr;
+  switch (type) {  // TODO process other requests, e.g. RESTful
+    case kUpdate:
+      grp = AddrGrp(msgg->src());
+      entry = worker_shard_.at(Hash(grp, paramid));
+      for(auto update_msg : HandleUpdate(entry, msg))
+        msg_queue->push(update_msg);
+      break;
+    case kRUpdate:
+      grp = AddrGrp(msgg->dst());
+      entry = worker_shard_.at(Hash(grp, paramid));
+      HandleUpdateResponse(entry, msg);
+      break;
+    case kGet:
+      grp = AddrGrp(msgg->src());
+      entry = worker_shard_.at(Hash(grp, paramid));
+      for(auto get_msg : HandleGet(entry, msg))
+        msg_queue->push(get_msg);
+      break;
+    case kRGet:
+      grp = AddrGrp(msgg->dst());
+      entry = worker_shard_.at(Hash(grp, paramid));
+      HandleGetResponse(entry, msg);
+      break;
+    case kPut:
+      grp = AddrGrp(msgg->src());
+      entry = worker_shard_.at(Hash(grp, paramid));
+      for(auto put_msg : HandlePut(entry, msg))
+        msg_queue->push(put_msg);
+      break;
+    default:
+      LOG(ERROR)<<"Unknow message type:"<<type;
+      break;
+  }
+}
+
+void Trainer::GenMsgs(int type, int version, ParamEntry* entry,
+    Msg* msg, vector<Msg*> *ret) {
+  int src_grp = AddrGrp(msg->src());
+  int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
+  auto param=entry->shares.at(0);
+  for (int idx = 0 ; idx < param->num_slices(); idx++) {
+    int slice_id =param->slice_start() + idx;
+    int server = slice2server_[slice_id];
+    int procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
+    Msg* new_msg = nullptr;
+    if (type == kPut) {
+      CHECK_GT(entry->num_total, 0);
+      //new_msg = param->GenPutMsg(procs != procs_id_, idx);
+      new_msg = param->GenPutMsg(true, idx);
+      new_msg->AddFormatFrame("i", entry->num_total);
+    } else if (type == kGet) {
+      //new_msg = param->GenGetMsg(procs != procs_id_, idx);
+      new_msg = param->GenGetMsg(true, idx);
+    } else if (type == kUpdate) {
+      //new_msg = param->GenUpdateMsg(procs != procs_id_, idx);
+      new_msg = param->GenUpdateMsg(true, idx);
+      new_msg->AddFormatFrame("i", entry->num_local);
+    } else {
+      LOG(FATAL) << "Wrong type";
+    }
+    new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version);
+    new_msg->set_src(Addr(src_grp, procs_id_, kStub));
+    new_msg->set_dst(Addr(dst_grp, server, kServer));
+    ret->push_back(new_msg);
+  }
+}
+
+const vector<Msg*> Trainer::HandleGet(ParamEntry* entry, Msg** msg) {
+  vector<Msg*> ret;
+  int version = (*msg)->trgt_version();
+  if (version > entry->next_version) {
+    entry->next_version = version;
+    GenMsgs(kGet, version, entry, *msg, &ret);
+  }
+  DeleteMsg(msg);
+  return ret;
+}
+
+const vector<Msg*> Trainer::HandleUpdate(ParamEntry *entry, Msg** msg) {
+  vector<Msg*> ret;
+  entry->num_update++;
+  if (entry->num_update >= entry->num_local) {
+    // average local gradient
+    if (entry->num_local > 1) {
+      auto it = entry->shares.begin();
+      auto shape=mshadow::Shape1((*it)->size());
+      mshadow::Tensor<mshadow::cpu,1> sum((*it)->mutable_cpu_grad(), shape);
+      for (++it; it != entry->shares.end(); it++) {
+        mshadow::Tensor<mshadow::cpu,1> grad((*it)->mutable_cpu_grad(), shape);
+        sum += grad;
+      }
+      sum /= entry->num_total;
+    }
+    int step = (*msg)->trgt_version();
+    GenMsgs(kUpdate, step, entry, *msg, &ret);
+    entry->num_update = 0;
+  }
+  DeleteMsg(msg);
+  return ret;
+}
+
+const vector<Msg*> Trainer::HandlePut(ParamEntry* entry, Msg** msg) {
+  vector<Msg*> ret;
+  int version = (*msg)->trgt_version();
+  GenMsgs(kPut, version, entry, *msg, &ret);
+  DeleteMsg(msg);
+  return ret;
+}
+
+void Trainer::HandleGetResponse(ParamEntry* entry, Msg** msg) {
+  int version = (*msg)->trgt_version();
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto param = entry->shares.at(0);
+  if (param->ParseGetResponseMsg(*msg, sliceid-param->slice_start()))
+    param->set_version(version);
+  DeleteMsg(msg);
+}
+
+void Trainer::HandleUpdateResponse(ParamEntry* entry, Msg** msg) {
+  int version = (*msg)->trgt_version();
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto param = entry->shares.at(0);
+  if (param->ParseUpdateResponseMsg(*msg, sliceid-param->slice_start()))
+    param->set_version(version);
+  DeleteMsg(msg);
+}
+} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2818605a/src/utils/blob.cc
----------------------------------------------------------------------
diff --git a/src/utils/blob.cc b/src/utils/blob.cc
index 24c7f54..ef582fa 100644
--- a/src/utils/blob.cc
+++ b/src/utils/blob.cc
@@ -61,9 +61,11 @@
 #include "utils/blob.h"
 
 #include <cblas.h>
+#include <cuda_runtime.h>
 #include <math.h>
 #include <utility>
 
+
 #define NOT_IMPLEMENTED LOG(FATAL) << "Not implemented function"
 #define NO_GPU LOG(FATAL) << "CPU-only Mode: cannot make GPU call."
 // Instantiate a class with float and double specifications.
@@ -187,7 +189,9 @@ void SyncedMemory::to_gpu() {
   switch (head_) {
   case UNINITIALIZED:
     CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
-    CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N));
+    //CUDA_CHECK(cudaMemset(gpu_ptr_, 0, N));
+       //
+    CUDA_CHECK(cudaMemset(gpu_ptr_, 0, size_));
     head_ = HEAD_AT_GPU;
     break;
   case HEAD_AT_CPU:

Reply via email to