SINGA-226 Add parallel training on a single machine for singa v1.0

Update local_updater.cc to an alternative version that fully exploits
parallel data copying.
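
For context, the updater instance is shared by all model replicas on the
machine: each replica calls Apply() on the same parameter from its own thread
and blocks until every replica's gradient has arrived. A minimal sketch of
that calling pattern follows; ToyUpdater and the vector-based tensors are
illustrative stand-ins, not SINGA's API.

#include <cstdio>
#include <string>
#include <thread>
#include <vector>

// Stand-in for singa::LocalUpdater; only the calling pattern matters here.
struct ToyUpdater {
  void Apply(int step, const std::string& name,
             std::vector<float>& grad, std::vector<float>& value) {
    // The real updater aggregates `grad` across all replicas and writes the
    // updated parameter back into `value` before returning.
    (void)step; (void)name; (void)grad; (void)value;
  }
};

int main() {
  const int total_num = 4;          // number of model replicas on this machine
  ToyUpdater updater;               // one shared updater instance
  std::vector<std::thread> replicas;
  for (int i = 0; i < total_num; ++i) {
    replicas.emplace_back([&updater, i] {
      std::vector<float> grad(8, 0.1f * (i + 1)), value(8, 0.0f);
      // All replicas use the same parameter name; the call returns once the
      // averaged update has been applied.
      updater.Apply(/*step=*/0, "w", grad, value);
      std::printf("replica %d received the updated parameter\n", i);
    });
  }
  for (auto& t : replicas) t.join();
  return 0;
}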


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4e7f3c13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4e7f3c13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4e7f3c13

Branch: refs/heads/dev
Commit: 4e7f3c13b1de9bbe58d25dccec5d67ac8d98a3fe
Parents: 0184fac
Author: WANG Ji <[email protected]>
Authored: Fri Jul 22 19:29:54 2016 +0800
Committer: WANG Ji <[email protected]>
Committed: Fri Jul 22 21:16:51 2016 +0800

----------------------------------------------------------------------
 include/singa/model/updater.h      | 23 ++++++---
 src/model/feed_forward_net.cc      |  2 +-
 src/model/updater/local_updater.cc | 90 +++++++++++++++------------------
 3 files changed, 60 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e7f3c13/include/singa/model/updater.h
----------------------------------------------------------------------
diff --git a/include/singa/model/updater.h b/include/singa/model/updater.h
index ef7c32a..e0a656c 100644
--- a/include/singa/model/updater.h
+++ b/include/singa/model/updater.h
@@ -29,7 +29,9 @@
 #include <mutex>
 #include <condition_variable>
 #include <string>
+#include <utility>
 #include <unordered_map>
+#include <atomic>
 
 namespace singa {
 /// Basic Updater class just forward all the method function call
@@ -69,17 +71,26 @@ class LocalUpdater : public Updater {
   /// all the partial gradients are aggregated in a synchronized style training.
   virtual void Apply(int step, const string& name, Tensor& grad,
                      Tensor& value) override;
-
  private:
+  template <typename T1, typename T2>
+  struct key_hasher {
+    size_t operator() (const std::pair<T1, T2>& p) const {
+      auto h1 = std::hash<T1>{}(p.first);
+      auto h2 = std::hash<T2>{}(p.second);
+      return h1 ^ h2;
+    }
+  };
+
   int total_num_;
   std::shared_ptr<Device> dev_;
-  std::unordered_map<std::string, int> aggr_count_, copy_count_;
-  std::unordered_map<std::string, Tensor> grad_buffer_,
-    param_buffer_, partial_sum_;
-  std::unordered_map<std::string, bool> has_init_;
+  std::unordered_map<std::string, std::atomic<int>> dev_index_;
+  std::unordered_map<std::string, int> to_updater_finished_;
+  std::unordered_map<std::pair<int, std::string>, Tensor,
+    key_hasher<int, std::string>> grad_buffer_;
+  std::unordered_map<std::string, Tensor> sum_, param_buffer_;
   std::unordered_map<std::string, std::mutex> mtx_;
   std::unordered_map<std::string, std::condition_variable>
-    aggr_count_eq_total_num_;
+    to_updater_all_finished_;
 };
 }  //  namespace singa
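
Note on the new key_hasher above: grad_buffer_ is now keyed by a
(replica index, parameter name) pair, and std::unordered_map provides no
std::hash specialization for std::pair, so the patch supplies one. The
following standalone sketch shows the same idea with a plain float payload;
the XOR combination mirrors the hasher above, though a production hasher
would usually mix the two hashes more thoroughly.

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

// Same idea as key_hasher in updater.h: combine the std::hash values of the
// two pair members so std::pair can serve as an unordered_map key.
template <typename T1, typename T2>
struct PairHasher {
  size_t operator()(const std::pair<T1, T2>& p) const {
    return std::hash<T1>{}(p.first) ^ std::hash<T2>{}(p.second);
  }
};

int main() {
  std::unordered_map<std::pair<int, std::string>, float,
                     PairHasher<int, std::string>> grad_buffer;
  grad_buffer[{0, "w"}] = 0.1f;   // gradient slot of replica 0 for param "w"
  grad_buffer[{1, "w"}] = 0.3f;   // gradient slot of replica 1 for param "w"
  std::cout << grad_buffer[{1, "w"}] << "\n";  // prints 0.3
  return 0;
}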
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e7f3c13/src/model/feed_forward_net.cc
----------------------------------------------------------------------
diff --git a/src/model/feed_forward_net.cc b/src/model/feed_forward_net.cc
index 297dda0..9450c9e 100644
--- a/src/model/feed_forward_net.cc
+++ b/src/model/feed_forward_net.cc
@@ -85,7 +85,7 @@ void FeedForwardNet::Compile(bool shuffle, bool to_register,
   shuffle_ = shuffle;
   bool train = (updater != nullptr) && (loss != nullptr);
   bool test = metric != nullptr;
-  CHECK(train || test) << "Must set upater and loss, or set metric";
+  CHECK(train || test) << "Must set updater and loss, or set metric";
   updater_ = updater;
   loss_ = loss;
   metric_ = metric;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e7f3c13/src/model/updater/local_updater.cc
----------------------------------------------------------------------
diff --git a/src/model/updater/local_updater.cc b/src/model/updater/local_updater.cc
index 575f7ab..eab4a7c 100644
--- a/src/model/updater/local_updater.cc
+++ b/src/model/updater/local_updater.cc
@@ -17,66 +17,60 @@
  */
 
 #include "singa/model/updater.h"
+#include <vector>
 
 namespace singa {
 
 void LocalUpdater::Register(const string& name, const ParamSpec& specs) {
   opt_->Register(name, specs);
-  aggr_count_[name] = 0;
-  copy_count_[name] = 0;
-  has_init_[name] = false;
+  param_buffer_[name];
+  param_buffer_[name].ToDevice(dev_);
+  sum_[name];
+  sum_[name].ToDevice(dev_);
+  for (int i = 0; i < total_num_; ++i) {
+    grad_buffer_[std::make_pair(i, name)];
+    grad_buffer_[std::make_pair(i, name)].ToDevice(dev_);
+  }
+  dev_index_[name] = 0;
+  to_updater_finished_[name] = 0;
 }
 
-void LocalUpdater::Apply(int step, const string& name, Tensor& grad, Tensor& value) {
-  CHECK(aggr_count_.count(name) == 1) << "Parameter " << name
-                                         << " has not been registered before.";
-  /// This lock is aimed to protect aggregation counter, data transfering buffer,
-  /// and partial aggregation result. However, the data transfering can be moved out
-  /// of the critial section to improve performance in the future.
-  std::unique_lock<std::mutex> lock(mtx_[name]);
-  if (aggr_count_[name] == 0) {
-    switch (has_init_[name]) {
-      case (false): {
-        Tensor tmp(grad.shape(), dev_, grad.data_type());
-        partial_sum_[name] = tmp;
-        param_buffer_[name].ResetLike(tmp);
-        grad_buffer_[name].ResetLike(tmp);
-        has_init_[name] = true;
-        /* No break: intented fall-through */
-      }
-      case (true):
-        param_buffer_[name].CopyData(value);
-        partial_sum_[name].SetValue(.0f);
-    }
+void LocalUpdater::Apply(int step, const string& name, Tensor& grad,
+                         Tensor& value) {
+  CHECK(param_buffer_.count(name) == 1) << "Parameter " << name
+                                        << " has not been registered before.";
+  int nth = dev_index_[name]++;
+  auto key = std::make_pair(nth, name);
+  if (grad_buffer_[key].Size() != grad.Size()) {
+    grad_buffer_[key].Reshape(grad.shape());
+    grad_buffer_[key].AsType(grad.data_type());
   }
-  grad_buffer_[name].CopyData(grad);
-  Add(partial_sum_[name], grad_buffer_[name], &partial_sum_[name]);
-  ++aggr_count_[name];
+  grad_buffer_[key].CopyData(grad);
 
-  /// Block this thread when we have not gotten enough paritial gradients.
-  if (aggr_count_[name] != total_num_) {
-    while (aggr_count_[name] != total_num_) {
-      aggr_count_eq_total_num_[name].wait(lock);
+  std::unique_lock<std::mutex> lock(mtx_[name]);
+  ++to_updater_finished_[name];
+  if (to_updater_finished_[name] != total_num_) {
+    while (to_updater_finished_[name] > 0) {
+      to_updater_all_finished_[name].wait(lock);
     }
   } else {
-  /// Now we get enought paritial gradient from all neural net instances,
-  /// then we calcuate the average gradient. The first notified thread
-  /// finish the averaging once.
-    Div(partial_sum_[name], static_cast<float>(total_num_),
-        &partial_sum_[name]);
-    copy_count_[name] = 0;
-  /// For now, gradient aggregation and SGD algorithm run on the same device.
-  /// TODO(wangji): It is possible to make them run separately.
-  /// Apply optimization algorithm based on the averaged gradient.
-    opt_->Apply(step, name, partial_sum_[name], param_buffer_[name]);
-    aggr_count_eq_total_num_[name].notify_all();
+    if (param_buffer_[name].Size() != value.Size()) {
+      param_buffer_[name].Reshape(value.shape());
+      param_buffer_[name].AsType(value.data_type());
+      param_buffer_[name].CopyData(value);
+      sum_[name].ResetLike(param_buffer_[name]);
+    }
+    sum_[name].SetValue(.0f);
+    for (int i = 0; i < total_num_; ++i)
+      Add(sum_[name], grad_buffer_[std::make_pair(i, name)], &sum_[name]);
+    Div(sum_[name], static_cast<float>(total_num_), &sum_[name]);
+    opt_->Apply(step, name, sum_[name], param_buffer_[name]);
+    to_updater_finished_[name] = 0;
+    dev_index_[name] = 0;
+    to_updater_all_finished_[name].notify_all();
   }
-
+  lock.unlock();
   value.CopyData(param_buffer_[name]);
-
-  /// The last thread finishing copy should set aggregation counter back to 0.
-  ++copy_count_[name];
-  if (copy_count_[name] == total_num_)
-    aggr_count_[name] = 0;
 }
+
 }  // namesapce singa
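
Taken together, the rewritten Apply() behaves like a barrier with a designated
aggregator: every caller grabs a private slot via the atomic dev_index_
counter and copies its gradient outside the critical section, and the last
caller to arrive sums and averages the buffers, runs the optimizer, resets the
counters, and wakes the waiters. Below is a self-contained toy version of that
synchronization pattern using plain float vectors instead of Tensors; the
class and method names are illustrative, not SINGA code.

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Toy barrier-style aggregator mimicking the new LocalUpdater::Apply flow.
class ToyAggregator {
 public:
  ToyAggregator(int total_num, size_t dim)
      : total_num_(total_num), finished_(0), next_slot_(0),
        buffers_(total_num, std::vector<float>(dim, 0.0f)),
        average_(dim, 0.0f) {}

  // Each replica submits its local gradient; the call returns the averaged
  // gradient once all total_num_ replicas have contributed.
  std::vector<float> Submit(const std::vector<float>& grad) {
    int slot = next_slot_++;   // atomic slot pick, like dev_index_[name]++
    buffers_[slot] = grad;     // per-caller copy happens outside the lock

    std::unique_lock<std::mutex> lock(mtx_);
    ++finished_;
    if (finished_ != total_num_) {
      while (finished_ > 0) all_finished_.wait(lock);  // wait for aggregator
    } else {
      for (size_t j = 0; j < average_.size(); ++j) {
        average_[j] = 0.0f;
        for (int i = 0; i < total_num_; ++i) average_[j] += buffers_[i][j];
        average_[j] /= total_num_;   // where opt_->Apply would run in SINGA
      }
      finished_ = 0;
      next_slot_ = 0;
      all_finished_.notify_all();
    }
    lock.unlock();
    return average_;   // analogous to value.CopyData(param_buffer_)
  }

 private:
  const int total_num_;
  int finished_;
  std::atomic<int> next_slot_;
  std::vector<std::vector<float>> buffers_;
  std::vector<float> average_;
  std::mutex mtx_;
  std::condition_variable all_finished_;
};

int main() {
  ToyAggregator agg(/*total_num=*/3, /*dim=*/2);
  std::vector<std::thread> replicas;
  for (int i = 0; i < 3; ++i)
    replicas.emplace_back([&agg, i] {
      auto avg = agg.Submit({float(i), float(i)});   // gradients 0, 1, 2
      std::printf("replica %d sees averaged grad %.2f\n", i, avg[0]);  // 1.00
    });
  for (auto& t : replicas) t.join();
  return 0;
}

Keeping the per-caller copy outside the lock is what the commit message calls
parallel data copying: only the arrival count and the final reduction are
serialized.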
