alamb commented on code in PR #7772:
URL: https://github.com/apache/arrow-datafusion/pull/7772#discussion_r1352722190


##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -138,6 +162,44 @@ impl SortTest {
         self
     }
 
+    async fn run_with_params(

Review Comment:
   Perhaps naming this `run_with_limit` would make it easier to understand what 
is happening



##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -22,89 +22,101 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use arrow_array::{Float64Array, StringArray};
+use core::iter;
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::{
+    datasource::MemTable,
+    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
+};
+use datafusion_common::{
+    cast::{as_float64_array, as_string_array},
+    TableReference,
+};
 use datafusion_execution::memory_pool::GreedyMemoryPool;
-use rand::Rng;
+use datafusion_physical_expr::expressions::col;
+use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_1k_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(10240)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(10240)
-        .with_should_spill(true)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(10240)
-        .with_should_spill(true)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, 
true)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(10240)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_100k_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(102400)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(102400)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(102400)
-        .with_should_spill(true)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, 
true)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(102400)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
 }
 
 #[tokio::test]
 async fn test_sort_unlimited_mem() {
-    SortTest::new()
-        .with_int32_batches(5)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(20000)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
-
-    SortTest::new()
-        .with_int32_batches(1000000)
-        .with_pool_size(usize::MAX)
-        .with_should_spill(false)
-        .run()
-        .await;
+    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, 
false)] {
+        SortTest::new()
+            .with_int32_batches(batch_size)
+            .with_pool_size(usize::MAX)
+            .with_should_spill(should_spill)
+            .run()
+            .await;
+    }
+}
+
+#[tokio::test]
+async fn test_sort_topk_i32() {

Review Comment:
   What would you think about encapsulating the limit data and expected value 
calculation in a structure? 
   
   So this test might look like
   
   ```rust
   let size: usize = ...; // pick a random size
   let scenario = TopKScenario::new()
      // tell the scenario to sort by one column
      .with_sort_column(["i32_column"])
      // specify a limit of 10 rows
       .with_limit(10);
   
   // stagger the batches in the scenario
   scenario.stagger()
   
   let collected = SortTest::new()
     // call Scenario::batches to get the input batches
     .with_input(scenario.batches());
   // run the test
     .run_with_limit("t", scenario.sort_cols(), scenario.limit()).await;
   
   // The scenario handles calculting expected output (as it knows the sort 
column and limit)
   let expected = scenario.expected()
   let actual =  batches_to_vec(&collected);
   assert_eq!(actual, &expected); 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to