This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new ba3fc4528 refactor(object_store): upgrade object_store to 0.7. (#3713)
ba3fc4528 is described below
commit ba3fc452852fa43912910ce99fe26c5adb10250c
Author: Yang Xiufeng <[email protected]>
AuthorDate: Tue Dec 5 15:28:54 2023 +0800
refactor(object_store): upgrade object_store to 0.7. (#3713)
* upgrade object_store to 0.7.
* impl ObjectStore:list_with_offset
---
Cargo.lock | 6 +--
integrations/object_store/Cargo.toml | 2 +-
integrations/object_store/src/lib.rs | 90 ++++++++++++++++++++++++++++++++----
3 files changed, 86 insertions(+), 12 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 6ddd4a0fb..b54e972d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4588,16 +4588,16 @@ dependencies = [
[[package]]
name = "object_store"
-version = "0.6.1"
+version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58"
+checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4"
dependencies = [
"async-trait",
"bytes",
"chrono",
"futures",
"humantime",
- "itertools 0.10.5",
+ "itertools 0.11.0",
"parking_lot 0.12.1",
"percent-encoding",
"snafu",
diff --git a/integrations/object_store/Cargo.toml
b/integrations/object_store/Cargo.toml
index 51b1db215..99e542110 100644
--- a/integrations/object_store/Cargo.toml
+++ b/integrations/object_store/Cargo.toml
@@ -31,7 +31,7 @@ version.workspace = true
async-trait = "0.1"
bytes = "1"
futures = "0.3"
-object_store = "0.6"
+object_store = "0.7"
opendal.workspace = true
tokio = "1"
diff --git a/integrations/object_store/src/lib.rs
b/integrations/object_store/src/lib.rs
index 9ffd8277a..5d6123c1d 100644
--- a/integrations/object_store/src/lib.rs
+++ b/integrations/object_store/src/lib.rs
@@ -25,14 +25,17 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::Stream;
use futures::StreamExt;
+use futures::TryStreamExt;
use object_store::path::Path;
use object_store::GetOptions;
use object_store::GetResult;
+use object_store::GetResultPayload;
use object_store::ListResult;
use object_store::MultipartId;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::Result;
+use opendal::Entry;
use opendal::Metadata;
use opendal::Metakey;
use opendal::Operator;
@@ -88,24 +91,39 @@ impl ObjectStore for OpendalStore {
})
}
- async fn get_opts(&self, location: &Path, _: GetOptions) ->
Result<GetResult> {
- let r = self
+ async fn get_opts(&self, _location: &Path, _options: GetOptions) ->
Result<GetResult> {
+ Err(object_store::Error::NotSupported {
+ source: Box::new(opendal::Error::new(
+ opendal::ErrorKind::Unsupported,
+ "get_opts is not implemented so far",
+ )),
+ })
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ let meta = self
.inner
- .reader(location.as_ref())
+ .stat(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
- Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
- }
-
- async fn get(&self, location: &Path) -> Result<GetResult> {
+ let meta = ObjectMeta {
+ location: location.clone(),
+ last_modified: meta.last_modified().unwrap_or_default(),
+ size: meta.content_length() as usize,
+ e_tag: meta.etag().map(|x| x.to_string()),
+ };
let r = self
.inner
.reader(location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
- Ok(GetResult::Stream(Box::pin(OpendalReader { inner: r })))
+ Ok(GetResult {
+ payload: GetResultPayload::Stream(Box::pin(OpendalReader { inner:
r })),
+ range: (0..meta.size),
+ meta,
+ })
}
async fn get_range(&self, location: &Path, range: Range<usize>) ->
Result<Bytes> {
@@ -165,6 +183,37 @@ impl ObjectStore for OpendalStore {
Ok(stream.boxed())
}
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let path = prefix.map_or("".into(), |x| format!("{}/", x));
+ let offset = offset.clone();
+ let stream = if
self.inner.info().full_capability().list_with_start_after {
+ self.inner
+ .lister_with(&path)
+ .start_after(offset.as_ref())
+ .metakey(Metakey::ContentLength | Metakey::LastModified)
+ .recursive(true)
+ .await
+ .map_err(|err| format_object_store_error(err, &path))?
+ .then(try_format_object_meta)
+ .boxed()
+ } else {
+ self.inner
+ .lister_with(&path)
+ .metakey(Metakey::ContentLength | Metakey::LastModified)
+ .recursive(true)
+ .await
+ .map_err(|err| format_object_store_error(err, &path))?
+ .try_filter(move |entry| futures::future::ready(entry.path() >
offset.as_ref()))
+ .then(try_format_object_meta)
+ .boxed()
+ };
+ Ok(stream)
+ }
+
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
@@ -252,6 +301,13 @@ fn format_object_meta(path: &str, meta: &Metadata) ->
ObjectMeta {
}
}
+async fn try_format_object_meta(res: Result<Entry, opendal::Error>) ->
Result<ObjectMeta> {
+ let entry = res.map_err(|err| format_object_store_error(err, ""))?;
+ let meta = entry.metadata();
+
+ Ok(format_object_meta(entry.path(), meta))
+}
+
struct OpendalReader {
inner: Reader,
}
@@ -377,4 +433,22 @@ mod tests {
assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
}
+
+ #[tokio::test]
+ async fn test_list_with_offset() {
+ let object_store = create_test_object_store().await;
+ let path: Path = "data/".try_into().unwrap();
+ let offset: Path = "data/nested/test.txt".try_into().unwrap();
+ let result = object_store
+ .list_with_offset(Some(&path), &offset)
+ .await
+ .unwrap()
+ .collect::<Vec<_>>()
+ .await;
+ assert_eq!(result.len(), 1);
+ assert_eq!(
+ result[0].as_ref().unwrap().location.as_ref(),
+ "data/test.txt"
+ );
+ }
}