suremarc commented on code in PR #15473: URL: https://github.com/apache/datafusion/pull/15473#discussion_r2020364348
########## datafusion/datasource/src/file_scan_config.rs: ########## @@ -575,6 +575,95 @@ impl FileScanConfig { }) } + /// Splits file groups into new groups based on statistics to enable efficient parallel processing. + /// + /// The method distributes files across a target number of partitions while ensuring + /// files within each partition maintain sort order based on their min/max statistics. + /// + /// The algorithm works by: + /// 1. Sorting all files by their minimum values + /// 2. Trying to place each file into an existing group where it can maintain sort order + /// 3. Creating new groups when necessary if a file cannot fit into existing groups + /// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing) + /// + /// # Parameters + /// * `table_schema`: Schema containing information about the columns + /// * `file_groups`: The original file groups to split + /// * `sort_order`: The lexicographical ordering to maintain within each group + /// * `target_partitions`: The desired number of output partitions + /// + /// # Returns + /// A new set of file groups, where files within each group are non-overlapping with respect to + /// their min/max statistics and maintain the specified sort order. + pub fn split_groups_by_statistics_v2( Review Comment: Perhaps we could call it `split_groups_by_statistics_with_target_partitions`? TBH I am not sure if anyone is using the old method, so I would wager it is safe to remove, and the new method could be called `split_groups_by_statistics`. But I agree the old one is probably more useful in certain scenarios, e.g. if you are doing a sort merge above it. If we were to keep it, I would rather unify the implementations, the only thing that differs is the policy for selecting the group to insert. I think we could probably abstract that out into an enum or generic parameter. (Not really sure how common generics are in datafusion though) ########## datafusion/datasource/benches/split_groups_by_statistics.rs: ########## @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::PartitionedFile; +use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use object_store::{path::Path, ObjectMeta}; +use std::sync::Arc; +use std::time::Duration; + +/// Generates test files with min-max statistics in different overlap patterns +fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> { + let mut files = Vec::with_capacity(num_files); + let range_size = if overlap_factor == 0.0 { + 100 / num_files as i64 + } else { + (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64 + }; + + for i in 0..num_files { + let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64; + let min = base as f64; + let max = (base + range_size) as f64; Review Comment: Can we get some comments explaining how `overlap_factor` works and what the expected distribution of files looks like? It's not super obvious to me how it works or if it resembles real life. ########## datafusion/datasource/src/file_scan_config.rs: ########## @@ -575,6 +575,95 @@ impl FileScanConfig { }) } + /// Splits file groups into new groups based on statistics to enable efficient parallel processing. + /// + /// The method distributes files across a target number of partitions while ensuring + /// files within each partition maintain sort order based on their min/max statistics. + /// + /// The algorithm works by: + /// 1. Sorting all files by their minimum values + /// 2. Trying to place each file into an existing group where it can maintain sort order + /// 3. Creating new groups when necessary if a file cannot fit into existing groups + /// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing) + /// + /// # Parameters + /// * `table_schema`: Schema containing information about the columns + /// * `file_groups`: The original file groups to split + /// * `sort_order`: The lexicographical ordering to maintain within each group + /// * `target_partitions`: The desired number of output partitions + /// + /// # Returns + /// A new set of file groups, where files within each group are non-overlapping with respect to + /// their min/max statistics and maintain the specified sort order. + pub fn split_groups_by_statistics_v2( + table_schema: &SchemaRef, + file_groups: &[FileGroup], + sort_order: &LexOrdering, + target_partitions: usize, + ) -> Result<Vec<FileGroup>> { + let flattened_files = file_groups + .iter() + .flat_map(FileGroup::iter) + .collect::<Vec<_>>(); + + if flattened_files.is_empty() { + return Ok(vec![]); + } + + let statistics = MinMaxStatistics::new_from_files( + sort_order, + table_schema, + None, + flattened_files.iter().copied(), + )?; + + let indices_sorted_by_min = statistics.min_values_sorted(); + + // Initialize with target_partitions empty groups + let mut file_groups_indices: Vec<Vec<usize>> = + vec![vec![]; target_partitions.max(1)]; + + for (idx, min) in indices_sorted_by_min { + // Find all groups where the file can fit + let mut suitable_groups: Vec<(usize, &mut Vec<usize>)> = file_groups_indices + .iter_mut() + .enumerate() + .filter(|(_, group)| { + group.is_empty() + || min + > statistics + .max(*group.last().expect("groups should not be empty")) + }) + .collect(); + + // Sort by group size to prioritize smaller groups + suitable_groups.sort_by_key(|(_, group)| group.len()); + + if let Some((_, group)) = suitable_groups.first_mut() { + group.push(idx); + } else { + // Create a new group if no existing group fits + file_groups_indices.push(vec![idx]); Review Comment: Hey @xudong963 I know you marked this as resolved, but I think copilot has a good point, perhaps we should fail fast if `target_partitions` is exceeded. Otherwise I think we may have quadratic growth in the worst case when all files overlap, as the number of file groups increases. Edit: I added a benchmark for `overlap=1.0` and `files=10000` and the scaling looks quadratic: ```sh ❯ cargo bench --bench split_groups_by_statistics "split_groups/v2_partitions=32/files=.*,overlap=1.0" split_groups/v2_partitions=32/files=10,overlap=1.0 time: [3.4016 µs 3.4221 µs 3.4484 µs] change: [-0.7305% +0.0518% +0.8399%] (p = 0.91 > 0.05) No change in performance detected. split_groups/v2_partitions=32/files=100,overlap=1.0 time: [28.248 µs 28.366 µs 28.499 µs] change: [-0.5186% +0.2265% +0.9370%] (p = 0.55 > 0.05) No change in performance detected. split_groups/v2_partitions=32/files=1000,overlap=1.0 time: [1.0781 ms 1.0819 ms 1.0859 ms] change: [-0.1957% +0.3401% +0.9614%] (p = 0.26 > 0.05) No change in performance detected. split_groups/v2_partitions=32/files=10000,overlap=1.0 time: [89.536 ms 89.808 ms 90.100 ms] ``` Seems quadratic to me. Though, perhaps this case is not common IRL. FWIW I didn't see quadratic scaling at `overlap=0.8` but I am not sure exactly how the `overlap` works. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org