This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new f00fd1cbc feat: allow using object_store as opendal's backend (#6283)
f00fd1cbc is described below

commit f00fd1cbc9bb4b0d26f714fb4dae8667e036375c
Author: flaneur <[email protected]>
AuthorDate: Tue Aug 26 17:43:57 2025 +0800

    feat: allow using object_store as opendal's backend (#6283)
    
    * add object-store as service backend
    
    * add object store as features
    
    * add scaffold for impl Access
    
    * add parse_error
    
    * add read()
    
    * add reader
    
    * factor out reader.rs
    
    * add writer
    
    * move the code from core/ to integration/object_store
    
    * move backend.rs to mod.rs
    
    * add deleter.rs
    
    * add deleter
    
    * add lister.rs
    
    * add test case
    
    * add test cases
    
    * fix apache header
    
    * cargo fmt
    
    * fix clippy
    
    * directly use async fn
    
    * use async fn in read
    
    * revert unused cargo service
    
    * rename backend/ as service/
    
    * do the io in read()
    
    * fix the build error on bytes_stream not able to Sync
    
    * use a bytes_deque for Buffer
    
    * fix clippy
    
    * add scaffold for having MultipartWrite
    
    * fix the build
    
    * add initiate_part
    
    * add write_part
    
    * save the changes about multi part write
    
    * implement MultipartWriter
    
    * tune
    
    * fix the etag handling
    
    * validate the complete_part
    
    * add test case for it
    
    * revise the test
    
    * fix test case about multipart upload
    
    * minor rename
    
    * fix clippy
    
    * revert core/Cargo.toml
    
    * revert cargo.toml
    
    * fix set_err to track the cause
    
    * fix build
    
    * fix oio::Read
    
    * extract a format_metadata
    
    * implement BatchDelete
    
    * do not allow delete with version
    
    * refactor the multipart upload
    
    * clean up unused code
    
    * refactor: import object_store::path::Path as ObjectStorePath for better readability
    
    * refactor: move OpXxx to GetOptions conversion functions to core.rs with consistent naming
    
    * handle op_stat by get_opts
    
    * explicitly impl Sync to replace Mutex
    
    * cargo fmt
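    
    This change lets any `object_store` implementation back an OpenDAL
    `Operator`. A minimal usage sketch (illustrative only, not part of this
    diff; the in-memory store and the `Operator` wiring below assume
    OpenDAL's usual builder pattern):
    
        use std::sync::Arc;
    
        use object_store::memory::InMemory;
        use object_store::ObjectStore;
        use object_store_opendal::ObjectStoreBuilder;
        use opendal::Operator;
    
        #[tokio::main]
        async fn main() -> opendal::Result<()> {
            // Any ObjectStore works here; InMemory keeps the sketch self-contained.
            let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
            let op = Operator::new(ObjectStoreBuilder::new(store))?.finish();
    
            // Round-trip a small object through the object_store-backed service.
            op.write("hello.txt", "Hello, world!").await?;
            let buf = op.read("hello.txt").await?;
            assert_eq!(buf.to_vec(), b"Hello, world!");
            Ok(())
        }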
---
 integrations/object_store/examples/basic.rs      |   4 +-
 integrations/object_store/src/lib.rs             |   4 +
 integrations/object_store/src/service/core.rs    | 163 ++++++++++
 integrations/object_store/src/service/deleter.rs |  69 ++++
 integrations/object_store/src/service/error.rs   |  54 ++++
 integrations/object_store/src/service/lister.rs  |  75 +++++
 integrations/object_store/src/service/mod.rs     | 388 +++++++++++++++++++++++
 integrations/object_store/src/service/reader.rs  |  82 +++++
 integrations/object_store/src/service/writer.rs  | 204 ++++++++++++
 9 files changed, 1041 insertions(+), 2 deletions(-)

diff --git a/integrations/object_store/examples/basic.rs b/integrations/object_store/examples/basic.rs
index eaf4a18e5..1afbc9c74 100644
--- a/integrations/object_store/examples/basic.rs
+++ b/integrations/object_store/examples/basic.rs
@@ -1,7 +1,7 @@
 use bytes::Bytes;
 #[cfg(feature = "services-s3")]
 use object_store::aws::AmazonS3Builder;
-use object_store::path::Path;
+use object_store::path::Path as ObjectStorePath;
 use object_store::ObjectStore;
 use object_store_opendal::OpendalStore;
 
@@ -16,7 +16,7 @@ async fn main() {
         .with_secret_access_key("my_secret_access_key");
     let s3_store = OpendalStore::new_amazon_s3(builder).unwrap();
 
-    let path = Path::from("data/nested/test.txt");
+    let path = ObjectStorePath::from("data/nested/test.txt");
     let bytes = Bytes::from_static(b"hello, world! I am nested.");
 
     s3_store.put(&path, bytes.clone().into()).await.unwrap();
diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs
index 60a7126cd..9671bedd7 100644
--- a/integrations/object_store/src/lib.rs
+++ b/integrations/object_store/src/lib.rs
@@ -68,6 +68,10 @@ mod utils;
 #[cfg(feature = "services-s3")]
 mod amazon_s3;
 
+mod service;
+
+pub use service::{ObjectStoreBuilder, ObjectStoreService};
+
 // Make sure `send_wrapper` works as expected
 #[cfg(all(feature = "send_wrapper", test))]
 mod assert_send {
diff --git a/integrations/object_store/src/service/core.rs b/integrations/object_store/src/service/core.rs
new file mode 100644
index 000000000..bfcf61d66
--- /dev/null
+++ b/integrations/object_store/src/service/core.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+
+use object_store::{
+    Attribute, AttributeValue, GetOptions, GetRange, ObjectMeta, PutOptions, PutResult,
+};
+use opendal::raw::*;
+use opendal::*;
+
+/// Parse OpStat arguments to object_store GetOptions for head requests
+pub fn parse_op_stat(args: &OpStat) -> Result<GetOptions> {
+    let mut options = GetOptions {
+        head: true, // This is a head request
+        ..Default::default()
+    };
+
+    if let Some(version) = args.version() {
+        options.version = Some(version.to_string());
+    }
+
+    if let Some(if_match) = args.if_match() {
+        options.if_match = Some(if_match.to_string());
+    }
+
+    if let Some(if_none_match) = args.if_none_match() {
+        options.if_none_match = Some(if_none_match.to_string());
+    }
+
+    if let Some(if_modified_since) = args.if_modified_since() {
+        options.if_modified_since = Some(if_modified_since);
+    }
+
+    if let Some(if_unmodified_since) = args.if_unmodified_since() {
+        options.if_unmodified_since = Some(if_unmodified_since);
+    }
+
+    Ok(options)
+}
+
+/// Parse OpRead arguments to object_store GetOptions
+pub fn parse_op_read(args: &OpRead) -> Result<GetOptions> {
+    let mut options = GetOptions::default();
+
+    if let Some(version) = args.version() {
+        options.version = Some(version.to_string());
+    }
+
+    if let Some(if_match) = args.if_match() {
+        options.if_match = Some(if_match.to_string());
+    }
+
+    if let Some(if_none_match) = args.if_none_match() {
+        options.if_none_match = Some(if_none_match.to_string());
+    }
+
+    if let Some(if_modified_since) = args.if_modified_since() {
+        options.if_modified_since = Some(if_modified_since);
+    }
+
+    if let Some(if_unmodified_since) = args.if_unmodified_since() {
+        options.if_unmodified_since = Some(if_unmodified_since);
+    }
+
+    if !args.range().is_full() {
+        let range = args.range();
+        match range.size() {
+            Some(size) => {
+                options.range = Some(GetRange::Bounded(range.offset()..range.offset() + size));
+            }
+            None => {
+                options.range = Some(GetRange::Offset(range.offset()));
+            }
+        }
+    }
+
+    Ok(options)
+}
+
+/// Parse OpWrite arguments to object_store PutOptions
+pub fn parse_op_write(args: &OpWrite) -> Result<PutOptions> {
+    let mut opts = PutOptions::default();
+
+    if let Some(content_type) = args.content_type() {
+        opts.attributes.insert(
+            Attribute::ContentType,
+            AttributeValue::from(content_type.to_string()),
+        );
+    }
+
+    if let Some(content_disposition) = args.content_disposition() {
+        opts.attributes.insert(
+            Attribute::ContentDisposition,
+            AttributeValue::from(content_disposition.to_string()),
+        );
+    }
+
+    if let Some(cache_control) = args.cache_control() {
+        opts.attributes.insert(
+            Attribute::CacheControl,
+            AttributeValue::from(cache_control.to_string()),
+        );
+    }
+
+    if let Some(user_metadata) = args.user_metadata() {
+        for (key, value) in user_metadata {
+            opts.attributes.insert(
+                Attribute::Metadata(Cow::from(key.to_string())),
+                AttributeValue::from(value.to_string()),
+            );
+        }
+    }
+    Ok(opts)
+}
+
+/// Convert PutOptions to PutMultipartOptions
+pub fn format_put_multipart_options(opts: PutOptions) -> object_store::PutMultipartOptions {
+    object_store::PutMultipartOptions {
+        attributes: opts.attributes,
+        ..Default::default()
+    }
+}
+
+/// Format PutResult to OpenDAL Metadata
+pub fn format_put_result(result: PutResult) -> Metadata {
+    let mut metadata = Metadata::new(EntryMode::FILE);
+    if let Some(etag) = &result.e_tag {
+        metadata.set_etag(etag);
+    }
+    if let Some(version) = &result.version {
+        metadata.set_version(version);
+    }
+    metadata
+}
+
+/// Format `object_store::ObjectMeta` to `opendal::Metadata`.
+pub fn format_metadata(meta: &ObjectMeta) -> Metadata {
+    let mut metadata = Metadata::new(EntryMode::FILE);
+    metadata.set_content_length(meta.size);
+    metadata.set_last_modified(meta.last_modified);
+    if let Some(etag) = &meta.e_tag {
+        metadata.set_etag(etag);
+    }
+    if let Some(version) = &meta.version {
+        metadata.set_version(version);
+    }
+    metadata
+}
diff --git a/integrations/object_store/src/service/deleter.rs b/integrations/object_store/src/service/deleter.rs
new file mode 100644
index 000000000..b4db4b2da
--- /dev/null
+++ b/integrations/object_store/src/service/deleter.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::stream::{self, StreamExt};
+use object_store::path::Path as ObjectStorePath;
+use object_store::ObjectStore;
+use opendal::raw::oio::BatchDeleteResult;
+use opendal::raw::*;
+use opendal::*;
+
+use super::error::parse_error;
+
+pub struct ObjectStoreDeleter {
+    store: Arc<dyn ObjectStore + 'static>,
+}
+
+impl ObjectStoreDeleter {
+    pub(crate) fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
+        Self { store }
+    }
+}
+
+impl oio::BatchDelete for ObjectStoreDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let object_path = ObjectStorePath::from(path);
+        self.store.delete(&object_path).await.map_err(parse_error)
+    }
+
+    async fn delete_batch(&self, paths: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
+        // convert paths to stream, then use [`ObjectStore::delete_stream`] to delete them in batch
+        let stream = stream::iter(paths.iter())
+            .map(|(path, _)| Ok::<_, object_store::Error>(ObjectStorePath::from(path.as_str())))
+            .boxed();
+        let results = self.store.delete_stream(stream).collect::<Vec<_>>().await;
+
+        // convert the results to [`BatchDeleteResult`]
+        let mut result_batch = BatchDeleteResult::default();
+        for (idx, result) in results.into_iter().enumerate() {
+            match result {
+                Ok(_) => result_batch
+                    .succeeded
+                    .push((paths[idx].0.clone(), paths[idx].1.clone())),
+                Err(e) => result_batch.failed.push((
+                    paths[idx].0.clone(),
+                    paths[idx].1.clone(),
+                    parse_error(e),
+                )),
+            }
+        }
+
+        Ok(result_batch)
+    }
+}
diff --git a/integrations/object_store/src/service/error.rs b/integrations/object_store/src/service/error.rs
new file mode 100644
index 000000000..f8bea1c98
--- /dev/null
+++ b/integrations/object_store/src/service/error.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use opendal::Error;
+use opendal::ErrorKind;
+
+pub(crate) fn parse_error(err: object_store::Error) -> Error {
+    match err {
+        object_store::Error::NotFound { .. } => {
+            Error::new(ErrorKind::NotFound, "path not found").set_source(err)
+        }
+
+        object_store::Error::AlreadyExists { .. } => {
+            Error::new(ErrorKind::AlreadyExists, "path already exists").set_source(err)
+        }
+
+        object_store::Error::PermissionDenied { .. }
+        | object_store::Error::Unauthenticated { .. } => {
+            Error::new(ErrorKind::PermissionDenied, "permission denied").set_source(err)
+        }
+
+        object_store::Error::InvalidPath { .. } => {
+            Error::new(ErrorKind::NotFound, "invalid path").set_source(err)
+        }
+
+        object_store::Error::NotSupported { .. } => {
+            Error::new(ErrorKind::Unsupported, "operation not supported").set_source(err)
+        }
+
+        object_store::Error::Precondition { .. } => {
+            Error::new(ErrorKind::ConditionNotMatch, "precondition not met").set_source(err)
+        }
+
+        object_store::Error::Generic { store, .. } => {
+            Error::new(ErrorKind::Unexpected, format!("{store} operation failed")).set_source(err)
+        }
+
+        _ => Error::new(ErrorKind::Unexpected, "unknown error").set_source(err),
+    }
+}
diff --git a/integrations/object_store/src/service/lister.rs b/integrations/object_store/src/service/lister.rs
new file mode 100644
index 000000000..7585174ba
--- /dev/null
+++ b/integrations/object_store/src/service/lister.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use futures::{stream::BoxStream, StreamExt};
+use object_store::path::Path as ObjectStorePath;
+use object_store::{ObjectMeta, ObjectStore};
+use opendal::raw::*;
+use opendal::*;
+
+use super::error::parse_error;
+use crate::service::core::format_metadata;
+
+pub struct ObjectStoreLister {
+    stream: BoxStream<'static, object_store::Result<ObjectMeta>>,
+}
+
+impl ObjectStoreLister {
+    pub(crate) async fn new(
+        store: Arc<dyn ObjectStore + 'static>,
+        path: &str,
+        args: OpList,
+    ) -> Result<Self> {
+        // If start_after is specified, use list_with_offset
+        let mut stream = if let Some(start_after) = args.start_after() {
+            store
+                .list_with_offset(
+                    Some(&ObjectStorePath::from(path)),
+                    &ObjectStorePath::from(start_after),
+                )
+                .boxed()
+        } else {
+            store.list(Some(&ObjectStorePath::from(path))).boxed()
+        };
+
+        // Process listing arguments
+        if let Some(limit) = args.limit() {
+            stream = stream.take(limit).boxed();
+        }
+
+        Ok(Self { stream })
+    }
+}
+
+// ObjectStoreLister is safe to share between threads, because it only has &mut self API calls.
+unsafe impl Sync for ObjectStoreLister {}
+
+impl oio::List for ObjectStoreLister {
+    async fn next(&mut self) -> Result<Option<oio::Entry>> {
+        match self.stream.next().await {
+            Some(Ok(meta)) => {
+                let metadata = format_metadata(&meta);
+                let entry = oio::Entry::new(meta.location.as_ref(), metadata.clone());
+                Ok(Some(entry))
+            }
+            Some(Err(e)) => Err(parse_error(e)),
+            None => Ok(None),
+        }
+    }
+}
diff --git a/integrations/object_store/src/service/mod.rs b/integrations/object_store/src/service/mod.rs
new file mode 100644
index 000000000..068d4ad4c
--- /dev/null
+++ b/integrations/object_store/src/service/mod.rs
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use object_store::path::Path as ObjectStorePath;
+use object_store::ObjectStore;
+use opendal::raw::oio::BatchDeleter;
+use opendal::raw::oio::MultipartWriter;
+use opendal::raw::*;
+use opendal::Error;
+use opendal::ErrorKind;
+use opendal::*;
+
+mod core;
+mod deleter;
+mod error;
+mod lister;
+mod reader;
+mod writer;
+
+use deleter::ObjectStoreDeleter;
+use error::parse_error;
+use lister::ObjectStoreLister;
+use reader::ObjectStoreReader;
+use writer::ObjectStoreWriter;
+
+use crate::service::core::format_metadata as parse_metadata;
+use crate::service::core::parse_op_stat;
+
+/// ObjectStore backend builder
+#[derive(Default)]
+pub struct ObjectStoreBuilder {
+    store: Option<Arc<dyn ObjectStore + 'static>>,
+}
+
+impl Debug for ObjectStoreBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("ObjectStoreBuilder");
+        d.finish_non_exhaustive()
+    }
+}
+
+impl ObjectStoreBuilder {
+    /// Set the object store instance
+    pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
+        Self { store: Some(store) }
+    }
+}
+
+impl Builder for ObjectStoreBuilder {
+    type Config = ();
+
+    fn build(self) -> Result<impl Access> {
+        let store = self.store.ok_or_else(|| {
+            Error::new(ErrorKind::ConfigInvalid, "object store is required")
+                .with_context("service", Scheme::Custom("object_store"))
+        })?;
+
+        Ok(ObjectStoreService { store })
+    }
+}
+
+/// ObjectStore backend
+pub struct ObjectStoreService {
+    store: Arc<dyn ObjectStore + 'static>,
+}
+
+impl Debug for ObjectStoreService {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("ObjectStoreBackend");
+        d.finish_non_exhaustive()
+    }
+}
+
+impl Access for ObjectStoreService {
+    type Reader = ObjectStoreReader;
+    type Writer = MultipartWriter<ObjectStoreWriter>;
+    type Lister = ObjectStoreLister;
+    type Deleter = BatchDeleter<ObjectStoreDeleter>;
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        let info = AccessorInfo::default();
+        info.set_scheme("object_store")
+            .set_root("/")
+            .set_name("object_store")
+            .set_native_capability(Capability {
+                stat: true,
+                stat_with_if_match: true,
+                stat_with_if_unmodified_since: true,
+                read: true,
+                write: true,
+                delete: true,
+                list: true,
+                list_with_limit: true,
+                list_with_start_after: true,
+                delete_with_version: false,
+                ..Default::default()
+            });
+        Arc::new(info)
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let path = ObjectStorePath::from(path);
+        let opts = parse_op_stat(&args)?;
+        let result = self
+            .store
+            .get_opts(&path, opts)
+            .await
+            .map_err(parse_error)?;
+        let metadata = parse_metadata(&result.meta);
+        Ok(RpStat::new(metadata))
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
+        Ok((reader.rp(), reader))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
+        Ok((
+            RpWrite::default(),
+            MultipartWriter::new(self.info(), writer, 10),
+        ))
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()));
+        Ok((RpDelete::default(), deleter))
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
+        Ok((RpList::default(), lister))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use object_store::memory::InMemory;
+    use opendal::raw::oio::{Delete, List, Read, Write};
+    use opendal::Buffer;
+
+    #[tokio::test]
+    async fn test_object_store_backend_builder() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let builder = ObjectStoreBuilder::new(store);
+
+        let backend = builder.build().expect("build should succeed");
+        assert!(backend.info().scheme() == "object_store");
+    }
+
+    #[tokio::test]
+    async fn test_object_store_backend_info() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let backend = ObjectStoreBuilder::new(store)
+            .build()
+            .expect("build should succeed");
+
+        let info = backend.info();
+        assert_eq!(info.scheme(), "object_store");
+        assert_eq!(info.name(), "object_store".into());
+        assert_eq!(info.root(), "/".into());
+
+        let cap = info.native_capability();
+        assert!(cap.stat);
+        assert!(cap.read);
+        assert!(cap.write);
+        assert!(cap.delete);
+        assert!(cap.list);
+    }
+
+    #[tokio::test]
+    async fn test_object_store_backend_basic_operations() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let backend = ObjectStoreBuilder::new(store.clone())
+            .build()
+            .expect("build should succeed");
+
+        let path = "test_file.txt";
+        let content = b"Hello, world!";
+
+        // Test write
+        let (_, mut writer) = backend
+            .write(path, OpWrite::default())
+            .await
+            .expect("write should succeed");
+
+        writer
+            .write(Buffer::from(&content[..]))
+            .await
+            .expect("write content should succeed");
+        writer.close().await.expect("close should succeed");
+
+        // Test stat
+        let stat_result = backend
+            .stat(path, OpStat::default())
+            .await
+            .expect("stat should succeed");
+
+        assert_eq!(
+            stat_result.into_metadata().content_length(),
+            content.len() as u64
+        );
+
+        // Test read
+        let (_, mut reader) = backend
+            .read(path, OpRead::default())
+            .await
+            .expect("read should succeed");
+
+        let buf = reader.read().await.expect("read should succeed");
+        assert_eq!(buf.to_vec(), content);
+    }
+
+    #[tokio::test]
+    async fn test_object_store_backend_multipart_upload() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let backend = ObjectStoreBuilder::new(store.clone())
+            .build()
+            .expect("build should succeed");
+
+        let path = "test_file.txt";
+        let content =
+            b"Hello, multipart upload! This is a test content for multipart 
upload functionality.";
+        let content_len = content.len();
+
+        // Test multipart upload with multiple chunks
+        let (_, mut writer) = backend
+            .write(path, OpWrite::default())
+            .await
+            .expect("write should succeed");
+
+        // Write content in chunks to simulate multipart upload
+        let chunk_size = 20;
+        for chunk in content.chunks(chunk_size) {
+            writer
+                .write(Buffer::from(chunk))
+                .await
+                .expect("write chunk should succeed");
+        }
+
+        writer.close().await.expect("close should succeed");
+
+        // Verify the uploaded file
+        let stat_result = backend
+            .stat(path, OpStat::default())
+            .await
+            .expect("stat should succeed");
+
+        assert_eq!(
+            stat_result.into_metadata().content_length(),
+            content_len as u64
+        );
+
+        // Read back and verify content
+        let (_, mut reader) = backend
+            .read(path, OpRead::default())
+            .await
+            .expect("read should succeed");
+
+        let buf = reader.read().await.expect("read should succeed");
+        assert_eq!(buf.to_vec(), content);
+    }
+
+    #[tokio::test]
+    async fn test_object_store_backend_list() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let backend = ObjectStoreBuilder::new(store.clone())
+            .build()
+            .expect("build should succeed");
+
+        // Create multiple files
+        let files = vec![
+            ("dir1/file1.txt", b"content1"),
+            ("dir1/file2.txt", b"content2"),
+            ("dir2/file3.txt", b"content3"),
+        ];
+
+        for (path, content) in &files {
+            let (_, mut writer) = backend
+                .write(path, OpWrite::default())
+                .await
+                .expect("write should succeed");
+            writer
+                .write(Buffer::from(&content[..]))
+                .await
+                .expect("write content should succeed");
+            writer.close().await.expect("close should succeed");
+        }
+
+        // List directory
+        let (_, mut lister) = backend
+            .list("dir1/", OpList::default())
+            .await
+            .expect("list should succeed");
+
+        let mut entries = Vec::new();
+        while let Some(entry) = lister.next().await.expect("next should succeed") {
+            entries.push(entry);
+        }
+
+        assert_eq!(entries.len(), 2);
+        assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
+        assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
+    }
+
+    #[tokio::test]
+    async fn test_object_store_backend_delete() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let backend = ObjectStoreBuilder::new(store)
+            .build()
+            .expect("build should succeed");
+
+        let path = "test_delete.txt";
+        let content = b"To be deleted";
+
+        // Write file
+        let (_, mut writer) = backend
+            .write(path, OpWrite::default())
+            .await
+            .expect("write should succeed");
+        writer
+            .write(Buffer::from(&content[..]))
+            .await
+            .expect("write content should succeed");
+        writer.close().await.expect("close should succeed");
+
+        // Verify file exists
+        backend
+            .stat(path, OpStat::default())
+            .await
+            .expect("file should exist");
+
+        // Delete file
+        let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
+        deleter
+            .delete(path, OpDelete::default())
+            .expect("delete should succeed");
+        deleter.flush().await.expect("flush should succeed");
+
+        // Verify file is deleted
+        let result = backend.stat(path, OpStat::default()).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_object_store_backend_error_handling() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let backend = ObjectStoreBuilder::new(store)
+            .build()
+            .expect("build should succeed");
+
+        // Test stat on non-existent file
+        let result = backend.stat("non_existent.txt", OpStat::default()).await;
+        assert!(result.is_err());
+
+        // Test read on non-existent file
+        let result = backend.read("non_existent.txt", OpRead::default()).await;
+        assert!(result.is_err());
+
+        // Test list on non-existent directory
+        let result = backend.list("non_existent_dir/", OpList::default()).await;
+        // This should succeed but return empty results
+        if let Ok((_, mut lister)) = result {
+            let entry = lister.next().await.expect("next should succeed");
+            assert!(entry.is_none());
+        }
+    }
+}
diff --git a/integrations/object_store/src/service/reader.rs b/integrations/object_store/src/service/reader.rs
new file mode 100644
index 000000000..ca5623127
--- /dev/null
+++ b/integrations/object_store/src/service/reader.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
+use object_store::path::Path as ObjectStorePath;
+use object_store::ObjectStore;
+
+use opendal::raw::*;
+use opendal::*;
+
+use super::core::parse_op_read;
+use super::error::parse_error;
+
+/// ObjectStore reader
+pub struct ObjectStoreReader {
+    bytes_stream: BoxStream<'static, object_store::Result<Bytes>>,
+    meta: object_store::ObjectMeta,
+    args: OpRead,
+}
+
+impl ObjectStoreReader {
+    pub(crate) async fn new(
+        store: Arc<dyn ObjectStore + 'static>,
+        path: &str,
+        args: OpRead,
+    ) -> Result<Self> {
+        let path = ObjectStorePath::from(path);
+        let opts = parse_op_read(&args)?;
+        let result = store.get_opts(&path, opts).await.map_err(parse_error)?;
+        let meta = result.meta.clone();
+        let bytes_stream = result.into_stream();
+        Ok(Self {
+            bytes_stream,
+            meta,
+            args,
+        })
+    }
+
+    pub(crate) fn rp(&self) -> RpRead {
+        let mut rp = RpRead::new().with_size(Some(self.meta.size));
+        if !self.args.range().is_full() {
+            let range = self.args.range();
+            let size = match range.size() {
+                Some(size) => size,
+                None => self.meta.size,
+            };
+            rp = rp.with_range(Some(
+                BytesContentRange::default().with_range(range.offset(), range.offset() + size - 1),
+            ));
+        }
+        rp
+    }
+}
+
+// ObjectStoreReader is safe to share between threads, because the `read()` method requires
+// `&mut self` and `rp()` is stateless.
+unsafe impl Sync for ObjectStoreReader {}
+
+impl oio::Read for ObjectStoreReader {
+    async fn read(&mut self) -> Result<Buffer> {
+        let bs = self.bytes_stream.try_next().await.map_err(parse_error)?;
+        Ok(bs.map(Buffer::from).unwrap_or_default())
+    }
+}
diff --git a/integrations/object_store/src/service/writer.rs b/integrations/object_store/src/service/writer.rs
new file mode 100644
index 000000000..dad207082
--- /dev/null
+++ b/integrations/object_store/src/service/writer.rs
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use object_store::path::Path as ObjectStorePath;
+use object_store::MultipartUpload;
+use object_store::ObjectStore;
+use object_store::PutPayload;
+use object_store::{Attribute, AttributeValue};
+
+use opendal::raw::oio::MultipartPart;
+use opendal::raw::*;
+use opendal::*;
+use tokio::sync::Mutex;
+
+use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
+use super::error::parse_error;
+
+pub struct ObjectStoreWriter {
+    store: Arc<dyn ObjectStore + 'static>,
+    path: ObjectStorePath,
+    args: OpWrite,
+    upload: Mutex<Option<Box<dyn MultipartUpload>>>,
+}
+
+impl ObjectStoreWriter {
+    pub fn new(store: Arc<dyn ObjectStore + 'static>, path: &str, args: OpWrite) -> Self {
+        Self {
+            store,
+            path: ObjectStorePath::from(path),
+            args,
+            upload: Mutex::new(None),
+        }
+    }
+}
+
+impl oio::MultipartWrite for ObjectStoreWriter {
+    /// Write the entire object in one go.
+    /// Used when the object is small enough to bypass multipart upload.
+    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
+        // Validate that actual body size matches expected size
+        let actual_size = body.len() as u64;
+        if actual_size != size {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected size {size} but got {actual_size}"),
+            ));
+        }
+
+        let bytes = body.to_bytes();
+        let payload = PutPayload::from(bytes);
+        let mut opts = parse_op_write(&self.args)?;
+
+        // Add size metadata for tracking
+        opts.attributes.insert(
+            Attribute::Metadata("content-size".into()),
+            AttributeValue::from(size.to_string()),
+        );
+
+        let result = self
+            .store
+            .put_opts(&self.path, payload, opts)
+            .await
+            .map_err(parse_error)?;
+
+        // Build metadata from put result
+        let mut metadata = Metadata::new(EntryMode::FILE);
+        if let Some(etag) = &result.e_tag {
+            metadata.set_etag(etag);
+        }
+        if let Some(version) = &result.version {
+            metadata.set_version(version);
+        }
+
+        Ok(metadata)
+    }
+
+    // Initiate the multipart upload; object_store tracks the session internally, so the returned upload ID is only a placeholder.
+    async fn initiate_part(&self) -> Result<String> {
+        // Start a new multipart upload using object_store
+        let opts = parse_op_write(&self.args)?;
+        let multipart_opts = format_put_multipart_options(opts);
+        let upload = self
+            .store
+            .put_multipart_opts(&self.path, multipart_opts)
+            .await
+            .map_err(parse_error)?;
+
+        // Store the multipart upload for later use
+        let mut guard = self.upload.lock().await;
+        if guard.is_some() {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Upload already initiated, abort the previous upload first",
+            ));
+        }
+        *guard = Some(upload);
+
+        // object_store does not provide a way to get the upload id, so we use a fixed string
+        // as the upload id. it's ok because the upload id is already tracked inside the upload
+        // object.
+        Ok("".to_string())
+    }
+
+    /// Upload a single part of the multipart upload.
+    /// Part numbers must be sequential starting from 1.
+    /// Returns the ETag and part information for this uploaded part.
+    async fn write_part(
+        &self,
+        _upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: Buffer,
+    ) -> Result<MultipartPart> {
+        // Validate that actual body size matches expected size
+        let actual_size = body.len() as u64;
+        if actual_size != size {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected size {size} but got {actual_size}"),
+            ));
+        }
+
+        // Convert Buffer to PutPayload
+        let bytes = body.to_bytes();
+
+        // Return empty string as ETag since it's not used by object_store
+        let etag = String::new();
+
+        let payload = PutPayload::from(bytes);
+
+        // Upload the part
+        let mut guard = self.upload.lock().await;
+        let upload = guard
+            .as_mut()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
+        upload.put_part(payload).await.map_err(parse_error)?;
+
+        // Create the MultipartPart; the ETag stays empty because object_store does not expose per-part ETags
+        let multipart_part = MultipartPart {
+            part_number,
+            etag,
+            checksum: None, // No checksum for now
+        };
+        Ok(multipart_part)
+    }
+
+    async fn complete_part(
+        &self,
+        _upload_id: &str,
+        parts: &[oio::MultipartPart],
+    ) -> Result<Metadata> {
+        // Validate that we have parts to complete
+        if parts.is_empty() {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot complete multipart upload with no parts",
+            ));
+        }
+
+        // Get the multipart upload for this upload_id
+        let mut guard = self.upload.lock().await;
+        let upload = guard
+            .as_mut()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
+
+        // Complete the multipart upload
+        let result = upload.complete().await.map_err(parse_error)?;
+        *guard = None;
+
+        // Build metadata from the result
+        let metadata = format_put_result(result);
+        Ok(metadata)
+    }
+
+    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
+        // Get the multipart upload for this upload_id
+        let mut guard = self.upload.lock().await;
+        let upload = guard
+            .as_mut()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
+
+        // Abort the multipart upload
+        upload.abort().await.map_err(parse_error)?;
+        *guard = None;
+
+        Ok(())
+    }
+}

