This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 50e1229 Azure ADLS list_with_offset support (#623)
50e1229 is described below
commit 50e1229a97225fa1d330cd925187e2b302d25676
Author: Omar Khudeira <[email protected]>
AuthorDate: Tue Feb 10 06:14:33 2026 -0600
Azure ADLS list_with_offset support (#623)
* Add list_with_offset for Azure
* Add list_from_exclusivity test
* Update documentation on function
* Remove unnecessary docs
* Remove redundant tests
* Run cargo fmt
* Skip Azurite version check
* Skip list_with_offset_exclusivity against Azure emulator
* Fix failing test by changing the behavior of list_with_offset when the emulator is running, instead of skipping the test
* Update src/azure/mod.rs
Co-authored-by: Marco Neumann <[email protected]>
---------
Co-authored-by: Marco Neumann <[email protected]>
---
src/aws/mod.rs | 1 +
src/azure/client.rs | 25 ++++++++---
src/azure/mod.rs | 21 +++++++++
src/gcp/mod.rs | 1 +
src/integration.rs | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++++
src/lib.rs | 4 +-
src/list.rs | 3 ++
src/local.rs | 1 +
src/memory.rs | 3 ++
9 files changed, 172 insertions(+), 8 deletions(-)
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 030590a..7c8dcf3 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -625,6 +625,7 @@ mod tests {
let test_conditional_put = config.conditional_put !=
S3ConditionalPut::Disabled;
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
diff --git a/src/azure/client.rs b/src/azure/client.rs
index 7a7d6dc..e068c4b 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -939,16 +939,10 @@ impl ListClient for Arc<AzureClient> {
prefix: Option<&str>,
opts: PaginatedListOptions,
) -> Result<PaginatedListResult> {
- if opts.offset.is_some() {
- return Err(crate::Error::NotSupported {
- source: "Azure does not support listing with offsets".into(),
- });
- }
-
let credential = self.get_credential().await?;
let url = self.config.path_url(&Path::default());
- let mut query = Vec::with_capacity(5);
+ let mut query = Vec::with_capacity(6);
query.push(("restype", "container"));
query.push(("comp", "list"));
@@ -962,6 +956,10 @@ impl ListClient for Arc<AzureClient> {
if let Some(token) = &opts.page_token {
query.push(("marker", token.as_ref()))
+ } else if let Some(offset) = &opts.offset {
+ // startFrom is only used on the first request; subsequent requests use marker.
+ // Note: startFrom is inclusive (unlike the S3/GCP start-after parameter, which is exclusive)
+ query.push(("startFrom", offset.as_ref()))
}
let max_keys_str;
@@ -996,6 +994,19 @@ impl ListClient for Arc<AzureClient> {
let token = response.next_marker.take().filter(|x| !x.is_empty());
+ // Azure's startFrom is inclusive, so when an offset is provided we need to filter out
+ // the offset item itself to match the exclusive semantics expected by
+ // ObjectStore::list_with_offset. Because Azure returns items in lexicographic order and
+ // startFrom is inclusive, the first item (if any) is either exactly equal to the offset
+ // (when an object exists there) or greater than it (when none does), so it is enough
+ // to remove just the first item when it equals the offset.
+ if let Some(offset) = &opts.offset {
+ if let Some(first) = response.blobs.blobs.first() {
+ if first.name == *offset {
+ response.blobs.blobs.remove(0);
+ }
+ }
+ }
+
Ok(PaginatedListResult {
result: to_list_result(response, prefix)?,
page_token: token,
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 04c8f31..8a2e5d1 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -22,6 +22,7 @@
//! [`ObjectStore::put_multipart_opts`] will upload data in blocks and write a
blob from those blocks.
//!
//! Unused blocks will automatically be dropped after 7 days.
+//!
use crate::{
CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId,
MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result,
@@ -121,6 +122,25 @@ impl ObjectStore for MicrosoftAzure {
self.client.list(prefix)
}
+ fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> BoxStream<'static, Result<ObjectMeta>> {
+ if self.client.config().is_emulator {
+ // Azurite doesn't support the startFrom query parameter,
+ // fall back to client-side filtering
+ //
+ // See https://github.com/Azure/Azurite/issues/2619#issuecomment-3660701055
+ let offset = offset.clone();
+ self.list(prefix)
+ .try_filter(move |f| futures::future::ready(f.location >
offset))
+ .boxed()
+ } else {
+ self.client.list_with_offset(prefix, offset)
+ }
+ }
+
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
@@ -324,6 +344,7 @@ mod tests {
let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 2fb74b4..4e1679e 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -317,6 +317,7 @@ mod test {
let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/src/integration.rs b/src/integration.rs
index 5c601b5..b0672db 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -1264,3 +1264,124 @@ pub async fn list_paginated(storage: &dyn ObjectStore,
list: &dyn PaginatedListS
);
assert!(ret.page_token.is_none());
}
+
+/// Tests that [`ObjectStore::list_with_offset`] returns an exclusive list
+/// that does not include the offset value itself.
+/// This is needed because some object stores' native listing APIs (e.g. Azure's
+/// `startFrom`) are inclusive, while AWS S3 and GCP return exclusive results.
+pub async fn list_with_offset_exclusivity(storage: &DynObjectStore) {
+ delete_fixtures(storage).await;
+
+ // Create a set of test files with predictable ordering
+ let files = vec![
+ Path::from("test_offset/file_a.txt"),
+ Path::from("test_offset/file_b.txt"),
+ Path::from("test_offset/file_c.txt"),
+ Path::from("test_offset/file_d.txt"),
+ Path::from("test_offset/file_e.txt"),
+ ];
+
+ let data = Bytes::from("test data");
+ for file in &files {
+ storage.put(file, data.clone().into()).await.unwrap();
+ }
+
+ // Test 1: Offset at file_b, should return c, d, e but NOT b
+ let offset = Path::from("test_offset/file_b.txt");
+ let result: Vec<Path> = storage
+ .list_with_offset(Some(&Path::from("test_offset")), &offset)
+ .map_ok(|meta| meta.location)
+ .try_collect()
+ .await
+ .unwrap();
+
+ // Verify that the offset file (file_b) is NOT in the results
+ assert!(
+ !result.contains(&offset),
+ "list_with_offset should not include the offset value itself. Found
{:?} in results: {:?}",
+ offset,
+ result
+ );
+
+ // Verify that only files after the offset are returned
+ let expected = vec![
+ Path::from("test_offset/file_c.txt"),
+ Path::from("test_offset/file_d.txt"),
+ Path::from("test_offset/file_e.txt"),
+ ];
+ assert_eq!(
+ result.len(),
+ expected.len(),
+ "Expected {} files after offset, got {}",
+ expected.len(),
+ result.len()
+ );
+ for file in &expected {
+ assert!(
+ result.contains(file),
+ "Expected file {:?} to be in results",
+ file
+ );
+ }
+
+ // Test 2: Offset at file_a, should return b, c, d, e but NOT a
+ let offset = Path::from("test_offset/file_a.txt");
+ let result: Vec<Path> = storage
+ .list_with_offset(Some(&Path::from("test_offset")), &offset)
+ .map_ok(|meta| meta.location)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert!(
+ !result.contains(&offset),
+ "list_with_offset should not include the offset value itself. Found
{:?} in results: {:?}",
+ offset,
+ result
+ );
+ assert_eq!(result.len(), 4, "Expected 4 files after first file");
+
+ // Test 3: Offset at file_e (last file), should return empty list
+ let offset = Path::from("test_offset/file_e.txt");
+ let result: Vec<Path> = storage
+ .list_with_offset(Some(&Path::from("test_offset")), &offset)
+ .map_ok(|meta| meta.location)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert!(
+ result.is_empty(),
+ "list_with_offset with offset at last file should return empty list,
got: {:?}",
+ result
+ );
+
+ // Test 4: Test without prefix - offset should still be exclusive
+ let offset = Path::from("test_offset/file_c.txt");
+ let result: Vec<Path> = storage
+ .list_with_offset(None, &offset)
+ .map_ok(|meta| meta.location)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert!(
+ !result.contains(&offset),
+ "list_with_offset without prefix should not include the offset value
itself. Found {:?} in results: {:?}",
+ offset,
+ result
+ );
+
+ // Verify only files after file_c are included
+ let expected = vec![
+ Path::from("test_offset/file_d.txt"),
+ Path::from("test_offset/file_e.txt"),
+ ];
+ assert_eq!(result.len(), expected.len());
+ for file in &expected {
+ assert!(result.contains(file), "Expected file {:?} in results", file);
+ }
+
+ // Clean up
+ delete_fixtures(storage).await;
+}
diff --git a/src/lib.rs b/src/lib.rs
index 54d51dd..3903a62 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1074,7 +1074,9 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// List all the objects with the given prefix and a location greater than
`offset`
///
/// Some stores, such as S3 and GCS, may be able to push `offset` down to
reduce
- /// the number of network requests required
+ /// the number of network requests required.
+ ///
+ /// The offset is exclusive: an object located exactly at `offset` is not included.
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
///
diff --git a/src/list.rs b/src/list.rs
index e73fe52..f9a47cf 100644
--- a/src/list.rs
+++ b/src/list.rs
@@ -28,6 +28,9 @@ pub struct PaginatedListOptions {
/// Path to start listing from
///
/// Note: Not all stores support this
+ ///
+ /// For stores that do support this, the returned
+ /// result should not include the object with this key.
pub offset: Option<String>,
/// A delimiter use to group keys with a common prefix
diff --git a/src/local.rs b/src/local.rs
index 3a6a7a4..907b589 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -1271,6 +1271,7 @@ mod tests {
let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
diff --git a/src/memory.rs b/src/memory.rs
index f026907..d6c7c0c 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -548,6 +548,7 @@ mod tests {
let integration = InMemory::new();
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
@@ -564,6 +565,7 @@ mod tests {
let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
@@ -577,6 +579,7 @@ mod tests {
let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;