This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-concurrent-tasks in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 274e75ee00aa2ff515e5d85d45702c6b59319736 Author: Xuanwo <git...@xuanwo.io> AuthorDate: Wed Apr 2 15:07:59 2025 +0800 refactor: Remove dead code ConcurrentFutures Signed-off-by: Xuanwo <git...@xuanwo.io> --- core/src/raw/futures_util.rs | 244 ------------------------------------------- core/src/raw/mod.rs | 1 - 2 files changed, 245 deletions(-) diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 3d182bf49..b4c13a15f 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -16,15 +16,9 @@ // under the License. use std::collections::VecDeque; -use std::future::Future; -use std::pin::Pin; -use std::task::Context; use std::task::Poll; use futures::poll; -use futures::stream::FuturesOrdered; -use futures::FutureExt; -use futures::StreamExt; use crate::*; @@ -271,253 +265,15 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> { } } -/// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use -/// [`FuturesOrdered`] or not. -/// -/// The value of `8` is picked by random, no strict benchmark is done. -/// Please raise an issue if you found the value is not good enough or you want to configure -/// this value at runtime. -const CONCURRENT_LARGE_THRESHOLD: usize = 8; - -/// ConcurrentFutures is a stream that can hold a stream of concurrent futures. -/// -/// - the order of the futures is the same. -/// - the number of concurrent futures is limited by concurrent. -/// - optimized for small number of concurrent futures. -/// - zero cost for non-concurrent futures cases (concurrent == 1). -pub struct ConcurrentFutures<F: Future + Unpin> { - tasks: Tasks<F>, - concurrent: usize, -} - -/// Tasks is used to hold the entire task queue. -enum Tasks<F: Future + Unpin> { - /// The special case for concurrent == 1. - /// - /// It works exactly the same like `Option<Fut>` in a struct. - Once(Option<F>), - /// The special cases for concurrent is small. - /// - /// At this case, the cost to loop poll is lower than using `FuturesOrdered`. - /// - /// We will replace the future by `TaskResult::Ready` once it's ready to avoid consume it again. - Small(VecDeque<TaskResult<F>>), - /// The general cases for large concurrent. - /// - /// We use `FuturesOrdered` to avoid huge amount of poll on futures. - Large(FuturesOrdered<F>), -} - -impl<F: Future + Unpin> Unpin for Tasks<F> {} - -enum TaskResult<F: Future + Unpin> { - Polling(F), - Ready(F::Output), -} - -impl<F> ConcurrentFutures<F> -where - F: Future + Unpin + 'static, -{ - /// Create a new ConcurrentFutures by specifying the number of concurrent futures. - pub fn new(concurrent: usize) -> Self { - if (0..2).contains(&concurrent) { - Self { - tasks: Tasks::Once(None), - concurrent, - } - } else if (2..=CONCURRENT_LARGE_THRESHOLD).contains(&concurrent) { - Self { - tasks: Tasks::Small(VecDeque::with_capacity(concurrent)), - concurrent, - } - } else { - Self { - tasks: Tasks::Large(FuturesOrdered::new()), - concurrent, - } - } - } - - /// Drop all tasks. - pub fn clear(&mut self) { - match &mut self.tasks { - Tasks::Once(fut) => *fut = None, - Tasks::Small(tasks) => tasks.clear(), - Tasks::Large(tasks) => *tasks = FuturesOrdered::new(), - } - } - - /// Return the length of current concurrent futures (both ongoing and ready). - pub fn len(&self) -> usize { - match &self.tasks { - Tasks::Once(fut) => fut.is_some() as usize, - Tasks::Small(v) => v.len(), - Tasks::Large(v) => v.len(), - } - } - - /// Return true if there is no futures in the queue. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Return the number of remaining space to push new futures. - pub fn remaining(&self) -> usize { - self.concurrent - self.len() - } - - /// Return true if there is remaining space to push new futures. - pub fn has_remaining(&self) -> bool { - self.remaining() > 0 - } - - /// Push new future into the end of queue. - pub fn push_back(&mut self, f: F) { - debug_assert!( - self.has_remaining(), - "concurrent futures must have remaining space" - ); - - match &mut self.tasks { - Tasks::Once(fut) => { - *fut = Some(f); - } - Tasks::Small(v) => v.push_back(TaskResult::Polling(f)), - Tasks::Large(v) => v.push_back(f), - } - } - - /// Push new future into the start of queue, this task will be exactly the next to poll. - pub fn push_front(&mut self, f: F) { - debug_assert!( - self.has_remaining(), - "concurrent futures must have remaining space" - ); - - match &mut self.tasks { - Tasks::Once(fut) => { - *fut = Some(f); - } - Tasks::Small(v) => v.push_front(TaskResult::Polling(f)), - Tasks::Large(v) => v.push_front(f), - } - } -} - -impl<F> futures::Stream for ConcurrentFutures<F> -where - F: Future + Unpin + 'static, -{ - type Item = F::Output; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match &mut self.get_mut().tasks { - Tasks::Once(fut) => match fut { - Some(x) => x.poll_unpin(cx).map(|v| { - *fut = None; - Some(v) - }), - None => Poll::Ready(None), - }, - Tasks::Small(v) => { - // Poll all tasks together. - for task in v.iter_mut() { - if let TaskResult::Polling(f) = task { - match f.poll_unpin(cx) { - Poll::Pending => {} - Poll::Ready(res) => { - // Replace with ready value if this future has been resolved. - *task = TaskResult::Ready(res); - } - } - } - } - - // Pick the first one to check. - match v.front_mut() { - // Return pending if the first one is still polling. - Some(TaskResult::Polling(_)) => Poll::Pending, - Some(TaskResult::Ready(_)) => { - let res = v.pop_front().unwrap(); - match res { - TaskResult::Polling(_) => unreachable!(), - TaskResult::Ready(res) => Poll::Ready(Some(res)), - } - } - None => Poll::Ready(None), - } - } - Tasks::Large(v) => v.poll_next_unpin(cx), - } - } -} - #[cfg(test)] mod tests { - use std::task::ready; use std::time::Duration; - use futures::future::BoxFuture; - use futures::Stream; use rand::Rng; use tokio::time::sleep; use super::*; - struct Lister { - size: usize, - idx: usize, - concurrent: usize, - tasks: ConcurrentFutures<BoxFuture<'static, usize>>, - } - - impl Stream for Lister { - type Item = usize; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - // Randomly sleep for a while, simulate some io operations that up to 100 microseconds. - let timeout = Duration::from_micros(rand::thread_rng().gen_range(0..100)); - let idx = self.idx; - if self.tasks.len() < self.concurrent && self.idx < self.size { - let fut = async move { - tokio::time::sleep(timeout).await; - idx - }; - self.idx += 1; - self.tasks.push_back(Box::pin(fut)); - } - - if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) { - Poll::Ready(Some(v)) - } else { - Poll::Ready(None) - } - } - } - - #[tokio::test] - async fn test_concurrent_futures() { - let cases = vec![ - ("once", 1), - ("small", CONCURRENT_LARGE_THRESHOLD - 1), - ("large", CONCURRENT_LARGE_THRESHOLD + 1), - ]; - - for (name, concurrent) in cases { - let lister = Lister { - size: 1000, - idx: 0, - concurrent, - tasks: ConcurrentFutures::new(concurrent), - }; - let expected: Vec<usize> = (0..1000).collect(); - let result: Vec<usize> = lister.collect().await; - - assert_eq!(expected, result, "concurrent futures failed: {}", name); - } - } - #[tokio::test] async fn test_concurrent_tasks() { let executor = Executor::new(); diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index ad8d1fe11..986f62136 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -72,7 +72,6 @@ pub use std_io_util::*; mod futures_util; pub use futures_util::BoxedFuture; pub use futures_util::BoxedStaticFuture; -pub use futures_util::ConcurrentFutures; pub use futures_util::ConcurrentTasks; pub use futures_util::MaybeSend;