This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 553834d33b refactor(object_store): Polish implementation details of
object_store (#4749)
553834d33b is described below
commit 553834d33bf3cfb35e6d1e1ec8a1f46c677bee5e
Author: Xuanwo <[email protected]>
AuthorDate: Mon Jun 17 19:11:21 2024 +0800
refactor(object_store): Polish implementation details of object_store
(#4749)
* refactor(object_store): Polish implementation details of object_store
Signed-off-by: Xuanwo <[email protected]>
* Polish readme
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
integrations/object_store/README.md | 94 +++-
integrations/object_store/src/lib.rs | 540 +++------------------
integrations/object_store/src/{lib.rs => store.rs} | 210 ++++----
.../object_store/src/{send_wrapper.rs => utils.rs} | 131 +++--
4 files changed, 329 insertions(+), 646 deletions(-)
diff --git a/integrations/object_store/README.md
b/integrations/object_store/README.md
index 1f6e8653a7..2bf5b6a198 100644
--- a/integrations/object_store/README.md
+++ b/integrations/object_store/README.md
@@ -1,12 +1,27 @@
-# OpenDAL object_store Binding
+# Apache OpenDALâ„¢ object_store integration
-This crate intends to build an
[object_store](https://crates.io/crates/object_store) binding.
+[![Build Status]][actions] [![Latest Version]][crates.io] [![Crate
Downloads]][crates.io] [![chat]][discord]
-The `OpendalStore` uses the `opendal` crate to interact with the underlying
object storage system. The `Operator` is used to create a new `OpendalStore`
instance. The `OpendalStore` uses the `Operator` to perform operations on the
object storage system, such as `put`, `get`, `delete`, and `list`.
+[build status]:
https://img.shields.io/github/actions/workflow/status/apache/opendal/ci_integration_object_store.yml?branch=main
+[actions]: https://github.com/apache/opendal/actions?query=branch%3Amain
+[latest version]: https://img.shields.io/crates/v/object_store_opendal.svg
+[crates.io]: https://crates.io/crates/object_store_opendal
+[crate downloads]: https://img.shields.io/crates/d/object_store_opendal.svg
+[chat]: https://img.shields.io/discord/1081052318650339399
+[discord]: https://opendal.apache.org/discord
+
+`object_store_opendal` is an
[`object_store`](https://crates.io/crates/object_store) implementation using
[`opendal`](https://github.com/apache/opendal).
+
+This crate can help you to access 30 more storage services with the same
object_store API.
+
+
+## Useful Links
+
+- Documentation: [stable](https://docs.rs/object_store_opendal/)
## Examples
-Firstly, you need to add the following dependencies to your `Cargo.toml`:
+Add the following dependencies to your `Cargo.toml` with correct version:
```toml
[dependencies]
@@ -15,23 +30,62 @@ object_store_opendal = "0.44.0"
opendal = { version = "0.47.0", features = ["services-s3"] }
```
-> [!NOTE]
->
-> The current version we support is object store 0.10.
+Build `OpendalStore` via `opendal::Operator`:
+
+```rust
+use std::sync::Arc;
+
+use bytes::Bytes;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use object_store_opendal::OpendalStore;
+use opendal::services::S3;
+use opendal::{Builder, Operator};
+
+#[tokio::main]
+async fn main() {
+ let builder = S3::from_map(
+ vec![
+ ("access_key".to_string(), "my_access_key".to_string()),
+ ("secret_key".to_string(), "my_secret_key".to_string()),
+ ("endpoint".to_string(), "my_endpoint".to_string()),
+ ("region".to_string(), "my_region".to_string()),
+ ]
+ .into_iter()
+ .collect(),
+ );
+
+ // Create a new operator
+ let operator = Operator::new(builder).unwrap().finish();
+
+ // Create a new object store
+ let object_store = Arc::new(OpendalStore::new(operator));
+
+ let path = Path::from("data/nested/test.txt");
+ let bytes = Bytes::from_static(b"hello, world! I am nested.");
+
+ object_store.put(&path, bytes.clone().into()).await.unwrap();
+
+ let content = object_store
+ .get(&path)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+
+ assert_eq!(content, bytes);
+}
+```
+
+## Branding
+
+The first and most prominent mentions must use the full form: **Apache
OpenDALâ„¢** of the name for any individual usage (webpage, handout, slides,
etc.) Depending on the context and writing style, you should use the full form
of the name sufficiently often to ensure that readers clearly understand the
association of both the OpenDAL project and the OpenDAL software product to the
ASF as the parent organization.
-Then you can use the `OpendalStore` as in the [example](examples/basic.rs).
+For more details, see the [Apache Product Name Usage
Guide](https://www.apache.org/foundation/marks/guide).
-## API
+## License and Trademarks
-The `OpendalStore` implements the `ObjectStore` trait, which provides the
following methods:
+Licensed under the Apache License, Version 2.0:
http://www.apache.org/licenses/LICENSE-2.0
-| Method | Description |
link
|
-|-----------------------|---------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|
-| `put` | Put an object into the store. |
[put](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.put)
|
-| `get` | Get an object from the store. |
[get](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.get)
|
-| `get_range` | Get a range of bytes from an object in the store. |
[get_range](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.get_range)
|
-| `head` | Get the metadata of an object in the store. |
[head](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.head)
|
-| `delete` | Delete an object from the store. |
[delete](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.delete)
|
-| `list` | List objects in the store. |
[list](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.list)
|
-| `list_with_offset` | List objects in the store with an offset. |
[list_with_offset](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.list_with_offset)
|
-| `list_with_delimiter` | List objects in the store with a delimiter. |
[list_with_delimiter](https://docs.rs/object_store/0.9.0/object_store/trait.ObjectStore.html#tymethod.list_with_delimiter)
|
+Apache OpenDAL, OpenDAL, and Apache are either registered trademarks or
trademarks of the Apache Software Foundation.
diff --git a/integrations/object_store/src/lib.rs
b/integrations/object_store/src/lib.rs
index 6c18102e9e..9b2d691655 100644
--- a/integrations/object_store/src/lib.rs
+++ b/integrations/object_store/src/lib.rs
@@ -15,373 +15,60 @@
// specific language governing permissions and limitations
// under the License.
-mod send_wrapper;
-
-use std::future::IntoFuture;
-use std::ops::Range;
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use futures::stream::BoxStream;
-use futures::FutureExt;
-use futures::StreamExt;
-use futures::TryStreamExt;
-use object_store::path::Path;
-use object_store::GetOptions;
-use object_store::GetResult;
-use object_store::GetResultPayload;
-use object_store::ListResult;
-use object_store::MultipartUpload;
-use object_store::ObjectMeta;
-use object_store::ObjectStore;
-use object_store::PutMultipartOpts;
-use object_store::PutOptions;
-use object_store::PutPayload;
-use object_store::PutResult;
-use object_store::Result;
-use opendal::Entry;
-use opendal::Metadata;
-use opendal::Metakey;
-use opendal::Operator;
-use send_wrapper::IntoSendFuture;
-use send_wrapper::IntoSendStream;
-
-#[derive(Debug)]
-pub struct OpendalStore {
- inner: Operator,
-}
-
-impl OpendalStore {
- /// Create OpendalStore by given Operator.
- pub fn new(op: Operator) -> Self {
- Self { inner: op }
- }
-}
-
-impl std::fmt::Display for OpendalStore {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "OpenDAL({:?})", self.inner)
- }
-}
-
-#[async_trait]
-impl ObjectStore for OpendalStore {
- async fn put(&self, location: &Path, bytes: PutPayload) ->
Result<PutResult> {
- let bytes: Bytes = bytes.into();
- self.inner
- .write(location.as_ref(), bytes)
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, location.as_ref()))?;
- Ok(PutResult {
- e_tag: None,
- version: None,
- })
- }
-
- async fn put_opts(
- &self,
- _location: &Path,
- _bytes: PutPayload,
- _opts: PutOptions,
- ) -> Result<PutResult> {
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "put_opts is not implemented so far",
- )),
- })
- }
-
- async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "put_multipart is not implemented so far",
- )),
- })
- }
-
- async fn put_multipart_opts(
- &self,
- _location: &Path,
- _opts: PutMultipartOpts,
- ) -> Result<Box<dyn MultipartUpload>> {
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "put_multipart_opts is not implemented so far",
- )),
- })
- }
-
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let meta = self
- .inner
- .stat(location.as_ref())
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, location.as_ref()))?;
-
- let meta = ObjectMeta {
- location: location.clone(),
- last_modified: meta.last_modified().unwrap_or_default(),
- size: meta.content_length() as usize,
- e_tag: meta.etag().map(|x| x.to_string()),
- version: meta.version().map(|x| x.to_string()),
- };
- let r = self
- .inner
- .reader(location.as_ref())
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, location.as_ref()))?;
-
- let stream = r
- .into_bytes_stream(0..meta.size as u64)
- .await
- .map_err(|err| object_store::Error::Generic {
- store: "IoError",
- source: Box::new(err),
- })?
- .into_send()
- .map_err(|err| object_store::Error::Generic {
- store: "IoError",
- source: Box::new(err),
- });
-
- Ok(GetResult {
- payload: GetResultPayload::Stream(Box::pin(stream)),
- range: 0..meta.size,
- meta,
- attributes: Default::default(),
- })
- }
-
- async fn get_opts(&self, _location: &Path, _options: GetOptions) ->
Result<GetResult> {
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "get_opts is not implemented so far",
- )),
- })
- }
-
- async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
- let bs = self
- .inner
- .read_with(location.as_ref())
- .range(range.start as u64..range.end as u64)
- .into_future()
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, location.as_ref()))?;
-
- Ok(bs.to_bytes())
- }
-
- async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let meta = self
- .inner
- .stat(location.as_ref())
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, location.as_ref()))?;
-
- Ok(ObjectMeta {
- location: location.clone(),
- last_modified: meta.last_modified().unwrap_or_default(),
- size: meta.content_length() as usize,
- e_tag: meta.etag().map(|x| x.to_string()),
- version: meta.version().map(|x| x.to_string()),
- })
- }
-
- async fn delete(&self, location: &Path) -> Result<()> {
- self.inner
- .delete(location.as_ref())
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, location.as_ref()))?;
-
- Ok(())
- }
-
- fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
- // object_store `Path` always removes trailing slash
- // need to add it back
- let path = prefix.map_or("".into(), |x| format!("{}/", x));
-
- let fut = async move {
- let stream = self
- .inner
- .lister_with(&path)
- .metakey(Metakey::ContentLength | Metakey::LastModified)
- .recursive(true)
- .await
- .map_err(|err| format_object_store_error(err, &path))?;
-
- let stream = stream.then(|res| async {
- let entry = res.map_err(|err| format_object_store_error(err,
""))?;
- let meta = entry.metadata();
-
- Ok(format_object_meta(entry.path(), meta))
- });
- Ok::<_, object_store::Error>(stream)
- };
-
- fut.into_stream().try_flatten().into_send().boxed()
- }
-
- fn list_with_offset(
- &self,
- prefix: Option<&Path>,
- offset: &Path,
- ) -> BoxStream<'_, Result<ObjectMeta>> {
- let path = prefix.map_or("".into(), |x| format!("{}/", x));
- let offset = offset.clone();
-
- let fut = async move {
- let fut = if
self.inner.info().full_capability().list_with_start_after {
- self.inner
- .lister_with(&path)
- .start_after(offset.as_ref())
- .metakey(Metakey::ContentLength | Metakey::LastModified)
- .recursive(true)
- .into_future()
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, &path))?
- .then(try_format_object_meta)
- .into_send()
- .boxed()
- } else {
- self.inner
- .lister_with(&path)
- .metakey(Metakey::ContentLength | Metakey::LastModified)
- .recursive(true)
- .into_future()
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, &path))?
- .try_filter(move |entry|
futures::future::ready(entry.path() > offset.as_ref()))
- .then(try_format_object_meta)
- .into_send()
- .boxed()
- };
- Ok::<_, object_store::Error>(fut)
- };
-
- fut.into_stream().into_send().try_flatten().boxed()
- }
-
- async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
- let path = prefix.map_or("".into(), |x| format!("{}/", x));
- let mut stream = self
- .inner
- .lister_with(&path)
- .metakey(Metakey::Mode | Metakey::ContentLength |
Metakey::LastModified)
- .into_future()
- .into_send()
- .await
- .map_err(|err| format_object_store_error(err, &path))?
- .into_send();
-
- let mut common_prefixes = Vec::new();
- let mut objects = Vec::new();
-
- while let Some(res) = stream.next().into_send().await {
- let entry = res.map_err(|err| format_object_store_error(err, ""))?;
- let meta = entry.metadata();
-
- if meta.is_dir() {
- common_prefixes.push(entry.path().into());
- } else {
- objects.push(format_object_meta(entry.path(), meta));
- }
- }
-
- Ok(ListResult {
- common_prefixes,
- objects,
- })
- }
-
- async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "copy is not implemented so far",
- )),
- })
- }
-
- async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "rename is not implemented so far",
- )),
- })
- }
-
- async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()>
{
- Err(object_store::Error::NotSupported {
- source: Box::new(opendal::Error::new(
- opendal::ErrorKind::Unsupported,
- "copy_if_not_exists is not implemented so far",
- )),
- })
- }
-}
-
-fn format_object_store_error(err: opendal::Error, path: &str) ->
object_store::Error {
- use opendal::ErrorKind;
- match err.kind() {
- ErrorKind::NotFound => object_store::Error::NotFound {
- path: path.to_string(),
- source: Box::new(err),
- },
- ErrorKind::Unsupported => object_store::Error::NotSupported {
- source: Box::new(err),
- },
- ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
- path: path.to_string(),
- source: Box::new(err),
- },
- kind => object_store::Error::Generic {
- store: kind.into_static(),
- source: Box::new(err),
- },
- }
-}
-
-fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
- let version = match meta.metakey().contains(Metakey::Version) {
- true => meta.version().map(|x| x.to_string()),
- false => None,
- };
-
- let e_tag = match meta.metakey().contains(Metakey::Etag) {
- true => meta.etag().map(|x| x.to_string()),
- false => None,
- };
-
- ObjectMeta {
- location: path.into(),
- last_modified: meta.last_modified().unwrap_or_default(),
- size: meta.content_length() as usize,
- e_tag,
- version,
- }
-}
-
-async fn try_format_object_meta(res: Result<Entry, opendal::Error>) ->
Result<ObjectMeta> {
- let entry = res.map_err(|err| format_object_store_error(err, ""))?;
- let meta = entry.metadata();
-
- Ok(format_object_meta(entry.path(), meta))
-}
+//! object_store_opendal is an object store implementation using opendal.
+//!
+//! This crate can help you to access 30 more storage services with the same
object_store API.
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//!
+//! use bytes::Bytes;
+//! use object_store::path::Path;
+//! use object_store::ObjectStore;
+//! use object_store_opendal::OpendalStore;
+//! use opendal::services::S3;
+//! use opendal::{Builder, Operator};
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let builder = S3::from_map(
+//! vec![
+//! ("access_key".to_string(), "my_access_key".to_string()),
+//! ("secret_key".to_string(), "my_secret_key".to_string()),
+//! ("endpoint".to_string(), "my_endpoint".to_string()),
+//! ("region".to_string(), "my_region".to_string()),
+//! ]
+//! .into_iter()
+//! .collect(),
+//! );
+//!
+//! // Create a new operator
+//! let operator = Operator::new(builder).unwrap().finish();
+//!
+//! // Create a new object store
+//! let object_store = Arc::new(OpendalStore::new(operator));
+//!
+//! let path = Path::from("data/nested/test.txt");
+//! let bytes = Bytes::from_static(b"hello, world! I am nested.");
+//!
+//! object_store.put(&path, bytes.clone().into()).await.unwrap();
+//!
+//! let content = object_store
+//! .get(&path)
+//! .await
+//! .unwrap()
+//! .bytes()
+//! .await
+//! .unwrap();
+//!
+//! assert_eq!(content, bytes);
+//! }
+//! ```
+
+mod store;
+pub use store::OpendalStore;
+
+mod utils;
// Make sure `send_wrapper` works as expected
#[cfg(all(feature = "send_wrapper", target_arch = "wasm32"))]
@@ -407,122 +94,3 @@ mod assert_send {
assert_send(store.list_with_delimiter(None));
}
}
-
-#[cfg(test)]
-mod tests {
- use std::sync::Arc;
-
- use object_store::path::Path;
- use object_store::ObjectStore;
- use opendal::services;
-
- use super::*;
-
- async fn create_test_object_store() -> Arc<dyn ObjectStore> {
- let op = Operator::new(services::Memory::default()).unwrap().finish();
- let object_store = Arc::new(OpendalStore::new(op));
-
- let path: Path = "data/test.txt".into();
- let bytes = Bytes::from_static(b"hello, world!");
- object_store.put(&path, bytes.into()).await.unwrap();
-
- let path: Path = "data/nested/test.txt".into();
- let bytes = Bytes::from_static(b"hello, world! I am nested.");
- object_store.put(&path, bytes.into()).await.unwrap();
-
- object_store
- }
-
- #[tokio::test]
- async fn test_basic() {
- let op = Operator::new(services::Memory::default()).unwrap().finish();
- let object_store: Arc<dyn ObjectStore> =
Arc::new(OpendalStore::new(op));
-
- // Retrieve a specific file
- let path: Path = "data/test.txt".into();
-
- let bytes = Bytes::from_static(b"hello, world!");
- object_store.put(&path, bytes.clone().into()).await.unwrap();
-
- let meta = object_store.head(&path).await.unwrap();
-
- assert_eq!(meta.size, 13);
-
- assert_eq!(
- object_store
- .get(&path)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap(),
- bytes
- );
- }
-
- #[tokio::test]
- async fn test_list() {
- let object_store = create_test_object_store().await;
- let path: Path = "data/".into();
- let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
- assert_eq!(results.len(), 2);
- let mut locations = results
- .iter()
- .map(|x| x.as_ref().unwrap().location.as_ref())
- .collect::<Vec<_>>();
-
- let expected_files = vec![
- (
- "data/nested/test.txt",
- Bytes::from_static(b"hello, world! I am nested."),
- ),
- ("data/test.txt", Bytes::from_static(b"hello, world!")),
- ];
-
- let expected_locations = expected_files.iter().map(|x|
x.0).collect::<Vec<&str>>();
-
- locations.sort();
- assert_eq!(locations, expected_locations);
-
- for (location, bytes) in expected_files {
- let path: Path = location.into();
- assert_eq!(
- object_store
- .get(&path)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap(),
- bytes
- );
- }
- }
-
- #[tokio::test]
- async fn test_list_with_delimiter() {
- let object_store = create_test_object_store().await;
- let path: Path = "data/".into();
- let result =
object_store.list_with_delimiter(Some(&path)).await.unwrap();
- assert_eq!(result.objects.len(), 1);
- assert_eq!(result.common_prefixes.len(), 1);
- assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
- assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
- }
-
- #[tokio::test]
- async fn test_list_with_offset() {
- let object_store = create_test_object_store().await;
- let path: Path = "data/".into();
- let offset: Path = "data/nested/test.txt".into();
- let result = object_store
- .list_with_offset(Some(&path), &offset)
- .collect::<Vec<_>>()
- .await;
- assert_eq!(result.len(), 1);
- assert_eq!(
- result[0].as_ref().unwrap().location.as_ref(),
- "data/test.txt"
- );
- }
-}
diff --git a/integrations/object_store/src/lib.rs
b/integrations/object_store/src/store.rs
similarity index 78%
copy from integrations/object_store/src/lib.rs
copy to integrations/object_store/src/store.rs
index 6c18102e9e..cbfc1b9845 100644
--- a/integrations/object_store/src/lib.rs
+++ b/integrations/object_store/src/store.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-mod send_wrapper;
-
+use std::fmt::{Debug, Display, Formatter};
use std::future::IntoFuture;
use std::ops::Range;
+use crate::utils::*;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
@@ -38,15 +38,60 @@ use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
-use object_store::Result;
-use opendal::Entry;
-use opendal::Metadata;
-use opendal::Metakey;
use opendal::Operator;
-use send_wrapper::IntoSendFuture;
-use send_wrapper::IntoSendStream;
-
-#[derive(Debug)]
+use opendal::{Buffer, Metakey};
+
+/// OpendalStore implements ObjectStore trait by using opendal.
+///
+/// This allows users to use opendal as an object store without extra cost.
+///
+/// Visit [`opendal::services`] for more information about supported services.
+///
+/// ```no_run
+/// use std::sync::Arc;
+///
+/// use bytes::Bytes;
+/// use object_store::path::Path;
+/// use object_store::ObjectStore;
+/// use object_store_opendal::OpendalStore;
+/// use opendal::services::S3;
+/// use opendal::{Builder, Operator};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let builder = S3::from_map(
+/// vec![
+/// ("access_key".to_string(), "my_access_key".to_string()),
+/// ("secret_key".to_string(), "my_secret_key".to_string()),
+/// ("endpoint".to_string(), "my_endpoint".to_string()),
+/// ("region".to_string(), "my_region".to_string()),
+/// ]
+/// .into_iter()
+/// .collect(),
+/// );
+///
+/// // Create a new operator
+/// let operator = Operator::new(builder).unwrap().finish();
+///
+/// // Create a new object store
+/// let object_store = Arc::new(OpendalStore::new(operator));
+///
+/// let path = Path::from("data/nested/test.txt");
+/// let bytes = Bytes::from_static(b"hello, world! I am nested.");
+///
+/// object_store.put(&path, bytes.clone().into()).await.unwrap();
+///
+/// let content = object_store
+/// .get(&path)
+/// .await
+/// .unwrap()
+/// .bytes()
+/// .await
+/// .unwrap();
+///
+/// assert_eq!(content, bytes);
+/// }
+/// ```
pub struct OpendalStore {
inner: Operator,
}
@@ -58,18 +103,42 @@ impl OpendalStore {
}
}
-impl std::fmt::Display for OpendalStore {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "OpenDAL({:?})", self.inner)
+impl Debug for OpendalStore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let info = self.inner.info();
+ f.debug_struct("OpendalStore")
+ .field("scheme", &info.scheme())
+ .field("name", &info.name())
+ .field("root", &info.root())
+ .field("capability", &info.full_capability())
+ .finish()
+ }
+}
+
+impl Display for OpendalStore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let info = self.inner.info();
+ write!(
+ f,
+ "Opendal({}, bucket={}, root={})",
+ info.scheme(),
+ info.name(),
+ info.root()
+ )
+ }
+}
+
+impl From<Operator> for OpendalStore {
+ fn from(value: Operator) -> Self {
+ Self::new(value)
}
}
#[async_trait]
impl ObjectStore for OpendalStore {
- async fn put(&self, location: &Path, bytes: PutPayload) ->
Result<PutResult> {
- let bytes: Bytes = bytes.into();
+ async fn put(&self, location: &Path, bytes: PutPayload) ->
object_store::Result<PutResult> {
self.inner
- .write(location.as_ref(), bytes)
+ .write(location.as_ref(), Buffer::from_iter(bytes.into_iter()))
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
@@ -84,7 +153,7 @@ impl ObjectStore for OpendalStore {
_location: &Path,
_bytes: PutPayload,
_opts: PutOptions,
- ) -> Result<PutResult> {
+ ) -> object_store::Result<PutResult> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -93,7 +162,10 @@ impl ObjectStore for OpendalStore {
})
}
- async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ async fn put_multipart(
+ &self,
+ _location: &Path,
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -106,7 +178,7 @@ impl ObjectStore for OpendalStore {
&self,
_location: &Path,
_opts: PutMultipartOpts,
- ) -> Result<Box<dyn MultipartUpload>> {
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -115,7 +187,7 @@ impl ObjectStore for OpendalStore {
})
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
+ async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let meta = self
.inner
.stat(location.as_ref())
@@ -130,6 +202,7 @@ impl ObjectStore for OpendalStore {
e_tag: meta.etag().map(|x| x.to_string()),
version: meta.version().map(|x| x.to_string()),
};
+
let r = self
.inner
.reader(location.as_ref())
@@ -158,7 +231,11 @@ impl ObjectStore for OpendalStore {
})
}
- async fn get_opts(&self, _location: &Path, _options: GetOptions) ->
Result<GetResult> {
+ async fn get_opts(
+ &self,
+ _location: &Path,
+ _options: GetOptions,
+ ) -> object_store::Result<GetResult> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -167,7 +244,7 @@ impl ObjectStore for OpendalStore {
})
}
- async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
+ async fn get_range(&self, location: &Path, range: Range<usize>) ->
object_store::Result<Bytes> {
let bs = self
.inner
.read_with(location.as_ref())
@@ -180,7 +257,7 @@ impl ObjectStore for OpendalStore {
Ok(bs.to_bytes())
}
- async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
let meta = self
.inner
.stat(location.as_ref())
@@ -197,7 +274,7 @@ impl ObjectStore for OpendalStore {
})
}
- async fn delete(&self, location: &Path) -> Result<()> {
+ async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner
.delete(location.as_ref())
.into_send()
@@ -207,7 +284,7 @@ impl ObjectStore for OpendalStore {
Ok(())
}
- fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
+ fn list(&self, prefix: Option<&Path>) -> BoxStream<'_,
object_store::Result<ObjectMeta>> {
// object_store `Path` always removes trailing slash
// need to add it back
let path = prefix.map_or("".into(), |x| format!("{}/", x));
@@ -237,7 +314,7 @@ impl ObjectStore for OpendalStore {
&self,
prefix: Option<&Path>,
offset: &Path,
- ) -> BoxStream<'_, Result<ObjectMeta>> {
+ ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let offset = offset.clone();
@@ -275,7 +352,7 @@ impl ObjectStore for OpendalStore {
fut.into_stream().into_send().try_flatten().boxed()
}
- async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
object_store::Result<ListResult> {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
.inner
@@ -307,7 +384,7 @@ impl ObjectStore for OpendalStore {
})
}
- async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
+ async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()>
{
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -316,7 +393,7 @@ impl ObjectStore for OpendalStore {
})
}
- async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
+ async fn rename(&self, _from: &Path, _to: &Path) ->
object_store::Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -325,7 +402,7 @@ impl ObjectStore for OpendalStore {
})
}
- async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()>
{
+ async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) ->
object_store::Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
@@ -335,79 +412,6 @@ impl ObjectStore for OpendalStore {
}
}
-fn format_object_store_error(err: opendal::Error, path: &str) ->
object_store::Error {
- use opendal::ErrorKind;
- match err.kind() {
- ErrorKind::NotFound => object_store::Error::NotFound {
- path: path.to_string(),
- source: Box::new(err),
- },
- ErrorKind::Unsupported => object_store::Error::NotSupported {
- source: Box::new(err),
- },
- ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
- path: path.to_string(),
- source: Box::new(err),
- },
- kind => object_store::Error::Generic {
- store: kind.into_static(),
- source: Box::new(err),
- },
- }
-}
-
-fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
- let version = match meta.metakey().contains(Metakey::Version) {
- true => meta.version().map(|x| x.to_string()),
- false => None,
- };
-
- let e_tag = match meta.metakey().contains(Metakey::Etag) {
- true => meta.etag().map(|x| x.to_string()),
- false => None,
- };
-
- ObjectMeta {
- location: path.into(),
- last_modified: meta.last_modified().unwrap_or_default(),
- size: meta.content_length() as usize,
- e_tag,
- version,
- }
-}
-
-async fn try_format_object_meta(res: Result<Entry, opendal::Error>) ->
Result<ObjectMeta> {
- let entry = res.map_err(|err| format_object_store_error(err, ""))?;
- let meta = entry.metadata();
-
- Ok(format_object_meta(entry.path(), meta))
-}
-
-// Make sure `send_wrapper` works as expected
-#[cfg(all(feature = "send_wrapper", target_arch = "wasm32"))]
-mod assert_send {
- use object_store::ObjectStore;
-
- #[allow(dead_code)]
- fn assert_send<T: Send>(_: T) {}
-
- #[allow(dead_code)]
- fn assertion() {
- let op = super::Operator::new(opendal::services::Memory::default())
- .unwrap()
- .finish();
- let store = super::OpendalStore::new(op);
- assert_send(store.put(&"test".into(), bytes::Bytes::new()));
- assert_send(store.get(&"test".into()));
- assert_send(store.get_range(&"test".into(), 0..1));
- assert_send(store.head(&"test".into()));
- assert_send(store.delete(&"test".into()));
- assert_send(store.list(None));
- assert_send(store.list_with_offset(None, &"test".into()));
- assert_send(store.list_with_delimiter(None));
- }
-}
-
#[cfg(test)]
mod tests {
use std::sync::Arc;
diff --git a/integrations/object_store/src/send_wrapper.rs
b/integrations/object_store/src/utils.rs
similarity index 53%
rename from integrations/object_store/src/send_wrapper.rs
rename to integrations/object_store/src/utils.rs
index 82b4e89854..c4c593797e 100644
--- a/integrations/object_store/src/send_wrapper.rs
+++ b/integrations/object_store/src/utils.rs
@@ -15,14 +15,103 @@
// specific language governing permissions and limitations
// under the License.
-//! Conditionally add the `Send` marker trait for the wrapped type.
-//! Only take effect when the `send_wrapper` feature is enabled.
-
use futures::Stream;
+use object_store::ObjectMeta;
+use opendal::{Entry, Metadata, Metakey};
+
+/// Conditionally add the `Send` marker trait for the wrapped type.
+/// Only take effect when the `send_wrapper` feature is enabled.
#[cfg(not(feature = "send_wrapper"))]
-pub use noop_wrapper::NoopWrapper as SendWrapper;
+use noop_wrapper::NoopWrapper as SendWrapper;
#[cfg(feature = "send_wrapper")]
-pub use send_wrapper::SendWrapper;
+use send_wrapper::SendWrapper;
+
+/// Format `opendal::Error` to `object_store::Error`.
+pub fn format_object_store_error(err: opendal::Error, path: &str) ->
object_store::Error {
+ use opendal::ErrorKind;
+ match err.kind() {
+ ErrorKind::NotFound => object_store::Error::NotFound {
+ path: path.to_string(),
+ source: Box::new(err),
+ },
+ ErrorKind::Unsupported => object_store::Error::NotSupported {
+ source: Box::new(err),
+ },
+ ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
+ path: path.to_string(),
+ source: Box::new(err),
+ },
+ kind => object_store::Error::Generic {
+ store: kind.into_static(),
+ source: Box::new(err),
+ },
+ }
+}
+
+/// Format `opendal::Metadata` to `object_store::ObjectMeta`.
+pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
+ let version = match meta.metakey().contains(Metakey::Version) {
+ true => meta.version().map(|x| x.to_string()),
+ false => None,
+ };
+
+ let e_tag = match meta.metakey().contains(Metakey::Etag) {
+ true => meta.etag().map(|x| x.to_string()),
+ false => None,
+ };
+
+ ObjectMeta {
+ location: path.into(),
+ last_modified: meta.last_modified().unwrap_or_default(),
+ size: meta.content_length() as usize,
+ e_tag,
+ version,
+ }
+}
+
+/// Try to format `opendal::Entry` to `object_store::ObjectMeta`.
+pub async fn try_format_object_meta(
+ res: object_store::Result<Entry, opendal::Error>,
+) -> object_store::Result<ObjectMeta> {
+ let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+ let meta = entry.metadata();
+
+ Ok(format_object_meta(entry.path(), meta))
+}
+
+/// Make given future `Send`.
+pub trait IntoSendFuture {
+ type Output;
+
+ fn into_send(self) -> Self::Output;
+}
+
+impl<T> IntoSendFuture for T
+where
+ T: futures::Future,
+{
+ type Output = SendWrapper<T>;
+ fn into_send(self) -> Self::Output {
+ SendWrapper::new(self)
+ }
+}
+
+/// Make given Stream `Send`.
+pub trait IntoSendStream {
+ type Output;
+
+ fn into_send(self) -> Self::Output;
+}
+
+impl<T> IntoSendStream for T
+where
+ T: Stream,
+{
+ type Output = SendWrapper<T>;
+ fn into_send(self) -> Self::Output {
+ SendWrapper::new(self)
+ }
+}
#[cfg(not(feature = "send_wrapper"))]
mod noop_wrapper {
@@ -70,35 +159,3 @@ mod noop_wrapper {
}
}
}
-
-pub trait IntoSendFuture {
- type Output;
-
- fn into_send(self) -> Self::Output;
-}
-
-impl<T> IntoSendFuture for T
-where
- T: futures::Future,
-{
- type Output = SendWrapper<T>;
- fn into_send(self) -> Self::Output {
- SendWrapper::new(self)
- }
-}
-
-pub trait IntoSendStream {
- type Output;
-
- fn into_send(self) -> Self::Output;
-}
-
-impl<T> IntoSendStream for T
-where
- T: Stream,
-{
- type Output = SendWrapper<T>;
- fn into_send(self) -> Self::Output {
- SendWrapper::new(self)
- }
-}