This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b464f04a7 refactor: Implement SimulateLayer to make simulate logic more maintai… (#6822)
b464f04a7 is described below

commit b464f04a778d7e11e326a17d7419c5936b90e502
Author: Xuanwo <[email protected]>
AuthorDate: Wed Nov 26 23:18:01 2025 +0800

    refactor: Implement SimulateLayer to make simulate logic more maintai… (#6822)
    
    * refactor: Implement SimulateLayer to make simulate logic more maintainable
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix ci
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * start after is not supported before
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix ci
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs        | 161 ++---------------------
 core/src/layers/mod.rs             |   3 +
 core/src/layers/simulate.rs        | 256 +++++++++++++++++++++++++++++++++++++
 core/src/types/operator/builder.rs |   1 +
 4 files changed, 271 insertions(+), 150 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 830db569d..75fa9ee09 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -20,54 +20,22 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-use crate::raw::oio::FlatLister;
-use crate::raw::oio::PrefixLister;
-use crate::raw::*;
+use crate::raw::oio;
+use crate::raw::{
+    Access, AccessorInfo, Layer, LayeredAccess, OpCreateDir, OpList, OpPresign, OpRead, OpStat,
+    OpWrite, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpStat, RpWrite,
+};
 use crate::*;
 
-/// Complete underlying services features so that users can use them in
-/// the same way.
-///
-/// # Notes
-///
-/// CompleteLayer is not a public accessible layer that can be used by
-/// external users. OpenDAL will make sure every accessor will apply this
-/// layer once and only once.
-///
-/// # Internal
-///
-/// So far `CompleteLayer` will do the following things:
-///
-/// ## Stat Completion
-///
-/// Not all services support stat dir natively, but we can simulate it via list.
-///
-/// ## List Completion
-///
-/// There are two styles of list, but not all services support both of
-/// them. CompleteLayer will add those capabilities in a zero cost way.
-///
-/// Underlying services will return [`Capability`] to indicate the
-/// features that returning listers support.
-///
-/// - If support `list_with_recursive`, return directly.
-/// - if not, wrap with [`FlatLister`].
+/// CompleteLayer keeps validation wrappers for read/write operations.
 pub struct CompleteLayer;
 
 impl<A: Access> Layer<A> for CompleteLayer {
     type LayeredAccess = CompleteAccessor<A>;
 
     fn layer(&self, inner: A) -> Self::LayeredAccess {
-        let info = inner.info();
-        info.update_full_capability(|mut cap| {
-            if cap.list && cap.write_can_empty {
-                cap.create_dir = true;
-            }
-            cap
-        });
-
         CompleteAccessor {
-            info,
+            info: inner.info(),
             inner: Arc::new(inner),
         }
     }
@@ -85,115 +53,11 @@ impl<A: Access> Debug for CompleteAccessor<A> {
     }
 }
 
-impl<A: Access> CompleteAccessor<A> {
-    async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
-        let capability = self.info.native_capability();
-        if capability.create_dir {
-            return self.inner().create_dir(path, args).await;
-        }
-
-        if capability.write_can_empty && capability.list {
-            let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
-            oio::Write::close(&mut w).await?;
-            return Ok(RpCreateDir::default());
-        }
-
-        self.inner.create_dir(path, args).await
-    }
-
-    async fn complete_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        let capability = self.info.native_capability();
-
-        if path == "/" {
-            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
-        }
-
-        // Forward to inner if create_dir is supported.
-        if path.ends_with('/') && capability.create_dir {
-            let meta = self.inner.stat(path, args).await?.into_metadata();
-
-            if meta.is_file() {
-                return Err(Error::new(
-                    ErrorKind::NotFound,
-                    "stat expected a directory, but found a file",
-                ));
-            }
-
-            return Ok(RpStat::new(meta));
-        }
-
-        // Otherwise, we can simulate stat dir via `list`.
-        if path.ends_with('/') && capability.list_with_recursive {
-            let (_, mut l) = self
-                .inner
-                .list(path, OpList::default().with_recursive(true).with_limit(1))
-                .await?;
-
-            return if oio::List::next(&mut l).await?.is_some() {
-                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
-            } else {
-                Err(Error::new(
-                    ErrorKind::NotFound,
-                    "the directory is not found",
-                ))
-            };
-        }
-
-        // Forward to underlying storage directly since we don't know how to handle stat dir.
-        self.inner.stat(path, args).await
-    }
-
-    async fn complete_list(
-        &self,
-        path: &str,
-        args: OpList,
-    ) -> Result<(RpList, CompleteLister<A, A::Lister>)> {
-        let cap = self.info.native_capability();
-
-        let recursive = args.recursive();
-
-        match (recursive, cap.list_with_recursive) {
-            // - If service can list_with_recursive, we can forward list to it directly.
-            (_, true) => {
-                let (rp, p) = self.inner.list(path, args).await?;
-                Ok((rp, CompleteLister::One(p)))
-            }
-            // If recursive is true but service can't list_with_recursive
-            (true, false) => {
-                // Forward path that ends with /
-                if path.ends_with('/') {
-                    let p = FlatLister::new(self.inner.clone(), path);
-                    Ok((RpList::default(), CompleteLister::Two(p)))
-                } else {
-                    let parent = get_parent(path);
-                    let p = FlatLister::new(self.inner.clone(), parent);
-                    let p = PrefixLister::new(p, path);
-                    Ok((RpList::default(), CompleteLister::Four(p)))
-                }
-            }
-            // If recursive and service doesn't support list_with_recursive, we need to handle
-            // list prefix by ourselves.
-            (false, false) => {
-                // Forward path that ends with /
-                if path.ends_with('/') {
-                    let (rp, p) = self.inner.list(path, args).await?;
-                    Ok((rp, CompleteLister::One(p)))
-                } else {
-                    let parent = get_parent(path);
-                    let (rp, p) = self.inner.list(parent, args).await?;
-                    let p = PrefixLister::new(p, path);
-                    Ok((rp, CompleteLister::Three(p)))
-                }
-            }
-        }
-    }
-}
-
 impl<A: Access> LayeredAccess for CompleteAccessor<A> {
     type Inner = A;
     type Reader = CompleteReader<A::Reader>;
     type Writer = CompleteWriter<A::Writer>;
-    type Lister = CompleteLister<A, A::Lister>;
+    type Lister = A::Lister;
     type Deleter = A::Deleter;
 
     fn inner(&self) -> &Self::Inner {
@@ -205,7 +69,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
-        self.complete_create_dir(path, args).await
+        self.inner().create_dir(path, args).await
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
@@ -223,7 +87,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        self.complete_stat(path, args).await
+        self.inner.stat(path, args).await
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
@@ -231,7 +95,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        self.complete_list(path, args).await
+        self.inner.list(path, args).await
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
@@ -239,9 +103,6 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
     }
 }
 
