This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch xuanwo/investigate-local-list-root-recursive-flake
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 433ff97e37b8f924fa56b73bd4102d3d4d303be0
Author: Xuanwo <[email protected]>
AuthorDate: Thu Feb 26 22:11:54 2026 +0800

    fix(core): skip not found during concurrent listing
---
 core/core/src/raw/oio/list/flat_list.rs | 53 ++++++++++++++++---
 core/services/fs/src/lister.rs          | 91 +++++++++++++++++++++------------
 2 files changed, 106 insertions(+), 38 deletions(-)

diff --git a/core/core/src/raw/oio/list/flat_list.rs b/core/core/src/raw/oio/list/flat_list.rs
index 69ff0edb5..424c91615 100644
--- a/core/core/src/raw/oio/list/flat_list.rs
+++ b/core/core/src/raw/oio/list/flat_list.rs
@@ -103,9 +103,31 @@ where
                         );
                         continue;
                     }
+                    Err(e) if e.kind() == ErrorKind::NotFound => {
+                        // Skip directories that are deleted while listing.
+                        log::warn!(
+                            "FlatLister skipping directory due to not found during listing: {}",
+                            de.path()
+                        );
+                        continue;
+                    }
                     Err(e) => return Err(e),
                 };
-                if let Some(v) = l.next().await? {
+                let first = loop {
+                    match l.next().await {
+                        Ok(v) => break v,
+                        Err(e) if e.kind() == ErrorKind::NotFound => {
+                            // Skip entries that are deleted during listing.
+                            log::warn!(
+                                "FlatLister skipping entry due to not found during listing: {}",
+                                de.path()
+                            );
+                            continue;
+                        }
+                        Err(e) => return Err(e),
+                    }
+                };
+                if let Some(v) = first {
                     self.active_lister.push((Some(de.clone()), l));
 
                     if v.mode().is_dir() {
@@ -120,21 +142,40 @@ where
                 }
             }
 
+            if matches!(self.active_lister.last(), Some((None, _))) {
+                let _ = self.active_lister.pop();
+                continue;
+            }
+
             let (de, lister) = match self.active_lister.last_mut() {
                 Some((de, lister)) => (de, lister),
                 None => return Ok(None),
             };
 
-            match lister.next().await? {
-                Some(v) if v.mode().is_dir() => {
+            match lister.next().await {
+                Err(e) if e.kind() == ErrorKind::NotFound => {
+                    let path = de.as_ref().map(|entry| entry.path()).unwrap_or("<unknown>");
+                    log::warn!(
+                        "FlatLister skipping entry due to not found during recursive listing: {}",
+                        path
+                    );
+                    continue;
+                }
+                Err(e) => return Err(e),
+                Ok(Some(v)) if v.mode().is_dir() => {
                     // should not loop itself again
-                    if v.path() != de.as_ref().expect("de should not be none here").path() {
+                    if v.path()
+                        != de
+                            .as_ref()
+                            .expect("de must be present before listing")
+                            .path()
+                    {
                         self.next_dir = Some(v);
                         continue;
                     }
                 }
-                Some(v) => return Ok(Some(v)),
-                None => match de.take() {
+                Ok(Some(v)) => return Ok(Some(v)),
+                Ok(None) => match de.take() {
                     Some(de) => {
                         return Ok(Some(de));
                     }
diff --git a/core/services/fs/src/lister.rs b/core/services/fs/src/lister.rs
index 90aaf3075..8b9cee5f2 100644
--- a/core/services/fs/src/lister.rs
+++ b/core/services/fs/src/lister.rs
@@ -48,42 +48,69 @@ unsafe impl<P> Sync for FsLister<P> {}
 
 impl oio::List for FsLister<tokio::fs::ReadDir> {
     async fn next(&mut self) -> Result<Option<oio::Entry>> {
-        // since list should return path itself, we return it first
-        if let Some(path) = self.current_path.take() {
-            let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR));
-            return Ok(Some(e));
-        }
+        loop {
+            // since list should return path itself, we return it first
+            if let Some(path) = self.current_path.take() {
+                let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR));
+                return Ok(Some(e));
+            }
 
-        let Some(de) = self.rd.next_entry().await.map_err(new_std_io_error)? else {
-            return Ok(None);
-        };
+            let de = match self.rd.next_entry().await {
+                Ok(Some(de)) => de,
+                Ok(None) => return Ok(None),
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                    // Directory can be removed during listing; stop gracefully.
+                    return Ok(None);
+                }
+                Err(e) => return Err(new_std_io_error(e)),
+            };
 
-        let entry_path = de.path();
-        let rel_path = normalize_path(
-            &entry_path
-                .strip_prefix(&self.root)
-                .expect("cannot fail because the prefix is iterated")
-                .to_string_lossy()
-                .replace('\\', "/"),
-        );
+            let entry_path = de.path();
+            let rel_path = normalize_path(
+                &entry_path
+                    .strip_prefix(&self.root)
+                    .expect("cannot fail because the prefix is iterated")
+                    .to_string_lossy()
+                    .replace('\\', "/"),
+            );
 
-        let ft = de.file_type().await.map_err(new_std_io_error)?;
-        let (path, mode) = if ft.is_dir() {
-            // Make sure we are returning the correct path.
-            (&format!("{rel_path}/"), EntryMode::DIR)
-        } else if ft.is_file() {
-            (&rel_path, EntryMode::FILE)
-        } else {
-            (&rel_path, EntryMode::Unknown)
-        };
+            let ft = match de.file_type().await {
+                Ok(ft) => ft,
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                    // Entry can be deleted between readdir and stat calls.
+                    continue;
+                }
+                Err(e) => return Err(new_std_io_error(e)),
+            };
+            let (path, mode) = if ft.is_dir() {
+                // Make sure we are returning the correct path.
+                (&format!("{rel_path}/"), EntryMode::DIR)
+            } else if ft.is_file() {
+                (&rel_path, EntryMode::FILE)
+            } else {
+                (&rel_path, EntryMode::Unknown)
+            };
 
-        let de_metadata = de.metadata().await.map_err(new_std_io_error)?;
-        let metadata = Metadata::new(mode)
-            .with_content_length(de_metadata.len())
-            .with_last_modified(Timestamp::try_from(
-                de_metadata.modified().map_err(new_std_io_error)?,
-            )?);
+            let de_metadata = match de.metadata().await {
+                Ok(de_metadata) => de_metadata,
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                    // Entry can be deleted between readdir and metadata calls.
+                    continue;
+                }
+                Err(e) => return Err(new_std_io_error(e)),
+            };
+            let last_modified = match de_metadata.modified() {
+                Ok(v) => v,
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                    continue;
+                }
+                Err(e) => return Err(new_std_io_error(e)),
+            };
+            let metadata = Metadata::new(mode)
+                .with_content_length(de_metadata.len())
+                .with_last_modified(Timestamp::try_from(last_modified)?);
 
-        Ok(Some(oio::Entry::new(path, metadata)))
+            return Ok(Some(oio::Entry::new(path, metadata)));
+        }
     }
 }

Reply via email to