This is an automated email from the ASF dual-hosted git repository.

silver pushed a commit to branch cpp-more-operation
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 7d798640395987a063b3e3634e6dfddf1b8f27d0 Author: silver-ymz <[email protected]> AuthorDate: Sun Sep 3 23:28:07 2023 +0800 feat(bindings/cpp): expose reader Signed-off-by: silver-ymz <[email protected]> --- bindings/cpp/include/opendal.hpp | 37 +++++++++++++++++++++ bindings/cpp/src/lib.rs | 44 +++++++++++++++++++++++++ bindings/cpp/src/opendal.cpp | 69 +++++++++++++++++++++++++++++++++++++++ bindings/cpp/tests/basic_test.cpp | 44 ++++++++++++++++++++++++- 4 files changed, 193 insertions(+), 1 deletion(-) diff --git a/bindings/cpp/include/opendal.hpp b/bindings/cpp/include/opendal.hpp index 0a13030c5..78e55f5bd 100644 --- a/bindings/cpp/include/opendal.hpp +++ b/bindings/cpp/include/opendal.hpp @@ -29,6 +29,8 @@ namespace opendal { +constexpr int BUFFER_SIZE = 1024 * 1024; + /** * @enum EntryMode * @brief The mode of the entry @@ -66,6 +68,8 @@ struct Entry { Entry(ffi::Entry &&); }; +class ReaderStream; + /** * @class Operator * @brief Operator is the entry for all public APIs. @@ -117,6 +121,14 @@ public: */ void write(std::string_view path, const std::vector<uint8_t> &data); + /** + * @brief Read data from the operator + * + * @param path The path of the data + * @return The reader of the data + */ + ReaderStream reader(std::string_view path); + /** * @brief Check if the path exists * @@ -176,4 +188,29 @@ private: std::optional<rust::Box<opendal::ffi::Operator>> operator_; }; +using Reader = rust::Box<opendal::ffi::Reader>; + +class ReaderStreamBuf : public std::streambuf { +public: + ReaderStreamBuf(Reader &&reader) : reader_(std::move(reader)) {} + +protected: + int_type underflow() override; + pos_type seekoff(off_type off, std::ios_base::seekdir dir, + std::ios_base::openmode which) override; + pos_type seekpos(pos_type pos, std::ios_base::openmode which) override; + +private: + Reader reader_; +}; + +class ReaderStream : public std::istream { +public: + ReaderStream(Reader &&reader) + : std::istream(&buf_), buf_(std::move(reader)) {} + +private: + 
ReaderStreamBuf buf_; +}; + } // namespace opendal \ No newline at end of file diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 66498d9e7..20f18f3ef 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -17,6 +17,7 @@ use anyhow::Result; use opendal as od; +use std::io::{BufRead, BufReader, Seek}; use std::str::FromStr; #[cxx::bridge(namespace = "opendal::ffi")] @@ -26,6 +27,12 @@ mod ffi { value: String, } + enum SeekDir { + Start = 0, + Current = 1, + End = 2, + } + enum EntryMode { File = 1, Dir = 2, @@ -54,6 +61,7 @@ mod ffi { extern "Rust" { type Operator; + type Reader; fn new_operator(scheme: &str, configs: Vec<HashMapValue>) -> Result<Box<Operator>>; fn read(self: &Operator, path: &str) -> Result<Vec<u8>>; @@ -65,10 +73,17 @@ mod ffi { fn remove(self: &Operator, path: &str) -> Result<()>; fn stat(self: &Operator, path: &str) -> Result<Metadata>; fn list(self: &Operator, path: &str) -> Result<Vec<Entry>>; + fn reader(self: &Operator, path: &str) -> Result<Box<Reader>>; + + fn fill_buf(self: &mut Reader) -> Result<&[u8]>; + fn buffer(self: &Reader) -> &[u8]; + fn consume(self: &mut Reader, amt: usize); + fn seek(self: &mut Reader, offset: u64, dir: SeekDir) -> Result<u64>; } } struct Operator(od::BlockingOperator); +struct Reader(BufReader<od::BlockingReader>); fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> Result<Box<Operator>> { let scheme = od::Scheme::from_str(scheme)?; @@ -124,6 +139,35 @@ impl Operator { fn list(&self, path: &str) -> Result<Vec<ffi::Entry>> { Ok(self.0.list(path)?.into_iter().map(Into::into).collect()) } + + fn reader(&self, path: &str) -> Result<Box<Reader>> { + Ok(Box::new(Reader(BufReader::new(self.0.reader(path)?)))) + } +} + +impl Reader { + fn fill_buf(&mut self) -> Result<&[u8]> { + Ok(self.0.fill_buf()?) 
+ } + + fn buffer(&self) -> &[u8] { + self.0.buffer() + } + + fn consume(&mut self, amt: usize) { + self.0.consume(amt) + } + + fn seek(&mut self, offset: u64, dir: ffi::SeekDir) -> Result<u64> { + let pos = match dir { + ffi::SeekDir::Start => std::io::SeekFrom::Start(offset), + ffi::SeekDir::Current => std::io::SeekFrom::Current(offset as i64), + ffi::SeekDir::End => std::io::SeekFrom::End(offset as i64), + _ => return Err(anyhow::anyhow!("invalid seek dir")), + }; + + Ok(self.0.seek(pos)?) + } } impl From<od::Metadata> for ffi::Metadata { diff --git a/bindings/cpp/src/opendal.cpp b/bindings/cpp/src/opendal.cpp index c0b4a3d90..21be52758 100644 --- a/bindings/cpp/src/opendal.cpp +++ b/bindings/cpp/src/opendal.cpp @@ -24,6 +24,8 @@ using namespace opendal; #define RUST_STR(s) rust::Str(s.data(), s.size()) #define RUST_STRING(s) rust::String(s.data(), s.size()) +// Operator + Operator::Operator(std::string_view scheme, const std::unordered_map<std::string, std::string> &config) { auto rust_map = rust::Vec<ffi::HashMapValue>(); @@ -85,6 +87,58 @@ std::vector<Entry> Operator::list(std::string_view path) { return entries; } +ReaderStream Operator::reader(std::string_view path) { + return {operator_.value()->reader(RUST_STR(path))}; +} + +// Reader + +ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir); + +ReaderStreamBuf::pos_type +ReaderStreamBuf::seekoff(ReaderStreamBuf::off_type off, + std::ios_base::seekdir dir, + std::ios_base::openmode which) { + if (!(which & std::ios_base::in)) { + return -1; + } + + if (dir == std::ios_base::cur) { + off += gptr() - eback(); + } + + auto res = reader_->seek(off, to_rust_seek_dir(dir)); + + auto buffer = reader_->buffer(); + auto gbeg = (char *)(buffer.data()); + auto gcurr = gbeg; + auto gend = gbeg + buffer.size(); + setg(gbeg, gcurr, gend); + + return res; +} + +ReaderStreamBuf::pos_type +ReaderStreamBuf::seekpos(ReaderStreamBuf::pos_type pos, + std::ios_base::openmode which) { + return seekoff(pos, 
std::ios_base::beg, which); +} + +ReaderStreamBuf::int_type ReaderStreamBuf::underflow() { + if (gptr() != nullptr) { + reader_->consume(gptr() - eback()); + } + auto buffer = reader_->fill_buf(); + auto gbeg = (char *)(buffer.data()); + auto gcurr = gbeg; + auto gend = gbeg + buffer.size(); + setg(gbeg, gcurr, gend); + + return gcurr == gend ? traits_type::eof() : traits_type::to_int_type(*gcurr); +} + +// Metadata + std::optional<std::string> parse_optional_string(ffi::OptionalString &&s); Metadata::Metadata(ffi::Metadata &&other) { @@ -104,6 +158,8 @@ Metadata::Metadata(ffi::Metadata &&other) { } } +// Entry + Entry::Entry(ffi::Entry &&other) : path(std::move(other.path)) {} // helper functions @@ -114,4 +170,17 @@ std::optional<std::string> parse_optional_string(ffi::OptionalString &&s) { } else { return std::nullopt; } +} + +ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir) { + switch (dir) { + case std::ios_base::beg: + return ffi::SeekDir::Start; + case std::ios_base::cur: + return ffi::SeekDir::Current; + case std::ios_base::end: + return ffi::SeekDir::End; + default: + throw std::runtime_error("invalid seekdir"); + } } \ No newline at end of file diff --git a/bindings/cpp/tests/basic_test.cpp b/bindings/cpp/tests/basic_test.cpp index d86cb4fd7..121c9df38 100644 --- a/bindings/cpp/tests/basic_test.cpp +++ b/bindings/cpp/tests/basic_test.cpp @@ -19,9 +19,12 @@ #include "opendal.hpp" #include "gtest/gtest.h" +#include <ctime> #include <optional> +#include <random> #include <string> #include <unordered_map> +#include <vector> class OpendalTest : public ::testing::Test { protected: @@ -30,10 +33,14 @@ protected: std::string scheme; std::unordered_map<std::string, std::string> config; + // random number generator + std::mt19937 rng; + void SetUp() override { scheme = "memory"; - op = opendal::Operator(scheme, config); + rng.seed(time(nullptr)); + op = opendal::Operator(scheme, config); EXPECT_TRUE(op.available()); } }; @@ -86,6 +93,41 @@ 
TEST_F(OpendalTest, BasicTest) { EXPECT_FALSE(op.is_exist(file_path_renamed)); } +TEST_F(OpendalTest, ReaderTest) { + std::string file_path = "test"; + constexpr int size = 2000; + std::vector<uint8_t> data(size); + + for (auto &d : data) { + d = rng() % 256; + } + + // write + op.write(file_path, data); + + // read + auto reader = op.reader(file_path); + + auto read = [&](std::size_t to_read, std::streampos expected_tellg) { + std::vector<char> v(to_read); + reader.read(v.data(), v.size()); + EXPECT_TRUE(!!reader); + EXPECT_EQ(reader.tellg(), expected_tellg); + }; + + EXPECT_EQ(reader.tellg(), 0); + read(10, 10); + read(15, 25); + read(15, 40); + reader.get(); + EXPECT_EQ(reader.tellg(), 41); + read(1000, 1041); + + reader.seekg(0, std::ios::beg); + std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{reader}, {}); + EXPECT_EQ(reader_data, data); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();
