SINGA-130 Data prefetching layer. Extended StoreInputLayer to support prefetching of data. It maintains a buffer for (key,value) pairs read from the storage layer. In Setup(), it launches a new thread that reads one mini-batch of (key,value) pairs and stores them in the buffer. The ComputeFeature() method waits for that thread to finish (join) before parsing the buffered pairs into the data_ and aux_data_ fields. Finally, it launches another thread to prefetch the next mini-batch.
In terms of memory consumption, this prefetching uses an extra (batchsize*recordsize) bytes for the buffer. However, we observe no visible runtime improvement, as I/O time is very small (on the order of milliseconds without prefetching, and tens of microseconds with prefetching) compared to CPU time. Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/a0bdd0b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/a0bdd0b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/a0bdd0b8 Branch: refs/heads/master Commit: a0bdd0b85ddba7d670ab04c5de04a29c8366e868 Parents: 5f67e57 Author: Anh Dinh <[email protected]> Authored: Tue Apr 5 23:14:36 2016 +0800 Committer: Anh Dinh <[email protected]> Committed: Thu Apr 7 18:47:55 2016 +0800 ---------------------------------------------------------------------- examples/cifar10/job.conf | 2 ++ include/singa/neuralnet/input_layer.h | 18 ++++++++-- include/singa/neuralnet/layer.h | 4 +-- src/neuralnet/input_layer/record.cc | 2 +- src/neuralnet/input_layer/store.cc | 57 +++++++++++++++++++++--------- src/proto/job.proto | 2 ++ 6 files changed, 63 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/examples/cifar10/job.conf ---------------------------------------------------------------------- diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf index d20b452..cbb95eb 100644 --- a/examples/cifar10/job.conf +++ b/examples/cifar10/job.conf @@ -38,6 +38,7 @@ neuralnet { shape: 3 shape: 32 shape: 32 + #prefetching: false } include: kTrain } @@ -67,6 +68,7 @@ neuralnet { shape: 3 shape: 32 shape: 32 + #prefetching: false } include: kTest } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/input_layer.h 
---------------------------------------------------------------------- diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h index 73d509b..2e4edd1 100644 --- a/include/singa/neuralnet/input_layer.h +++ b/include/singa/neuralnet/input_layer.h @@ -25,6 +25,7 @@ #include <string> #include <vector> #include <thread> +#include <deque> #include "singa/io/store.h" #include "singa/io/kvfile.h" #include "singa/neuralnet/layer.h" @@ -40,9 +41,15 @@ class StoreInputLayer : virtual public InputLayer { void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; - + protected: /** + * Helper method for doing the prefetching, basically read (key,value) pairs + * to buf_keys and buf_vals_ vector of size batchsize_. + */ + void fetch_data(); + + /** * Parsing the (key, val) tuple to get feature (and label). * Subclasses must implment this function. * @param[in] k parse this tuple as the k-th instance of one mini-batch. @@ -54,9 +61,13 @@ class StoreInputLayer : virtual public InputLayer { virtual bool Parse(int k, int flag, const string& key, const string& val) = 0; protected: + int batchsize_ = 1; int random_skip_ = 0; io::Store* store_ = nullptr; + + vector<std::string> buf_keys_, buf_vals_; + std::deque<std::thread> threads_; // prefetching thread }; /** @@ -68,8 +79,8 @@ class SingleLabelRecordLayer : public StoreInputLayer { public: void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override; void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; - - protected: + protected: + /** * Load a single record (tuple), e.g., the mean or standard variance vector. 
*/ @@ -84,6 +95,7 @@ class SingleLabelRecordLayer : public StoreInputLayer { * UFLDL</a> */ Blob<float> mean_, std_; + }; /** * Specific layer that parses the value string loaded by Store as a line from http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/include/singa/neuralnet/layer.h ---------------------------------------------------------------------- diff --git a/include/singa/neuralnet/layer.h b/include/singa/neuralnet/layer.h index ce47b47..c8ea3fc 100644 --- a/include/singa/neuralnet/layer.h +++ b/include/singa/neuralnet/layer.h @@ -208,7 +208,7 @@ class Layer { /** * @return a const ref for Blob vector storing feature values of this layer. */ - virtual const vector<Blob<float>*>& data() const { + virtual const vector<Blob<float>*>& data() { return datavec_; } @@ -252,7 +252,7 @@ class Layer { /** * @return auxiliary data, e.g., image label. */ - virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) const { + virtual const vector<AuxType>& aux_data(const Layer* from = nullptr) { return aux_data_; } /** http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/record.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc index b14fc80..67d312d 100644 --- a/src/neuralnet/input_layer/record.cc +++ b/src/neuralnet/input_layer/record.cc @@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key, int size = data_.count() / batchsize_; if (image.data_size()) { CHECK_EQ(size, image.data_size()); - float* ptr = data_.mutable_cpu_data() + k * size; + float* ptr = data_.mutable_cpu_data() + k * size; for (int i = 0; i< size; i++) ptr[i] = image.data(i); } else if (image.pixel().size()) { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/neuralnet/input_layer/store.cc ---------------------------------------------------------------------- diff --git 
a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc index 3b642ca..ff9eb8d 100644 --- a/src/neuralnet/input_layer/store.cc +++ b/src/neuralnet/input_layer/store.cc @@ -22,9 +22,11 @@ #include "singa/neuralnet/input_layer.h" #include "singa/utils/context.h" #include "singa/utils/singleton.h" - +#include <time.h> namespace singa { +using std::thread; + StoreInputLayer::~StoreInputLayer() { if (store_ != nullptr) { delete store_; @@ -44,11 +46,21 @@ void StoreInputLayer::Setup(const LayerProto& conf, } else { batchsize_ = conf.store_conf().batchsize(0); } + + vector<int> shape {batchsize_}; + for (int s : conf.store_conf().shape()) + shape.push_back(s); + data_.Reshape(shape); + aux_data_.resize(batchsize_); + buf_keys_.resize(batchsize_); + buf_vals_.resize(batchsize_); + + // initialize prefetch buffer and start the thread + if (conf.store_conf().prefetching()) + threads_.push_back(thread(&StoreInputLayer::fetch_data, this)); } -void StoreInputLayer::ComputeFeature(int flag, - const vector<Layer*>& srclayers) { - string key, val; +void StoreInputLayer::fetch_data(){ if (store_ == nullptr) { store_ = io::OpenStore(layer_conf_.store_conf().backend(), layer_conf_.store_conf().path(), @@ -61,6 +73,7 @@ void StoreInputLayer::ComputeFeature(int flag, random_skip_ = distribution(*generator); } + string key, val; while (random_skip_ > 0) { if (!store_->Read(&key, &val)) { store_->SeekToFirst(); @@ -70,30 +83,43 @@ void StoreInputLayer::ComputeFeature(int flag, } } for (int k = 0; k < batchsize_; k++) { - if (!store_->Read(&key, &val)) { + if (!store_->Read(&buf_keys_[k], &buf_vals_[k])) { store_->SeekToFirst(); - CHECK(store_->Read(&key, &val)); + CHECK(store_->Read(&buf_keys_[k], &buf_vals_[k])); } - // TODO(wangwei) random skip and shuffle among this mini-batch - Parse(k, flag, key, val); } } +void StoreInputLayer::ComputeFeature(int flag, + const vector<Layer*>& srclayers) { + // if prefetching, wait for the thread to finish + if 
(layer_conf_.store_conf().prefetching()){ + threads_.front().join(); + threads_.pop_front(); + } + else + fetch_data(); + + for (int k = 0; k < batchsize_; k++) + Parse(k, flag, buf_keys_[k], buf_vals_[k]); + + if (layer_conf_.store_conf().prefetching()) + threads_.push_back(thread(&StoreInputLayer::fetch_data, this)); +} + + void SingleLabelRecordLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { StoreInputLayer::Setup(conf, srclayers); - vector<int> shape {batchsize_}; - for (int s : conf.store_conf().shape()) - shape.push_back(s); - data_.Reshape(shape); - aux_data_.resize(batchsize_); } + void SingleLabelRecordLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) { - StoreInputLayer::ComputeFeature(flag, srclayers); + StoreInputLayer::ComputeFeature(flag, srclayers); auto& store_conf = layer_conf_.store_conf(); + if (store_conf.has_mean_file() && mean_.count() == 0) { mean_.Reshape(vector<int>{data_.count() / batchsize_}); LoadRecord(store_conf.backend(), store_conf.mean_file(), &mean_); @@ -125,7 +151,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag, if (std_.count()) { const float* std = std_.cpu_data(); for (int k = 0; k < batchsize_; k++) { - float* dptr = data_.mutable_cpu_data() + k * std_.count(); + float* dptr = data_.mutable_cpu_data() + k * std_.count(); for (int i = 0; i < std_.count(); i++) { dptr[i] /= std[i]; } @@ -133,5 +159,4 @@ void SingleLabelRecordLayer::ComputeFeature(int flag, } } - } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/a0bdd0b8/src/proto/job.proto ---------------------------------------------------------------------- diff --git a/src/proto/job.proto b/src/proto/job.proto index 7157a84..0a1f968 100644 --- a/src/proto/job.proto +++ b/src/proto/job.proto @@ -380,7 +380,9 @@ message StoreProto { optional bool encoded = 10 [default = false]; optional int32 random_skip = 11 [default = 0]; optional bool has_label = 12 [default = true]; + optional bool prefetching 
= 13 [default = true]; } + message CharRNNProto { optional string path = 1; optional string vocab_path = 2;
