alamb commented on code in PR #9593:
URL: https://github.com/apache/datafusion/pull/9593#discussion_r1585506338


##########
datafusion/core/src/datasource/listing/mod.rs:
##########
@@ -67,6 +67,11 @@ pub struct PartitionedFile {
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
+    /// Optional statistics that describe the data in this file if known.
+    ///
+    /// DataFusion relies on these statistics for planning (in particular to 
sort file groups),
+    /// so if they are incorrect, incorrect answers may result.
+    pub statistics: Option<Statistics>,

Review Comment:
   this is actually a nice API to potentially provide pre-known statistics 👍 



##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -473,15 +468,281 @@ fn get_projected_output_ordering(
             // since rest of the orderings are violated
             break;
         }
+
         // do not push empty entries
         // otherwise we may have `Some(vec![])` at the output ordering.
-        if !new_ordering.is_empty() {
-            all_orderings.push(new_ordering);
+        if new_ordering.is_empty() {
+            continue;
+        }
+
+        // Check if any file groups are not sorted
+        if base_config.file_groups.iter().any(|group| {
+            if group.len() <= 1 {
+                // File groups with <= 1 files are always sorted
+                return false;
+            }
+
+            let statistics = match MinMaxStatistics::new_from_files(
+                &new_ordering,
+                projected_schema,
+                base_config.projection.as_deref(),
+                group,
+            ) {
+                Ok(statistics) => statistics,
+                Err(e) => {
+                    log::trace!("Error fetching statistics for file group: 
{e}");
+                    // we can't prove that it's ordered, so we have to reject 
it
+                    return true;
+                }
+            };
+
+            !statistics.is_sorted()
+        }) {
+            debug!(
+                "Skipping specified output ordering {:?}. \
+                Some file groups couldn't be determined to be sorted: {:?}",
+                base_config.output_ordering[0], base_config.file_groups
+            );
+            continue;
         }
+
+        all_orderings.push(new_ordering);
     }
     all_orderings
 }
 
+/// A normalized representation of file min/max statistics that allows for 
efficient sorting & comparison.

Review Comment:
   Could you please put this structure into its own module (e.g. 
datafusion/core/src/datasource/physical_plan/statistics.rs) so that it is 
easier to find



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -770,6 +843,277 @@ mod tests {
         assert_eq!(projection.fields(), schema.fields());
     }
 
