This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 5bf4a6c315 Add a custom implementation of
`LocalFileSystem::list_with_offset` (#7019)
5bf4a6c315 is described below
commit 5bf4a6c3154b5754e0ffdad107b4e8c6a800d78b
Author: Corwin Joy <[email protected]>
AuthorDate: Sat Feb 8 08:27:40 2025 -0800
Add a custom implementation of `LocalFileSystem::list_with_offset` (#7019)
* Initial change from Daniel.
* Upgrade unit test to be more generic.
* Add comments on why we have filter
* Cleanup unit tests.
* Update object_store/src/local.rs
Co-authored-by: Adam Reeve <[email protected]>
* Add changes suggested by Adam.
* Cleanup match error.
* Apply formatting changes suggested by cargo +stable fmt --all.
* Apply cosmetic changes suggested by clippy.
* Upgrade test_path_with_offset to create temporary directory + files for
testing rather than pointing to existing dir.
---------
Co-authored-by: Adam Reeve <[email protected]>
---
object_store/src/local.rs | 219 ++++++++++++++++++++++++++++++++--------------
1 file changed, 155 insertions(+), 64 deletions(-)
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 6fef4614f9..ccf6e34df8 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -483,71 +483,15 @@ impl ObjectStore for LocalFileSystem {
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static,
Result<ObjectMeta>> {
- let config = Arc::clone(&self.config);
-
- let root_path = match prefix {
- Some(prefix) => match config.prefix_to_filesystem(prefix) {
- Ok(path) => path,
- Err(e) => return
futures::future::ready(Err(e)).into_stream().boxed(),
- },
- None => self.config.root.to_file_path().unwrap(),
- };
-
- let walkdir = WalkDir::new(root_path)
- // Don't include the root directory itself
- .min_depth(1)
- .follow_links(true);
-
- let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
- let entry = match
convert_walkdir_result(result_dir_entry).transpose()? {
- Ok(entry) => entry,
- Err(e) => return Some(Err(e)),
- };
-
- if !entry.path().is_file() {
- return None;
- }
-
- match config.filesystem_to_path(entry.path()) {
- Ok(path) => match is_valid_file_path(&path) {
- true => convert_entry(entry, path).transpose(),
- false => None,
- },
- Err(e) => Some(Err(e)),
- }
- });
-
- // If no tokio context, return iterator directly as no
- // need to perform chunked spawn_blocking reads
- if tokio::runtime::Handle::try_current().is_err() {
- return futures::stream::iter(s).boxed();
- }
-
- // Otherwise list in batches of CHUNK_SIZE
- const CHUNK_SIZE: usize = 1024;
-
- let buffer = VecDeque::with_capacity(CHUNK_SIZE);
- futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async
move {
- if buffer.is_empty() {
- (s, buffer) = tokio::task::spawn_blocking(move || {
- for _ in 0..CHUNK_SIZE {
- match s.next() {
- Some(r) => buffer.push_back(r),
- None => break,
- }
- }
- (s, buffer)
- })
- .await?;
- }
+ self.list_with_maybe_offset(prefix, None)
+ }
- match buffer.pop_front() {
- Some(Err(e)) => Err(e),
- Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
- None => Ok(None),
- }
- })
- .boxed()
+ fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> BoxStream<'static, Result<ObjectMeta>> {
+ self.list_with_maybe_offset(prefix, Some(offset))
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
@@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
}
}
+impl LocalFileSystem {
+ fn list_with_maybe_offset(
+ &self,
+ prefix: Option<&Path>,
+ maybe_offset: Option<&Path>,
+ ) -> BoxStream<'static, Result<ObjectMeta>> {
+ let config = Arc::clone(&self.config);
+
+ let root_path = match prefix {
+ Some(prefix) => match config.prefix_to_filesystem(prefix) {
+ Ok(path) => path,
+ Err(e) => return
futures::future::ready(Err(e)).into_stream().boxed(),
+ },
+ None => config.root.to_file_path().unwrap(),
+ };
+
+ let walkdir = WalkDir::new(root_path)
+ // Don't include the root directory itself
+ .min_depth(1)
+ .follow_links(true);
+
+ let maybe_offset = maybe_offset.cloned();
+
+ let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
+ // Apply offset filter before proceeding, to reduce statx file
system calls
+ // This matters for NFS mounts
+ if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(),
result_dir_entry.as_ref()) {
+ let location = config.filesystem_to_path(entry.path());
+ match location {
+ Ok(path) if path <= *offset => return None,
+ Err(e) => return Some(Err(e)),
+ _ => {}
+ }
+ }
+
+ let entry = match
convert_walkdir_result(result_dir_entry).transpose()? {
+ Ok(entry) => entry,
+ Err(e) => return Some(Err(e)),
+ };
+
+ if !entry.path().is_file() {
+ return None;
+ }
+
+ match config.filesystem_to_path(entry.path()) {
+ Ok(path) => match is_valid_file_path(&path) {
+ true => convert_entry(entry, path).transpose(),
+ false => None,
+ },
+ Err(e) => Some(Err(e)),
+ }
+ });
+
+ // If no tokio context, return iterator directly as no
+ // need to perform chunked spawn_blocking reads
+ if tokio::runtime::Handle::try_current().is_err() {
+ return futures::stream::iter(s).boxed();
+ }
+
+ // Otherwise list in batches of CHUNK_SIZE
+ const CHUNK_SIZE: usize = 1024;
+
+ let buffer = VecDeque::with_capacity(CHUNK_SIZE);
+ futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async
move {
+ if buffer.is_empty() {
+ (s, buffer) = tokio::task::spawn_blocking(move || {
+ for _ in 0..CHUNK_SIZE {
+ match s.next() {
+ Some(r) => buffer.push_back(r),
+ None => break,
+ }
+ }
+ (s, buffer)
+ })
+ .await?;
+ }
+
+ match buffer.pop_front() {
+ Some(Err(e)) => Err(e),
+ Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
+ None => Ok(None),
+ }
+ })
+ .boxed()
+ }
+}
+
/// Creates the parent directories of `path` or returns an error based on
`source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()>
{
let parent = path.parent().ok_or_else(|| {
@@ -1459,6 +1490,66 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn test_path_with_offset() {
+ let root = TempDir::new().unwrap();
+ let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ let root_path = root.path();
+ for i in 0..5 {
+ let filename = format!("test{}.parquet", i);
+ let file = root_path.join(filename);
+ std::fs::write(file, "test").unwrap();
+ }
+ let filter_str = "test";
+ let filter = String::from(filter_str);
+ let offset_str = filter + "1";
+ let offset = Path::from(offset_str.clone());
+
+ // Use list_with_offset to retrieve files
+ let res = integration.list_with_offset(None, &offset);
+ let offset_paths: Vec<_> = res.map_ok(|x|
x.location).try_collect().await.unwrap();
+ let mut offset_files: Vec<_> = offset_paths
+ .iter()
+ .map(|x| String::from(x.filename().unwrap()))
+ .collect();
+
+ // Check result with direct filesystem read
+ let files = fs::read_dir(root_path).unwrap();
+ let filtered_files = files
+ .filter_map(Result::ok)
+ .filter_map(|d| {
+ d.file_name().to_str().and_then(|f| {
+ if f.contains(filter_str) {
+ Some(String::from(f))
+ } else {
+ None
+ }
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let mut expected_offset_files: Vec<_> = filtered_files
+ .iter()
+ .filter(|s| **s > offset_str)
+ .cloned()
+ .collect();
+
+ fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
+ let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a ==
b).count();
+ matching == a.len() && matching == b.len()
+ }
+
+ offset_files.sort();
+ expected_offset_files.sort();
+
+ // println!("Expected Offset Files: {:?}", expected_offset_files);
+ // println!("Actual Offset Files: {:?}", offset_files);
+
+ assert_eq!(offset_files.len(), expected_offset_files.len());
+ assert!(do_vecs_match(&expected_offset_files, &offset_files));
+ }
+
#[tokio::test]
async fn filesystem_filename_with_percent() {
let temp_dir = TempDir::new().unwrap();