This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 50e1229  Azure ADLS list_with_offset support (#623)
50e1229 is described below

commit 50e1229a97225fa1d330cd925187e2b302d25676
Author: Omar Khudeira <[email protected]>
AuthorDate: Tue Feb 10 06:14:33 2026 -0600

    Azure ADLS list_with_offset support (#623)
    
    * Add list_with_offset for Azure
    
    * Add list_from_exclusivity test
    
    * Update documentation on function
    
    * Remove unnecessary docs
    
    * Remove redundant tests
    
    * Run cargo fmt
    
    * Skip Azurite version check
    
    * Skip list_with_offset_exclusivity against Azure emulator
    
    * Fix failing test by changing behavior of list_with_offset if emulator is running instead of skipping a test
    
    * Update src/azure/mod.rs
    
    Co-authored-by: Marco Neumann <[email protected]>
    
    ---------
    
    Co-authored-by: Marco Neumann <[email protected]>
---
 src/aws/mod.rs      |   1 +
 src/azure/client.rs |  25 ++++++++---
 src/azure/mod.rs    |  21 +++++++++
 src/gcp/mod.rs      |   1 +
 src/integration.rs  | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs          |   4 +-
 src/list.rs         |   3 ++
 src/local.rs        |   1 +
 src/memory.rs       |   3 ++
 9 files changed, 172 insertions(+), 8 deletions(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 030590a..7c8dcf3 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -625,6 +625,7 @@ mod tests {
         let test_conditional_put = config.conditional_put != 
S3ConditionalPut::Disabled;
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
diff --git a/src/azure/client.rs b/src/azure/client.rs
index 7a7d6dc..e068c4b 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -939,16 +939,10 @@ impl ListClient for Arc<AzureClient> {
         prefix: Option<&str>,
         opts: PaginatedListOptions,
     ) -> Result<PaginatedListResult> {
-        if opts.offset.is_some() {
-            return Err(crate::Error::NotSupported {
-                source: "Azure does not support listing with offsets".into(),
-            });
-        }
-
         let credential = self.get_credential().await?;
         let url = self.config.path_url(&Path::default());
 
-        let mut query = Vec::with_capacity(5);
+        let mut query = Vec::with_capacity(6);
         query.push(("restype", "container"));
         query.push(("comp", "list"));
 
@@ -962,6 +956,10 @@ impl ListClient for Arc<AzureClient> {
 
         if let Some(token) = &opts.page_token {
             query.push(("marker", token.as_ref()))
+        } else if let Some(offset) = &opts.offset {
+            // startFrom is only used on the first request, subsequent requests use marker
+            // Note: startFrom is inclusive (unlike S3/GCP's start-after which is exclusive)
+            query.push(("startFrom", offset.as_ref()))
         }
 
         let max_keys_str;
@@ -996,6 +994,19 @@ impl ListClient for Arc<AzureClient> {
 
         let token = response.next_marker.take().filter(|x| !x.is_empty());
 
+        // Azure's startFrom is inclusive, so when an offset is provided, we need to filter out
+        // the offset item itself to match the exclusive semantics expected by ObjectStore::list_with_offset.
+        // Since Azure returns items in lexicographic order and startFrom is inclusive, the first item
+        // (if any) will be exactly == offset (if it exists), or > offset (if it doesn't exist).
+        // So we can efficiently remove just the first item if it equals the offset.
+        if let Some(offset) = &opts.offset {
+            if let Some(first) = response.blobs.blobs.first() {
+                if first.name == *offset {
+                    response.blobs.blobs.remove(0);
+                }
+            }
+        }
+
         Ok(PaginatedListResult {
             result: to_list_result(response, prefix)?,
             page_token: token,
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 04c8f31..8a2e5d1 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -22,6 +22,7 @@
 //! [`ObjectStore::put_multipart_opts`] will upload data in blocks and write a 
blob from those blocks.
 //!
 //! Unused blocks will automatically be dropped after 7 days.
+//!
 use crate::{
     CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, 
MultipartUpload,
     ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, 
PutResult, Result,
@@ -121,6 +122,25 @@ impl ObjectStore for MicrosoftAzure {
         self.client.list(prefix)
     }
 
+    fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> BoxStream<'static, Result<ObjectMeta>> {
+        if self.client.config().is_emulator {
+            // Azurite doesn't support the startFrom query parameter,
+            // fall back to client-side filtering
+            //
+            // See https://github.com/Azure/Azurite/issues/2619#issuecomment-3660701055
+            let offset = offset.clone();
+            self.list(prefix)
+                .try_filter(move |f| futures::future::ready(f.location > 
offset))
+                .boxed()
+        } else {
+            self.client.list_with_offset(prefix, offset)
+        }
+    }
+
     fn delete_stream(
         &self,
         locations: BoxStream<'static, Result<Path>>,
@@ -324,6 +344,7 @@ mod tests {
         let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 2fb74b4..4e1679e 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -317,6 +317,7 @@ mod test {
         let integration = 
GoogleCloudStorageBuilder::from_env().build().unwrap();
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
diff --git a/src/integration.rs b/src/integration.rs
index 5c601b5..b0672db 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -1264,3 +1264,124 @@ pub async fn list_paginated(storage: &dyn ObjectStore, 
list: &dyn PaginatedListS
     );
     assert!(ret.page_token.is_none());
 }
+
+/// Tests that [`ObjectStore::list_with_offset`] returns an exclusive list
+/// that does not include the offset value itself.
+/// This is needed because some object stores (i.e. Azure) return inclusive results,
+/// while AWS S3 and GCP return exclusive results.
+pub async fn list_with_offset_exclusivity(storage: &DynObjectStore) {
+    delete_fixtures(storage).await;
+
+    // Create a set of test files with predictable ordering
+    let files = vec![
+        Path::from("test_offset/file_a.txt"),
+        Path::from("test_offset/file_b.txt"),
+        Path::from("test_offset/file_c.txt"),
+        Path::from("test_offset/file_d.txt"),
+        Path::from("test_offset/file_e.txt"),
+    ];
+
+    let data = Bytes::from("test data");
+    for file in &files {
+        storage.put(file, data.clone().into()).await.unwrap();
+    }
+
+    // Test 1: Offset at file_b, should return c, d, e but NOT b
+    let offset = Path::from("test_offset/file_b.txt");
+    let result: Vec<Path> = storage
+        .list_with_offset(Some(&Path::from("test_offset")), &offset)
+        .map_ok(|meta| meta.location)
+        .try_collect()
+        .await
+        .unwrap();
+
+    // Verify that the offset file (file_b) is NOT in the results
+    assert!(
+        !result.contains(&offset),
+        "list_with_offset should not include the offset value itself. Found 
{:?} in results: {:?}",
+        offset,
+        result
+    );
+
+    // Verify that only files after the offset are returned
+    let expected = vec![
+        Path::from("test_offset/file_c.txt"),
+        Path::from("test_offset/file_d.txt"),
+        Path::from("test_offset/file_e.txt"),
+    ];
+    assert_eq!(
+        result.len(),
+        expected.len(),
+        "Expected {} files after offset, got {}",
+        expected.len(),
+        result.len()
+    );
+    for file in &expected {
+        assert!(
+            result.contains(file),
+            "Expected file {:?} to be in results",
+            file
+        );
+    }
+
+    // Test 2: Offset at file_a, should return b, c, d, e but NOT a
+    let offset = Path::from("test_offset/file_a.txt");
+    let result: Vec<Path> = storage
+        .list_with_offset(Some(&Path::from("test_offset")), &offset)
+        .map_ok(|meta| meta.location)
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert!(
+        !result.contains(&offset),
+        "list_with_offset should not include the offset value itself. Found 
{:?} in results: {:?}",
+        offset,
+        result
+    );
+    assert_eq!(result.len(), 4, "Expected 4 files after first file");
+
+    // Test 3: Offset at file_e (last file), should return empty list
+    let offset = Path::from("test_offset/file_e.txt");
+    let result: Vec<Path> = storage
+        .list_with_offset(Some(&Path::from("test_offset")), &offset)
+        .map_ok(|meta| meta.location)
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert!(
+        result.is_empty(),
+        "list_with_offset with offset at last file should return empty list, 
got: {:?}",
+        result
+    );
+
+    // Test 4: Test without prefix - offset should still be exclusive
+    let offset = Path::from("test_offset/file_c.txt");
+    let result: Vec<Path> = storage
+        .list_with_offset(None, &offset)
+        .map_ok(|meta| meta.location)
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert!(
+        !result.contains(&offset),
+        "list_with_offset without prefix should not include the offset value 
itself. Found {:?} in results: {:?}",
+        offset,
+        result
+    );
+
+    // Verify only files after file_c are included
+    let expected = vec![
+        Path::from("test_offset/file_d.txt"),
+        Path::from("test_offset/file_e.txt"),
+    ];
+    assert_eq!(result.len(), expected.len());
+    for file in &expected {
+        assert!(result.contains(file), "Expected file {:?} in results", file);
+    }
+
+    // Clean up
+    delete_fixtures(storage).await;
+}
diff --git a/src/lib.rs b/src/lib.rs
index 54d51dd..3903a62 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1074,7 +1074,9 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// List all the objects with the given prefix and a location greater than `offset`
     ///
     /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
-    /// the number of network requests required
+    /// the number of network requests required.
+    ///
+    /// This returns an exclusive offset, i.e. objects at exactly `offset` will not be included.
     ///
     /// Note: the order of returned [`ObjectMeta`] is not guaranteed
     ///
diff --git a/src/list.rs b/src/list.rs
index e73fe52..f9a47cf 100644
--- a/src/list.rs
+++ b/src/list.rs
@@ -28,6 +28,9 @@ pub struct PaginatedListOptions {
     /// Path to start listing from
     ///
     /// Note: Not all stores support this
+    ///
+    /// For stores that do support this, the returned
+    /// result should not include the object with this key.
     pub offset: Option<String>,
 
     /// A delimiter use to group keys with a common prefix
diff --git a/src/local.rs b/src/local.rs
index 3a6a7a4..907b589 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -1271,6 +1271,7 @@ mod tests {
         let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
diff --git a/src/memory.rs b/src/memory.rs
index f026907..d6c7c0c 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -548,6 +548,7 @@ mod tests {
         let integration = InMemory::new();
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
@@ -564,6 +565,7 @@ mod tests {
         let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
@@ -577,6 +579,7 @@ mod tests {
         let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
 
         put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;

Reply via email to