Xuanwo commented on code in PR #3599:
URL:
https://github.com/apache/incubator-opendal/pull/3599#discussion_r1401374817
##########
core/src/types/list.rs:
##########
@@ -80,43 +83,87 @@ impl Stream for Lister {
return Poll::Ready(None);
}
- if let Some(fut) = self.stating.as_mut() {
- let (path, rp) = ready!(fut.poll_unpin(cx));
-
- // Make sure we will not poll this future again.
- self.stating = None;
- let metadata = match rp {
- Ok(rp) => rp.into_metadata(),
- Err(err) => {
- self.errored = true;
- return Poll::Ready(Some(Err(err)));
- }
- };
-
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+ if !self.task_queue.is_empty() {
+ if let Some(handle) = self.task_queue.back_mut() {
+ let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "join handle error")
+ .with_operation("types::Lister::poll_next")
+ .set_source(err)
+ })?;
+
+ return match rp {
+ Ok(rp) => {
+ self.task_queue.pop_back();
+ let metadata = rp.into_metadata();
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+ }
+ Err(err) => {
+ self.errored = true;
+ Poll::Ready(Some(Err(err)))
+ }
+ };
+ }
}
- match ready!(self.lister.poll_next(cx)) {
- Ok(Some(oe)) => {
- let (path, metadata) = oe.into_entry().into_parts();
- if metadata.contains_metakey(self.required_metakey) {
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
- }
-
+ if let Some(entry) = self.buf.clone() {
+ let (path, metadata) = entry.into_parts();
+ return if metadata.contains_metakey(self.required_metakey) {
+ self.buf = None;
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+ } else {
let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;
-
(path, res)
};
- self.stating = Some(Box::pin(fut));
+
+ self.task_queue.push_front(tokio::spawn(fut));
+ self.buf = None;
self.poll_next(cx)
- }
- Ok(None) => Poll::Ready(None),
- Err(err) => {
- self.errored = true;
- Poll::Ready(Some(Err(err)))
- }
+ };
+ }
+
+ loop {
+ return match ready!(self.lister.poll_next(cx)) {
+ Ok(Some(oe)) => {
+ let (path, metadata) = oe.into_entry().into_parts();
+ // TODO: we can optimize this by checking the provided
metakey provided by services.
+ if metadata.contains_metakey(self.required_metakey) {
+ return if self.task_queue.is_empty() {
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+ } else {
+ self.buf = Some(Entry::new(path, metadata));
+ self.poll_next(cx)
+ };
+ }
+
+ if self.task_queue.len() < self.task_queue.capacity() {
+ let acc = self.acc.clone();
+
+ let fut = async move {
+ let res = acc.stat(&path, OpStat::default()).await;
+ (path, res)
+ };
+
+ self.task_queue.push_front(tokio::spawn(fut));
+ continue;
+ } else {
+ self.buf = Some(Entry::new(path, metadata));
Review Comment:
We can check the `task_queue` capacity beforehand (before polling the lister) so that we don't
need an extra `buf`.
##########
core/src/types/list.rs:
##########
@@ -80,43 +83,87 @@ impl Stream for Lister {
return Poll::Ready(None);
}
- if let Some(fut) = self.stating.as_mut() {
- let (path, rp) = ready!(fut.poll_unpin(cx));
-
- // Make sure we will not poll this future again.
- self.stating = None;
- let metadata = match rp {
- Ok(rp) => rp.into_metadata(),
- Err(err) => {
- self.errored = true;
- return Poll::Ready(Some(Err(err)));
- }
- };
-
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+ if !self.task_queue.is_empty() {
Review Comment:
We don't need this check, since `if let Some(handle) = self.task_queue.back_mut()`
already handles the empty-queue case.
##########
core/src/types/list.rs:
##########
@@ -80,43 +83,87 @@ impl Stream for Lister {
return Poll::Ready(None);
}
- if let Some(fut) = self.stating.as_mut() {
- let (path, rp) = ready!(fut.poll_unpin(cx));
-
- // Make sure we will not poll this future again.
- self.stating = None;
- let metadata = match rp {
- Ok(rp) => rp.into_metadata(),
- Err(err) => {
- self.errored = true;
- return Poll::Ready(Some(Err(err)));
- }
- };
-
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+ if !self.task_queue.is_empty() {
+ if let Some(handle) = self.task_queue.back_mut() {
+ let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "join handle error")
Review Comment:
We can use `opendal::raw::new_task_join_error` here instead of building the error by hand.
##########
core/src/types/list.rs:
##########
@@ -80,43 +83,87 @@ impl Stream for Lister {
return Poll::Ready(None);
}
- if let Some(fut) = self.stating.as_mut() {
- let (path, rp) = ready!(fut.poll_unpin(cx));
-
- // Make sure we will not poll this future again.
- self.stating = None;
- let metadata = match rp {
- Ok(rp) => rp.into_metadata(),
- Err(err) => {
- self.errored = true;
- return Poll::Ready(Some(Err(err)));
- }
- };
-
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+ if !self.task_queue.is_empty() {
+ if let Some(handle) = self.task_queue.back_mut() {
+ let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "join handle error")
+ .with_operation("types::Lister::poll_next")
+ .set_source(err)
+ })?;
+
+ return match rp {
+ Ok(rp) => {
+ self.task_queue.pop_back();
+ let metadata = rp.into_metadata();
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+ }
+ Err(err) => {
+ self.errored = true;
+ Poll::Ready(Some(Err(err)))
+ }
+ };
+ }
}
- match ready!(self.lister.poll_next(cx)) {
- Ok(Some(oe)) => {
- let (path, metadata) = oe.into_entry().into_parts();
- if metadata.contains_metakey(self.required_metakey) {
- return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
- }
-
+ if let Some(entry) = self.buf.clone() {
+ let (path, metadata) = entry.into_parts();
+ return if metadata.contains_metakey(self.required_metakey) {
+ self.buf = None;
+ Poll::Ready(Some(Ok(Entry::new(path, metadata))))
+ } else {
let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;
-
(path, res)
};
- self.stating = Some(Box::pin(fut));
+
+ self.task_queue.push_front(tokio::spawn(fut));
+ self.buf = None;
self.poll_next(cx)
- }
- Ok(None) => Poll::Ready(None),
- Err(err) => {
- self.errored = true;
- Poll::Ready(Some(Err(err)))
- }
+ };
+ }
+
+ loop {
Review Comment:
The current implementation doesn't achieve any concurrency. In the worst case, we will
only ever have one `JoinHandle`:
- `lister.poll_next` returns `Poll::Ready(Ok(Some(entry)))`, and a stat task is pushed into
`task_queue`.
- The next `lister.poll_next` returns `Poll::Pending`, so we return and wait for the
next user call.
- The task queue is not empty now, so we poll `task_queue.back_mut()`.
- After the join handle in `task_queue` returns `Ready`, we start over.
In this way, no concurrency happens, since only one join handle is ever
created at a time.
The root cause is that `ready!()` returns early as soon as a future is `Poll::Pending`, so we should use it carefully.
---
Maybe we can call `self.lister.poll_next(cx)` first to fill the
`task_queue`, and then poll the join handles in the task queue?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]