This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 34a4e73a feat(bindings/object_store): Add support for list and list_with_delimiter (#1784)
34a4e73a is described below

commit 34a4e73a601b8e89c0643d0b77281c90a59b4a5c
Author: Sen Na <[email protected]>
AuthorDate: Tue Mar 28 15:03:20 2023 +0800

    feat(bindings/object_store): Add support for list and list_with_delimiter (#1784)
    
    * feat: Add support for list and list_with_delimiter
    
    Also added some simple tests for list and list_with_delimiter
    
    * refactor: change `convert_entry` to `format_object_meta`
    
    Also let this function accept path instead of entry.
---
 bindings/object_store/src/lib.rs | 123 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 119 insertions(+), 4 deletions(-)

diff --git a/bindings/object_store/src/lib.rs b/bindings/object_store/src/lib.rs
index 1513fa32..b9c6fb7c 100644
--- a/bindings/object_store/src/lib.rs
+++ b/bindings/object_store/src/lib.rs
@@ -27,6 +27,7 @@ use chrono::NaiveDateTime;
 use chrono::Utc;
 use futures::stream::BoxStream;
 use futures::Stream;
+use futures::StreamExt;
 use object_store::path::Path;
 use object_store::GetResult;
 use object_store::ListResult;
@@ -34,6 +35,8 @@ use object_store::MultipartId;
 use object_store::ObjectMeta;
 use object_store::ObjectStore;
 use object_store::Result;
+use opendal::Metadata;
+use opendal::Metakey;
 use opendal::Operator;
 use opendal::Reader;
 use tokio::io::AsyncWrite;
@@ -139,12 +142,63 @@ impl ObjectStore for OpendalStore {
         Ok(())
     }
 
-    async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        todo!()
+    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        // object_store `Path` always removes trailing slash
+        // need to add it back
+        let path = prefix.map_or("".into(), |x| format!("{}/", x));
+        let stream = self
+            .inner
+            .scan(&path)
+            .await
+            .map_err(|err| format_object_store_error(err, &path))?;
+
+        let stream = stream.then(|res| async {
+            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+            let meta = self
+                .inner
+                .metadata(&entry, Metakey::ContentLength | Metakey::LastModified)
+                .await
+                .map_err(|err| format_object_store_error(err, entry.path()))?;
+
+            Ok(format_object_meta(entry.path(), &meta))
+        });
+
+        Ok(stream.boxed())
     }
 
-    async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
-        todo!()
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let path = prefix.map_or("".into(), |x| format!("{}/", x));
+        let mut stream = self
+            .inner
+            .list(&path)
+            .await
+            .map_err(|err| format_object_store_error(err, &path))?;
+
+        let mut common_prefixes = Vec::new();
+        let mut objects = Vec::new();
+
+        while let Some(res) = stream.next().await {
+            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+            let meta = self
+                .inner
+                .metadata(
+                    &entry,
+                    Metakey::Mode | Metakey::ContentLength | Metakey::LastModified,
+                )
+                .await
+                .map_err(|err| format_object_store_error(err, entry.path()))?;
+
+            if meta.is_dir() {
+                common_prefixes.push(entry.path().into());
+            } else {
+                objects.push(format_object_meta(entry.path(), &meta));
+            }
+        }
+
+        Ok(ListResult {
+            common_prefixes,
+            objects,
+        })
     }
 
     async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
@@ -196,6 +250,22 @@ fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::E
     }
 }
 
+fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
+    let (secs, nsecs) = meta
+        .last_modified()
+        .map(|v| (v.unix_timestamp(), v.nanosecond()))
+        .unwrap_or((0, 0));
+    ObjectMeta {
+        location: path.into(),
+        last_modified: DateTime::from_utc(
+            NaiveDateTime::from_timestamp_opt(secs, nsecs)
+                .expect("returning timestamp must be valid"),
+            Utc,
+        ),
+        size: meta.content_length() as usize,
+    }
+}
+
 struct OpendalReader {
     inner: Reader,
 }
@@ -225,6 +295,21 @@ mod tests {
 
     use super::*;
 
+    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
+        let op = Operator::new(services::Memory::default()).unwrap().finish();
+        let object_store = Arc::new(OpendalStore::new(op));
+
+        let path: Path = "data/test.txt".try_into().unwrap();
+        let bytes = Bytes::from_static(b"hello, world!");
+        object_store.put(&path, bytes).await.unwrap();
+
+        let path: Path = "data/nested/test.txt".try_into().unwrap();
+        let bytes = Bytes::from_static(b"hello, world! I am nested.");
+        object_store.put(&path, bytes).await.unwrap();
+
+        object_store
+    }
+
     #[tokio::test]
     async fn test_basic() {
         let op = Operator::new(services::Memory::default()).unwrap().finish();
@@ -240,4 +325,34 @@ mod tests {
 
         assert_eq!(meta.size, 13)
     }
+
+    #[tokio::test]
+    async fn test_list() {
+        let object_store = create_test_object_store().await;
+        let path: Path = "data/".try_into().unwrap();
+        let results = object_store
+            .list(Some(&path))
+            .await
+            .unwrap()
+            .collect::<Vec<_>>()
+            .await;
+        assert_eq!(results.len(), 2);
+        let mut locations = results
+            .iter()
+            .map(|x| x.as_ref().unwrap().location.as_ref())
+            .collect::<Vec<_>>();
+        locations.sort();
+        assert_eq!(locations, &["data/nested/test.txt", "data/test.txt"]);
+    }
+
+    #[tokio::test]
+    async fn test_list_with_delimiter() {
+        let object_store = create_test_object_store().await;
+        let path: Path = "data/".try_into().unwrap();
+        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
+        assert_eq!(result.objects.len(), 1);
+        assert_eq!(result.common_prefixes.len(), 1);
+        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
+        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
+    }
 }

Reply via email to