BlakeOrth commented on code in PR #19298:
URL: https://github.com/apache/datafusion/pull/19298#discussion_r2615769167
##########
datafusion/datasource/src/url.rs:
##########
@@ -826,6 +803,190 @@ mod tests {
Ok(())
}
+ /// Tests that the cached code path produces identical results to the
non-cached path.
+ ///
+ /// This is critical: the cache is a transparent optimization, so both
paths
+ /// MUST return the same files in the same order.
+ #[tokio::test]
+ async fn test_cache_path_equivalence() -> Result<()> {
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+ use std::time::Duration;
+
+ let store = MockObjectStore {
+ in_mem: object_store::memory::InMemory::new(),
+ forbidden_paths: vec![],
+ };
+
+ // Create test files with partition-style paths
+ create_file(&store, "/table/year=2023/data1.parquet").await;
+ create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
+ create_file(&store, "/table/year=2024/data3.parquet").await;
+ create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
+ create_file(&store, "/table/year=2024/month=12/data5.parquet").await;
+
+ // Session WITHOUT cache
+ let session_no_cache = MockSession::new();
+
+ // Session WITH cache - use RuntimeEnvBuilder with cache limits
+ let runtime_with_cache = RuntimeEnvBuilder::new()
+ .with_object_list_cache_limit(1024 * 1024) // 1MB limit
+ .with_object_list_cache_ttl(Duration::from_secs(300))
Review Comment:
Since this test doesn't rely on entry expiration, the TTL can probably be
left at its default (`None`) to avoid potential test timing issues entirely.
##########
datafusion/datasource/src/url.rs:
##########
@@ -826,6 +803,190 @@ mod tests {
Ok(())
}
+ /// Tests that the cached code path produces identical results to the
non-cached path.
+ ///
+ /// This is critical: the cache is a transparent optimization, so both
paths
+ /// MUST return the same files in the same order.
+ #[tokio::test]
+ async fn test_cache_path_equivalence() -> Result<()> {
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+ use std::time::Duration;
+
+ let store = MockObjectStore {
+ in_mem: object_store::memory::InMemory::new(),
+ forbidden_paths: vec![],
+ };
+
+ // Create test files with partition-style paths
+ create_file(&store, "/table/year=2023/data1.parquet").await;
+ create_file(&store, "/table/year=2023/month=01/data2.parquet").await;
+ create_file(&store, "/table/year=2024/data3.parquet").await;
+ create_file(&store, "/table/year=2024/month=06/data4.parquet").await;
+ create_file(&store, "/table/year=2024/month=12/data5.parquet").await;
+
+ // Session WITHOUT cache
+ let session_no_cache = MockSession::new();
+
+ // Session WITH cache - use RuntimeEnvBuilder with cache limits
+ let runtime_with_cache = RuntimeEnvBuilder::new()
+ .with_object_list_cache_limit(1024 * 1024) // 1MB limit
+ .with_object_list_cache_ttl(Duration::from_secs(300))
+ .build_arc()?;
+ let session_with_cache =
MockSessionWithRuntime::new(runtime_with_cache);
+
+ // Test cases: (url, prefix, description)
+ let test_cases = vec![
+ ("/table/", None, "full table listing"),
+ (
+ "/table/",
+ Some(Path::from("year=2023")),
+ "single partition filter",
+ ),
+ (
+ "/table/",
+ Some(Path::from("year=2024")),
+ "different partition filter",
+ ),
+ (
+ "/table/",
+ Some(Path::from("year=2024/month=06")),
+ "nested partition filter",
+ ),
+ (
+ "/table/",
+ Some(Path::from("year=2025")),
+ "non-existent partition",
+ ),
+ ];
+
+ for (url_str, prefix, description) in test_cases {
+ let url = ListingTableUrl::parse(url_str)?;
+
+ // Get results WITHOUT cache
+ let results_no_cache: Vec<String> = url
+ .list_prefixed_files(&session_no_cache, &store,
prefix.clone(), "parquet")
+ .await?
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .map(|m| m.location.to_string())
+ .collect();
+
+ // Get results WITH cache (first call - cache miss)
+ let results_with_cache_miss: Vec<String> = url
+ .list_prefixed_files(
+ &session_with_cache,
+ &store,
+ prefix.clone(),
+ "parquet",
+ )
+ .await?
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .map(|m| m.location.to_string())
+ .collect();
+
+ // Get results WITH cache (second call - cache hit)
+ let results_with_cache_hit: Vec<String> = url
+ .list_prefixed_files(&session_with_cache, &store, prefix,
"parquet")
+ .await?
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .map(|m| m.location.to_string())
+ .collect();
+
+ // All three should be identical
+ assert_eq!(
+ results_no_cache, results_with_cache_miss,
+ "Cache miss path should match non-cached path for:
{description}"
+ );
+ assert_eq!(
+ results_no_cache, results_with_cache_hit,
+ "Cache hit path should match non-cached path for:
{description}"
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Tests that partition queries can be served from a cached full-table
listing
+ #[tokio::test]
+ async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+ use std::time::Duration;
+
+ let store = MockObjectStore {
+ in_mem: object_store::memory::InMemory::new(),
+ forbidden_paths: vec![],
+ };
+
+ // Create test files
+ create_file(&store, "/sales/region=US/q1.parquet").await;
+ create_file(&store, "/sales/region=US/q2.parquet").await;
+ create_file(&store, "/sales/region=EU/q1.parquet").await;
+
+ // Create session with cache - use RuntimeEnvBuilder with cache limits
+ let runtime = RuntimeEnvBuilder::new()
+ .with_object_list_cache_limit(1024 * 1024) // 1MB limit
+ .with_object_list_cache_ttl(Duration::from_secs(300))
Review Comment:
The same ordering criteria and default TTL suggestion apply to this test as well.
##########
datafusion/datasource/src/url.rs:
##########
@@ -329,36 +331,46 @@ impl ListingTableUrl {
/// # Arguments
/// * `ctx` - The session context
/// * `store` - The object store to list from
-/// * `full_prefix` - The full prefix to list (table_base + partition prefix)
/// * `table_base_path` - The table's base path (the stable cache key)
+/// * `partition_prefix` - Optional partition prefix relative to table base
///
-/// # Cache Behavior :
+/// # Cache Behavior:
/// The cache key is always `table_base_path`. When a partition-specific
listing
-/// is requested (full_prefix includes partition path), the cache:
+/// is requested via `partition_prefix`, the cache:
/// - Looks up `table_base_path` in the cache
-/// - Filters results to match `full_prefix`
+/// - Filters results to match `table_base_path/partition_prefix`
/// - Returns filtered results without a storage call
///
/// On cache miss, the full table is always listed and cached, ensuring
/// subsequent partition queries can be served from cache.
async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
- full_prefix: &Path,
table_base_path: &Path,
+ partition_prefix: Option<&Path>,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
+ // Build the full listing path (table_base + partition_prefix)
+ let full_prefix = match partition_prefix {
+ Some(prefix) if !prefix.as_ref().is_empty() => {
Review Comment:
If we look at the documentation for `object_store::Path`, I don't think we
need to handle empty path segments, because they aren't permitted in a valid
`Path`:
https://docs.rs/object_store/0.12.4/object_store/path/struct.Path.html
> - Paths do not contain empty path segments
##########
datafusion/datasource/src/url.rs:
##########
@@ -239,13 +239,14 @@ impl ListingTableUrl {
&'a self,
ctx: &'a dyn Session,
store: &'a dyn ObjectStore,
- prefix: Option<Path>,
+ partition_prefix: Option<Path>,
Review Comment:
A small naming nit here: in the context of this PR I think this name
change makes sense; however, the notion of a "partition" is pretty specific to
the `ListingTable`, and this method is a more general helper that can be used
for operations not related to the `ListingTable`. I think keeping the name as
`prefix` is more generally applicable for users of the method outside the
context of the `ListingTable`.
##########
datafusion/datasource/src/url.rs:
##########
@@ -826,6 +803,190 @@ mod tests {
Ok(())
}
+ /// Tests that the cached code path produces identical results to the
non-cached path.
+ ///
+ /// This is critical: the cache is a transparent optimization, so both
paths
+ /// MUST return the same files in the same order.
Review Comment:
This is only partially true. The returned order of the files does not
matter. In fact, the order of items returned by `ObjectStore::list` is not
guaranteed:
https://docs.rs/object_store/0.12.4/object_store/trait.ObjectStore.html#tymethod.list
To prevent this test from potentially being flaky (I _doubt_ it really
matters for a local object store, but better safe than sorry), we should
probably collect and then sort the results rather than make hard assertions on
the return order of the items.
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -64,9 +64,21 @@ pub struct FileStatisticsCacheEntry {
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
+/// The cache key is always the table's base path, ensuring a stable cache key.
+/// The `Extra` type is `Option<Path>`, representing an optional prefix filter
+/// (relative to the table base path) for partition-aware lookups.
+///
+/// When `get_with_extra(key, Some(prefix))` is called:
+/// - The cache entry for `key` (table base path) is fetched
+/// - Results are filtered to only include files matching `key/prefix`
+/// - Filtered results are returned without making a storage call
+///
+/// This enables efficient partition pruning: a single cached listing of the
+/// full table can serve queries for any partition subset.
+///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache:
- CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
+ CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
Review Comment:
To be clear, this was more of an open question I had while reading this as
opposed to a request for changes! I'm perfectly fine with `Option<Path>` (maybe
we can do `Option<&Path>` to avoid unnecessary clones?) if we all think it's
the right idea.
1. I don't think we need to worry about empty string handling throughout
this implementation because `object_store::Path` should already protect against
that (although it's probably worth validating this assertion).
https://docs.rs/object_store/0.12.4/object_store/path/struct.Path.html
2. I agree, but the user API of this cache already has `get(key: &Path)`,
which represents an unfiltered query. It's also probably beneficial to keep in
mind that this cache interface does not just apply to the `ListingTable`, so it may
be best to think of the implementation in more general "object store" terms
instead of "query" terms (e.g. a general path "prefix" instead of a "path
partition").
I think the code internally should stay mostly the same, so we can
functionally continue delegating `get(key)` as `get_with_extra(key, None)`. I
guess the question is mostly how we want the code to present to a user at the
callsite, and whether there's value in forcing users who do not have a prefix to use
`get(key)` (to explicitly show the lack of a prefix filter) over
`get_with_extra(key, None)`.
Again, I don't feel too strongly about this, but at least wanted to bring it
up for some discussion!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]