This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 8b67776703 Add Avro Reader projection API (#9162)
8b67776703 is described below
commit 8b67776703dca58c0a44fa6cc7c856a5242468a7
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Jan 15 11:20:40 2026 -0600
Add Avro Reader projection API (#9162)
# Which issue does this PR close?
- Closes #8923.
# Rationale for this change
The `arrow-avro` crate's `ReaderBuilder` previously lacked the ability
to project (select) specific columns when reading Avro files. This is a
common feature in other Arrow readers (like `arrow-csv` and `arrow-ipc`)
that enables users to read only the columns they need, improving
performance and reducing memory usage.
# What changes are included in this PR?
- Added a `with_projection(projection: Vec<usize>)` method to
`ReaderBuilder` that accepts zero-based column indices
- Implemented `AvroSchema::project()` method to create a projected Avro
schema with only the selected fields (see the illustration after this list)
- The projection supports:
- Selecting a subset of fields
- Reordering fields
- Preserving all record and field metadata (namespace, doc, defaults,
aliases, etc.)
- Preserving nested/complex types (records, arrays, maps, unions)
- Added validation for out-of-bounds indices and duplicate indices
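As an illustration of the `AvroSchema::project()` behavior described above (the `User` record here is a hypothetical example, not a schema from the codebase), projecting indices `[2, 0]` keeps only the selected entries of the top-level `fields` array, in the given order, and leaves all other record attributes intact:
```json
{"type": "record", "name": "User", "namespace": "com.example", "fields": [
  {"name": "id", "type": "long"},
  {"name": "name", "type": "string"},
  {"name": "email", "type": "string"}
]}
```
becomes:
```json
{"type": "record", "name": "User", "namespace": "com.example", "fields": [
  {"name": "email", "type": "string"},
  {"name": "id", "type": "long"}
]}
```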
# Are these changes tested?
Yes, comprehensive tests have been added:
- Unit tests for `AvroSchema::project()` covering:
- Empty projections
- Single and multiple field selection
- Field reordering
- Metadata preservation (record-level and field-level)
- Nested records and complex types (arrays, maps, unions)
- Error cases (invalid JSON, non-record schemas, out-of-bounds indices,
duplicate indices)
- Integration tests in the reader module for end-to-end projection with
OCF files
# Are there any user-facing changes?
Yes, this adds a new public API method:
```rust
impl ReaderBuilder {
    /// Set a projection of columns to read (zero-based column indices).
    pub fn with_projection(self, projection: Vec<usize>) -> Self
}
```
This is consistent with the projection API in `arrow-csv::ReaderBuilder`
and `arrow-ipc::FileReaderBuilder`. There are no breaking changes to
existing APIs.
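For example, a minimal end-to-end sketch (condensed from the doc example added in this PR; the two-field schema and in-memory OCF round-trip are illustrative):
```rust
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::writer::AvroWriter;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a two-column Avro OCF into an in-memory buffer
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ],
    )?;
    let mut writer = AvroWriter::new(Vec::new(), schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    let bytes = writer.into_inner();

    // Read back only the `name` column (zero-based index 1)
    let mut reader = ReaderBuilder::new()
        .with_projection(vec![1])
        .build(Cursor::new(bytes))?;
    let out = reader.next().unwrap()?;
    assert_eq!(out.num_columns(), 1);
    assert_eq!(out.schema().field(0).name(), "name");
    Ok(())
}
```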
---
arrow-avro/src/reader/mod.rs | 387 +++++++++++++++++++++++++++-
arrow-avro/src/schema.rs | 602 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 981 insertions(+), 8 deletions(-)
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 546650faf5..b5750cc6a3 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -482,8 +482,8 @@
use crate::codec::AvroFieldBuilder;
use crate::reader::header::read_header;
use crate::schema::{
- AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, Schema,
- SchemaStore,
+ AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY,
+ SINGLE_OBJECT_MAGIC, Schema, SchemaStore,
};
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
@@ -890,6 +890,17 @@ impl Decoder {
/// See `Self::with_strict_mode`.
/// * **`reader_schema`**: Optional reader schema (projection / evolution) used when decoding
/// values (default: `None`). See `Self::with_reader_schema`.
+/// * **`projection`**: Optional projection of **top‑level record fields** by index (default: `None`).
+///
+/// If set, the effective reader schema is **pruned** to include only the projected fields, in the
+/// specified order:
+///
+/// * If a reader schema is provided, that schema is pruned.
+/// * Otherwise, a reader schema is derived from the writer schema and then pruned.
+/// * For streaming `Decoder` with multiple writer schemas and no reader schema, a projected reader
+/// schema is derived **per writer schema** in the `SchemaStore`.
+///
+/// See `Self::with_projection`.
/// * **`writer_schema_store`**: Required for building a `Decoder` for single‑object or
/// Confluent framing. Maps fingerprints to Avro schemas. See `Self::with_writer_schema_store`.
/// * **`active_fingerprint`**: Optional starting fingerprint for streaming decode when the
@@ -931,6 +942,7 @@ pub struct ReaderBuilder {
strict_mode: bool,
utf8_view: bool,
reader_schema: Option<AvroSchema>,
+ projection: Option<Vec<usize>>,
writer_schema_store: Option<SchemaStore>,
active_fingerprint: Option<Fingerprint>,
}
@@ -942,6 +954,7 @@ impl Default for ReaderBuilder {
strict_mode: false,
utf8_view: false,
reader_schema: None,
+ projection: None,
writer_schema_store: None,
active_fingerprint: None,
}
@@ -955,6 +968,7 @@ impl ReaderBuilder {
/// * `strict_mode = false`
/// * `utf8_view = false`
/// * `reader_schema = None`
+ /// * `projection = None`
/// * `writer_schema_store = None`
/// * `active_fingerprint = None`
pub fn new() -> Self {
@@ -1017,8 +1031,33 @@ impl ReaderBuilder {
.ok_or_else(|| {
ArrowError::ParseError("No Avro schema present in file header".into())
})?;
+ let projected_reader_schema = self
+ .projection
+ .as_deref()
+ .map(|projection| {
+ let base_schema = if let Some(reader_schema) = reader_schema {
+ reader_schema.clone()
+ } else {
+ let raw = hdr.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+ ArrowError::ParseError(
+ "No Avro schema present in file header".to_string(),
+ )
+ })?;
+ let json_string = std::str::from_utf8(raw)
+ .map_err(|e| {
+ ArrowError::ParseError(format!(
+ "Invalid UTF-8 in Avro schema header: {e}"
+ ))
+ })?
+ .to_string();
+ AvroSchema::new(json_string)
+ };
+ base_schema.project(projection)
+ })
+ .transpose()?;
+ let effective_reader_schema = projected_reader_schema.as_ref().or(reader_schema);
let record_decoder =
- self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?;
+ self.make_record_decoder_from_schemas(&writer_schema, effective_reader_schema)?;
return Ok(self.make_decoder_with_parts(
record_decoder,
None,
@@ -1041,6 +1080,11 @@ impl ReaderBuilder {
.ok_or_else(|| {
ArrowError::ParseError("Could not determine initial schema fingerprint".into())
})?;
+ let projection = self.projection.as_deref();
+ let projected_reader_schema = match (projection, reader_schema) {
+ (Some(projection), Some(reader_schema)) => Some(reader_schema.project(projection)?),
+ _ => None,
+ };
let mut cache = IndexMap::with_capacity(fingerprints.len().saturating_sub(1));
let mut active_decoder: Option<RecordDecoder> = None;
for fingerprint in store.fingerprints() {
@@ -1053,8 +1097,23 @@ impl ReaderBuilder {
}
};
let writer_schema = avro_schema.schema()?;
- let record_decoder =
- self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?;
+ let record_decoder = match projection {
+ None => self.make_record_decoder_from_schemas(&writer_schema, reader_schema)?,
+ Some(projection) => {
+ if let Some(ref pruned_reader_schema) = projected_reader_schema {
+ self.make_record_decoder_from_schemas(
+ &writer_schema,
+ Some(pruned_reader_schema),
+ )?
+ } else {
+ let derived_reader_schema = avro_schema.project(projection)?;
+ self.make_record_decoder_from_schemas(
+ &writer_schema,
+ Some(&derived_reader_schema),
+ )?
+ }
+ }
+ };
if fingerprint == start_fingerprint {
active_decoder = Some(record_decoder);
} else {
@@ -1119,6 +1178,68 @@ impl ReaderBuilder {
self
}
+ /// Sets an explicit top-level field projection by index.
+ ///
+ /// The provided `projection` is a list of indices into the **top-level record** fields.
+ /// The output schema will contain only these fields, in the specified order.
+ ///
+ /// Internally, this is implemented by pruning the effective Avro *reader schema*:
+ ///
+ /// * If a reader schema is provided via `Self::with_reader_schema`, that schema is pruned.
+ /// * Otherwise, a reader schema is derived from the writer schema and then pruned.
+ /// * For streaming `Decoder` with multiple writer schemas and no reader schema, a projected
+ /// reader schema is derived **per writer schema** in the `SchemaStore`.
+ ///
+ /// # Example
+ ///
+ /// Read only specific columns from an Avro OCF file:
+ ///
+ /// ```
+ /// use std::io::Cursor;
+ /// use std::sync::Arc;
+ /// use arrow_array::{ArrayRef, Int32Array, StringArray, Float64Array, RecordBatch};
+ /// use arrow_schema::{DataType, Field, Schema};
+ /// use arrow_avro::writer::AvroWriter;
+ /// use arrow_avro::reader::ReaderBuilder;
+ ///
+ /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// // Original schema has three fields: id, name, value
+ /// let schema = Schema::new(vec![
+ /// Field::new("id", DataType::Int32, false),
+ /// Field::new("name", DataType::Utf8, false),
+ /// Field::new("value", DataType::Float64, false),
+ /// ]);
+ /// let batch = RecordBatch::try_new(
+ /// Arc::new(schema.clone()),
+ /// vec![
+ /// Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+ /// Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
+ /// Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
+ /// ],
+ /// )?;
+ ///
+ /// // Write Avro OCF
+ /// let mut writer = AvroWriter::new(Vec::new(), schema)?;
+ /// writer.write(&batch)?;
+ /// writer.finish()?;
+ /// let bytes = writer.into_inner();
+ ///
+ /// // Read only fields at indices 2 and 0 (value, id) — in that order
+ /// let mut reader = ReaderBuilder::new()
+ /// .with_projection(vec![2, 0])
+ /// .build(Cursor::new(bytes))?;
+ ///
+ /// let out = reader.next().unwrap()?;
+ /// assert_eq!(out.num_columns(), 2);
+ /// assert_eq!(out.schema().field(0).name(), "value");
+ /// assert_eq!(out.schema().field(1).name(), "id");
+ /// # Ok(()) }
+ /// ```
+ pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+ self.projection = Some(projection);
+ self
+ }
+
/// Sets the `SchemaStore` used to resolve writer schemas by fingerprint.
///
/// This is required when building a `Decoder` for **single‑object
encoding** or the
@@ -1651,7 +1772,42 @@ mod test {
}
#[test]
- fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn std::error::Error>> {
+ fn ocf_projection_no_reader_schema_reorder() -> Result<(), Box<dyn std::error::Error>> {
+ // Writer: { id: int, name: string, is_active: boolean }
+ let writer_schema = Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("is_active", DataType::Boolean, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(writer_schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
+ Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+ Arc::new(BooleanArray::from(vec![true, false])) as ArrayRef,
+ ],
+ )?;
+ let bytes = write_ocf(&writer_schema, &[batch]);
+ // Project and reorder: [is_active, id]
+ let mut reader = ReaderBuilder::new()
+ .with_projection(vec![2, 0])
+ .build(Cursor::new(bytes))?;
+ let out = reader.next().unwrap()?;
+ assert_eq!(out.num_columns(), 2);
+ assert_eq!(out.schema().field(0).name(), "is_active");
+ assert_eq!(out.schema().field(1).name(), "id");
+ let is_active = out.column(0).as_boolean();
+ assert!(is_active.value(0));
+ assert!(!is_active.value(1));
+ let id = out.column(1).as_primitive::<Int32Type>();
+ assert_eq!(id.value(0), 1);
+ assert_eq!(id.value(1), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn ocf_projection_with_reader_schema_alias_and_default() -> Result<(), Box<dyn std::error::Error>> {
// Writer: { id: long, name: string }
let writer_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
@@ -1665,6 +1821,9 @@ mod test {
],
)?;
let bytes = write_ocf(&writer_schema, &[batch]);
+ // Reader adds alias + default field:
+ // - rename `name` -> `full_name` via aliases
+ // - add `is_active` with default true
let reader_json = r#"
{
"type": "record",
@@ -1675,15 +1834,180 @@ mod test {
{ "name": "is_active", "type": "boolean", "default": true }
]
}"#;
+ // Project only [full_name, is_active] (indices relative to the reader schema)
let mut reader = ReaderBuilder::new()
.with_reader_schema(AvroSchema::new(reader_json.to_string()))
+ .with_projection(vec![1, 2])
.build(Cursor::new(bytes))?;
let out = reader.next().unwrap()?;
- // Evolved aliased field should be non-null and match original writer values
- let full_name = out.column(1).as_string::<i32>();
+ assert_eq!(out.num_columns(), 2);
+ assert_eq!(out.schema().field(0).name(), "full_name");
+ assert_eq!(out.schema().field(1).name(), "is_active");
+ let full_name = out.column(0).as_string::<i32>();
assert_eq!(full_name.value(0), "a");
assert_eq!(full_name.value(1), "b");
+ let is_active = out.column(1).as_boolean();
+ assert!(is_active.value(0));
+ assert!(is_active.value(1));
+ Ok(())
+ }
+
+ #[test]
+ fn projection_errors_out_of_bounds_and_duplicate() -> Result<(), Box<dyn std::error::Error>> {
+ let writer_schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(writer_schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![1])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![2])) as ArrayRef,
+ ],
+ )?;
+ let bytes = write_ocf(&writer_schema, &[batch]);
+ let err = ReaderBuilder::new()
+ .with_projection(vec![2])
+ .build(Cursor::new(bytes.clone()))
+ .unwrap_err();
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("out of bounds"));
+ let err = ReaderBuilder::new()
+ .with_projection(vec![0, 0])
+ .build(Cursor::new(bytes))
+ .unwrap_err();
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("Duplicate projection index"));
+ Ok(())
+ }
+
+ #[test]
+ #[cfg(feature = "snappy")]
+ fn test_alltypes_plain_with_projection_and_reader_schema() {
+ use std::fs::File;
+ use std::io::BufReader;
+ let path = arrow_test_data("avro/alltypes_plain.avro");
+ // Build a reader schema that selects [double_col, id, tinyint_col] in that order
+ let reader_schema = make_reader_schema_with_selected_fields_in_order(
+ &path,
+ &["double_col", "id", "tinyint_col"],
+ );
+ let file = File::open(&path).expect("open avro/alltypes_plain.avro");
+ let reader = ReaderBuilder::new()
+ .with_batch_size(1024)
+ .with_reader_schema(reader_schema)
+ .with_projection(vec![1, 2]) // Select indices 1 and 2 from reader schema: [id, tinyint_col]
+ .build(BufReader::new(file))
+ .expect("build reader with projection and reader schema");
+ let schema = reader.schema();
+ // Verify the projected schema has exactly 2 fields in the correct order
+ assert_eq!(schema.fields().len(), 2);
+ assert_eq!(schema.field(0).name(), "id");
+ assert_eq!(schema.field(1).name(), "tinyint_col");
+ let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+ assert_eq!(batches.len(), 1);
+ let batch = &batches[0];
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 2);
+ // Build expected batch with exact values from alltypes_plain.avro:
+ // - id values: [4, 5, 6, 7, 2, 3, 0, 1]
+ // - tinyint_col values: [0, 1, 0, 1, 0, 1, 0, 1] (i.e., row_index % 2)
+ let expected = RecordBatch::try_from_iter_with_nullable([
+ (
+ "id",
+ Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as ArrayRef,
+ true,
+ ),
+ (
+ "tinyint_col",
+ Arc::new(Int32Array::from(vec![0, 1, 0, 1, 0, 1, 0, 1])) as ArrayRef,
+ true,
+ ),
+ ])
+ .unwrap();
+ assert_eq!(
+ batch, &expected,
+ "Projected batch mismatch for alltypes_plain.avro with reader schema and projection [1, 2]"
+ );
+ }
+
+ #[test]
+ #[cfg(feature = "snappy")]
+ fn test_alltypes_plain_with_projection() {
+ use std::fs::File;
+ use std::io::BufReader;
+ let path = arrow_test_data("avro/alltypes_plain.avro");
+ let file = File::open(&path).expect("open avro/alltypes_plain.avro");
+ let reader = ReaderBuilder::new()
+ .with_batch_size(1024)
+ .with_projection(vec![2, 0, 5])
+ .build(BufReader::new(file))
+ .expect("build reader with projection");
+ let schema = reader.schema();
+ assert_eq!(schema.fields().len(), 3);
+ assert_eq!(schema.field(0).name(), "tinyint_col");
+ assert_eq!(schema.field(1).name(), "id");
+ assert_eq!(schema.field(2).name(), "bigint_col");
+ let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+ assert_eq!(batches.len(), 1);
+ let batch = &batches[0];
+ assert_eq!(batch.num_rows(), 8);
+ assert_eq!(batch.num_columns(), 3);
+ let expected = RecordBatch::try_from_iter_with_nullable([
+ (
+ "tinyint_col",
+ Arc::new(Int32Array::from(vec![0, 1, 0, 1, 0, 1, 0, 1])) as ArrayRef,
+ true,
+ ),
+ (
+ "id",
+ Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as ArrayRef,
+ true,
+ ),
+ (
+ "bigint_col",
+ Arc::new(Int64Array::from(vec![0, 10, 0, 10, 0, 10, 0, 10])) as ArrayRef,
+ true,
+ ),
+ ])
+ .unwrap();
+ assert_eq!(
+ batch, &expected,
+ "Projected batch mismatch for alltypes_plain.avro with projection [2, 0, 5]"
+ );
+ }
+ #[test]
+ fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn std::error::Error>> {
+ let writer_schema = Schema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(writer_schema.clone()),
+ vec![
+ Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+ Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+ ],
+ )?;
+ let bytes = write_ocf(&writer_schema, &[batch]);
+ let reader_json = r#"
+ {
+ "type": "record",
+ "name": "topLevelRecord",
+ "fields": [
+ { "name": "id", "type": "long" },
+ { "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
+ { "name": "is_active", "type": "boolean", "default": true }
+ ]
+ }"#;
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+ .build(Cursor::new(bytes))?;
+ let out = reader.next().unwrap()?;
+ let full_name = out.column(1).as_string::<i32>();
+ assert_eq!(full_name.value(0), "a");
+ assert_eq!(full_name.value(1), "b");
Ok(())
}
@@ -2222,6 +2546,53 @@ mod test {
assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
}
+ #[test]
+ fn test_decoder_projection_multiple_writer_schemas_no_reader_schema() -> Result<(), Box<dyn std::error::Error>> {
+ // Two writer schemas with different shapes
+ let writer_v1 = AvroSchema::new(
+ r#"{"type":"record","name":"E","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]}"#
+ .to_string(),
+ );
+ let writer_v2 = AvroSchema::new(
+ r#"{"type":"record","name":"E","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":"int"}]}"#
+ .to_string(),
+ );
+ let mut store = SchemaStore::new();
+ let fp1 = store.register(writer_v1)?;
+ let fp2 = store.register(writer_v2)?;
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_active_fingerprint(fp1)
+ .with_batch_size(8)
+ .with_projection(vec![1])
+ .build_decoder()?;
+ // Message for v1: {a:1, b:"x"}
+ let mut msg1 = make_prefix(fp1);
+ msg1.extend_from_slice(&encode_zigzag(1)); // a = 1
+ msg1.push((1u8) << 1);
+ msg1.extend_from_slice(b"x");
+ // Message for v2: {a:2, b:"y", c:7}
+ let mut msg2 = make_prefix(fp2);
+ msg2.extend_from_slice(&encode_zigzag(2)); // a = 2
+ msg2.push((1u8) << 1);
+ msg2.extend_from_slice(b"y");
+ msg2.extend_from_slice(&encode_zigzag(7)); // c = 7
+ decoder.decode(&msg1)?;
+ let batch1 = decoder.flush()?.expect("batch1");
+ assert_eq!(batch1.num_columns(), 1);
+ assert_eq!(batch1.schema().field(0).name(), "b");
+ let b1 = batch1.column(0).as_string::<i32>();
+ assert_eq!(b1.value(0), "x");
+ decoder.decode(&msg2)?;
+ let batch2 = decoder.flush()?.expect("batch2");
+ assert_eq!(batch2.num_columns(), 1);
+ assert_eq!(batch2.schema().field(0).name(), "b");
+ let b2 = batch2.column(0).as_string::<i32>();
+ assert_eq!(b2.value(0), "y");
+ Ok(())
+ }
+
#[test]
fn test_two_messages_same_schema() {
let writer_schema = make_value_schema(PrimitiveType::Int);
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 819ea1f16e..0ecb5c1d19 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -395,6 +395,66 @@ impl AvroSchema {
Self::generate_fingerprint(&self.schema()?, hash_type)
}
+ pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, ArrowError> {
+ let mut value: Value = serde_json::from_str(&self.json_string)
+ .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema JSON: {e}")))?;
+ let obj = value.as_object_mut().ok_or_else(|| {
+ ArrowError::AvroError(
+ "Projected schema must be a JSON object Avro record schema".to_string(),
+ )
+ })?;
+ match obj.get("type").and_then(|v| v.as_str()) {
+ Some("record") => {}
+ Some(other) => {
+ return Err(ArrowError::AvroError(format!(
+ "Projected schema must be an Avro record, found type '{other}'"
+ )));
+ }
+ None => {
+ return Err(ArrowError::AvroError(
+ "Projected schema missing required 'type' field".to_string(),
+ ));
+ }
+ }
+ let fields_val = obj.get_mut("fields").ok_or_else(|| {
+ ArrowError::AvroError("Avro record schema missing required 'fields'".to_string())
+ })?;
+ let projected_fields = {
+ let mut original_fields = match fields_val {
+ Value::Array(arr) => std::mem::take(arr),
+ _ => {
+ return Err(ArrowError::AvroError(
+ "Avro record schema 'fields' must be an array".to_string(),
+ ));
+ }
+ };
+ let len = original_fields.len();
+ let mut seen: HashSet<usize> = HashSet::with_capacity(projection.len());
+ let mut out: Vec<Value> = Vec::with_capacity(projection.len());
+ for &i in projection {
+ if i >= len {
+ return Err(ArrowError::AvroError(format!(
+ "Projection index {i} out of bounds for record with {len} fields"
+ )));
+ }
+ if !seen.insert(i) {
+ return Err(ArrowError::AvroError(format!(
+ "Duplicate projection index {i}"
+ )));
+ }
+ out.push(std::mem::replace(&mut original_fields[i], Value::Null));
+ }
+ out
+ };
+ *fields_val = Value::Array(projected_fields);
+ let json_string = serde_json::to_string(&value).map_err(|e| {
+ ArrowError::AvroError(format!("Failed to serialize projected Avro schema JSON: {e}"))
+ })?;
+ Ok(Self::new(json_string))
+ }
+
pub(crate) fn generate_fingerprint(
schema: &Schema,
hash_type: FingerprintAlgorithm,
@@ -3137,4 +3197,546 @@ mod tests {
assert_eq!(union_arr2[1], Value::String("int".into()));
assert_eq!(union_arr2[2], Value::String("string".into()));
}
+
+ #[test]
+ fn test_project_empty_projection() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert!(
+ fields.is_empty(),
+ "Empty projection should yield empty fields"
+ );
+ }
+
+ #[test]
+ fn test_project_single_field() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"},
+ {"name": "c", "type": "long"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[1]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 1);
+ assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("b"));
+ }
+
+ #[test]
+ fn test_project_multiple_fields() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"},
+ {"name": "c", "type": "long"},
+ {"name": "d", "type": "boolean"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[0, 2, 3]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 3);
+ assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
+ assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("c"));
+ assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("d"));
+ }
+
+ #[test]
+ fn test_project_all_fields() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[0, 1]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 2);
+ assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
+ assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("b"));
+ }
+
+ #[test]
+ fn test_project_reorder_fields() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"},
+ {"name": "c", "type": "long"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ // Project in reverse order
+ let projected = schema.project(&[2, 0, 1]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 3);
+ assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("c"));
+ assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("a"));
+ assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("b"));
+ }
+
+ #[test]
+ fn test_project_preserves_record_metadata() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "MyRecord",
+ "namespace": "com.example",
+ "doc": "A test record",
+ "aliases": ["OldRecord"],
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[0]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("MyRecord"));
+ assert_eq!(
+ v.get("namespace").and_then(|n| n.as_str()),
+ Some("com.example")
+ );
+ assert_eq!(v.get("doc").and_then(|n| n.as_str()), Some("A test record"));
+ assert!(v.get("aliases").is_some());
+ }
+
+ #[test]
+ fn test_project_preserves_field_metadata() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int", "doc": "Field A", "default": 0},
+ {"name": "b", "type": "string"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[0]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(
+ fields[0].get("doc").and_then(|d| d.as_str()),
+ Some("Field A")
+ );
+ assert_eq!(fields[0].get("default").and_then(|d| d.as_i64()), Some(0));
+ }
+
+ #[test]
+ fn test_project_with_nested_record() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Outer",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "inner", "type": {
+ "type": "record",
+ "name": "Inner",
+ "fields": [
+ {"name": "x", "type": "int"},
+ {"name": "y", "type": "string"}
+ ]
+ }},
+ {"name": "value", "type": "double"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[1]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 1);
+ assert_eq!(
+ fields[0].get("name").and_then(|n| n.as_str()),
+ Some("inner")
+ );
+ // Verify nested record structure is preserved
+ let inner_type = fields[0].get("type").unwrap();
+ assert_eq!(
+ inner_type.get("type").and_then(|t| t.as_str()),
+ Some("record")
+ );
+ assert_eq!(
+ inner_type.get("name").and_then(|n| n.as_str()),
+ Some("Inner")
+ );
+ }
+
+ #[test]
+ fn test_project_with_complex_field_types() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "arr", "type": {"type": "array", "items": "int"}},
+ {"name": "map", "type": {"type": "map", "values": "string"}},
+ {"name": "union", "type": ["null", "int"]}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[0, 2]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 2);
+ // Verify array type is preserved
+ let arr_type = fields[0].get("type").unwrap();
+ assert_eq!(arr_type.get("type").and_then(|t| t.as_str()), Some("array"));
+ // Verify union type is preserved
+ let union_type = fields[1].get("type").unwrap();
+ assert!(union_type.is_array());
+ }
+
+ #[test]
+ fn test_project_error_invalid_json() {
+ let schema = AvroSchema::new("not valid json".to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Invalid Avro schema JSON"),
+ "Expected parse error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_not_object() {
+ // Primitive type schema (not a JSON object)
+ let schema = AvroSchema::new(r#""string""#.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("must be a JSON object"),
+ "Expected object error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_array_schema() {
+ // Array (list) is a valid JSON but not a record
+ let schema = AvroSchema::new(r#"["null", "int"]"#.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("must be a JSON object"),
+ "Expected object error for array schema, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_type_not_record() {
+ let schema_json = r#"{
+ "type": "enum",
+ "name": "Color",
+ "symbols": ["RED", "GREEN", "BLUE"]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("must be an Avro record") && msg.contains("'enum'"),
+ "Expected type mismatch error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_type_array() {
+ let schema_json = r#"{
+ "type": "array",
+ "items": "int"
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("must be an Avro record") && msg.contains("'array'"),
+ "Expected type mismatch error for array type, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_type_fixed() {
+ let schema_json = r#"{
+ "type": "fixed",
+ "name": "MD5",
+ "size": 16
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("must be an Avro record") && msg.contains("'fixed'"),
+ "Expected type mismatch error for fixed type, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_type_map() {
+ let schema_json = r#"{
+ "type": "map",
+ "values": "string"
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("must be an Avro record") && msg.contains("'map'"),
+ "Expected type mismatch error for map type, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_missing_type_field() {
+ let schema_json = r#"{
+ "name": "Test",
+ "fields": [{"name": "a", "type": "int"}]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("missing required 'type' field"),
+ "Expected missing type error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_missing_fields() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test"
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("missing required 'fields'"),
+ "Expected missing fields error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_fields_not_array() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": "not an array"
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("'fields' must be an array"),
+ "Expected fields array error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_index_out_of_bounds() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[5]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("out of bounds") && msg.contains("5") && msg.contains("2"),
+ "Expected out of bounds error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_index_out_of_bounds_edge() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ // Index 1 is just out of bounds for a 1-element array
+ let err = schema.project(&[1]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("out of bounds") && msg.contains("1"),
+ "Expected out of bounds error for edge case, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_duplicate_index() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"},
+ {"name": "c", "type": "long"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0, 1, 0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Duplicate projection index") && msg.contains("0"),
+ "Expected duplicate index error, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_error_duplicate_index_consecutive() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "a", "type": "int"},
+ {"name": "b", "type": "string"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[1, 1]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Duplicate projection index") && msg.contains("1"),
+ "Expected duplicate index error for consecutive duplicates, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_with_empty_fields() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "EmptyRecord",
+ "fields": []
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ // Projecting empty from empty should succeed
+ let projected = schema.project(&[]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert!(fields.is_empty());
+ }
+
+ #[test]
+ fn test_project_empty_fields_index_out_of_bounds() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "EmptyRecord",
+ "fields": []
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let err = schema.project(&[0]).unwrap_err();
+ let msg = err.to_string();
+ assert!(
+ msg.contains("out of bounds") && msg.contains("0 fields"),
+ "Expected out of bounds error for empty record, got: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_project_result_is_valid_avro_schema() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "namespace": "com.example",
+ "fields": [
+ {"name": "id", "type": "long"},
+ {"name": "name", "type": "string"},
+ {"name": "active", "type": "boolean"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ let projected = schema.project(&[0, 2]).unwrap();
+ // Verify the projected schema can be parsed as a valid Avro schema
+ let parsed = projected.schema();
+ assert!(parsed.is_ok(), "Projected schema should be valid Avro");
+ match parsed.unwrap() {
+ Schema::Complex(ComplexType::Record(r)) => {
+ assert_eq!(r.name, "Test");
+ assert_eq!(r.namespace, Some("com.example"));
+ assert_eq!(r.fields.len(), 2);
+ assert_eq!(r.fields[0].name, "id");
+ assert_eq!(r.fields[1].name, "active");
+ }
+ _ => panic!("Expected Record schema"),
+ }
+ }
+
+ #[test]
+ fn test_project_non_contiguous_indices() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "Test",
+ "fields": [
+ {"name": "f0", "type": "int"},
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "int"},
+ {"name": "f3", "type": "int"},
+ {"name": "f4", "type": "int"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ // Select every other field
+ let projected = schema.project(&[0, 2, 4]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 3);
+ assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f0"));
+ assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("f2"));
+ assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("f4"));
+ }
+
+ #[test]
+ fn test_project_single_field_from_many() {
+ let schema_json = r#"{
+ "type": "record",
+ "name": "BigRecord",
+ "fields": [
+ {"name": "f0", "type": "int"},
+ {"name": "f1", "type": "int"},
+ {"name": "f2", "type": "int"},
+ {"name": "f3", "type": "int"},
+ {"name": "f4", "type": "int"},
+ {"name": "f5", "type": "int"},
+ {"name": "f6", "type": "int"},
+ {"name": "f7", "type": "int"},
+ {"name": "f8", "type": "int"},
+ {"name": "f9", "type": "int"}
+ ]
+ }"#;
+ let schema = AvroSchema::new(schema_json.to_string());
+ // Select only the last field
+ let projected = schema.project(&[9]).unwrap();
+ let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+ let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+ assert_eq!(fields.len(), 1);
+ assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f9"));
+ }
}