alamb commented on code in PR #9593:
URL: https://github.com/apache/arrow-datafusion/pull/9593#discussion_r1535930133


##########
datafusion/core/src/datasource/listing/mod.rs:
##########
@@ -67,6 +67,8 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// Optional statistics that describe the data in this file

Review Comment:
   I think we should also provide some more guidance here, for example:
   
   ```suggestion
       /// Optional statistics that describe the data in this file if known.
       ///
       /// DataFusion relies on these statistics for planning, so if they are
       /// incorrect, incorrect answers may result.
   ```



##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -169,6 +171,38 @@ SELECT min(date_col) FROM test_table;
 ----
 1970-01-02
 
+# Clean up
+statement ok
+DROP TABLE test_table;
+
+# Do one more test, but order by numeric columns:

Review Comment:
   Could we move the tests for this optimization into their own .slt file (e.g.
`sorted_parquet.slt`)? I think the optimization is important and tricky enough
to warrant specialized tests.



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +203,71 @@ impl FileScanConfig {
             .with_repartition_file_min_size(repartition_file_min_size)
             .repartition_file_groups(&file_groups)
     }
+
+    /// Attempts to do a bin-packing on files into file groups, such that any two files
+    /// in a file group are ordered and non-overlapping with respect to their statistics.
+    /// It will produce the smallest number of file groups possible.
+    pub fn sort_file_groups(
+        table_schema: &SchemaRef,
+        projected_schema: &SchemaRef,
+        file_groups: &[Vec<PartitionedFile>],
+        sort_order: &[PhysicalSortExpr],
+    ) -> Result<Vec<Vec<PartitionedFile>>> {
+        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+        // First Fit:
+        // * Choose the first file group that a file can be placed into.
+        // * If it fits into no existing file groups, create a new one.
+        //
+        // By sorting files by min values and then applying first-fit bin packing,
+        // we can produce the smallest number of file groups such that
+        // files within a group are in order and non-overlapping.
+        //
+        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
+        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            projected_schema,
+            flattened_files.iter().copied(),
+        )
+        .map_err(|e| e.context("construct min/max statistics"))?;

Review Comment:
   I think a bit more context would be helpful here:
   
   ```suggestion
           .map_err(|e| e.context("construct min/max statistics for sort_file_groups"))?;
   ```



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -762,6 +836,171 @@ mod tests {
         assert_eq!(projection.fields(), schema.fields());
     }
 
+    #[test]
+    fn test_sort_file_groups() -> Result<()> {
+        use chrono::TimeZone;
+        use datafusion_common::DFSchema;
+        use datafusion_expr::execution_props::ExecutionProps;
+        use object_store::{path::Path, ObjectMeta};
+
+        struct File {
+            name: &'static str,
+            date: &'static str,
+            statistics: Vec<Option<(f64, f64)>>,
+        }
+        impl File {
+            fn new(
+                name: &'static str,
+                date: &'static str,
+                statistics: Vec<Option<(f64, f64)>>,
+            ) -> Self {
+                Self {
+                    name,
+                    date,
+                    statistics,
+                }
+            }
+        }
+
+        struct TestCase {
+            #[allow(unused)]
+            file_schema: Schema,
+            files: Vec<File>,
+            sort: Vec<datafusion_expr::Expr>,
+            expected_result: Result<Vec<Vec<usize>>, &'static str>,
+        }
+
+        use datafusion_expr::col;
+        let cases = vec![
+            TestCase {
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec![0, 1], vec![2]]),
+            },

Review Comment:
   Other important cases to cover are additional negative cases:
   1. The files have values but no statistics (the groups shouldn't get
reordered)
   2. One of the files has no statistics



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -138,12 +138,21 @@ impl FileScanConfig {
             column_statistics: table_cols_stats,
         };
 
-        let table_schema = Arc::new(
+        let projected_schema = Arc::new(
             Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
         );
+
+        let full_table_schema = {

Review Comment:
   I wonder if we can avoid creating the entire schema here and instead reuse
it from elsewhere -- I think the information should already be available for
planning queries. As we try to make planning more efficient, I think avoiding
re-creating the same schema over and over again will be important.



##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -169,6 +171,38 @@ SELECT min(date_col) FROM test_table;
 ----
 1970-01-02
 
+# Clean up
+statement ok
+DROP TABLE test_table;
+
+# Do one more test, but order by numeric columns:
+# This is to exercise file group sorting, which uses file-level statistics
+# DataFusion doesn't currently support string column statistics
+statement ok
+CREATE EXTERNAL TABLE test_table (
+  int_col INT NOT NULL,
+  string_col TEXT NOT NULL,
+  bigint_col BIGINT NOT NULL,
+  date_col DATE NOT NULL
+)
+STORED AS PARQUET
+WITH HEADER ROW
+WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet/test_table';
+
+# Check output plan again, expect an "output_ordering" clause in the physical_plan -> ParquetExec:
+# After https://github.com/apache/arrow-datafusion/pull/9593 this should not require a sort.

Review Comment:
   I don't fully understand this comment -- this is a new test added as part of 
that PR, right?



##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -482,6 +508,171 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
+// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.

Review Comment:
   I think it would help to explain the requirements on this structure for 
correctness:
   1. That all rows in the sort order have statistics for all values



##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -448,14 +448,40 @@ impl From<ObjectMeta> for FileMeta {
 fn get_projected_output_ordering(
     base_config: &FileScanConfig,
     projected_schema: &SchemaRef,
+    table_schema: &SchemaRef,
 ) -> Vec<Vec<PhysicalSortExpr>> {
     let mut all_orderings = vec![];
     for output_ordering in &base_config.output_ordering {
-        if base_config.file_groups.iter().any(|group| group.len() > 1) {
-            debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
-            base_config.output_ordering[0], base_config.file_groups);
+        // Check if any file groups are not sorted
+        if base_config.file_groups.iter().any(|group| {
+            if group.len() <= 1 {
+                // File groups with <= 1 files are always sorted
+                return false;
+            }
+
+            let statistics = match MinMaxStatistics::new_from_files(

Review Comment:
   Since creating `MinMaxStatistics` is likely relatively expensive, I wonder if
there is some way to avoid creating it twice.



##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -482,6 +508,171 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
+// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
+pub(crate) struct MinMaxStatistics {
+    min: arrow::row::Rows,
+    max: arrow::row::Rows,
+}
+
+impl MinMaxStatistics {
+    fn new_from_files<'a>(
+        sort_order: &[PhysicalSortExpr],
+        table_schema: &SchemaRef,
+        projected_schema: &SchemaRef,
+        files: impl IntoIterator<Item = &'a PartitionedFile>,
+    ) -> Result<Self> {
+        use datafusion_common::ScalarValue;
+
+        let statistics_and_partition_values = files
+            .into_iter()
+            .map(|file| {
+                file.statistics
+                    .as_ref()
+                    .zip(Some(file.partition_values.as_slice()))
+            })
+            .collect::<Option<Vec<_>>>()
+            .ok_or_else(|| {
+                DataFusionError::Plan("Parquet file missing statistics".to_string())
+            })?;
+
+        let get_min_max = |i: usize| -> (Vec<ScalarValue>, Vec<ScalarValue>) {
+            statistics_and_partition_values
+                .iter()
+                .map(|(s, pv)| {
+                    if i < s.column_statistics.len() {
+                        (
+                            s.column_statistics[i]
+                                .min_value
+                                .get_value()
+                                .cloned()
+                                .unwrap_or(ScalarValue::Null),

Review Comment:
   I don't think we can just use `NULL` when the statistics are not present:
NULL will sort somewhere in the sort order, but in reality the data may be
anywhere and may overlap with other files. Thus I think this code needs to
return an error in this case.
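   
   A minimal sketch of what returning an error could look like. It assumes
hypothetical, simplified types (`ColumnMinMax` with `Option<i64>` bounds and a
`String` error) rather than DataFusion's own statistics and error types:
   
   ```rust
   /// Hypothetical, simplified min/max statistics for one file on one column.
   /// `None` means the bound is unknown.
   #[derive(Debug, Clone, Copy)]
   struct ColumnMinMax {
       min: Option<i64>,
       max: Option<i64>,
   }
   
   /// Collects the min and max bounds of every file, failing as soon as any
   /// bound is unknown instead of substituting a NULL placeholder.
   fn collect_min_max(stats: &[ColumnMinMax]) -> Result<(Vec<i64>, Vec<i64>), String> {
       let mut mins = Vec::with_capacity(stats.len());
       let mut maxes = Vec::with_capacity(stats.len());
       for (i, s) in stats.iter().enumerate() {
           match (s.min, s.max) {
               (Some(min), Some(max)) => {
                   mins.push(min);
                   maxes.push(max);
               }
               // An unknown bound says nothing about where the data lies,
               // so the file cannot be safely ordered against the others.
               _ => return Err(format!("file {i} is missing min/max statistics")),
           }
       }
       Ok((mins, maxes))
   }
   ```
   
   With this shape, a caller can fall back to the unsorted file groups whenever
an `Err` is returned.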



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +203,71 @@ impl FileScanConfig {
             .with_repartition_file_min_size(repartition_file_min_size)
             .repartition_file_groups(&file_groups)
     }
+
+    /// Attempts to do a bin-packing on files into file groups, such that any two files
+    /// in a file group are ordered and non-overlapping with respect to their statistics.
+    /// It will produce the smallest number of file groups possible.
+    pub fn sort_file_groups(

Review Comment:
   Maybe we could give this function a name that reflects the non-overlapping
nature of the resulting groups?
   
   Perhaps something like `split_groups_by_statistics`?
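   
   To make the non-overlapping property concrete, here is a minimal,
self-contained sketch of the first-fit grouping described in the doc comment
above. It is not the PR's implementation: it assumes a single `f64` sort
column, represents each file as a hypothetical `(min, max)` tuple, and treats
two files as non-overlapping only when one file's max is strictly less than the
other's min.
   
   ```rust
   /// Groups files so that, within each group, files are ordered and their
   /// (min, max) ranges do not overlap. Returns indices into `files`.
   ///
   /// Assumption: `files[i]` is a hypothetical `(min, max)` pair for one
   /// `f64` sort column; the real code works on multi-column statistics.
   fn split_groups_by_statistics(files: &[(f64, f64)]) -> Vec<Vec<usize>> {
       // Visit files in ascending order of their min value (stable sort).
       let mut order: Vec<usize> = (0..files.len()).collect();
       order.sort_by(|&a, &b| files[a].0.total_cmp(&files[b].0));
   
       let mut groups: Vec<Vec<usize>> = Vec::new();
       for idx in order {
           let min = files[idx].0;
           // First fit: pick the first group whose last file ends before
           // this file starts.
           let slot = groups.iter().position(|group| {
               let last = *group.last().expect("groups are never empty");
               files[last].1 < min
           });
           match slot {
               Some(g) => groups[g].push(idx),
               // No existing group can take the file, so open a new one.
               None => groups.push(vec![idx]),
           }
       }
       groups
   }
   ```
   
   For the three files in the test case of this PR, `(0.00, 0.49)`,
`(0.50, 1.00)` and `(0.00, 1.00)`, this sketch returns `[[0, 1], [2]]`.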



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -762,6 +836,171 @@ mod tests {
         assert_eq!(projection.fields(), schema.fields());
     }
 
+    #[test]
+    fn test_sort_file_groups() -> Result<()> {
+        use chrono::TimeZone;
+        use datafusion_common::DFSchema;
+        use datafusion_expr::execution_props::ExecutionProps;
+        use object_store::{path::Path, ObjectMeta};
+
+        struct File {
+            name: &'static str,
+            date: &'static str,
+            statistics: Vec<Option<(f64, f64)>>,
+        }
+        impl File {
+            fn new(
+                name: &'static str,
+                date: &'static str,
+                statistics: Vec<Option<(f64, f64)>>,
+            ) -> Self {
+                Self {
+                    name,
+                    date,
+                    statistics,
+                }
+            }
+        }
+
+        struct TestCase {
+            #[allow(unused)]

Review Comment:
   Why is `#[allow(unused)]` needed?



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +203,71 @@ impl FileScanConfig {
             .with_repartition_file_min_size(repartition_file_min_size)
             .repartition_file_groups(&file_groups)
     }
+
+    /// Attempts to do a bin-packing on files into file groups, such that any two files
+    /// in a file group are ordered and non-overlapping with respect to their statistics.
+    /// It will produce the smallest number of file groups possible.

Review Comment:
   > As for next steps, it actually just occurred to me that if there are 1000 
files that all overlap, after this PR ListingTable will produce 1000 file 
groups which is really bad. So I think I should add a check that only uses the 
sorted file groups if they produce less than target_partitions file groups. I 
will add that in this PR.
   
   I agree
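   
   A minimal sketch of such a check, assuming a hypothetical helper
`choose_file_groups` (not part of DataFusion's API) and the strict "fewer than
`target_partitions`" threshold from the quoted comment:
   
   ```rust
   /// Returns the statistics-based grouping only if it yields fewer than
   /// `target_partitions` groups; otherwise keeps the original grouping.
   ///
   /// Hypothetical helper for illustration; `T` stands in for `PartitionedFile`.
   fn choose_file_groups<T>(
       original: Vec<Vec<T>>,
       split_by_statistics: Vec<Vec<T>>,
       target_partitions: usize,
   ) -> Vec<Vec<T>> {
       if split_by_statistics.len() < target_partitions {
           split_by_statistics
       } else {
           // Too many groups, e.g. when every file overlaps every other file.
           original
       }
   }
   ```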
   
   
   > Thanks. I think another possible next step is to move the implementation 
of ProgressiveEvalExec from InfluxDB IOx here and then someone can use it and 
the results of this PR to do #6672.
   
   I think it is a good idea @NGA-TRAN. For @suremarc and anyone else following
along: `ProgressiveEvalExec` is an operator we implemented in InfluxDB 3.0 to
replace `SortPreservingMerge` in plans where we know the inputs don't overlap
in sort-key space (which is what this PR calculates). It avoids the merge and
also reads the files more sequentially.
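   
   To illustrate the idea only (this is not the `ProgressiveEvalExec`
implementation): assuming each input is already sorted and no two inputs
overlap in sort-key space, a sorted output can be produced by ordering the
inputs and concatenating them, with no element-by-element merge. The function
name and the in-memory `Vec<i64>` inputs below are hypothetical.
   
   ```rust
   /// Produces a globally sorted output from inputs that are each sorted and
   /// pairwise non-overlapping, by ordering the inputs and concatenating them.
   ///
   /// Hypothetical illustration using in-memory vectors instead of streams.
   fn concat_non_overlapping(mut inputs: Vec<Vec<i64>>) -> Vec<i64> {
       // Empty inputs contribute nothing and have no first value to order by.
       inputs.retain(|input| !input.is_empty());
       // Order the inputs by their smallest (first) value.
       inputs.sort_by_key(|input| input[0]);
       // Read each input to completion before starting the next one.
       inputs.into_iter().flatten().collect()
   }
   ```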
   
   > Other than that I'm not immediately sure what the next steps are, without
gathering more data from real-world usage. Though, I would like to add some
benchmarks to DataFusion for large plans (with thousands of files), as I think
the performance of this code might become more apparent
   
   I agree a real-world benchmark would be great. As "running queries on sorted
parquet files" is also quite important to InfluxDB 3.0, we can probably help
you with that.
   
   Do you have data in mind? If not, we could potentially make something using
https://github.com/apache/arrow-datafusion/blob/main/test-utils/src/data_gen.rs#L251
(that sorts on the first few low cardinality columns, for example).


