alamb commented on code in PR #8146: URL: https://github.com/apache/arrow-rs/pull/8146#discussion_r2280415061
########## arrow-select/src/coalesce.rs: ########## @@ -1314,4 +1366,269 @@ mod tests { let options = RecordBatchOptions::new().with_row_count(Some(row_count)); RecordBatch::try_new_with_options(schema, columns, &options).unwrap() } + + /// Helper function to create a test batch with specified number of rows + fn create_test_batch(num_rows: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])); + let array = Int32Array::from_iter_values(0..num_rows as i32); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() + } + + #[test] + fn test_biggest_coalesce_batch_size_none_default() { + // Test that default behavior (None) coalesces all batches + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + + // Push a large batch (1000 rows) - should be coalesced normally + let large_batch = create_test_batch(1000); + coalescer.push_batch(large_batch).unwrap(); + + // Should produce multiple batches of target size (100) + let mut output_batches = vec![]; + while let Some(batch) = coalescer.next_completed_batch() { + output_batches.push(batch); + } + + coalescer.finish_buffered_batch().unwrap(); + while let Some(batch) = coalescer.next_completed_batch() { + output_batches.push(batch); + } + + // Should have 10 batches of 100 rows each + assert_eq!(output_batches.len(), 10); + for batch in output_batches { + assert_eq!(batch.num_rows(), 100); + } + } + + #[test] + fn test_biggest_coalesce_batch_size_bypass_large_batch() { + // Test that batches larger than biggest_coalesce_batch_size bypass coalescing + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(500); + + // Push a large batch (1000 rows) - should bypass coalescing + let large_batch = create_test_batch(1000); + coalescer.push_batch(large_batch.clone()).unwrap(); + + 
// Should have one completed batch immediately (the original large batch) + assert!(coalescer.has_completed_batch()); + let output_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(output_batch.num_rows(), 1000); + + // Should be no more completed batches + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_coalesce_small_batch() { + // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(500); + + // Push small batches that should be coalesced + let small_batch = create_test_batch(50); + coalescer.push_batch(small_batch.clone()).unwrap(); + + // Should not have completed batch yet (only 50 rows, target is 100) + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 50); + + // Push another small batch + coalescer.push_batch(small_batch).unwrap(); + + // Now should have a completed batch (100 rows total) + assert!(coalescer.has_completed_batch()); + let output_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(output_batch.num_rows(), 100); + + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_equal_boundary() { + // Test behavior when batch size equals biggest_coalesce_batch_size + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(500); + + // Push a batch exactly equal to the limit + let boundary_batch = create_test_batch(500); + coalescer.push_batch(boundary_batch).unwrap(); + + // Should be coalesced (not bypass) since it's equal, not greater + let mut output_count = 0; + while coalescer.next_completed_batch().is_some() { + output_count += 
1; + } + + coalescer.finish_buffered_batch().unwrap(); + while coalescer.next_completed_batch().is_some() { + output_count += 1; + } + + // Should have 5 batches of 100 rows each + assert_eq!(output_count, 5); + } + + #[test] + fn test_biggest_coalesce_batch_size_mixed_batches() { + // Test mixing large (bypass) and small (coalesce) batches + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(300); + + let small_batch = create_test_batch(50); + let large_batch = create_test_batch(400); + + // Push small batch first + coalescer.push_batch(small_batch.clone()).unwrap(); + assert_eq!(coalescer.get_buffered_rows(), 50); + assert!(!coalescer.has_completed_batch()); + + // Push large batch - should flush buffer and add large batch to completed + coalescer.push_batch(large_batch).unwrap(); + + // Should have two completed batches: + // 1. The flushed buffer (50 rows) + // 2. 
The large batch (400 rows) + assert!(coalescer.has_completed_batch()); + + let first_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(first_batch.num_rows(), 50); // flushed buffer + + let second_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(second_batch.num_rows(), 400); // large batch + + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + + // Push another small batch to verify normal coalescing continues + coalescer.push_batch(small_batch).unwrap(); + assert_eq!(coalescer.get_buffered_rows(), 50); + assert!(!coalescer.has_completed_batch()); + } + + #[test] + fn test_biggest_coalesce_batch_size_empty_batch() { + // Test that empty batches don't trigger the bypass logic + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(50); + + let empty_batch = create_test_batch(0); + coalescer.push_batch(empty_batch).unwrap(); + + // Empty batch should be handled normally (no effect) + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_with_buffered_data_flush() { + // Test that buffered data is properly flushed before adding large batch + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(200); + + // Add some buffered data first + let small_batch = create_test_batch(30); + coalescer.push_batch(small_batch.clone()).unwrap(); + coalescer.push_batch(small_batch).unwrap(); + assert_eq!(coalescer.get_buffered_rows(), 60); + + // Push large batch that should bypass coalescing + let large_batch = create_test_batch(250); + coalescer.push_batch(large_batch).unwrap(); + + // Should flush the buffer first, then add the large batch + assert!(coalescer.has_completed_batch()); + + 
let flushed_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(flushed_batch.num_rows(), 60); // buffered data + + let large_output = coalescer.next_completed_batch().unwrap(); + assert_eq!(large_output.num_rows(), 250); // large batch + + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_zero_limit() { + // Test edge case where limit is 0 (all batches bypass) + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(0); + + // Even a 1-row batch should bypass + let tiny_batch = create_test_batch(1); + coalescer.push_batch(tiny_batch).unwrap(); + + assert!(coalescer.has_completed_batch()); + let output = coalescer.next_completed_batch().unwrap(); + assert_eq!(output.num_rows(), 1); + } + + #[test] + fn test_biggest_coalesce_batch_size_getter_setter() { + // Test that we can get and set the biggest_coalesce_batch_size + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + + // Initially None + assert_eq!(coalescer.biggest_coalesce_batch_size, None); + + // Set a value + coalescer.biggest_coalesce_batch_size = Some(500); + assert_eq!(coalescer.biggest_coalesce_batch_size, Some(500)); + + // Set back to None + coalescer.biggest_coalesce_batch_size = None; + assert_eq!(coalescer.biggest_coalesce_batch_size, None); + } + + #[test] + fn test_biggest_coalesce_batch_size_constructor_method() { Review Comment: I don't really understand this test ########## arrow-select/src/coalesce.rs: ########## @@ -236,6 +262,13 @@ impl BatchCoalescer { /// assert_eq!(completed_batch, expected_batch); /// ``` pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> { + if let Some(limit) = self.biggest_coalesce_batch_size { Review Comment: I don't fully understand this comment 
-- I think the code in this PR is a noop when there are no in progress batches (because finish_buffered_batch will not do anything) ########## arrow-select/src/coalesce.rs: ########## @@ -1314,4 +1366,269 @@ mod tests { let options = RecordBatchOptions::new().with_row_count(Some(row_count)); RecordBatch::try_new_with_options(schema, columns, &options).unwrap() } + + /// Helper function to create a test batch with specified number of rows + fn create_test_batch(num_rows: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])); + let array = Int32Array::from_iter_values(0..num_rows as i32); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() + } + + #[test] + fn test_biggest_coalesce_batch_size_none_default() { + // Test that default behavior (None) coalesces all batches + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + + // Push a large batch (1000 rows) - should be coalesced normally + let large_batch = create_test_batch(1000); + coalescer.push_batch(large_batch).unwrap(); + + // Should produce multiple batches of target size (100) + let mut output_batches = vec![]; + while let Some(batch) = coalescer.next_completed_batch() { + output_batches.push(batch); + } + + coalescer.finish_buffered_batch().unwrap(); + while let Some(batch) = coalescer.next_completed_batch() { + output_batches.push(batch); + } + + // Should have 10 batches of 100 rows each + assert_eq!(output_batches.len(), 10); + for batch in output_batches { + assert_eq!(batch.num_rows(), 100); + } + } + + #[test] + fn test_biggest_coalesce_batch_size_bypass_large_batch() { + // Test that batches larger than biggest_coalesce_batch_size bypass coalescing + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(500); + + // Push a large batch (1000 
rows) - should bypass coalescing + let large_batch = create_test_batch(1000); + coalescer.push_batch(large_batch.clone()).unwrap(); + + // Should have one completed batch immediately (the original large batch) + assert!(coalescer.has_completed_batch()); + let output_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(output_batch.num_rows(), 1000); + + // Should be no more completed batches + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_coalesce_small_batch() { + // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(500); + + // Push small batches that should be coalesced + let small_batch = create_test_batch(50); + coalescer.push_batch(small_batch.clone()).unwrap(); + + // Should not have completed batch yet (only 50 rows, target is 100) + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 50); + + // Push another small batch + coalescer.push_batch(small_batch).unwrap(); + + // Now should have a completed batch (100 rows total) + assert!(coalescer.has_completed_batch()); + let output_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(output_batch.num_rows(), 100); + + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_equal_boundary() { + // Test behavior when batch size equals biggest_coalesce_batch_size + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(500); + + // Push a batch exactly equal to the limit + let boundary_batch = create_test_batch(500); + coalescer.push_batch(boundary_batch).unwrap(); + + // Should be coalesced (not 
bypass) since it's equal, not greater + let mut output_count = 0; + while coalescer.next_completed_batch().is_some() { + output_count += 1; + } + + coalescer.finish_buffered_batch().unwrap(); + while coalescer.next_completed_batch().is_some() { + output_count += 1; + } + + // Should have 5 batches of 100 rows each + assert_eq!(output_count, 5); + } + + #[test] + fn test_biggest_coalesce_batch_size_mixed_batches() { + // Test mixing large (bypass) and small (coalesce) batches + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(300); + + let small_batch = create_test_batch(50); + let large_batch = create_test_batch(400); + + // Push small batch first + coalescer.push_batch(small_batch.clone()).unwrap(); + assert_eq!(coalescer.get_buffered_rows(), 50); + assert!(!coalescer.has_completed_batch()); + + // Push large batch - should flush buffer and add large batch to completed + coalescer.push_batch(large_batch).unwrap(); + + // Should have two completed batches: + // 1. The flushed buffer (50 rows) + // 2. 
The large batch (400 rows) + assert!(coalescer.has_completed_batch()); + + let first_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(first_batch.num_rows(), 50); // flushed buffer + + let second_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(second_batch.num_rows(), 400); // large batch + + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + + // Push another small batch to verify normal coalescing continues + coalescer.push_batch(small_batch).unwrap(); + assert_eq!(coalescer.get_buffered_rows(), 50); + assert!(!coalescer.has_completed_batch()); + } + + #[test] + fn test_biggest_coalesce_batch_size_empty_batch() { + // Test that empty batches don't trigger the bypass logic + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(50); + + let empty_batch = create_test_batch(0); + coalescer.push_batch(empty_batch).unwrap(); + + // Empty batch should be handled normally (no effect) + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_with_buffered_data_flush() { + // Test that buffered data is properly flushed before adding large batch + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(200); + + // Add some buffered data first + let small_batch = create_test_batch(30); + coalescer.push_batch(small_batch.clone()).unwrap(); + coalescer.push_batch(small_batch).unwrap(); + assert_eq!(coalescer.get_buffered_rows(), 60); + + // Push large batch that should bypass coalescing + let large_batch = create_test_batch(250); + coalescer.push_batch(large_batch).unwrap(); + + // Should flush the buffer first, then add the large batch + assert!(coalescer.has_completed_batch()); + + 
let flushed_batch = coalescer.next_completed_batch().unwrap(); + assert_eq!(flushed_batch.num_rows(), 60); // buffered data + + let large_output = coalescer.next_completed_batch().unwrap(); + assert_eq!(large_output.num_rows(), 250); // large batch + + assert!(!coalescer.has_completed_batch()); + assert_eq!(coalescer.get_buffered_rows(), 0); + } + + #[test] + fn test_biggest_coalesce_batch_size_zero_limit() { + // Test edge case where limit is 0 (all batches bypass) + let mut coalescer = BatchCoalescer::new( + Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])), + 100, + ); + coalescer.biggest_coalesce_batch_size = Some(0); + + // Even a 1-row batch should bypass + let tiny_batch = create_test_batch(1); + coalescer.push_batch(tiny_batch).unwrap(); + + assert!(coalescer.has_completed_batch()); + let output = coalescer.next_completed_batch().unwrap(); + assert_eq!(output.num_rows(), 1); + } + + #[test] + fn test_biggest_coalesce_batch_size_getter_setter() { Review Comment: I am not sure it is strictly necessary to test the getter/setter ########## arrow-select/src/coalesce.rs: ########## @@ -166,9 +171,30 @@ impl BatchCoalescer { // We will for sure store at least one completed batch completed: VecDeque::with_capacity(1), buffered_rows: 0, + biggest_coalesce_batch_size: None, } } + /// Set the biggest coalesce batch size limit + /// + /// If set to Some(limit), batches larger than this limit will bypass + /// coalescing and be passed through directly. If None, all batches + /// will be coalesced according to the target_batch_size. 
+    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
+        self.biggest_coalesce_batch_size = limit;
+        self
+    }
+
+    /// Get the current biggest coalesce batch size limit
+    pub fn get_biggest_coalesce_batch_size(&self) -> Option<usize> {

Review Comment:
I think most of the rest of this crate doesn't use the `get_` prefix for getters

```suggestion
    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
```

##########
arrow-select/src/coalesce.rs:
##########
@@ -290,6 +323,25 @@ impl BatchCoalescer {
         Ok(())
     }

+    /// Returns the number of buffered rows
+    pub fn get_buffered_rows(&self) -> usize {
+        self.buffered_rows
+    }
+
+    /// Push a batch directly to the completed batches
+    pub fn flush_buffer_and_push_batch_to_completed(

Review Comment:
I recommend making this non-pub to keep the API smaller and the code easier to use

```suggestion
    fn flush_buffer_and_push_batch_to_completed(
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org