BlakeOrth commented on code in PR #18146:
URL: https://github.com/apache/datafusion/pull/18146#discussion_r2453457296
##########
datafusion/catalog-listing/src/helpers.rs:
##########
@@ -424,80 +380,41 @@ pub async fn pruned_partition_list<'a>(
file_extension: &'a str,
partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
- // if no partition col => simply list all the files
+ let objects = table_path
+ .list_all_files(ctx, store, file_extension)
+ .await?
+ .try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
+
if partition_cols.is_empty() {
if !filters.is_empty() {
return internal_err!(
"Got partition filters for unpartitioned table {}",
table_path
);
}
- return Ok(Box::pin(
- table_path
- .list_all_files(ctx, store, file_extension)
- .await?
- .try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
- .map_ok(|object_meta| object_meta.into()),
- ));
- }
-
- let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
-
- let partitions =
- list_partitions(store, table_path, partition_cols.len(), partition_prefix)
- .await?;
- debug!("Listed {} partitions", partitions.len());
- let pruned =
- prune_partitions(table_path, partitions, filters, partition_cols).await?;
-
- debug!("Pruning yielded {} partitions", pruned.len());
-
- let stream = futures::stream::iter(pruned)
- .map(move |partition: Partition| async move {
- let cols = partition_cols.iter().map(|x| x.0.as_str());
- let parsed = parse_partitions_for_path(table_path, &partition.path, cols);
+ // if no partition col => simply list all the files
+ Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
+ } else {
+ let df_schema = DFSchema::from_unqualified_fields(
+ partition_cols
+ .iter()
+ .map(|(n, d)| Field::new(n, d.clone(), true))
+ .collect(),
+ Default::default(),
+ )?;
- let partition_values = parsed
- .into_iter()
- .flatten()
- .zip(partition_cols)
- .map(|(parsed, (_, datatype))| {
- ScalarValue::try_from_string(parsed.to_string(), datatype)
- })
- .collect::<Result<Vec<_>>>()?;
-
- let files = match partition.files {
- Some(files) => files,
- None => {
- trace!("Recursively listing partition {}", partition.path);
- store.list(Some(&partition.path)).try_collect().await?
- }
- };
- let files = files.into_iter().filter(move |o| {
- let extension_match = o.location.as_ref().ends_with(file_extension);
- // here need to scan subdirectories(`listing_table_ignore_subdirectory` = false)
- let glob_match = table_path.contains(&o.location, false);
- extension_match && glob_match
- });
-
- let stream = futures::stream::iter(files.map(move |object_meta| {
- Ok(PartitionedFile {
- object_meta,
- partition_values: partition_values.clone(),
- range: None,
- statistics: None,
- extensions: None,
- metadata_size_hint: None,
- })
- }));
-
- Ok::<_, DataFusionError>(stream)
- })
- .buffer_unordered(CONCURRENCY_LIMIT)
Review Comment:
Yes, although it's subtly more complex than that. This existing
implementation doesn't blindly list all known partitions in parallel. It's
probably more fair to say that it potentially filters known partitions and then
rediscovers all un-pruned partitions in parallel. I think an example probably
shows this off pretty well.
Given a table with the following structure:
```console
test_table/
├── a=1
│ └── b=10
│ └── c=100
│ └── file1.parquet
├── a=2
│ └── b=20
│ └── c=200
│ └── file2.parquet
└── a=3
└── b=30
└── c=300
└── file2.parquet
```
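To make the "rediscovers un-pruned partitions in parallel" part concrete, here is a minimal, self-contained sketch of the level-by-level fan-out the traces below show. It reuses the `buffer_unordered(CONCURRENCY_LIMIT)` idea from the helper; the in-memory "store" and every other name are illustrative stand-ins, not the real implementation.

```rust
// A minimal sketch (not DataFusion's code): every directory discovered at one
// level is listed concurrently with `buffer_unordered(CONCURRENCY_LIMIT)`.
//
// [dependencies]
// futures = "0.3"
use futures::{stream, StreamExt};
use std::collections::BTreeMap;

const CONCURRENCY_LIMIT: usize = 100;

/// Stand-in for an object store: maps a directory prefix to its child directories.
fn fake_store() -> BTreeMap<&'static str, Vec<&'static str>> {
    BTreeMap::from([
        ("test_table", vec!["test_table/a=1", "test_table/a=2", "test_table/a=3"]),
        ("test_table/a=1", vec!["test_table/a=1/b=10"]),
        ("test_table/a=2", vec!["test_table/a=2/b=20"]),
        ("test_table/a=3", vec!["test_table/a=3/b=30"]),
        ("test_table/a=1/b=10", vec!["test_table/a=1/b=10/c=100"]),
        ("test_table/a=2/b=20", vec!["test_table/a=2/b=20/c=200"]),
        ("test_table/a=3/b=30", vec!["test_table/a=3/b=30/c=300"]),
    ])
}

/// Stand-in for a `store.list(Some(&path))` call; each invocation corresponds
/// to one List operation in the traces below.
async fn list_children(
    store: &BTreeMap<&'static str, Vec<&'static str>>,
    prefix: &str,
) -> Vec<String> {
    store
        .get(prefix)
        .map(|dirs| dirs.iter().map(|d| d.to_string()).collect())
        .unwrap_or_default()
}

fn main() {
    futures::executor::block_on(async {
        let store = fake_store();
        // Level 0: one List on the table root rediscovers the 'a' partitions.
        let mut current = list_children(&store, "test_table").await;
        // Each further level is listed concurrently. Results arrive in
        // completion order, not submission order, which is why the trace
        // timestamps can look slightly shuffled within a level.
        for _level in 0..2 {
            current = stream::iter(current)
                .map(|dir| {
                    let store = &store;
                    async move { list_children(store, &dir).await }
                })
                .buffer_unordered(CONCURRENCY_LIMIT)
                .flat_map(|children| stream::iter(children))
                .collect()
                .await;
        }
        println!("leaf partitions: {current:?}");
    });
}
```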
Here are some annotated query examples showing the list operations:
```sql
> create external table test_table stored as parquet location '/tmp/test_table/';
> select count(*) from test_table;
+----------+
| count(*) |
+----------+
| 6        |
+----------+
1 row(s) fetched.
Elapsed 0.007 seconds.
Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: LocalFileSystem(file:///)
# This list call is executed, rediscovering partition 'a'
2025-10-22T21:40:58.237088151+00:00 operation=List duration=0.000294s path=tmp/test_table
# ----
# These 3 list calls are executed in parallel, rediscovering all of the partitions at the 2nd level, in this case 'b'
2025-10-22T21:40:58.237397741+00:00 operation=List duration=0.000081s path=tmp/test_table/a=1
2025-10-22T21:40:58.237414558+00:00 operation=List duration=0.000069s path=tmp/test_table/a=2
2025-10-22T21:40:58.237436985+00:00 operation=List duration=0.000101s path=tmp/test_table/a=3
# ---
# Then the 'b' partitions are listed in parallel, rediscovering the 'c' partitions
2025-10-22T21:40:58.237487175+00:00 operation=List duration=0.000056s path=tmp/test_table/a=1/b=10
2025-10-22T21:40:58.237513848+00:00 operation=List duration=0.000058s path=tmp/test_table/a=2/b=20
# Then the 'c' partitions are listed in parallel, finally discovering readable files for this table.
# Note that, while the 'c' partition directly following this comment is returned prior to a 'b' partition, the timestamps
# indicate the list call for the 'b' partition was submitted first.
2025-10-22T21:40:58.237576223+00:00 operation=List duration=0.000047s path=tmp/test_table/a=2/b=20/c=200
2025-10-22T21:40:58.237548133+00:00 operation=List duration=0.000080s path=tmp/test_table/a=3/b=30
2025-10-22T21:40:58.237560094+00:00 operation=List duration=0.000088s path=tmp/test_table/a=1/b=10/c=100
2025-10-22T21:40:58.237631945+00:00 operation=List duration=0.000095s path=tmp/test_table/a=3/b=30/c=300
# Only after the partitions are listed and files discovered does any reading of the actual data start
2025-10-22T21:40:58.238183601+00:00 operation=Get duration=0.000085s size=8 range: bytes=477-484 path=tmp/test_table/a=2/b=20/c=200/file2.parquet
2025-10-22T21:40:58.238237666+00:00 operation=Get duration=0.000041s size=8 range: bytes=477-484 path=tmp/test_table/a=3/b=30/c=300/file2.parquet
. . .
```
Next, here is a simple filter on a single partition column value. Note the
list calls are identical even though there's a filter in the query:
```sql
> select count(*) from test_table where b=30;
+----------+
| count(*) |
+----------+
| 2        |
+----------+
1 row(s) fetched.
Elapsed 0.018 seconds.
Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: LocalFileSystem(file:///)
2025-10-22T21:53:51.490170215+00:00 operation=List duration=0.000815s path=tmp/test_table
2025-10-22T21:53:51.491027338+00:00 operation=List duration=0.000355s path=tmp/test_table/a=1
2025-10-22T21:53:51.491104157+00:00 operation=List duration=0.000355s path=tmp/test_table/a=2
2025-10-22T21:53:51.491261191+00:00 operation=List duration=0.000211s path=tmp/test_table/a=3
2025-10-22T21:53:51.491484012+00:00 operation=List duration=0.000184s path=tmp/test_table/a=2/b=20
2025-10-22T21:53:51.491405857+00:00 operation=List duration=0.000360s path=tmp/test_table/a=1/b=10
2025-10-22T21:53:51.491523030+00:00 operation=List duration=0.000259s path=tmp/test_table/a=3/b=30
2025-10-22T21:53:51.491698312+00:00 operation=List duration=0.000181s path=tmp/test_table/a=2/b=20/c=200
2025-10-22T21:53:51.491793944+00:00 operation=List duration=0.000363s path=tmp/test_table/a=1/b=10/c=100
2025-10-22T21:53:51.491833864+00:00 operation=List duration=0.000350s path=tmp/test_table/a=3/b=30/c=300
```
This next query shows where I believe there is a potential performance
_regression_ with this PR exactly as written: it demonstrates the existing
code pruning list operations when the filter can be evaluated against the
known partition columns.
```sql
> select count(*) from test_table where a=3 and b=30;
+----------+
| count(*) |
+----------+
| 2        |
+----------+
1 row(s) fetched.
Elapsed 0.012 seconds.
Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: LocalFileSystem(file:///)
2025-10-22T21:57:58.229995504+00:00 operation=List duration=0.000373s path=tmp/test_table/a=3/b=30
2025-10-22T21:57:58.230384839+00:00 operation=List duration=0.000146s path=tmp/test_table/a=3/b=30/c=300
```
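For reference, here is a rough sketch of that pruning idea. This is not DataFusion's actual `evaluate_partition_prefix` (which operates on `Expr` filters); it is just a self-contained illustration with made-up names showing how equality filters on a leading run of the partition columns can be folded into a single list prefix, and how prefix construction stops at the first partition column without an equality filter.

```rust
// Simplified illustration of prefix-based partition pruning; all names here
// are assumptions for the sketch, not DataFusion APIs.
use std::collections::HashMap;

/// Build a list prefix such as "a=3/b=30" from equality filters, walking the
/// partition columns in table order and stopping at the first gap.
fn partition_prefix(partition_cols: &[&str], eq_filters: &HashMap<&str, &str>) -> Option<String> {
    let mut parts = Vec::new();
    for col in partition_cols {
        match eq_filters.get(col) {
            Some(value) => parts.push(format!("{col}={value}")),
            // A missing filter on an earlier column means later filters cannot
            // narrow the listing prefix, so we stop here.
            None => break,
        }
    }
    if parts.is_empty() {
        None
    } else {
        Some(parts.join("/"))
    }
}

fn main() {
    let cols = ["a", "b", "c"];

    // `where a=3 and b=30` -> prefix "a=3/b=30": only that subtree is listed
    // (the two List calls in the trace above).
    let f1 = HashMap::from([("a", "3"), ("b", "30")]);
    assert_eq!(partition_prefix(&cols, &f1), Some("a=3/b=30".to_string()));

    // `where b=30` -> no prefix: every directory is relisted.
    let f2 = HashMap::from([("b", "30")]);
    assert_eq!(partition_prefix(&cols, &f2), None);

    // `where b=20 and c=200` -> still no prefix, because `a` has no equality
    // filter; this is the full relisting shown in the next trace.
    let f3 = HashMap::from([("b", "20"), ("c", "200")]);
    assert_eq!(partition_prefix(&cols, &f3), None);
}
```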
However, the above optimization _only_ applies when the filter covers the
partition columns in order from the beginning of the table structure, as the
following query goes back to listing every directory in the table.
```sql
> select count(*) from test_table where b=20 and c=200;
+----------+
| count(*) |
+----------+
| 2        |
+----------+
1 row(s) fetched.
Elapsed 0.013 seconds.
Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: LocalFileSystem(file:///)
2025-10-22T22:01:02.257613181+00:00 operation=List duration=0.000386s path=tmp/test_table
2025-10-22T22:01:02.258016229+00:00 operation=List duration=0.000196s path=tmp/test_table/a=1
2025-10-22T22:01:02.258038911+00:00 operation=List duration=0.000181s path=tmp/test_table/a=2
2025-10-22T22:01:02.258176544+00:00 operation=List duration=0.000046s path=tmp/test_table/a=3
2025-10-22T22:01:02.258240850+00:00 operation=List duration=0.000043s path=tmp/test_table/a=2/b=20
2025-10-22T22:01:02.258250880+00:00 operation=List duration=0.000052s path=tmp/test_table/a=3/b=30
2025-10-22T22:01:02.258226660+00:00 operation=List duration=0.000080s path=tmp/test_table/a=1/b=10
2025-10-22T22:01:02.258288622+00:00 operation=List duration=0.000045s path=tmp/test_table/a=2/b=20/c=200
2025-10-22T22:01:02.258310145+00:00 operation=List duration=0.000035s path=tmp/test_table/a=3/b=30/c=300
2025-10-22T22:01:02.258321461+00:00 operation=List duration=0.000039s path=tmp/test_table/a=1/b=10/c=100
```
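In terms of the simplified prefix sketch above, `b=20 and c=200` yields no prefix at all because `a` has no equality filter, which matches the full relisting in this last trace; it is this prefix-based pruning whose loss would be the regression I mentioned.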