This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 34a4e73a feat(bindings/object_store): Add support for list and list_with_delimiter (#1784)
34a4e73a is described below
commit 34a4e73a601b8e89c0643d0b77281c90a59b4a5c
Author: Sen Na <[email protected]>
AuthorDate: Tue Mar 28 15:03:20 2023 +0800
feat(bindings/object_store): Add support for list and list_with_delimiter (#1784)
* feat: Add support for list and list_with_delimiter
Also added some simple tests for list and list_with_delimiter
* refactor: change `convert_entry` to `format_object_meta`
Also let this function accept path instead of entry.
---
bindings/object_store/src/lib.rs | 123 +++++++++++++++++++++++++++++++++++++--
1 file changed, 119 insertions(+), 4 deletions(-)
diff --git a/bindings/object_store/src/lib.rs b/bindings/object_store/src/lib.rs
index 1513fa32..b9c6fb7c 100644
--- a/bindings/object_store/src/lib.rs
+++ b/bindings/object_store/src/lib.rs
@@ -27,6 +27,7 @@ use chrono::NaiveDateTime;
use chrono::Utc;
use futures::stream::BoxStream;
use futures::Stream;
+use futures::StreamExt;
use object_store::path::Path;
use object_store::GetResult;
use object_store::ListResult;
@@ -34,6 +35,8 @@ use object_store::MultipartId;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::Result;
+use opendal::Metadata;
+use opendal::Metakey;
use opendal::Operator;
use opendal::Reader;
use tokio::io::AsyncWrite;
@@ -139,12 +142,63 @@ impl ObjectStore for OpendalStore {
Ok(())
}
- async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
- todo!()
+ async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_,
Result<ObjectMeta>>> {
+ // object_store `Path` always removes trailing slash
+ // need to add it back
+ let path = prefix.map_or("".into(), |x| format!("{}/", x));
+ let stream = self
+ .inner
+ .scan(&path)
+ .await
+ .map_err(|err| format_object_store_error(err, &path))?;
+
+ let stream = stream.then(|res| async {
+ let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+ let meta = self
+ .inner
+ .metadata(&entry, Metakey::ContentLength |
Metakey::LastModified)
+ .await
+ .map_err(|err| format_object_store_error(err, entry.path()))?;
+
+ Ok(format_object_meta(entry.path(), &meta))
+ });
+
+ Ok(stream.boxed())
}
- async fn list_with_delimiter(&self, _prefix: Option<&Path>) ->
Result<ListResult> {
- todo!()
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
+ let path = prefix.map_or("".into(), |x| format!("{}/", x));
+ let mut stream = self
+ .inner
+ .list(&path)
+ .await
+ .map_err(|err| format_object_store_error(err, &path))?;
+
+ let mut common_prefixes = Vec::new();
+ let mut objects = Vec::new();
+
+ while let Some(res) = stream.next().await {
+ let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+ let meta = self
+ .inner
+ .metadata(
+ &entry,
+ Metakey::Mode | Metakey::ContentLength |
Metakey::LastModified,
+ )
+ .await
+ .map_err(|err| format_object_store_error(err, entry.path()))?;
+
+ if meta.is_dir() {
+ common_prefixes.push(entry.path().into());
+ } else {
+ objects.push(format_object_meta(entry.path(), &meta));
+ }
+ }
+
+ Ok(ListResult {
+ common_prefixes,
+ objects,
+ })
}
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
@@ -196,6 +250,22 @@ fn format_object_store_error(err: opendal::Error, path:
&str) -> object_store::E
}
}
+fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
+ let (secs, nsecs) = meta
+ .last_modified()
+ .map(|v| (v.unix_timestamp(), v.nanosecond()))
+ .unwrap_or((0, 0));
+ ObjectMeta {
+ location: path.into(),
+ last_modified: DateTime::from_utc(
+ NaiveDateTime::from_timestamp_opt(secs, nsecs)
+ .expect("returning timestamp must be valid"),
+ Utc,
+ ),
+ size: meta.content_length() as usize,
+ }
+}
+
struct OpendalReader {
inner: Reader,
}
@@ -225,6 +295,21 @@ mod tests {
use super::*;
+ async fn create_test_object_store() -> Arc<dyn ObjectStore> {
+ let op = Operator::new(services::Memory::default()).unwrap().finish();
+ let object_store = Arc::new(OpendalStore::new(op));
+
+ let path: Path = "data/test.txt".try_into().unwrap();
+ let bytes = Bytes::from_static(b"hello, world!");
+ object_store.put(&path, bytes).await.unwrap();
+
+ let path: Path = "data/nested/test.txt".try_into().unwrap();
+ let bytes = Bytes::from_static(b"hello, world! I am nested.");
+ object_store.put(&path, bytes).await.unwrap();
+
+ object_store
+ }
+
#[tokio::test]
async fn test_basic() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
@@ -240,4 +325,34 @@ mod tests {
assert_eq!(meta.size, 13)
}
+
+ #[tokio::test]
+ async fn test_list() {
+ let object_store = create_test_object_store().await;
+ let path: Path = "data/".try_into().unwrap();
+ let results = object_store
+ .list(Some(&path))
+ .await
+ .unwrap()
+ .collect::<Vec<_>>()
+ .await;
+ assert_eq!(results.len(), 2);
+ let mut locations = results
+ .iter()
+ .map(|x| x.as_ref().unwrap().location.as_ref())
+ .collect::<Vec<_>>();
+ locations.sort();
+ assert_eq!(locations, &["data/nested/test.txt", "data/test.txt"]);
+ }
+
+ #[tokio::test]
+ async fn test_list_with_delimiter() {
+ let object_store = create_test_object_store().await;
+ let path: Path = "data/".try_into().unwrap();
+ let result =
object_store.list_with_delimiter(Some(&path)).await.unwrap();
+ assert_eq!(result.objects.len(), 1);
+ assert_eq!(result.common_prefixes.len(), 1);
+ assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
+ assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
+ }
}