This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-concurrent-tasks
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 274e75ee00aa2ff515e5d85d45702c6b59319736
Author: Xuanwo <git...@xuanwo.io>
AuthorDate: Wed Apr 2 15:07:59 2025 +0800

    refactor: Remove dead code ConcurrentFutures
    
    Signed-off-by: Xuanwo <git...@xuanwo.io>
---
 core/src/raw/futures_util.rs | 244 -------------------------------------------
 core/src/raw/mod.rs          |   1 -
 2 files changed, 245 deletions(-)

diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 3d182bf49..b4c13a15f 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -16,15 +16,9 @@
 // under the License.
 
 use std::collections::VecDeque;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::Context;
 use std::task::Poll;
 
 use futures::poll;
-use futures::stream::FuturesOrdered;
-use futures::FutureExt;
-use futures::StreamExt;
 
 use crate::*;
 
@@ -271,253 +265,15 @@ impl<I: Send + 'static, O: Send + 'static> 
ConcurrentTasks<I, O> {
     }
 }
 
-/// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use
-/// [`FuturesOrdered`] or not.
-///
-/// The value of `8` is picked by random, no strict benchmark is done.
-/// Please raise an issue if you found the value is not good enough or you 
want to configure
-/// this value at runtime.
-const CONCURRENT_LARGE_THRESHOLD: usize = 8;
-
-/// ConcurrentFutures is a stream that can hold a stream of concurrent futures.
-///
-/// - the order of the futures is the same.
-/// - the number of concurrent futures is limited by concurrent.
-/// - optimized for small number of concurrent futures.
-/// - zero cost for non-concurrent futures cases (concurrent == 1).
-pub struct ConcurrentFutures<F: Future + Unpin> {
-    tasks: Tasks<F>,
-    concurrent: usize,
-}
-
-/// Tasks is used to hold the entire task queue.
-enum Tasks<F: Future + Unpin> {
-    /// The special case for concurrent == 1.
-    ///
-    /// It works exactly the same like `Option<Fut>` in a struct.
-    Once(Option<F>),
-    /// The special cases for concurrent is small.
-    ///
-    /// At this case, the cost to loop poll is lower than using 
`FuturesOrdered`.
-    ///
-    /// We will replace the future by `TaskResult::Ready` once it's ready to 
avoid consume it again.
-    Small(VecDeque<TaskResult<F>>),
-    /// The general cases for large concurrent.
-    ///
-    /// We use `FuturesOrdered` to avoid huge amount of poll on futures.
-    Large(FuturesOrdered<F>),
-}
-
-impl<F: Future + Unpin> Unpin for Tasks<F> {}
-
-enum TaskResult<F: Future + Unpin> {
-    Polling(F),
-    Ready(F::Output),
-}
-
-impl<F> ConcurrentFutures<F>
-where
-    F: Future + Unpin + 'static,
-{
-    /// Create a new ConcurrentFutures by specifying the number of concurrent 
futures.
-    pub fn new(concurrent: usize) -> Self {
-        if (0..2).contains(&concurrent) {
-            Self {
-                tasks: Tasks::Once(None),
-                concurrent,
-            }
-        } else if (2..=CONCURRENT_LARGE_THRESHOLD).contains(&concurrent) {
-            Self {
-                tasks: Tasks::Small(VecDeque::with_capacity(concurrent)),
-                concurrent,
-            }
-        } else {
-            Self {
-                tasks: Tasks::Large(FuturesOrdered::new()),
-                concurrent,
-            }
-        }
-    }
-
-    /// Drop all tasks.
-    pub fn clear(&mut self) {
-        match &mut self.tasks {
-            Tasks::Once(fut) => *fut = None,
-            Tasks::Small(tasks) => tasks.clear(),
-            Tasks::Large(tasks) => *tasks = FuturesOrdered::new(),
-        }
-    }
-
-    /// Return the length of current concurrent futures (both ongoing and 
ready).
-    pub fn len(&self) -> usize {
-        match &self.tasks {
-            Tasks::Once(fut) => fut.is_some() as usize,
-            Tasks::Small(v) => v.len(),
-            Tasks::Large(v) => v.len(),
-        }
-    }
-
-    /// Return true if there is no futures in the queue.
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    /// Return the number of remaining space to push new futures.
-    pub fn remaining(&self) -> usize {
-        self.concurrent - self.len()
-    }
-
-    /// Return true if there is remaining space to push new futures.
-    pub fn has_remaining(&self) -> bool {
-        self.remaining() > 0
-    }
-
-    /// Push new future into the end of queue.
-    pub fn push_back(&mut self, f: F) {
-        debug_assert!(
-            self.has_remaining(),
-            "concurrent futures must have remaining space"
-        );
-
-        match &mut self.tasks {
-            Tasks::Once(fut) => {
-                *fut = Some(f);
-            }
-            Tasks::Small(v) => v.push_back(TaskResult::Polling(f)),
-            Tasks::Large(v) => v.push_back(f),
-        }
-    }
-
-    /// Push new future into the start of queue, this task will be exactly the 
next to poll.
-    pub fn push_front(&mut self, f: F) {
-        debug_assert!(
-            self.has_remaining(),
-            "concurrent futures must have remaining space"
-        );
-
-        match &mut self.tasks {
-            Tasks::Once(fut) => {
-                *fut = Some(f);
-            }
-            Tasks::Small(v) => v.push_front(TaskResult::Polling(f)),
-            Tasks::Large(v) => v.push_front(f),
-        }
-    }
-}
-
-impl<F> futures::Stream for ConcurrentFutures<F>
-where
-    F: Future + Unpin + 'static,
-{
-    type Item = F::Output;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
-        match &mut self.get_mut().tasks {
-            Tasks::Once(fut) => match fut {
-                Some(x) => x.poll_unpin(cx).map(|v| {
-                    *fut = None;
-                    Some(v)
-                }),
-                None => Poll::Ready(None),
-            },
-            Tasks::Small(v) => {
-                // Poll all tasks together.
-                for task in v.iter_mut() {
-                    if let TaskResult::Polling(f) = task {
-                        match f.poll_unpin(cx) {
-                            Poll::Pending => {}
-                            Poll::Ready(res) => {
-                                // Replace with ready value if this future has 
been resolved.
-                                *task = TaskResult::Ready(res);
-                            }
-                        }
-                    }
-                }
-
-                // Pick the first one to check.
-                match v.front_mut() {
-                    // Return pending if the first one is still polling.
-                    Some(TaskResult::Polling(_)) => Poll::Pending,
-                    Some(TaskResult::Ready(_)) => {
-                        let res = v.pop_front().unwrap();
-                        match res {
-                            TaskResult::Polling(_) => unreachable!(),
-                            TaskResult::Ready(res) => Poll::Ready(Some(res)),
-                        }
-                    }
-                    None => Poll::Ready(None),
-                }
-            }
-            Tasks::Large(v) => v.poll_next_unpin(cx),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use std::task::ready;
     use std::time::Duration;
 
-    use futures::future::BoxFuture;
-    use futures::Stream;
     use rand::Rng;
     use tokio::time::sleep;
 
     use super::*;
 
-    struct Lister {
-        size: usize,
-        idx: usize,
-        concurrent: usize,
-        tasks: ConcurrentFutures<BoxFuture<'static, usize>>,
-    }
-
-    impl Stream for Lister {
-        type Item = usize;
-
-        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
-            // Randomly sleep for a while, simulate some io operations that up 
to 100 microseconds.
-            let timeout = 
Duration::from_micros(rand::thread_rng().gen_range(0..100));
-            let idx = self.idx;
-            if self.tasks.len() < self.concurrent && self.idx < self.size {
-                let fut = async move {
-                    tokio::time::sleep(timeout).await;
-                    idx
-                };
-                self.idx += 1;
-                self.tasks.push_back(Box::pin(fut));
-            }
-
-            if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) {
-                Poll::Ready(Some(v))
-            } else {
-                Poll::Ready(None)
-            }
-        }
-    }
-
-    #[tokio::test]
-    async fn test_concurrent_futures() {
-        let cases = vec![
-            ("once", 1),
-            ("small", CONCURRENT_LARGE_THRESHOLD - 1),
-            ("large", CONCURRENT_LARGE_THRESHOLD + 1),
-        ];
-
-        for (name, concurrent) in cases {
-            let lister = Lister {
-                size: 1000,
-                idx: 0,
-                concurrent,
-                tasks: ConcurrentFutures::new(concurrent),
-            };
-            let expected: Vec<usize> = (0..1000).collect();
-            let result: Vec<usize> = lister.collect().await;
-
-            assert_eq!(expected, result, "concurrent futures failed: {}", 
name);
-        }
-    }
-
     #[tokio::test]
     async fn test_concurrent_tasks() {
         let executor = Executor::new();
diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs
index ad8d1fe11..986f62136 100644
--- a/core/src/raw/mod.rs
+++ b/core/src/raw/mod.rs
@@ -72,7 +72,6 @@ pub use std_io_util::*;
 mod futures_util;
 pub use futures_util::BoxedFuture;
 pub use futures_util::BoxedStaticFuture;
-pub use futures_util::ConcurrentFutures;
 pub use futures_util::ConcurrentTasks;
 pub use futures_util::MaybeSend;
 

Reply via email to