Repository: incubator-singa
Updated Branches:
  refs/heads/master 8329aa0c3 -> cf4be5a86


SINGA-130 Data prefetching

Move fetch_data thread to ComputeFeature()

Pass cpplint check


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4344ab7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4344ab7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4344ab7b

Branch: refs/heads/master
Commit: 4344ab7b2a17711d77465a6c52e50b41cad0dfef
Parents: a0bdd0b
Author: Anh Dinh <[email protected]>
Authored: Wed Apr 6 11:35:05 2016 +0800
Committer: Anh Dinh <[email protected]>
Committed: Thu Apr 7 18:47:55 2016 +0800

----------------------------------------------------------------------
 include/singa/neuralnet/input_layer.h | 16 +++++---------
 src/neuralnet/input_layer/record.cc   |  2 +-
 src/neuralnet/input_layer/store.cc    | 34 ++++++++++++------------------
 3 files changed, 20 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 2e4edd1..0980f35 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -25,7 +25,6 @@
 #include <string>
 #include <vector>
 #include <thread>
-#include <deque>
 #include "singa/io/store.h"
 #include "singa/io/kvfile.h"
 #include "singa/neuralnet/layer.h"
@@ -41,14 +40,12 @@ class StoreInputLayer : virtual public InputLayer {
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
 
-  
  protected:
   /**
    * Helper method for doing the prefetching, basically read (key,value) pairs
    * to buf_keys and buf_vals_ vector of size batchsize_. 
    */
-  void fetch_data(); 
-
+  void fetch_data();
   /**
    * Parsing the (key, val) tuple to get feature (and label).
    * Subclasses must implment this function.
@@ -61,13 +58,11 @@ class StoreInputLayer : virtual public InputLayer {
   virtual bool Parse(int k, int flag, const string& key, const string& val) = 0;
 
  protected:
-  
   int batchsize_ = 1;
   int random_skip_ = 0;
   io::Store* store_ = nullptr;
- 
-  vector<std::string> buf_keys_, buf_vals_; 
-  std::deque<std::thread> threads_; // prefetching thread
+  vector<std::string> buf_keys_, buf_vals_;
+  std::thread *thread_ = nullptr;  // prefetching thread
 };
 
 /**
@@ -79,8 +74,8 @@ class SingleLabelRecordLayer : public StoreInputLayer {
  public:
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-  protected:
-  
+
+ protected:
   /**
    * Load a single record (tuple), e.g., the mean or standard variance vector.
    */
@@ -95,7 +90,6 @@ class SingleLabelRecordLayer : public StoreInputLayer {
    * UFLDL</a>
    */
   Blob<float> mean_, std_;
-  
 };
 /**
  * Specific layer that parses the value string loaded by Store as a line from

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/src/neuralnet/input_layer/record.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/record.cc b/src/neuralnet/input_layer/record.cc
index 67d312d..b14fc80 100644
--- a/src/neuralnet/input_layer/record.cc
+++ b/src/neuralnet/input_layer/record.cc
@@ -52,7 +52,7 @@ bool RecordInputLayer::Parse(int k, int flag, const string& key,
   int size = data_.count() / batchsize_;
   if (image.data_size()) {
     CHECK_EQ(size, image.data_size());
-    float* ptr = data_.mutable_cpu_data() + k * size; 
+    float* ptr = data_.mutable_cpu_data() + k * size;
     for (int i = 0; i< size; i++)
       ptr[i] = image.data(i);
   } else if (image.pixel().size()) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4344ab7b/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index ff9eb8d..63b9d05 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -22,7 +22,6 @@
 #include "singa/neuralnet/input_layer.h"
 #include "singa/utils/context.h"
 #include "singa/utils/singleton.h"
-#include <time.h>
 namespace singa {
 
 using std::thread;
@@ -52,15 +51,9 @@ void StoreInputLayer::Setup(const LayerProto& conf,
     shape.push_back(s);
   data_.Reshape(shape);
   aux_data_.resize(batchsize_);
-  buf_keys_.resize(batchsize_); 
-  buf_vals_.resize(batchsize_); 
-
-  // initialize prefetch buffer and start the thread
-  if (conf.store_conf().prefetching())
-    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
 }
 
-void StoreInputLayer::fetch_data(){
+void StoreInputLayer::fetch_data() {
   if (store_ == nullptr) {
     store_ = io::OpenStore(layer_conf_.store_conf().backend(),
         layer_conf_.store_conf().path(),
@@ -93,25 +86,26 @@ void StoreInputLayer::fetch_data(){
 void StoreInputLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
   // if prefetching, wait for the thread to finish
-  if (layer_conf_.store_conf().prefetching()){
-    threads_.front().join(); 
-    threads_.pop_front(); 
+  if (layer_conf_.store_conf().prefetching()) {
+    if (thread_ == nullptr) {
+      buf_keys_.resize(batchsize_);
+      buf_vals_.resize(batchsize_);
+      thread_ = new thread(&StoreInputLayer::fetch_data, this);
+    }
+    thread_->join();
+    delete thread_;
+  } else {
+    fetch_data();
   }
-  else
-    fetch_data(); 
-
-  for (int k = 0; k < batchsize_; k++) 
+  for (int k = 0; k < batchsize_; k++)
     Parse(k, flag, buf_keys_[k], buf_vals_[k]);
-  
   if (layer_conf_.store_conf().prefetching())
-    threads_.push_back(thread(&StoreInputLayer::fetch_data, this));
+    thread_ = new thread(&StoreInputLayer::fetch_data, this);
 }
 
-
 void SingleLabelRecordLayer::Setup(const LayerProto& conf,
     const vector<Layer*>& srclayers) {
   StoreInputLayer::Setup(conf, srclayers);
-
 }
 
 void SingleLabelRecordLayer::ComputeFeature(int flag,
@@ -151,7 +145,7 @@ void SingleLabelRecordLayer::ComputeFeature(int flag,
   if (std_.count()) {
     const float* std = std_.cpu_data();
     for (int k = 0; k < batchsize_; k++) {
-      float* dptr = data_.mutable_cpu_data() + k * std_.count();  
+      float* dptr = data_.mutable_cpu_data() + k * std_.count();
       for (int i = 0; i < std_.count(); i++) {
         dptr[i] /= std[i];
       }

Reply via email to