This is an automated email from the ASF dual-hosted git repository.

silver pushed a commit to branch cpp-more-operation
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/cpp-more-operation by this push:
     new ab31901ae change impl with boost iostreams
ab31901ae is described below

commit ab31901ae70750b7db734d93731542f9c8342702
Author: silver-ymz <[email protected]>
AuthorDate: Mon Sep 4 22:57:35 2023 +0800

    change impl with boost iostreams
    
    Signed-off-by: silver-ymz <[email protected]>
---
 bindings/cpp/CMakeLists.txt       |  2 +-
 bindings/cpp/include/opendal.hpp  | 54 ++++++++++++++++++++--------------
 bindings/cpp/src/lib.rs           | 23 +++++----------
 bindings/cpp/src/opendal.cpp      | 62 ++++++---------------------------------
 bindings/cpp/tests/basic_test.cpp | 39 +++++++++++++++---------
 5 files changed, 74 insertions(+), 106 deletions(-)

diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index 06e89f993..af99fd2e2 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -63,7 +63,7 @@ add_custom_command(
         COMMENT "Running cargo..."
 )
 
-find_package(Boost REQUIRED COMPONENTS date_time)
+find_package(Boost REQUIRED COMPONENTS date_time iostreams)
 
 add_library(opendal_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
 target_include_directories(opendal_cpp PUBLIC ${CPP_INCLUDE_DIR} Boost::date_time)
diff --git a/bindings/cpp/include/opendal.hpp b/bindings/cpp/include/opendal.hpp
index 177a8520c..20f8d08ab 100644
--- a/bindings/cpp/include/opendal.hpp
+++ b/bindings/cpp/include/opendal.hpp
@@ -21,6 +21,8 @@
 #include "lib.rs.h"
 
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/iostreams/concepts.hpp>
+#include <boost/iostreams/stream.hpp>
 #include <memory>
 #include <optional>
 #include <string>
@@ -66,7 +68,7 @@ struct Entry {
   Entry(ffi::Entry &&);
 };
 
-class ReaderStream;
+class Reader;
 
 /**
  * @class Operator
@@ -125,7 +127,7 @@ public:
    * @param path The path of the data
    * @return The reader of the data
    */
-  ReaderStream reader(std::string_view path);
+  Reader reader(std::string_view path);
 
   /**
    * @brief Check if the path exists
@@ -186,37 +188,45 @@ private:
   std::optional<rust::Box<opendal::ffi::Operator>> operator_;
 };
 
-using Reader = rust::Box<opendal::ffi::Reader>;
-
 /**
- * @class ReaderStreamBuf
- * @brief The stream buffer for ReaderStream
+ * @class Reader
+ * @brief Reader is designed to read data from the operator.
+ * @details It provides basic read and seek operations. If you want to use it
+ * like a stream, you can use `ReaderStream` instead.
+ * @code{.cpp}
+ * auto reader = operator.reader("path");
+ * opendal::ReaderStream stream(reader);
+ * @endcode
  */
-class ReaderStreamBuf : public std::streambuf {
+class Reader
+    : public boost::iostreams::device<boost::iostreams::input_seekable> {
 public:
-  ReaderStreamBuf(Reader &&reader) : reader_(std::move(reader)) {}
+  // Users should not use this type directly.
+  using InternalReader = rust::Box<opendal::ffi::Reader>;
+
+  Reader(InternalReader &&reader) : reader_(std::move(reader)) {}
 
-protected:
-  int_type underflow() override;
-  pos_type seekoff(off_type off, std::ios_base::seekdir dir,
-                   std::ios_base::openmode which) override;
-  pos_type seekpos(pos_type pos, std::ios_base::openmode which) override;
+  std::streamsize read(void *s, std::streamsize n);
+  std::streampos seek(std::streamoff off, std::ios_base::seekdir way);
 
 private:
-  Reader reader_;
+  InternalReader reader_;
 };
 
+// Boost IOStreams requires it to be copyable. So we need to use
+// `reference_wrapper` in ReaderStream. More details can be seen at
+// https://lists.boost.org/Archives/boost/2005/10/95939.php
+
 /**
  * @class ReaderStream
- * @brief The stream for Reader
+ * @brief ReaderStream is a stream wrapper of Reader which can provide
+ * `iostream` interface.
  */
-class ReaderStream : public std::istream {
+class ReaderStream
+    : public boost::iostreams::stream<boost::reference_wrapper<Reader>> {
 public:
-  ReaderStream(Reader &&reader)
-      : std::istream(&buf_), buf_(std::move(reader)) {}
-
-private:
-  ReaderStreamBuf buf_;
+  ReaderStream(Reader &reader)
+      : boost::iostreams::stream<boost::reference_wrapper<Reader>>(
+            boost::ref(reader)) {}
 };
-
 } // namespace opendal
\ No newline at end of file
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 20f18f3ef..c3c0d4ba9 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use anyhow::Result;
+use od::raw::oio::BlockingRead;
 use opendal as od;
-use std::io::{BufRead, BufReader, Seek};
 use std::str::FromStr;
 
 #[cxx::bridge(namespace = "opendal::ffi")]
@@ -75,15 +75,14 @@ mod ffi {
         fn list(self: &Operator, path: &str) -> Result<Vec<Entry>>;
         fn reader(self: &Operator, path: &str) -> Result<Box<Reader>>;
 
-        fn fill_buf(self: &mut Reader) -> Result<&[u8]>;
-        fn buffer(self: &Reader) -> &[u8];
-        fn consume(self: &mut Reader, amt: usize);
+        #[cxx_name = "read"]
+        fn reader_read(self: &mut Reader, buf: &mut [u8]) -> Result<usize>;
         fn seek(self: &mut Reader, offset: u64, dir: SeekDir) -> Result<u64>;
     }
 }
 
 struct Operator(od::BlockingOperator);
-struct Reader(BufReader<od::BlockingReader>);
+struct Reader(od::BlockingReader);
 
 fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> Result<Box<Operator>> {
     let scheme = od::Scheme::from_str(scheme)?;
@@ -141,21 +140,13 @@ impl Operator {
     }
 
     fn reader(&self, path: &str) -> Result<Box<Reader>> {
-        Ok(Box::new(Reader(BufReader::new(self.0.reader(path)?))))
+        Ok(Box::new(Reader(self.0.reader(path)?)))
     }
 }
 
 impl Reader {
-    fn fill_buf(&mut self) -> Result<&[u8]> {
-        Ok(self.0.fill_buf()?)
-    }
-
-    fn buffer(&self) -> &[u8] {
-        self.0.buffer()
-    }
-
-    fn consume(&mut self, amt: usize) {
-        self.0.consume(amt)
+    fn reader_read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        Ok(self.0.read(buf)?)
     }
 
     fn seek(&mut self, offset: u64, dir: ffi::SeekDir) -> Result<u64> {
diff --git a/bindings/cpp/src/opendal.cpp b/bindings/cpp/src/opendal.cpp
index 38fc42ae4..7c23ebb41 100644
--- a/bindings/cpp/src/opendal.cpp
+++ b/bindings/cpp/src/opendal.cpp
@@ -87,66 +87,22 @@ std::vector<Entry> Operator::list(std::string_view path) {
   return entries;
 }
 
-ReaderStream Operator::reader(std::string_view path) {
-  return {operator_.value()->reader(RUST_STR(path))};
+Reader Operator::reader(std::string_view path) {
+  return operator_.value()->reader(RUST_STR(path));
 }
 
 // Reader
 
-// Development Note:
-// Because rust side can't get current pointer info of c++, so we delay the
-// `consume` operation to the next `fill_buf`. Please pay attention to call
-// `consume` and update c++ pointers before each `seek` and `fill_buf`
-// operation.
-
-ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir);
-
-ReaderStreamBuf::pos_type
-ReaderStreamBuf::seekoff(ReaderStreamBuf::off_type off,
-                         std::ios_base::seekdir dir,
-                         std::ios_base::openmode which) {
-  if (!(which & std::ios_base::in)) {
-    return -1;
-  }
-
-  if (gptr() != nullptr) {
-    reader_->consume(gptr() - eback());
-    setg(gptr(), gptr(), egptr());
-  }
-
-  if (dir == std::ios_base::cur) {
-    off += gptr() - eback();
-  }
-
-  auto res = reader_->seek(off, to_rust_seek_dir(dir));
-
-  auto buffer = reader_->buffer();
-  auto gbeg = (char *)(buffer.data());
-  auto gcurr = gbeg;
-  auto gend = gbeg + buffer.size();
-  setg(gbeg, gcurr, gend);
-
-  return res;
+std::streamsize Reader::read(void *s, std::streamsize n) {
+  auto rust_slice = rust::Slice<uint8_t>(reinterpret_cast<uint8_t *>(s), n);
+  auto read_size = reader_->read(rust_slice);
+  return read_size;
 }
 
-ReaderStreamBuf::pos_type
-ReaderStreamBuf::seekpos(ReaderStreamBuf::pos_type pos,
-                         std::ios_base::openmode which) {
-  return seekoff(pos, std::ios_base::beg, which);
-}
-
-ReaderStreamBuf::int_type ReaderStreamBuf::underflow() {
-  if (gptr() != nullptr) {
-    reader_->consume(gptr() - eback());
-    setg(gptr(), gptr(), egptr());
-  }
-  auto buffer = reader_->fill_buf();
-  auto gbeg = (char *)(buffer.data());
-  auto gcurr = gbeg;
-  auto gend = gbeg + buffer.size();
-  setg(gbeg, gcurr, gend);
+ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir);
 
-  return gcurr == gend ? traits_type::eof() : traits_type::to_int_type(*gcurr);
+std::streampos Reader::seek(std::streamoff off, std::ios_base::seekdir dir) {
+  return reader_->seek(off, to_rust_seek_dir(dir));
 }
 
 // Metadata
diff --git a/bindings/cpp/tests/basic_test.cpp b/bindings/cpp/tests/basic_test.cpp
index 121c9df38..c6c47c001 100644
--- a/bindings/cpp/tests/basic_test.cpp
+++ b/bindings/cpp/tests/basic_test.cpp
@@ -105,26 +105,37 @@ TEST_F(OpendalTest, ReaderTest) {
   // write
   op.write(file_path, data);
 
-  // read
+  // reader
   auto reader = op.reader(file_path);
+  uint8_t part_data[100];
+  reader.seek(200, std::ios::cur);
+  reader.read(part_data, 100);
+  EXPECT_EQ(reader.seek(0, std::ios::cur), 300);
+  for (int i = 0; i < 100; ++i) {
+    EXPECT_EQ(part_data[i], data[200 + i]);
+  }
+  reader.seek(0, std::ios::beg);
+
+  // stream
+  opendal::ReaderStream stream(reader);
 
-  auto read = [&](std::size_t to_read, std::streampos expected_tellg) {
+  auto read_fn = [&](std::size_t to_read, std::streampos expected_tellg) {
     std::vector<char> v(to_read);
-    reader.read(v.data(), v.size());
-    EXPECT_TRUE(!!reader);
-    EXPECT_EQ(reader.tellg(), expected_tellg);
+    stream.read(v.data(), v.size());
+    EXPECT_TRUE(!!stream);
+    EXPECT_EQ(stream.tellg(), expected_tellg);
   };
 
-  EXPECT_EQ(reader.tellg(), 0);
-  read(10, 10);
-  read(15, 25);
-  read(15, 40);
-  reader.get();
-  EXPECT_EQ(reader.tellg(), 41);
-  read(1000, 1041);
+  EXPECT_EQ(stream.tellg(), 0);
+  read_fn(10, 10);
+  read_fn(15, 25);
+  read_fn(15, 40);
+  stream.get();
+  EXPECT_EQ(stream.tellg(), 41);
+  read_fn(1000, 1041);
 
-  reader.seekg(0, std::ios::beg);
-  std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{reader}, {});
+  stream.seekg(0, std::ios::beg);
+  std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{stream}, {});
   EXPECT_EQ(reader_data, data);
 }
 

Reply via email to