-pub type CompleteLister<A, P> =
-    FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
-
 pub struct CompleteReader<R> {
     inner: R,
     size: Option<u64>,
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 6c789be48..cffc12ead 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -26,6 +26,9 @@ pub(crate) use error_context::ErrorContextLayer;
 mod complete;
 pub(crate) use complete::CompleteLayer;
 
+mod simulate;
+pub use simulate::SimulateLayer;
+
 mod concurrent_limit;
 pub use concurrent_limit::ConcurrentLimitLayer;
 
diff --git a/core/src/layers/simulate.rs b/core/src/layers/simulate.rs
new file mode 100644
index 000000000..ee340fc5c
--- /dev/null
+++ b/core/src/layers/simulate.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use crate::raw::oio::FlatLister;
+use crate::raw::oio::PrefixLister;
+use crate::raw::*;
+use crate::*;
+
+/// Simulate missing capabilities for backends in a configurable way.
+#[derive(Debug, Clone)]
+pub struct SimulateLayer {
+    list_recursive: bool,
+    stat_dir: bool,
+    create_dir: bool,
+}
+
+impl Default for SimulateLayer {
+    fn default() -> Self {
+        Self {
+            list_recursive: true,
+            stat_dir: true,
+            create_dir: true,
+        }
+    }
+}
+
+impl SimulateLayer {
+    /// Enable or disable recursive list simulation. Default: true.
+    pub fn with_list_recursive(mut self, enabled: bool) -> Self {
+        self.list_recursive = enabled;
+        self
+    }
+
+    /// Enable or disable stat dir simulation. Default: true.
+    pub fn with_stat_dir(mut self, enabled: bool) -> Self {
+        self.stat_dir = enabled;
+        self
+    }
+
+    /// Enable or disable create_dir simulation. Default: true.
+    pub fn with_create_dir(mut self, enabled: bool) -> Self {
+        self.create_dir = enabled;
+        self
+    }
+}
+
+impl<A: Access> Layer<A> for SimulateLayer {
+    type LayeredAccess = SimulateAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccess {
+        let info = inner.info();
+        info.update_full_capability(|mut cap| {
+            if self.create_dir && cap.list && cap.write_can_empty {
+                cap.create_dir = true;
+            }
+            cap
+        });
+
+        SimulateAccessor {
+            config: self.clone(),
+            info,
+            inner: Arc::new(inner),
+        }
+    }
+}
+
+/// Accessor that applies capability simulation.
+pub struct SimulateAccessor<A: Access> {
+    config: SimulateLayer,
+    info: Arc<AccessorInfo>,
+    inner: Arc<A>,
+}
+
+impl<A: Access> Debug for SimulateAccessor<A> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        self.inner.fmt(f)
+    }
+}
+
+impl<A: Access> SimulateAccessor<A> {
+    async fn simulate_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        let capability = self.info.native_capability();
+
+        if capability.create_dir || !self.config.create_dir {
+            return self.inner().create_dir(path, args).await;
+        }
+
+        if capability.write_can_empty && capability.list {
+            let (_, mut w) = self.inner.write(path, OpWrite::default()).await?;
+            oio::Write::close(&mut w).await?;
+            return Ok(RpCreateDir::default());
+        }
+
+        self.inner.create_dir(path, args).await
+    }
+
+    async fn simulate_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let capability = self.info.native_capability();
+
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        if path.ends_with('/') {
+            if capability.create_dir {
+                let meta = self.inner.stat(path, args.clone()).await?.into_metadata();
+
+                if meta.is_file() {
+                    return Err(Error::new(
+                        ErrorKind::NotFound,
+                        "stat expected a directory, but found a file",
+                    ));
+                }
+
+                return Ok(RpStat::new(meta));
+            }
+
+            if self.config.stat_dir && capability.list_with_recursive {
+                let (_, mut l) = self
+                    .inner
+                    .list(path, OpList::default().with_recursive(true).with_limit(1))
+                    .await?;
+
+                return if oio::List::next(&mut l).await?.is_some() {
+                    Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+                } else {
+                    Err(Error::new(
+                        ErrorKind::NotFound,
+                        "the directory is not found",
+                    ))
+                };
+            }
+        }
+
+        self.inner.stat(path, args).await
+    }
+
+    async fn simulate_list(
+        &self,
+        path: &str,
+        args: OpList,
+    ) -> Result<(RpList, SimulateLister<A, A::Lister>)> {
+        let cap = self.info.native_capability();
+
+        let recursive = args.recursive();
+        let forward = args;
+
+        let (rp, lister) = match (
+            recursive,
+            cap.list_with_recursive,
+            self.config.list_recursive,
+        ) {
+            // Backend supports recursive list, forward directly.
+            (_, true, _) => {
+                let (rp, p) = self.inner.list(path, forward).await?;
+                (rp, SimulateLister::One(p))
+            }
+            // Simulate recursive via flat list when enabled.
+            (true, false, true) => {
+                if path.ends_with('/') {
+                    let p = FlatLister::new(self.inner.clone(), path);
+                    (RpList::default(), SimulateLister::Two(p))
+                } else {
+                    let parent = get_parent(path);
+                    let p = FlatLister::new(self.inner.clone(), parent);
+                    let p = PrefixLister::new(p, path);
+                    (RpList::default(), SimulateLister::Four(p))
+                }
+            }
+            // Recursive requested but simulation disabled; rely on backend and propagate errors.
+            (true, false, false) => {
+                let (rp, p) = self.inner.list(path, forward).await?;
+                (rp, SimulateLister::One(p))
+            }
+            // Non-recursive list: keep existing prefix handling semantics.
+            (false, false, _) => {
+                if path.ends_with('/') {
+                    let (rp, p) = self.inner.list(path, forward).await?;
+                    (rp, SimulateLister::One(p))
+                } else {
+                    let parent = get_parent(path);
+                    let (rp, p) = self.inner.list(parent, forward).await?;
+                    let p = PrefixLister::new(p, path);
+                    (rp, SimulateLister::Three(p))
+                }
+            }
+        };
+
+        Ok((rp, lister))
+    }
+}
+
+impl<A: Access> LayeredAccess for SimulateAccessor<A> {
+    type Inner = A;
+    type Reader = A::Reader;
+    type Writer = A::Writer;
+    type Lister = SimulateLister<A, A::Lister>;
+    type Deleter = A::Deleter;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        self.simulate_create_dir(path, args).await
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        self.inner.read(path, args).await
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        self.inner.write(path, args).await
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.simulate_stat(path, args).await
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        self.inner().delete().await
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        self.simulate_list(path, args).await
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.inner.presign(path, args).await
+    }
+}
+
+pub type SimulateLister<A, P> =
+    FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 1954ea85d..294ec162c 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -423,6 +423,7 @@ impl<A: Access> OperatorBuilder<A> {
         // Make sure error context layer has been attached.
         OperatorBuilder { accessor }
             .layer(ErrorContextLayer)
+            .layer(SimulateLayer::default())
             .layer(CompleteLayer)
             .layer(CorrectnessCheckLayer)
     }

Reply via email to