This is an automated email from the ASF dual-hosted git repository.
Jefffrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new f03e1bc028 fix(ipc): handle duplicate projection indices in IPC reader
(#9952)
f03e1bc028 is described below
commit f03e1bc028630561af7d4adfc000916337ec6ce5
Author: pchintar <[email protected]>
AuthorDate: Wed Jun 3 06:06:16 2026 +0530
fix(ipc): handle duplicate projection indices in IPC reader (#9952)
# Which issue does this PR close?
- Closes #9950.
# Rationale for this change
The current IPC reader does not correctly handle duplicate projection
indices.
`Schema::project` (in `arrow-schema/src/schema.rs`) and
`RecordBatch::project` (in `arrow-array/src/record_batch.rs`) both map
each requested index directly, preserve the projection order, and allow
duplicate indices such as:
```rust
vec![1, 1]
```
However, the IPC reader currently uses:
```rust
projection.iter().position(|p| p == &idx)
```
which only returns the first matching entry. As a result, only one
column is decoded even though the projected schema contains multiple
fields, leading to schema/column count mismatches when constructing the
`RecordBatch`.
This also affects reordered duplicate projections such as:
```rust
vec![2, 0, 2]
```
# What changes are included in this PR?
* Updated IPC projection handling in `arrow-ipc/src/reader.rs` to
preserve all matching projection entries
* Reused the decoded array for duplicate projection indices instead of
decoding the same field multiple times
* Preserved projection order for reordered duplicate projections
# Are these changes tested?
Yes.
Added `test_projection_duplicate_indices`, which verifies:
* duplicate projections (`vec![1, 1]`)
* reordered duplicate projections (`vec![2, 0, 2]`)
The test compares IPC projection results against `RecordBatch::project`.
The test fails before the fix and passes after it.
All existing `arrow-ipc` tests also pass (`cargo test -p arrow-ipc --lib`).
# Are there any user-facing changes?
No.
---
arrow-ipc/src/reader.rs | 43 ++++++++++++++++++++++++++++++++++++++-----
1 file changed, 38 insertions(+), 5 deletions(-)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6d1466febf..6d1e799d43 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -560,11 +560,20 @@ impl<'a> RecordBatchDecoder<'a> {
let mut arrays = vec![];
// project fields
for (idx, field) in schema.fields().iter().enumerate() {
- // Create array for projected field
- if let Some(proj_idx) = projection.iter().position(|p| p ==
&idx) {
- let child = self.create_array(field, &mut
variadic_counts)?;
- arrays.push((proj_idx, child));
- } else {
+ // A projected field can appear more than once, so collect all
matching positions.
+ let mut child = None;
+ for (proj_idx, projected_idx) in projection.iter().enumerate()
{
+ if *projected_idx == idx {
+ if child.is_none() {
+ child = Some(self.create_array(field, &mut
variadic_counts)?);
+ }
+
+ // Reuse the decoded array for duplicate projection
entries.
+ arrays.push((proj_idx,
child.as_ref().unwrap().clone()));
+ }
+ }
+
+ if child.is_none() {
self.skip_field(field, &mut variadic_counts)?;
}
}
@@ -2297,6 +2306,30 @@ mod tests {
}
}
+ #[test]
+ fn test_projection_duplicate_indices() {
+ let schema = create_test_projection_schema();
+ let batch = create_test_projection_batch_data(&schema);
+
+ // Write the batch to IPC
+ let mut buf = Vec::new();
+ {
+ let mut writer = crate::writer::FileWriter::try_new(&mut buf,
&schema).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+ }
+
+ // Verify duplicate([1, 1]) and reordered([2, 0, 2]) projection indices
+ for projection in [vec![1, 1], vec![2, 0, 2]] {
+ let reader =
+ FileReader::try_new(std::io::Cursor::new(buf.clone()),
Some(projection.clone()));
+ let read_batch = reader.unwrap().next().unwrap().unwrap();
+
+ let expected_batch = batch.project(&projection).unwrap();
+ assert_eq!(read_batch, expected_batch);
+ }
+ }
+
#[test]
fn test_arrow_single_float_row() {
let schema = Schema::new(vec![