alamb commented on code in PR #18146:
URL: https://github.com/apache/datafusion/pull/18146#discussion_r2461832812
##########
datafusion/catalog-listing/src/helpers.rs:
##########
@@ -424,80 +380,41 @@ pub async fn pruned_partition_list<'a>(
file_extension: &'a str,
partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
- // if no partition col => simply list all the files
+ let objects = table_path
+ .list_all_files(ctx, store, file_extension)
+ .await?
+ .try_filter(|object_meta| futures::future::ready(object_meta.size >
0));
+
if partition_cols.is_empty() {
if !filters.is_empty() {
return internal_err!(
"Got partition filters for unpartitioned table {}",
table_path
);
}
- return Ok(Box::pin(
- table_path
- .list_all_files(ctx, store, file_extension)
- .await?
- .try_filter(|object_meta|
futures::future::ready(object_meta.size > 0))
- .map_ok(|object_meta| object_meta.into()),
- ));
- }
-
- let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
-
- let partitions =
- list_partitions(store, table_path, partition_cols.len(),
partition_prefix)
- .await?;
- debug!("Listed {} partitions", partitions.len());
- let pruned =
- prune_partitions(table_path, partitions, filters,
partition_cols).await?;
-
- debug!("Pruning yielded {} partitions", pruned.len());
-
- let stream = futures::stream::iter(pruned)
- .map(move |partition: Partition| async move {
- let cols = partition_cols.iter().map(|x| x.0.as_str());
- let parsed = parse_partitions_for_path(table_path,
&partition.path, cols);
+ // if no partition col => simply list all the files
+ Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
+ } else {
+ let df_schema = DFSchema::from_unqualified_fields(
+ partition_cols
+ .iter()
+ .map(|(n, d)| Field::new(n, d.clone(), true))
+ .collect(),
+ Default::default(),
+ )?;
- let partition_values = parsed
- .into_iter()
- .flatten()
- .zip(partition_cols)
- .map(|(parsed, (_, datatype))| {
- ScalarValue::try_from_string(parsed.to_string(), datatype)
- })
- .collect::<Result<Vec<_>>>()?;
-
- let files = match partition.files {
- Some(files) => files,
- None => {
- trace!("Recursively listing partition {}", partition.path);
- store.list(Some(&partition.path)).try_collect().await?
- }
- };
- let files = files.into_iter().filter(move |o| {
- let extension_match =
o.location.as_ref().ends_with(file_extension);
- // here need to scan
subdirectories(`listing_table_ignore_subdirectory` = false)
- let glob_match = table_path.contains(&o.location, false);
- extension_match && glob_match
- });
-
- let stream = futures::stream::iter(files.map(move |object_meta| {
- Ok(PartitionedFile {
- object_meta,
- partition_values: partition_values.clone(),
- range: None,
- statistics: None,
- extensions: None,
- metadata_size_hint: None,
- })
- }));
-
- Ok::<_, DataFusionError>(stream)
- })
- .buffer_unordered(CONCURRENCY_LIMIT)
Review Comment:
Thank you -- I have prepared a test harness so that hopefully we can set up
these scenarios programmatically and then test / confirm the difference in
behavior after the change is made.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]