BlakeOrth opened a new issue, #17211:
URL: https://github.com/apache/datafusion/issues/17211

   ### Is your feature request related to a problem or challenge?
   
   When using "high latency" storage (e.g. remote object stores, such as AWS 
S3) listing objects and collecting object metadata can end up taking a 
non-trivial amount of time. This has already been noted in #7618 where a simple 
cache was implemented and partially integrated. Unfortunately, partitioned 
tables cannot hit the cache path in the current `ListingTable` implementation. 
A discrepancy between the performance of flat vs partitioned tables has been 
previously noted in #9654. Unfortunately, that issue didn't provide much detail 
on the timing discrepancies, nor did it provide an easy way to reproduce the 
behavior.
   
   To better illustrate this issue, I added some basic timing statements to the `ListingTable`, which can be reproduced using the diff below:
   
   ```diff
   diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
   index a93654cdf..8d1ddc909 100644
   --- a/datafusion/core/src/datasource/listing/table.rs
   +++ b/datafusion/core/src/datasource/listing/table.rs
   @@ -1342,6 +1342,7 @@ impl ListingTable {
                return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
            };
            // list files (with partitions)
   +        let start = datafusion_common::instant::Instant::now();
            let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
                pruned_partition_list(
                    ctx,
   @@ -1353,6 +1354,10 @@ impl ListingTable {
                )
            }))
            .await?;
   +        println!(
   +            "file_list duration: {}ms",
   +            1000. * start.elapsed().as_secs_f32()
   +        );
            let meta_fetch_concurrency =
                ctx.config_options().execution.meta_fetch_concurrency;
            let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
   @@ -1370,8 +1375,17 @@ impl ListingTable {
                .boxed()
                .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
   
   +        let group_start = datafusion_common::instant::Instant::now();
            let (file_group, inexact_stats) =
                get_files_with_limit(files, limit, self.options.collect_stat).await?;
   +        println!(
   +            "file_group duration: {}ms",
   +            1000. * group_start.elapsed().as_secs_f32()
   +        );
   +        println!(
   +            "Total duration: {}ms",
   +            1000. * start.elapsed().as_secs_f32()
   +        );
   
            let file_groups = file_group.split_files(self.options.target_partitions);
            let (mut file_groups, mut stats) = compute_all_files_statistics(
   ```
   
   (Note that reproducing these results currently relies on the resolution of apache/datafusion/pull/17050.)
   The timing discrepancies between these operations can be seen below. Even though the queries ultimately operate on exactly the same subset of files, those issued against the partitioned dataset consistently take longer to complete. In all cases shown here, the cumulative time required to build the `file_groups` dominates the query times. Each query is issued twice so that DataFusion's default caching mechanisms can build their cached data on the first run and then operate on that cached data in the second.
   
   ```sql
   DataFusion CLI v49.0.0
   > set datafusion.execution.parquet.cache_metadata = true;
   0 row(s) fetched.
   Elapsed 0.000 seconds.
   
   > CREATE EXTERNAL TABLE overture_maps
   STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
   0 row(s) fetched.
   Elapsed 7.724 seconds.
   
   > select count(*) from overture_maps where type='address';
   file_list duration: 317.549ms
   file_group duration: 240.83353ms
   Total duration: 558.39996ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.563 seconds.
   
   > select count(*) from overture_maps where type='address';
   file_list duration: 371.57742ms
   file_group duration: 0.034617003ms
   Total duration: 371.62094ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.376 seconds.
   
   > CREATE EXTERNAL TABLE overture_maps_flat
   STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/type=address/';
   0 row(s) fetched.
   Elapsed 1.478 seconds.
   
   > select count(*) from overture_maps_flat;
   file_list duration: 0.004915ms
   file_group duration: 421.62354ms
   Total duration: 421.64505ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.423 seconds.
   
   > select count(*) from overture_maps_flat;
   file_list duration: 0.004658ms
   file_group duration: 42.788586ms
   Total duration: 42.806194ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.044 seconds.
   ```
   
   The basic layout of the underlying dataset is shown below. Note that the subset of files accessed by the queries against the partitioned table is exactly the same as for the flat dataset.
   
   ```console
   $ aws s3 ls s3://overturemaps-us-west-2/release/2025-07-23.0/ --recursive
   2025-07-22 14:42:50 1073537247 release/2025-07-23.0/theme=addresses/type=address/part-00000-57f746d8-98a1-4faa-94c2-4f084343ecac-c000.zstd.parquet
   2025-07-22 14:43:06 1013189887 release/2025-07-23.0/theme=addresses/type=address/part-00001-57f746d8-98a1-4faa-94c2-4f084343ecac-c000.zstd.parquet
   . . .
   
   $ aws s3 ls s3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/
                              PRE type=address/
   ```
   
   The `DefaultListFilesCache` can be forced into the mix using the following 
diff:
   
   ```diff
   diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
   index 37f1baa17..1e3faad15 100644
   --- a/datafusion/execution/src/cache/cache_manager.rs
   +++ b/datafusion/execution/src/cache/cache_manager.rs
   @@ -80,6 +80,9 @@ impl CacheManager {
            }
            if let Some(lc) = &config.list_files_cache {
                manager.list_files_cache = Some(Arc::clone(lc))
   +        } else {
   +            manager.list_files_cache =
   +                Some(Arc::new(super::cache_unit::DefaultListFilesCache::default()));
            }
            if let Some(mc) = &config.file_metadata_cache {
                manager.file_metadata_cache = Some(Arc::clone(mc));
   ```
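
   As an aside, the same cache can be wired in without patching by registering it through the public API. Below is a minimal sketch, assuming the `CacheManagerConfig::with_list_files_cache` and `RuntimeEnvBuilder::with_cache_manager` builders behave as on current `main` (exact module paths may differ slightly):

   ```rust
   use std::sync::Arc;

   use datafusion::error::Result;
   use datafusion::execution::cache::cache_manager::CacheManagerConfig;
   use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;
   use datafusion::prelude::*;

   #[tokio::main]
   async fn main() -> Result<()> {
       // Hand a DefaultListFilesCache to the CacheManager up front so that
       // the flat-table listing path can consult it on repeat scans.
       let cache_config = CacheManagerConfig::default()
           .with_list_files_cache(Some(Arc::new(DefaultListFilesCache::default())));
       let runtime = RuntimeEnvBuilder::new()
           .with_cache_manager(cache_config)
           .build_arc()?;
       let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);
       // ...register the external tables and run the queries shown above.
       let _ = ctx;
       Ok(())
   }
   ```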
   
   When the cache is inserted, the flat table shows significant improvements on repeat queries; the partitioned table, however, continues to suffer from latencies dominated by listing files.
   
   ```sql
   DataFusion CLI v49.0.0
   > set datafusion.execution.parquet.cache_metadata = true;
   0 row(s) fetched.
   Elapsed 0.000 seconds.
   
   > CREATE EXTERNAL TABLE overture_maps
   STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
   0 row(s) fetched.
   Elapsed 9.056 seconds.
   
   > select count(*) from overture_maps where type='address';
   file_list duration: 412.9362ms
   file_group duration: 272.7048ms
   Total duration: 685.6636ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.690 seconds.
   
   > select count(*) from overture_maps where type='address';
   file_list duration: 136.47644ms
   file_group duration: 0.034272ms
   Total duration: 136.51955ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.141 seconds.
   
   > CREATE EXTERNAL TABLE overture_maps_flat
   STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/theme=addresses/type=address/';
   0 row(s) fetched.
   Elapsed 0.727 seconds.
   
   > select count(*) from overture_maps_flat;
   file_list duration: 0.0063920002ms
   file_group duration: 357.8303ms
   Total duration: 357.86032ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.359 seconds.
   
   > select count(*) from overture_maps_flat;
   file_list duration: 0.0066520004ms
   file_group duration: 0.030025ms
   Total duration: 0.047092ms
   +-----------+
   | count(*)  |
   +-----------+
   | 446544475 |
   +-----------+
   1 row(s) fetched.
   Elapsed 0.001 seconds.
   ```
   
   ### Describe the solution you'd like
   
   Ultimately I'd like partitioned datasets to perform comparably to flat datasets, with caching mechanisms available to both. Based on the structure of the existing code, I believe the code path `ListingTable` executes when discovering files and collecting initial metadata should be normalized between partitioned and flat datasets.
   
   During a table scan, both table types first call `list_files_for_scan`, which in turn calls `pruned_partition_list`:
   
https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/core/src/datasource/listing/table.rs#L1356
   
https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L409
   
   In `pruned_partition_list`, flat tables end up executing `list_all_files`, which is where the `ListFilesCache` is currently leveraged:
   
https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L417-L432
   
   Partitioned datasets, however, end up making calls that at least partially, and possibly fully, rediscover the partition and file structure by calling `list_partitions`:
   
https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L435-L436
   
   Note that `list_partitions` executes a breadth-first search of the (potentially prefixed, depending on query filter evaluation) partition structure; as such, the per-query latency penalty it imposes scales linearly with the depth of the partition tree.
   
https://github.com/apache/datafusion/blob/f3941b207eeaa7768d840e17c32fa61f3b6fca71/datafusion/catalog-listing/src/helpers.rs#L188
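
   To summarize the divergence, here is a heavily simplified sketch of the branch `pruned_partition_list` takes today (stand-in types and elided bodies, not the real signatures):

   ```rust
   // Simplified sketch of the current control flow; the real code lives in
   // datafusion/catalog-listing/src/helpers.rs.
   struct PartitionedFile; // stand-in for the real type

   async fn list_all_files() -> Vec<PartitionedFile> {
       // Flat path: a single recursive LIST of the table prefix. This is the
       // only branch that consults the ListFilesCache.
       vec![]
   }

   async fn list_partitions(depth: usize) -> Vec<String> {
       // Partitioned path: a breadth-first search issuing one round of LIST
       // requests per partition level, so latency grows with `depth`. No
       // cache is consulted on this branch.
       let _ = depth;
       vec![]
   }

   async fn pruned_partition_list_sketch(partition_cols: &[String]) -> Vec<PartitionedFile> {
       if partition_cols.is_empty() {
           list_all_files().await
       } else {
           let _partitions = list_partitions(partition_cols.len()).await;
           // ...prune the discovered partitions against the filters, then
           // list/stream the files inside each surviving partition.
           vec![]
       }
   }
   ```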
   
   *My Opinion*
   Executing a breadth-first search over the full dataset, or a prefixed subset of it, is likely less performant than simply listing that same prefix recursively. The partition values of any given object can be derived directly from the object's path, so pruning of objects can continue working exactly as it does today. Additionally, any caching of the object listing would then apply to both partitioned and flat tables. I suspect there are table structures where the current approach performs better, such as a table with a very large number of objects but a shallow partition structure; however, most remote object storage solutions return many hundreds to thousands of object paths in a single list request, so such tables would need to be quite large in terms of object count before that advantage shows up.
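
   As a concrete illustration of this direction, the partition values for each object returned by a single recursive LIST can be parsed straight out of its path, after which pruning can proceed as it does today. A minimal sketch (`parse_partition_values` is a hypothetical helper; real code would reuse DataFusion's existing hive-partition path parsing):

   ```rust
   /// Hypothetical helper: derive the partition values for `partition_cols`
   /// directly from an object path produced by a recursive LIST.
   fn parse_partition_values(path: &str, partition_cols: &[&str]) -> Option<Vec<String>> {
       let mut values: Vec<Option<String>> = vec![None; partition_cols.len()];
       for segment in path.split('/') {
           if let Some((key, value)) = segment.split_once('=') {
               if let Some(idx) = partition_cols.iter().position(|c| *c == key) {
                   values[idx] = Some(value.to_string());
               }
           }
       }
       // `None` if any expected partition column is missing from the path.
       values.into_iter().collect()
   }

   fn main() {
       let path = "release/2025-07-23.0/theme=addresses/type=address/part-00000.zstd.parquet";
       assert_eq!(
           parse_partition_values(path, &["theme", "type"]),
           Some(vec!["addresses".to_string(), "address".to_string()])
       );
   }
   ```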
   
   ### Describe alternatives you've considered
   
   It would probably be possible to apply a `ListFilesCache` before the point where the flat and partitioned code paths diverge. In that scenario, care would need to be taken when loading objects into the cache, because partitioned datasets don't necessarily return all potential objects due to pruning. It would also mean that repeat queries against partitioned datasets could continue to see a high percentage of cache misses, depending on the user-supplied filters; a sketch of that keying problem follows.
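
   A hypothetical sketch of why keying such a cache is tricky: if entries were keyed on the table path alone, a pruned listing stored by one query could silently serve an incomplete listing to a later query with a broader filter, so the key would have to incorporate the filters themselves (all names below are made up for illustration):

   ```rust
   use std::collections::HashMap;

   // Hypothetical shape of a filter-aware listing cache. Keying on the table
   // path alone would be unsafe: the listing stored for `type='address'` does
   // not contain the objects a broader query would need.
   struct PrunedListCache {
       // (table path, canonical filter text) -> listed object paths
       entries: HashMap<(String, String), Vec<String>>,
   }

   impl PrunedListCache {
       fn get(&self, table_path: &str, filter: &str) -> Option<&Vec<String>> {
           // Only an exact filter match is known to be complete, which is why
           // repeat queries with different user-supplied filters would still
           // miss the cache.
           self.entries.get(&(table_path.to_owned(), filter.to_owned()))
       }
   }
   ```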
   
   ### Additional context
   
   Existing issues mentioned:
   https://github.com/apache/datafusion/issues/7618
   https://github.com/apache/datafusion/issues/9654
   
   Since this relates to `ListingTable` and remote datasets:
   https://github.com/apache/datafusion/issues/16365

