This is an automated email from the ASF dual-hosted git repository.

silver pushed a commit to branch cpp-more-operation
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 7d798640395987a063b3e3634e6dfddf1b8f27d0
Author: silver-ymz <[email protected]>
AuthorDate: Sun Sep 3 23:28:07 2023 +0800

    feat(bindings/cpp): expose reader
    
    Signed-off-by: silver-ymz <[email protected]>
---
 bindings/cpp/include/opendal.hpp  | 37 +++++++++++++++++++++
 bindings/cpp/src/lib.rs           | 44 +++++++++++++++++++++++++
 bindings/cpp/src/opendal.cpp      | 69 +++++++++++++++++++++++++++++++++++++++
 bindings/cpp/tests/basic_test.cpp | 44 ++++++++++++++++++++++++-
 4 files changed, 193 insertions(+), 1 deletion(-)

diff --git a/bindings/cpp/include/opendal.hpp b/bindings/cpp/include/opendal.hpp
index 0a13030c5..78e55f5bd 100644
--- a/bindings/cpp/include/opendal.hpp
+++ b/bindings/cpp/include/opendal.hpp
@@ -29,6 +29,8 @@
 
 namespace opendal {
 
+constexpr int BUFFER_SIZE = 1024 * 1024; // 1 MiB; NOTE(review): not referenced anywhere else in this patch — confirm intended use
+
 /**
  * @enum EntryMode
  * @brief The mode of the entry
@@ -66,6 +68,8 @@ struct Entry {
   Entry(ffi::Entry &&);
 };
 
+class ReaderStream;
+
 /**
  * @class Operator
  * @brief Operator is the entry for all public APIs.
@@ -117,6 +121,14 @@ public:
    */
   void write(std::string_view path, const std::vector<uint8_t> &data);
 
+  /**
+   * @brief Read data from the operator
+   *
+   * @param path The path of the data
+   * @return The reader of the data
+   */
+  ReaderStream reader(std::string_view path);
+
   /**
    * @brief Check if the path exists
    *
@@ -176,4 +188,29 @@ private:
   std::optional<rust::Box<opendal::ffi::Operator>> operator_;
 };
 
+using Reader = rust::Box<opendal::ffi::Reader>;
+
+class ReaderStreamBuf : public std::streambuf { // input-only std::streambuf backed by the Rust-side Reader
+public:
+  ReaderStreamBuf(Reader &&reader) : reader_(std::move(reader)) {} // takes ownership of the boxed Rust reader
+
+protected:
+  int_type underflow() override; // called by the stream when the get area is exhausted
+  pos_type seekoff(off_type off, std::ios_base::seekdir dir,
+                   std::ios_base::openmode which) override; // relative seek; std::istream::tellg also routes through this
+  pos_type seekpos(pos_type pos, std::ios_base::openmode which) override; // absolute seek
+
+private:
+  Reader reader_; // rust::Box<opendal::ffi::Reader>
+};
+
+class ReaderStream : public std::istream { // std::istream front end that owns its ReaderStreamBuf
+public:
+  ReaderStream(Reader &&reader)
+      : std::istream(&buf_), buf_(std::move(reader)) {} // the base class receives &buf_ before buf_ is constructed (bases initialize first)
+
+private:
+  ReaderStreamBuf buf_; // held as a member so the buffer lives exactly as long as the stream
+};
+
 } // namespace opendal
\ No newline at end of file
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 66498d9e7..20f18f3ef 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -17,6 +17,7 @@
 
 use anyhow::Result;
 use opendal as od;
+use std::io::{BufRead, BufReader, Seek};
 use std::str::FromStr;
 
 #[cxx::bridge(namespace = "opendal::ffi")]
@@ -26,6 +27,12 @@ mod ffi {
         value: String,
     }
 
+    enum SeekDir { // seek origin passed to Reader::seek; declared inside the cxx bridge so both languages see it
+        Start = 0, // translated to std::io::SeekFrom::Start
+        Current = 1, // translated to std::io::SeekFrom::Current
+        End = 2, // translated to std::io::SeekFrom::End
+    }
+
     enum EntryMode {
         File = 1,
         Dir = 2,
@@ -54,6 +61,7 @@ mod ffi {
 
     extern "Rust" {
         type Operator;
+        type Reader;
 
         fn new_operator(scheme: &str, configs: Vec<HashMapValue>) -> Result<Box<Operator>>;
         fn read(self: &Operator, path: &str) -> Result<Vec<u8>>;
@@ -65,10 +73,17 @@ mod ffi {
         fn remove(self: &Operator, path: &str) -> Result<()>;
         fn stat(self: &Operator, path: &str) -> Result<Metadata>;
         fn list(self: &Operator, path: &str) -> Result<Vec<Entry>>;
+        fn reader(self: &Operator, path: &str) -> Result<Box<Reader>>;
+
+        fn fill_buf(self: &mut Reader) -> Result<&[u8]>;
+        fn buffer(self: &Reader) -> &[u8];
+        fn consume(self: &mut Reader, amt: usize);
+        fn seek(self: &mut Reader, offset: u64, dir: SeekDir) -> Result<u64>;
     }
 }
 
 struct Operator(od::BlockingOperator);
+struct Reader(BufReader<od::BlockingReader>); // newtype over a buffered blocking reader; exported through the bridge as `type Reader`
 
 fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> Result<Box<Operator>> {
     let scheme = od::Scheme::from_str(scheme)?;
@@ -124,6 +139,35 @@ impl Operator {
     fn list(&self, path: &str) -> Result<Vec<ffi::Entry>> {
         Ok(self.0.list(path)?.into_iter().map(Into::into).collect())
     }
+
+    fn reader(&self, path: &str) -> Result<Box<Reader>> { // opens a blocking reader for `path`; errors propagate via `?`
+        Ok(Box::new(Reader(BufReader::new(self.0.reader(path)?)))) // BufReader supplies the fill_buf/consume/buffer calls the C++ side uses
+    }
+}
+
+impl Reader { // thin forwards to the wrapped BufReader, shaped for the cxx bridge signatures
+    fn fill_buf(&mut self) -> Result<&[u8]> { // BufRead::fill_buf; the io::Error is converted by `?`
+        Ok(self.0.fill_buf()?)
+    }
+
+    fn buffer(&self) -> &[u8] { // bytes currently buffered, without triggering a read
+        self.0.buffer()
+    }
+
+    fn consume(&mut self, amt: usize) { // marks `amt` buffered bytes as read
+        self.0.consume(amt)
+    }
+
+    fn seek(&mut self, offset: u64, dir: ffi::SeekDir) -> Result<u64> { // returns whatever the underlying Seek::seek reports
+        let pos = match dir {
+            ffi::SeekDir::Start => std::io::SeekFrom::Start(offset),
+            ffi::SeekDir::Current => std::io::SeekFrom::Current(offset as i64), // `as i64` reinterprets the bits, so a negative offset sent as u64 round-trips
+            ffi::SeekDir::End => std::io::SeekFrom::End(offset as i64), // same bit-reinterpreting cast as above
+            _ => return Err(anyhow::anyhow!("invalid seek dir")), // NOTE(review): wildcard presumably needed because cxx shared enums may carry unlisted values — confirm
+        };
+
+        Ok(self.0.seek(pos)?)
+    }
 }
 
 impl From<od::Metadata> for ffi::Metadata {
diff --git a/bindings/cpp/src/opendal.cpp b/bindings/cpp/src/opendal.cpp
index c0b4a3d90..21be52758 100644
--- a/bindings/cpp/src/opendal.cpp
+++ b/bindings/cpp/src/opendal.cpp
@@ -24,6 +24,8 @@ using namespace opendal;
 #define RUST_STR(s) rust::Str(s.data(), s.size())
 #define RUST_STRING(s) rust::String(s.data(), s.size())
 
+// Operator
+
 Operator::Operator(std::string_view scheme,
                    const std::unordered_map<std::string, std::string> &config) {
   auto rust_map = rust::Vec<ffi::HashMapValue>();
@@ -85,6 +87,58 @@ std::vector<Entry> Operator::list(std::string_view path) {
   return entries;
 }
 
+ReaderStream Operator::reader(std::string_view path) { // opens `path` on the Rust side and wraps the result in a std::istream
+  return {operator_.value()->reader(RUST_STR(path))}; // value() throws std::bad_optional_access when operator_ is empty
+}
+
+// Reader
+
+ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir);
+
+ReaderStreamBuf::pos_type // seeks the Rust reader, resets the get area, and returns the value reported by reader_->seek
+ReaderStreamBuf::seekoff(ReaderStreamBuf::off_type off,
+                         std::ios_base::seekdir dir,
+                         std::ios_base::openmode which) {
+  if (!(which & std::ios_base::in)) { // only input-sequence seeks are handled
+    return -1;
+  }
+
+  if (dir == std::ios_base::cur) {
+    off += gptr() - eback(); // bytes already read from the get area but not yet consume()d on the Rust side
+  }
+
+  auto res = reader_->seek(off, to_rust_seek_dir(dir)); // the signed offset becomes u64 at the bridge; the Rust side casts it back with `as i64`
+
+  auto buffer = reader_->buffer(); // std's BufReader::seek discards its internal buffer, so this is expected to be empty
+  auto gbeg = (char *)(buffer.data());
+  auto gcurr = gbeg;
+  auto gend = gbeg + buffer.size();
+  setg(gbeg, gcurr, gend); // re-point the get area at whatever the Rust reader holds now
+
+  return res;
+}
+
+ReaderStreamBuf::pos_type // absolute seek, implemented in terms of seekoff
+ReaderStreamBuf::seekpos(ReaderStreamBuf::pos_type pos,
+                         std::ios_base::openmode which) {
+  return seekoff(pos, std::ios_base::beg, which); // an absolute position is an offset from the beginning
+}
+
+ReaderStreamBuf::int_type ReaderStreamBuf::underflow() { // refills the get area from the Rust reader's buffer
+  if (gptr() != nullptr) { // the get pointers are null until the first setg() call
+    reader_->consume(gptr() - eback()); // report how much of the previous buffer was read
+  }
+  auto buffer = reader_->fill_buf();
+  auto gbeg = (char *)(buffer.data()); // C-style cast drops const and converts from the slice's byte pointer
+  auto gcurr = gbeg;
+  auto gend = gbeg + buffer.size();
+  setg(gbeg, gcurr, gend);
+
+  return gcurr == gend ? traits_type::eof() : traits_type::to_int_type(*gcurr); // an empty buffer from fill_buf is treated as end of data
+}
+
+// Metadata
+
 std::optional<std::string> parse_optional_string(ffi::OptionalString &&s);
 
 Metadata::Metadata(ffi::Metadata &&other) {
@@ -104,6 +158,8 @@ Metadata::Metadata(ffi::Metadata &&other) {
   }
 }
 
+// Entry
+
 Entry::Entry(ffi::Entry &&other) : path(std::move(other.path)) {}
 
 // helper functions
@@ -114,4 +170,17 @@ std::optional<std::string> parse_optional_string(ffi::OptionalString &&s) {
   } else {
     return std::nullopt;
   }
+}
+
+ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir) { // maps an iostream seek origin onto the bridge's ffi::SeekDir
+  switch (dir) {
+  case std::ios_base::beg:
+    return ffi::SeekDir::Start;
+  case std::ios_base::cur:
+    return ffi::SeekDir::Current;
+  case std::ios_base::end:
+    return ffi::SeekDir::End;
+  default:
+    throw std::runtime_error("invalid seekdir"); // any value other than beg/cur/end is rejected
+  }
 }
\ No newline at end of file
diff --git a/bindings/cpp/tests/basic_test.cpp b/bindings/cpp/tests/basic_test.cpp
index d86cb4fd7..121c9df38 100644
--- a/bindings/cpp/tests/basic_test.cpp
+++ b/bindings/cpp/tests/basic_test.cpp
@@ -19,9 +19,12 @@
 
 #include "opendal.hpp"
 #include "gtest/gtest.h"
+#include <ctime>
 #include <optional>
+#include <random>
 #include <string>
 #include <unordered_map>
+#include <vector>
 
 class OpendalTest : public ::testing::Test {
 protected:
@@ -30,10 +33,14 @@ protected:
   std::string scheme;
   std::unordered_map<std::string, std::string> config;
 
+  // random number generator
+  std::mt19937 rng;
+
   void SetUp() override {
     scheme = "memory";
-    op = opendal::Operator(scheme, config);
+    rng.seed(time(nullptr));
 
+    op = opendal::Operator(scheme, config);
     EXPECT_TRUE(op.available());
   }
 };
@@ -86,6 +93,41 @@ TEST_F(OpendalTest, BasicTest) {
   EXPECT_FALSE(op.is_exist(file_path_renamed));
 }
 
+TEST_F(OpendalTest, ReaderTest) { // covers sequential reads, tellg, get, seekg and a full read-back through ReaderStream
+  std::string file_path = "test";
+  constexpr int size = 2000;
+  std::vector<uint8_t> data(size);
+
+  for (auto &d : data) {
+    d = rng() % 256; // random payload; rng is seeded from the clock in SetUp
+  }
+
+  // write
+  op.write(file_path, data);
+
+  // read
+  auto reader = op.reader(file_path);
+
+  auto read = [&](std::size_t to_read, std::streampos expected_tellg) { // reads to_read bytes, then checks stream state and position
+    std::vector<char> v(to_read);
+    reader.read(v.data(), v.size());
+    EXPECT_TRUE(!!reader);
+    EXPECT_EQ(reader.tellg(), expected_tellg);
+  };
+
+  EXPECT_EQ(reader.tellg(), 0);
+  read(10, 10);
+  read(15, 25);
+  read(15, 40);
+  reader.get(); // a single-byte read should advance the position by one
+  EXPECT_EQ(reader.tellg(), 41);
+  read(1000, 1041);
+
+  reader.seekg(0, std::ios::beg); // rewind before reading everything back
+  std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{reader}, {}); // drain the stream until EOF
+  EXPECT_EQ(reader_data, data);
+}
+
 int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();

Reply via email to