zhuqi-lucas commented on code in PR #8146:
URL: https://github.com/apache/arrow-rs/pull/8146#discussion_r2280418650


##########
arrow-select/src/coalesce.rs:
##########
@@ -1314,4 +1366,269 @@ mod tests {
         let options = 
RecordBatchOptions::new().with_row_count(Some(row_count));
         RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
     }
+
+    /// Helper function to create a test batch with specified number of rows
+    fn create_test_batch(num_rows: usize) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![Field::new("c0", 
DataType::Int32, false)]));
+        let array = Int32Array::from_iter_values(0..num_rows as i32);
+        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_none_default() {
+        // Test that default behavior (None) coalesces all batches
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+
+        // Push a large batch (1000 rows) - should be coalesced normally
+        let large_batch = create_test_batch(1000);
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should produce multiple batches of target size (100)
+        let mut output_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            output_batches.push(batch);
+        }
+
+        coalescer.finish_buffered_batch().unwrap();
+        while let Some(batch) = coalescer.next_completed_batch() {
+            output_batches.push(batch);
+        }
+
+        // Should have 10 batches of 100 rows each
+        assert_eq!(output_batches.len(), 10);
+        for batch in output_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
+        // Test that batches larger than biggest_coalesce_batch_size bypass 
coalescing
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(500);
+
+        // Push a large batch (1000 rows) - should bypass coalescing
+        let large_batch = create_test_batch(1000);
+        coalescer.push_batch(large_batch.clone()).unwrap();
+
+        // Should have one completed batch immediately (the original large 
batch)
+        assert!(coalescer.has_completed_batch());
+        let output_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output_batch.num_rows(), 1000);
+
+        // Should be no more completed batches
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
+        // Test that batches smaller than biggest_coalesce_batch_size are 
coalesced normally
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(500);
+
+        // Push small batches that should be coalesced
+        let small_batch = create_test_batch(50);
+        coalescer.push_batch(small_batch.clone()).unwrap();
+
+        // Should not have completed batch yet (only 50 rows, target is 100)
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push another small batch
+        coalescer.push_batch(small_batch).unwrap();
+
+        // Now should have a completed batch (100 rows total)
+        assert!(coalescer.has_completed_batch());
+        let output_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output_batch.num_rows(), 100);
+
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_equal_boundary() {
+        // Test behavior when batch size equals biggest_coalesce_batch_size
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(500);
+
+        // Push a batch exactly equal to the limit
+        let boundary_batch = create_test_batch(500);
+        coalescer.push_batch(boundary_batch).unwrap();
+
+        // Should be coalesced (not bypass) since it's equal, not greater
+        let mut output_count = 0;
+        while coalescer.next_completed_batch().is_some() {
+            output_count += 1;
+        }
+
+        coalescer.finish_buffered_batch().unwrap();
+        while coalescer.next_completed_batch().is_some() {
+            output_count += 1;
+        }
+
+        // Should have 5 batches of 100 rows each
+        assert_eq!(output_count, 5);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_mixed_batches() {
+        // Test mixing large (bypass) and small (coalesce) batches
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(300);
+
+        let small_batch = create_test_batch(50);
+        let large_batch = create_test_batch(400);
+
+        // Push small batch first
+        coalescer.push_batch(small_batch.clone()).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push large batch - should flush buffer and add large batch to 
completed
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should have two completed batches:
+        // 1. The flushed buffer (50 rows)
+        // 2. The large batch (400 rows)
+        assert!(coalescer.has_completed_batch());
+
+        let first_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(first_batch.num_rows(), 50); // flushed buffer
+
+        let second_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(second_batch.num_rows(), 400); // large batch
+
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Push another small batch to verify normal coalescing continues
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+        assert!(!coalescer.has_completed_batch());
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_empty_batch() {
+        // Test that empty batches don't trigger the bypass logic
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(50);
+
+        let empty_batch = create_test_batch(0);
+        coalescer.push_batch(empty_batch).unwrap();
+
+        // Empty batch should be handled normally (no effect)
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_with_buffered_data_flush() {
+        // Test that buffered data is properly flushed before adding large 
batch
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(200);
+
+        // Add some buffered data first
+        let small_batch = create_test_batch(30);
+        coalescer.push_batch(small_batch.clone()).unwrap();
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 60);
+
+        // Push large batch that should bypass coalescing
+        let large_batch = create_test_batch(250);
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should flush the buffer first, then add the large batch
+        assert!(coalescer.has_completed_batch());
+
+        let flushed_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(flushed_batch.num_rows(), 60); // buffered data
+
+        let large_output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(large_output.num_rows(), 250); // large batch
+
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_zero_limit() {
+        // Test edge case where limit is 0 (all batches bypass)
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+        coalescer.biggest_coalesce_batch_size = Some(0);
+
+        // Even a 1-row batch should bypass
+        let tiny_batch = create_test_batch(1);
+        coalescer.push_batch(tiny_batch).unwrap();
+
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 1);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_getter_setter() {
+        // Test that we can get and set the biggest_coalesce_batch_size
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, 
false)])),
+            100,
+        );
+
+        // Initially None
+        assert_eq!(coalescer.biggest_coalesce_batch_size, None);
+
+        // Set a value
+        coalescer.biggest_coalesce_batch_size = Some(500);
+        assert_eq!(coalescer.biggest_coalesce_batch_size, Some(500));
+
+        // Set back to None
+        coalescer.biggest_coalesce_batch_size = None;
+        assert_eq!(coalescer.biggest_coalesce_batch_size, None);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_constructor_method() {

Review Comment:
   Removed it in the latest PR; I also think it's not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to