tustvold commented on code in PR #6183:
URL: https://github.com/apache/arrow-datafusion/pull/6183#discussion_r1181715419
##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -152,225 +146,224 @@ pub fn split_files(
.collect()
}
+/// A directory level discovered while walking a table's partition layout.
+#[derive(Debug)]
+struct Partition {
+    /// Location of this partition.
+    path: Path,
+    /// Nesting level of `path`; the partition built from the table prefix is at depth 0.
+    depth: usize,
+    /// Objects returned when `path` was listed, or `None` if it has not been listed.
+    files: Option<Vec<ObjectMeta>>,
+}
+
+impl Partition {
+    /// Lists the contents of this partition in `store`.
+    ///
+    /// Returns the partition with `files` populated from the listing's objects,
+    /// together with the common prefixes the listing reported.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error returned by `list_with_delimiter`.
+    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+        // An empty path is listed with no prefix at all rather than an empty one.
+        let prefix = if self.path.as_ref().is_empty() {
+            None
+        } else {
+            Some(&self.path)
+        };
+        let listing = store.list_with_delimiter(prefix).await?;
+        self.files = Some(listing.objects);
+        Ok((self, listing.common_prefixes))
+    }
+}
+
+/// Returns a recursive list of the partitions in `table_path` up to
`max_depth`
+async fn list_partitions(
+ store: &dyn ObjectStore,
+ table_path: &ListingTableUrl,
+ max_depth: usize,
+) -> Result<Vec<Partition>> {
+ let partition = Partition {
+ path: table_path.prefix().clone(),
+ depth: 0,
+ files: None,
+ };
+
+ let mut out = Vec::with_capacity(64);
+ let mut futures = FuturesUnordered::new();
+ futures.push(partition.list(store));
+
+ while let Some((partition, paths)) = futures.next().await.transpose()? {
+ let depth = partition.depth;
+ out.push(partition);
+ for path in paths {
+ let child = Partition {
+ path,
+ depth: depth + 1,
+ files: None,
+ };
+ match depth < max_depth {
+ true => futures.push(child.list(store)),
+ false => out.push(child),
+ }
+ }
+ }
+ Ok(out)
+}
+
+async fn prune_partitions(
+ table_path: &ListingTableUrl,
+ partitions: Vec<Partition>,
+ filters: &[Expr],
+ partition_cols: &[(String, DataType)],
+) -> Result<Vec<Partition>> {
+ if filters.is_empty() {
+ return Ok(partitions);
+ }
+
+ let mut builders: Vec<_> = (0..partition_cols.len())
+ .map(|_| StringBuilder::with_capacity(partitions.len(),
partitions.len() * 10))
+ .collect();
+
+ for partition in &partitions {
+ let cols = partition_cols.iter().map(|x| x.0.as_str());
+ let parsed = parse_partitions_for_path(&table_path, &partition.path,
cols)
+ .unwrap_or_default();
+
+ let mut builders = builders.iter_mut();
+ for (p, b) in parsed.iter().zip(&mut builders) {
+ b.append_value(p);
+ }
+ builders.for_each(|b| b.append_null());
+ }
+
+ let arrays = partition_cols
+ .iter()
+ .zip(builders)
+ .map(|((_, d), mut builder)| {
+ let array = builder.finish();
+ cast(&array, d)
+ })
+ .collect::<Result<_, _>>()?;
+
+ let fields: Fields = partition_cols
+ .into_iter()
+ .map(|(n, d)| Field::new(n, d.clone(), true))
+ .collect();
+ let schema = Arc::new(Schema::new(fields));
+
+ let df_schema = DFSchema::new_with_metadata(
+ partition_cols
+ .into_iter()
+ .map(|(n, d)| DFField::new_unqualified(n, d.clone(), true))
+ .collect(),
+ Default::default(),
+ )?;
+
+ let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+
+ // TODO: Plumb this down
Review Comment:
This was a pre-existing issue
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]