This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-list
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 49b76242a16bb24b203b91d953186a0eea5ee4da
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 23 18:16:04 2023 +0800

    refactor: Polish concurrent list
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/types/list.rs | 101 ++++++++++++++++++++++++++++++++++---------------
 1 file changed, 70 insertions(+), 31 deletions(-)

diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 980f0db0e..7a1699db9 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -44,16 +44,58 @@ pub struct Lister {
     /// required_metakey is the metakey required by users.
     required_metakey: FlagSet<Metakey>,
 
-    /// task_queue is used to store tasks that are run in concurrent.
-    task_queue: VecDeque<StatTask>,
+    /// tasks stores the tasks that are running concurrently.
+    tasks: VecDeque<StatTask>,
     errored: bool,
 }
 
+/// StatTask is a single queued task of a concurrent list: either a running stat or a known entry.
+///
+/// # Note for clippy
+///
+/// Clippy raises the following error for this enum:
+///
+/// ```shell
+/// error: large size difference between variants
+///   --> core/src/types/list.rs:64:1
+///    |
+/// 64 | / enum StatTask {
+/// 65 | |     /// Handle is used to store the join handle of spawned task.
+/// 66 | |     Handle(JoinHandle<(String, Result<RpStat>)>),
+///    | |     -------------------------------------------- the second-largest variant contains at least 0 bytes
+/// 67 | |     /// KnownEntry is used to store the entry that already contains the required metakey.
+/// 68 | |     KnownEntry(Option<Entry>),
+///    | |     ------------------------- the largest variant contains at least 264 bytes
+/// 69 | | }
+///    | |_^ the entire enum is at least 0 bytes
+///    |
+///    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#large_enum_variant
+///    = note: `-D clippy::large-enum-variant` implied by `-D warnings`
+///    = help: to override `-D warnings` add `#[allow(clippy::large_enum_variant)]`
+/// help: consider boxing the large fields to reduce the total size of the enum
+///    |
+/// 68 |     KnownEntry(Box<Option<Entry>>),
+///    |                ~~~~~~~~~~~~~~~~~~
+/// ```
+/// The size difference is real: `JoinHandle` only stores a pointer to the spawned task, while the
+/// task's `(String, Result<RpStat>)` output (kept inside the task) is as large as `Option<Entry>`:
+///
+/// ```rust
+/// use std::mem::size_of;
+/// use opendal::Result;
+/// use opendal::Entry;
+///
+/// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
+/// assert_eq!(264, size_of::<Option<Entry>>());
+/// ```
+///
+/// Boxing `Known` would cost a heap allocation for every already-known entry, so allow this lint:
+#[allow(clippy::large_enum_variant)]
 enum StatTask {
-    /// Handle is used to store the join handle of spawned task.
-    Handle(JoinHandle<(String, Result<RpStat>)>),
-    /// KnownEntry is used to store the entry that already contains the 
required metakey.
-    KnownEntry(Box<Option<(String, Metadata)>>),
+    /// Stating holds the join handle of a spawned stat task.
+    Stating(JoinHandle<(String, Result<RpStat>)>),
+    /// Known holds an entry that already contains the required metakey; `Option` lets it be taken in place.
+    Known(Option<Entry>),
 }
 
 /// # Safety
@@ -74,7 +116,7 @@ impl Lister {
             lister: Some(lister),
             required_metakey,
 
-            task_queue: VecDeque::with_capacity(concurrent),
+            tasks: VecDeque::with_capacity(concurrent),
             errored: false,
         })
     }
@@ -89,31 +131,23 @@ impl Stream for Lister {
             return Poll::Ready(None);
         }
 
-        let task_queue_len = self.task_queue.len();
-        let task_queue_cap = self.task_queue.capacity();
-
-        if let Some(lister) = self.lister.as_mut() {
-            if task_queue_len < task_queue_cap {
+        // Try to pull another task from the lister while the queue has free capacity.
+        if self.tasks.len() < self.tasks.capacity() {
+            if let Some(lister) = self.lister.as_mut() {
                 match lister.poll_next(cx) {
-                    Poll::Pending => {
-                        if task_queue_len == 0 {
-                            return Poll::Pending;
-                        }
-                    }
+                    Poll::Pending => {}
                     Poll::Ready(Ok(Some(oe))) => {
                         let (path, metadata) = oe.into_entry().into_parts();
-                        // TODO: we can optimize this by checking the provided 
metakey provided by services.
                         if metadata.contains_metakey(self.required_metakey) {
-                            self.task_queue
-                                
.push_back(StatTask::KnownEntry(Box::new(Some((path, metadata)))));
+                            self.tasks
+                                
.push_back(StatTask::Known(Some(Entry::new(path, metadata))));
                         } else {
                             let acc = self.acc.clone();
                             let fut = async move {
                                 let res = acc.stat(&path, 
OpStat::default()).await;
                                 (path, res)
                             };
-                            self.task_queue
-                                
.push_back(StatTask::Handle(tokio::spawn(fut)));
+                            
self.tasks.push_back(StatTask::Stating(tokio::spawn(fut)));
                         }
                     }
                     Poll::Ready(Ok(None)) => {
@@ -127,14 +161,16 @@ impl Stream for Lister {
             }
         }
 
-        if let Some(handle) = self.task_queue.front_mut() {
+        if let Some(handle) = self.tasks.front_mut() {
             return match handle {
-                StatTask::Handle(handle) => {
+                StatTask::Stating(handle) => {
                     let (path, rp) = 
ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?;
 
+                    // Pop the finished task before handling its result. NOTE(review): a join error returns via `?` above before this pop.
+                    self.tasks.pop_front();
+
                     match rp {
                         Ok(rp) => {
-                            self.task_queue.pop_front();
                             let metadata = rp.into_metadata();
                             Poll::Ready(Some(Ok(Entry::new(path, metadata))))
                         }
@@ -144,15 +180,19 @@ impl Stream for Lister {
                         }
                     }
                 }
-                StatTask::KnownEntry(entry) => {
-                    let (path, metadata) = entry.take().expect("entry must be 
valid");
-                    self.task_queue.pop_front();
-                    Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+                StatTask::Known(entry) => {
+                    let entry = entry.take().expect("entry must be valid");
+                    self.tasks.pop_front();
+                    Poll::Ready(Some(Ok(entry)))
                 }
             };
         }
 
-        Poll::Ready(None)
+        if self.lister.is_none() {
+            Poll::Ready(None)
+        } else {
+            Poll::Pending
+        }
     }
 }
 
@@ -213,7 +253,6 @@ impl Iterator for BlockingLister {
         };
 
         let (path, metadata) = entry.into_entry().into_parts();
-        // TODO: we can optimize this by checking the provided metakey 
provided by services.
         if metadata.contains_metakey(self.required_metakey) {
             return Some(Ok(Entry::new(path, metadata)));
         }

Reply via email to