This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ba3fc4528 refactor(object_store): upgrade object_store to 0.7. (#3713)
ba3fc4528 is described below

commit ba3fc452852fa43912910ce99fe26c5adb10250c
Author: Yang Xiufeng <[email protected]>
AuthorDate: Tue Dec 5 15:28:54 2023 +0800

    refactor(object_store): upgrade object_store to 0.7. (#3713)
    
    * upgrade object_store to 0.7.
    
    * impl ObjectStore:list_with_offset
---
 Cargo.lock                           |  6 +--
 integrations/object_store/Cargo.toml |  2 +-
 integrations/object_store/src/lib.rs | 90 ++++++++++++++++++++++++++++++++----
 3 files changed, 86 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6ddd4a0fb..b54e972d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4588,16 +4588,16 @@ dependencies = [
 
 [[package]]
 name = "object_store"
-version = "0.6.1"
+version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58"
+checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4"
 dependencies = [
  "async-trait",
  "bytes",
  "chrono",
  "futures",
  "humantime",
- "itertools 0.10.5",
+ "itertools 0.11.0",
  "parking_lot 0.12.1",
  "percent-encoding",
  "snafu",
diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml
index 51b1db215..99e542110 100644
--- a/integrations/object_store/Cargo.toml
+++ b/integrations/object_store/Cargo.toml
@@ -31,7 +31,7 @@ version.workspace = true
 async-trait = "0.1"
 bytes = "1"
 futures = "0.3"
-object_store = "0.6"
+object_store = "0.7"
 opendal.workspace = true
 tokio = "1"
 
diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs
index 9ffd8277a..5d6123c1d 100644
--- a/integrations/object_store/src/lib.rs
+++ b/integrations/object_store/src/lib.rs
@@ -25,14 +25,17 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::Stream;
 use futures::StreamExt;
+use futures::TryStreamExt;
 use object_store::path::Path;
 use object_store::GetOptions;
 use object_store::GetResult;
+use object_store::GetResultPayload;
 use object_store::ListResult;
 use object_store::MultipartId;
 use object_store::ObjectMeta;
 use object_store::ObjectStore;
 use object_store::Result;
+use opendal::Entry;
 use opendal::Metadata;
 use opendal::Metakey;
 use opendal::Operator;
@@ -88,24 +91,39 @@ impl ObjectStore for OpendalStore {
         })
     }
 
-    async fn get_opts(&self, location: &Path, _: GetOptions) -> Result<GetResult> {
-        let r = self
+    async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result<GetResult> {
+        Err(object_store::Error::NotSupported {
+            source: Box::new(opendal::Error::new(
+                opendal::ErrorKind::Unsupported,
+                "get_opts is not implemented so far",
+            )),
+        })
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let meta = self
             .inner
-            .reader(location.as_ref())
+            .stat(location.as_ref())
             .await
             .map_err(|err| format_object_store_error(err, location.as_ref()))?;
 
-        Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
-    }
-
-    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let meta = ObjectMeta {
+            location: location.clone(),
+            last_modified: meta.last_modified().unwrap_or_default(),
+            size: meta.content_length() as usize,
+            e_tag: meta.etag().map(|x| x.to_string()),
+        };
         let r = self
             .inner
             .reader(location.as_ref())
             .await
             .map_err(|err| format_object_store_error(err, location.as_ref()))?;
 
-        Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(Box::pin(OpendalReader { inner: r })),
+            range: (0..meta.size),
+            meta,
+        })
     }
 
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -165,6 +183,37 @@ impl ObjectStore for OpendalStore {
         Ok(stream.boxed())
     }
 
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let path = prefix.map_or("".into(), |x| format!("{}/", x));
+        let offset = offset.clone();
+        let stream = if self.inner.info().full_capability().list_with_start_after {
+            self.inner
+                .lister_with(&path)
+                .start_after(offset.as_ref())
+                .metakey(Metakey::ContentLength | Metakey::LastModified)
+                .recursive(true)
+                .await
+                .map_err(|err| format_object_store_error(err, &path))?
+                .then(try_format_object_meta)
+                .boxed()
+        } else {
+            self.inner
+                .lister_with(&path)
+                .metakey(Metakey::ContentLength | Metakey::LastModified)
+                .recursive(true)
+                .await
+                .map_err(|err| format_object_store_error(err, &path))?
+                .try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
+                .then(try_format_object_meta)
+                .boxed()
+        };
+        Ok(stream)
+    }
+
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
         let path = prefix.map_or("".into(), |x| format!("{}/", x));
         let mut stream = self
@@ -252,6 +301,13 @@ fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
     }
 }
 
+async fn try_format_object_meta(res: Result<Entry, opendal::Error>) -> Result<ObjectMeta> {
+    let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+    let meta = entry.metadata();
+
+    Ok(format_object_meta(entry.path(), meta))
+}
+
 struct OpendalReader {
     inner: Reader,
 }
@@ -377,4 +433,22 @@ mod tests {
         assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
         assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
     }
+
+    #[tokio::test]
+    async fn test_list_with_offset() {
+        let object_store = create_test_object_store().await;
+        let path: Path = "data/".try_into().unwrap();
+        let offset: Path = "data/nested/test.txt".try_into().unwrap();
+        let result = object_store
+            .list_with_offset(Some(&path), &offset)
+            .await
+            .unwrap()
+            .collect::<Vec<_>>()
+            .await;
+        assert_eq!(result.len(), 1);
+        assert_eq!(
+            result[0].as_ref().unwrap().location.as_ref(),
+            "data/test.txt"
+        );
+    }
 }

Reply via email to