SINGA-226 Add parallel training on a single machine for singa v1.0

Update local_updater.cc to an alternative version that fully exploits parallel data copying.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4e7f3c13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4e7f3c13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4e7f3c13

Branch: refs/heads/dev
Commit: 4e7f3c13b1de9bbe58d25dccec5d67ac8d98a3fe
Parents: 0184fac
Author: WANG Ji <[email protected]>
Authored: Fri Jul 22 19:29:54 2016 +0800
Committer: WANG Ji <[email protected]>
Committed: Fri Jul 22 21:16:51 2016 +0800

----------------------------------------------------------------------
 include/singa/model/updater.h      | 23 ++++++---
 src/model/feed_forward_net.cc      |  2 +-
 src/model/updater/local_updater.cc | 90 +++++++++++++++------------------
 3 files changed, 60 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e7f3c13/include/singa/model/updater.h
----------------------------------------------------------------------
diff --git a/include/singa/model/updater.h b/include/singa/model/updater.h
index ef7c32a..e0a656c 100644
--- a/include/singa/model/updater.h
+++ b/include/singa/model/updater.h
@@ -29,7 +29,9 @@
 #include <mutex>
 #include <condition_variable>
 #include <string>
+#include <utility>
 #include <unordered_map>
+#include <atomic>
 
 namespace singa {
 /// Basic Updater class just forward all the method function call
@@ -69,17 +71,26 @@ class LocalUpdater : public Updater {
   /// all the partial gradients are aggrageted in a synchronized style training.
   virtual void Apply(int step, const string& name, Tensor& grad,
                      Tensor& value) override;
 
- private:
+  template <typename T1, typename T2>
+  struct key_hasher {
+    size_t operator() (const std::pair<T1, T2>& p) const {
+      auto h1 = std::hash<T1>{}(p.first);
+      auto h2 = std::hash<T2>{}(p.second);
+      return h1 ^ h2;
+    }
+  };
+
   int total_num_;
   std::shared_ptr<Device> dev_;
-  std::unordered_map<std::string, int> aggr_count_, copy_count_;
-  std::unordered_map<std::string, Tensor> grad_buffer_,
-      param_buffer_, partial_sum_;
-  std::unordered_map<std::string, bool> has_init_;
+  std::unordered_map<std::string, std::atomic<int>> dev_index_;
+  std::unordered_map<std::string, int> to_updater_finished_;
+  std::unordered_map<std::pair<int, std::string>, Tensor,
+                     key_hasher<int, std::string>> grad_buffer_;
+  std::unordered_map<std::string, Tensor> sum_, param_buffer_;
   std::unordered_map<std::string, std::mutex> mtx_;
   std::unordered_map<std::string, std::condition_variable>
-      aggr_count_eq_total_num_;
+      to_updater_all_finished_;
 };
 
 }  // namespace singa


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e7f3c13/src/model/feed_forward_net.cc
----------------------------------------------------------------------
diff --git a/src/model/feed_forward_net.cc b/src/model/feed_forward_net.cc
index 297dda0..9450c9e 100644
--- a/src/model/feed_forward_net.cc
+++ b/src/model/feed_forward_net.cc
@@ -85,7 +85,7 @@ void FeedForwardNet::Compile(bool shuffle, bool to_register,
   shuffle_ = shuffle;
   bool train = (updater != nullptr) && (loss != nullptr);
   bool test = metric != nullptr;
-  CHECK(train || test) << "Must set upater and loss, or set metric";
+  CHECK(train || test) << "Must set updater and loss, or set metric";
   updater_ = updater;
   loss_ = loss;
   metric_ = metric;
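
In the new header, each replica gets its own gradient buffer: grad_buffer_ is keyed by a (replica index, parameter name) pair, and because std::unordered_map has no default hash for std::pair, the patch supplies the small key_hasher functor. The following standalone sketch (not part of the commit; it uses a plain float as a stand-in for singa::Tensor, and the parameter names are made up) shows the same pattern in isolation:

// Hypothetical, self-contained example of the pair-key hashing pattern
// introduced in updater.h; float replaces singa::Tensor for brevity.
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

template <typename T1, typename T2>
struct key_hasher {
  size_t operator()(const std::pair<T1, T2>& p) const {
    auto h1 = std::hash<T1>{}(p.first);
    auto h2 = std::hash<T2>{}(p.second);
    return h1 ^ h2;  // simple XOR combination, as in the patch
  }
};

int main() {
  // One buffer per (replica index, parameter name), mirroring grad_buffer_.
  std::unordered_map<std::pair<int, std::string>, float,
                     key_hasher<int, std::string>> grad_buffer;
  grad_buffer[std::make_pair(0, "conv1_weight")] = 0.5f;   // replica 0
  grad_buffer[std::make_pair(1, "conv1_weight")] = -0.5f;  // replica 1
  std::cout << grad_buffer.size() << " buffered gradients\n";
  return 0;
}
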

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e7f3c13/src/model/updater/local_updater.cc
----------------------------------------------------------------------
diff --git a/src/model/updater/local_updater.cc b/src/model/updater/local_updater.cc
index 575f7ab..eab4a7c 100644
--- a/src/model/updater/local_updater.cc
+++ b/src/model/updater/local_updater.cc
@@ -17,66 +17,60 @@
  */
 
 #include "singa/model/updater.h"
+#include <vector>
 
 namespace singa {
 
 void LocalUpdater::Register(const string& name, const ParamSpec& specs) {
   opt_->Register(name, specs);
-  aggr_count_[name] = 0;
-  copy_count_[name] = 0;
-  has_init_[name] = false;
+  param_buffer_[name];
+  param_buffer_[name].ToDevice(dev_);
+  sum_[name];
+  sum_[name].ToDevice(dev_);
+  for (int i = 0; i < total_num_; ++i) {
+    grad_buffer_[std::make_pair(i, name)];
+    grad_buffer_[std::make_pair(i, name)].ToDevice(dev_);
+  }
+  dev_index_[name] = 0;
+  to_updater_finished_[name] = 0;
 }
 
-void LocalUpdater::Apply(int step, const string& name, Tensor& grad, Tensor& value) {
-  CHECK(aggr_count_.count(name) == 1) << "Parameter " << name
-      << " has not been registered before.";
-  /// This lock is aimed to protect aggregation counter, data transfering buffer,
-  /// and partial aggregation result. However, the data transfering can be moved out
-  /// of the critial section to improve performance in the future.
-  std::unique_lock<std::mutex> lock(mtx_[name]);
-  if (aggr_count_[name] == 0) {
-    switch (has_init_[name]) {
-      case (false): {
-        Tensor tmp(grad.shape(), dev_, grad.data_type());
-        partial_sum_[name] = tmp;
-        param_buffer_[name].ResetLike(tmp);
-        grad_buffer_[name].ResetLike(tmp);
-        has_init_[name] = true;
-        /* No break: intented fall-through */
-      }
-      case (true):
-        param_buffer_[name].CopyData(value);
-        partial_sum_[name].SetValue(.0f);
-    }
+void LocalUpdater::Apply(int step, const string& name, Tensor& grad,
+                         Tensor& value) {
+  CHECK(param_buffer_.count(name) == 1) << "Parameter " << name
+      << " has not been registered before.";
+  int nth = dev_index_[name]++;
+  auto key = std::make_pair(nth, name);
+  if (grad_buffer_[key].Size() != grad.Size()) {
+    grad_buffer_[key].Reshape(grad.shape());
+    grad_buffer_[key].AsType(grad.data_type());
   }
-  grad_buffer_[name].CopyData(grad);
-  Add(partial_sum_[name], grad_buffer_[name], &partial_sum_[name]);
-  ++aggr_count_[name];
+  grad_buffer_[key].CopyData(grad);
 
-  /// Block this thread when we have not gotten enough paritial gradients.
-  if (aggr_count_[name] != total_num_) {
-    while (aggr_count_[name] != total_num_) {
-      aggr_count_eq_total_num_[name].wait(lock);
+  std::unique_lock<std::mutex> lock(mtx_[name]);
+  ++to_updater_finished_[name];
+  if (to_updater_finished_[name] != total_num_) {
+    while (to_updater_finished_[name] > 0) {
+      to_updater_all_finished_[name].wait(lock);
    }
  } else {
-    /// Now we get enought paritial gradient from all neural net instances,
-    /// then we calcuate the average gradient. The first notified thread
-    /// finish the averaging once.
-    Div(partial_sum_[name], static_cast<float>(total_num_),
-        &partial_sum_[name]);
-    copy_count_[name] = 0;
-    /// For now, gradient aggregation and SGD algorithm run on the same device.
-    /// TODO(wangji): It is possible to make them run separately.
-    /// Apply optimization algorithm based on the averaged gradient.
-    opt_->Apply(step, name, partial_sum_[name], param_buffer_[name]);
-    aggr_count_eq_total_num_[name].notify_all();
+    if (param_buffer_[name].Size() != value.Size()) {
+      param_buffer_[name].Reshape(value.shape());
+      param_buffer_[name].AsType(value.data_type());
+      param_buffer_[name].CopyData(value);
+      sum_[name].ResetLike(param_buffer_[name]);
+    }
+    sum_[name].SetValue(.0f);
+    for (int i = 0; i < total_num_; ++i)
+      Add(sum_[name], grad_buffer_[std::make_pair(i, name)], &sum_[name]);
+    Div(sum_[name], static_cast<float>(total_num_), &sum_[name]);
+    opt_->Apply(step, name, sum_[name], param_buffer_[name]);
+    to_updater_finished_[name] = 0;
+    dev_index_[name] = 0;
+    to_updater_all_finished_[name].notify_all();
  }
-
+  lock.unlock();
  value.CopyData(param_buffer_[name]);
-
-  /// The last thread finishing copy should set aggregation counter back to 0.
-  ++copy_count_[name];
-  if (copy_count_[name] == total_num_)
-    aggr_count_[name] = 0;
 }
+
 }  // namesapce singa
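
The rewritten LocalUpdater::Apply is what realizes the parallel data copying mentioned in the commit message: the atomic dev_index_ counter hands each caller its own (index, name) slot, so the Tensor copy into grad_buffer_ runs outside the critical section; only the counter update, the averaging done by the last arriving caller, and the optimizer step are serialized behind mtx_[name]. Below is a standalone sketch of this "last thread aggregates" barrier, an illustration only, with plain floats instead of singa::Tensor and a fixed learning rate standing in for the registered Optimizer:

// Hypothetical sketch of the counted-barrier pattern used by Apply():
// every worker deposits its partial gradient in its own slot, the last
// one to arrive averages them, applies the update, and wakes the rest.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  const int total_num = 4;                       // number of replicas
  std::vector<float> grad_buffer(total_num, 0.f);
  float param = 1.0f;
  int finished = 0;
  std::mutex mtx;
  std::condition_variable all_finished;

  auto apply = [&](int nth, float grad) {
    grad_buffer[nth] = grad;                     // own slot: no lock needed
    std::unique_lock<std::mutex> lock(mtx);
    ++finished;
    if (finished != total_num) {
      while (finished > 0) all_finished.wait(lock);  // wait for the update
    } else {                                     // last caller does the work
      float sum = 0.f;
      for (float g : grad_buffer) sum += g;
      param -= 0.1f * sum / total_num;           // stand-in for opt_->Apply
      finished = 0;                              // reset the barrier
      all_finished.notify_all();
    }
  };

  std::vector<std::thread> workers;
  for (int i = 0; i < total_num; ++i)
    workers.emplace_back(apply, i, 0.5f * (i + 1));
  for (auto& t : workers) t.join();
  std::cout << "updated param = " << param << "\n";  // 1.0 - 0.1 * 1.25
  return 0;
}

As in the patch, waiting threads loop on the counter rather than trusting a single wakeup, so spurious wakeups are harmless; resetting the counter to zero is what releases the waiters and prepares the next iteration.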
