2010YOUY01 commented on code in PR #15409:
URL: https://github.com/apache/datafusion/pull/15409#discussion_r2065566166
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3471,3 +3477,102 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
     Ok(())
 }
+
+#[tokio::test]
+async fn test_distribute_sort_parquet() -> Result<()> {
+    let test_config: TestConfig =
+        TestConfig::default().with_prefer_repartition_file_scans(1000);
+    assert!(
+        test_config.config.optimizer.repartition_datasource_scans,
+        "should enable scans to be repartitioned"
+    );
+
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("c", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let physical_plan = sort_exec(sort_key, parquet_exec_with_stats(10000 * 8192), false);
+
+    // prior to optimization, this is the starting plan
+    let starting = &[
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    plans_matches_expected!(starting, physical_plan.clone());
+
+    // what the enforce distribution run does.
+    let expected = &[
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "  CoalescePartitionsExec",
+        "    DataSourceExec: file_groups={10 groups: [[x:0..8192000], [x:8192000..16384000], [x:16384000..24576000], [x:24576000..32768000], [x:32768000..40960000], [x:40960000..49152000], [x:49152000..57344000], [x:57344000..65536000], [x:65536000..73728000], [x:73728000..81920000]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    test_config.run(expected, physical_plan.clone(), &[Run::Distribution])?;
+
+    // what the sort parallelization (in enforce sorting) does after the enforce distribution changes
+    let expected = &[
+        "SortPreservingMergeExec: [c@2 ASC]",
+        "  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+        "    DataSourceExec: file_groups={10 groups: [[x:0..8192000], [x:8192000..16384000], [x:16384000..24576000], [x:24576000..32768000], [x:32768000..40960000], [x:40960000..49152000], [x:49152000..57344000], [x:57344000..65536000], [x:65536000..73728000], [x:73728000..81920000]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    test_config.run(expected, physical_plan, &[Run::Distribution, Run::Sorting])?;
+    Ok(())
+}
+
+#[tokio::test]

Review Comment:
   ```suggestion
   /// Ensures that `DataSourceExec` has been repartitioned into `target_partitions` file groups
   #[tokio::test]
   ```

##########
datafusion/common/src/config.rs:
##########
@@ -632,14 +632,21 @@ config_namespace! {
         /// long runner execution, all types of joins may encounter out-of-memory errors.
         pub allow_symmetric_joins_without_pruning: bool, default = true
 
-        /// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
-        /// Currently Parquet and CSV formats are supported.
+        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
+        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
         ///
-        /// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
+        /// For FileSources, only Parquet and CSV formats are currently supported.
+        ///
+        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
         /// might be partitioned into smaller chunks) for parallel scanning.
-        /// If set to `false`, different files will be read in parallel, but repartitioning won't
+        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
         /// happen within a single file.
-        pub repartition_file_scans: bool, default = true
+        ///
+        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
+        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
+        /// the total number of partitions and batches per partition, but does not slice the initial
+        /// record tables provided to the MemTable on creation.
+        pub repartition_datasource_scans: bool, default = true

Review Comment:
   This name is better; however, I recommend keeping the old name to avoid changing the config API.
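   For reference, a minimal sketch (not from the PR) of how this knob is driven from a user session today, using the existing `repartition_file_scans` builder name that the comment suggests keeping; the file path and table name are placeholders:

   ```rust
   use datafusion::prelude::*;

   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       // Equivalent to `SET datafusion.optimizer.repartition_file_scans = true` in SQL.
       let config = SessionConfig::new()
           .with_target_partitions(10) // desired scan parallelism
           .with_repartition_file_scans(true); // allow splitting one file into byte ranges
       let ctx = SessionContext::new_with_config(config);

       // A single large parquet file can now be scanned as parallel byte-range partitions.
       ctx.register_parquet("t", "big.parquet", ParquetReadOptions::default())
           .await?;
       ctx.sql("SELECT * FROM t ORDER BY c").await?.show().await?;
       Ok(())
   }
   ```

   Keeping the old key means snippets like the above, and any `SET datafusion.optimizer.repartition_file_scans` statements, keep working unchanged, which is the API-stability argument behind the comment.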
##########
datafusion/datasource/src/memory.rs:
##########
@@ -723,6 +761,222 @@ impl MemorySourceConfig {
     pub fn original_schema(&self) -> SchemaRef {
         Arc::clone(&self.schema)
     }
+
+    /// Repartition while preserving order.
+    ///
+    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
+    /// as having too few batches to fulfill the `target_partitions` or if unable
+    /// to preserve output ordering.
+    fn repartition_preserving_order(
+        &self,
+        target_partitions: usize,
+        output_ordering: LexOrdering,
+    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
+        if !self.eq_properties().ordering_satisfy(&output_ordering) {
+            Ok(None)
+        } else if self.partitions.len() == 1 {
+            self.repartition_evenly_by_size(target_partitions)
+        } else {

Review Comment:
   It would be great to include a simple example to explain this code block: how an input table is repartitioned, what the key in the max heap is, and how the state changes during each loop step.
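   One possible shape for such an example is the self-contained sketch below. Plain row counts stand in for `Vec<RecordBatch>`, `Part` stands in for the PR's `RePartition`, and the index assignment inside `split` is an assumption, since `RePartition::split` is not visible in the quoted hunks. The max-heap key is the partition's row count; each outer loop iteration pops the largest partition and splits it in two. The full loop being illustrated appears in the memory.rs hunk further below.

   ```rust
   use std::collections::BinaryHeap;

   #[derive(Debug)]
   struct Part {
       idx: usize,  // position in the final output ordering
       rows: usize, // heap key: total row count of the partition
   }

   // Compare (and equate) by row count only, so BinaryHeap pops the largest partition.
   impl PartialEq for Part {
       fn eq(&self, other: &Self) -> bool {
           self.rows == other.rows
       }
   }
   impl Eq for Part {}
   impl Ord for Part {
       fn cmp(&self, other: &Self) -> std::cmp::Ordering {
           self.rows.cmp(&other.rows)
       }
   }
   impl PartialOrd for Part {
       fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
           Some(self.cmp(other))
       }
   }

   impl Part {
       /// Halve the partition; the second half takes `idx + 1`, one of the slots
       /// reserved by the `idx + cnt_to_repartition * idx` labeling below.
       /// (Hypothetical index scheme; the PR's `RePartition::split` is not shown.)
       fn split(self) -> Vec<Part> {
           if self.rows < 2 {
               return vec![self]; // too small to split further
           }
           let half = self.rows / 2;
           vec![
               Part { idx: self.idx, rows: self.rows - half },
               Part { idx: self.idx + 1, rows: half },
           ]
       }
   }

   fn main() {
       // Input: 3 ordered partitions of 100, 60, and 20 rows; target_partitions = 5,
       // so cnt_to_repartition = 5 - 3 = 2 splits are needed.
       let input = [100usize, 60, 20];
       let target_partitions = 5;
       let cnt_to_repartition = target_partitions - input.len();

       // Label partitions, leaving `cnt_to_repartition` index gaps after each one:
       // heap = {idx:0, rows:100}, {idx:3, rows:60}, {idx:6, rows:20}
       let mut heap: BinaryHeap<Part> = input
           .iter()
           .enumerate()
           .map(|(idx, &rows)| Part { idx: idx + cnt_to_repartition * idx, rows })
           .collect();

       for _ in 0..cnt_to_repartition {
           // Step 1 pops rows=100 -> (idx:0, 50) + (idx:1, 50);
           // step 2 pops rows=60  -> (idx:3, 30) + (idx:4, 30).
           let biggest = heap.pop().expect("heap is never empty here");
           for p in biggest.split() {
               heap.push(p);
           }
       }

       // Restore the original relative order via the reserved indices:
       // [(0, 50), (1, 50), (3, 30), (4, 30), (6, 20)]
       let mut parts = heap.into_vec();
       parts.sort_by_key(|p| p.idx);
       println!("{parts:?}");
   }
   ```

   The quoted implementation additionally moves partitions whose `split()` returns a single element into a `cannot_split_further` list, so the loop can continue with the next-largest candidate.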
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3471,3 +3477,102 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
     Ok(())
 }
+
+#[tokio::test]

Review Comment:
   ```suggestion
   /// Ensures that `DataSourceExec` has been repartitioned into `target_partitions` file groups
   #[tokio::test]
   ```

##########
datafusion/datasource/src/memory.rs:
##########
@@ -723,6 +761,222 @@ impl MemorySourceConfig {
     pub fn original_schema(&self) -> SchemaRef {
         Arc::clone(&self.schema)
     }
+
+    /// Repartition while preserving order.
+    ///
+    /// Returns `Ok(None)` if cannot fulfill the requested repartitioning, such
+    /// as having too few batches to fulfill the `target_partitions` or if unable
+    /// to preserve output ordering.
+    fn repartition_preserving_order(
+        &self,
+        target_partitions: usize,
+        output_ordering: LexOrdering,
+    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
+        if !self.eq_properties().ordering_satisfy(&output_ordering) {
+            Ok(None)
+        } else if self.partitions.len() == 1 {
+            self.repartition_evenly_by_size(target_partitions)
+        } else {
+            let total_num_batches =
+                self.partitions.iter().map(|b| b.len()).sum::<usize>();
+            if total_num_batches < target_partitions {
+                // no way to create the desired repartitioning
+                return Ok(None);
+            }
+
+            let cnt_to_repartition = target_partitions - self.partitions.len();
+
+            // Label the current partitions and their order.
+            // Such that when we later split up the partitions into smaller sizes, we are maintaining the order.
+            let to_repartition = self
+                .partitions
+                .iter()
+                .enumerate()
+                .map(|(idx, batches)| RePartition {
+                    idx: idx + (cnt_to_repartition * idx), // make space in ordering for split partitions
+                    row_count: batches.iter().map(|batch| batch.num_rows()).sum(),
+                    batches: batches.clone(),
+                })
+                .collect_vec();
+
+            // Put all of the partitions into a heap ordered by `RePartition::partial_cmp`, which sizes
+            // by count of rows.
+            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
+            for rep in to_repartition {
+                max_heap.push(rep);
+            }
+
+            // Split the largest partitions into smaller partitions. Maintaining the output
+            // order of the partitions & newly created partitions.
+            let mut cannot_split_further = Vec::with_capacity(target_partitions);
+            for _ in 0..cnt_to_repartition {
+                loop {
+                    let Some(to_split) = max_heap.pop() else {
+                        break;
+                    };
+
+                    let mut new_partitions = to_split.split();
+                    if new_partitions.len() > 1 {
+                        for new_partition in new_partitions {
+                            max_heap.push(new_partition);
+                        }
+                        break;
+                    } else {
+                        cannot_split_further.push(new_partitions.remove(0));
+                    }
+                }
+            }
+            let mut partitions = max_heap.drain().collect_vec();
+            partitions.extend(cannot_split_further);
+
+            // Finally, sort all partitions by the output ordering.
+            partitions.sort_by_key(|p| p.idx);

Review Comment:
   I think DataFusion currently only uses partial ordering (the `output_ordering` argument of this function means the memory table has that order within each partition before repartitioning, not necessarily that order is maintained across partitions); see:
   https://github.com/apache/datafusion/blob/5e1214c55e37d198d732667b770943cfba4fe5c3/datafusion/physical-plan/src/execution_plan.rs#L533

   If that is the case, this final sorting step is not necessary. However, I think it can be kept, since it's efficient. We might need an additional API for repartitioning that also maintains global order, and this implementation could be reused for that.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org