BlakeOrth opened a new issue, #17211: URL: https://github.com/apache/datafusion/issues/17211
### Is your feature request related to a problem or challenge?

When using "high latency" storage (e.g. remote object stores, such as AWS S3), listing objects and collecting object metadata can take a non-trivial amount of time. This was already noted in #7618, where a simple cache was implemented and partially integrated. Unfortunately, partitioned tables cannot hit the cache path in the current `ListingTable` implementation. A discrepancy between the performance of flat and partitioned tables was previously noted in #9654; unfortunately, that issue didn't provide much detail on the timing discrepancies, nor an easy way to reproduce the behavior.

To better illustrate the issue, I added some basic timing statements to `ListingTable`, which can be reproduced using the diff below:

```diff
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a93654cdf..8d1ddc909 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1342,6 +1342,7 @@ impl ListingTable {
             return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
         };
         // list files (with partitions)
+        let start = datafusion_common::instant::Instant::now();
         let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
             pruned_partition_list(
                 ctx,
@@ -1353,6 +1354,10 @@ impl ListingTable {
             )
         }))
         .await?;
+        println!(
+            "file_list duration: {}ms",
+            1000. * start.elapsed().as_secs_f32()
+        );
         let meta_fetch_concurrency =
             ctx.config_options().execution.meta_fetch_concurrency;
         let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
@@ -1370,8 +1375,17 @@ impl ListingTable {
             .boxed()
             .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
 
+        let group_start = datafusion_common::instant::Instant::now();
         let (file_group, inexact_stats) =
             get_files_with_limit(files, limit, self.options.collect_stat).await?;
+        println!(
+            "file_group duration: {}ms",
+            1000. * group_start.elapsed().as_secs_f32()
+        );
+        println!(
+            "Total duration: {}ms",
+            1000. * start.elapsed().as_secs_f32()
+        );
 
         let file_groups = file_group.split_files(self.options.target_partitions);
         let (mut file_groups, mut stats) = compute_all_files_statistics(
```

(Note that reproducing these results currently relies on the resolution of https://github.com/apache/datafusion/pull/17050.)

The discrepancies in the timings for these operations can be seen below. Even though the queries ultimately operate on exactly the same subset of files, the queries issued against the partitioned dataset consistently take longer to complete. In all cases shown here, the cumulative time spent building the `file_groups` dominates the query time. Each query is issued twice so that DataFusion's default caching mechanisms can build cached data on the first query and then operate on that cached data in the second.

```sql
DataFusion CLI v49.0.0

> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.000 seconds.

> CREATE EXTERNAL TABLE overture_maps STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 7.724 seconds.

> select count(*) from overture_maps where type='address';
file_list duration: 317.549ms
file_group duration: 240.83353ms
Total duration: 558.39996ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.563 seconds.
> select count(*) from overture_maps where type='address';
file_list duration: 371.57742ms
file_group duration: 0.034617003ms
Total duration: 371.62094ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.376 seconds.

> CREATE EXTERNAL TABLE overture_maps_flat STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/type=address/';
0 row(s) fetched.
Elapsed 1.478 seconds.

> select count(*) from overture_maps_flat;
file_list duration: 0.004915ms
file_group duration: 421.62354ms
Total duration: 421.64505ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.423 seconds.

> select count(*) from overture_maps_flat;
file_list duration: 0.004658ms
file_group duration: 42.788586ms
Total duration: 42.806194ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.044 seconds.
```

The basic layout of the underlying dataset is shown below. Note that the subset of files accessed by the queries against the partitioned table is exactly the same as the flat dataset.

```console
$ aws s3 ls s3://overturemaps-us-west-2/release/2025-07-23.0/ --recursive
2025-07-22 14:42:50 1073537247 release/2025-07-23.0/theme=addresses/type=address/part-00000-57f746d8-98a1-4faa-94c2-4f084343ecac-c000.zstd.parquet
2025-07-22 14:43:06 1013189887 release/2025-07-23.0/theme=addresses/type=address/part-00001-57f746d8-98a1-4faa-94c2-4f084343ecac-c000.zstd.parquet
.
.
.
$ aws s3 ls s3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/
                           PRE type=address/
```

The `DefaultListFilesCache` can be forced into the mix using the following diff:

```diff
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index 37f1baa17..1e3faad15 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -80,6 +80,9 @@ impl CacheManager {
         }
         if let Some(lc) = &config.list_files_cache {
             manager.list_files_cache = Some(Arc::clone(lc))
+        } else {
+            manager.list_files_cache =
+                Some(Arc::new(super::cache_unit::DefaultListFilesCache::default()));
         }
         if let Some(mc) = &config.file_metadata_cache {
             manager.file_metadata_cache = Some(Arc::clone(mc));
```

When the cache is inserted, the flat table shows significant improvements on repeat queries; however, the partitioned table continues to suffer from latencies dominated by listing files.

```sql
DataFusion CLI v49.0.0

> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.000 seconds.

> CREATE EXTERNAL TABLE overture_maps STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 9.056 seconds.

> select count(*) from overture_maps where type='address';
file_list duration: 412.9362ms
file_group duration: 272.7048ms
Total duration: 685.6636ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.690 seconds.

> select count(*) from overture_maps where type='address';
file_list duration: 136.47644ms
file_group duration: 0.034272ms
Total duration: 136.51955ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.141 seconds.

> CREATE EXTERNAL TABLE overture_maps_flat STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/type=address/';
0 row(s) fetched.
Elapsed 0.727 seconds.

> select count(*) from overture_maps_flat;
file_list duration: 0.0063920002ms
file_group duration: 357.8303ms
Total duration: 357.86032ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.359 seconds.

> select count(*) from overture_maps_flat;
file_list duration: 0.0066520004ms
file_group duration: 0.030025ms
Total duration: 0.047092ms
+-----------+
| count(*)  |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.001 seconds.
```
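As an aside, patching `CacheManager` isn't strictly required to experiment with this: a list files cache can be supplied through the public configuration instead. Below is a minimal sketch; it assumes the `CacheManagerConfig::with_list_files_cache` and `RuntimeEnvBuilder::with_cache_manager` APIs and import paths on current main, and, as this issue describes, the partitioned-table path still bypasses whatever cache is registered this way.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

fn session_with_list_files_cache() -> Result<SessionContext> {
    // Register the list files cache through CacheManagerConfig instead of
    // hard-coding a default inside CacheManager as the diff above does.
    let cache_config = CacheManagerConfig::default()
        .with_list_files_cache(Some(Arc::new(DefaultListFilesCache::default())));

    let runtime = RuntimeEnvBuilder::new()
        .with_cache_manager(cache_config)
        .build_arc()?;

    // Tables registered on this context can hit the list files cache,
    // though currently only on the flat (non-partitioned) code path.
    Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
}
```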
### Describe the solution you'd like

Ultimately, I'd like partitioned datasets to perform similarly to flat datasets, and to have caching mechanisms available to both. Based on the structure of the existing code, I believe the code path `ListingTable` executes when discovering files and collecting initial metadata should be normalized between partitioned and flat datasets.

During a table scan, both table types first call `list_files_for_scan`, followed by `pruned_partition_list`:

https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/core/src/datasource/listing/table.rs#L1356

https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L409

In `pruned_partition_list`, flat tables end up executing `list_all_files`, which is where the `ListFilesCache` is currently leveraged:

https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L417-L432

Partitioned datasets, however, end up making calls that at least partially (and possibly fully) rediscover the partition and file structure by calling `list_partitions`:

https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L435-L436

Note that `list_partitions` executes a breadth-first search of the (potentially prefixed, depending on query filter evaluation) partition structure, so the per-query latency penalty it imposes scales linearly with the depth of the partition tree:

https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L188

*My Opinion*

Executing a breadth-first search over the full dataset, or a prefixed subset of it, is likely less performant than simply listing the same scope recursively. The partition values of any given object can be derived directly from the object's path, allowing pruning of objects to continue working exactly as it does today (see the sketch after the next section). Additionally, any caching of the full object listing would then apply to both partitioned and flat tables. I suspect there are table structures where the current approach is better, such as a table with very many objects and a shallow partition structure. However, most remote object storage solutions return many hundreds to thousands of object paths in a single list request, so such tables would need to be quite large in terms of number of objects.

### Describe alternatives you've considered

It would probably be possible to apply a `ListFilesCache` prior to the point where the flat and partitioned code paths diverge. In this scenario, I think care would need to be taken when loading objects into the cache, because partitioned datasets don't necessarily return all potential objects due to pruning. It would also mean that repeat queries against partitioned datasets could continue to see a high percentage of cache misses, depending on the user-supplied filters.
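To make the recursive-listing idea from *My Opinion* concrete, here is a rough sketch. It is illustrative only, not a proposed implementation: `list_files_with_partitions` and `parse_partition_segment` are hypothetical names, and it ignores the partition-value type coercion and filter-based pruning that `pruned_partition_list` performs today.

```rust
use futures::TryStreamExt;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

/// Split a hive-style path segment like "theme=addresses" into
/// ("theme", "addresses"); non-partition segments return None.
fn parse_partition_segment(segment: &str) -> Option<(&str, &str)> {
    segment.split_once('=')
}

/// One recursive listing of `prefix`, deriving each object's partition
/// values from its path rather than walking the partition tree level by
/// level the way `list_partitions` does today.
async fn list_files_with_partitions(
    store: &dyn ObjectStore,
    prefix: &Path,
) -> object_store::Result<Vec<(ObjectMeta, Vec<(String, String)>)>> {
    store
        .list(Some(prefix))
        .map_ok(|meta| {
            let partitions: Vec<(String, String)> = meta
                .location
                .parts()
                .filter_map(|part| {
                    // NOTE: a real implementation would restrict this to
                    // directory segments; a file name containing '=' would
                    // be misread as a partition value here.
                    parse_partition_segment(part.as_ref())
                        .map(|(k, v)| (k.to_owned(), v.to_owned()))
                })
                .collect();
            (meta, partitions)
        })
        .try_collect()
        .await
}
```

For a layout like `theme=addresses/type=address/`, this yields `[("theme", "addresses"), ("type", "address")]` for each file from a single recursive list request, and that single listing is something a `ListFilesCache` could then serve to both flat and partitioned tables.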
### Additional context

Existing issues mentioned:

https://github.com/apache/datafusion/issues/7618
https://github.com/apache/datafusion/issues/9654

Since this relates to the `ListingTable` and remote datasets:

https://github.com/apache/datafusion/issues/16365