SINGA-130 Data Prefetching

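Fix a bug caused by thread_ not being joined and deleted properly.
Always allocate buf_keys_ and buf_vals_, as they are used regardless of
whether prefetching is enabled.
Also remove the unimplemented PrefetchLayer (kPrefetch, PrefetchProto) and
change the default of StoreProto.prefetching to false.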


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/991c6ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/991c6ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/991c6ab2

Branch: refs/heads/master
Commit: 991c6ab29dd25666cff33b9cf60bde15e660068e
Parents: 4344ab7
Author: Wei Wang <[email protected]>
Authored: Thu Apr 7 22:50:00 2016 +0800
Committer: Wei Wang <[email protected]>
Committed: Thu Apr 7 22:50:00 2016 +0800

----------------------------------------------------------------------
 Makefile.am                           |  3 +--
 examples/cifar10/job.conf             |  2 --
 include/singa/neuralnet/input_layer.h | 18 +--------------
 src/driver.cc                         |  1 -
 src/neuralnet/input_layer/prefetch.cc | 37 ------------------------------
 src/neuralnet/input_layer/store.cc    | 13 +++++++++--
 src/proto/job.proto                   |  8 +------
 7 files changed, 14 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index 4eb11e1..a30b9d1 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -64,7 +64,6 @@ SINGA_SRCS := src/driver.cc \
               src/neuralnet/input_layer/onehot.cc \
               src/neuralnet/input_layer/csv.cc \
               src/neuralnet/input_layer/image_preprocess.cc \
-              src/neuralnet/input_layer/prefetch.cc \
               src/neuralnet/input_layer/record.cc \
               src/neuralnet/input_layer/deprecated.cc \
               src/neuralnet/input_layer/store.cc \
@@ -178,7 +177,7 @@ bin_PROGRAMS = singa singatool $(PROGS)
 pydir = $(CURDIR)/tool/python/singa/
 py_LTLIBRARIES = $(PY_PROGS)
 #gpudir = $(CURDIR)/.libs
-#gpu_LTLIBRARIES = libsingagpu.so 
+#gpu_LTLIBRARIES = libsingagpu.so
 
 #lib_LTLIBRARIES = libsinga.la
 libsinga_la_SOURCES = $(PROTO_SRCS) $(SINGA_SRCS)

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/examples/cifar10/job.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/job.conf b/examples/cifar10/job.conf
index cbb95eb..d20b452 100644
--- a/examples/cifar10/job.conf
+++ b/examples/cifar10/job.conf
@@ -38,7 +38,6 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
-      #prefetching: false
     }
     include: kTrain
   }
@@ -68,7 +67,6 @@ neuralnet {
       shape: 3
       shape: 32
       shape: 32
-      #prefetching: false
     }
     include: kTest
   }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/include/singa/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/input_layer.h b/include/singa/neuralnet/input_layer.h
index 0980f35..0499c4b 100644
--- a/include/singa/neuralnet/input_layer.h
+++ b/include/singa/neuralnet/input_layer.h
@@ -43,7 +43,7 @@ class StoreInputLayer : virtual public InputLayer {
  protected:
   /**
    * Helper method for doing the prefetching, basically read (key,value) pairs
-   * to buf_keys and buf_vals_ vector of size batchsize_. 
+   * to buf_keys and buf_vals_ vector of size batchsize_.
    */
   void fetch_data();
   /**
@@ -152,22 +152,6 @@ class ImagePreprocessLayer : public InputLayer {
   float scale_ = 1;
 };
 
-/**
- * TODO(wangwei) Layer for prefetching data records and parsing them.
- *
- * This layer controls the prefetching thread, i.e.,
- * creating and joining the prefetching thread.
- */
-class PrefetchLayer : public Layer {
- public:
-  ~PrefetchLayer();
-  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
-  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
-
- protected:
-  std::thread thread_;
-};
-
 class OneHotLayer : public InputLayer {
  public:
   void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/driver.cc
----------------------------------------------------------------------
diff --git a/src/driver.cc b/src/driver.cc
index 5016c07..2e38e53 100644
--- a/src/driver.cc
+++ b/src/driver.cc
@@ -115,7 +115,6 @@ void Driver::Init(int argc, char **argv) {
   RegisterLayer<LabelLayer, int>(kLabel);
   RegisterLayer<LRNLayer, int>(kLRN);
   RegisterLayer<MnistLayer, int>(kMnist);
-  RegisterLayer<PrefetchLayer, int>(kPrefetch);
   RegisterLayer<PoolingLayer, int>(kPooling);
   RegisterLayer<RBMHidLayer, int>(kRBMHid);
   RegisterLayer<RBMVisLayer, int>(kRBMVis);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/neuralnet/input_layer/prefetch.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/prefetch.cc b/src/neuralnet/input_layer/prefetch.cc
deleted file mode 100644
index 9c7f2d9..0000000
--- a/src/neuralnet/input_layer/prefetch.cc
+++ /dev/null
@@ -1,37 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/neuralnet/input_layer.h"
-namespace singa {
-
-using std::vector;
-
-PrefetchLayer::~PrefetchLayer() {
-  if (thread_.joinable())
-    thread_.join();
-}
-
-
-void PrefetchLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
-  LOG(FATAL) << "Not implemented";
-}
-
-}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/neuralnet/input_layer/store.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer/store.cc b/src/neuralnet/input_layer/store.cc
index 63b9d05..a4754f4 100644
--- a/src/neuralnet/input_layer/store.cc
+++ b/src/neuralnet/input_layer/store.cc
@@ -27,9 +27,14 @@ namespace singa {
 using std::thread;
 
 StoreInputLayer::~StoreInputLayer() {
+  if (thread_ != nullptr) {
+    thread_->join();
+    delete thread_;
+  }
   if (store_ != nullptr) {
     delete store_;
   }
+
 }
 
 void StoreInputLayer::Setup(const LayerProto& conf,
@@ -74,6 +79,8 @@ void StoreInputLayer::fetch_data() {
       }
       random_skip_--;
     }
+    buf_keys_.resize(batchsize_);
+    buf_vals_.resize(batchsize_);
   }
   for (int k = 0; k < batchsize_; k++) {
     if (!store_->Read(&buf_keys_[k], &buf_vals_[k])) {
@@ -85,20 +92,22 @@ void StoreInputLayer::fetch_data() {
 
 void StoreInputLayer::ComputeFeature(int flag,
     const vector<Layer*>& srclayers) {
+
   // if prefetching, wait for the thread to finish
   if (layer_conf_.store_conf().prefetching()) {
     if (thread_ == nullptr) {
-      buf_keys_.resize(batchsize_);
-      buf_vals_.resize(batchsize_);
       thread_ = new thread(&StoreInputLayer::fetch_data, this);
     }
     thread_->join();
     delete thread_;
+    thread_ = nullptr;
   } else {
     fetch_data();
   }
+  LOG(ERROR) << "batchsize << " << batchsize_;
   for (int k = 0; k < batchsize_; k++)
     Parse(k, flag, buf_keys_[k], buf_vals_[k]);
+  LOG(ERROR) << "after parse ";
   if (layer_conf_.store_conf().prefetching())
     thread_ = new thread(&StoreInputLayer::fetch_data, this);
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/991c6ab2/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
index 0a1f968..b4aa971 100644
--- a/src/proto/job.proto
+++ b/src/proto/job.proto
@@ -228,7 +228,6 @@ message LayerProto {
   // layer specific configuration
   // configuration for input layers, id range [100, 200)
   optional StoreProto store_conf = 100;
-  optional PrefetchProto prefetch_conf = 102;
   optional DataProto lmdbdata_conf = 190;
   optional MnistProto mnist_conf = 192;
   optional RGBImageProto rgbimage_conf = 193;
@@ -359,10 +358,6 @@ message RGBImageProto {
   optional string meanfile = 4 [default = ""];
 }
 
-message PrefetchProto {
-  repeated LayerProto sublayers = 1;
-}
-
 message SplitProto {
   optional int32 num_splits = 1 [default = 1];
 }
@@ -380,7 +375,7 @@ message StoreProto {
   optional bool encoded = 10 [default = false];
   optional int32 random_skip = 11 [default = 0];
   optional bool has_label = 12 [default = true];
-  optional bool prefetching = 13 [default = true];
+  optional bool prefetching = 13 [default = false];
 }
 
 message CharRNNProto {
@@ -652,7 +647,6 @@ enum LayerType {
    */
   kCSVInput = 100;
   kImagePreprocess = 101;
-  kPrefetch = 102;
   kRecordInput = 103;
   kLMDBData = 190;  // deprecated
   kLabel = 191;  // deprecated

Reply via email to