Repository: incubator-singa Updated Branches: refs/heads/dev 4db968c2e -> d3c1bae61
SINGA-211 Add TextFileReader and TextFileWriter for CSV files Add TextFileReader and TextFileWriter for reading and writing csv files. Pass a simple test for these two classes. Fixed bugs in BinFileReader and BinFileWriter. Add a constructor for BinFile Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d3c1bae6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d3c1bae6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d3c1bae6 Branch: refs/heads/dev Commit: d3c1bae61253efa58afab9d5d7a7ef8a6a2596d2 Parents: 4db968c Author: XiangruiCAI <[email protected]> Authored: Tue Jun 28 16:09:21 2016 +0800 Committer: XiangruiCAI <[email protected]> Committed: Tue Jun 28 21:39:27 2016 +0800 ---------------------------------------------------------------------- include/singa/io/reader.h | 40 +++++++++++++--- include/singa/io/writer.h | 47 +++++++++++++------ src/io/binfile_reader.cc | 19 ++++++-- src/io/binfile_writer.cc | 70 +++++++++------------------ src/io/textfile_reader.cc | 63 +++++++++++++++++++++++++ src/io/textfile_writer.cc | 61 ++++++++++++++++++++++++ test/singa/test_binfile_rw.cc | 10 ++-- test/singa/test_textfile_rw.cc | 94 +++++++++++++++++++++++++++++++++++++ 8 files changed, 327 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/include/singa/io/reader.h ---------------------------------------------------------------------- diff --git a/include/singa/io/reader.h b/include/singa/io/reader.h index f693da2..bd1b3fe 100644 --- a/include/singa/io/reader.h +++ b/include/singa/io/reader.h @@ -40,7 +40,7 @@ class Reader { /// path is the path to the storage, could be a file path, database /// connection, or hdfs path. /// return true if open successfully, otherwise false. - virtual bool Open(const std::string& path, int capacity = 10485760) = 0; + virtual bool Open(const std::string& path) = 0; /// Release resources. virtual void Close() = 0; @@ -54,11 +54,14 @@ class Reader { virtual int Count() = 0; }; +/// Binfilereader reads tuples from binary file with key-value pairs. class BinFileReader : public Reader { public: ~BinFileReader() { Close(); } /// \copydoc Open(const std::string& path) - bool Open(const std::string& path, int capacity = 10485760) override; + bool Open(const std::string& path) override; + /// \copydoc Open(const std::string& path), user defines capacity + bool Open(const std::string& path, int capacity); /// \copydoc Close() void Close() override; /// \copydoc Read(std::string* key, std::string* value) @@ -69,30 +72,55 @@ class BinFileReader : public Reader { inline std::string path() { return path_; } protected: + /// Open a file with path_ and initialize buf_ + bool OpenFile(); /// Read the next filed, including content_len and content; /// return true if succeed. bool ReadField(std::string* content); - /// Read data from disk if the current data in the buffer is not a full field. /// size is the size of the next field. bool PrepareNextField(int size); private: - std::string path_ = ""; /// file to be read + std::string path_ = ""; + /// ifstream std::ifstream fdat_; /// internal buffer char* buf_ = nullptr; /// offset inside the buf_ int offset_ = 0; - /// allocated bytes for the buf_ - int capacity_ = 0; + /// allocated bytes for the buf_, default is 10M + int capacity_ = 10485760; /// bytes in buf_ int bufsize_ = 0; /// magic word const char kMagicWord[2] = {'s', 'g'}; }; +/// TextFileReader reads tuples from CSV file. +class TextFileReader : public Reader { + public: + ~TextFileReader() { Close(); } + /// \copydoc Open(const std::string& path) + bool Open(const std::string& path) override; + /// \copydoc Close() + void Close() override; + /// \copydoc Read(std::string* key, std::string* value) + bool Read(std::string* key, std::string* value) override; + /// \copydoc Count() + int Count() override; + /// return path to text file + inline std::string path() { return path_; } + + private: + /// file to be read + std::string path_ = ""; + /// ifstream + std::ifstream fdat_; + /// current line number + int lineNo_ = 0; +}; } // namespace io } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/include/singa/io/writer.h ---------------------------------------------------------------------- diff --git a/include/singa/io/writer.h b/include/singa/io/writer.h index a847ead..f20a22b 100644 --- a/include/singa/io/writer.h +++ b/include/singa/io/writer.h @@ -44,11 +44,8 @@ class Writer { /// - a path to local directory. This is to be compatible with the older /// version (DataShard). The KVFile is shard.dat under that directory /// - a hdfs file starting with "hdfs://" - /// mode is KVFile open mode(kCreate, kAppend). - /// buffer Caches capacity bytes data for every disk op (read or write), - /// default is 10MB. - virtual bool Open(const std::string &path, Mode mode, - int capacity = 10485760) = 0; + /// mode is open mode(kCreate, kAppend). + virtual bool Open(const std::string &path, Mode mode) = 0; /// Release resources. virtual void Close() = 0; @@ -73,9 +70,10 @@ class Writer { class BinFileWriter : public Writer { public: ~BinFileWriter() { Close(); } - /// \copydoc Open(const std::string &path, Mode mode, int bufsize = 10485760) - bool Open(const std::string &path, Mode mode, - int capacity = 10485760) override; + /// \copydoc Open(const std::string &path, Mode mode) + bool Open(const std::string &path, Mode mode) override; + /// \copydoc Open(const std::string& path), user defines capacity + bool Open(const std::string& path, Mode mode, int capacity); /// \copydoc Close() void Close() override; /// \copydoc Write(const std::string& key, const std::string& value) override; @@ -86,26 +84,47 @@ class BinFileWriter : public Writer { inline std::string path() { return path_; } protected: - /// Setup the disk pointer to the right position for append in case that - /// the pervious write crashes. - /// return offset (end pos) of the last success written record. - int PrepareForAppend(const std::string &path); + /// Open a file with path_ and initialize buf_ + bool OpenFile(); private: + /// file to be written std::string path_ = ""; Mode mode_; - /// file to be written + /// ofstream std::ofstream fdat_; /// internal buffer char *buf_ = nullptr; /// allocated bytes for the buf_ - int capacity_ = 0; + int capacity_ = 10485760; /// bytes in buf_ int bufsize_ = 0; /// magic word const char kMagicWord[2]= {'s', 'g'}; }; +/// TextFileWriter write training/validation/test tuples in CSV file. +class TextFileWriter : public Writer { + public: + ~TextFileWriter() { Close(); } + /// \copydoc Open(const std::string &path, Mode mode) + bool Open(const std::string &path, Mode mode) override; + /// \copydoc Close() + void Close() override; + /// \copydoc Write(const std::string& key, const std::string& value) override; + bool Write(const std::string &key, const std::string &value) override; + /// \copydoc Flush() + void Flush() override; + /// return path to text file + inline std::string path() { return path_; } + + private: + /// file to be written + std::string path_ = ""; + Mode mode_; + /// ofstream + std::ofstream fdat_; +}; } // namespace io } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/binfile_reader.cc ---------------------------------------------------------------------- diff --git a/src/io/binfile_reader.cc b/src/io/binfile_reader.cc index 6a29540..d54eeb5 100644 --- a/src/io/binfile_reader.cc +++ b/src/io/binfile_reader.cc @@ -21,13 +21,15 @@ namespace singa { namespace io { +bool BinFileReader::Open(const std::string& path) { + path_ = path; + return OpenFile(); +} + bool BinFileReader::Open(const std::string& path, int capacity) { path_ = path; capacity_ = capacity; - buf_ = new char[capacity_]; - fdat_.open(path_, std::ios::in | std::ios::binary); - CHECK(fdat_.is_open()) << "Cannot open file " << path_; - return fdat_.is_open(); + return OpenFile(); } void BinFileReader::Close() { @@ -57,7 +59,7 @@ bool BinFileReader::Read(std::string* key, std::string* value) { int BinFileReader::Count() { std::ifstream fin(path_, std::ios::in | std::ios::binary); - CHECK(fdat_.is_open()) << "Cannot create file " << path_; + CHECK(fin.is_open()) << "Cannot create file " << path_; int count = 0; while (true) { size_t len; @@ -80,6 +82,13 @@ int BinFileReader::Count() { return count; } +bool BinFileReader::OpenFile() { + buf_ = new char[capacity_]; + fdat_.open(path_, std::ios::in | std::ios::binary); + CHECK(fdat_.is_open()) << "Cannot open file " << path_; + return fdat_.is_open(); +} + bool BinFileReader::ReadField(std::string* content) { content->clear(); int ssize = sizeof(size_t); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/binfile_writer.cc ---------------------------------------------------------------------- diff --git a/src/io/binfile_writer.cc b/src/io/binfile_writer.cc index b1d7951..e207453 100644 --- a/src/io/binfile_writer.cc +++ b/src/io/binfile_writer.cc @@ -21,33 +21,19 @@ namespace singa { namespace io { +bool BinFileWriter::Open(const std::string& path, Mode mode) { + path_ = path; + mode_ = mode; + buf_ = new char[capacity_]; + return OpenFile(); +} + bool BinFileWriter::Open(const std::string& path, Mode mode, int capacity) { CHECK(!fdat_.is_open()); path_ = path; mode_ = mode; capacity_ = capacity; - buf_ = new char[capacity_]; - switch (mode) { - case kCreate: - fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc); - CHECK(fdat_.is_open()) << "Cannot create file " << path_; - break; - case kAppend: - fdat_.open(path_, std::ios::in | std::ios::binary); - CHECK(fdat_.is_open()) << "Cannot open file " << path_; - fdat_.close(); - { - int last_tuple = PrepareForAppend(path_); - fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::in | - std::ios::ate); - fdat_.seekp(last_tuple); - } - break; - default: - LOG(FATAL) << "unknown model to open KVFile " << mode; - break; - } - return fdat_.is_open(); + return OpenFile(); } void BinFileWriter::Close() { @@ -98,7 +84,6 @@ bool BinFileWriter::Write(const std::string& key, const std::string& value) { } void BinFileWriter::Flush() { - CHECK(fdat_); if (bufsize_ > 0) { fdat_.write(buf_, bufsize_); fdat_.flush(); @@ -106,31 +91,22 @@ void BinFileWriter::Flush() { } } -int BinFileWriter::PrepareForAppend(const std::string& path) { - std::ifstream fin(path, std::ios::in | std::ios::binary); - if (!fin.is_open()) return 0; - int last_tuple_offset = 0; - char buf[256]; - size_t len; - char magic[4]; - while (true) { - fin.read(magic, sizeof(magic)); - if (!fin.good()) break; - if (magic[2] == 1) { - fin.read(reinterpret_cast<char*>(&len), sizeof(len)); - if (!fin.good()) break; - fin.read(buf, len); - buf[len] = '\0'; - if (!fin.good()) break; - } - fin.read(reinterpret_cast<char*>(&len), sizeof(len)); - if (!fin.good()) break; - fin.seekg(len, std::ios_base::cur); - if (!fin.good()) break; - last_tuple_offset = fin.tellg(); +bool BinFileWriter::OpenFile() { + buf_ = new char[capacity_]; + switch (mode_) { + case kCreate: + fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc); + CHECK(fdat_.is_open()) << "Cannot create file " << path_; + break; + case kAppend: + fdat_.open(path_, std::ios::app | std::ios::binary); + CHECK(fdat_.is_open()) << "Cannot open file " << path_; + break; + default: + LOG(FATAL) << "unknown mode to open binary file " << mode_; + break; } - fin.close(); - return last_tuple_offset; + return fdat_.is_open(); } } // namespace io } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/textfile_reader.cc ---------------------------------------------------------------------- diff --git a/src/io/textfile_reader.cc b/src/io/textfile_reader.cc new file mode 100644 index 0000000..7612241 --- /dev/null +++ b/src/io/textfile_reader.cc @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "singa/io/reader.h" +#include "singa/utils/logging.h" + +namespace singa { +namespace io { +bool TextFileReader::Open(const std::string& path) { + path_ = path; + fdat_.open(path_, std::ios::in); + CHECK(fdat_.is_open()) << "Cannot open file " << path_; + return fdat_.is_open(); +} + +void TextFileReader::Close() { + if (fdat_.is_open()) fdat_.close(); +} + +bool TextFileReader::Read(std::string* key, std::string* value) { + CHECK(fdat_.is_open()) << "File not open!"; + key->clear(); + value->clear(); + if (!std::getline(fdat_, *value)) { + if (fdat_.eof()) + return false; + else + LOG(FATAL) << "Error in reading text file"; + } + *key = std::to_string(lineNo_++); + return true; +} + +int TextFileReader::Count() { + std::ifstream fin(path_, std::ios::in); + CHECK(fin.is_open()) << "Cannot create file " << path_; + int count = 0; + string line; + while (!fin.eof()) { + std::getline(fin, line); + if (line != "") count++; + } + fin.close(); + return count; +} + +} // namespace io +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/src/io/textfile_writer.cc ---------------------------------------------------------------------- diff --git a/src/io/textfile_writer.cc b/src/io/textfile_writer.cc new file mode 100644 index 0000000..7868b85 --- /dev/null +++ b/src/io/textfile_writer.cc @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "singa/io/writer.h" +#include "singa/utils/logging.h" + +namespace singa { +namespace io { +bool TextFileWriter::Open(const std::string& path, Mode mode) { + CHECK(!fdat_.is_open()); + path_ = path; + mode_ = mode; + switch (mode) { + case kCreate: + fdat_.open(path_, std::ios::out | std::ios::trunc); + CHECK(fdat_.is_open()) << "Cannot create file " << path_; + break; + case kAppend: + fdat_.open(path_, std::ios::app); + CHECK(fdat_.is_open()) << "Cannot open file " << path_; + break; + default: + LOG(FATAL) << "unknown mode to open text file " << mode; + break; + } + return fdat_.is_open(); +} + +void TextFileWriter::Close() { + Flush(); + if (fdat_.is_open()) fdat_.close(); +} + +bool TextFileWriter::Write(const std::string& key, const std::string& value) { + CHECK(fdat_.is_open()) << "File not open!"; + if (value.size() == 0) return false; + fdat_ << value << std::endl; + return true; +} + +void TextFileWriter::Flush() { + if (fdat_.is_open()) + fdat_.flush(); +} +} // namespace io +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/test/singa/test_binfile_rw.cc ---------------------------------------------------------------------- diff --git a/test/singa/test_binfile_rw.cc b/test/singa/test_binfile_rw.cc index 45afd56..ddee4f1 100644 --- a/test/singa/test_binfile_rw.cc +++ b/test/singa/test_binfile_rw.cc @@ -23,13 +23,13 @@ #include "../include/singa/io/writer.h" #include "gtest/gtest.h" -const char* path = "./binfile_test"; +const char* path_bin = "./binfile_test"; using singa::io::BinFileReader; using singa::io::BinFileWriter; TEST(BinFileWriter, Create) { BinFileWriter writer; bool ret; - ret = writer.Open(path, singa::io::kCreate); + ret = writer.Open(path_bin, singa::io::kCreate); EXPECT_EQ(true, ret); std::string key = ""; @@ -47,7 +47,7 @@ TEST(BinFileWriter, Create) { TEST(BinFileWriter, Append) { BinFileWriter writer; bool ret; - ret = writer.Open(path, singa::io::kAppend); + ret = writer.Open(path_bin, singa::io::kAppend, 20971520); EXPECT_EQ(true, ret); std::string key = "1"; @@ -67,7 +67,7 @@ TEST(BinFileWriter, Append) { TEST(BinFileReader, Read) { BinFileReader reader; bool ret; - ret = reader.Open(path); + ret = reader.Open(path_bin); EXPECT_EQ(true, ret); int cnt = reader.Count(); @@ -91,5 +91,5 @@ TEST(BinFileReader, Read) { EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str()); reader.Close(); - remove(path); + remove(path_bin); } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d3c1bae6/test/singa/test_textfile_rw.cc ---------------------------------------------------------------------- diff --git a/test/singa/test_textfile_rw.cc b/test/singa/test_textfile_rw.cc new file mode 100644 index 0000000..7494f46 --- /dev/null +++ b/test/singa/test_textfile_rw.cc @@ -0,0 +1,94 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#include "../include/singa/io/reader.h" +#include "../include/singa/io/writer.h" +#include "gtest/gtest.h" + +const char* path_csv = "./textfile_test.csv"; +using singa::io::TextFileReader; +using singa::io::TextFileWriter; +TEST(TextFileWriter, Create) { + TextFileWriter writer; + bool ret; + ret = writer.Open(path_csv, singa::io::kCreate); + EXPECT_EQ(true, ret); + + std::string key = ""; + std::string value = "This is a test for binfile io."; + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + writer.Flush(); + writer.Close(); +} + +TEST(TextFileWriter, Append) { + TextFileWriter writer; + bool ret; + ret = writer.Open(path_csv, singa::io::kAppend); + EXPECT_EQ(true, ret); + + std::string key = "1"; + std::string value = "This is another test for binfile io."; + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + key = "2"; + value = "This is another test for binfile io."; + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + writer.Flush(); + writer.Close(); +} +TEST(TextFileReader, Read) { + TextFileReader reader; + bool ret; + ret = reader.Open(path_csv); + EXPECT_EQ(true, ret); + + int cnt = reader.Count(); + EXPECT_EQ(4, cnt); + + std::string key, value; + reader.Read(&key, &value); + EXPECT_STREQ("0", key.c_str()); + EXPECT_STREQ("This is a test for binfile io.", value.c_str()); + + reader.Read(&key, &value); + EXPECT_STREQ("1", key.c_str()); + EXPECT_STREQ("This is a test for binfile io.", value.c_str()); + + reader.Read(&key, &value); + EXPECT_STREQ("2", key.c_str()); + EXPECT_STREQ("This is another test for binfile io.", value.c_str()); + + reader.Read(&key, &value); + EXPECT_STREQ("3", key.c_str()); + EXPECT_STREQ("This is another test for binfile io.", value.c_str()); + + reader.Close(); + remove(path_csv); +}
