This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f98ea622 feat(core): Implement RFC-3574 Concurrent Stat In List (#3599)
2f98ea622 is described below

commit 2f98ea62258cd0b5118e8e6e57469ef05e890f5d
Author: Morris Tai <[email protected]>
AuthorDate: Thu Nov 23 00:58:23 2023 -0500

    feat(core): Implement RFC-3574 Concurrent Stat In List (#3599)
    
    * feat: add concurrent stat in list
    
    * feat: merge main syntax minor fix
    
    * feat: modify for PR review
    
    * chore: cargo fmt
    
    * feat: fix early exit bug
    
    * chore: cargo fmt & clippy
    
    * feat: fix for PR review
    
    * chore: switch to push_back and pop_front
    
    * feature: discard the while loop with an if-else clause
    
    * chore: prune code blocks
    
    * chore: rename Task to StatTask
    
    * chore: cargo fmt
    
    * feat: fix miss changed push_back to pop_front
    
    * feat: replace listing by Option<oio::Lister>
---
 core/src/raw/ops.rs                         |  21 +++++
 core/src/types/list.rs                      | 117 ++++++++++++++++++----------
 core/src/types/operator/operator_futures.rs |  10 +++
 3 files changed, 107 insertions(+), 41 deletions(-)

diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index dbb26227b..caa894d62 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -92,6 +92,13 @@ pub struct OpList {
     /// - `Some(v)` means exist.
     /// - `None` means services doesn't have this meta.
     metakey: FlagSet<Metakey>,
+    /// The concurrent of stat operations inside list operation.
+    /// Users could use this to control the number of concurrent stat operation when metadata is unknown.
+    ///
+    /// - If this is set to <= 1, the list operation will be sequential.
+    /// - If this is set to > 1, the list operation will be concurrent,
+    ///   and the maximum number of concurrent operations will be determined by this value.
+    concurrent: usize,
 }
 
 impl Default for OpList {
@@ -102,6 +109,7 @@ impl Default for OpList {
             recursive: false,
             // By default, we want to know what's the mode of this entry.
             metakey: Metakey::Mode.into(),
+            concurrent: 1,
         }
     }
 }
@@ -162,6 +170,19 @@ impl OpList {
     pub fn metakey(&self) -> FlagSet<Metakey> {
         self.metakey
     }
+
+    /// Change the concurrent of this list operation.
+    ///
+    /// The default concurrent is 1.
+    pub fn with_concurrent(mut self, concurrent: usize) -> Self {
+        self.concurrent = concurrent;
+        self
+    }
+
+    /// Get the concurrent of list operation.
+    pub fn concurrent(&self) -> usize {
+        self.concurrent
+    }
 }
 
 /// Args for `presign` operation.
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 3611f7c1a..980f0db0e 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -15,23 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp;
+use std::collections::VecDeque;
 use std::pin::Pin;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use flagset::FlagSet;
-use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::Stream;
+use tokio::task::JoinHandle;
 
 use crate::raw::oio::List;
 use crate::raw::*;
 use crate::*;
 
-/// Future constructed by stating.
-type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
-
 /// Lister is designed to list entries at given path in an asynchronous
 /// manner.
 ///
@@ -41,14 +40,22 @@ type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
 /// - Lister will return `None` if there is no more entries or error has been returned.
 pub struct Lister {
     acc: FusedAccessor,
-    lister: oio::Lister,
+    lister: Option<oio::Lister>,
     /// required_metakey is the metakey required by users.
     required_metakey: FlagSet<Metakey>,
 
-    stating: Option<StatFuture>,
+    /// task_queue is used to store tasks that are run in concurrent.
+    task_queue: VecDeque<StatTask>,
     errored: bool,
 }
 
+enum StatTask {
+    /// Handle is used to store the join handle of spawned task.
+    Handle(JoinHandle<(String, Result<RpStat>)>),
+    /// KnownEntry is used to store the entry that already contains the required metakey.
+    KnownEntry(Box<Option<(String, Metadata)>>),
+}
+
 /// # Safety
 ///
 /// Lister will only be accessed by `&mut Self`
@@ -58,14 +65,16 @@ impl Lister {
     /// Create a new lister.
     pub(crate) async fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> {
         let required_metakey = args.metakey();
+        let concurrent = cmp::max(1, args.concurrent());
+
         let (_, lister) = acc.list(path, args).await?;
 
         Ok(Self {
             acc,
-            lister,
+            lister: Some(lister),
             required_metakey,
 
-            stating: None,
+            task_queue: VecDeque::with_capacity(concurrent),
             errored: false,
         })
     }
@@ -80,44 +89,70 @@ impl Stream for Lister {
             return Poll::Ready(None);
         }
 
-        if let Some(fut) = self.stating.as_mut() {
-            let (path, rp) = ready!(fut.poll_unpin(cx));
+        let task_queue_len = self.task_queue.len();
+        let task_queue_cap = self.task_queue.capacity();
+
+        if let Some(lister) = self.lister.as_mut() {
+            if task_queue_len < task_queue_cap {
+                match lister.poll_next(cx) {
+                    Poll::Pending => {
+                        if task_queue_len == 0 {
+                            return Poll::Pending;
+                        }
+                    }
+                    Poll::Ready(Ok(Some(oe))) => {
+                        let (path, metadata) = oe.into_entry().into_parts();
+                        // TODO: we can optimize this by checking the provided metakey provided by services.
+                        if metadata.contains_metakey(self.required_metakey) {
+                            self.task_queue
+                                .push_back(StatTask::KnownEntry(Box::new(Some((path, metadata)))));
+                        } else {
+                            let acc = self.acc.clone();
+                            let fut = async move {
+                                let res = acc.stat(&path, OpStat::default()).await;
+                                (path, res)
+                            };
+                            self.task_queue
+                                .push_back(StatTask::Handle(tokio::spawn(fut)));
+                        }
+                    }
+                    Poll::Ready(Ok(None)) => {
+                        self.lister = None;
+                    }
+                    Poll::Ready(Err(err)) => {
+                        self.errored = true;
+                        return Poll::Ready(Some(Err(err)));
+                    }
+                };
+            }
+        }
 
-            // Make sure we will not poll this future again.
-            self.stating = None;
-            let metadata = match rp {
-                Ok(rp) => rp.into_metadata(),
-                Err(err) => {
-                    self.errored = true;
-                    return Poll::Ready(Some(Err(err)));
+        if let Some(handle) = self.task_queue.front_mut() {
+            return match handle {
+                StatTask::Handle(handle) => {
+                    let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?;
+
+                    match rp {
+                        Ok(rp) => {
+                            self.task_queue.pop_front();
+                            let metadata = rp.into_metadata();
+                            Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+                        }
+                        Err(err) => {
+                            self.errored = true;
+                            Poll::Ready(Some(Err(err)))
+                        }
+                    }
+                }
+                StatTask::KnownEntry(entry) => {
+                    let (path, metadata) = entry.take().expect("entry must be valid");
+                    self.task_queue.pop_front();
+                    Poll::Ready(Some(Ok(Entry::new(path, metadata))))
                 }
             };
-
-            return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
         }
 
-        match ready!(self.lister.poll_next(cx)) {
-            Ok(Some(oe)) => {
-                let (path, metadata) = oe.into_entry().into_parts();
-                if metadata.contains_metakey(self.required_metakey) {
-                    return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
-                }
-
-                let acc = self.acc.clone();
-                let fut = async move {
-                    let res = acc.stat(&path, OpStat::default()).await;
-
-                    (path, res)
-                };
-                self.stating = Some(Box::pin(fut));
-                self.poll_next(cx)
-            }
-            Ok(None) => Poll::Ready(None),
-            Err(err) => {
-                self.errored = true;
-                Poll::Ready(Some(Err(err)))
-            }
-        }
+        Poll::Ready(None)
     }
 }
 
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs
index b895df19e..58dd9e74c 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -616,6 +616,16 @@ impl FutureLister {
         self.0 = self.0.map_args(|args| args.with_metakey(v));
         self
     }
+
+    /// Concurrent is used to control the number of concurrent stat requests.
+    ///
+    /// If concurrent is set to <=1, the lister will perform stat requests sequentially.
+    ///
+    /// The default concurrent is 1.
+    pub fn concurrent(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_concurrent(v));
+        self
+    }
 }
 
 impl Future for FutureLister {

Reply via email to