This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2f98ea622 feat(core): Implement RFC-3574 Concurrent Stat In List (#3599)
2f98ea622 is described below
commit 2f98ea62258cd0b5118e8e6e57469ef05e890f5d
Author: Morris Tai <[email protected]>
AuthorDate: Thu Nov 23 00:58:23 2023 -0500
feat(core): Implement RFC-3574 Concurrent Stat In List (#3599)
* feat: add concurrent stat in list
* feat: merge main; minor syntax fix
* feat: modify for PR review
* chore: cargo fmt
* feat: fix early exit bug
* chore: cargo fmt & clippy
* feat: fix for PR review
* chore: switch to push_back and pop_front
* feat: replace the while loop with an if-else clause
* chore: prune code blocks
* chore: rename Task to StatTask
* chore: cargo fmt
* feat: fix a missed change from push_back to pop_front
* feat: replace listing with Option<oio::Lister>
---
core/src/raw/ops.rs | 21 +++++
core/src/types/list.rs | 117 ++++++++++++++++++----------
core/src/types/operator/operator_futures.rs | 10 +++
3 files changed, 107 insertions(+), 41 deletions(-)
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index dbb26227b..caa894d62 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -92,6 +92,13 @@ pub struct OpList {
/// - `Some(v)` means exist.
/// - `None` means services doesn't have this meta.
metakey: FlagSet<Metakey>,
+ /// The concurrency of stat operations inside the list operation.
+ /// Users can use this to control the number of concurrent stat operations when metadata is unknown.
+ ///
+ /// - If this is set to <= 1, the list operation will be sequential.
+ /// - If this is set to > 1, the list operation will be concurrent,
+ /// and the maximum number of concurrent operations will be determined by this value.
+ concurrent: usize,
}
impl Default for OpList {
@@ -102,6 +109,7 @@ impl Default for OpList {
recursive: false,
// By default, we want to know what's the mode of this entry.
metakey: Metakey::Mode.into(),
+ concurrent: 1,
}
}
}
@@ -162,6 +170,19 @@ impl OpList {
pub fn metakey(&self) -> FlagSet<Metakey> {
self.metakey
}
+
+ /// Change the concurrency of this list operation.
+ ///
+ /// The default concurrency is 1.
+ pub fn with_concurrent(mut self, concurrent: usize) -> Self {
+ self.concurrent = concurrent;
+ self
+ }
+
+ /// Get the concurrency of this list operation.
+ pub fn concurrent(&self) -> usize {
+ self.concurrent
+ }
}
/// Args for `presign` operation.
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 3611f7c1a..980f0db0e 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -15,23 +15,22 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp;
+use std::collections::VecDeque;
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use flagset::FlagSet;
-use futures::future::BoxFuture;
use futures::FutureExt;
use futures::Stream;
+use tokio::task::JoinHandle;
use crate::raw::oio::List;
use crate::raw::*;
use crate::*;
-/// Future constructed by stating.
-type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
-
/// Lister is designed to list entries at given path in an asynchronous
/// manner.
///
@@ -41,14 +40,22 @@ type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
/// - Lister will return `None` if there is no more entries or error has been returned.
pub struct Lister {
acc: FusedAccessor,
- lister: oio::Lister,
+ lister: Option<oio::Lister>,
/// required_metakey is the metakey required by users.
required_metakey: FlagSet<Metakey>,
- stating: Option<StatFuture>,
+ /// task_queue is used to store tasks that are run concurrently.
+ task_queue: VecDeque<StatTask>,
errored: bool,
}
+enum StatTask {
+ /// Handle is used to store the join handle of spawned task.
+ Handle(JoinHandle<(String, Result<RpStat>)>),
+ /// KnownEntry is used to store the entry that already contains the required metakey.
+ KnownEntry(Box<Option<(String, Metadata)>>),
+}
+
/// # Safety
///
/// Lister will only be accessed by `&mut Self`
@@ -58,14 +65,16 @@ impl Lister {
/// Create a new lister.
pub(crate) async fn create(acc: FusedAccessor, path: &str, args: OpList)
-> Result<Self> {
let required_metakey = args.metakey();
+ let concurrent = cmp::max(1, args.concurrent());
+
let (_, lister) = acc.list(path, args).await?;
Ok(Self {
acc,
- lister,
+ lister: Some(lister),
required_metakey,
- stating: None,
+ task_queue: VecDeque::with_capacity(concurrent),
errored: false,
})
}
@@ -80,44 +89,70 @@ impl Stream for Lister {
return Poll::Ready(None);
}
- if let Some(fut) = self.stating.as_mut() {
- let (path, rp) = ready!(fut.poll_unpin(cx));
+ let task_queue_len = self.task_queue.len();
+ let task_queue_cap = self.task_queue.capacity();
+
+ if let Some(lister) = self.lister.as_mut() {
+ if task_queue_len < task_queue_cap {
+ match lister.poll_next(cx) {
+ Poll::Pending => {
+ if task_queue_len == 0 {
+ return Poll::Pending;
+ }
+ }
+ Poll::Ready(Ok(Some(oe))) => {
+ let (path, metadata) = oe.into_entry().into_parts();
// TODO: we can optimize this by checking the metakey provided by services.
+ if metadata.contains_metakey(self.required_metakey) {
+ self.task_queue
+
.push_back(StatTask::KnownEntry(Box::new(Some((path, metadata)))));
+ } else {
+ let acc = self.acc.clone();
+ let fut = async move {
+ let res = acc.stat(&path,
OpStat::default()).await;
+ (path, res)
+ };
+ self.task_queue
+
.push_back(StatTask::Handle(tokio::spawn(fut)));
+ }
+ }
+ Poll::Ready(Ok(None)) => {
+ self.lister = None;
+ }
+ Poll::Ready(Err(err)) => {
+ self.errored = true;
+ return Poll::Ready(Some(Err(err)));
+ }
+ };
+ }
+ }
- // Make sure we will not poll this future again.
- self.stating = None;
- let metadata = match rp {
- Ok(rp) => rp.into_metadata(),
- Err(err) => {
- self.errored = true;
- return Poll::Ready(Some(Err(err)));
+ if let Some(handle) = self.task_queue.front_mut() {
+ return match handle {
+ StatTask::Handle(handle) => {
+ let (path, rp) =
ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?;
+
+ match rp {
+ Ok(rp) => {
+ self.task_queue.pop_front();
+ let metadata = rp.into_metadata();
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+ }
+ Err(err) => {
+ self.errored = true;
+ Poll::Ready(Some(Err(err)))
+ }
+ }
+ }
+ StatTask::KnownEntry(entry) => {
+ let (path, metadata) = entry.take().expect("entry must be
valid");
+ self.task_queue.pop_front();
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
}
};
-
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
}
- match ready!(self.lister.poll_next(cx)) {
- Ok(Some(oe)) => {
- let (path, metadata) = oe.into_entry().into_parts();
- if metadata.contains_metakey(self.required_metakey) {
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
- }
-
- let acc = self.acc.clone();
- let fut = async move {
- let res = acc.stat(&path, OpStat::default()).await;
-
- (path, res)
- };
- self.stating = Some(Box::pin(fut));
- self.poll_next(cx)
- }
- Ok(None) => Poll::Ready(None),
- Err(err) => {
- self.errored = true;
- Poll::Ready(Some(Err(err)))
- }
- }
+ Poll::Ready(None)
}
}
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs
index b895df19e..58dd9e74c 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -616,6 +616,16 @@ impl FutureLister {
self.0 = self.0.map_args(|args| args.with_metakey(v));
self
}
+
+ /// Concurrent is used to control the number of concurrent stat requests.
+ ///
+ /// If concurrent is set to <=1, the lister will perform stat requests sequentially.
+ ///
+ /// The default concurrency is 1.
+ pub fn concurrent(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_concurrent(v));
+ self
+ }
}
impl Future for FutureLister {