Repository: incubator-singa Updated Branches: refs/heads/master 958dfebbe -> dc7f1996d
SINGA-82 Refactor input layers using data store abstraction Add a Store abstraction for reading (and writing) data. Implemented two backends: 1. KVFile, which was previously named DataShard. It is a binary file in which each tuple has a unique key. 2. TextFile, which is a plain text file with each line being the value field of a tuple (the key is the line No.). TODO: implement HDFS and image folder as backends. Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d99b24cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d99b24cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d99b24cb Branch: refs/heads/master Commit: d99b24cb75def9fdbdc59273c4297abb75813c36 Parents: 958dfeb Author: Wei Wang <[email protected]> Authored: Mon Oct 5 19:26:45 2015 +0800 Committer: wang sheng <[email protected]> Committed: Wed Oct 7 15:19:58 2015 +0800 ---------------------------------------------------------------------- Makefile.am | 13 ++- include/io/hdfs_store.h | 22 ++++ include/io/imagefolder_store.h | 21 ++++ include/io/kvfile.h | 182 ++++++++++++++++++++++++++++++ include/io/kvfile_store.h | 53 +++++++++ include/io/store.h | 79 +++++++++++++ include/io/textfile_store.h | 49 ++++++++ src/io/kvfile.cc | 217 ++++++++++++++++++++++++++++++++++++ src/io/kvfile_store.cc | 71 ++++++++++++ src/io/store.cc | 57 ++++++++++ src/io/textfile_store.cc | 83 ++++++++++++++ src/test/test_store.cc | 92 +++++++++++++++ 12 files changed, 937 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/Makefile.am ---------------------------------------------------------------------- diff --git a/Makefile.am b/Makefile.am index 77d6ded..f8e765d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -46,7 +46,11 @@ SINGA_SRCS := src/driver.cc \ src/neuralnet/output_layer.cc \ src/neuralnet/neuralnet.cc \
src/comm/socket.cc \ - src/comm/msg.cc + src/comm/msg.cc \ + src/io/kvfile.cc \ + src/io/kvfile_store.cc \ + src/io/textfile_store.cc \ + src/io/store.cc SINGA_HDRS := include/singa.h \ include/utils/cluster.h \ @@ -80,6 +84,10 @@ SINGA_HDRS := include/singa.h \ include/mshadow/tensor_random.h \ include/comm/msg.h \ include/comm/socket.h + src/io/store.h \ + src/io/kvfile.h \ + src/io/kvfile_store.h \ + src/io/textfile_store.h GTEST_SRCS := include/gtest/gtest-all.cc GTEST_HRDS := include/gtest/gtest.h @@ -89,7 +97,8 @@ TEST_SRCS := include/gtest/gtest_main.cc \ src/test/test_msg.cc \ src/test/test_neuralnet.cc \ src/test/test_paramslicer.cc \ - src/test/test_shard.cc + src/test/test_shard.cc \ + src/test/test_store.cc #EXTRA_PROGRAMS = $(PROGS) EXTRA_PROGRAMS = singatest http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/hdfs_store.h ---------------------------------------------------------------------- diff --git a/include/io/hdfs_store.h b/include/io/hdfs_store.h new file mode 100644 index 0000000..f85615b --- /dev/null +++ b/include/io/hdfs_store.h @@ -0,0 +1,22 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+* +*************************************************************/ + +// TODO(wangwei) use hdfs as data storage http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/imagefolder_store.h ---------------------------------------------------------------------- diff --git a/include/io/imagefolder_store.h b/include/io/imagefolder_store.h new file mode 100644 index 0000000..c05d92d --- /dev/null +++ b/include/io/imagefolder_store.h @@ -0,0 +1,21 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ +// TODO(wangwei) store images in a disk folder http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/kvfile.h ---------------------------------------------------------------------- diff --git a/include/io/kvfile.h b/include/io/kvfile.h new file mode 100644 index 0000000..d70f198 --- /dev/null +++ b/include/io/kvfile.h @@ -0,0 +1,182 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_IO_KVFILE_H_
+#define SINGA_IO_KVFILE_H_
+
+#include <fstream>
+#include <string>
+#include <unordered_set>
+
+#define USE_PROTOBUF 1
+
+#ifdef USE_PROTOBUF
+#include <google/protobuf/message.h>
+#endif
+
+namespace singa { namespace io {
+
+
+/**
+ * KVFile stores training/validation/test tuples.
+ * Every worker node should have a KVFile for training data (validation/test
+ * KVFile is optional).
+ * KVFile consists of a set of unordered tuples. Each tuple is
+ * encoded as [key_len key val_len val]. key_len and val_len give the number
+ * of bytes of the key and of the value; the implementation in
+ * src/io/kvfile.cc stores both as a native size_t (not a fixed-width
+ * uint32), so a file is only readable on a platform with the same size_t
+ * width and byte order as the one that wrote it.
+ *
+ * When a KVFile is opened in kAppend mode, the existing file is scanned and
+ * writing resumes right after the last complete tuple, so that a trailing
+ * tuple left incomplete by a crashed write is overwritten.
+ *
+ * TODO(wangwei) split one KVFile into multiple KVFile s.
+ *
+ */
+class KVFile {
+ public:
+  enum Mode {
+    // read only mode used in training
+    kRead = 0,
+    // write mode used in creating KVFile (will overwrite previous one)
+    kCreate = 1,
+    // append mode, e.g. used when previous creating crashes
+    kAppend = 2
+  };
+
+  /**
+   * KVFile constructor.
+   *
+   * @param path path to the disk KVFile, it can be
+   *  - a path to local disk file.
+   *  - a path to local directory. This is to be compatible with the older
+   *    version (DataShard). The KVFile is shard.dat under that directory
+   *    (this fallback is applied in kRead and kAppend mode).
+   *  - a hdfs file starting with "hdfs://"
+   *    NOTE(review): the implementation in src/io/kvfile.cc only opens
+   *    local files through std::fstream; hdfs paths look unsupported so
+   *    far - confirm.
+   * @param mode KVFile open mode, KVFile::kRead, KVFile::kCreate or
+   * KVFile::kAppend
+   * @param bufsize number of bytes allocated for the internal buffer that
+   * caches data for every disk op (read or write), default is 10MB
+   * (10485760 bytes). A single tuple must fit into this buffer.
+   */
+  KVFile(const std::string& path, Mode mode, int bufsize = 10485760);
+  ~KVFile();
+
+#ifdef USE_PROTOBUF
+  /**
+   * read next tuple from the KVFile.
+   *
+   * @param key Tuple key
+   * @param val Record of type Message, parsed from the value bytes
+   * @return false if the read is unsuccessful, e.g., the end of the file is
+   * reached or the tuple was not inserted completely.
+   */
+  bool Next(std::string* key, google::protobuf::Message* val);
+  /**
+   * Append one tuple to the KVFile; the message is serialized to a string
+   * and handed to the string overload of Insert.
+   *
+   * @param key e.g., image path
+   * @param tuple message to store as the value
+   * @return false if unsuccessful, e.g., the key was inserted before
+   */
+  bool Insert(const std::string& key, const google::protobuf::Message& tuple);
+#endif
+  /**
+   * read next tuple from the KVFile.
+   *
+   * @param key Tuple key
+   * @param val Record of type string
+   * @return false if unsuccessful, e.g. the end of the file is reached or the
+   * tuple was not inserted completely.
+   */
+  bool Next(std::string* key, std::string* val);
+  /**
+   * Append one tuple to the KVFile. The tuple is buffered and reaches the
+   * disk when the buffer fills up, on Flush() or on destruction.
+   *
+   * @param key e.g., image path
+   * @param tuple value bytes; must not be empty
+   * @return false if unsuccessful, i.e. the value is empty or the key is
+   * already in the set of known keys
+   */
+  bool Insert(const std::string& key, const std::string& tuple);
+  /**
+   * Move the read pointer to the head of the KVFile file.
+   * Used for repeated reading; only valid in kRead mode.
+   */
+  void SeekToFirst();
+  /**
+   * Flush buffered data to disk.
+   * Used only for kCreate or kAppend.
+   */
+  void Flush();
+  /**
+   * Iterate through all tuples to get the num of all tuples.
+   * The file is re-read from disk through a separate stream, so tuples that
+   * are still buffered (not flushed) are not counted.
+   *
+   * @return num of tuples
+   */
+  int Count();
+  /**
+   * @return path to KVFile file
+   */
+  inline std::string path() { return path_; }
+
+ protected:
+  /**
+   * Read the next key and prepare buffer for reading value.
+   *
+   * @param key
+   * @return length (i.e., bytes) of value field; 0 if no complete tuple
+   * could be read.
+   */
+  int Next(std::string* key);
+  /**
+   * Setup the disk pointer to the right position for append in case that
+   * the previous write crashes. Also loads the keys of the tuples that are
+   * already stored.
+   *
+   * @param path KVFile path.
+   * @return offset (end pos) of the last successfully written record.
+   */
+  int PrepareForAppend(const std::string& path);
+  /**
+   * Read data from disk if the current data in the buffer is not a full field.
+   *
+   * @param size size of the next field.
+   * @return false if the field cannot be made available in the buffer.
+   */
+  bool PrepareNextField(int size);
+
+ private:
+  //! path of the file that is actually opened
+  std::string path_ = "";
+  //! mode passed to the constructor
+  Mode mode_;
+  //! stream used for both reading and writing
+  std::fstream fdat_;
+  //! known keys, used to reject replicated records
+  std::unordered_set<std::string> keys_;
+  //! internal buffer
+  char* buf_ = nullptr;
+  //! read offset inside the buf_
+  int offset_ = 0;
+  //! allocated bytes for the buf_
+  int capacity_ = 0;
+  //! valid bytes in buf_ (read from disk, or buffered for writing)
+  int bufsize_ = 0;
+};
+} /* io */
+
+/**
+ * @deprecated {DataShard (the former name of this class) is deprecated!
+ * Use KVFile}. The compatibility alias below is intentionally left disabled
+ * inside this comment.
+using ShardData = KVFile;
+*/
+} // namespace singa
+
+#endif // SINGA_IO_KVFILE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/kvfile_store.h
----------------------------------------------------------------------
diff --git a/include/io/kvfile_store.h b/include/io/kvfile_store.h
new file mode 100644
index 0000000..bda7409
--- /dev/null
+++ b/include/io/kvfile_store.h
@@ -0,0 +1,53 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.
You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#ifndef SINGA_IO_KVFILE_STORE_H_
+#define SINGA_IO_KVFILE_STORE_H_
+
+#include <string>
+#include "io/store.h"
+#include "io/kvfile.h"
+
+namespace singa { namespace io {
+
+/**
+ * Use the KVFile as the data storage.
+ *
+ * KVFile is a binary file. Each tuple is stored as byte string.
+ *
+ * The store owns the KVFile it opens. Instances are meant to be used
+ * through a Store pointer and must not be copied, because a copy would
+ * share (and later delete again) the same KVFile.
+ */
+class KVFileStore : public Store {
+ public:
+  /**
+   * Releases the underlying KVFile if the caller forgot to call Close(), so
+   * that buffered tuples are flushed and the file handle is not leaked.
+   */
+  ~KVFileStore() override { Close(); }
+  /**
+   * Open the KVFile at source.
+   *
+   * @param[in] source path handed to the KVFile constructor.
+   * @param[in] mode kRead, kCreate or kAppend.
+   * @return true if a KVFile was created for the given mode.
+   */
+  bool Open(const std::string& source, Mode mode) override;
+  /** Delete the underlying KVFile; safe to call more than once. */
+  void Close() override;
+  /** Read the next tuple; only valid in kRead mode. */
+  bool Read(std::string* key, std::string* value) override;
+  /** Move the read position back to the first tuple (kRead mode only). */
+  void SeekToFirst() override;
+  /** Append a tuple; only valid in kCreate or kAppend mode. */
+  bool Write(const std::string& key, const std::string& value) override;
+  /** Write buffered tuples to disk; only valid in kCreate or kAppend mode. */
+  void Flush() override;
+
+ private:
+  //! owned; nullptr while the store is closed
+  KVFile* file_ = nullptr;
+  //! mode of the last Open(); initialized so that the mode checks in
+  //! Read()/Write() never read an indeterminate value before Open()
+  Mode mode_ = kRead;
+};
+
+} /* io */
+} /* singa */
+#endif // SINGA_IO_KVFILE_STORE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/store.h
----------------------------------------------------------------------
diff --git a/include/io/store.h b/include/io/store.h
new file mode 100644
index 0000000..8665af0
--- /dev/null
+++ b/include/io/store.h
@@ -0,0 +1,79 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.
You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_IO_STORE_H_
+#define SINGA_IO_STORE_H_
+
+#include <string>
+
+namespace singa { namespace io {
+using std::string;
+/**
+ * Open mode of a Store.
+ * Note that the enumerator values differ from those of KVFile::Mode, so the
+ * two enums must be translated explicitly and never cast into each other.
+ */
+enum Mode { kCreate, kRead, kAppend };
+
+/**
+ * General key-value store that provides functions for reading and writing
+ * tuples.
+ *
+ * Subclasses implement the functions for a specific data storage, e.g., CSV
+ * file, HDFS, image folder, singa::io::KVFile, leveldb, lmdb, etc.
+ */
+class Store {
+ public:
+  Store() { }
+  virtual ~Store() { }
+  /**
+   * @param[in] source path to the storage, could be a file path, folder path
+   * or hdfs path, or even a http url.
+   * @param[in] mode kCreate, kRead or kAppend.
+   * @return true if open successfully, otherwise false.
+   */
+  virtual bool Open(const std::string& source, Mode mode) = 0;
+  /**
+   * Release the resources acquired by Open().
+   */
+  virtual void Close() = 0;
+  /**
+   * Read a tuple.
+   *
+   * @param[out] key
+   * @param[out] value
+   * @return true if read successfully, otherwise false.
+   */
+  virtual bool Read(std::string* key, std::string* value) = 0;
+  /**
+   * Seek the read header to the first tuple.
+   */
+  virtual void SeekToFirst() = 0;
+  /**
+   * Write a tuple.
+   *
+   * @param[in] key
+   * @param[in] value
+   * @return true if success, otherwise false.
+   */
+  virtual bool Write(const std::string& key, const std::string& value) = 0;
+  /**
+   * Flush the write buffer, if the store has one. The default does nothing.
+   */
+  virtual void Flush() {}
+};
+
+/**
+ * Create a store for the given backend.
+ *
+ * @param store name of the backend, e.g. "textfile" or "kvfile".
+ * @return a new store that the caller owns (and has to delete), or nullptr
+ * if the backend name is not recognized.
+ */
+Store* CreateStore(const std::string& store);
+} // namespace io
+} /* singa */
+#endif // SINGA_IO_STORE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/textfile_store.h
----------------------------------------------------------------------
diff --git a/include/io/textfile_store.h b/include/io/textfile_store.h
new file mode 100644
index 0000000..4c020e9
--- /dev/null
+++ b/include/io/textfile_store.h
@@ -0,0 +1,49 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+// Include guard, as in the other io headers: without it, including this
+// header twice in one translation unit redefines TextFileStore.
+#ifndef SINGA_IO_TEXTFILE_STORE_H_
+#define SINGA_IO_TEXTFILE_STORE_H_
+
+#include <fstream>
+#include <string>
+#include "io/store.h"
+
+namespace singa { namespace io {
+/**
+ * Use text file as the data storage, one line per tuple.
+ *
+ * It is used for storing CSV format data where the key is the line No.
+ * (starting from 0) and the value is the line. Keys passed to Write() are
+ * not stored.
+ */
+class TextFileStore : public Store {
+ public:
+  /** Open the text file at source; see Store::Open. */
+  bool Open(const std::string& source, Mode mode) override;
+  /** Close and release the file stream. */
+  void Close() override;
+  /** Read the next line as value and its line No. as key (kRead mode). */
+  bool Read(std::string* key, std::string* value) override;
+  /** Restart reading from the first line (kRead mode). */
+  void SeekToFirst() override;
+  /** Write value as one line; key is ignored (not valid in kRead mode). */
+  bool Write(const std::string& key, const std::string& value) override;
+  /** Flush the file stream. */
+  void Flush() override;
+
+ private:
+  //! No. of the line returned by the next Read()
+  int lineNo_ = 0;
+  //! owned file stream
+  std::fstream* fs_ = nullptr;
+  //! mode of the last Open(); initialized so that the mode checks never read
+  //! an indeterminate value before Open()
+  Mode mode_ = kRead;
+};
+} /* io */
+
+} /* singa */
+
+#endif  // SINGA_IO_TEXTFILE_STORE_H_
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/kvfile.cc
----------------------------------------------------------------------
diff --git a/src/io/kvfile.cc b/src/io/kvfile.cc
new file mode 100644
index 0000000..aa52150
--- /dev/null
+++ b/src/io/kvfile.cc
@@ -0,0 +1,217 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "io/kvfile.h"
+
+#include <glog/logging.h>
+#include <cstring>
+#include <string>
+
+namespace singa { namespace io {
+
+namespace {
+/**
+ * @return size in bytes of the file behind fin; the read position is left at
+ * the beginning of the file.
+ */
+std::streamoff FileSize(std::ifstream* fin) {
+  fin->seekg(0, std::ios_base::end);
+  const std::streamoff size = fin->tellg();
+  fin->seekg(0, std::ios_base::beg);
+  return size;
+}
+
+/**
+ * Read one length prefix (size_t) from fin.
+ *
+ * Seeking beyond the end of a file does not fail by itself, so the length
+ * is validated against the bytes that are really left in the file.
+ *
+ * @param fin stream positioned at a length prefix
+ * @param file_size total size of the file in bytes
+ * @param len receives the length
+ * @return true if the prefix was read and the field it announces lies
+ * completely inside the file.
+ */
+bool ReadFieldLength(std::ifstream* fin, std::streamoff file_size,
+                     size_t* len) {
+  fin->read(reinterpret_cast<char*>(len), sizeof(*len));
+  if (!fin->good()) return false;
+  const std::streamoff remaining =
+      file_size - static_cast<std::streamoff>(fin->tellg());
+  return remaining >= 0 && *len <= static_cast<size_t>(remaining);
+}
+}  // namespace
+
+/**
+ * Allocate the internal buffer and open the file according to mode.
+ * In kRead and kAppend mode, if path cannot be opened as a file it is
+ * treated as a directory and path + "/shard.dat" is opened instead.
+ * Aborts (CHECK) if the file cannot be opened.
+ */
+KVFile::KVFile(const std::string& path, Mode mode, int capacity) :
+path_(path), mode_(mode), capacity_(capacity) {
+  buf_ = new char[capacity];
+  switch (mode) {
+    case KVFile::kRead:
+      fdat_.open(path_, std::ios::in | std::ios::binary);
+      if (!fdat_.is_open()) {
+        // path may be a directory
+        path_ = path + "/shard.dat";
+        fdat_.open(path_, std::ios::in | std::ios::binary);
+      }
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      break;
+    case KVFile::kCreate:
+      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    case KVFile::kAppend:
+      // the read-only open is only a probe to resolve the real file path
+      fdat_.open(path_, std::ios::in | std::ios::binary);
+      if (!fdat_.is_open()) {
+        // path may be a directory
+        path_ = path + "/shard.dat";
+        fdat_.open(path_, std::ios::in | std::ios::binary);
+      }
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      // open() fails on a stream that is still open, which would silently
+      // drop every later write, so release the probe handle first
+      fdat_.close();
+      {
+        int last_tuple = PrepareForAppend(path_);
+        fdat_.open(path_, std::ios::binary | std::ios::out
+            | std::ios::in | std::ios::ate);
+        CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+        // continue right after the last complete tuple
+        fdat_.seekp(last_tuple);
+      }
+      break;
+    default:
+      LOG(FATAL) << "unknown model to open KVFile " << mode;
+      break;
+  }
+}
+
+/** Flush pending writes (write modes only) and release buffer and file. */
+KVFile::~KVFile() {
+  if (mode_ != kRead)
+    Flush();
+  delete[] buf_;  // allocated with new[]
+  fdat_.close();
+}
+#ifdef USE_PROTOBUF
+/**
+ * Read the next tuple and parse its value into val.
+ * @return false if no complete tuple could be read.
+ */
+bool KVFile::Next(std::string* key, google::protobuf::Message* val) {
+  int vallen = Next(key);
+  if (vallen == 0) return false;
+  val->ParseFromArray(buf_ + offset_, vallen);
+  offset_ += vallen;
+  return true;
+}
+
+/** Serialize val and insert it through the string overload. */
+bool KVFile::Insert(const std::string& key,
+    const google::protobuf::Message& val) {
+  std::string str;
+  val.SerializeToString(&str);
+  return Insert(key, str);
+}
+#endif
+
+/**
+ * Read the next tuple; the value bytes are copied into val.
+ * @return false if no complete tuple could be read.
+ */
+bool KVFile::Next(std::string *key, std::string* val) {
+  int vallen = Next(key);
+  if (vallen == 0) return false;
+  val->assign(buf_ + offset_, vallen);
+  offset_ += vallen;
+  return true;
+}
+
+/**
+ * Insert one complete tuple into the write buffer.
+ * The buffer is written to disk first if the tuple does not fit any more.
+ * Aborts (CHECK) if the tuple is larger than the whole buffer.
+ *
+ * NOTE(review): keys_ is only filled by PrepareForAppend(), so duplicates
+ * are detected against tuples that were on disk when the file was opened in
+ * kAppend mode, not against keys inserted through this object - confirm
+ * whether that is intended.
+ *
+ * @return false if the key is already known or the value is empty.
+ */
+bool KVFile::Insert(const std::string& key, const std::string& val) {
+  if (keys_.find(key) != keys_.end() || val.size() == 0)
+    return false;
+  int size = key.size() + val.size() + 2*sizeof(size_t);
+  if (bufsize_ + size > capacity_) {
+    fdat_.write(buf_, bufsize_);
+    bufsize_ = 0;
+    CHECK_LE(size, capacity_) << "Tuple size is larger than capacity "
+      << "Try a larger capacity size";
+  }
+  // the length prefixes are copied with memcpy because buf_ + bufsize_ is
+  // in general not aligned for size_t
+  const size_t keylen = key.size();
+  memcpy(buf_ + bufsize_, &keylen, sizeof(keylen));
+  bufsize_ += sizeof(keylen);
+  memcpy(buf_ + bufsize_, key.data(), key.size());
+  bufsize_ += key.size();
+  const size_t vallen = val.size();
+  memcpy(buf_ + bufsize_, &vallen, sizeof(vallen));
+  bufsize_ += sizeof(vallen);
+  memcpy(buf_ + bufsize_, val.data(), val.size());
+  bufsize_ += val.size();
+  return true;
+}
+
+/** Discard the read buffer and restart reading from the head of the file. */
+void KVFile::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  bufsize_ = 0;
+  offset_ = 0;
+  fdat_.clear();  // drop the eof/fail state left by the previous pass
+  fdat_.seekg(0);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+}
+
+/** Write the buffered bytes to the file and flush the stream. */
+void KVFile::Flush() {
+  fdat_.write(buf_, bufsize_);
+  fdat_.flush();
+  bufsize_ = 0;
+}
+
+/**
+ * Count the complete tuples stored on disk by walking over the length
+ * prefixes with a separate stream. Tuples that are still buffered are not
+ * counted; a truncated trailing tuple is not counted either.
+ */
+int KVFile::Count() {
+  std::ifstream fin(path_, std::ios::in | std::ios::binary);
+  CHECK(fin.is_open()) << "Cannot open file " << path_;
+  const std::streamoff file_size = FileSize(&fin);
+  int count = 0;
+  size_t len = 0;
+  while (true) {
+    if (!ReadFieldLength(&fin, file_size, &len)) break;  // key length
+    fin.seekg(len, std::ios_base::cur);  // skip the key
+    if (!fin.good()) break;
+    if (!ReadFieldLength(&fin, file_size, &len)) break;  // value length
+    fin.seekg(len, std::ios_base::cur);  // skip the value
+    if (!fin.good()) break;
+    count++;
+  }
+  fin.close();
+  return count;
+}
+
+/**
+ * Read the next key and make sure the complete value is in the buffer.
+ * On success offset_ points at the first byte of the value.
+ *
+ * @return length of the value in bytes; 0 if no complete tuple is available
+ * or a length prefix is larger than the buffer (corrupt file, or buffer
+ * smaller than the one used for writing).
+ */
+int KVFile::Next(std::string *key) {
+  key->clear();
+  const int ssize = sizeof(size_t);
+  size_t len = 0;
+  if (!PrepareNextField(ssize)) return 0;
+  memcpy(&len, buf_ + offset_, ssize);  // buf_ + offset_ may be unaligned
+  offset_ += ssize;
+  if (len > static_cast<size_t>(capacity_)) return 0;
+  const int keylen = static_cast<int>(len);
+  if (!PrepareNextField(keylen)) return 0;
+  key->assign(buf_ + offset_, keylen);
+  offset_ += keylen;
+  if (!PrepareNextField(ssize)) return 0;
+  memcpy(&len, buf_ + offset_, ssize);
+  offset_ += ssize;
+  if (len > static_cast<size_t>(capacity_)) return 0;
+  const int vallen = static_cast<int>(len);
+  if (!PrepareNextField(vallen)) return 0;
+  return vallen;
+}
+
+/**
+ * Scan the existing file, remember the key of every complete tuple in keys_
+ * and find the position where appending has to continue.
+ *
+ * Keys are read into a std::string sized from the length prefix, so keys of
+ * any length and keys containing '\0' are handled.
+ *
+ * @return offset right after the last complete tuple (0 if the file cannot
+ * be opened or holds no complete tuple).
+ */
+int KVFile::PrepareForAppend(const std::string& path) {
+  std::ifstream fin(path, std::ios::in | std::ios::binary);
+  if (!fin.is_open()) return 0;
+  const std::streamoff file_size = FileSize(&fin);
+  int last_tuple_offset = 0;
+  std::string key;
+  size_t len = 0;
+  while (true) {
+    if (!ReadFieldLength(&fin, file_size, &len)) break;  // key length
+    key.resize(len);
+    if (len > 0) fin.read(&key[0], len);
+    if (!fin.good()) break;
+    if (!ReadFieldLength(&fin, file_size, &len)) break;  // value length
+    fin.seekg(len, std::ios_base::cur);  // skip the value
+    if (!fin.good()) break;
+    keys_.insert(key);
+    last_tuple_offset = static_cast<int>(fin.tellg());
+  }
+  fin.close();
+  return last_tuple_offset;
+}
+
+/**
+ * If the buffer does not hold the next complete field, move the unread
+ * bytes to the front of the buffer and refill the rest from disk.
+ *
+ * @param size size in bytes of the next field
+ * @return false if the field is still incomplete after refilling or the end
+ * of the file was reached before.
+ */
+bool KVFile::PrepareNextField(int size) {
+  if (offset_ + size > bufsize_) {
+    bufsize_ -= offset_;
+    // source and destination may overlap, hence memmove
+    memmove(buf_, buf_ + offset_, bufsize_);
+    offset_ = 0;
+    if (fdat_.eof()) {
+      return false;
+    } else {
+      fdat_.read(buf_ + bufsize_, capacity_ - bufsize_);
+      bufsize_ += fdat_.gcount();
+      if (size > bufsize_) return false;
+    }
+  }
+  return true;
+}
+
+} /* io */
+} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/kvfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/kvfile_store.cc b/src/io/kvfile_store.cc
new file mode 100644
index 0000000..11609bf
--- /dev/null
+++ b/src/io/kvfile_store.cc
@@ -0,0 +1,71 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.
See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#include "io/kvfile_store.h"
+#include <glog/logging.h>
+
+namespace singa { namespace io {
+
+/**
+ * Create the KVFile for source in the KVFile mode that corresponds to mode.
+ * The store must be closed (CHECK); for an unknown mode no file is created
+ * and false is returned.
+ */
+bool KVFileStore::Open(const std::string& source, Mode mode) {
+  CHECK(file_ == nullptr);
+  // io::Mode and KVFile::Mode number their enumerators differently, so the
+  // mode is translated case by case instead of being cast
+  switch (mode) {
+    case kRead:
+      file_ = new KVFile(source, KVFile::kRead);
+      break;
+    case kCreate:
+      file_ = new KVFile(source, KVFile::kCreate);
+      break;
+    case kAppend:
+      file_ = new KVFile(source, KVFile::kAppend);
+      break;
+    default:
+      break;
+  }
+  mode_ = mode;
+  return file_ != nullptr;
+}
+
+/** Destroy the KVFile (deleting a null pointer is a no-op). */
+void KVFileStore::Close() {
+  delete file_;
+  file_ = nullptr;
+}
+
+/** Fetch the next tuple; requires an open store in kRead mode. */
+bool KVFileStore::Read(std::string* key, std::string* value) {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  const bool has_tuple = file_->Next(key, value);
+  return has_tuple;
+}
+
+/** Rewind to the first tuple; requires an open store in kRead mode. */
+void KVFileStore::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  file_->SeekToFirst();
+}
+
+/** Insert a tuple; requires an open store that is not in kRead mode. */
+bool KVFileStore::Write(const std::string& key, const std::string& value) {
+  CHECK_NE(mode_, kRead);
+  CHECK(file_ != nullptr);
+  const bool inserted = file_->Insert(key, value);
+  return inserted;
+}
+
+/** Flush buffered tuples; requires an open store that is not in kRead mode. */
+void KVFileStore::Flush() {
+  CHECK_NE(mode_, kRead);
+  CHECK(file_ != nullptr);
+  file_->Flush();
+}
+
+} /* io */
+
+} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/store.cc
---------------------------------------------------------------------- diff --git a/src/io/store.cc b/src/io/store.cc new file mode 100644 index 0000000..6412628 --- /dev/null +++ b/src/io/store.cc @@ -0,0 +1,57 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*
+*************************************************************/
+
+
+#include "io/store.h"
+#include "io/kvfile_store.h"
+#include "io/textfile_store.h"
+
+namespace singa { namespace io {
+/**
+ * Factory for the available Store backends.
+ *
+ * "textfile" and "kvfile" are always available; "lmdb", "imagefolder" and
+ * "hdfs" are only compiled in when USE_LMDB, USE_OPENCV or USE_HDFS is
+ * defined.
+ *
+ * @param backend name of the backend
+ * @return a newly allocated store, or nullptr for an unknown backend.
+ */
+Store* CreateStore(const std::string& backend) {
+  if (backend.compare("textfile") == 0)
+    return new TextFileStore();
+  if (backend.compare("kvfile") == 0)
+    return new KVFileStore();
+
+#ifdef USE_LMDB
+  if (backend == "lmdb") {
+    return new LMDBStore();
+  }
+#endif
+
+#ifdef USE_OPENCV
+  if (backend == "imagefolder") {
+    return new ImageFolderStore();
+  }
+#endif
+
+#ifdef USE_HDFS
+  if (backend == "hdfs") {
+    return new HDFSStore();
+  }
+#endif
+  // no backend matched
+  return nullptr;
+}
+} /* io */
+
+} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/textfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_store.cc b/src/io/textfile_store.cc
new file mode 100644
index 0000000..74ec9a4
--- /dev/null
+++ b/src/io/textfile_store.cc
@@ -0,0 +1,83 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#include "io/textfile_store.h"
+#include <glog/logging.h>
+#include <string>
+
+namespace singa { namespace io {
+/**
+ * Open source as a text file.
+ *
+ * kRead opens for reading, kCreate truncates/creates the file for writing
+ * and kAppend opens it for writing at the end of the existing content.
+ * A stream left over from an earlier Open() is released first and the line
+ * counter restarts at 0.
+ *
+ * @return true if the file was opened; false if it could not be opened or
+ * the mode is unknown.
+ */
+bool TextFileStore::Open(const std::string& source, Mode mode) {
+  Close();  // do not leak the stream of a previous Open()
+  lineNo_ = 0;
+  mode_ = mode;
+  if (mode == kRead) {
+    fs_ = new std::fstream(source, std::fstream::in);
+  } else if (mode == kCreate) {
+    fs_ = new std::fstream(source, std::fstream::out);
+  } else if (mode == kAppend) {
+    fs_ = new std::fstream(source, std::fstream::out | std::fstream::app);
+  } else {
+    // unknown mode: there is no stream to query
+    return false;
+  }
+  return fs_->is_open();
+}
+
+/**
+ * Close the file and release the stream. The pointer is reset so that
+ * calling Close() again, or using the store after Close(), never touches
+ * the deleted stream.
+ */
+void TextFileStore::Close() {
+  if (fs_ != nullptr) {
+    if (fs_->is_open()) {
+      fs_->close();
+    }
+    delete fs_;
+    fs_ = nullptr;
+  }
+}
+
+/**
+ * Read the next line into value and its 0-based line No. into key.
+ *
+ * @return false at the end of the file; any other read error is fatal.
+ */
+bool TextFileStore::Read(std::string* key, std::string* value) {
+  CHECK_EQ(mode_, kRead);
+  CHECK(fs_ != nullptr);
+  CHECK(value != nullptr);
+  CHECK(key != nullptr);
+  if (!std::getline(*fs_, *value)) {
+    if (fs_->eof())
+      return false;
+    else
+      LOG(FATAL) << "error in reading csv file";
+  }
+  *key = std::to_string(lineNo_++);
+  return true;
+}
+
+/** Restart reading from the first line. */
+void TextFileStore::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  CHECK(fs_ != nullptr);
+  lineNo_ = 0;
+  fs_->clear();  // drop the eof state before seeking
+  fs_->seekg(0);
+}
+
+/**
+ * Write value as one line.
+ *
+ * @param key ignored, the csv store does not write keys
+ * @return true if the stream is still in a good state after the write.
+ */
+bool TextFileStore::Write(const std::string& key, const std::string& value) {
+  CHECK_NE(mode_, kRead);
+  CHECK(fs_ != nullptr);
+  // csv store does not write key
+  *fs_ << value << '\n';
+  return fs_->good();
+}
+
+/** Flush the stream; requires an open store. */
+void TextFileStore::Flush() {
+  CHECK(fs_ != nullptr);
+  fs_->flush();
+}
+
+} /* io */
+
+} /* singa */
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/test/test_store.cc
----------------------------------------------------------------------
diff --git a/src/test/test_store.cc b/src/test/test_store.cc
new file mode 100644
index 0000000..f69aebb
--- /dev/null
+++ b/src/test/test_store.cc
@@ -0,0 +1,92 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.
The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+#include <memory>
+#include <string>
+#include "gtest/gtest.h"
+#include "io/store.h"
+
+// The Read tests consume the files produced by the Write tests of the same
+// backend, so they presumably rely on the tests running in the order in
+// which they are defined here.
+//
+// Every store is held by a unique_ptr so that the object returned by
+// CreateStore() is released at the end of the test, and every test calls
+// Close() explicitly so that the underlying file is released as well.
+
+TEST(TextFileStore, Open) {
+  std::unique_ptr<singa::io::Store> store(
+      singa::io::CreateStore("textfile"));
+  ASSERT_TRUE(store != nullptr);
+  EXPECT_TRUE(store->Open("src/test/store.txt", singa::io::kCreate));
+  store->Close();
+  EXPECT_TRUE(store->Open("src/test/store.txt", singa::io::kRead));
+  store->Close();
+}
+
+TEST(TextFileStore, Write) {
+  std::unique_ptr<singa::io::Store> store(
+      singa::io::CreateStore("textfile"));
+  ASSERT_TRUE(store != nullptr);
+  ASSERT_TRUE(store->Open("src/test/store.txt", singa::io::kCreate));
+  EXPECT_TRUE(store->Write("001", "first tuple"));
+  EXPECT_TRUE(store->Write("002", "second tuple"));
+  store->Flush();
+  EXPECT_TRUE(store->Write("003", "third tuple"));
+  store->Close();
+}
+
+TEST(TextFileStore, Read) {
+  std::unique_ptr<singa::io::Store> store(
+      singa::io::CreateStore("textfile"));
+  ASSERT_TRUE(store != nullptr);
+  ASSERT_TRUE(store->Open("src/test/store.txt", singa::io::kRead));
+  std::string key, value;
+  // the text file store does not keep keys, the key is the line No.
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "0");
+  EXPECT_EQ(value, "first tuple");
+
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "1");
+  EXPECT_EQ(value, "second tuple");
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "2");
+  EXPECT_EQ(value, "third tuple");
+  EXPECT_FALSE(store->Read(&key, &value));
+  store->SeekToFirst();
+
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "0");
+  EXPECT_EQ(value, "first tuple");
+  store->Close();
+}
+
+TEST(KVFileStore, Open) {
+  std::unique_ptr<singa::io::Store> store(singa::io::CreateStore("kvfile"));
+  ASSERT_TRUE(store != nullptr);
+  EXPECT_TRUE(store->Open("src/test/store.bin", singa::io::kCreate));
+  store->Close();
+  EXPECT_TRUE(store->Open("src/test/store.bin", singa::io::kRead));
+  store->Close();
+}
+
+TEST(KVFileStore, Write) {
+  std::unique_ptr<singa::io::Store> store(singa::io::CreateStore("kvfile"));
+  ASSERT_TRUE(store != nullptr);
+  ASSERT_TRUE(store->Open("src/test/store.bin", singa::io::kCreate));
+  EXPECT_TRUE(store->Write("001", "first tuple"));
+  EXPECT_TRUE(store->Write("002", "second tuple"));
+  store->Flush();
+  EXPECT_TRUE(store->Write("003", "third tuple"));
+  store->Close();
+}
+
+TEST(KVFileStore, Read) {
+  std::unique_ptr<singa::io::Store> store(singa::io::CreateStore("kvfile"));
+  ASSERT_TRUE(store != nullptr);
+  ASSERT_TRUE(store->Open("src/test/store.bin", singa::io::kRead));
+  std::string key, value;
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "001");
+  EXPECT_EQ(value, "first tuple");
+
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "002");
+  EXPECT_EQ(value, "second tuple");
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "003");
+  EXPECT_EQ(value, "third tuple");
+  EXPECT_FALSE(store->Read(&key, &value));
+  store->SeekToFirst();
+
+  EXPECT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "001");
+  EXPECT_EQ(value, "first tuple");
+  store->Close();
+}
