wiedld commented on code in PR #15409:
URL: https://github.com/apache/datafusion/pull/15409#discussion_r2011691796


##########
datafusion/datasource/src/memory.rs:
##########
@@ -718,6 +750,181 @@ impl MemorySourceConfig {
     pub fn original_schema(&self) -> SchemaRef {
         Arc::clone(&self.schema)
     }
+
+    /// Repartition while preserving order.
+    ///
+    /// Returns None if cannot fulfill requested repartitioning.
+    fn repartition_preserving_order(
+        &self,
+        target_partitions: usize,
+        output_ordering: LexOrdering,
+    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
+        if !self.eq_properties().ordering_satisfy(&output_ordering) {
+            Ok(None)
+        } else if self.partitions.len() == 1 {
+            self.repartition_evenly_by_size(target_partitions)
+        } else {
+            let total_num_batches =
+                self.partitions.iter().map(|b| b.len()).sum::<usize>();
+            if total_num_batches < target_partitions {
+                // no way to create the desired repartitioning
+                return Ok(None);
+            }
+
+            let cnt_to_repartition = target_partitions - self.partitions.len();
+
+            let to_repartition = self
+                .partitions
+                .iter()
+                .enumerate()
+                .map(|(idx, batches)| RePartition {
+                    idx: idx + (cnt_to_repartition * idx), // make space in 
ordering for split partitions
+                    row_count: batches.iter().map(|batch| 
batch.num_rows()).sum(),
+                    batches: batches.clone(),
+                })
+                .collect_vec();
+
+            // split the largest partitions
+            let mut max_heap = BinaryHeap::with_capacity(target_partitions);
+            for rep in to_repartition {
+                max_heap.push(rep);
+            }
+            for _ in 0..cnt_to_repartition {
+                let Some(to_split) = max_heap.pop() else {
+                    unreachable!()
+                };
+                for new_partition in to_split.split() {
+                    max_heap.push(new_partition);
+                }
+            }
+            let mut partitions = max_heap.drain().collect_vec();
+            partitions.sort_by_key(|p| p.idx);
+            let partitions = partitions.into_iter().map(|rep| 
rep.batches).collect_vec();
+
+            Ok(Some(partitions))
+        }
+    }
+
+    /// Repartition into evenly sized chunks (as much as possible without 
batch splitting),
+    /// disregarding any ordering.
+    ///
+    /// Returns None if cannot fulfill requested repartitioning.
+    fn repartition_evenly_by_size(
+        &self,
+        target_partitions: usize,
+    ) -> Result<Option<Vec<Vec<RecordBatch>>>> {
+        // determine if we have enough total batches to fulfill request
+        let flatten_batches = 
self.partitions.clone().into_iter().flatten().collect_vec();
+        if flatten_batches.len() < target_partitions {
+            return Ok(None);
+        }
+
+        // repartition evenly
+        let total_num_rows = flatten_batches.iter().map(|b| 
b.num_rows()).sum::<usize>();
+        let target_partition_size = total_num_rows.div_ceil(target_partitions);
+        let mut partitions =
+            vec![Vec::with_capacity(flatten_batches.len()); target_partitions];
+        let mut curr_row_count = 0;
+        let mut next_idx = 0;
+        for batch in flatten_batches {
+            let row_cnt = batch.num_rows();
+
+            // handle very lopsided batch sizing
+            if partitions[next_idx].is_empty() {
+                partitions[next_idx].push(batch);
+            } else {
+                // have at least 1 batch per partition
+                let idx =
+                    std::cmp::min(next_idx + 1, curr_row_count / 
target_partition_size);
+                if let Some(partition) = partitions.get_mut(idx) {
+                    partition.push(batch);
+                } else {
+                    partitions[target_partitions - 1].push(batch);
+                }
+                next_idx = idx;
+            }

Review Comment:
   Note to self; this feels ugly and I should fix it 😅 .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to