Repository: incubator-singa Updated Branches: refs/heads/dev 833f46195 -> 14d31a44f
SINGA-202 Add reader and writer for binary file Add base classes of IO(Reader and Writer) Add BinFileReader and BinFileWriter for reading and writing binary files in key-value tuples. Pass a simple test for KVFile write and read. Change CMakeLists under src and test folders. Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/913f45ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/913f45ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/913f45ce Branch: refs/heads/dev Commit: 913f45ce29c233db1063e18b27f8d5c72d680588 Parents: 833f461 Author: XiangruiCAI <[email protected]> Authored: Tue Jun 21 16:55:14 2016 +0800 Committer: XiangruiCAI <[email protected]> Committed: Thu Jun 23 21:43:30 2016 +0800 ---------------------------------------------------------------------- include/singa/io/reader.h | 99 +++++++++++++++++++++++++++ include/singa/io/writer.h | 112 ++++++++++++++++++++++++++++++ src/io/binfile_reader.cc | 113 ++++++++++++++++++++++++++++++ src/io/binfile_writer.cc | 136 +++++++++++++++++++++++++++++++++++++ test/singa/test_binfile_rw.cc | 95 ++++++++++++++++++++++++++ 5 files changed, 555 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/include/singa/io/reader.h ---------------------------------------------------------------------- diff --git a/include/singa/io/reader.h b/include/singa/io/reader.h new file mode 100644 index 0000000..f693da2 --- /dev/null +++ b/include/singa/io/reader.h @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SINGA_IO_READER_H_ +#define SINGA_IO_READER_H_ + +#include <cstring> +#include <fstream> +#include <string> + +namespace singa { +namespace io { + +using std::string; + +/// General Reader that provides functions for reading tuples. +/// Subclasses implement the functions for a specific data storage, e.g., CSV +/// file, HDFS, kvfile, leveldb, lmdb, etc. +class Reader { + public: + /// In case that users forget to call Close() to release resources, e.g., + /// memory, you can release them here. + virtual ~Reader() {} + + /// path is the path to the storage, could be a file path, database + /// connection, or hdfs path. + /// return true if open successfully, otherwise false. + virtual bool Open(const std::string& path, int capacity = 10485760) = 0; + + /// Release resources. + virtual void Close() = 0; + + /// Read a tuple. + /// return true if read successfully, otherwise false. + virtual bool Read(std::string* key, std::string* value) = 0; + + /// Iterate through all tuples to get the num of all tuples. 
+ /// return num of tuples + virtual int Count() = 0; +}; + +class BinFileReader : public Reader { + public: + ~BinFileReader() { Close(); } + /// \copydoc Open(const std::string& path) + bool Open(const std::string& path, int capacity = 10485760) override; + /// \copydoc Close() + void Close() override; + /// \copydoc Read(std::string* key, std::string* value) + bool Read(std::string* key, std::string* value) override; + /// \copydoc Count() + int Count() override; + /// return path to binary file + inline std::string path() { return path_; } + + protected: + /// Read the next field, including content_len and content; + /// return true on success. + bool ReadField(std::string* content); + + /// Read data from disk if the current data in the buffer is not a full field. + /// size is the size of the next field. + bool PrepareNextField(int size); + + private: + std::string path_ = ""; + /// file to be read + std::ifstream fdat_; + /// internal buffer + char* buf_ = nullptr; + /// offset inside the buf_ + int offset_ = 0; + /// allocated bytes for the buf_ + int capacity_ = 0; + /// bytes in buf_ + int bufsize_ = 0; + /// magic word + const char kMagicWord[2] = {'s', 'g'}; +}; + +} // namespace io +} // namespace singa + +#endif // SINGA_IO_READER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/include/singa/io/writer.h ---------------------------------------------------------------------- diff --git a/include/singa/io/writer.h b/include/singa/io/writer.h new file mode 100644 index 0000000..a847ead --- /dev/null +++ b/include/singa/io/writer.h @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SINGA_IO_WRITER_H_ +#define SINGA_IO_WRITER_H_ + +#include <string> +#include <cstring> +#include <fstream> + +namespace singa { +namespace io { + +using std::string; +enum Mode { kCreate, kAppend }; + +/// General Writer that provides functions for writing tuples. +/// Subclasses implement the functions for a specific data storage, e.g., CSV +/// file, HDFS, image folder, leveldb, lmdb, etc. +class Writer { + public: + /// In case that users forget to call Close() to release resources, e.g., + /// memory, you can release them here. + virtual ~Writer() {} + + /// Open a file. + /// path is the path to the disk BinFile, it can be + /// - a path to local disk file. + /// - a path to local directory. This is to be compatible with the older + /// version (DataShard). The KVFile is shard.dat under that directory + /// - a hdfs file starting with "hdfs://" + /// mode is KVFile open mode(kCreate, kAppend). + /// buffer Caches capacity bytes data for every disk op (read or write), + /// default is 10MB. + virtual bool Open(const std::string &path, Mode mode, + int capacity = 10485760) = 0; + + /// Release resources. + virtual void Close() = 0; + + /// Write a key-value tuple. + /// return true if success, otherwise false. + virtual bool Write(const std::string &key, const std::string &value) = 0; + + /// Flush writing buffer if it has. + virtual void Flush() = 0; +}; + +/// BinFile stores training/validation/test tuples. 
+/// Each tuple is encoded as [magic_word, key_len, key, val_len, val]: +/// - magic_word has 4 bytes; the first two are "s" and "g", the third one +/// indicates whether key is null, the last one is reserved for future use. +/// - key_len and val_len are of type size_t, which indicate the bytes of key +/// and value respectively; +/// - key_len and key are optional. +/// When BinFile is created, it will remove the last tuple if the value size +/// and key size do not match because the last write crashed. +class BinFileWriter : public Writer { + public: + ~BinFileWriter() { Close(); } + /// \copydoc Open(const std::string &path, Mode mode, int bufsize = 10485760) + bool Open(const std::string &path, Mode mode, + int capacity = 10485760) override; + /// \copydoc Close() + void Close() override; + /// \copydoc Write(const std::string& key, const std::string& value) override; + bool Write(const std::string &key, const std::string &value) override; + /// \copydoc Flush() + void Flush() override; + /// return path to binary file + inline std::string path() { return path_; } + + protected: + /// Setup the disk pointer to the right position for append in case that + /// the previous write crashes. + /// return offset (end pos) of the last successfully written record. 
+ int PrepareForAppend(const std::string &path); + + private: + std::string path_ = ""; + Mode mode_; + /// file to be written + std::ofstream fdat_; + /// internal buffer + char *buf_ = nullptr; + /// allocated bytes for the buf_ + int capacity_ = 0; + /// bytes in buf_ + int bufsize_ = 0; + /// magic word + const char kMagicWord[2]= {'s', 'g'}; +}; + +} // namespace io +} // namespace singa + +#endif // SINGA_IO_WRITER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/src/io/binfile_reader.cc ---------------------------------------------------------------------- diff --git a/src/io/binfile_reader.cc b/src/io/binfile_reader.cc new file mode 100644 index 0000000..6a29540 --- /dev/null +++ b/src/io/binfile_reader.cc @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "singa/io/reader.h" +#include "singa/utils/logging.h" + +namespace singa { +namespace io { +bool BinFileReader::Open(const std::string& path, int capacity) { + path_ = path; + capacity_ = capacity; + buf_ = new char[capacity_]; + fdat_.open(path_, std::ios::in | std::ios::binary); + CHECK(fdat_.is_open()) << "Cannot open file " << path_; + return fdat_.is_open(); +} + +void BinFileReader::Close() { + if (buf_ != nullptr) { + delete[] buf_; + buf_ = nullptr; + } + if (fdat_.is_open()) fdat_.close(); +} + +bool BinFileReader::Read(std::string* key, std::string* value) { + CHECK(fdat_.is_open()) << "File not open!"; + char magic[4]; + int smagic = sizeof(magic); + if (!PrepareNextField(smagic)) return false; + memcpy(magic, buf_ + offset_, smagic); + offset_ += smagic; + + if (magic[0] == kMagicWord[0] && magic[1] == kMagicWord[1]) { + if (magic[2] != 0 && magic[2] != 1) return false; + if (magic[2] == 1) + if (!ReadField(key)) return false; + if (!ReadField(value)) return false; + } + return true; +} + +int BinFileReader::Count() { + std::ifstream fin(path_, std::ios::in | std::ios::binary); + CHECK(fdat_.is_open()) << "Cannot create file " << path_; + int count = 0; + while (true) { + size_t len; + char magic[4]; + fin.read(reinterpret_cast<char*>(magic), sizeof(magic)); + if (!fin.good()) break; + if (magic[2] == 1) { + fin.read(reinterpret_cast<char*>(&len), sizeof(len)); + if (!fin.good()) break; + fin.seekg(len, std::ios_base::cur); + if (!fin.good()) break; + } + fin.read(reinterpret_cast<char*>(&len), sizeof(len)); + if (!fin.good()) break; + fin.seekg(len, std::ios_base::cur); + if (!fin.good()) break; + count++; + } + fin.close(); + return count; +} + +bool BinFileReader::ReadField(std::string* content) { + content->clear(); + int ssize = sizeof(size_t); + if (!PrepareNextField(ssize)) return false; + int len = *reinterpret_cast<size_t*>(buf_ + offset_); + offset_ += ssize; + if (!PrepareNextField(len)) return false; + for (int i = 0; i < 
len; ++i) content->push_back(buf_[offset_ + i]); + offset_ += len; + return true; +} + +// if the buf does not have the next complete field, read data from disk +bool BinFileReader::PrepareNextField(int size) { + if (offset_ + size > bufsize_) { + bufsize_ -= offset_; + memcpy(buf_, buf_ + offset_, bufsize_); + offset_ = 0; + if (fdat_.eof()) { + return false; + } else { + fdat_.read(buf_ + bufsize_, capacity_ - bufsize_); + bufsize_ += fdat_.gcount(); + if (size > bufsize_) return false; + } + } + return true; +} + +} // namespace io +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/src/io/binfile_writer.cc ---------------------------------------------------------------------- diff --git a/src/io/binfile_writer.cc b/src/io/binfile_writer.cc new file mode 100644 index 0000000..b1d7951 --- /dev/null +++ b/src/io/binfile_writer.cc @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "singa/io/writer.h" +#include "singa/utils/logging.h" + +namespace singa { +namespace io { +bool BinFileWriter::Open(const std::string& path, Mode mode, int capacity) { + CHECK(!fdat_.is_open()); + path_ = path; + mode_ = mode; + capacity_ = capacity; + buf_ = new char[capacity_]; + switch (mode) { + case kCreate: + fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc); + CHECK(fdat_.is_open()) << "Cannot create file " << path_; + break; + case kAppend: + fdat_.open(path_, std::ios::in | std::ios::binary); + CHECK(fdat_.is_open()) << "Cannot open file " << path_; + fdat_.close(); + { + int last_tuple = PrepareForAppend(path_); + fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::in | + std::ios::ate); + fdat_.seekp(last_tuple); + } + break; + default: + LOG(FATAL) << "unknown model to open KVFile " << mode; + break; + } + return fdat_.is_open(); +} + +void BinFileWriter::Close() { + Flush(); + if (buf_ != nullptr) { + delete buf_; + buf_ = nullptr; + } + if (fdat_.is_open()) fdat_.close(); +} + +bool BinFileWriter::Write(const std::string& key, const std::string& value) { + CHECK(fdat_.is_open()) << "File not open!"; + if (value.size() == 0) return false; + // magic_word + (key_len + key) + val_len + val + char magic[4]; + int size; + memcpy(magic, kMagicWord, sizeof(kMagicWord)); + magic[3] = 0; + if (key.size() == 0) { + magic[2] = 0; + size = sizeof(magic) + sizeof(size_t) + value.size(); + } else { + magic[2] = 1; + size = sizeof(magic) + 2 * sizeof(size_t) + key.size() + value.size(); + } + + if (bufsize_ + size > capacity_) { + fdat_.write(buf_, bufsize_); + bufsize_ = 0; + CHECK_LE(size, capacity_) << "Tuple size is larger than capacity " + << "Try a larger capacity size"; + } + + memcpy(buf_ + bufsize_, magic, sizeof(magic)); + bufsize_ += sizeof(magic); + if (key.size() > 0) { + *reinterpret_cast<size_t*>(buf_ + bufsize_) = key.size(); + bufsize_ += sizeof(size_t); + std::memcpy(buf_ + bufsize_, key.data(), 
key.size()); + bufsize_ += key.size(); + } + *reinterpret_cast<size_t*>(buf_ + bufsize_) = value.size(); + bufsize_ += sizeof(size_t); + std::memcpy(buf_ + bufsize_, value.data(), value.size()); + bufsize_ += value.size(); + return true; +} + +void BinFileWriter::Flush() { + CHECK(fdat_); + if (bufsize_ > 0) { + fdat_.write(buf_, bufsize_); + fdat_.flush(); + bufsize_ = 0; + } +} + +int BinFileWriter::PrepareForAppend(const std::string& path) { + std::ifstream fin(path, std::ios::in | std::ios::binary); + if (!fin.is_open()) return 0; + int last_tuple_offset = 0; + char buf[256]; + size_t len; + char magic[4]; + while (true) { + fin.read(magic, sizeof(magic)); + if (!fin.good()) break; + if (magic[2] == 1) { + fin.read(reinterpret_cast<char*>(&len), sizeof(len)); + if (!fin.good()) break; + fin.read(buf, len); + buf[len] = '\0'; + if (!fin.good()) break; + } + fin.read(reinterpret_cast<char*>(&len), sizeof(len)); + if (!fin.good()) break; + fin.seekg(len, std::ios_base::cur); + if (!fin.good()) break; + last_tuple_offset = fin.tellg(); + } + fin.close(); + return last_tuple_offset; +} +} // namespace io +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/test/singa/test_binfile_rw.cc ---------------------------------------------------------------------- diff --git a/test/singa/test_binfile_rw.cc b/test/singa/test_binfile_rw.cc new file mode 100644 index 0000000..45afd56 --- /dev/null +++ b/test/singa/test_binfile_rw.cc @@ -0,0 +1,95 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#include "../include/singa/io/reader.h" +#include "../include/singa/io/writer.h" +#include "gtest/gtest.h" + +const char* path = "./binfile_test"; +using singa::io::BinFileReader; +using singa::io::BinFileWriter; +TEST(BinFileWriter, Create) { + BinFileWriter writer; + bool ret; + ret = writer.Open(path, singa::io::kCreate); + EXPECT_EQ(true, ret); + + std::string key = ""; + std::string value = "\nThis is a test for binfile io."; + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + writer.Flush(); + writer.Close(); +} + +TEST(BinFileWriter, Append) { + BinFileWriter writer; + bool ret; + ret = writer.Open(path, singa::io::kAppend); + EXPECT_EQ(true, ret); + + std::string key = "1"; + std::string value = "\nThis is another test for binfile io."; + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + key = "2"; + value = "\nThis is another test for binfile io."; + ret = writer.Write(key, value); + EXPECT_EQ(true, ret); + + writer.Flush(); + writer.Close(); +} + +TEST(BinFileReader, Read) { + BinFileReader reader; + bool ret; + ret = reader.Open(path); + EXPECT_EQ(true, ret); + + int cnt = reader.Count(); + EXPECT_EQ(4, cnt); + + std::string key, value; + reader.Read(&key, &value); + EXPECT_STREQ("", key.c_str()); + EXPECT_STREQ("\nThis is a test for binfile io.", value.c_str()); + + reader.Read(&key, &value); + EXPECT_STREQ("", key.c_str()); + EXPECT_STREQ("\nThis is a test for binfile io.", value.c_str()); + + 
reader.Read(&key, &value); + EXPECT_STREQ("1", key.c_str()); + EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str()); + + reader.Read(&key, &value); + EXPECT_STREQ("2", key.c_str()); + EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str()); + + reader.Close(); + remove(path); +}
