This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 89627a68a feat(bindings/cpp): expose reader  (#3004)
89627a68a is described below

commit 89627a68a9af8044de8af674d11b230baede06f6
Author: Mingzhuo Yin <[email protected]>
AuthorDate: Tue Sep 5 17:37:05 2023 +0800

    feat(bindings/cpp): expose reader  (#3004)
    
    * feat(bindings/cpp): expose reader
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * fix missing consume in seek function
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * typo
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * typo
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * change impl with boost iostreams
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * rename SeekDir to SeekFrom in rust
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * refactor rust types into different files
    
    Signed-off-by: silver-ymz <[email protected]>
    
    ---------
    
    Signed-off-by: silver-ymz <[email protected]>
---
 bindings/cpp/CMakeLists.txt       |  2 +-
 bindings/cpp/include/opendal.hpp  | 51 +++++++++++++++++++++++++
 bindings/cpp/src/lib.rs           | 77 ++++++++++----------------------------
 bindings/cpp/src/opendal.cpp      | 37 +++++++++++++++++++
 bindings/cpp/src/reader.rs        | 40 ++++++++++++++++++++
 bindings/cpp/src/types.rs         | 78 +++++++++++++++++++++++++++++++++++++++
 bindings/cpp/tests/basic_test.cpp | 55 ++++++++++++++++++++++++++-
 7 files changed, 280 insertions(+), 60 deletions(-)

diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index 06e89f993..af99fd2e2 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -63,7 +63,7 @@ add_custom_command(
         COMMENT "Running cargo..."
 )
 
-find_package(Boost REQUIRED COMPONENTS date_time)
+find_package(Boost REQUIRED COMPONENTS date_time iostreams)
 
 add_library(opendal_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
 target_include_directories(opendal_cpp PUBLIC ${CPP_INCLUDE_DIR} Boost::date_time)
diff --git a/bindings/cpp/include/opendal.hpp b/bindings/cpp/include/opendal.hpp
index 0a13030c5..80facdc5b 100644
--- a/bindings/cpp/include/opendal.hpp
+++ b/bindings/cpp/include/opendal.hpp
@@ -21,6 +21,8 @@
 #include "lib.rs.h"
 
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/iostreams/concepts.hpp>
+#include <boost/iostreams/stream.hpp>
 #include <memory>
 #include <optional>
 #include <string>
@@ -66,6 +68,8 @@ struct Entry {
   Entry(ffi::Entry &&);
 };
 
+class Reader;
+
 /**
  * @class Operator
  * @brief Operator is the entry for all public APIs.
@@ -117,6 +121,14 @@ public:
    */
   void write(std::string_view path, const std::vector<uint8_t> &data);
 
+  /**
+   * @brief Open a Reader for the data stored at `path`
+   *
+   * @param path The path of the data
+   * @return The reader of the data
+   */
+  Reader reader(std::string_view path);
+
   /**
    * @brief Check if the path exists
    *
@@ -176,4 +188,43 @@ private:
   std::optional<rust::Box<opendal::ffi::Operator>> operator_;
 };
 
+/**
+ * @class Reader
+ * @brief Reader is designed to read data from the operator.
+ * @details It provides basic read and seek operations. To use it as a standard
+ * input stream, wrap it in a `ReaderStream` as shown below.
+ * @code{.cpp}
+ * auto reader = operator.reader("path");
+ * opendal::ReaderStream stream(reader);
+ * @endcode
+ */
+class Reader
+    : public boost::iostreams::device<boost::iostreams::input_seekable> {
+public:
+  Reader(rust::Box<opendal::ffi::Reader> &&reader)
+      : raw_reader_(std::move(reader)) {}
+
+  std::streamsize read(void *s, std::streamsize n);
+  std::streampos seek(std::streamoff off, std::ios_base::seekdir way);
+
+private:
+  rust::Box<opendal::ffi::Reader> raw_reader_;
+};
+
+// Boost IOStreams requires devices to be copyable, but Reader owns a move-only
+// `rust::Box`, so ReaderStream holds it through `boost::reference_wrapper`.
+// More details: https://lists.boost.org/Archives/boost/2005/10/95939.php
+
+/**
+ * @class ReaderStream
+ * @brief ReaderStream wraps a Reader by reference to provide a `std::istream`
+ * interface. The wrapped Reader must outlive the stream.
+ */
+class ReaderStream
+    : public boost::iostreams::stream<boost::reference_wrapper<Reader>> {
+public:
+  ReaderStream(Reader &reader)
+      : boost::iostreams::stream<boost::reference_wrapper<Reader>>(
+            boost::ref(reader)) {}
+};
 } // namespace opendal
\ No newline at end of file
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 66498d9e7..8f4bc850b 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod reader;
+mod types;
+
 use anyhow::Result;
 use opendal as od;
+use reader::Reader;
 use std::str::FromStr;
 
 #[cxx::bridge(namespace = "opendal::ffi")]
@@ -26,6 +30,13 @@ mod ffi {
         value: String,
     }
 
+    #[cxx_name = "SeekDir"]
+    enum SeekFrom {
+        Start = 0,
+        Current = 1,
+        End = 2,
+    }
+
     enum EntryMode {
         File = 1,
         Dir = 2,
@@ -54,6 +65,7 @@ mod ffi {
 
     extern "Rust" {
         type Operator;
+        type Reader;
 
         fn new_operator(scheme: &str, configs: Vec<HashMapValue>) -> Result<Box<Operator>>;
         fn read(self: &Operator, path: &str) -> Result<Vec<u8>>;
@@ -65,10 +77,14 @@ mod ffi {
         fn remove(self: &Operator, path: &str) -> Result<()>;
         fn stat(self: &Operator, path: &str) -> Result<Metadata>;
         fn list(self: &Operator, path: &str) -> Result<Vec<Entry>>;
+        fn reader(self: &Operator, path: &str) -> Result<Box<Reader>>;
+
+        fn read(self: &mut Reader, buf: &mut [u8]) -> Result<usize>;
+        fn seek(self: &mut Reader, offset: u64, dir: SeekFrom) -> Result<u64>;
     }
 }
 
-struct Operator(od::BlockingOperator);
+pub struct Operator(od::BlockingOperator);
 
 fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> Result<Box<Operator>> {
     let scheme = od::Scheme::from_str(scheme)?;
@@ -124,63 +140,8 @@ impl Operator {
     fn list(&self, path: &str) -> Result<Vec<ffi::Entry>> {
         Ok(self.0.list(path)?.into_iter().map(Into::into).collect())
     }
-}
-
-impl From<od::Metadata> for ffi::Metadata {
-    fn from(meta: od::Metadata) -> Self {
-        let mode = meta.mode().into();
-        let content_length = meta.content_length();
-        let cache_control = meta.cache_control().map(ToOwned::to_owned).into();
-        let content_disposition = meta.content_disposition().map(ToOwned::to_owned).into();
-        let content_md5 = meta.content_md5().map(ToOwned::to_owned).into();
-        let content_type = meta.content_type().map(ToOwned::to_owned).into();
-        let etag = meta.etag().map(ToOwned::to_owned).into();
-        let last_modified = meta
-            .last_modified()
-            .map(|time| time.to_rfc3339_opts(chrono::SecondsFormat::Nanos, false))
-            .into();
-
-        Self {
-            mode,
-            content_length,
-            cache_control,
-            content_disposition,
-            content_md5,
-            content_type,
-            etag,
-            last_modified,
-        }
-    }
-}
-
-impl From<od::Entry> for ffi::Entry {
-    fn from(entry: od::Entry) -> Self {
-        let (path, _) = entry.into_parts();
-        Self { path }
-    }
-}
-
-impl From<od::EntryMode> for ffi::EntryMode {
-    fn from(mode: od::EntryMode) -> Self {
-        match mode {
-            od::EntryMode::FILE => Self::File,
-            od::EntryMode::DIR => Self::Dir,
-            _ => Self::Unknown,
-        }
-    }
-}
 
-impl From<Option<String>> for ffi::OptionalString {
-    fn from(s: Option<String>) -> Self {
-        match s {
-            Some(s) => Self {
-                has_value: true,
-                value: s,
-            },
-            None => Self {
-                has_value: false,
-                value: String::default(),
-            },
-        }
+    fn reader(&self, path: &str) -> Result<Box<Reader>> {
+        Ok(Box::new(Reader(self.0.reader(path)?)))
     }
 }
diff --git a/bindings/cpp/src/opendal.cpp b/bindings/cpp/src/opendal.cpp
index c0b4a3d90..e8ffa75a8 100644
--- a/bindings/cpp/src/opendal.cpp
+++ b/bindings/cpp/src/opendal.cpp
@@ -24,6 +24,8 @@ using namespace opendal;
 #define RUST_STR(s) rust::Str(s.data(), s.size())
 #define RUST_STRING(s) rust::String(s.data(), s.size())
 
+// Operator
+
 Operator::Operator(std::string_view scheme,
                    const std::unordered_map<std::string, std::string> &config) {
   auto rust_map = rust::Vec<ffi::HashMapValue>();
@@ -85,6 +87,26 @@ std::vector<Entry> Operator::list(std::string_view path) {
   return entries;
 }
 
+Reader Operator::reader(std::string_view path) {
+  return operator_.value()->reader(RUST_STR(path));
+}
+
+// Reader
+
+std::streamsize Reader::read(void *s, std::streamsize n) {
+  auto rust_slice = rust::Slice<uint8_t>(reinterpret_cast<uint8_t *>(s), n);
+  auto read_size = raw_reader_->read(rust_slice);
+  return read_size;
+}
+
+ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir);
+
+std::streampos Reader::seek(std::streamoff off, std::ios_base::seekdir dir) {
+  return raw_reader_->seek(off, to_rust_seek_dir(dir));
+}
+
+// Metadata
+
 std::optional<std::string> parse_optional_string(ffi::OptionalString &&s);
 
 Metadata::Metadata(ffi::Metadata &&other) {
@@ -104,6 +126,8 @@ Metadata::Metadata(ffi::Metadata &&other) {
   }
 }
 
+// Entry
+
 Entry::Entry(ffi::Entry &&other) : path(std::move(other.path)) {}
 
 // helper functions
@@ -114,4 +138,17 @@ std::optional<std::string> parse_optional_string(ffi::OptionalString &&s) {
   } else {
     return std::nullopt;
   }
+}
+
+ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir) {
+  switch (dir) {
+  case std::ios_base::beg:
+    return ffi::SeekDir::Start;
+  case std::ios_base::cur:
+    return ffi::SeekDir::Current;
+  case std::ios_base::end:
+    return ffi::SeekDir::End;
+  default:
+    throw std::runtime_error("invalid seekdir");
+  }
 }
\ No newline at end of file
diff --git a/bindings/cpp/src/reader.rs b/bindings/cpp/src/reader.rs
new file mode 100644
index 000000000..9ad55d714
--- /dev/null
+++ b/bindings/cpp/src/reader.rs
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::ffi;
+use anyhow::Result;
+use od::raw::oio::BlockingRead;
+use opendal as od;
+
+pub struct Reader(pub od::BlockingReader);
+
+impl Reader {
+    pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        Ok(self.0.read(buf)?)
+    }
+
+    pub fn seek(&mut self, offset: u64, dir: ffi::SeekFrom) -> Result<u64> {
+        let pos = match dir {
+            ffi::SeekFrom::Start => std::io::SeekFrom::Start(offset),
+            ffi::SeekFrom::Current => std::io::SeekFrom::Current(offset as i64),
+            ffi::SeekFrom::End => std::io::SeekFrom::End(offset as i64),
+            _ => return Err(anyhow::anyhow!("invalid seek dir")),
+        };
+
+        Ok(self.0.seek(pos)?)
+    }
+}
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
new file mode 100644
index 000000000..ccd456716
--- /dev/null
+++ b/bindings/cpp/src/types.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::ffi;
+use opendal as od;
+
+impl From<od::Metadata> for ffi::Metadata {
+    fn from(meta: od::Metadata) -> Self {
+        let mode = meta.mode().into();
+        let content_length = meta.content_length();
+        let cache_control = meta.cache_control().map(ToOwned::to_owned).into();
+        let content_disposition = meta.content_disposition().map(ToOwned::to_owned).into();
+        let content_md5 = meta.content_md5().map(ToOwned::to_owned).into();
+        let content_type = meta.content_type().map(ToOwned::to_owned).into();
+        let etag = meta.etag().map(ToOwned::to_owned).into();
+        let last_modified = meta
+            .last_modified()
+            .map(|time| time.to_rfc3339_opts(chrono::SecondsFormat::Nanos, false))
+            .into();
+
+        Self {
+            mode,
+            content_length,
+            cache_control,
+            content_disposition,
+            content_md5,
+            content_type,
+            etag,
+            last_modified,
+        }
+    }
+}
+
+impl From<od::Entry> for ffi::Entry {
+    fn from(entry: od::Entry) -> Self {
+        let (path, _) = entry.into_parts();
+        Self { path }
+    }
+}
+
+impl From<od::EntryMode> for ffi::EntryMode {
+    fn from(mode: od::EntryMode) -> Self {
+        match mode {
+            od::EntryMode::FILE => Self::File,
+            od::EntryMode::DIR => Self::Dir,
+            _ => Self::Unknown,
+        }
+    }
+}
+
+impl From<Option<String>> for ffi::OptionalString {
+    fn from(s: Option<String>) -> Self {
+        match s {
+            Some(s) => Self {
+                has_value: true,
+                value: s,
+            },
+            None => Self {
+                has_value: false,
+                value: String::default(),
+            },
+        }
+    }
+}
diff --git a/bindings/cpp/tests/basic_test.cpp b/bindings/cpp/tests/basic_test.cpp
index d86cb4fd7..c6c47c001 100644
--- a/bindings/cpp/tests/basic_test.cpp
+++ b/bindings/cpp/tests/basic_test.cpp
@@ -19,9 +19,12 @@
 
 #include "opendal.hpp"
 #include "gtest/gtest.h"
+#include <ctime>
 #include <optional>
+#include <random>
 #include <string>
 #include <unordered_map>
+#include <vector>
 
 class OpendalTest : public ::testing::Test {
 protected:
@@ -30,10 +33,14 @@ protected:
   std::string scheme;
   std::unordered_map<std::string, std::string> config;
 
+  // random number generator
+  std::mt19937 rng;
+
   void SetUp() override {
     scheme = "memory";
-    op = opendal::Operator(scheme, config);
+    rng.seed(time(nullptr));
 
+    op = opendal::Operator(scheme, config);
     EXPECT_TRUE(op.available());
   }
 };
@@ -86,6 +93,52 @@ TEST_F(OpendalTest, BasicTest) {
   EXPECT_FALSE(op.is_exist(file_path_renamed));
 }
 
+TEST_F(OpendalTest, ReaderTest) {
+  std::string file_path = "test";
+  constexpr int size = 2000;
+  std::vector<uint8_t> data(size);
+
+  for (auto &d : data) {
+    d = rng() % 256;
+  }
+
+  // write
+  op.write(file_path, data);
+
+  // reader
+  auto reader = op.reader(file_path);
+  uint8_t part_data[100];
+  reader.seek(200, std::ios::cur);
+  reader.read(part_data, 100);
+  EXPECT_EQ(reader.seek(0, std::ios::cur), 300);
+  for (int i = 0; i < 100; ++i) {
+    EXPECT_EQ(part_data[i], data[200 + i]);
+  }
+  reader.seek(0, std::ios::beg);
+
+  // stream
+  opendal::ReaderStream stream(reader);
+
+  auto read_fn = [&](std::size_t to_read, std::streampos expected_tellg) {
+    std::vector<char> v(to_read);
+    stream.read(v.data(), v.size());
+    EXPECT_TRUE(!!stream);
+    EXPECT_EQ(stream.tellg(), expected_tellg);
+  };
+
+  EXPECT_EQ(stream.tellg(), 0);
+  read_fn(10, 10);
+  read_fn(15, 25);
+  read_fn(15, 40);
+  stream.get();
+  EXPECT_EQ(stream.tellg(), 41);
+  read_fn(1000, 1041);
+
+  stream.seekg(0, std::ios::beg);
+  std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{stream}, {});
+  EXPECT_EQ(reader_data, data);
+}
+
 int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();

Reply via email to