tustvold commented on code in PR #6183:
URL: https://github.com/apache/arrow-datafusion/pull/6183#discussion_r1196378930
##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -153,225 +151,239 @@ pub fn split_files(
.collect()
}
+/// A directory-like prefix within the table's object store, discovered while
+/// walking the table path.
+struct Partition {
+ /// Object store path of this partition; used as the listing prefix by `list`.
+ path: Path,
+ /// Nesting level below the table root (the root partition is created with
+ /// depth 0); presumably compared against `max_depth` — confirm.
+ depth: usize,
+ /// Objects found directly under `path`; `None` until `list` has run.
+ files: Option<Vec<ObjectMeta>>,
+}
+
+impl Partition {
+ /// Lists the objects directly under this partition via
+ /// `ObjectStore::list_with_delimiter`, stores them in `self.files`, and
+ /// returns the updated partition together with the common prefixes found
+ /// beneath it (the candidate child partitions).
+ ///
+ /// Errors returned by the object store are propagated to the caller.
+ async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self,
Vec<Path>)> {
+ trace!("Listing partition {}", self.path);
+ // An empty path is passed as `None` (no prefix) rather than as an
+ // empty-string prefix.
+ let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
+ let result = store.list_with_delimiter(prefix).await?;
+ self.files = Some(result.objects);
+ Ok((self, result.common_prefixes))
+ }
+}
+
+/// Returns a recursive list of the partitions in `table_path` up to
`max_depth`
+async fn list_partitions(
+ store: &dyn ObjectStore,
+ table_path: &ListingTableUrl,
+ max_depth: usize,
+) -> Result<Vec<Partition>> {
+ let partition = Partition {
+ path: table_path.prefix().clone(),
+ depth: 0,
+ files: None,
+ };
+
+ let mut out = Vec::with_capacity(64);
+
+ let mut pending = vec![];
+ let mut futures = FuturesUnordered::new();
+ futures.push(partition.list(store));
+
+ while let Some((partition, paths)) = futures.next().await.transpose()? {
+ if let Some(next) = pending.pop() {
Review Comment:
Each iteration of the loop completes at most one future, and therefore frees
up at most one "slot" in `futures`. If `pending` contains anything, that implies
we were at `CONCURRENCY_LIMIT` before we polled `futures`, so popping a single
pending entry is enough to refill the freed slot.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]