Repository: incubator-singa Updated Branches: refs/heads/master 8329aa0c3 -> cf4be5a86
SINGA-130 Data prefetching Move fetch_data thread to ComputeFeature() Pass cpplint check Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4344ab7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4344ab7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4344ab7b Branch: refs/heads/master Commit: 4344ab7b2a17711d77465a6c52e50b41cad0dfef Parents: a0bdd0b Author: Anh Dinh <[email protected]> Authored: Wed Apr 6 11:35:05 2016 +0800 Committer: Anh Dinh <[email protected]> Committed: Thu Apr 7 18:47:55 2016 +0800 ---------------------------------------------------------------------- include/singa/neuralnet/input_layer.h | 16 +++++--------- src/neuralnet/input_layer/record.cc | 2 +- src/neuralnet/input_layer/store.cc | 34 ++++++++++++------------------ 3 files changed, 20 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/include/singa/neuralnet/input_layer.h ---------------------------------------------------------------------- diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h index 2e4edd1..0980f35 100644 --- a/include/singa/neuralnet/input_layer.h +++ b/include/singa/neuralnet/input_layer.h @@ -25,7 +25,6 @@ #include <string> #include <vector> #include <thread> -#include <deque> #include "singa/io/store.h" #include "singa/io/kvfile.h" #include "singa/neuralnet/layer.h" @@ -41,14 +40,12 @@ class StoreInputLayer : virtual public InputLayer { void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; - protected: /** * Helper method for doing the prefetching, basically read (key,value) pairs * to buf_keys and buf_vals_ vector of size batchsize_. */ - void fetch_data(); - + void fetch_data(); /** * Parsing the (key, val) tuple to get feature (and label). * Subclasses must implment this function. @@ -61,13 +58,11 @@ class StoreInputLayer : virtual public InputLayer { virtual bool Parse(int k, int flag, const string& key, const string& val) = 0; protected: - int batchsize_ = 1; int random_skip_ = 0; io::Store* store_ = nullptr; - - vector<std::string> buf_keys_, buf_vals_; - std::deque<std::thread> threads_; // prefetching thread + vector<std::string> buf_keys_, buf_vals_; + std::thread *thread_ = nullptr; // prefetching thread }; /** @@ -79,8 +74,8 @@ class SingleLabelRecordLayer : public StoreInputLayer { public: void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; - protected: - + + protected: /** * Load a single record (tuple), e.g., the mean or standard variance vector. */ @@ -95,7 +90,6 @@ class SingleLabelRecordLayer : public StoreInputLayer { * UFLDL</a> */ Blob<float> mean_, std_; - }; /** * Specific layer that parses the value string loaded by Store as a line from http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/src/neuralnet/input_layer/record.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc index 67d312d..b14fc80 100644 --- a/src/neuralnet/input_layer/record.cc +++ b/src/neuralnet/input_layer/record.cc @@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key, int size = data_.count() / batchsize_; if (image.data_size()) { CHECK_EQ(size, image.data_size()); - float* ptr = data_.mutable_cpu_data() + k * size; + float* ptr = data_.mutable_cpu_data() + k * size; for (int i = 0; i< size; i++) ptr[i] = image.data(i); } else if (image.pixel().size()) { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/src/neuralnet/input_layer/store.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc index ff9eb8d..63b9d05 100644 --- a/src/neuralnet/input_layer/store.cc +++ b/src/neuralnet/input_layer/store.cc @@ -22,7 +22,6 @@ #include "singa/neuralnet/input_layer.h" #include "singa/utils/context.h" #include "singa/utils/singleton.h" -#include <time.h> namespace singa { using std::thread; @@ -52,15 +51,9 @@ void StoreInputLayer::Setup(const LayerProto& conf, shape.push_back(s); data_.Reshape(shape); aux_data_.resize(batchsize_); - buf_keys_.resize(batchsize_); - buf_vals_.resize(batchsize_); - - // initialize prefetch buffer and start the thread - if (conf.store_conf().prefetching()) - threads_.push_back(thread(&StoreInputLayer::fetch_data, this)); } -void StoreInputLayer::fetch_data(){ +void StoreInputLayer::fetch_data() { if (store_ == nullptr) { store_ = io::OpenStore(layer_conf_.store_conf().backend(), layer_conf_.store_conf().path(), @@ -93,25 +86,26 @@ void StoreInputLayer::fetch_data(){ void StoreInputLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { // if prefetching, wait for the thread to finish - if (layer_conf_.store_conf().prefetching()){ - threads_.front().join(); - threads_.pop_front(); + if (layer_conf_.store_conf().prefetching()) { + if (thread_ == nullptr) { + buf_keys_.resize(batchsize_); + buf_vals_.resize(batchsize_); + thread_ = new thread(&StoreInputLayer::fetch_data, this); + } + thread_->join(); + delete thread_; + } else { + fetch_data(); } - else - fetch_data(); - - for (int k = 0; k < batchsize_; k++) + for (int k = 0; k < batchsize_; k++) Parse(k, flag, buf_keys_[k], buf_vals_[k]); - if (layer_conf_.store_conf().prefetching()) - threads_.push_back(thread(&StoreInputLayer::fetch_data, this)); + thread_ = new thread(&StoreInputLayer::fetch_data, this); } - void SingleLabelRecordLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { StoreInputLayer::Setup(conf, srclayers); - } void SingleLabelRecordLayer::ComputeFeature(int flag, @@ -151,7 +145,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag, if (std_.count()) { const float* std = std_.cpu_data(); for (int k = 0; k < batchsize_; k++) { - float* dptr = data_.mutable_cpu_data() + k * std_.count(); + float* dptr = data_.mutable_cpu_data() + k * std_.count(); for (int i = 0; i < std_.count(); i++) { dptr[i] /= std[i]; }