+    #[test]
+    fn test_split_groups_by_statistics() -> Result<()> {
+        use chrono::TimeZone;
+        use datafusion_common::DFSchema;
+        use datafusion_expr::execution_props::ExecutionProps;
+        use object_store::{path::Path, ObjectMeta};
+
+        struct File {
+            name: &'static str,
+            date: &'static str,
+            statistics: Vec<Option<(f64, f64)>>,
+        }
+        impl File {
+            fn new(
+                name: &'static str,
+                date: &'static str,
+                statistics: Vec<Option<(f64, f64)>>,
+            ) -> Self {
+                Self {
+                    name,
+                    date,
+                    statistics,
+                }
+            }
+        }
+
+        struct TestCase {
+            name: &'static str,
+            file_schema: Schema,
+            files: Vec<File>,
+            sort: Vec<datafusion_expr::Expr>,
+            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
+        }
+
+        use datafusion_expr::col;
+        let cases = vec![
+            TestCase {
+                name: "test sort",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+            },
+            // same input but file '2' is in the middle
+            // test that we still order correctly
+            TestCase {
+                name: "test sort with files ordered differently",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+            },
+            TestCase {
+                name: "reverse sort",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(false, true)],
+                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
+            },
+            // reject nullable sort columns
+            TestCase {
+                name: "no nullable sort columns",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    true, // should fail because nullable
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Err("construct min/max statistics for 
split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate 
sorting columns\ncaused by\nError during planning: cannot sort by nullable 
column")
+            },
+            TestCase {
+                name: "all three non-overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),

Review Comment:
   I think all these tests also always have the first file with the minimum 
stastistics value -- can you possibly also test what happens when it is not 
(aka add a test that runs this test with file ids 2, 1, 0)?



##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -473,15 +468,281 @@ fn get_projected_output_ordering(
             // since rest of the orderings are violated
             break;
         }
+
         // do not push empty entries
         // otherwise we may have `Some(vec![])` at the output ordering.
-        if !new_ordering.is_empty() {
-            all_orderings.push(new_ordering);
+        if new_ordering.is_empty() {
+            continue;
+        }
+
+        // Check if any file groups are not sorted
+        if base_config.file_groups.iter().any(|group| {
+            if group.len() <= 1 {
+                // File groups with <= 1 files are always sorted
+                return false;
+            }
+
+            let statistics = match MinMaxStatistics::new_from_files(
+                &new_ordering,
+                projected_schema,
+                base_config.projection.as_deref(),
+                group,
+            ) {
+                Ok(statistics) => statistics,
+                Err(e) => {
+                    log::trace!("Error fetching statistics for file group: 
{e}");
+                    // we can't prove that it's ordered, so we have to reject 
it
+                    return true;
+                }
+            };
+
+            !statistics.is_sorted()
+        }) {
+            debug!(
+                "Skipping specified output ordering {:?}. \
+                Some file groups couldn't be determined to be sorted: {:?}",
+                base_config.output_ordering[0], base_config.file_groups
+            );
+            continue;
         }
+
+        all_orderings.push(new_ordering);
     }
     all_orderings
 }
 
+/// A normalized representation of file min/max statistics that allows for 
efficient sorting & comparison.
+/// The min/max values are ordered by [`Self::sort_order`].
+/// Furthermore, any columns that are reversed in the sort order have their 
min/max values swapped.
+pub(crate) struct MinMaxStatistics {
+    min_by_sort_order: arrow::row::Rows,
+    max_by_sort_order: arrow::row::Rows,
+    sort_order: Vec<PhysicalSortExpr>,
+}
+
+impl MinMaxStatistics {
+    #[allow(unused)]
+    fn sort_order(&self) -> &[PhysicalSortExpr] {
+        &self.sort_order
+    }
+
+    fn new_from_files<'a>(
+        projected_sort_order: &[PhysicalSortExpr], // Sort order with respect 
to projected schema
+        projected_schema: &SchemaRef,              // Projected schema
+        projection: Option<&[usize]>, // Indices of projection in full table 
schema (None = all columns)

Review Comment:
   I didn't see any tests that covered a non `None` projection and I am a 
little confused about how it could be correct if the projection was in terms of 
another schema 🤔 



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -770,6 +843,277 @@ mod tests {
         assert_eq!(projection.fields(), schema.fields());
     }
 
+    #[test]
+    fn test_split_groups_by_statistics() -> Result<()> {
+        use chrono::TimeZone;
+        use datafusion_common::DFSchema;
+        use datafusion_expr::execution_props::ExecutionProps;
+        use object_store::{path::Path, ObjectMeta};
+
+        struct File {
+            name: &'static str,
+            date: &'static str,
+            statistics: Vec<Option<(f64, f64)>>,
+        }
+        impl File {
+            fn new(
+                name: &'static str,
+                date: &'static str,
+                statistics: Vec<Option<(f64, f64)>>,
+            ) -> Self {
+                Self {
+                    name,
+                    date,
+                    statistics,
+                }
+            }
+        }
+
+        struct TestCase {
+            name: &'static str,
+            file_schema: Schema,
+            files: Vec<File>,
+            sort: Vec<datafusion_expr::Expr>,
+            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
+        }
+
+        use datafusion_expr::col;
+        let cases = vec![
+            TestCase {
+                name: "test sort",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+            },
+            // same input but file '2' is in the middle
+            // test that we still order correctly
+            TestCase {
+                name: "test sort with files ordered differently",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+            },
+            TestCase {
+                name: "reverse sort",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(false, true)],
+                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
+            },
+            // reject nullable sort columns
+            TestCase {
+                name: "no nullable sort columns",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    true, // should fail because nullable
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Err("construct min/max statistics for 
split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate 
sorting columns\ncaused by\nError during planning: cannot sort by nullable 
column")
+            },
+            TestCase {
+                name: "all three non-overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
+                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0", "1", "2"]]),
+            },
+            TestCase {
+                name: "all three overlapping",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    false,
+                )]),
+                files: vec![
+                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
+                ],
+                sort: vec![col("value").sort(true, false)],
+                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
+            },
+            TestCase {

Review Comment:
   can we please add a test for a single input file too?



##########
datafusion/core/src/datasource/physical_plan/file_scan_config.rs:
##########
@@ -194,6 +196,77 @@ impl FileScanConfig {
             .with_repartition_file_min_size(repartition_file_min_size)
             .repartition_file_groups(&file_groups)
     }
+
+    /// Attempts to do a bin-packing on files into file groups, such that any 
two files
+    /// in a file group are ordered and non-overlapping with respect to their 
statistics.
+    /// It will produce the smallest number of file groups possible.
+    pub fn split_groups_by_statistics(
+        table_schema: &SchemaRef,
+        file_groups: &[Vec<PartitionedFile>],
+        sort_order: &[PhysicalSortExpr],
+    ) -> Result<Vec<Vec<PartitionedFile>>> {
+        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+        // First Fit:
+        // * Choose the first file group that a file can be placed into.
+        // * If it fits into no existing file groups, create a new one.
+        //
+        // By sorting files by min values and then applying first-fit bin 
packing,
+        // we can produce the smallest number of file groups such that
+        // files within a group are in order and non-overlapping.
+        //
+        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
+        // 
https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
+
+        if flattened_files.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        )
+        .map_err(|e| {
+            e.context("construct min/max statistics for 
split_groups_by_statistics")
+        })?;
+
+        let indices_sorted_by_min = {

Review Comment:
   I wonder if we could move this into `statistics itself`  somehing like
   
   ```rust
           let indices_sorted_by_min = statistics.indices_sorted_by_min()
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to