This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new bf3520d9ee fix: return error instead of panic on schema mismatch in
BatchCoalescer::push_batch (#9390)
bf3520d9ee is described below
commit bf3520d9ee629afe209e55ff303902ebaf82dbde
Author: Bruno Volpato <[email protected]>
AuthorDate: Wed Feb 11 14:15:45 2026 -0500
fix: return error instead of panic on schema mismatch in
BatchCoalescer::push_batch (#9390)
# Which issue does this PR close?
- Closes #9389.
# Rationale for this change
`BatchCoalescer::push_batch` currently uses a hard `assert_eq!` to check
that the incoming batch has the same number of columns as the
coalescer's schema. If there's a mismatch, the whole process panics.
The function already returns `Result<(), ArrowError>`, so there's no
reason this can't be an error return instead. This also matches how the rest
of the arrow API handles the same situation — `RecordBatch::try_new`
returns `Err(ArrowError::InvalidArgumentError)` for column count
mismatches, and other checks in the same struct use `debug_assert!`,
which (by default) is not checked in release builds and so cannot bring
down a production process.
We ran into this in production when a connector returned batches with a
different schema than the plan expected. Instead of a query-level error,
the whole process went down.
# What changes are included in this PR?
- Replace `assert_eq!(arrays.len(), self.in_progress_arrays.len())` with
an `if` check that returns `Err(ArrowError::InvalidArgumentError(...))`.
Only the column count is validated; column data types are not checked
by this change.
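- Add three tests covering both directions of mismatch: a coalescer
schema with fewer columns than the batch, a coalescer schema with more
columns than the batch, and a zero-column coalescer receiving a
two-column batch.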
# Are these changes tested?
Yes — three new tests:
- `test_push_batch_schema_mismatch_fewer_columns` — coalescer expects 0
columns, batch has 1 (the coalescer's schema has fewer columns)
- `test_push_batch_schema_mismatch_more_columns` — coalescer expects 2
columns, batch has 1 (the coalescer's schema has more columns)
- `test_push_batch_schema_mismatch_two_vs_zero` — coalescer expects 0
columns, batch has 2 (matches the exact error we saw in production)
# Are there any user-facing changes?
`BatchCoalescer::push_batch` now returns
`Err(ArrowError::InvalidArgumentError)` on column count mismatch instead
of panicking. Any caller that was relying on the panic (unlikely) would
need to handle the error instead.
---
arrow-select/src/coalesce.rs | 69 ++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 67 insertions(+), 2 deletions(-)
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 5ea2d97e78..8fe88fb8c3 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -458,8 +458,14 @@ impl BatchCoalescer {
let (_schema, arrays, mut num_rows) = batch.into_parts();
- // setup input rows
- assert_eq!(arrays.len(), self.in_progress_arrays.len());
+ // Validate column count matches the expected schema
+ if arrays.len() != self.in_progress_arrays.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Batch has {} columns but BatchCoalescer expects {}",
+ arrays.len(),
+ self.in_progress_arrays.len()
+ )));
+ }
self.in_progress_arrays
.iter_mut()
.zip(arrays)
@@ -2178,4 +2184,63 @@ mod tests {
assert_eq!(expected, actual);
}
+
+ #[test]
+ fn test_push_batch_schema_mismatch_fewer_columns() {
+ // Coalescer schema has fewer columns than the batch: expects 0, batch has 1
+ let empty_schema = Arc::new(Schema::empty());
+ let mut coalescer = BatchCoalescer::new(empty_schema, 100);
+ let batch = uint32_batch(0..5);
+ let result = coalescer.push_batch(batch); // mismatch must be an Err, not a panic
+ assert!(result.is_err());
+ let err = result.unwrap_err().to_string();
+ assert!(
+ err.contains("Batch has 1 columns but BatchCoalescer expects 0"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn test_push_batch_schema_mismatch_more_columns() {
+ // Coalescer schema has more columns than the batch: expects 2, batch has 1
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c0", DataType::UInt32, false),
+ Field::new("c1", DataType::UInt32, false),
+ ]));
+ let mut coalescer = BatchCoalescer::new(schema, 100);
+ let batch = uint32_batch(0..5);
+ let result = coalescer.push_batch(batch); // mismatch must be an Err, not a panic
+ assert!(result.is_err());
+ let err = result.unwrap_err().to_string();
+ assert!(
+ err.contains("Batch has 1 columns but BatchCoalescer expects 2"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn test_push_batch_schema_mismatch_two_vs_zero() {
+ // Coalescer expects 0 columns, batch has 2 (the failure reported in #9389)
+ let empty_schema = Arc::new(Schema::empty());
+ let mut coalescer = BatchCoalescer::new(empty_schema, 100);
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c0", DataType::UInt32, false),
+ Field::new("c1", DataType::UInt32, false),
+ ]));
+ let batch = RecordBatch::try_new( // a valid two-column batch
+ schema,
+ vec![
+ Arc::new(UInt32Array::from(vec![1, 2, 3])),
+ Arc::new(UInt32Array::from(vec![4, 5, 6])),
+ ],
+ )
+ .unwrap();
+ let result = coalescer.push_batch(batch); // mismatch must be an Err, not a panic
+ assert!(result.is_err());
+ let err = result.unwrap_err().to_string();
+ assert!(
+ err.contains("Batch has 2 columns but BatchCoalescer expects 0"),
+ "unexpected error: {err}"
+ );
+ }
}