This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6601959fe6 Adds prefix filtering for table URLs (#18780)
6601959fe6 is described below
commit 6601959fe6b32196e2905d361f0d029426969a7e
Author: Blake Orth <[email protected]>
AuthorDate: Tue Nov 18 15:25:32 2025 -0700
Adds prefix filtering for table URLs (#18780)
## Which issue does this PR close?
This is a follow-on PR spurred by this comment chain:
- https://github.com/apache/datafusion/pull/18146#discussion_r2491693118
This work is associated with:
- https://github.com/apache/datafusion/issues/17211
## Rationale for this change
Prior to the merge of
https://github.com/apache/datafusion/pull/18146, the implementation
could list only the files under a specific prefix whenever the known
partition prefixes could be matched to the query filters. This PR
re-introduces that capability, avoiding the need to list and then
filter every file for a table when the filters match a prefix.
## What changes are included in this PR?
- Adds the ability to list files backing a table URL optionally filtered
by a path prefix
- Reintroduces the ability for partitioned listing tables to only list
prefixes that match an input filter
- Adds tests for new functionality
## Are these changes tested?
Yes. There is existing coverage of many of the changes, new tests have
been added, and existing integration tests have been updated to
demonstrate the change in behavior.
## Are there any user-facing changes?
No.
cc @alamb
---
datafusion/catalog-listing/src/helpers.rs | 8 +-
.../core/tests/datasource/object_store_access.rs | 4 +-
datafusion/datasource/src/url.rs | 85 +++++++++++++++++++---
3 files changed, 83 insertions(+), 14 deletions(-)
diff --git a/datafusion/catalog-listing/src/helpers.rs
b/datafusion/catalog-listing/src/helpers.rs
index 22e7002f90..5e69cf1a14 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -381,8 +381,14 @@ pub async fn pruned_partition_list<'a>(
file_extension: &'a str,
partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
+ let prefix = if !partition_cols.is_empty() {
+ evaluate_partition_prefix(partition_cols, filters)
+ } else {
+ None
+ };
+
let objects = table_path
- .list_all_files(ctx, store, file_extension)
+ .list_prefixed_files(ctx, store, prefix, file_extension)
.await?
.try_filter(|object_meta| futures::future::ready(object_meta.size >
0));
diff --git a/datafusion/core/tests/datasource/object_store_access.rs
b/datafusion/core/tests/datasource/object_store_access.rs
index 33129150db..4a2b68fb11 100644
--- a/datafusion/core/tests/datasource/object_store_access.rs
+++ b/datafusion/core/tests/datasource/object_store_access.rs
@@ -166,7 +166,7 @@ async fn query_partitioned_csv_file() {
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- - LIST prefix=data
+ - LIST prefix=data/a=2
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);
@@ -220,7 +220,7 @@ async fn query_partitioned_csv_file() {
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- - LIST prefix=data
+ - LIST prefix=data/a=2/b=20
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index 08e5b6a5df..1307a4c8b1 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -233,27 +233,37 @@ impl ListingTableUrl {
Some(stripped.split_terminator(DELIMITER))
}
- /// List all files identified by this [`ListingTableUrl`] for the provided
`file_extension`
- pub async fn list_all_files<'a>(
+ /// List all files identified by this [`ListingTableUrl`] for the provided
`file_extension`,
+ /// optionally filtering by a path prefix
+ pub async fn list_prefixed_files<'a>(
&'a self,
ctx: &'a dyn Session,
store: &'a dyn ObjectStore,
+ prefix: Option<Path>,
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let exec_options = &ctx.config_options().execution;
let ignore_subdirectory =
exec_options.listing_table_ignore_subdirectory;
+ let prefix = if let Some(prefix) = prefix {
+ let mut p = self.prefix.parts().collect::<Vec<_>>();
+ p.extend(prefix.parts());
+ Path::from_iter(p.into_iter())
+ } else {
+ self.prefix.clone()
+ };
+
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
- list_with_cache(ctx, store, &self.prefix).await?
+ list_with_cache(ctx, store, &prefix).await?
} else {
- match store.head(&self.prefix).await {
+ match store.head(&prefix).await {
Ok(meta) => futures::stream::once(async { Ok(meta) })
.map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
.boxed(),
// If the head command fails, it is likely that object doesn't
exist.
// Retry as though it were a prefix (aka a collection)
Err(object_store::Error::NotFound { .. }) => {
- list_with_cache(ctx, store, &self.prefix).await?
+ list_with_cache(ctx, store, &prefix).await?
}
Err(e) => return Err(e.into()),
}
@@ -269,6 +279,17 @@ impl ListingTableUrl {
.boxed())
}
+ /// List all files identified by this [`ListingTableUrl`] for the provided
`file_extension`
+ pub async fn list_all_files<'a>(
+ &'a self,
+ ctx: &'a dyn Session,
+ store: &'a dyn ObjectStore,
+ file_extension: &'a str,
+ ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
+ self.list_prefixed_files(ctx, store, None, file_extension)
+ .await
+ }
+
/// Returns this [`ListingTableUrl`] as a string
pub fn as_str(&self) -> &str {
self.as_ref()
@@ -306,7 +327,7 @@ impl ListingTableUrl {
async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
- prefix: &'b Path,
+ prefix: &Path,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => Ok(store
@@ -701,6 +722,35 @@ mod tests {
panic!("Expected PermissionDenied error");
};
+ // Test prefix filtering with partition-style paths
+ create_file(&store, "/data/a=1/file1.parquet").await;
+ create_file(&store, "/data/a=1/b=100/file2.parquet").await;
+ create_file(&store, "/data/a=2/b=200/file3.parquet").await;
+ create_file(&store, "/data/a=2/b=200/file4.csv").await;
+
+ assert_eq!(
+ list_prefixed_files("/data/", &store, Some(Path::from("a=1")),
"parquet")
+ .await?,
+ vec!["data/a=1/b=100/file2.parquet", "data/a=1/file1.parquet"],
+ );
+
+ assert_eq!(
+ list_prefixed_files(
+ "/data/",
+ &store,
+ Some(Path::from("a=1/b=100")),
+ "parquet"
+ )
+ .await?,
+ vec!["data/a=1/b=100/file2.parquet"],
+ );
+
+ assert_eq!(
+ list_prefixed_files("/data/", &store, Some(Path::from("a=2")),
"parquet")
+ .await?,
+ vec!["data/a=2/b=200/file3.parquet"],
+ );
+
Ok(())
}
@@ -712,7 +762,7 @@ mod tests {
.expect("failed to create test file");
}
- /// Runs "list_all_files" and returns their paths
+ /// Runs "list_prefixed_files" with no prefix to list all files and
returns their paths
///
/// Panic's on error
async fn list_all_files(
@@ -720,19 +770,32 @@ mod tests {
store: &dyn ObjectStore,
file_extension: &str,
) -> Result<Vec<String>> {
- try_list_all_files(url, store, file_extension).await
+ try_list_prefixed_files(url, store, None, file_extension).await
+ }
+
+ /// Runs "list_prefixed_files" and returns their paths
+ ///
+ /// Panic's on error
+ async fn list_prefixed_files(
+ url: &str,
+ store: &dyn ObjectStore,
+ prefix: Option<Path>,
+ file_extension: &str,
+ ) -> Result<Vec<String>> {
+ try_list_prefixed_files(url, store, prefix, file_extension).await
}
- /// Runs "list_all_files" and returns their paths
- async fn try_list_all_files(
+ /// Runs "list_prefixed_files" and returns their paths
+ async fn try_list_prefixed_files(
url: &str,
store: &dyn ObjectStore,
+ prefix: Option<Path>,
file_extension: &str,
) -> Result<Vec<String>> {
let session = MockSession::new();
let url = ListingTableUrl::parse(url)?;
let files = url
- .list_all_files(&session, store, file_extension)
+ .list_prefixed_files(&session, store, prefix, file_extension)
.await?
.try_collect::<Vec<_>>()
.await?
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]