wiedld commented on code in PR #15409:
URL: https://github.com/apache/datafusion/pull/15409#discussion_r2011496396


##########
datafusion/datasource/src/memory.rs:
##########
@@ -902,4 +1108,319 @@ mod tests {
 
         Ok(())
     }
+
+    fn batch(row_size: usize) -> RecordBatch {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); 
row_size]));
+        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
+        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+    }
+
+    fn schema() -> SchemaRef {
+        batch(1).schema()
+    }
+
+    fn memorysrcconfig_no_partitions(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_1_batch(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_1_batch_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)], vec![batch(100)], 
vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_with_2_batches_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+        ];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), 
batch(1)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_2_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100_000), batch(10_000), batch(1_000)],
+            vec![batch(2_000), batch(20)],
+        ];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    /// Assert that we get the expected count of partitions after 
repartitioning.
+    ///
+    /// If None, then we expected the [`DataSource::repartitioned`] to return 
None.
+    fn assert_partitioning(
+        partitioned_datasrc: Option<Arc<dyn DataSource>>,
+        partition_cnt: Option<usize>,
+    ) {
+        let should_exist = if let Some(partition_cnt) = partition_cnt {
+            format!(
+                "new datasource should exist and have {:?} partitions",
+                partition_cnt
+            )
+        } else {
+            "new datasource should not exist".into()
+        };
+
+        let actual = partitioned_datasrc
+            .map(|datasrc| datasrc.output_partitioning().partition_count());
+        assert_eq!(
+            actual,
+            partition_cnt,
+            "partitioned datasrc does not match expected, we expected {}, 
instead found {:?}",
+            should_exist,
+            actual
+        );
+    }
+
+    fn run_all_test_scenarios(
+        output_ordering: Option<LexOrdering>,
+        sort_information_on_config: Vec<LexOrdering>,
+    ) -> Result<()> {
+        let not_used = usize::MAX;
+
+        // src has no partitions
+        let mem_src_config =
+            memorysrcconfig_no_partitions(sort_information_on_config.clone())?;
+        let partitioned_datasrc =
+            mem_src_config.repartitioned(1, not_used, 
output_ordering.clone())?;
+        assert_partitioning(partitioned_datasrc, None);
+
+        // src has partitions == target partitions (=1)
+        let target_partitions = 1;
+        let mem_src_config =
+            
memorysrcconfig_1_partition_1_batch(sort_information_on_config.clone())?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, None);
+
+        // src has partitions == target partitions (=3)
+        let target_partitions = 3;
+        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
+            sort_information_on_config.clone(),
+        )?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, None);
+
+        // src has partitions > target partitions, but we don't merge them
+        let target_partitions = 2;
+        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
+            sort_information_on_config.clone(),
+        )?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, None);
+
+        // src has partitions < target partitions, but not enough batches (per 
partition) to split into more partitions
+        let target_partitions = 4;
+        let mem_src_config = memorysrcconfig_3_partitions_1_batch_each(
+            sort_information_on_config.clone(),
+        )?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, None);
+
+        // src has partitions < target partitions, and can split to sufficient 
amount
+        // has 6 batches across 3 partitions. Will need to split 2 of it's 
partitions.
+        let target_partitions = 5;
+        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
+            sort_information_on_config.clone(),
+        )?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, Some(5));
+
+        // src has partitions < target partitions, and can split to sufficient 
amount
+        // has 6 batches across 3 partitions. Will need to split all of it's 
partitions.
+        let target_partitions = 6;
+        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
+            sort_information_on_config.clone(),
+        )?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, Some(6));
+
+        // src has partitions < target partitions, but not enough total 
batches to fulfill the split (desired target_partitions)
+        let target_partitions = 3 * 2 + 1;
+        let mem_src_config = memorysrcconfig_3_partitions_with_2_batches_each(
+            sort_information_on_config.clone(),
+        )?;
+        let partitioned_datasrc = mem_src_config.repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering.clone(),
+        )?;
+        assert_partitioning(partitioned_datasrc, None);
+
+        // src has 1 partition with many batches of lopsided sizes
+        // make sure it handles the split properly
+        let target_partitions = 2;
+        let mem_src_config = 
memorysrcconfig_1_partition_with_different_sized_batches(
+            sort_information_on_config,
+        )?;
+        let partitioned_datasrc = mem_src_config.clone().repartitioned(
+            target_partitions,
+            not_used,
+            output_ordering,
+        )?;
+        assert_partitioning(partitioned_datasrc.clone(), Some(2));
+        // Starting = batch(100_000), batch(10_000), batch(100), batch(1).
+        // It should have split as p1=batch(100_000), p2=[batch(10_000), 
batch(100), batch(1)]
+        let repartitioned_raw_batches = mem_src_config
+            .repartition_evenly_by_size(target_partitions)?
+            .unwrap();
+        assert_eq!(repartitioned_raw_batches.len(), 2);
+        let [ref p1, ref p2] = repartitioned_raw_batches[..] else {
+            unreachable!()
+        };
+        // p1=batch(100_000)
+        assert_eq!(p1.len(), 1);
+        assert_eq!(p1[0].num_rows(), 100_000);
+        // p2=[batch(10_000), batch(100), batch(1)]
+        assert_eq!(p2.len(), 3);
+        assert_eq!(p2[0].num_rows(), 10_000);
+        assert_eq!(p2[1].num_rows(), 100);
+        assert_eq!(p2[2].num_rows(), 1);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_repartition_no_sort_information_no_output_ordering() -> Result<()> 
{
+        let no_sort = vec![];
+        let no_output_ordering = None;
+
+        // Test: common set of functionality
+        run_all_test_scenarios(no_output_ordering.clone(), no_sort.clone())?;
+
+        // Test: does not preserve separate partitions (with own internal 
ordering) on even split
+        let target_partitions = 3;
+        let mem_src_config =
+            memorysrcconfig_2_partition_with_different_sized_batches(no_sort)?;
+        let partitioned_datasrc = mem_src_config.clone().repartitioned(
+            target_partitions,
+            usize::MAX,
+            no_output_ordering,
+        )?;
+        assert_partitioning(partitioned_datasrc.clone(), Some(3));
+        // Starting = batch(100_000), batch(10_000), batch(1_000), 
batch(2_000), batch(20)
+        // It should have split as p1=batch(100_000), p2=batch(10_000),  
p3=rest(mixed across original partitions)
+        let repartitioned_raw_batches = mem_src_config
+            .repartition_evenly_by_size(target_partitions)?
+            .unwrap();
+        assert_eq!(repartitioned_raw_batches.len(), 3);
+        let [ref p1, ref p2, ref p3] = repartitioned_raw_batches[..] else {
+            unreachable!()
+        };
+        // p1=batch(100_000)
+        assert_eq!(p1.len(), 1);
+        assert_eq!(p1[0].num_rows(), 100_000);
+        // p2=batch(10_000)
+        assert_eq!(p2.len(), 1);
+        assert_eq!(p2[0].num_rows(), 10_000);
+        // p3= batch(1_000), batch(2_000), batch(20)
+        assert_eq!(p3.len(), 3);
+        assert_eq!(p3[0].num_rows(), 1_000);
+        assert_eq!(p3[1].num_rows(), 2_000);
+        assert_eq!(p3[2].num_rows(), 20);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_repartition_with_sort_information() -> Result<()> {
+        let schema = schema();
+        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col("c", &schema).unwrap(),
+            options: SortOptions::default(),
+        }]);
+        let has_sort = vec![sort_key.clone()];
+        let output_ordering = Some(sort_key);
+
+        // Test: common set of functionality
+        run_all_test_scenarios(output_ordering.clone(), has_sort.clone())?;
+
+        // Test: DOES preserve separate partitions (with own internal ordering)

Review Comment:
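   This is where sort ordering is preserved: when sort information is present, repartitioning keeps the original partitions (each with its own internal ordering) separate instead of mixing batches across them. Again, this follows the [FileSourceConfig implementation](https://github.com/apache/datafusion/pull/15409/files#r2011495377) as a guide.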



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to