This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d8946ca077 Fix `ArrowArrayStreamReader` for 0-columns record batch streams (#9405)
d8946ca077 is described below

commit d8946ca0775ab7fe0eef2fdea4b8bb3d55ec6664
Author: Jonas Dedden <[email protected]>
AuthorDate: Fri Feb 13 15:49:10 2026 +0100

    Fix `ArrowArrayStreamReader` for 0-columns record batch streams (#9405)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/9394
    
    # Rationale for this change
    
    PR https://github.com/apache/arrow-rs/pull/8944 introduced a regression
    where 0-column record batch streams could no longer be decoded.
    
    # What changes are included in this PR?
    
    - Construct the `RecordBatch` with `try_new_with_options`, passing the
    `len` of the `ArrayData` as an explicit row count, instead of letting
    `try_new` implicitly infer the row count from the first column (which
    a 0-column batch does not have); see the sketch after this list.
    - Slightly refactor the existing `test_stream_round_trip_[import/export]`
    tests to reduce code duplication.
    - Introduce a new `test_stream_round_trip_no_columns` test.
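
    A minimal sketch, not part of this commit, of why the explicit row
    count matters, using only the public `arrow-array` API:

    ```rust
    use std::sync::Arc;

    use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
    use arrow_schema::Schema;

    fn main() {
        let schema = Arc::new(Schema::empty());
        let columns: Vec<ArrayRef> = vec![];

        // With no columns, `try_new` has nothing to infer the row count
        // from, so construction fails.
        assert!(RecordBatch::try_new(schema.clone(), columns.clone()).is_err());

        // Passing the row count explicitly succeeds.
        let batch = RecordBatch::try_new_with_options(
            schema,
            columns,
            &RecordBatchOptions::new().with_row_count(Some(10)),
        )
        .unwrap();
        assert_eq!(batch.num_rows(), 10);
    }
    ```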
    
    # Are these changes tested?
    
    Yes, both export and import are tested in
    `test_stream_round_trip_no_columns`.
    
    # Are there any user-facing changes?
    
    0-column record batch streams should be decodable again, as sketched
    below.
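
    A minimal end-to-end sketch of the now-working round trip, not part
    of this commit, with `RecordBatchIterator` standing in for a real
    stream producer:

    ```rust
    use std::sync::Arc;

    use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
    use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchOptions};
    use arrow_schema::Schema;

    fn main() {
        let schema = Arc::new(Schema::empty());
        let batch = RecordBatch::try_new_with_options(
            schema.clone(),
            Vec::<ArrayRef>::new(),
            &RecordBatchOptions::new().with_row_count(Some(10)),
        )
        .unwrap();

        // Export the 0-column batch through the C stream interface...
        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
        let stream = FFI_ArrowArrayStream::new(Box::new(reader));

        // ...and import it back; before this fix, the error surfaced here.
        let mut imported = ArrowArrayStreamReader::try_new(stream).unwrap();
        let batch = imported.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 0);
        assert_eq!(batch.num_rows(), 10);
    }
    ```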
---
 arrow-array/src/ffi_stream.rs | 75 ++++++++++++++++++++++---------------------
 1 file changed, 39 insertions(+), 36 deletions(-)

diff --git a/arrow-array/src/ffi_stream.rs b/arrow-array/src/ffi_stream.rs
index c469436829..815d7c5760 100644
--- a/arrow-array/src/ffi_stream.rs
+++ b/arrow-array/src/ffi_stream.rs
@@ -66,6 +66,7 @@ use std::{
 use arrow_data::ffi::FFI_ArrowArray;
 use arrow_schema::{ArrowError, Schema, SchemaRef, ffi::FFI_ArrowSchema};
 
+use crate::RecordBatchOptions;
 use crate::array::Array;
 use crate::array::StructArray;
 use crate::ffi::from_ffi_and_data_type;
@@ -365,7 +366,12 @@ impl Iterator for ArrowArrayStreamReader {
                 from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
             };
             Some(result.and_then(|data| {
-                RecordBatch::try_new(self.schema.clone(), StructArray::from(data).into_parts().1)
+                let len = data.len();
+                RecordBatch::try_new_with_options(
+                    self.schema.clone(),
+                    StructArray::from(data).into_parts().1,
+                    &RecordBatchOptions::new().with_row_count(Some(len)),
+                )
             }))
         } else {
             let last_error = self.get_stream_last_error();
@@ -419,20 +425,7 @@ mod tests {
         }
     }
 
-    fn _test_round_trip_export(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
-        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
-        let schema = Arc::new(Schema::new_with_metadata(
-            vec![
-                Field::new("a", arrays[0].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("b", arrays[1].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("c", arrays[2].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-            ],
-            metadata,
-        ));
-        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+    fn _test_round_trip_export(batch: RecordBatch, schema: Arc<Schema>) -> Result<()> {
         let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _;
 
         let reader = TestRecordBatchReader::new(schema.clone(), iter);
@@ -461,10 +454,12 @@ mod tests {
             }
 
             let array = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap();
+            let len = array.len();
 
-            let record_batch = RecordBatch::try_new(
+            let record_batch = RecordBatch::try_new_with_options(
                 SchemaRef::from(exported_schema.clone()),
                 StructArray::from(array).into_parts().1,
+                &RecordBatchOptions::new().with_row_count(Some(len)),
             )
             .unwrap();
             produced_batches.push(record_batch);
@@ -475,20 +470,7 @@ mod tests {
         Ok(())
     }
 
-    fn _test_round_trip_import(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
-        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
-        let schema = Arc::new(Schema::new_with_metadata(
-            vec![
-                Field::new("a", arrays[0].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("b", arrays[1].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("c", arrays[2].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-            ],
-            metadata,
-        ));
-        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+    fn _test_round_trip_import(batch: RecordBatch, schema: Arc<Schema>) -> Result<()> {
         let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _;
 
         let reader = TestRecordBatchReader::new(schema.clone(), iter);
@@ -511,19 +493,40 @@ mod tests {
     }
 
     #[test]
-    fn test_stream_round_trip_export() -> Result<()> {
+    fn test_stream_round_trip() {
         let array = Int32Array::from(vec![Some(2), None, Some(1), None]);
         let array: Arc<dyn Array> = Arc::new(array);
+        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
+
+        let schema = Arc::new(Schema::new_with_metadata(
+            vec![
+                Field::new("a", array.data_type().clone(), true).with_metadata(metadata.clone()),
+                Field::new("b", array.data_type().clone(), true).with_metadata(metadata.clone()),
+                Field::new("c", array.data_type().clone(), true).with_metadata(metadata.clone()),
+            ],
+            metadata,
+        ));
+        let batch = RecordBatch::try_new(schema.clone(), vec![array.clone(), array.clone(), array])
+            .unwrap();
 
-        _test_round_trip_export(vec![array.clone(), array.clone(), array])
+        _test_round_trip_export(batch.clone(), schema.clone()).unwrap();
+        _test_round_trip_import(batch, schema).unwrap();
     }
 
     #[test]
-    fn test_stream_round_trip_import() -> Result<()> {
-        let array = Int32Array::from(vec![Some(2), None, Some(1), None]);
-        let array: Arc<dyn Array> = Arc::new(array);
+    fn test_stream_round_trip_no_columns() {
+        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
+
+        let schema = Arc::new(Schema::new_with_metadata(Vec::<Field>::new(), metadata));
+        let batch = RecordBatch::try_new_with_options(
+            schema.clone(),
+            Vec::<Arc<dyn Array>>::new(),
+            &RecordBatchOptions::new().with_row_count(Some(10)),
+        )
+        .unwrap();
 
-        _test_round_trip_import(vec![array.clone(), array.clone(), array])
+        _test_round_trip_export(batch.clone(), schema.clone()).unwrap();
+        _test_round_trip_import(batch, schema).unwrap();
     }
 
     #[test]
