alamb commented on code in PR #9593:
URL: https://github.com/apache/arrow-datafusion/pull/9593#discussion_r1535930133
##########
datafusion/core/src/datasource/listing/mod.rs:
##########
@@ -67,6 +67,8 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
+ /// Optional statistics that describe the data in this file
Review Comment:
I think we should also provide some more guidance here, for example:
```suggestion
/// Optional statistics that describe the data in this file if known.
///
/// DataFusion relies on these statistics for planning so if they are incorrect
/// incorrect answers may result.
```
##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -169,6 +171,38 @@ SELECT min(date_col) FROM test_table;
----
1970-01-02
+# Clean up
+statement ok
+DROP TABLE test_table;
+
+# Do one more test, but order by numeric columns:
Review Comment:
I would like to request moving the tests for this optimization to their own
.slt file (e.g. `sorted_parquet.slt` or something). It is important enough and
tricky enough, I think, to warrant specialized tests
##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +203,71 @@ impl FileScanConfig {
.with_repartition_file_min_size(repartition_file_min_size)
.repartition_file_groups(&file_groups)
}
+
+ /// Attempts to do a bin-packing on files into file groups, such that any two files
+ /// in a file group are ordered and non-overlapping with respect to their statistics.
+ /// It will produce the smallest number of file groups possible.
+ pub fn sort_file_groups(
+ table_schema: &SchemaRef,
+ projected_schema: &SchemaRef,
+ file_groups: &[Vec<PartitionedFile>],
+ sort_order: &[PhysicalSortExpr],
+ ) -> Result<Vec<Vec<PartitionedFile>>> {
+ let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+ // First Fit:
+ // * Choose the first file group that a file can be placed into.
+ // * If it fits into no existing file groups, create a new one.
+ //
+ // By sorting files by min values and then applying first-fit bin packing,
+ // we can produce the smallest number of file groups such that
+ // files within a group are in order and non-overlapping.
+ //
+ // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
+ // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
+
+ let statistics = MinMaxStatistics::new_from_files(
+ sort_order,
+ table_schema,
+ projected_schema,
+ flattened_files.iter().copied(),
+ )
+ .map_err(|e| e.context("construct min/max statistics"))?;
Review Comment:
Having a bit more context would be helpful, I think:
```suggestion
.map_err(|e| e.context("construct min/max statistics for sort_file_groups"))?;
```
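For readers following along, the first-fit scheme described in the hunk above
can be sketched in miniature. This is an illustration only, not the PR's
implementation: `FileStats` and `first_fit` are hypothetical names, and
integer min/max values stand in for the `MinMaxStatistics` row comparisons
the real code performs.

```rust
/// Hypothetical stand-in for a file's min/max statistics on the sort key
/// (the real code compares arrow row-format encodings of several columns).
#[derive(Clone, Copy, Debug)]
struct FileStats {
    min: i64,
    max: i64,
}

/// First-fit bin packing over files already sorted by `min`: place each file
/// in the first group whose last file ends at or before this file's start;
/// otherwise open a new group. With a min-sorted input, first fit yields the
/// smallest possible number of groups.
fn first_fit(sorted_files: &[FileStats]) -> Vec<Vec<FileStats>> {
    let mut groups: Vec<Vec<FileStats>> = vec![];
    for &file in sorted_files {
        // Find the first group this file fits after (non-overlapping).
        let pos = groups
            .iter()
            .position(|g| g.last().map_or(true, |last| last.max <= file.min));
        match pos {
            Some(i) => groups[i].push(file),
            None => groups.push(vec![file]),
        }
    }
    groups
}

fn main() {
    // Mirrors the shape of the PR's test case: two disjoint files plus one
    // file that overlaps both. The input is already sorted by `min`.
    let files = [
        FileStats { min: 0, max: 49 },
        FileStats { min: 0, max: 100 },
        FileStats { min: 50, max: 100 },
    ];
    let groups = first_fit(&files);
    // Two groups: the disjoint pair chains together; the overlapping file
    // must sit in its own group.
    assert_eq!(groups.len(), 2);
    println!("{groups:?}");
}
```

The invariant is that appending a file to a group keeps the group's
concatenated key ranges non-decreasing, so each group can later be scanned in
order without a merge.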
##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -762,6 +836,171 @@ mod tests {
assert_eq!(projection.fields(), schema.fields());
}
+ #[test]
+ fn test_sort_file_groups() -> Result<()> {
+ use chrono::TimeZone;
+ use datafusion_common::DFSchema;
+ use datafusion_expr::execution_props::ExecutionProps;
+ use object_store::{path::Path, ObjectMeta};
+
+ struct File {
+ name: &'static str,
+ date: &'static str,
+ statistics: Vec<Option<(f64, f64)>>,
+ }
+ impl File {
+ fn new(
+ name: &'static str,
+ date: &'static str,
+ statistics: Vec<Option<(f64, f64)>>,
+ ) -> Self {
+ Self {
+ name,
+ date,
+ statistics,
+ }
+ }
+ }
+
+ struct TestCase {
+ #[allow(unused)]
+ file_schema: Schema,
+ files: Vec<File>,
+ sort: Vec<datafusion_expr::Expr>,
+ expected_result: Result<Vec<Vec<usize>>, &'static str>,
+ }
+
+ use datafusion_expr::col;
+ let cases = vec![
+ TestCase {
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+ File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Ok(vec![vec![0, 1], vec![2]]),
+ },
Review Comment:
Other important cases are additional negative cases:
1. The files have values but no statistics (the groups shouldn't get reordered)
2. One of the files has no statistics
##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -138,12 +138,21 @@ impl FileScanConfig {
column_statistics: table_cols_stats,
};
- let table_schema = Arc::new(
+ let projected_schema = Arc::new(
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
);
+
+ let full_table_schema = {
Review Comment:
I wonder if we can avoid creating the entire schema here and instead reuse
it from elsewhere -- the information should already be present to plan queries
I think. As we try to make planning more efficient avoiding re-creating the
same schema over and over again I think will be importatn
##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -169,6 +171,38 @@ SELECT min(date_col) FROM test_table;
----
1970-01-02
+# Clean up
+statement ok
+DROP TABLE test_table;
+
+# Do one more test, but order by numeric columns:
+# This is to exercise file group sorting, which uses file-level statistics
+# DataFusion doesn't currently support string column statistics
+statement ok
+CREATE EXTERNAL TABLE test_table (
+ int_col INT NOT NULL,
+ string_col TEXT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ date_col DATE NOT NULL
+)
+STORED AS PARQUET
+WITH HEADER ROW
+WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet/test_table';
+
+# Check output plan again, expect an "output_ordering" clause in the physical_plan -> ParquetExec:
+# After https://github.com/apache/arrow-datafusion/pull/9593 this should not require a sort.
Review Comment:
I don't fully understand this comment -- this is a new test added as part of
that PR, right?
##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -482,6 +508,171 @@ fn get_projected_output_ordering(
all_orderings
}
+// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
Review Comment:
I think it would help to explain the requirements on this structure for
correctness:
1. That all rows in the sort order have statistics for all values
##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -448,14 +448,40 @@ impl From<ObjectMeta> for FileMeta {
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
+ table_schema: &SchemaRef,
) -> Vec<Vec<PhysicalSortExpr>> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
- if base_config.file_groups.iter().any(|group| group.len() > 1) {
- debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
- base_config.output_ordering[0], base_config.file_groups);
+ // Check if any file groups are not sorted
+ if base_config.file_groups.iter().any(|group| {
+ if group.len() <= 1 {
+ // File groups with <= 1 files are always sorted
+ return false;
+ }
+
+ let statistics = match MinMaxStatistics::new_from_files(
Review Comment:
Since creating `MinMaxStatistics` is likely relatively expensive I wonder if
there is some way to avoid it being recreated twice
##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -482,6 +508,171 @@ fn get_projected_output_ordering(
all_orderings
}
+// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
+pub(crate) struct MinMaxStatistics {
+ min: arrow::row::Rows,
+ max: arrow::row::Rows,
+}
+
+impl MinMaxStatistics {
+ fn new_from_files<'a>(
+ sort_order: &[PhysicalSortExpr],
+ table_schema: &SchemaRef,
+ projected_schema: &SchemaRef,
+ files: impl IntoIterator<Item = &'a PartitionedFile>,
+ ) -> Result<Self> {
+ use datafusion_common::ScalarValue;
+
+ let statistics_and_partition_values = files
+ .into_iter()
+ .map(|file| {
+ file.statistics
+ .as_ref()
+ .zip(Some(file.partition_values.as_slice()))
+ })
+ .collect::<Option<Vec<_>>>()
+ .ok_or_else(|| {
+ DataFusionError::Plan("Parquet file missing statistics".to_string())
+ })?;
+
+ let get_min_max = |i: usize| -> (Vec<ScalarValue>, Vec<ScalarValue>) {
+ statistics_and_partition_values
+ .iter()
+ .map(|(s, pv)| {
+ if i < s.column_statistics.len() {
+ (
+ s.column_statistics[i]
+ .min_value
+ .get_value()
+ .cloned()
+ .unwrap_or(ScalarValue::Null),
Review Comment:
I don't think we can just use `NULL` when the statistics are not present (as
NULL will sort somewhere in the sort order, but in reality the data may be
anywhere / possibly overlap). Thus I think this code needs to return an error
in this case
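To make the suggested failure mode concrete, here is a minimal sketch of
erroring out on absent statistics rather than substituting `NULL`. The
`Precision` and `min_max_or_err` names here are simplified stand-ins for
illustration, not DataFusion's actual types (the real
`datafusion_common::stats::Precision` also has an `Inexact` variant):

```rust
/// Simplified stand-in for DataFusion's statistics precision.
#[derive(Debug, Clone, PartialEq)]
enum Precision<T> {
    Exact(T),
    Absent,
}

impl<T> Precision<T> {
    fn get_value(&self) -> Option<&T> {
        match self {
            Precision::Exact(v) => Some(v),
            Precision::Absent => None,
        }
    }
}

/// Sketch of the suggested behavior: a missing min or max becomes a planning
/// error instead of a NULL placeholder. NULL would sort to one fixed spot,
/// while the file's real data could fall anywhere and overlap other files.
fn min_max_or_err(
    min: &Precision<f64>,
    max: &Precision<f64>,
    column: &str,
) -> Result<(f64, f64), String> {
    match (min.get_value(), max.get_value()) {
        (Some(&lo), Some(&hi)) => Ok((lo, hi)),
        _ => Err(format!("missing min/max statistics for column {column}")),
    }
}

fn main() {
    assert_eq!(
        min_max_or_err(&Precision::Exact(0.0), &Precision::Exact(1.0), "value"),
        Ok((0.0, 1.0))
    );
    // Any absent bound aborts the optimization rather than guessing.
    assert!(min_max_or_err(&Precision::Absent, &Precision::Exact(1.0), "value").is_err());
}
```

Returning `Err` here would let the caller fall back to the conservative path
(treating the groups as unsorted) instead of silently producing a wrong
ordering.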
##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +203,71 @@ impl FileScanConfig {
.with_repartition_file_min_size(repartition_file_min_size)
.repartition_file_groups(&file_groups)
}
+
+ /// Attempts to do a bin-packing on files into file groups, such that any two files
+ /// in a file group are ordered and non-overlapping with respect to their statistics.
+ /// It will produce the smallest number of file groups possible.
+ pub fn sort_file_groups(
Review Comment:
Maybe we could name this function something more related to the
non-overlapping nature? Perhaps `split_groups_by_statistics` or something.
##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -762,6 +836,171 @@ mod tests {
assert_eq!(projection.fields(), schema.fields());
}
+ #[test]
+ fn test_sort_file_groups() -> Result<()> {
+ use chrono::TimeZone;
+ use datafusion_common::DFSchema;
+ use datafusion_expr::execution_props::ExecutionProps;
+ use object_store::{path::Path, ObjectMeta};
+
+ struct File {
+ name: &'static str,
+ date: &'static str,
+ statistics: Vec<Option<(f64, f64)>>,
+ }
+ impl File {
+ fn new(
+ name: &'static str,
+ date: &'static str,
+ statistics: Vec<Option<(f64, f64)>>,
+ ) -> Self {
+ Self {
+ name,
+ date,
+ statistics,
+ }
+ }
+ }
+
+ struct TestCase {
+ #[allow(unused)]
Review Comment:
why is `allow(unused)` needed?
##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +203,71 @@ impl FileScanConfig {
.with_repartition_file_min_size(repartition_file_min_size)
.repartition_file_groups(&file_groups)
}
+
+ /// Attempts to do a bin-packing on files into file groups, such that any two files
+ /// in a file group are ordered and non-overlapping with respect to their statistics.
+ /// It will produce the smallest number of file groups possible.
Review Comment:
> As for next steps, it actually just occurred to me that if there are 1000
files that all overlap, after this PR ListingTable will produce 1000 file
groups which is really bad. So I think I should add a check that only uses the
sorted file groups if they produce less than target_partitions file groups. I
will add that in this PR.
I agree
> Thanks. I think another possible next step is to move the implementation
of ProgressiveEvalExec from InfluxDB IOx here and then someone can use it and
the results of this PR to do #6672.
I think it is a good idea @NGA-TRAN -- @suremarc and anyone else following
along: `ProgressiveEvalExec` is an operator we implemented in InfluxDB 3.0 to
replace `SortPreservingMerge` in plans where we know the inputs don't overlap
in sort-key space (which is what this PR calculates); it avoids the merge and
reads the files more sequentially.
> Other than that I'm not sure immediately sure what next steps are, without
gathering more data from real-world usage. Though, I would like to add some
benchmarks to DataFusion for large plans (with thousands of files), as I think
the performance of this code might become more apparent
I agree a real world benchmark would be great. As "running queries on sorted
parquet files" is also quite important to InfluxDB 3.0, we can probably help
you with that.
Do you have data in mind? If not, we could potentially make something using
https://github.com/apache/arrow-datafusion/blob/main/test-utils/src/data_gen.rs#L251
(that sorts on the first few low cardinality columns, for example)
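As a sketch of the property `ProgressiveEvalExec` relies on (a hypothetical
helper for illustration, not the IOx implementation): if the per-input
sort-key ranges, taken in the order the inputs would be concatenated, never
overlap, then concatenating the already-sorted inputs is itself sorted and
the per-row merge can be skipped entirely:

```rust
/// Returns true if the inputs' (min, max) sort-key ranges, given in
/// concatenation order, never overlap. When this holds, plain concatenation
/// of the already-sorted inputs preserves the sort order, so a
/// SortPreservingMerge (and its per-row comparisons) is unnecessary.
/// (Hypothetical helper; not the ProgressiveEvalExec code.)
fn inputs_are_non_overlapping(ranges: &[(i64, i64)]) -> bool {
    // Each range must end at or before the next one begins.
    ranges.windows(2).all(|pair| pair[0].1 <= pair[1].0)
}

fn main() {
    // Three inputs covering disjoint key ranges: concatenation stays sorted.
    assert!(inputs_are_non_overlapping(&[(0, 10), (10, 20), (21, 30)]));
    // The first two ranges overlap: a real merge is still required.
    assert!(!inputs_are_non_overlapping(&[(0, 15), (10, 20), (21, 30)]));
}
```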
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]