This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new de8bcc536b Refactor: add `FileGroup` structure for
`Vec<PartitionedFile>` (#15379)
de8bcc536b is described below
commit de8bcc536bae5b9448e3e715834d518415152967
Author: xudong.w <[email protected]>
AuthorDate: Wed Mar 26 08:43:56 2025 +0800
Refactor: add `FileGroup` structure for `Vec<PartitionedFile>` (#15379)
* Refactor: add FileGroup structure
* update
* address comments
* address comments from alamb
* fix clippy
---
.../examples/parquet_exec_visitor.rs | 6 +-
datafusion/catalog-listing/src/helpers.rs | 18 +-
.../core/src/datasource/file_format/arrow.rs | 2 +-
datafusion/core/src/datasource/file_format/mod.rs | 3 +-
.../core/src/datasource/file_format/parquet.rs | 10 +-
datafusion/core/src/datasource/listing/table.rs | 11 +-
.../src/datasource/physical_plan/arrow_file.rs | 4 +-
.../core/src/datasource/physical_plan/json.rs | 7 +-
.../core/src/datasource/physical_plan/mod.rs | 1 +
.../core/src/datasource/physical_plan/parquet.rs | 11 +-
datafusion/core/src/datasource/statistics.rs | 10 +-
datafusion/core/src/physical_planner.rs | 3 +-
datafusion/core/src/test/mod.rs | 8 +-
.../physical_optimizer/enforce_distribution.rs | 9 +-
datafusion/datasource-csv/src/file_format.rs | 2 +-
datafusion/datasource-csv/src/mod.rs | 7 +-
datafusion/datasource-csv/src/source.rs | 8 +-
datafusion/datasource-json/src/file_format.rs | 2 +-
datafusion/datasource-json/src/source.rs | 7 +-
datafusion/datasource-parquet/src/file_format.rs | 2 +-
datafusion/datasource-parquet/src/mod.rs | 6 +-
datafusion/datasource/src/display.rs | 66 ++--
datafusion/datasource/src/file_groups.rs | 342 +++++++++++++++------
datafusion/datasource/src/file_scan_config.rs | 35 ++-
datafusion/datasource/src/file_sink_config.rs | 8 +-
datafusion/datasource/src/file_stream.rs | 4 +-
datafusion/proto/src/physical_plan/from_proto.rs | 27 +-
datafusion/proto/src/physical_plan/to_proto.rs | 4 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 22 +-
datafusion/substrait/src/physical_plan/consumer.rs | 4 +-
datafusion/substrait/src/physical_plan/producer.rs | 2 +-
.../tests/cases/roundtrip_physical_plan.rs | 10 +-
32 files changed, 421 insertions(+), 240 deletions(-)
diff --git a/datafusion-examples/examples/parquet_exec_visitor.rs
b/datafusion-examples/examples/parquet_exec_visitor.rs
index 96bd6c369d..8133457dd9 100644
--- a/datafusion-examples/examples/parquet_exec_visitor.rs
+++ b/datafusion-examples/examples/parquet_exec_visitor.rs
@@ -18,8 +18,8 @@
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig,
ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionContext;
@@ -85,7 +85,7 @@ async fn main() {
/// and `file_groups` from the FileScanConfig.
#[derive(Debug)]
struct ParquetExecVisitor {
- file_groups: Option<Vec<Vec<PartitionedFile>>>,
+ file_groups: Option<Vec<FileGroup>>,
bytes_scanned: Option<MetricValue>,
}
diff --git a/datafusion/catalog-listing/src/helpers.rs
b/datafusion/catalog-listing/src/helpers.rs
index 7742f5f9a1..8efb74d4ea 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -122,6 +122,7 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr:
&Expr) -> bool {
const CONCURRENCY_LIMIT: usize = 100;
/// Partition the list of files into `n` groups
+#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
pub fn split_files(
mut partitioned_files: Vec<PartitionedFile>,
n: usize,
@@ -535,13 +536,13 @@ pub fn describe_partition(partition: &Partition) ->
(&str, usize, Vec<&str>) {
mod tests {
use async_trait::async_trait;
use datafusion_common::config::TableOptions;
+ use datafusion_datasource::file_groups::FileGroup;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::FutureExt;
use object_store::memory::InMemory;
use std::any::Any;
use std::ops::Not;
- // use futures::StreamExt;
use super::*;
use datafusion_expr::{
@@ -553,24 +554,24 @@ mod tests {
#[test]
fn test_split_files() {
let new_partitioned_file = |path: &str|
PartitionedFile::new(path.to_owned(), 10);
- let files = vec![
+ let files = FileGroup::new(vec![
new_partitioned_file("a"),
new_partitioned_file("b"),
new_partitioned_file("c"),
new_partitioned_file("d"),
new_partitioned_file("e"),
- ];
+ ]);
- let chunks = split_files(files.clone(), 1);
+ let chunks = files.clone().split_files(1);
assert_eq!(1, chunks.len());
assert_eq!(5, chunks[0].len());
- let chunks = split_files(files.clone(), 2);
+ let chunks = files.clone().split_files(2);
assert_eq!(2, chunks.len());
assert_eq!(3, chunks[0].len());
assert_eq!(2, chunks[1].len());
- let chunks = split_files(files.clone(), 5);
+ let chunks = files.clone().split_files(5);
assert_eq!(5, chunks.len());
assert_eq!(1, chunks[0].len());
assert_eq!(1, chunks[1].len());
@@ -578,7 +579,7 @@ mod tests {
assert_eq!(1, chunks[3].len());
assert_eq!(1, chunks[4].len());
- let chunks = split_files(files, 123);
+ let chunks = files.clone().split_files(123);
assert_eq!(5, chunks.len());
assert_eq!(1, chunks[0].len());
assert_eq!(1, chunks[1].len());
@@ -586,7 +587,8 @@ mod tests {
assert_eq!(1, chunks[3].len());
assert_eq!(1, chunks[4].len());
- let chunks = split_files(vec![], 2);
+ let empty_group = FileGroup::default();
+ let chunks = empty_group.split_files(2);
assert_eq!(0, chunks.len());
}
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs
b/datafusion/core/src/datasource/file_format/arrow.rs
index 3172e56925..d0b9df403b 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -297,7 +297,7 @@ impl DisplayAs for ArrowFileSink {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ArrowFileSink(file_groups=",)?;
- FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
write!(f, ")")
}
DisplayFormatType::TreeRender => {
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index df74e5d060..4914a91722 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -73,7 +73,8 @@ pub(crate) mod test_util {
statistics: None,
extensions: None,
metadata_size_hint: None,
- }]];
+ }]
+ .into()];
let exec = format
.create_physical_plan(
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 3b71593b33..f314ae54a1 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -147,6 +147,7 @@ mod tests {
};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
+ use datafusion_datasource::file_groups::FileGroup;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use log::error;
@@ -1375,7 +1376,7 @@ mod tests {
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
- file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ file_group:
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
table_paths: vec![ListingTableUrl::parse(table_path)?],
output_schema: schema.clone(),
table_partition_cols: vec![],
@@ -1461,7 +1462,7 @@ mod tests {
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
- file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ file_group:
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], //
add partitioning
@@ -1545,7 +1546,10 @@ mod tests {
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
- file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ file_group: FileGroup::new(vec![PartitionedFile::new(
+ "/tmp".to_string(),
+ 1,
+ )]),
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 21b35bac21..0d0dea910d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -20,7 +20,7 @@
use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};
-use super::helpers::{expr_applicable_for_cols, pruned_partition_list,
split_files};
+use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
use super::{ListingTableUrl, PartitionedFile};
use crate::datasource::{
@@ -55,6 +55,7 @@ use datafusion_physical_expr::{
use async_trait::async_trait;
use datafusion_catalog::Session;
+use datafusion_datasource::file_groups::FileGroup;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
@@ -1031,7 +1032,7 @@ impl TableProvider for ListingTable {
)
.await?;
- let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
+ let file_group =
file_list_stream.try_collect::<Vec<_>>().await?.into();
let keep_partition_by_columns =
state.config_options().execution.keep_partition_by_columns;
@@ -1040,7 +1041,7 @@ impl TableProvider for ListingTable {
original_url: String::default(),
object_store_url: self.table_paths()[0].object_store(),
table_paths: self.table_paths().clone(),
- file_groups,
+ file_group,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
insert_op,
@@ -1088,7 +1089,7 @@ impl ListingTable {
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
- ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
+ ) -> Result<(Vec<FileGroup>, Statistics)> {
let store = if let Some(url) = self.table_paths.first() {
ctx.runtime_env().object_store(url)?
} else {
@@ -1136,7 +1137,7 @@ impl ListingTable {
.await?;
Ok((
- split_files(files, self.options.target_partitions),
+ files.split_files(self.options.target_partitions),
statistics,
))
}
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 59860e3594..5dcf4df73f 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,7 +20,6 @@
use std::any::Any;
use std::sync::Arc;
-use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use crate::error::Result;
@@ -42,6 +41,7 @@ use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
+use datafusion_datasource::file_groups::FileGroup;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
@@ -124,7 +124,7 @@ impl ArrowExec {
)
}
- fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) ->
Self {
+ fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index 9bab75fc88..041aacf687 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -40,7 +40,6 @@ mod tests {
use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::FileFormat;
- use datafusion_datasource::PartitionedFile;
use datafusion_datasource_json::JsonFormat;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
@@ -49,6 +48,7 @@ mod tests {
use arrow::array::Array;
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Field, SchemaBuilder};
+ use datafusion_datasource::file_groups::FileGroup;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
@@ -62,7 +62,7 @@ mod tests {
state: &SessionState,
file_compression_type: FileCompressionType,
work_dir: &Path,
- ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
+ ) -> (ObjectStoreUrl, Vec<FileGroup>, SchemaRef) {
let store_url = ObjectStoreUrl::local_filesystem();
let store = state.runtime_env().object_store(&store_url).unwrap();
@@ -79,6 +79,7 @@ mod tests {
let meta = file_groups
.first()
.unwrap()
+ .files()
.first()
.unwrap()
.clone()
@@ -113,6 +114,7 @@ mod tests {
let path = file_groups
.first()
.unwrap()
+ .files()
.first()
.unwrap()
.object_meta
@@ -560,6 +562,7 @@ mod tests {
let path = file_groups
.first()
.unwrap()
+ .files()
.first()
.unwrap()
.object_meta
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index cae04e5ee6..9d8148ca75 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -53,6 +53,7 @@ pub use csv::{CsvExec, CsvExecBuilder};
pub use csv::{CsvOpener, CsvSource};
pub use datafusion_datasource::file::FileSource;
+pub use datafusion_datasource::file_groups::FileGroup;
pub use datafusion_datasource::file_groups::FileGroupPartitioner;
pub use datafusion_datasource::file_meta::FileMeta;
pub use datafusion_datasource::file_scan_config::{
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index b5534d6b3d..9d21e56847 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -67,6 +67,7 @@ mod tests {
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use chrono::{TimeZone, Utc};
+ use datafusion_datasource::file_groups::FileGroup;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -1123,7 +1124,7 @@ mod tests {
async fn assert_parquet_read(
state: &SessionState,
- file_groups: Vec<Vec<PartitionedFile>>,
+ file_groups: Vec<FileGroup>,
expected_row_num: Option<usize>,
file_schema: SchemaRef,
) -> Result<()> {
@@ -1166,12 +1167,12 @@ mod tests {
.infer_schema(&state, &store, std::slice::from_ref(&meta))
.await?;
- let group_empty = vec![vec![file_range(&meta, 0, 2)]];
- let group_contain = vec![vec![file_range(&meta, 2, i64::MAX)]];
- let group_all = vec![vec![
+ let group_empty = vec![FileGroup::new(vec![file_range(&meta, 0, 2)])];
+ let group_contain = vec![FileGroup::new(vec![file_range(&meta, 2,
i64::MAX)])];
+ let group_all = vec![FileGroup::new(vec![
file_range(&meta, 0, 2),
file_range(&meta, 2, i64::MAX),
- ]];
+ ])];
assert_parquet_read(&state, group_empty, None,
file_schema.clone()).await?;
assert_parquet_read(&state, group_contain, Some(8),
file_schema.clone()).await?;
diff --git a/datafusion/core/src/datasource/statistics.rs
b/datafusion/core/src/datasource/statistics.rs
index acc16e0687..cf283ecee0 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -20,12 +20,12 @@ use std::sync::Arc;
use futures::{Stream, StreamExt};
-use datafusion_common::stats::Precision;
-use datafusion_common::ScalarValue;
-
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::physical_plan::{ColumnStatistics, Statistics};
+use datafusion_common::stats::Precision;
+use datafusion_common::ScalarValue;
+use datafusion_datasource::file_groups::FileGroup;
use super::listing::PartitionedFile;
@@ -39,8 +39,8 @@ pub async fn get_statistics_with_limit(
file_schema: SchemaRef,
limit: Option<usize>,
collect_stats: bool,
-) -> Result<(Vec<PartitionedFile>, Statistics)> {
- let mut result_files = vec![];
+) -> Result<(FileGroup, Statistics)> {
+ let mut result_files = FileGroup::default();
// These statistics can be calculated as long as at least one file provides
// useful information. If none of the files provides any information, then
// they will end up having `Precision::Absent` values. Throughout
calculations,
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 600a8de1de..9a089796bc 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -90,6 +90,7 @@ use datafusion_physical_plan::unnest::ListUnnest;
use crate::schema_equivalence::schema_satisfied_by;
use async_trait::async_trait;
+use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::{debug, trace};
@@ -532,7 +533,7 @@ impl DefaultPhysicalPlanner {
original_url,
object_store_url,
table_paths: vec![parsed_url],
- file_groups: vec![],
+ file_group: FileGroup::default(),
output_schema: Arc::new(schema),
table_partition_cols,
insert_op: InsertOp::Append,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index be707f7e19..2912eed6c9 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -28,7 +28,6 @@ use std::sync::Arc;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;
-use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::CsvSource;
use crate::datasource::{MemTable, TableProvider};
@@ -46,6 +45,7 @@ use datafusion_datasource::source::DataSourceExec;
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
+use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource_csv::partitioned_csv_config;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
@@ -96,7 +96,7 @@ pub fn scan_partitioned_csv(
Ok(config.build())
}
-/// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning
`partitions` of `filename`
+/// Returns file groups [`Vec<FileGroup>`] for scanning `partitions` of
`filename`
pub fn partitioned_file_groups(
path: &str,
filename: &str,
@@ -104,7 +104,7 @@ pub fn partitioned_file_groups(
file_format: Arc<dyn FileFormat>,
file_compression_type: FileCompressionType,
work_dir: &Path,
-) -> Result<Vec<Vec<PartitionedFile>>> {
+) -> Result<Vec<FileGroup>> {
let path = format!("{path}/{filename}");
let mut writers = vec![];
@@ -181,7 +181,7 @@ pub fn partitioned_file_groups(
Ok(files
.into_iter()
- .map(|f| vec![local_unpartitioned_file(f).into()])
+ .map(|f| FileGroup::new(vec![local_unpartitioned_file(f).into()]))
.collect::<Vec<_>>())
}
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index b71724b8f7..5e3671ea83 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -35,6 +35,7 @@ use datafusion::datasource::source::DataSourceExec;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
+use datafusion_datasource::file_groups::FileGroup;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::PhysicalExpr;
@@ -189,8 +190,8 @@ fn parquet_exec_multiple_sorted(
Arc::new(ParquetSource::default()),
)
.with_file_groups(vec![
- vec![PartitionedFile::new("x".to_string(), 100)],
- vec![PartitionedFile::new("y".to_string(), 100)],
+ FileGroup::new(vec![PartitionedFile::new("x".to_string(), 100)]),
+ FileGroup::new(vec![PartitionedFile::new("y".to_string(), 100)]),
])
.with_output_ordering(output_ordering)
.build()
@@ -223,8 +224,8 @@ fn csv_exec_multiple_sorted(output_ordering:
Vec<LexOrdering>) -> Arc<DataSource
Arc::new(CsvSource::new(false, b',', b'"')),
)
.with_file_groups(vec![
- vec![PartitionedFile::new("x".to_string(), 100)],
- vec![PartitionedFile::new("y".to_string(), 100)],
+ FileGroup::new(vec![PartitionedFile::new("x".to_string(), 100)]),
+ FileGroup::new(vec![PartitionedFile::new("y".to_string(), 100)]),
])
.with_output_ordering(output_ordering)
.build()
diff --git a/datafusion/datasource-csv/src/file_format.rs
b/datafusion/datasource-csv/src/file_format.rs
index 522cb12db0..6cd6a83884 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -662,7 +662,7 @@ impl DisplayAs for CsvSink {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CsvSink(file_groups=",)?;
- FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
write!(f, ")")
}
DisplayFormatType::TreeRender => {
diff --git a/datafusion/datasource-csv/src/mod.rs
b/datafusion/datasource-csv/src/mod.rs
index 90c3689cd1..97f4214cf9 100644
--- a/datafusion/datasource-csv/src/mod.rs
+++ b/datafusion/datasource-csv/src/mod.rs
@@ -25,16 +25,15 @@ pub mod source;
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
-use datafusion_datasource::{
- file::FileSource, file_scan_config::FileScanConfig, PartitionedFile,
-};
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::{file::FileSource,
file_scan_config::FileScanConfig};
use datafusion_execution::object_store::ObjectStoreUrl;
pub use file_format::*;
/// Returns a [`FileScanConfig`] for given `file_groups`
pub fn partitioned_csv_config(
schema: SchemaRef,
- file_groups: Vec<Vec<PartitionedFile>>,
+ file_groups: Vec<FileGroup>,
file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema,
file_source)
diff --git a/datafusion/datasource-csv/src/source.rs
b/datafusion/datasource-csv/src/source.rs
index b9d974c884..175ee18197 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -28,7 +28,7 @@ use
datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
- calculate_range, FileRange, ListingTableUrl, PartitionedFile,
RangeCalculation,
+ calculate_range, FileRange, ListingTableUrl, RangeCalculation,
};
use arrow::csv;
@@ -49,13 +49,13 @@ use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
PlanProperties,
};
+use crate::file_format::CsvDecoder;
+use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
-use crate::file_format::CsvDecoder;
-
/// Old Csv source, deprecated with DataSourceExec implementation and CsvSource
///
/// See examples on `CsvSource`
@@ -314,7 +314,7 @@ impl CsvExec {
)
}
- fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) ->
Self {
+ fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
diff --git a/datafusion/datasource-json/src/file_format.rs
b/datafusion/datasource-json/src/file_format.rs
index 9b6d5925fe..054c05b5b0 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -321,7 +321,7 @@ impl DisplayAs for JsonSink {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "JsonSink(file_groups=",)?;
- FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
write!(f, ")")
}
DisplayFormatType::TreeRender => {
diff --git a/datafusion/datasource-json/src/source.rs
b/datafusion/datasource-json/src/source.rs
index 4605ad3d94..f1adccf9de 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -30,9 +30,7 @@ use datafusion_datasource::decoder::{deserialize_stream,
DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::{
- calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
-};
+use datafusion_datasource::{calculate_range, ListingTableUrl,
RangeCalculation};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use arrow::json::ReaderBuilder;
@@ -48,6 +46,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness,
EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
+use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -146,7 +145,7 @@ impl NdJsonExec {
)
}
- fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) ->
Self {
+ fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
diff --git a/datafusion/datasource-parquet/src/file_format.rs
b/datafusion/datasource-parquet/src/file_format.rs
index 8a78407c64..b7a8712428 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -983,7 +983,7 @@ impl DisplayAs for ParquetSink {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ParquetSink(file_groups=",)?;
- FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
write!(f, ")")
}
DisplayFormatType::TreeRender => {
diff --git a/datafusion/datasource-parquet/src/mod.rs
b/datafusion/datasource-parquet/src/mod.rs
index cecee00317..516b137921 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -44,7 +44,6 @@ use datafusion_common::{Constraints, Statistics};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::source::DataSourceExec;
-use datafusion_datasource::PartitionedFile;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr,
@@ -65,6 +64,7 @@ pub use row_group_filter::RowGroupAccessPlanFilter;
use source::ParquetSource;
pub use writer::plan_to_parquet;
+use datafusion_datasource::file_groups::FileGroup;
use log::debug;
#[derive(Debug, Clone)]
@@ -134,7 +134,7 @@ impl ParquetExecBuilder {
}
/// Update the list of files groups to read
- pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>)
-> Self {
+ pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
self.file_scan_config.file_groups = file_groups;
self
}
@@ -463,7 +463,7 @@ impl ParquetExec {
/// that depend on the file groups.
fn with_file_groups_and_update_partitioning(
mut self,
- file_groups: Vec<Vec<PartitionedFile>>,
+ file_groups: Vec<FileGroup>,
) -> Self {
let mut config = self.file_scan_config();
config.file_groups = file_groups;
diff --git a/datafusion/datasource/src/display.rs
b/datafusion/datasource/src/display.rs
index 7ab8d407be..c9e9795359 100644
--- a/datafusion/datasource/src/display.rs
+++ b/datafusion/datasource/src/display.rs
@@ -17,10 +17,9 @@
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
+use crate::file_groups::FileGroup;
use std::fmt::{Debug, Formatter, Result as FmtResult};
-use crate::PartitionedFile;
-
/// A wrapper to customize partitioned file display
///
/// Prints in the format:
@@ -28,7 +27,7 @@ use crate::PartitionedFile;
/// {NUM_GROUPS groups: [[file1, file2,...], [fileN, fileM, ...], ...]}
/// ```
#[derive(Debug)]
-pub(crate) struct FileGroupsDisplay<'a>(pub(crate) &'a [Vec<PartitionedFile>]);
+pub(crate) struct FileGroupsDisplay<'a>(pub(crate) &'a [FileGroup]);
impl DisplayAs for FileGroupsDisplay<'_> {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
@@ -60,7 +59,7 @@ impl DisplayAs for FileGroupsDisplay<'_> {
/// [file1, file2,...]
/// ```
#[derive(Debug)]
-pub struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]);
+pub struct FileGroupDisplay<'a>(pub &'a FileGroup);
impl DisplayAs for FileGroupDisplay<'_> {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
@@ -69,7 +68,7 @@ impl DisplayAs for FileGroupDisplay<'_> {
DisplayFormatType::Default | DisplayFormatType::TreeRender => {
// To avoid showing too many files
let max_files = 5;
- fmt_up_to_n_elements(self.0, max_files, f, |pf, f| {
+ fmt_up_to_n_elements(self.0.files(), max_files, f, |pf, f| {
write!(f, "{}", pf.object_meta.location.as_ref())?;
if let Some(range) = pf.range.as_ref() {
write!(f, ":{}..{}", range.start, range.end)?;
@@ -138,6 +137,7 @@ mod tests {
use datafusion_physical_plan::{DefaultDisplay, VerboseDisplay};
use object_store::{path::Path, ObjectMeta};
+ use crate::PartitionedFile;
use chrono::Utc;
#[test]
@@ -148,7 +148,10 @@ mod tests {
#[test]
fn file_groups_display_one() {
- let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
+ let files = [FileGroup::new(vec![
+ partitioned_file("foo"),
+ partitioned_file("bar"),
+ ])];
let expected = "{1 group: [[foo, bar]]}";
assert_eq!(
@@ -160,9 +163,9 @@ mod tests {
#[test]
fn file_groups_display_many_default() {
let files = [
- vec![partitioned_file("foo"), partitioned_file("bar")],
- vec![partitioned_file("baz")],
- vec![],
+ FileGroup::new(vec![partitioned_file("foo"),
partitioned_file("bar")]),
+ FileGroup::new(vec![partitioned_file("baz")]),
+ FileGroup::default(),
];
let expected = "{3 groups: [[foo, bar], [baz], []]}";
@@ -175,9 +178,9 @@ mod tests {
#[test]
fn file_groups_display_many_verbose() {
let files = [
- vec![partitioned_file("foo"), partitioned_file("bar")],
- vec![partitioned_file("baz")],
- vec![],
+ FileGroup::new(vec![partitioned_file("foo"),
partitioned_file("bar")]),
+ FileGroup::new(vec![partitioned_file("baz")]),
+ FileGroup::default(),
];
let expected = "{3 groups: [[foo, bar], [baz], []]}";
@@ -190,13 +193,13 @@ mod tests {
#[test]
fn file_groups_display_too_many_default() {
let files = [
- vec![partitioned_file("foo"), partitioned_file("bar")],
- vec![partitioned_file("baz")],
- vec![partitioned_file("qux")],
- vec![partitioned_file("quux")],
- vec![partitioned_file("quuux")],
- vec![partitioned_file("quuuux")],
- vec![],
+ FileGroup::new(vec![partitioned_file("foo"),
partitioned_file("bar")]),
+ FileGroup::new(vec![partitioned_file("baz")]),
+ FileGroup::new(vec![partitioned_file("qux")]),
+ FileGroup::new(vec![partitioned_file("quux")]),
+ FileGroup::new(vec![partitioned_file("quuux")]),
+ FileGroup::new(vec![partitioned_file("quuuux")]),
+ FileGroup::default(),
];
let expected = "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux],
...]}";
@@ -209,13 +212,13 @@ mod tests {
#[test]
fn file_groups_display_too_many_verbose() {
let files = [
- vec![partitioned_file("foo"), partitioned_file("bar")],
- vec![partitioned_file("baz")],
- vec![partitioned_file("qux")],
- vec![partitioned_file("quux")],
- vec![partitioned_file("quuux")],
- vec![partitioned_file("quuuux")],
- vec![],
+ FileGroup::new(vec![partitioned_file("foo"),
partitioned_file("bar")]),
+ FileGroup::new(vec![partitioned_file("baz")]),
+ FileGroup::new(vec![partitioned_file("qux")]),
+ FileGroup::new(vec![partitioned_file("quux")]),
+ FileGroup::new(vec![partitioned_file("quuux")]),
+ FileGroup::new(vec![partitioned_file("quuuux")]),
+ FileGroup::default(),
];
let expected =
@@ -228,7 +231,8 @@ mod tests {
#[test]
fn file_group_display_many_default() {
- let files = vec![partitioned_file("foo"), partitioned_file("bar")];
+ let files =
+ FileGroup::new(vec![partitioned_file("foo"),
partitioned_file("bar")]);
let expected = "[foo, bar]";
assert_eq!(
@@ -239,14 +243,14 @@ mod tests {
#[test]
fn file_group_display_too_many_default() {
- let files = vec![
+ let files = FileGroup::new(vec![
partitioned_file("foo"),
partitioned_file("bar"),
partitioned_file("baz"),
partitioned_file("qux"),
partitioned_file("quux"),
partitioned_file("quuux"),
- ];
+ ]);
let expected = "[foo, bar, baz, qux, quux, ...]";
assert_eq!(
@@ -257,14 +261,14 @@ mod tests {
#[test]
fn file_group_display_too_many_verbose() {
- let files = vec![
+ let files = FileGroup::new(vec![
partitioned_file("foo"),
partitioned_file("bar"),
partitioned_file("baz"),
partitioned_file("qux"),
partitioned_file("quux"),
partitioned_file("quuux"),
- ];
+ ]);
let expected = "[foo, bar, baz, qux, quux, quuux]";
assert_eq!(
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index 2c2b791f23..93895790cb 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -18,10 +18,13 @@
//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
use crate::{FileRange, PartitionedFile};
+use datafusion_common::Statistics;
use itertools::Itertools;
use std::cmp::min;
use std::collections::BinaryHeap;
use std::iter::repeat_with;
+use std::mem;
+use std::ops::{Index, IndexMut};
/// Repartition input files into `target_partitions` partitions, if total file size exceed
/// `repartition_file_min_size`
@@ -179,14 +182,17 @@ impl FileGroupPartitioner {
/// If no repartitioning is needed or possible, return `None`.
pub fn repartition_file_groups(
&self,
- file_groups: &[Vec<PartitionedFile>],
- ) -> Option<Vec<Vec<PartitionedFile>>> {
+ file_groups: &[FileGroup],
+ ) -> Option<Vec<FileGroup>> {
if file_groups.is_empty() {
return None;
}
// Perform redistribution only in case all files should be read from beginning to end
- let has_ranges = file_groups.iter().flatten().any(|f|
f.range.is_some());
+ let has_ranges = file_groups
+ .iter()
+ .flat_map(FileGroup::iter)
+ .any(|f| f.range.is_some());
if has_ranges {
return None;
}
@@ -203,11 +209,11 @@ impl FileGroupPartitioner {
/// existing grouping / ordering
fn repartition_evenly_by_size(
&self,
- file_groups: &[Vec<PartitionedFile>],
- ) -> Option<Vec<Vec<PartitionedFile>>> {
+ file_groups: &[FileGroup],
+ ) -> Option<Vec<FileGroup>> {
let target_partitions = self.target_partitions;
let repartition_file_min_size = self.repartition_file_min_size;
- let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+ let flattened_files =
file_groups.iter().flat_map(FileGroup::iter).collect_vec();
let total_size = flattened_files
.iter()
@@ -257,7 +263,7 @@ impl FileGroupPartitioner {
.flatten()
.chunk_by(|(partition_idx, _)| *partition_idx)
.into_iter()
- .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
+ .map(|(_, group)| FileGroup::new(group.map(|(_, vals)|
vals).collect_vec()))
.collect_vec();
Some(repartitioned_files)
@@ -266,8 +272,8 @@ impl FileGroupPartitioner {
/// Redistribute file groups across size preserving order
fn repartition_preserving_order(
&self,
- file_groups: &[Vec<PartitionedFile>],
- ) -> Option<Vec<Vec<PartitionedFile>>> {
+ file_groups: &[FileGroup],
+ ) -> Option<Vec<FileGroup>> {
// Can't repartition and preserve order if there are more groups
// than partitions
if file_groups.len() >= self.target_partitions {
@@ -303,11 +309,12 @@ impl FileGroupPartitioner {
return None;
}
+ // Add new empty groups to which we will redistribute ranges of existing files
// Add new empty groups to which we will redistribute ranges of existing files
let mut file_groups: Vec<_> = file_groups
.iter()
.cloned()
- .chain(repeat_with(Vec::new).take(num_new_groups))
+ .chain(repeat_with(||
FileGroup::new(Vec::new())).take(num_new_groups))
.collect();
// Divide up empty groups
@@ -354,6 +361,130 @@ impl FileGroupPartitioner {
}
}
+/// Represents a group of partitioned files that'll be processed by a single thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+ /// The files in this group
+ files: Vec<PartitionedFile>,
+ /// Optional statistics for the data across all files in the group
+ statistics: Option<Statistics>,
+}
+
+impl FileGroup {
+ /// Creates a new FileGroup from a vector of PartitionedFile objects
+ pub fn new(files: Vec<PartitionedFile>) -> Self {
+ Self {
+ files,
+ statistics: None,
+ }
+ }
+
+ /// Returns the number of files in this group
+ pub fn len(&self) -> usize {
+ self.files.len()
+ }
+
+ /// Set the statistics for this group
+ pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+ self.statistics = Some(statistics);
+ self
+ }
+
+ /// Returns a slice of the files in this group
+ pub fn files(&self) -> &[PartitionedFile] {
+ &self.files
+ }
+
+ pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
+ self.files.iter()
+ }
+
+ pub fn into_inner(self) -> Vec<PartitionedFile> {
+ self.files
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.files.is_empty()
+ }
+
+ /// Removes the last element from the files vector and returns it, or None if empty
+ pub fn pop(&mut self) -> Option<PartitionedFile> {
+ self.files.pop()
+ }
+
+ /// Adds a file to the group
+ pub fn push(&mut self, file: PartitionedFile) {
+ self.files.push(file);
+ }
+
+ /// Partition the list of files into `n` groups
+ pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
+ if self.is_empty() {
+ return vec![];
+ }
+
+ // ObjectStore::list does not guarantee any consistent order and for some
+ // implementations such as LocalFileSystem, it may be inconsistent. Thus
+ // Sort files by path to ensure consistent plans when run more than once.
+ self.files.sort_by(|a, b| a.path().cmp(b.path()));
+
+ // effectively this is div with rounding up instead of truncating
+ let chunk_size = self.len().div_ceil(n);
+ let mut chunks = Vec::with_capacity(n);
+ let mut current_chunk = Vec::with_capacity(chunk_size);
+ for file in self.files.drain(..) {
+ current_chunk.push(file);
+ if current_chunk.len() == chunk_size {
+ let full_chunk = FileGroup::new(mem::replace(
+ &mut current_chunk,
+ Vec::with_capacity(chunk_size),
+ ));
+ chunks.push(full_chunk);
+ }
+ }
+
+ if !current_chunk.is_empty() {
+ chunks.push(FileGroup::new(current_chunk))
+ }
+
+ chunks
+ }
+}
+
+impl Index<usize> for FileGroup {
+ type Output = PartitionedFile;
+
+ fn index(&self, index: usize) -> &Self::Output {
+ &self.files[index]
+ }
+}
+
+impl IndexMut<usize> for FileGroup {
+ fn index_mut(&mut self, index: usize) -> &mut Self::Output {
+ &mut self.files[index]
+ }
+}
+
+impl FromIterator<PartitionedFile> for FileGroup {
+ fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
+ let files = iter.into_iter().collect();
+ FileGroup::new(files)
+ }
+}
+
+impl From<Vec<PartitionedFile>> for FileGroup {
+ fn from(files: Vec<PartitionedFile>) -> Self {
+ FileGroup::new(files)
+ }
+}
+
+impl Default for FileGroup {
+ fn default() -> Self {
+ Self::new(Vec::new())
+ }
+}
+
/// Tracks how a individual file will be repartitioned
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToRepartition {
@@ -393,7 +524,7 @@ mod test {
#[test]
fn repartition_empty_file_only() {
let partitioned_file_empty = pfile("empty", 0);
- let file_group = vec![vec![partitioned_file_empty]];
+ let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
let partitioned_files = FileGroupPartitioner::new()
.with_target_partitions(4)
@@ -411,29 +542,33 @@ mod test {
let pfile_empty = pfile("empty", 0);
let empty_first = vec![
- vec![pfile_empty.clone()],
- vec![pfile_a.clone()],
- vec![pfile_b.clone()],
+ FileGroup::new(vec![pfile_empty.clone()]),
+ FileGroup::new(vec![pfile_a.clone()]),
+ FileGroup::new(vec![pfile_b.clone()]),
];
let empty_middle = vec![
- vec![pfile_a.clone()],
- vec![pfile_empty.clone()],
- vec![pfile_b.clone()],
+ FileGroup::new(vec![pfile_a.clone()]),
+ FileGroup::new(vec![pfile_empty.clone()]),
+ FileGroup::new(vec![pfile_b.clone()]),
+ ];
+ let empty_last = vec![
+ FileGroup::new(vec![pfile_a]),
+ FileGroup::new(vec![pfile_b]),
+ FileGroup::new(vec![pfile_empty]),
];
- let empty_last = vec![vec![pfile_a], vec![pfile_b], vec![pfile_empty]];
// Repartition file groups into x partitions
let expected_2 = vec![
- vec![pfile("a", 10).with_range(0, 10)],
- vec![pfile("b", 10).with_range(0, 10)],
+ FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
+ FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
];
let expected_3 = vec![
- vec![pfile("a", 10).with_range(0, 7)],
- vec![
+ FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
+ FileGroup::new(vec![
pfile("a", 10).with_range(7, 10),
pfile("b", 10).with_range(0, 4),
- ],
- vec![pfile("b", 10).with_range(4, 10)],
+ ]),
+ FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
];
let file_groups_tests = [empty_first, empty_middle, empty_last];
@@ -454,7 +589,7 @@ mod test {
#[test]
fn repartition_single_file() {
// Single file, single partition into multiple partitions
- let single_partition = vec![vec![pfile("a", 123)]];
+ let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(4)
@@ -462,10 +597,10 @@ mod test {
.repartition_file_groups(&single_partition);
let expected = Some(vec![
- vec![pfile("a", 123).with_range(0, 31)],
- vec![pfile("a", 123).with_range(31, 62)],
- vec![pfile("a", 123).with_range(62, 93)],
- vec![pfile("a", 123).with_range(93, 123)],
+ FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
+ FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
+ FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
+ FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -474,7 +609,7 @@ mod test {
fn repartition_too_much_partitions() {
// Single file, single partition into 96 partitions
let partitioned_file = pfile("a", 8);
- let single_partition = vec![vec![partitioned_file]];
+ let single_partition = vec![FileGroup::new(vec![partitioned_file])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(96)
@@ -482,14 +617,14 @@ mod test {
.repartition_file_groups(&single_partition);
let expected = Some(vec![
- vec![pfile("a", 8).with_range(0, 1)],
- vec![pfile("a", 8).with_range(1, 2)],
- vec![pfile("a", 8).with_range(2, 3)],
- vec![pfile("a", 8).with_range(3, 4)],
- vec![pfile("a", 8).with_range(4, 5)],
- vec![pfile("a", 8).with_range(5, 6)],
- vec![pfile("a", 8).with_range(6, 7)],
- vec![pfile("a", 8).with_range(7, 8)],
+ FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
+ FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
]);
assert_partitioned_files(expected, actual);
@@ -498,7 +633,10 @@ mod test {
#[test]
fn repartition_multiple_partitions() {
// Multiple files in single partition after redistribution
- let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b",
60)]];
+ let source_partitions = vec![
+ FileGroup::new(vec![pfile("a", 40)]),
+ FileGroup::new(vec![pfile("b", 60)]),
+ ];
let actual = FileGroupPartitioner::new()
.with_target_partitions(3)
@@ -506,12 +644,12 @@ mod test {
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
- vec![pfile("a", 40).with_range(0, 34)],
- vec![
+ FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
+ FileGroup::new(vec![
pfile("a", 40).with_range(34, 40),
pfile("b", 60).with_range(0, 28),
- ],
- vec![pfile("b", 60).with_range(28, 60)],
+ ]),
+ FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -519,7 +657,10 @@ mod test {
#[test]
fn repartition_same_num_partitions() {
// "Rebalance" files across partitions
- let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b",
60)]];
+ let source_partitions = vec![
+ FileGroup::new(vec![pfile("a", 40)]),
+ FileGroup::new(vec![pfile("b", 60)]),
+ ];
let actual = FileGroupPartitioner::new()
.with_target_partitions(2)
@@ -527,11 +668,11 @@ mod test {
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
- vec![
+ FileGroup::new(vec![
pfile("a", 40).with_range(0, 40),
pfile("b", 60).with_range(0, 10),
- ],
- vec![pfile("b", 60).with_range(10, 60)],
+ ]),
+ FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -540,8 +681,8 @@ mod test {
fn repartition_no_action_ranges() {
// No action due to Some(range) in second file
let source_partitions = vec![
- vec![pfile("a", 123)],
- vec![pfile("b", 144).with_range(1, 50)],
+ FileGroup::new(vec![pfile("a", 123)]),
+ FileGroup::new(vec![pfile("b", 144).with_range(1, 50)]),
];
let actual = FileGroupPartitioner::new()
@@ -555,7 +696,7 @@ mod test {
#[test]
fn repartition_no_action_min_size() {
// No action due to target_partition_size
- let single_partition = vec![vec![pfile("a", 123)]];
+ let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(65)
@@ -580,7 +721,10 @@ mod test {
#[test]
fn repartition_ordered_no_action_too_few_partitions() {
// No action as there are no new groups to redistribute to
- let input_partitions = vec![vec![pfile("a", 100)], vec![pfile("b",
200)]];
+ let input_partitions = vec![
+ FileGroup::new(vec![pfile("a", 100)]),
+ FileGroup::new(vec![pfile("b", 200)]),
+ ];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
@@ -594,7 +738,7 @@ mod test {
#[test]
fn repartition_ordered_no_action_file_too_small() {
// No action as there are no new groups to redistribute to
- let single_partition = vec![vec![pfile("a", 100)]];
+ let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
@@ -609,7 +753,7 @@ mod test {
#[test]
fn repartition_ordered_one_large_file() {
// "Rebalance" the single large file across partitions
- let source_partitions = vec![vec![pfile("a", 100)]];
+ let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
@@ -618,9 +762,9 @@ mod test {
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
- vec![pfile("a", 100).with_range(0, 34)],
- vec![pfile("a", 100).with_range(34, 68)],
- vec![pfile("a", 100).with_range(68, 100)],
+ FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
+ FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
+ FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -629,7 +773,10 @@ mod test {
fn repartition_ordered_one_large_one_small_file() {
// "Rebalance" the single large file across empty partitions, but
can't split
// small file
- let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b",
30)]];
+ let source_partitions = vec![
+ FileGroup::new(vec![pfile("a", 100)]),
+ FileGroup::new(vec![pfile("b", 30)]),
+ ];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
@@ -639,13 +786,13 @@ mod test {
let expected = Some(vec![
// scan first third of "a"
- vec![pfile("a", 100).with_range(0, 33)],
+ FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
// only b in this group (can't do this)
- vec![pfile("b", 30).with_range(0, 30)],
+ FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
// second third of "a"
- vec![pfile("a", 100).with_range(33, 66)],
+ FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
// final third of "a"
- vec![pfile("a", 100).with_range(66, 100)],
+ FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -653,7 +800,10 @@ mod test {
#[test]
fn repartition_ordered_two_large_files() {
// "Rebalance" two large files across empty partitions, but can't mix
them
- let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b",
100)]];
+ let source_partitions = vec![
+ FileGroup::new(vec![pfile("a", 100)]),
+ FileGroup::new(vec![pfile("b", 100)]),
+ ];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
@@ -663,13 +813,13 @@ mod test {
let expected = Some(vec![
// scan first half of "a"
- vec![pfile("a", 100).with_range(0, 50)],
+ FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
// scan first half of "b"
- vec![pfile("b", 100).with_range(0, 50)],
+ FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
// second half of "a"
- vec![pfile("a", 100).with_range(50, 100)],
+ FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
// second half of "b"
- vec![pfile("b", 100).with_range(50, 100)],
+ FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -678,9 +828,9 @@ mod test {
fn repartition_ordered_two_large_one_small_files() {
// "Rebalance" two large files and one small file across empty
partitions
let source_partitions = vec![
- vec![pfile("a", 100)],
- vec![pfile("b", 100)],
- vec![pfile("c", 30)],
+ FileGroup::new(vec![pfile("a", 100)]),
+ FileGroup::new(vec![pfile("b", 100)]),
+ FileGroup::new(vec![pfile("c", 30)]),
];
let partitioner = FileGroupPartitioner::new()
@@ -694,13 +844,13 @@ mod test {
let expected = Some(vec![
// scan first half of "a"
- vec![pfile("a", 100).with_range(0, 50)],
+ FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
// All of "b"
- vec![pfile("b", 100).with_range(0, 100)],
+ FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
// All of "c"
- vec![pfile("c", 30).with_range(0, 30)],
+ FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
// second half of "a"
- vec![pfile("a", 100).with_range(50, 100)],
+ FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
]);
assert_partitioned_files(expected, actual);
@@ -711,15 +861,15 @@ mod test {
let expected = Some(vec![
// scan first half of "a"
- vec![pfile("a", 100).with_range(0, 50)],
+ FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
// scan first half of "b"
- vec![pfile("b", 100).with_range(0, 50)],
+ FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
// All of "c"
- vec![pfile("c", 30).with_range(0, 30)],
+ FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
// second half of "a"
- vec![pfile("a", 100).with_range(50, 100)],
+ FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
// second half of "b"
- vec![pfile("b", 100).with_range(50, 100)],
+ FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -727,8 +877,12 @@ mod test {
#[test]
fn repartition_ordered_one_large_one_small_existing_empty() {
// "Rebalance" files using existing empty partition
- let source_partitions =
- vec![vec![pfile("a", 100)], vec![], vec![pfile("b", 40)], vec![]];
+ let source_partitions = vec![
+ FileGroup::new(vec![pfile("a", 100)]),
+ FileGroup::default(),
+ FileGroup::new(vec![pfile("b", 40)]),
+ FileGroup::default(),
+ ];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
@@ -740,14 +894,14 @@ mod test {
// target partitions), assign two to "a" and one to "b"
let expected = Some(vec![
// Scan of "a" across three groups
- vec![pfile("a", 100).with_range(0, 33)],
- vec![pfile("a", 100).with_range(33, 66)],
+ FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
+ FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
// scan first half of "b"
- vec![pfile("b", 40).with_range(0, 20)],
+ FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
// final third of "a"
- vec![pfile("a", 100).with_range(66, 100)],
+ FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
// second half of "b"
- vec![pfile("b", 40).with_range(20, 40)],
+ FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -756,8 +910,8 @@ mod test {
// groups with multiple files in a group can not be changed, but can divide others
let source_partitions = vec![
// two files in an existing partition
- vec![pfile("a", 100), pfile("b", 100)],
- vec![pfile("c", 40)],
+ FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
+ FileGroup::new(vec![pfile("c", 40)]),
];
let actual = FileGroupPartitioner::new()
@@ -772,11 +926,11 @@ mod test {
// don't try and rearrange files in the existing partition
// assuming that the caller had a good reason to put them that way.
// (it is technically possible to split off ranges from the files if desired)
- vec![pfile("a", 100), pfile("b", 100)],
+ FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
// first half of "c"
- vec![pfile("c", 40).with_range(0, 20)],
+ FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
// second half of "c"
- vec![pfile("c", 40).with_range(20, 40)],
+ FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
]);
assert_partitioned_files(expected, actual);
}
@@ -784,8 +938,8 @@ mod test {
/// Asserts that the two groups of [`PartitionedFile`] are the same
/// (PartitionedFile doesn't implement PartialEq)
fn assert_partitioned_files(
- expected: Option<Vec<Vec<PartitionedFile>>>,
- actual: Option<Vec<Vec<PartitionedFile>>>,
+ expected: Option<Vec<FileGroup>>,
+ actual: Option<Vec<FileGroup>>,
) {
match (expected, actual) {
(None, None) => {}
@@ -808,8 +962,8 @@ mod test {
/// asserting they return the same value and returns that value
fn repartition_test(
partitioner: FileGroupPartitioner,
- file_groups: Vec<Vec<PartitionedFile>>,
- ) -> Option<Vec<Vec<PartitionedFile>>> {
+ file_groups: Vec<FileGroup>,
+ ) -> Option<Vec<FileGroup>> {
let repartitioned = partitioner.repartition_file_groups(&file_groups);
let repartitioned_preserving_sort = partitioner
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 82308bda70..770e26bbf4 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -48,6 +48,7 @@ use datafusion_physical_plan::{
};
use log::{debug, warn};
+use crate::file_groups::FileGroup;
use crate::{
display::FileGroupsDisplay,
file::FileSource,
@@ -71,6 +72,7 @@ use crate::{
/// # use object_store::ObjectStore;
/// # use datafusion_common::Statistics;
/// # use datafusion_datasource::file::FileSource;
+/// # use datafusion_datasource::file_groups::FileGroup;
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource::file_scan_config::FileScanConfig;
/// # use datafusion_datasource::file_stream::FileOpener;
@@ -111,10 +113,10 @@ use crate::{
/// .with_file(PartitionedFile::new("file1.parquet", 1234))
/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
/// // in a single row group
-/// .with_file_group(vec![
+/// .with_file_group(FileGroup::new(vec![
/// PartitionedFile::new("file2.parquet", 56),
/// PartitionedFile::new("file3.parquet", 78),
-/// ]);
+/// ]));
/// // create an execution plan from the config
/// let plan: Arc<dyn ExecutionPlan> = config.build();
/// ```
@@ -145,7 +147,7 @@ pub struct FileScanConfig {
/// DataFusion may attempt to read each partition of files
/// concurrently, however files *within* a partition will be read
/// sequentially, one after the next.
- pub file_groups: Vec<Vec<PartitionedFile>>,
+ pub file_groups: Vec<FileGroup>,
/// Table constraints
pub constraints: Constraints,
/// Estimated overall statistics of the files, taking `filters` into account.
@@ -226,7 +228,7 @@ impl DataSource for FileScanConfig {
DisplayFormatType::TreeRender => {
writeln!(f, "format={}", self.file_source.file_type())?;
self.file_source.fmt_extra(t, f)?;
- let num_files =
self.file_groups.iter().map(Vec::len).sum::<usize>();
+ let num_files = self.file_groups.iter().map(|fg|
fg.len()).sum::<usize>();
writeln!(f, "files={num_files}")?;
Ok(())
}
@@ -450,16 +452,13 @@ impl FileScanConfig {
///
/// See [Self::file_groups] for more information.
pub fn with_file(self, file: PartitionedFile) -> Self {
- self.with_file_group(vec![file])
+ self.with_file_group(FileGroup::new(vec![file]))
}
/// Add the file groups
///
/// See [Self::file_groups] for more information.
- pub fn with_file_groups(
- mut self,
- mut file_groups: Vec<Vec<PartitionedFile>>,
- ) -> Self {
+ pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self
{
self.file_groups.append(&mut file_groups);
self
}
@@ -467,7 +466,7 @@ impl FileScanConfig {
/// Add a new file group
///
/// See [Self::file_groups] for more information
- pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self
{
+ pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
self.file_groups.push(file_group);
self
}
@@ -581,10 +580,13 @@ impl FileScanConfig {
/// It will produce the smallest number of file groups possible.
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
- file_groups: &[Vec<PartitionedFile>],
+ file_groups: &[FileGroup],
sort_order: &LexOrdering,
- ) -> Result<Vec<Vec<PartitionedFile>>> {
- let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+ ) -> Result<Vec<FileGroup>> {
+ let flattened_files = file_groups
+ .iter()
+ .flat_map(FileGroup::iter)
+ .collect::<Vec<_>>();
// First Fit:
// * Choose the first file group that a file can be placed into.
// * If it fits into no existing file groups, create a new one.
@@ -1044,7 +1046,7 @@ fn get_projected_output_ordering(
&new_ordering,
projected_schema,
base_config.projection.as_deref(),
- group,
+ group.iter(),
) {
Ok(statistics) => statistics,
Err(e) => {
@@ -1653,8 +1655,9 @@ mod tests {
.collect::<Result<Vec<_>>>()?,
);
- let partitioned_files =
- case.files.into_iter().map(From::from).collect::<Vec<_>>();
+ let partitioned_files = FileGroup::new(
+ case.files.into_iter().map(From::from).collect::<Vec<_>>(),
+ );
let result = FileScanConfig::split_groups_by_statistics(
&table_schema,
&[partitioned_files.clone()],
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 279c9d2100..dbff28692c 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use crate::file_groups::FileGroup;
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
-use crate::{ListingTableUrl, PartitionedFile};
+use crate::ListingTableUrl;
use arrow::datatypes::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion_common::Result;
@@ -90,8 +91,9 @@ pub struct FileSinkConfig {
pub original_url: String,
/// Object store URL, used to get an ObjectStore instance
pub object_store_url: ObjectStoreUrl,
- /// A vector of [`PartitionedFile`] structs, each representing a file partition
- pub file_groups: Vec<PartitionedFile>,
+ /// A collection of files organized into groups.
+ /// Each FileGroup contains one or more PartitionedFile objects.
+ pub file_group: FileGroup,
/// Vector of partition paths
pub table_paths: Vec<ListingTableUrl>,
/// The schema of the output file
diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs
index 7d17d230fc..904f152287 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -88,10 +88,10 @@ impl FileStream {
.collect::<Vec<_>>(),
);
- let files = config.file_groups[partition].clone();
+ let file_group = config.file_groups[partition].clone();
Ok(Self {
- file_iter: files.into(),
+ file_iter: file_group.into_inner().into_iter().collect(),
projected_schema,
remain: config.limit,
file_opener,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index a417eccee1..833c1ad512 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -32,7 +32,9 @@ use datafusion::datasource::file_format::json::JsonSink;
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{FileRange, ListingTableUrl,
PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig,
FileSource};
+use datafusion::datasource::physical_plan::{
+ FileGroup, FileScanConfig, FileSinkConfig, FileSource,
+};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::WindowFunctionDefinition;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr,
ScalarFunctionExpr};
@@ -496,7 +498,7 @@ pub fn parse_protobuf_file_scan_config(
let constraints = convert_required!(proto.constraints)?;
let statistics = convert_required!(proto.statistics)?;
- let file_groups: Vec<Vec<PartitionedFile>> = proto
+ let file_groups = proto
.file_groups
.iter()
.map(|f| f.try_into())
@@ -585,14 +587,16 @@ impl TryFrom<&protobuf::FileRange> for FileRange {
}
}
-impl TryFrom<&protobuf::FileGroup> for Vec<PartitionedFile> {
+impl TryFrom<&protobuf::FileGroup> for FileGroup {
type Error = DataFusionError;
fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
- val.files
+ let files = val
+ .files
.iter()
.map(|f| f.try_into())
- .collect::<Result<Vec<_>, _>>()
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(FileGroup::new(files))
}
}
@@ -634,11 +638,12 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
type Error = DataFusionError;
fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
- let file_groups = conf
- .file_groups
- .iter()
- .map(TryInto::try_into)
- .collect::<Result<Vec<_>>>()?;
+ let file_group = FileGroup::new(
+ conf.file_groups
+ .iter()
+ .map(|f| f.try_into())
+ .collect::<Result<Vec<_>>>()?,
+ );
let table_paths = conf
.table_paths
.iter()
@@ -660,7 +665,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
Ok(Self {
original_url: String::default(),
object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
- file_groups,
+ file_group,
table_paths,
output_schema: Arc::new(convert_required!(conf.output_schema)?),
table_partition_cols,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c2cf506eb9..2eed60590c 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -485,7 +485,7 @@ pub fn serialize_file_scan_config(
let file_groups = conf
.file_groups
.iter()
- .map(|p| p.as_slice().try_into())
+ .map(|p| p.files().try_into())
.collect::<Result<Vec<_>, _>>()?;
let mut output_orderings = vec![];
@@ -585,7 +585,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
let file_groups = conf
- .file_groups
+ .file_group
.iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index aeae39c4d0..57d10e38ee 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -46,7 +46,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
- wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
+ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, FileScanConfig,
FileSinkConfig, FileSource, ParquetSource,
};
use datafusion::execution::FunctionRegistry;
@@ -743,10 +743,10 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
- .with_file_groups(vec![vec![PartitionedFile::new(
+ .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
"/path/to/file.parquet".to_string(),
1024,
- )]])
+ )])])
.with_statistics(Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1024),
@@ -770,7 +770,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
.with_projection(Some(vec![0, 1]))
- .with_file_group(vec![file_group])
+ .with_file_group(FileGroup::new(vec![file_group]))
.with_table_partition_cols(vec![Field::new(
"part".to_string(),
wrap_partition_type_in_dict(DataType::Int16),
@@ -797,10 +797,10 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
- .with_file_groups(vec![vec![PartitionedFile::new(
+ .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
"/path/to/file.parquet".to_string(),
1024,
- )]])
+ )])])
.with_statistics(Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(1024),
@@ -1296,7 +1296,7 @@ fn roundtrip_json_sink() -> Result<()> {
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
@@ -1333,7 +1333,7 @@ fn roundtrip_csv_sink() -> Result<()> {
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
@@ -1389,7 +1389,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+ file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
@@ -1609,10 +1609,10 @@ async fn roundtrip_projection_source() -> Result<()> {
schema.clone(),
file_source,
)
- .with_file_groups(vec![vec![PartitionedFile::new(
+ .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
"/path/to/file.parquet".to_string(),
1024,
- )]])
+ )])])
.with_statistics(statistics)
.with_projection(Some(vec![0, 1, 2]));
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 7bbdfc2a5d..cf1e5c2db9 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -22,7 +22,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{not_impl_err, substrait_err};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource};
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
@@ -134,7 +134,7 @@ pub async fn from_substrait_rel(
let part_index = file.partition_index as usize;
while part_index >= file_groups.len() {
- file_groups.push(vec![]);
+ file_groups.push(FileGroup::default());
}
file_groups[part_index].push(partitioned_file)
}
diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs
index f84db541b7..65eaf79852 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -63,7 +63,7 @@ pub fn to_substrait_rel(
let mut substrait_files = vec![];
for (partition_index, files) in file_config.file_groups.iter().enumerate()
{
- for file in files {
+ for file in files.iter() {
substrait_files.push(FileOrFiles {
partition_index: partition_index.try_into().unwrap(),
start: 0,
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index f1284db2ad..d8b3867732 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -22,7 +22,7 @@ use datafusion::arrow::datatypes::Schema;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource};
use datafusion::error::Result;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
@@ -40,14 +40,14 @@ async fn parquet_exec() -> Result<()> {
source,
)
.with_file_groups(vec![
- vec![PartitionedFile::new(
+ FileGroup::new(vec![PartitionedFile::new(
"file://foo/part-0.parquet".to_string(),
123,
- )],
- vec![PartitionedFile::new(
+ )]),
+ FileGroup::new(vec![PartitionedFile::new(
"file://foo/part-1.parquet".to_string(),
123,
- )],
+ )]),
]);
let parquet_exec: Arc<dyn ExecutionPlan> = scan_config.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]