This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f00fd1cbc feat: allow using object_store as opendal's backend (#6283)
f00fd1cbc is described below
commit f00fd1cbc9bb4b0d26f714fb4dae8667e036375c
Author: flaneur <[email protected]>
AuthorDate: Tue Aug 26 17:43:57 2025 +0800
feat: allow using object_store as opendal's backend (#6283)
* add object-store as service backend
* add object store as features
* add scaffold for impl Access
* add parse_error
* add read()
* add reader
* factor out reader.rs
* add writer
* move the code from core/ to integration/object_store
* move backend.rs to mod.rs
* add deleter.rs
* add deleter
* add lister.rs
* add test case
* add test cases
* fix apache header
* cargo fmt
* fix clippy
* directly use async fn
* use async fn in read
* revert unused cargo service
* rename backend/ as service/
* do the io in read()
* fix the build error on bytes_stream not able to Sync
* use a bytes_deque for Buffer
* fix clippy
* add scaffold for having MultipartWrite
* fix the build
* add initiate_part
* add write_part
* save the changes about multi part write
* implement MultipartWriter
* tune
* fix the etag handling
* validate the complete_part
* add test case for it
* revise the test
* fix test case about multipart upload
* minor rename
* fix clippy
* revert core/Cargo.toml
* revert cargo.toml
* fix set_err to track the cause
* fix build
* fix oio::Read
* extract a format_metadata
* implement BatchDelete
* do not allow delete with version
* refactor the multipart upload
* clean up unused code
* refactor: import object_store::path::Path as ObjectStorePath for better readability
* refactor: move OpXxx to GetOptions conversion functions to core.rs with consistent naming
* handle op_stat by get_opts
* explicitly impl Sync to replace Mutex
* cargo fmt
---
integrations/object_store/examples/basic.rs | 4 +-
integrations/object_store/src/lib.rs | 4 +
integrations/object_store/src/service/core.rs | 163 ++++++++++
integrations/object_store/src/service/deleter.rs | 69 ++++
integrations/object_store/src/service/error.rs | 54 ++++
integrations/object_store/src/service/lister.rs | 75 +++++
integrations/object_store/src/service/mod.rs | 388 +++++++++++++++++++++++
integrations/object_store/src/service/reader.rs | 82 +++++
integrations/object_store/src/service/writer.rs | 204 ++++++++++++
9 files changed, 1041 insertions(+), 2 deletions(-)
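
For readers of this change, a minimal usage sketch of the new integration (illustrative only, not part of this commit; it assumes the `ObjectStoreBuilder` added in the diff below, the `opendal::Operator` API from opendal core, and object_store's in-memory store):

    use std::sync::Arc;

    use object_store::memory::InMemory;
    use object_store::ObjectStore;
    use object_store_opendal::ObjectStoreBuilder;
    use opendal::Operator;

    #[tokio::main]
    async fn main() -> opendal::Result<()> {
        // Wrap any object_store implementation (here the in-memory store)
        // as an OpenDAL backend.
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let op = Operator::new(ObjectStoreBuilder::new(store))?.finish();

        // Regular OpenDAL calls are then served by the wrapped object_store.
        op.write("data/test.txt", "hello, world!".as_bytes().to_vec()).await?;
        let bs = op.read("data/test.txt").await?;
        assert_eq!(bs.to_vec(), b"hello, world!".to_vec());
        Ok(())
    }
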
diff --git a/integrations/object_store/examples/basic.rs b/integrations/object_store/examples/basic.rs
index eaf4a18e5..1afbc9c74 100644
--- a/integrations/object_store/examples/basic.rs
+++ b/integrations/object_store/examples/basic.rs
@@ -1,7 +1,7 @@
use bytes::Bytes;
#[cfg(feature = "services-s3")]
use object_store::aws::AmazonS3Builder;
-use object_store::path::Path;
+use object_store::path::Path as ObjectStorePath;
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
@@ -16,7 +16,7 @@ async fn main() {
.with_secret_access_key("my_secret_access_key");
let s3_store = OpendalStore::new_amazon_s3(builder).unwrap();
- let path = Path::from("data/nested/test.txt");
+ let path = ObjectStorePath::from("data/nested/test.txt");
let bytes = Bytes::from_static(b"hello, world! I am nested.");
s3_store.put(&path, bytes.clone().into()).await.unwrap();
diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs
index 60a7126cd..9671bedd7 100644
--- a/integrations/object_store/src/lib.rs
+++ b/integrations/object_store/src/lib.rs
@@ -68,6 +68,10 @@ mod utils;
#[cfg(feature = "services-s3")]
mod amazon_s3;
+mod service;
+
+pub use service::{ObjectStoreBuilder, ObjectStoreService};
+
// Make sure `send_wrapper` works as expected
#[cfg(all(feature = "send_wrapper", test))]
mod assert_send {
diff --git a/integrations/object_store/src/service/core.rs b/integrations/object_store/src/service/core.rs
new file mode 100644
index 000000000..bfcf61d66
--- /dev/null
+++ b/integrations/object_store/src/service/core.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+
+use object_store::{
+ Attribute, AttributeValue, GetOptions, GetRange, ObjectMeta, PutOptions, PutResult,
+};
+use opendal::raw::*;
+use opendal::*;
+
+/// Parse OpStat arguments to object_store GetOptions for head requests
+pub fn parse_op_stat(args: &OpStat) -> Result<GetOptions> {
+ let mut options = GetOptions {
+ head: true, // This is a head request
+ ..Default::default()
+ };
+
+ if let Some(version) = args.version() {
+ options.version = Some(version.to_string());
+ }
+
+ if let Some(if_match) = args.if_match() {
+ options.if_match = Some(if_match.to_string());
+ }
+
+ if let Some(if_none_match) = args.if_none_match() {
+ options.if_none_match = Some(if_none_match.to_string());
+ }
+
+ if let Some(if_modified_since) = args.if_modified_since() {
+ options.if_modified_since = Some(if_modified_since);
+ }
+
+ if let Some(if_unmodified_since) = args.if_unmodified_since() {
+ options.if_unmodified_since = Some(if_unmodified_since);
+ }
+
+ Ok(options)
+}
+
+/// Parse OpRead arguments to object_store GetOptions
+pub fn parse_op_read(args: &OpRead) -> Result<GetOptions> {
+ let mut options = GetOptions::default();
+
+ if let Some(version) = args.version() {
+ options.version = Some(version.to_string());
+ }
+
+ if let Some(if_match) = args.if_match() {
+ options.if_match = Some(if_match.to_string());
+ }
+
+ if let Some(if_none_match) = args.if_none_match() {
+ options.if_none_match = Some(if_none_match.to_string());
+ }
+
+ if let Some(if_modified_since) = args.if_modified_since() {
+ options.if_modified_since = Some(if_modified_since);
+ }
+
+ if let Some(if_unmodified_since) = args.if_unmodified_since() {
+ options.if_unmodified_since = Some(if_unmodified_since);
+ }
+
+ if !args.range().is_full() {
+ let range = args.range();
+ match range.size() {
+ Some(size) => {
+ options.range = Some(GetRange::Bounded(range.offset()..range.offset() + size));
+ }
+ None => {
+ options.range = Some(GetRange::Offset(range.offset()));
+ }
+ }
+ }
+
+ Ok(options)
+}
+
+/// Parse OpWrite arguments to object_store PutOptions
+pub fn parse_op_write(args: &OpWrite) -> Result<PutOptions> {
+ let mut opts = PutOptions::default();
+
+ if let Some(content_type) = args.content_type() {
+ opts.attributes.insert(
+ Attribute::ContentType,
+ AttributeValue::from(content_type.to_string()),
+ );
+ }
+
+ if let Some(content_disposition) = args.content_disposition() {
+ opts.attributes.insert(
+ Attribute::ContentDisposition,
+ AttributeValue::from(content_disposition.to_string()),
+ );
+ }
+
+ if let Some(cache_control) = args.cache_control() {
+ opts.attributes.insert(
+ Attribute::CacheControl,
+ AttributeValue::from(cache_control.to_string()),
+ );
+ }
+
+ if let Some(user_metadata) = args.user_metadata() {
+ for (key, value) in user_metadata {
+ opts.attributes.insert(
+ Attribute::Metadata(Cow::from(key.to_string())),
+ AttributeValue::from(value.to_string()),
+ );
+ }
+ }
+ Ok(opts)
+}
+
+/// Convert PutOptions to PutMultipartOptions
+pub fn format_put_multipart_options(opts: PutOptions) -> object_store::PutMultipartOptions {
+ object_store::PutMultipartOptions {
+ attributes: opts.attributes,
+ ..Default::default()
+ }
+}
+
+/// Format PutResult to OpenDAL Metadata
+pub fn format_put_result(result: PutResult) -> Metadata {
+ let mut metadata = Metadata::new(EntryMode::FILE);
+ if let Some(etag) = &result.e_tag {
+ metadata.set_etag(etag);
+ }
+ if let Some(version) = &result.version {
+ metadata.set_version(version);
+ }
+ metadata
+}
+
+/// Format `object_store::ObjectMeta` to `opendal::Metadata`.
+pub fn format_metadata(meta: &ObjectMeta) -> Metadata {
+ let mut metadata = Metadata::new(EntryMode::FILE);
+ metadata.set_content_length(meta.size);
+ metadata.set_last_modified(meta.last_modified);
+ if let Some(etag) = &meta.e_tag {
+ metadata.set_etag(etag);
+ }
+ if let Some(version) = &meta.version {
+ metadata.set_version(version);
+ }
+ metadata
+}
diff --git a/integrations/object_store/src/service/deleter.rs b/integrations/object_store/src/service/deleter.rs
new file mode 100644
index 000000000..b4db4b2da
--- /dev/null
+++ b/integrations/object_store/src/service/deleter.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{self, StreamExt};
+use object_store::path::Path as ObjectStorePath;
+use object_store::ObjectStore;
+use opendal::raw::oio::BatchDeleteResult;
+use opendal::raw::*;
+use opendal::*;
+
+use super::error::parse_error;
+
+pub struct ObjectStoreDeleter {
+ store: Arc<dyn ObjectStore + 'static>,
+}
+
+impl ObjectStoreDeleter {
+ pub(crate) fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
+ Self { store }
+ }
+}
+
+impl oio::BatchDelete for ObjectStoreDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let object_path = ObjectStorePath::from(path);
+ self.store.delete(&object_path).await.map_err(parse_error)
+ }
+
+ async fn delete_batch(&self, paths: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
+ // convert paths to stream, then use [`ObjectStore::delete_stream`] to delete them in batch
+ let stream = stream::iter(paths.iter())
+ .map(|(path, _)| Ok::<_, object_store::Error>(ObjectStorePath::from(path.as_str())))
+ .boxed();
+ let results = self.store.delete_stream(stream).collect::<Vec<_>>().await;
+
+ // convert the results to [`BatchDeleteResult`]
+ let mut result_batch = BatchDeleteResult::default();
+ for (idx, result) in results.into_iter().enumerate() {
+ match result {
+ Ok(_) => result_batch
+ .succeeded
+ .push((paths[idx].0.clone(), paths[idx].1.clone())),
+ Err(e) => result_batch.failed.push((
+ paths[idx].0.clone(),
+ paths[idx].1.clone(),
+ parse_error(e),
+ )),
+ }
+ }
+
+ Ok(result_batch)
+ }
+}
diff --git a/integrations/object_store/src/service/error.rs b/integrations/object_store/src/service/error.rs
new file mode 100644
index 000000000..f8bea1c98
--- /dev/null
+++ b/integrations/object_store/src/service/error.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use opendal::Error;
+use opendal::ErrorKind;
+
+pub(crate) fn parse_error(err: object_store::Error) -> Error {
+ match err {
+ object_store::Error::NotFound { .. } => {
+ Error::new(ErrorKind::NotFound, "path not found").set_source(err)
+ }
+
+ object_store::Error::AlreadyExists { .. } => {
+ Error::new(ErrorKind::AlreadyExists, "path already exists").set_source(err)
+ }
+
+ object_store::Error::PermissionDenied { .. }
+ | object_store::Error::Unauthenticated { .. } => {
+ Error::new(ErrorKind::PermissionDenied, "permission denied").set_source(err)
+ }
+
+ object_store::Error::InvalidPath { .. } => {
+ Error::new(ErrorKind::NotFound, "invalid path").set_source(err)
+ }
+
+ object_store::Error::NotSupported { .. } => {
+ Error::new(ErrorKind::Unsupported, "operation not supported").set_source(err)
+ }
+
+ object_store::Error::Precondition { .. } => {
+ Error::new(ErrorKind::ConditionNotMatch, "precondition not met").set_source(err)
+ }
+
+ object_store::Error::Generic { store, .. } => {
+ Error::new(ErrorKind::Unexpected, format!("{store} operation failed")).set_source(err)
+ }
+
+ _ => Error::new(ErrorKind::Unexpected, "unknown error").set_source(err),
+ }
+}
diff --git a/integrations/object_store/src/service/lister.rs b/integrations/object_store/src/service/lister.rs
new file mode 100644
index 000000000..7585174ba
--- /dev/null
+++ b/integrations/object_store/src/service/lister.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::{stream::BoxStream, StreamExt};
+use object_store::path::Path as ObjectStorePath;
+use object_store::{ObjectMeta, ObjectStore};
+use opendal::raw::*;
+use opendal::*;
+
+use super::error::parse_error;
+use crate::service::core::format_metadata;
+
+pub struct ObjectStoreLister {
+ stream: BoxStream<'static, object_store::Result<ObjectMeta>>,
+}
+
+impl ObjectStoreLister {
+ pub(crate) async fn new(
+ store: Arc<dyn ObjectStore + 'static>,
+ path: &str,
+ args: OpList,
+ ) -> Result<Self> {
+ // If start_after is specified, use list_with_offset
+ let mut stream = if let Some(start_after) = args.start_after() {
+ store
+ .list_with_offset(
+ Some(&ObjectStorePath::from(path)),
+ &ObjectStorePath::from(start_after),
+ )
+ .boxed()
+ } else {
+ store.list(Some(&ObjectStorePath::from(path))).boxed()
+ };
+
+ // Process listing arguments
+ if let Some(limit) = args.limit() {
+ stream = stream.take(limit).boxed();
+ }
+
+ Ok(Self { stream })
+ }
+}
+
+// ObjectStoreLister is safe to share between threads, because it only has &mut self API calls.
+unsafe impl Sync for ObjectStoreLister {}
+
+impl oio::List for ObjectStoreLister {
+ async fn next(&mut self) -> Result<Option<oio::Entry>> {
+ match self.stream.next().await {
+ Some(Ok(meta)) => {
+ let metadata = format_metadata(&meta);
+ let entry = oio::Entry::new(meta.location.as_ref(), metadata.clone());
+ Ok(Some(entry))
+ }
+ Some(Err(e)) => Err(parse_error(e)),
+ None => Ok(None),
+ }
+ }
+}
diff --git a/integrations/object_store/src/service/mod.rs b/integrations/object_store/src/service/mod.rs
new file mode 100644
index 000000000..068d4ad4c
--- /dev/null
+++ b/integrations/object_store/src/service/mod.rs
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use object_store::path::Path as ObjectStorePath;
+use object_store::ObjectStore;
+use opendal::raw::oio::BatchDeleter;
+use opendal::raw::oio::MultipartWriter;
+use opendal::raw::*;
+use opendal::Error;
+use opendal::ErrorKind;
+use opendal::*;
+
+mod core;
+mod deleter;
+mod error;
+mod lister;
+mod reader;
+mod writer;
+
+use deleter::ObjectStoreDeleter;
+use error::parse_error;
+use lister::ObjectStoreLister;
+use reader::ObjectStoreReader;
+use writer::ObjectStoreWriter;
+
+use crate::service::core::format_metadata as parse_metadata;
+use crate::service::core::parse_op_stat;
+
+/// ObjectStore backend builder
+#[derive(Default)]
+pub struct ObjectStoreBuilder {
+ store: Option<Arc<dyn ObjectStore + 'static>>,
+}
+
+impl Debug for ObjectStoreBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("ObjectStoreBuilder");
+ d.finish_non_exhaustive()
+ }
+}
+
+impl ObjectStoreBuilder {
+ /// Set the object store instance
+ pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
+ Self { store: Some(store) }
+ }
+}
+
+impl Builder for ObjectStoreBuilder {
+ type Config = ();
+
+ fn build(self) -> Result<impl Access> {
+ let store = self.store.ok_or_else(|| {
+ Error::new(ErrorKind::ConfigInvalid, "object store is required")
+ .with_context("service", Scheme::Custom("object_store"))
+ })?;
+
+ Ok(ObjectStoreService { store })
+ }
+}
+
+/// ObjectStore backend
+pub struct ObjectStoreService {
+ store: Arc<dyn ObjectStore + 'static>,
+}
+
+impl Debug for ObjectStoreService {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("ObjectStoreBackend");
+ d.finish_non_exhaustive()
+ }
+}
+
+impl Access for ObjectStoreService {
+ type Reader = ObjectStoreReader;
+ type Writer = MultipartWriter<ObjectStoreWriter>;
+ type Lister = ObjectStoreLister;
+ type Deleter = BatchDeleter<ObjectStoreDeleter>;
+
+ fn info(&self) -> Arc<AccessorInfo> {
+ let info = AccessorInfo::default();
+ info.set_scheme("object_store")
+ .set_root("/")
+ .set_name("object_store")
+ .set_native_capability(Capability {
+ stat: true,
+ stat_with_if_match: true,
+ stat_with_if_unmodified_since: true,
+ read: true,
+ write: true,
+ delete: true,
+ list: true,
+ list_with_limit: true,
+ list_with_start_after: true,
+ delete_with_version: false,
+ ..Default::default()
+ });
+ Arc::new(info)
+ }
+
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ let path = ObjectStorePath::from(path);
+ let opts = parse_op_stat(&args)?;
+ let result = self
+ .store
+ .get_opts(&path, opts)
+ .await
+ .map_err(parse_error)?;
+ let metadata = parse_metadata(&result.meta);
+ Ok(RpStat::new(metadata))
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+ let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
+ Ok((reader.rp(), reader))
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+ let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
+ Ok((
+ RpWrite::default(),
+ MultipartWriter::new(self.info(), writer, 10),
+ ))
+ }
+
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()));
+ Ok((RpDelete::default(), deleter))
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+ let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
+ Ok((RpList::default(), lister))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use object_store::memory::InMemory;
+ use opendal::raw::oio::{Delete, List, Read, Write};
+ use opendal::Buffer;
+
+ #[tokio::test]
+ async fn test_object_store_backend_builder() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let builder = ObjectStoreBuilder::new(store);
+
+ let backend = builder.build().expect("build should succeed");
+ assert!(backend.info().scheme() == "object_store");
+ }
+
+ #[tokio::test]
+ async fn test_object_store_backend_info() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let backend = ObjectStoreBuilder::new(store)
+ .build()
+ .expect("build should succeed");
+
+ let info = backend.info();
+ assert_eq!(info.scheme(), "object_store");
+ assert_eq!(info.name(), "object_store".into());
+ assert_eq!(info.root(), "/".into());
+
+ let cap = info.native_capability();
+ assert!(cap.stat);
+ assert!(cap.read);
+ assert!(cap.write);
+ assert!(cap.delete);
+ assert!(cap.list);
+ }
+
+ #[tokio::test]
+ async fn test_object_store_backend_basic_operations() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let backend = ObjectStoreBuilder::new(store.clone())
+ .build()
+ .expect("build should succeed");
+
+ let path = "test_file.txt";
+ let content = b"Hello, world!";
+
+ // Test write
+ let (_, mut writer) = backend
+ .write(path, OpWrite::default())
+ .await
+ .expect("write should succeed");
+
+ writer
+ .write(Buffer::from(&content[..]))
+ .await
+ .expect("write content should succeed");
+ writer.close().await.expect("close should succeed");
+
+ // Test stat
+ let stat_result = backend
+ .stat(path, OpStat::default())
+ .await
+ .expect("stat should succeed");
+
+ assert_eq!(
+ stat_result.into_metadata().content_length(),
+ content.len() as u64
+ );
+
+ // Test read
+ let (_, mut reader) = backend
+ .read(path, OpRead::default())
+ .await
+ .expect("read should succeed");
+
+ let buf = reader.read().await.expect("read should succeed");
+ assert_eq!(buf.to_vec(), content);
+ }
+
+ #[tokio::test]
+ async fn test_object_store_backend_multipart_upload() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let backend = ObjectStoreBuilder::new(store.clone())
+ .build()
+ .expect("build should succeed");
+
+ let path = "test_file.txt";
+ let content =
+ b"Hello, multipart upload! This is a test content for multipart
upload functionality.";
+ let content_len = content.len();
+
+ // Test multipart upload with multiple chunks
+ let (_, mut writer) = backend
+ .write(path, OpWrite::default())
+ .await
+ .expect("write should succeed");
+
+ // Write content in chunks to simulate multipart upload
+ let chunk_size = 20;
+ for chunk in content.chunks(chunk_size) {
+ writer
+ .write(Buffer::from(chunk))
+ .await
+ .expect("write chunk should succeed");
+ }
+
+ writer.close().await.expect("close should succeed");
+
+ // Verify the uploaded file
+ let stat_result = backend
+ .stat(path, OpStat::default())
+ .await
+ .expect("stat should succeed");
+
+ assert_eq!(
+ stat_result.into_metadata().content_length(),
+ content_len as u64
+ );
+
+ // Read back and verify content
+ let (_, mut reader) = backend
+ .read(path, OpRead::default())
+ .await
+ .expect("read should succeed");
+
+ let buf = reader.read().await.expect("read should succeed");
+ assert_eq!(buf.to_vec(), content);
+ }
+
+ #[tokio::test]
+ async fn test_object_store_backend_list() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let backend = ObjectStoreBuilder::new(store.clone())
+ .build()
+ .expect("build should succeed");
+
+ // Create multiple files
+ let files = vec![
+ ("dir1/file1.txt", b"content1"),
+ ("dir1/file2.txt", b"content2"),
+ ("dir2/file3.txt", b"content3"),
+ ];
+
+ for (path, content) in &files {
+ let (_, mut writer) = backend
+ .write(path, OpWrite::default())
+ .await
+ .expect("write should succeed");
+ writer
+ .write(Buffer::from(&content[..]))
+ .await
+ .expect("write content should succeed");
+ writer.close().await.expect("close should succeed");
+ }
+
+ // List directory
+ let (_, mut lister) = backend
+ .list("dir1/", OpList::default())
+ .await
+ .expect("list should succeed");
+
+ let mut entries = Vec::new();
+ while let Some(entry) = lister.next().await.expect("next should succeed") {
+ entries.push(entry);
+ }
+
+ assert_eq!(entries.len(), 2);
+ assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
+ assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
+ }
+
+ #[tokio::test]
+ async fn test_object_store_backend_delete() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let backend = ObjectStoreBuilder::new(store)
+ .build()
+ .expect("build should succeed");
+
+ let path = "test_delete.txt";
+ let content = b"To be deleted";
+
+ // Write file
+ let (_, mut writer) = backend
+ .write(path, OpWrite::default())
+ .await
+ .expect("write should succeed");
+ writer
+ .write(Buffer::from(&content[..]))
+ .await
+ .expect("write content should succeed");
+ writer.close().await.expect("close should succeed");
+
+ // Verify file exists
+ backend
+ .stat(path, OpStat::default())
+ .await
+ .expect("file should exist");
+
+ // Delete file
+ let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
+ deleter
+ .delete(path, OpDelete::default())
+ .expect("delete should succeed");
+ deleter.flush().await.expect("flush should succeed");
+
+ // Verify file is deleted
+ let result = backend.stat(path, OpStat::default()).await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_object_store_backend_error_handling() {
+ let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+ let backend = ObjectStoreBuilder::new(store)
+ .build()
+ .expect("build should succeed");
+
+ // Test stat on non-existent file
+ let result = backend.stat("non_existent.txt", OpStat::default()).await;
+ assert!(result.is_err());
+
+ // Test read on non-existent file
+ let result = backend.read("non_existent.txt", OpRead::default()).await;
+ assert!(result.is_err());
+
+ // Test list on non-existent directory
+ let result = backend.list("non_existent_dir/", OpList::default()).await;
+ // This should succeed but return empty results
+ if let Ok((_, mut lister)) = result {
+ let entry = lister.next().await.expect("next should succeed");
+ assert!(entry.is_none());
+ }
+ }
+}
diff --git a/integrations/object_store/src/service/reader.rs b/integrations/object_store/src/service/reader.rs
new file mode 100644
index 000000000..ca5623127
--- /dev/null
+++ b/integrations/object_store/src/service/reader.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
+use object_store::path::Path as ObjectStorePath;
+use object_store::ObjectStore;
+
+use opendal::raw::*;
+use opendal::*;
+
+use super::core::parse_op_read;
+use super::error::parse_error;
+
+/// ObjectStore reader
+pub struct ObjectStoreReader {
+ bytes_stream: BoxStream<'static, object_store::Result<Bytes>>,
+ meta: object_store::ObjectMeta,
+ args: OpRead,
+}
+
+impl ObjectStoreReader {
+ pub(crate) async fn new(
+ store: Arc<dyn ObjectStore + 'static>,
+ path: &str,
+ args: OpRead,
+ ) -> Result<Self> {
+ let path = ObjectStorePath::from(path);
+ let opts = parse_op_read(&args)?;
+ let result = store.get_opts(&path, opts).await.map_err(parse_error)?;
+ let meta = result.meta.clone();
+ let bytes_stream = result.into_stream();
+ Ok(Self {
+ bytes_stream,
+ meta,
+ args,
+ })
+ }
+
+ pub(crate) fn rp(&self) -> RpRead {
+ let mut rp = RpRead::new().with_size(Some(self.meta.size));
+ if !self.args.range().is_full() {
+ let range = self.args.range();
+ let size = match range.size() {
+ Some(size) => size,
+ None => self.meta.size,
+ };
+ rp = rp.with_range(Some(
+ BytesContentRange::default().with_range(range.offset(), range.offset() + size - 1),
+ ));
+ }
+ rp
+ }
+}
+
+// ObjectStoreReader is safe to share between threads, because the `read()` method requires
+// `&mut self` and `rp()` is stateless.
+unsafe impl Sync for ObjectStoreReader {}
+
+impl oio::Read for ObjectStoreReader {
+ async fn read(&mut self) -> Result<Buffer> {
+ let bs = self.bytes_stream.try_next().await.map_err(parse_error)?;
+ Ok(bs.map(Buffer::from).unwrap_or_default())
+ }
+}
diff --git a/integrations/object_store/src/service/writer.rs b/integrations/object_store/src/service/writer.rs
new file mode 100644
index 000000000..dad207082
--- /dev/null
+++ b/integrations/object_store/src/service/writer.rs
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use object_store::path::Path as ObjectStorePath;
+use object_store::MultipartUpload;
+use object_store::ObjectStore;
+use object_store::PutPayload;
+use object_store::{Attribute, AttributeValue};
+
+use opendal::raw::oio::MultipartPart;
+use opendal::raw::*;
+use opendal::*;
+use tokio::sync::Mutex;
+
+use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
+use super::error::parse_error;
+
+pub struct ObjectStoreWriter {
+ store: Arc<dyn ObjectStore + 'static>,
+ path: ObjectStorePath,
+ args: OpWrite,
+ upload: Mutex<Option<Box<dyn MultipartUpload>>>,
+}
+
+impl ObjectStoreWriter {
+ pub fn new(store: Arc<dyn ObjectStore + 'static>, path: &str, args: OpWrite) -> Self {
+ Self {
+ store,
+ path: ObjectStorePath::from(path),
+ args,
+ upload: Mutex::new(None),
+ }
+ }
+}
+
+impl oio::MultipartWrite for ObjectStoreWriter {
+ /// Write the entire object in one go.
+ /// Used when the object is small enough to bypass multipart upload.
+ async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
+ // Validate that actual body size matches expected size
+ let actual_size = body.len() as u64;
+ if actual_size != size {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Expected size {size} but got {actual_size}"),
+ ));
+ }
+
+ let bytes = body.to_bytes();
+ let payload = PutPayload::from(bytes);
+ let mut opts = parse_op_write(&self.args)?;
+
+ // Add size metadata for tracking
+ opts.attributes.insert(
+ Attribute::Metadata("content-size".into()),
+ AttributeValue::from(size.to_string()),
+ );
+
+ let result = self
+ .store
+ .put_opts(&self.path, payload, opts)
+ .await
+ .map_err(parse_error)?;
+
+ // Build metadata from put result
+ let mut metadata = Metadata::new(EntryMode::FILE);
+ if let Some(etag) = &result.e_tag {
+ metadata.set_etag(etag);
+ }
+ if let Some(version) = &result.version {
+ metadata.set_version(version);
+ }
+
+ Ok(metadata)
+ }
+
+ // Generate a unique upload ID that we'll use to track this session
+ async fn initiate_part(&self) -> Result<String> {
+ // Start a new multipart upload using object_store
+ let opts = parse_op_write(&self.args)?;
+ let multipart_opts = format_put_multipart_options(opts);
+ let upload = self
+ .store
+ .put_multipart_opts(&self.path, multipart_opts)
+ .await
+ .map_err(parse_error)?;
+
+ // Store the multipart upload for later use
+ let mut guard = self.upload.lock().await;
+ if guard.is_some() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Upload already initiated, abort the previous upload first",
+ ));
+ }
+ *guard = Some(upload);
+
+ // object_store does not provide a way to get the upload id, so we use a fixed string
+ // as the upload id. it's ok because the upload id is already tracked inside the upload
+ // object.
+ Ok("".to_string())
+ }
+
+ /// Upload a single part of the multipart upload.
+ /// Part numbers must be sequential starting from 1.
+ /// Returns the ETag and part information for this uploaded part.
+ async fn write_part(
+ &self,
+ _upload_id: &str,
+ part_number: usize,
+ size: u64,
+ body: Buffer,
+ ) -> Result<MultipartPart> {
+ // Validate that actual body size matches expected size
+ let actual_size = body.len() as u64;
+ if actual_size != size {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Expected size {size} but got {actual_size}"),
+ ));
+ }
+
+ // Convert Buffer to PutPayload
+ let bytes = body.to_bytes();
+
+ // Return empty string as ETag since it's not used by object_store
+ let etag = String::new();
+
+ let payload = PutPayload::from(bytes);
+
+ // Upload the part
+ let mut guard = self.upload.lock().await;
+ let upload = guard
+ .as_mut()
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
+ upload.put_part(payload).await.map_err(parse_error)?;
+
+ // Create MultipartPart with the proper ETag
+ let multipart_part = MultipartPart {
+ part_number,
+ etag,
+ checksum: None, // No checksum for now
+ };
+ Ok(multipart_part)
+ }
+
+ async fn complete_part(
+ &self,
+ _upload_id: &str,
+ parts: &[oio::MultipartPart],
+ ) -> Result<Metadata> {
+ // Validate that we have parts to complete
+ if parts.is_empty() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Cannot complete multipart upload with no parts",
+ ));
+ }
+
+ // Get the multipart upload for this upload_id
+ let mut guard = self.upload.lock().await;
+ let upload = guard
+ .as_mut()
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
+
+ // Complete the multipart upload
+ let result = upload.complete().await.map_err(parse_error)?;
+ *guard = None;
+
+ // Build metadata from the result
+ let metadata = format_put_result(result);
+ Ok(metadata)
+ }
+
+ async fn abort_part(&self, _upload_id: &str) -> Result<()> {
+ // Get the multipart upload for this upload_id
+ let mut guard = self.upload.lock().await;
+ let upload = guard
+ .as_mut()
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
+
+ // Abort the multipart upload
+ upload.abort().await.map_err(parse_error)?;
+ *guard = None;
+
+ Ok(())
+ }
+}