This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 62d02f79a fix(core): enforce list file content length in complete
layer (#7201)
62d02f79a is described below
commit 62d02f79a71ecee3ef666022ca1f01ef55f6d2c2
Author: Xuanwo <[email protected]>
AuthorDate: Mon Feb 23 20:20:01 2026 +0800
fix(core): enforce list file content length in complete layer (#7201)
* fix(core): enforce list file content length in complete layer
* fix(services/etcd): fill content length from list response
* fix(etcd): keep list keys-only and skip stale entries
* refactor(core): short-circuit list content length completion
* refactor(core): inline list completion fast-path check
---
core/core/src/layers/complete.rs | 66 +++++++++++++++++++++++++++++++++++++--
core/core/src/raw/oio/entry.rs | 10 ++++++
core/core/src/types/metadata.rs | 5 +++
core/tests/behavior/async_list.rs | 3 +-
4 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/core/core/src/layers/complete.rs b/core/core/src/layers/complete.rs
index d9cf1a403..cc2808393 100644
--- a/core/core/src/layers/complete.rs
+++ b/core/core/src/layers/complete.rs
@@ -57,7 +57,7 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
type Inner = A;
type Reader = CompleteReader<A::Reader>;
type Writer = CompleteWriter<A::Writer>;
- type Lister = A::Lister;
+ type Lister = CompleteLister<A>;
type Deleter = A::Deleter;
fn inner(&self) -> &Self::Inner {
@@ -95,7 +95,9 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
- self.inner.list(path, args).await
+ let (rp, lister) = self.inner.list(path, args).await?;
+ let lister = CompleteLister::new(self.inner.clone(),
self.info.clone(), lister);
+ Ok((rp, lister))
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
@@ -103,6 +105,66 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}
}
+pub struct CompleteLister<A: Access> {
+ inner: A::Lister,
+ acc: Arc<A>,
+ info: Arc<AccessorInfo>,
+}
+
+impl<A: Access> CompleteLister<A> {
+ fn new(acc: Arc<A>, info: Arc<AccessorInfo>, inner: A::Lister) -> Self {
+ Self { inner, acc, info }
+ }
+
+ async fn ensure_file_content_length(&self, entry: &mut oio::Entry) ->
Result<()> {
+ let path = entry.path().to_string();
+ let version = entry.metadata().version().map(str::to_owned);
+ let mut op = OpStat::new();
+ if let Some(version) = version.as_deref() {
+ op = op.with_version(version);
+ }
+
+ let stat_metadata = self.acc.stat(&path, op).await?.into_metadata();
+ if !stat_metadata.has_content_length() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "content length is required for list file entries",
+ )
+ .with_operation("CompleteLister::ensure_file_content_length")
+ .with_context("service", self.info.scheme().to_string())
+ .with_context("path", path));
+ }
+
+ entry
+ .metadata_mut()
+ .set_content_length(stat_metadata.content_length());
+ Ok(())
+ }
+}
+
+impl<A: Access> oio::List for CompleteLister<A> {
+ async fn next(&mut self) -> Result<Option<oio::Entry>> {
+ loop {
+ let Some(mut entry) = self.inner.next().await? else {
+ return Ok(None);
+ };
+
+ if !entry.mode().is_file()
+ || entry.metadata().is_deleted()
+ || entry.metadata().has_content_length()
+ {
+ return Ok(Some(entry));
+ }
+
+ match self.ensure_file_content_length(&mut entry).await {
+ Ok(()) => return Ok(Some(entry)),
+ Err(err) if err.kind() == ErrorKind::NotFound => continue,
+ Err(err) => return Err(err),
+ }
+ }
+ }
+}
+
pub struct CompleteReader<R> {
inner: R,
size: Option<u64>,
diff --git a/core/core/src/raw/oio/entry.rs b/core/core/src/raw/oio/entry.rs
index 70320300b..34d64bc7e 100644
--- a/core/core/src/raw/oio/entry.rs
+++ b/core/core/src/raw/oio/entry.rs
@@ -86,4 +86,14 @@ impl Entry {
pub(crate) fn into_entry(self) -> crate::Entry {
crate::Entry::new(self.path, self.meta)
}
+
+ /// Get metadata of entry.
+ pub(crate) fn metadata(&self) -> &Metadata {
+ &self.meta
+ }
+
+ /// Get mutable metadata of entry.
+ pub(crate) fn metadata_mut(&mut self) -> &mut Metadata {
+ &mut self.meta
+ }
}
diff --git a/core/core/src/types/metadata.rs b/core/core/src/types/metadata.rs
index 901979dc4..fe18a5d30 100644
--- a/core/core/src/types/metadata.rs
+++ b/core/core/src/types/metadata.rs
@@ -222,6 +222,11 @@ impl Metadata {
self.content_length.unwrap_or_default()
}
+ /// Returns `true` if this metadata contains an explicit content length.
+ pub(crate) fn has_content_length(&self) -> bool {
+ self.content_length.is_some()
+ }
+
/// Set content length of this entry.
pub fn set_content_length(&mut self, v: u64) -> &mut Self {
self.content_length = Some(v);
diff --git a/core/tests/behavior/async_list.rs
b/core/tests/behavior/async_list.rs
index 2be5f0d0d..fab890280 100644
--- a/core/tests/behavior/async_list.rs
+++ b/core/tests/behavior/async_list.rs
@@ -79,10 +79,9 @@ pub async fn test_list_dir(op: Operator) -> Result<()> {
let mut obs = op.lister(&format!("{parent}/")).await?;
let mut found = false;
while let Some(de) = obs.try_next().await? {
- let meta = op.stat(de.path()).await?;
if de.path() == path {
+ let meta = de.metadata();
assert_eq!(meta.mode(), EntryMode::FILE);
-
assert_eq!(meta.content_length(), size as u64);
found = true