This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9976c88d1 refactor: Remove dead code ConcurrentFutures (#5939)
9976c88d1 is described below
commit 9976c88d1fe762c2cda48c23958353cd8eb7a3d6
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 2 18:25:00 2025 +0800
refactor: Remove dead code ConcurrentFutures (#5939)
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/futures_util.rs | 244 -------------------------------------------
core/src/raw/mod.rs | 1 -
2 files changed, 245 deletions(-)
diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs
index 3d182bf49..b4c13a15f 100644
--- a/core/src/raw/futures_util.rs
+++ b/core/src/raw/futures_util.rs
@@ -16,15 +16,9 @@
// under the License.
use std::collections::VecDeque;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::Context;
use std::task::Poll;
use futures::poll;
-use futures::stream::FuturesOrdered;
-use futures::FutureExt;
-use futures::StreamExt;
use crate::*;
@@ -271,253 +265,15 @@ impl<I: Send + 'static, O: Send + 'static>
ConcurrentTasks<I, O> {
}
}
-/// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use
-/// [`FuturesOrdered`] or not.
-///
-/// The value of `8` is picked by random, no strict benchmark is done.
-/// Please raise an issue if you found the value is not good enough or you
want to configure
-/// this value at runtime.
-const CONCURRENT_LARGE_THRESHOLD: usize = 8;
-
-/// ConcurrentFutures is a stream that can hold a stream of concurrent futures.
-///
-/// - the order of the futures is the same.
-/// - the number of concurrent futures is limited by concurrent.
-/// - optimized for small number of concurrent futures.
-/// - zero cost for non-concurrent futures cases (concurrent == 1).
-pub struct ConcurrentFutures<F: Future + Unpin> {
- tasks: Tasks<F>,
- concurrent: usize,
-}
-
-/// Tasks is used to hold the entire task queue.
-enum Tasks<F: Future + Unpin> {
- /// The special case for concurrent == 1.
- ///
- /// It works exactly the same like `Option<Fut>` in a struct.
- Once(Option<F>),
- /// The special cases for concurrent is small.
- ///
- /// At this case, the cost to loop poll is lower than using
`FuturesOrdered`.
- ///
- /// We will replace the future by `TaskResult::Ready` once it's ready to
avoid consume it again.
- Small(VecDeque<TaskResult<F>>),
- /// The general cases for large concurrent.
- ///
- /// We use `FuturesOrdered` to avoid huge amount of poll on futures.
- Large(FuturesOrdered<F>),
-}
-
-impl<F: Future + Unpin> Unpin for Tasks<F> {}
-
-enum TaskResult<F: Future + Unpin> {
- Polling(F),
- Ready(F::Output),
-}
-
-impl<F> ConcurrentFutures<F>
-where
- F: Future + Unpin + 'static,
-{
- /// Create a new ConcurrentFutures by specifying the number of concurrent
futures.
- pub fn new(concurrent: usize) -> Self {
- if (0..2).contains(&concurrent) {
- Self {
- tasks: Tasks::Once(None),
- concurrent,
- }
- } else if (2..=CONCURRENT_LARGE_THRESHOLD).contains(&concurrent) {
- Self {
- tasks: Tasks::Small(VecDeque::with_capacity(concurrent)),
- concurrent,
- }
- } else {
- Self {
- tasks: Tasks::Large(FuturesOrdered::new()),
- concurrent,
- }
- }
- }
-
- /// Drop all tasks.
- pub fn clear(&mut self) {
- match &mut self.tasks {
- Tasks::Once(fut) => *fut = None,
- Tasks::Small(tasks) => tasks.clear(),
- Tasks::Large(tasks) => *tasks = FuturesOrdered::new(),
- }
- }
-
- /// Return the length of current concurrent futures (both ongoing and
ready).
- pub fn len(&self) -> usize {
- match &self.tasks {
- Tasks::Once(fut) => fut.is_some() as usize,
- Tasks::Small(v) => v.len(),
- Tasks::Large(v) => v.len(),
- }
- }
-
- /// Return true if there is no futures in the queue.
- pub fn is_empty(&self) -> bool {
- self.len() == 0
- }
-
- /// Return the number of remaining space to push new futures.
- pub fn remaining(&self) -> usize {
- self.concurrent - self.len()
- }
-
- /// Return true if there is remaining space to push new futures.
- pub fn has_remaining(&self) -> bool {
- self.remaining() > 0
- }
-
- /// Push new future into the end of queue.
- pub fn push_back(&mut self, f: F) {
- debug_assert!(
- self.has_remaining(),
- "concurrent futures must have remaining space"
- );
-
- match &mut self.tasks {
- Tasks::Once(fut) => {
- *fut = Some(f);
- }
- Tasks::Small(v) => v.push_back(TaskResult::Polling(f)),
- Tasks::Large(v) => v.push_back(f),
- }
- }
-
- /// Push new future into the start of queue, this task will be exactly the
next to poll.
- pub fn push_front(&mut self, f: F) {
- debug_assert!(
- self.has_remaining(),
- "concurrent futures must have remaining space"
- );
-
- match &mut self.tasks {
- Tasks::Once(fut) => {
- *fut = Some(f);
- }
- Tasks::Small(v) => v.push_front(TaskResult::Polling(f)),
- Tasks::Large(v) => v.push_front(f),
- }
- }
-}
-
-impl<F> futures::Stream for ConcurrentFutures<F>
-where
- F: Future + Unpin + 'static,
-{
- type Item = F::Output;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
- match &mut self.get_mut().tasks {
- Tasks::Once(fut) => match fut {
- Some(x) => x.poll_unpin(cx).map(|v| {
- *fut = None;
- Some(v)
- }),
- None => Poll::Ready(None),
- },
- Tasks::Small(v) => {
- // Poll all tasks together.
- for task in v.iter_mut() {
- if let TaskResult::Polling(f) = task {
- match f.poll_unpin(cx) {
- Poll::Pending => {}
- Poll::Ready(res) => {
- // Replace with ready value if this future has
been resolved.
- *task = TaskResult::Ready(res);
- }
- }
- }
- }
-
- // Pick the first one to check.
- match v.front_mut() {
- // Return pending if the first one is still polling.
- Some(TaskResult::Polling(_)) => Poll::Pending,
- Some(TaskResult::Ready(_)) => {
- let res = v.pop_front().unwrap();
- match res {
- TaskResult::Polling(_) => unreachable!(),
- TaskResult::Ready(res) => Poll::Ready(Some(res)),
- }
- }
- None => Poll::Ready(None),
- }
- }
- Tasks::Large(v) => v.poll_next_unpin(cx),
- }
- }
-}
-
#[cfg(test)]
mod tests {
- use std::task::ready;
use std::time::Duration;
- use futures::future::BoxFuture;
- use futures::Stream;
use rand::Rng;
use tokio::time::sleep;
use super::*;
- struct Lister {
- size: usize,
- idx: usize,
- concurrent: usize,
- tasks: ConcurrentFutures<BoxFuture<'static, usize>>,
- }
-
- impl Stream for Lister {
- type Item = usize;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
- // Randomly sleep for a while, simulate some io operations that up
to 100 microseconds.
- let timeout =
Duration::from_micros(rand::thread_rng().gen_range(0..100));
- let idx = self.idx;
- if self.tasks.len() < self.concurrent && self.idx < self.size {
- let fut = async move {
- tokio::time::sleep(timeout).await;
- idx
- };
- self.idx += 1;
- self.tasks.push_back(Box::pin(fut));
- }
-
- if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) {
- Poll::Ready(Some(v))
- } else {
- Poll::Ready(None)
- }
- }
- }
-
- #[tokio::test]
- async fn test_concurrent_futures() {
- let cases = vec![
- ("once", 1),
- ("small", CONCURRENT_LARGE_THRESHOLD - 1),
- ("large", CONCURRENT_LARGE_THRESHOLD + 1),
- ];
-
- for (name, concurrent) in cases {
- let lister = Lister {
- size: 1000,
- idx: 0,
- concurrent,
- tasks: ConcurrentFutures::new(concurrent),
- };
- let expected: Vec<usize> = (0..1000).collect();
- let result: Vec<usize> = lister.collect().await;
-
- assert_eq!(expected, result, "concurrent futures failed: {}",
name);
- }
- }
-
#[tokio::test]
async fn test_concurrent_tasks() {
let executor = Executor::new();
diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs
index ad8d1fe11..986f62136 100644
--- a/core/src/raw/mod.rs
+++ b/core/src/raw/mod.rs
@@ -72,7 +72,6 @@ pub use std_io_util::*;
mod futures_util;
pub use futures_util::BoxedFuture;
pub use futures_util::BoxedStaticFuture;
-pub use futures_util::ConcurrentFutures;
pub use futures_util::ConcurrentTasks;
pub use futures_util::MaybeSend;