This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b67776703 Add Avro Reader projection API (#9162)
8b67776703 is described below

commit 8b67776703dca58c0a44fa6cc7c856a5242468a7
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Jan 15 11:20:40 2026 -0600

    Add Avro Reader projection API (#9162)
    
    # Which issue does this PR close?
    
    - Closes #8923.
    
    # Rationale for this change
    
    The `arrow-avro` crate's `ReaderBuilder` previously lacked the ability
    to project (select) specific columns when reading Avro files. This is a
    common feature in other Arrow readers (like `arrow-csv` and `arrow-ipc`)
    that enables users to read only the columns they need, improving
    performance and reducing memory usage.
    
    # What changes are included in this PR?
    
    - Added a `with_projection(projection: Vec<usize>)` method to
    `ReaderBuilder` that accepts zero-based column indices
    - Implemented `AvroSchema::project()` method to create a projected Avro
    schema with only the selected fields
    - The projection supports:
      - Selecting a subset of fields
      - Reordering fields
      - Preserving all record and field metadata (namespace, doc, defaults,
        aliases, etc.)
      - Preserving nested/complex types (records, arrays, maps, unions)
    - Added validation for out-of-bounds indices and duplicate indices
    
    # Are these changes tested?
    
    Yes, comprehensive tests have been added:
    - Unit tests for `AvroSchema::project()` covering:
      - Empty projections
      - Single and multiple field selection
      - Field reordering
      - Metadata preservation (record-level and field-level)
      - Nested records and complex types (arrays, maps, unions)
      - Error cases (invalid JSON, non-record schemas, out-of-bounds indices,
        duplicate indices)
    - Integration tests in the reader module for end-to-end projection with
    OCF files
    
    # Are there any user-facing changes?
    
    Yes, this adds a new public API method:
    
    ```rust
    impl ReaderBuilder {
        /// Set a projection of columns to read (zero-based column indices).
        pub fn with_projection(self, projection: Vec<usize>) -> Self
    }
    ```
    
    This is consistent with the projection API in `arrow-csv::ReaderBuilder`
    and `arrow-ipc::FileReaderBuilder`. There are no breaking changes to
    existing APIs.
---
 arrow-avro/src/reader/mod.rs | 387 +++++++++++++++++++++++++++-
 arrow-avro/src/schema.rs     | 602 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 981 insertions(+), 8 deletions(-)

diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 546650faf5..b5750cc6a3 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -482,8 +482,8 @@
 use crate::codec::AvroFieldBuilder;
 use crate::reader::header::read_header;
 use crate::schema::{
-    AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, 
SINGLE_OBJECT_MAGIC, Schema,
-    SchemaStore,
+    AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, 
SCHEMA_METADATA_KEY,
+    SINGLE_OBJECT_MAGIC, Schema, SchemaStore,
 };
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, SchemaRef};
@@ -890,6 +890,17 @@ impl Decoder {
 ///   See `Self::with_strict_mode`.
 /// * **`reader_schema`**: Optional reader schema (projection / evolution) 
used when decoding
 ///   values (default: `None`). See `Self::with_reader_schema`.
+/// * **`projection`**: Optional projection of **top‑level record fields** by 
index (default: `None`).
+///
+///   If set, the effective reader schema is **pruned** to include only the 
projected fields, in the
+///   specified order:
+///
+///   * If a reader schema is provided, that schema is pruned.
+///   * Otherwise, a reader schema is derived from the writer schema and then 
pruned.
+///   * For streaming `Decoder` with multiple writer schemas and no reader 
schema, a projected reader
+///     schema is derived **per writer schema** in the `SchemaStore`.
+///
+///   See `Self::with_projection`.
 /// * **`writer_schema_store`**: Required for building a `Decoder` for 
single‑object or
 ///   Confluent framing. Maps fingerprints to Avro schemas. See 
`Self::with_writer_schema_store`.
 /// * **`active_fingerprint`**: Optional starting fingerprint for streaming 
decode when the
@@ -931,6 +942,7 @@ pub struct ReaderBuilder {
     strict_mode: bool,
     utf8_view: bool,
     reader_schema: Option<AvroSchema>,
+    projection: Option<Vec<usize>>,
     writer_schema_store: Option<SchemaStore>,
     active_fingerprint: Option<Fingerprint>,
 }
@@ -942,6 +954,7 @@ impl Default for ReaderBuilder {
             strict_mode: false,
             utf8_view: false,
             reader_schema: None,
+            projection: None,
             writer_schema_store: None,
             active_fingerprint: None,
         }
@@ -955,6 +968,7 @@ impl ReaderBuilder {
     /// * `strict_mode = false`
     /// * `utf8_view = false`
     /// * `reader_schema = None`
+    /// * `projection = None`
     /// * `writer_schema_store = None`
     /// * `active_fingerprint = None`
     pub fn new() -> Self {
@@ -1017,8 +1031,33 @@ impl ReaderBuilder {
                 .ok_or_else(|| {
                     ArrowError::ParseError("No Avro schema present in file 
header".into())
                 })?;
+            let projected_reader_schema = self
+                .projection
+                .as_deref()
+                .map(|projection| {
+                    let base_schema = if let Some(reader_schema) = 
reader_schema {
+                        reader_schema.clone()
+                    } else {
+                        let raw = hdr.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+                            ArrowError::ParseError(
+                                "No Avro schema present in file 
header".to_string(),
+                            )
+                        })?;
+                        let json_string = std::str::from_utf8(raw)
+                            .map_err(|e| {
+                                ArrowError::ParseError(format!(
+                                    "Invalid UTF-8 in Avro schema header: {e}"
+                                ))
+                            })?
+                            .to_string();
+                        AvroSchema::new(json_string)
+                    };
+                    base_schema.project(projection)
+                })
+                .transpose()?;
+            let effective_reader_schema = 
projected_reader_schema.as_ref().or(reader_schema);
             let record_decoder =
-                self.make_record_decoder_from_schemas(&writer_schema, 
reader_schema)?;
+                self.make_record_decoder_from_schemas(&writer_schema, 
effective_reader_schema)?;
             return Ok(self.make_decoder_with_parts(
                 record_decoder,
                 None,
@@ -1041,6 +1080,11 @@ impl ReaderBuilder {
             .ok_or_else(|| {
                 ArrowError::ParseError("Could not determine initial schema 
fingerprint".into())
             })?;
+        let projection = self.projection.as_deref();
+        let projected_reader_schema = match (projection, reader_schema) {
+            (Some(projection), Some(reader_schema)) => 
Some(reader_schema.project(projection)?),
+            _ => None,
+        };
         let mut cache = 
IndexMap::with_capacity(fingerprints.len().saturating_sub(1));
         let mut active_decoder: Option<RecordDecoder> = None;
         for fingerprint in store.fingerprints() {
@@ -1053,8 +1097,23 @@ impl ReaderBuilder {
                 }
             };
             let writer_schema = avro_schema.schema()?;
-            let record_decoder =
-                self.make_record_decoder_from_schemas(&writer_schema, 
reader_schema)?;
+            let record_decoder = match projection {
+                None => self.make_record_decoder_from_schemas(&writer_schema, 
reader_schema)?,
+                Some(projection) => {
+                    if let Some(ref pruned_reader_schema) = 
projected_reader_schema {
+                        self.make_record_decoder_from_schemas(
+                            &writer_schema,
+                            Some(pruned_reader_schema),
+                        )?
+                    } else {
+                        let derived_reader_schema = 
avro_schema.project(projection)?;
+                        self.make_record_decoder_from_schemas(
+                            &writer_schema,
+                            Some(&derived_reader_schema),
+                        )?
+                    }
+                }
+            };
             if fingerprint == start_fingerprint {
                 active_decoder = Some(record_decoder);
             } else {
@@ -1119,6 +1178,68 @@ impl ReaderBuilder {
         self
     }
 
+    /// Sets an explicit top-level field projection by index.
+    ///
+    /// The provided `projection` is a list of indices into the **top-level 
record** fields.
+    /// The output schema will contain only these fields, in the specified 
order.
+    ///
+    /// Internally, this is implemented by pruning the effective Avro *reader 
schema*:
+    ///
+    /// * If a reader schema is provided via `Self::with_reader_schema`, that 
schema is pruned.
+    /// * Otherwise, a reader schema is derived from the writer schema and 
then pruned.
+    /// * For streaming `Decoder` with multiple writer schemas and no reader 
schema, a projected
+    ///   reader schema is derived **per writer schema** in the `SchemaStore`.
+    ///
+    /// # Example
+    ///
+    /// Read only specific columns from an Avro OCF file:
+    ///
+    /// ```
+    /// use std::io::Cursor;
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, StringArray, Float64Array, 
RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::AvroWriter;
+    /// use arrow_avro::reader::ReaderBuilder;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// // Original schema has three fields: id, name, value
+    /// let schema = Schema::new(vec![
+    ///     Field::new("id", DataType::Int32, false),
+    ///     Field::new("name", DataType::Utf8, false),
+    ///     Field::new("value", DataType::Float64, false),
+    /// ]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![
+    ///         Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+    ///         Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
+    ///         Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
+    ///     ],
+    /// )?;
+    ///
+    /// // Write Avro OCF
+    /// let mut writer = AvroWriter::new(Vec::new(), schema)?;
+    /// writer.write(&batch)?;
+    /// writer.finish()?;
+    /// let bytes = writer.into_inner();
+    ///
+    /// // Read only fields at indices 2 and 0 (value, id) — in that order
+    /// let mut reader = ReaderBuilder::new()
+    ///     .with_projection(vec![2, 0])
+    ///     .build(Cursor::new(bytes))?;
+    ///
+    /// let out = reader.next().unwrap()?;
+    /// assert_eq!(out.num_columns(), 2);
+    /// assert_eq!(out.schema().field(0).name(), "value");
+    /// assert_eq!(out.schema().field(1).name(), "id");
+    /// # Ok(()) }
+    /// ```
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
     /// Sets the `SchemaStore` used to resolve writer schemas by fingerprint.
     ///
     /// This is required when building a `Decoder` for **single‑object 
encoding** or the
@@ -1651,7 +1772,42 @@ mod test {
     }
 
     #[test]
-    fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn 
std::error::Error>> {
+    fn ocf_projection_no_reader_schema_reorder() -> Result<(), Box<dyn 
std::error::Error>> {
+        // Writer: { id: int, name: string, is_active: boolean }
+        let writer_schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+            Field::new("is_active", DataType::Boolean, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+                Arc::new(BooleanArray::from(vec![true, false])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        // Project and reorder: [is_active, id]
+        let mut reader = ReaderBuilder::new()
+            .with_projection(vec![2, 0])
+            .build(Cursor::new(bytes))?;
+        let out = reader.next().unwrap()?;
+        assert_eq!(out.num_columns(), 2);
+        assert_eq!(out.schema().field(0).name(), "is_active");
+        assert_eq!(out.schema().field(1).name(), "id");
+        let is_active = out.column(0).as_boolean();
+        assert!(is_active.value(0));
+        assert!(!is_active.value(1));
+        let id = out.column(1).as_primitive::<Int32Type>();
+        assert_eq!(id.value(0), 1);
+        assert_eq!(id.value(1), 2);
+        Ok(())
+    }
+
+    #[test]
+    fn ocf_projection_with_reader_schema_alias_and_default()
+    -> Result<(), Box<dyn std::error::Error>> {
         // Writer: { id: long, name: string }
         let writer_schema = Schema::new(vec![
             Field::new("id", DataType::Int64, false),
@@ -1665,6 +1821,9 @@ mod test {
             ],
         )?;
         let bytes = write_ocf(&writer_schema, &[batch]);
+        // Reader adds alias + default field:
+        //  - rename `name` -> `full_name` via aliases
+        //  - add `is_active` with default true
         let reader_json = r#"
     {
       "type": "record",
@@ -1675,15 +1834,180 @@ mod test {
         { "name": "is_active", "type": "boolean", "default": true }
       ]
     }"#;
+        // Project only [full_name, is_active] (indices relative to the reader 
schema)
         let mut reader = ReaderBuilder::new()
             .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .with_projection(vec![1, 2])
             .build(Cursor::new(bytes))?;
         let out = reader.next().unwrap()?;
-        // Evolved aliased field should be non-null and match original writer 
values
-        let full_name = out.column(1).as_string::<i32>();
+        assert_eq!(out.num_columns(), 2);
+        assert_eq!(out.schema().field(0).name(), "full_name");
+        assert_eq!(out.schema().field(1).name(), "is_active");
+        let full_name = out.column(0).as_string::<i32>();
         assert_eq!(full_name.value(0), "a");
         assert_eq!(full_name.value(1), "b");
+        let is_active = out.column(1).as_boolean();
+        assert!(is_active.value(0));
+        assert!(is_active.value(1));
+        Ok(())
+    }
+
+    #[test]
+    fn projection_errors_out_of_bounds_and_duplicate() -> Result<(), Box<dyn 
std::error::Error>> {
+        let writer_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1])) as ArrayRef,
+                Arc::new(Int32Array::from(vec![2])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let err = ReaderBuilder::new()
+            .with_projection(vec![2])
+            .build(Cursor::new(bytes.clone()))
+            .unwrap_err();
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("out of bounds"));
+        let err = ReaderBuilder::new()
+            .with_projection(vec![0, 0])
+            .build(Cursor::new(bytes))
+            .unwrap_err();
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("Duplicate projection index"));
+        Ok(())
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_alltypes_plain_with_projection_and_reader_schema() {
+        use std::fs::File;
+        use std::io::BufReader;
+        let path = arrow_test_data("avro/alltypes_plain.avro");
+        // Build a reader schema that selects [double_col, id, tinyint_col] in 
that order
+        let reader_schema = make_reader_schema_with_selected_fields_in_order(
+            &path,
+            &["double_col", "id", "tinyint_col"],
+        );
+        let file = File::open(&path).expect("open avro/alltypes_plain.avro");
+        let reader = ReaderBuilder::new()
+            .with_batch_size(1024)
+            .with_reader_schema(reader_schema)
+            .with_projection(vec![1, 2]) // Select indices 1 and 2 from reader 
schema: [id, tinyint_col]
+            .build(BufReader::new(file))
+            .expect("build reader with projection and reader schema");
+        let schema = reader.schema();
+        // Verify the projected schema has exactly 2 fields in the correct 
order
+        assert_eq!(schema.fields().len(), 2);
+        assert_eq!(schema.field(0).name(), "id");
+        assert_eq!(schema.field(1).name(), "tinyint_col");
+        let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, 
_>>().unwrap();
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 2);
+        // Build expected batch with exact values from alltypes_plain.avro:
+        // - id values: [4, 5, 6, 7, 2, 3, 0, 1]
+        // - tinyint_col values: [0, 1, 0, 1, 0, 1, 0, 1] (i.e., row_index % 2)
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            (
+                "id",
+                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as 
ArrayRef,
+                true,
+            ),
+            (
+                "tinyint_col",
+                Arc::new(Int32Array::from(vec![0, 1, 0, 1, 0, 1, 0, 1])) as 
ArrayRef,
+                true,
+            ),
+        ])
+        .unwrap();
+        assert_eq!(
+            batch, &expected,
+            "Projected batch mismatch for alltypes_plain.avro with reader 
schema and projection [1, 2]"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "snappy")]
+    fn test_alltypes_plain_with_projection() {
+        use std::fs::File;
+        use std::io::BufReader;
+        let path = arrow_test_data("avro/alltypes_plain.avro");
+        let file = File::open(&path).expect("open avro/alltypes_plain.avro");
+        let reader = ReaderBuilder::new()
+            .with_batch_size(1024)
+            .with_projection(vec![2, 0, 5])
+            .build(BufReader::new(file))
+            .expect("build reader with projection");
+        let schema = reader.schema();
+        assert_eq!(schema.fields().len(), 3);
+        assert_eq!(schema.field(0).name(), "tinyint_col");
+        assert_eq!(schema.field(1).name(), "id");
+        assert_eq!(schema.field(2).name(), "bigint_col");
+        let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, 
_>>().unwrap();
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 3);
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            (
+                "tinyint_col",
+                Arc::new(Int32Array::from(vec![0, 1, 0, 1, 0, 1, 0, 1])) as 
ArrayRef,
+                true,
+            ),
+            (
+                "id",
+                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as 
ArrayRef,
+                true,
+            ),
+            (
+                "bigint_col",
+                Arc::new(Int64Array::from(vec![0, 10, 0, 10, 0, 10, 0, 10])) 
as ArrayRef,
+                true,
+            ),
+        ])
+        .unwrap();
+        assert_eq!(
+            batch, &expected,
+            "Projected batch mismatch for alltypes_plain.avro with projection 
[2, 0, 5]"
+        );
+    }
 
+    #[test]
+    fn writer_string_reader_nullable_with_alias() -> Result<(), Box<dyn 
std::error::Error>> {
+        let writer_schema = Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+            ],
+        )?;
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let reader_json = r#"
+    {
+      "type": "record",
+      "name": "topLevelRecord",
+      "fields": [
+        { "name": "id", "type": "long" },
+        { "name": "full_name", "type": ["null","string"], "aliases": ["name"], 
"default": null },
+        { "name": "is_active", "type": "boolean", "default": true }
+      ]
+    }"#;
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+            .build(Cursor::new(bytes))?;
+        let out = reader.next().unwrap()?;
+        let full_name = out.column(1).as_string::<i32>();
+        assert_eq!(full_name.value(0), "a");
+        assert_eq!(full_name.value(1), "b");
         Ok(())
     }
 
@@ -2222,6 +2546,53 @@ mod test {
         assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
     }
 
+    #[test]
+    fn test_decoder_projection_multiple_writer_schemas_no_reader_schema()
+    -> Result<(), Box<dyn std::error::Error>> {
+        // Two writer schemas with different shapes
+        let writer_v1 = AvroSchema::new(
+            
r#"{"type":"record","name":"E","fields":[{"name":"a","type":"int"},{"name":"b","type":"string"}]}"#
+                .to_string(),
+        );
+        let writer_v2 = AvroSchema::new(
+            
r#"{"type":"record","name":"E","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":"int"}]}"#
+                .to_string(),
+        );
+        let mut store = SchemaStore::new();
+        let fp1 = store.register(writer_v1)?;
+        let fp2 = store.register(writer_v2)?;
+        let mut decoder = ReaderBuilder::new()
+            .with_writer_schema_store(store)
+            .with_active_fingerprint(fp1)
+            .with_batch_size(8)
+            .with_projection(vec![1])
+            .build_decoder()?;
+        // Message for v1: {a:1, b:"x"}
+        let mut msg1 = make_prefix(fp1);
+        msg1.extend_from_slice(&encode_zigzag(1)); // a = 1
+        msg1.push((1u8) << 1);
+        msg1.extend_from_slice(b"x");
+        // Message for v2: {a:2, b:"y", c:7}
+        let mut msg2 = make_prefix(fp2);
+        msg2.extend_from_slice(&encode_zigzag(2)); // a = 2
+        msg2.push((1u8) << 1);
+        msg2.extend_from_slice(b"y");
+        msg2.extend_from_slice(&encode_zigzag(7)); // c = 7
+        decoder.decode(&msg1)?;
+        let batch1 = decoder.flush()?.expect("batch1");
+        assert_eq!(batch1.num_columns(), 1);
+        assert_eq!(batch1.schema().field(0).name(), "b");
+        let b1 = batch1.column(0).as_string::<i32>();
+        assert_eq!(b1.value(0), "x");
+        decoder.decode(&msg2)?;
+        let batch2 = decoder.flush()?.expect("batch2");
+        assert_eq!(batch2.num_columns(), 1);
+        assert_eq!(batch2.schema().field(0).name(), "b");
+        let b2 = batch2.column(0).as_string::<i32>();
+        assert_eq!(b2.value(0), "y");
+        Ok(())
+    }
+
     #[test]
     fn test_two_messages_same_schema() {
         let writer_schema = make_value_schema(PrimitiveType::Int);
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 819ea1f16e..0ecb5c1d19 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -395,6 +395,66 @@ impl AvroSchema {
         Self::generate_fingerprint(&self.schema()?, hash_type)
     }
 
+    pub(crate) fn project(&self, projection: &[usize]) -> Result<Self, 
ArrowError> {
+        let mut value: Value = serde_json::from_str(&self.json_string)
+            .map_err(|e| ArrowError::AvroError(format!("Invalid Avro schema 
JSON: {e}")))?;
+        let obj = value.as_object_mut().ok_or_else(|| {
+            ArrowError::AvroError(
+                "Projected schema must be a JSON object Avro record 
schema".to_string(),
+            )
+        })?;
+        match obj.get("type").and_then(|v| v.as_str()) {
+            Some("record") => {}
+            Some(other) => {
+                return Err(ArrowError::AvroError(format!(
+                    "Projected schema must be an Avro record, found type 
'{other}'"
+                )));
+            }
+            None => {
+                return Err(ArrowError::AvroError(
+                    "Projected schema missing required 'type' 
field".to_string(),
+                ));
+            }
+        }
+        let fields_val = obj.get_mut("fields").ok_or_else(|| {
+            ArrowError::AvroError("Avro record schema missing required 
'fields'".to_string())
+        })?;
+        let projected_fields = {
+            let mut original_fields = match fields_val {
+                Value::Array(arr) => std::mem::take(arr),
+                _ => {
+                    return Err(ArrowError::AvroError(
+                        "Avro record schema 'fields' must be an 
array".to_string(),
+                    ));
+                }
+            };
+            let len = original_fields.len();
+            let mut seen: HashSet<usize> = 
HashSet::with_capacity(projection.len());
+            let mut out: Vec<Value> = Vec::with_capacity(projection.len());
+            for &i in projection {
+                if i >= len {
+                    return Err(ArrowError::AvroError(format!(
+                        "Projection index {i} out of bounds for record with 
{len} fields"
+                    )));
+                }
+                if !seen.insert(i) {
+                    return Err(ArrowError::AvroError(format!(
+                        "Duplicate projection index {i}"
+                    )));
+                }
+                out.push(std::mem::replace(&mut original_fields[i], 
Value::Null));
+            }
+            out
+        };
+        *fields_val = Value::Array(projected_fields);
+        let json_string = serde_json::to_string(&value).map_err(|e| {
+            ArrowError::AvroError(format!(
+                "Failed to serialize projected Avro schema JSON: {e}"
+            ))
+        })?;
+        Ok(Self::new(json_string))
+    }
+
     pub(crate) fn generate_fingerprint(
         schema: &Schema,
         hash_type: FingerprintAlgorithm,
@@ -3137,4 +3197,546 @@ mod tests {
         assert_eq!(union_arr2[1], Value::String("int".into()));
         assert_eq!(union_arr2[2], Value::String("string".into()));
     }
+
+    #[test]
+    fn test_project_empty_projection() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert!(
+            fields.is_empty(),
+            "Empty projection should yield empty fields"
+        );
+    }
+
+    #[test]
+    fn test_project_single_field() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 1);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("b"));
+    }
+
+    #[test]
+    fn test_project_multiple_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"},
+                {"name": "d", "type": "boolean"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 2, 3]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 3);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("c"));
+        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("d"));
+    }
+
+    #[test]
+    fn test_project_all_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 2);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("a"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("b"));
+    }
+
+    #[test]
+    fn test_project_reorder_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Project in reverse order
+        let projected = schema.project(&[2, 0, 1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 3);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("c"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("a"));
+        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("b"));
+    }
+
+    #[test]
+    fn test_project_preserves_record_metadata() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "MyRecord",
+            "namespace": "com.example",
+            "doc": "A test record",
+            "aliases": ["OldRecord"],
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("MyRecord"));
+        assert_eq!(
+            v.get("namespace").and_then(|n| n.as_str()),
+            Some("com.example")
+        );
+        assert_eq!(v.get("doc").and_then(|n| n.as_str()), Some("A test record"));
+        assert!(v.get("aliases").is_some());
+    }
+
+    #[test]
+    fn test_project_preserves_field_metadata() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int", "doc": "Field A", "default": 0},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(
+            fields[0].get("doc").and_then(|d| d.as_str()),
+            Some("Field A")
+        );
+        assert_eq!(fields[0].get("default").and_then(|d| d.as_i64()), Some(0));
+    }
+
+    #[test]
+    fn test_project_with_nested_record() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Outer",
+            "fields": [
+                {"name": "id", "type": "int"},
+                {"name": "inner", "type": {
+                    "type": "record",
+                    "name": "Inner",
+                    "fields": [
+                        {"name": "x", "type": "int"},
+                        {"name": "y", "type": "string"}
+                    ]
+                }},
+                {"name": "value", "type": "double"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[1]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 1);
+        assert_eq!(
+            fields[0].get("name").and_then(|n| n.as_str()),
+            Some("inner")
+        );
+        // Verify nested record structure is preserved
+        let inner_type = fields[0].get("type").unwrap();
+        assert_eq!(
+            inner_type.get("type").and_then(|t| t.as_str()),
+            Some("record")
+        );
+        assert_eq!(
+            inner_type.get("name").and_then(|n| n.as_str()),
+            Some("Inner")
+        );
+    }
+
+    #[test]
+    fn test_project_with_complex_field_types() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "arr", "type": {"type": "array", "items": "int"}},
+                {"name": "map", "type": {"type": "map", "values": "string"}},
+                {"name": "union", "type": ["null", "int"]}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 2]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 2);
+        // Verify array type is preserved
+        let arr_type = fields[0].get("type").unwrap();
+        assert_eq!(arr_type.get("type").and_then(|t| t.as_str()), Some("array"));
+        // Verify union type is preserved
+        let union_type = fields[1].get("type").unwrap();
+        assert!(union_type.is_array());
+    }
+
+    #[test]
+    fn test_project_error_invalid_json() {
+        let schema = AvroSchema::new("not valid json".to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Invalid Avro schema JSON"),
+            "Expected parse error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_not_object() {
+        // Primitive type schema (not a JSON object)
+        let schema = AvroSchema::new(r#""string""#.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be a JSON object"),
+            "Expected object error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_array_schema() {
+        // Array (list) is a valid JSON but not a record
+        let schema = AvroSchema::new(r#"["null", "int"]"#.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be a JSON object"),
+            "Expected object error for array schema, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_not_record() {
+        let schema_json = r#"{
+            "type": "enum",
+            "name": "Color",
+            "symbols": ["RED", "GREEN", "BLUE"]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'enum'"),
+            "Expected type mismatch error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_array() {
+        let schema_json = r#"{
+            "type": "array",
+            "items": "int"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'array'"),
+            "Expected type mismatch error for array type, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_fixed() {
+        let schema_json = r#"{
+            "type": "fixed",
+            "name": "MD5",
+            "size": 16
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'fixed'"),
+            "Expected type mismatch error for fixed type, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_type_map() {
+        let schema_json = r#"{
+            "type": "map",
+            "values": "string"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("must be an Avro record") && msg.contains("'map'"),
+            "Expected type mismatch error for map type, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_missing_type_field() {
+        let schema_json = r#"{
+            "name": "Test",
+            "fields": [{"name": "a", "type": "int"}]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("missing required 'type' field"),
+            "Expected missing type error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_missing_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("missing required 'fields'"),
+            "Expected missing fields error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_fields_not_array() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": "not an array"
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("'fields' must be an array"),
+            "Expected fields array error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_index_out_of_bounds() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[5]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of bounds") && msg.contains("5") && msg.contains("2"),
+            "Expected out of bounds error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_index_out_of_bounds_edge() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Index 1 is just out of bounds for a 1-element array
+        let err = schema.project(&[1]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of bounds") && msg.contains("1"),
+            "Expected out of bounds error for edge case, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_duplicate_index() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"},
+                {"name": "c", "type": "long"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0, 1, 0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Duplicate projection index") && msg.contains("0"),
+            "Expected duplicate index error, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_error_duplicate_index_consecutive() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "string"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[1, 1]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Duplicate projection index") && msg.contains("1"),
+            "Expected duplicate index error for consecutive duplicates, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_with_empty_fields() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "EmptyRecord",
+            "fields": []
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Projecting empty from empty should succeed
+        let projected = schema.project(&[]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert!(fields.is_empty());
+    }
+
+    #[test]
+    fn test_project_empty_fields_index_out_of_bounds() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "EmptyRecord",
+            "fields": []
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let err = schema.project(&[0]).unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("out of bounds") && msg.contains("0 fields"),
+            "Expected out of bounds error for empty record, got: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_project_result_is_valid_avro_schema() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "namespace": "com.example",
+            "fields": [
+                {"name": "id", "type": "long"},
+                {"name": "name", "type": "string"},
+                {"name": "active", "type": "boolean"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        let projected = schema.project(&[0, 2]).unwrap();
+        // Verify the projected schema can be parsed as a valid Avro schema
+        let parsed = projected.schema();
+        assert!(parsed.is_ok(), "Projected schema should be valid Avro");
+        match parsed.unwrap() {
+            Schema::Complex(ComplexType::Record(r)) => {
+                assert_eq!(r.name, "Test");
+                assert_eq!(r.namespace, Some("com.example"));
+                assert_eq!(r.fields.len(), 2);
+                assert_eq!(r.fields[0].name, "id");
+                assert_eq!(r.fields[1].name, "active");
+            }
+            _ => panic!("Expected Record schema"),
+        }
+    }
+
+    #[test]
+    fn test_project_non_contiguous_indices() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "Test",
+            "fields": [
+                {"name": "f0", "type": "int"},
+                {"name": "f1", "type": "int"},
+                {"name": "f2", "type": "int"},
+                {"name": "f3", "type": "int"},
+                {"name": "f4", "type": "int"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Select every other field
+        let projected = schema.project(&[0, 2, 4]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 3);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f0"));
+        assert_eq!(fields[1].get("name").and_then(|n| n.as_str()), Some("f2"));
+        assert_eq!(fields[2].get("name").and_then(|n| n.as_str()), Some("f4"));
+    }
+
+    #[test]
+    fn test_project_single_field_from_many() {
+        let schema_json = r#"{
+            "type": "record",
+            "name": "BigRecord",
+            "fields": [
+                {"name": "f0", "type": "int"},
+                {"name": "f1", "type": "int"},
+                {"name": "f2", "type": "int"},
+                {"name": "f3", "type": "int"},
+                {"name": "f4", "type": "int"},
+                {"name": "f5", "type": "int"},
+                {"name": "f6", "type": "int"},
+                {"name": "f7", "type": "int"},
+                {"name": "f8", "type": "int"},
+                {"name": "f9", "type": "int"}
+            ]
+        }"#;
+        let schema = AvroSchema::new(schema_json.to_string());
+        // Select only the last field
+        let projected = schema.project(&[9]).unwrap();
+        let v: Value = serde_json::from_str(&projected.json_string).unwrap();
+        let fields = v.get("fields").and_then(|f| f.as_array()).unwrap();
+        assert_eq!(fields.len(), 1);
+        assert_eq!(fields[0].get("name").and_then(|n| n.as_str()), Some("f9"));
+    }
 }


Reply via email to