This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f0455d12dd Support Parsing Avro File Headers (#4888)
f0455d12dd is described below

commit f0455d12ddcb174f1f8d2bbfd5874f7b708c9a74
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Oct 4 12:42:49 2023 +0100

    Support Parsing Avro File Headers (#4888)
    
    * Add arrow-avro
    
    * Add HeaderDecoder
    
    * Add schema parsing
    
    * Add BlockDecoder
    
    * Further docs
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Review feedback
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .github/workflows/arrow.yml          |   5 +
 .github/workflows/dev_pr/labeler.yml |   1 +
 .github/workflows/integration.yml    |   1 +
 .github/workflows/miri.yaml          |   1 +
 .github/workflows/parquet.yml        |   1 +
 Cargo.toml                           |   1 +
 arrow-avro/Cargo.toml                |  46 ++++
 arrow-avro/src/compression.rs        |  32 +++
 arrow-avro/src/lib.rs                |  28 ++
 arrow-avro/src/reader/block.rs       | 141 ++++++++++
 arrow-avro/src/reader/header.rs      | 289 +++++++++++++++++++++
 arrow-avro/src/reader/mod.rs         |  92 +++++++
 arrow-avro/src/reader/vlq.rs         |  46 ++++
 arrow-avro/src/schema.rs             | 484 +++++++++++++++++++++++++++++++++++
 dev/release/README.md                |   1 +
 15 files changed, 1169 insertions(+)

diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index cde931c3c6..da56c23b5c 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -39,6 +39,7 @@ on:
       - arrow-integration-test/**
       - arrow-ipc/**
       - arrow-json/**
+      - arrow-avro/**
       - arrow-ord/**
       - arrow-row/**
       - arrow-schema/**
@@ -78,6 +79,8 @@ jobs:
         run: cargo test -p arrow-csv --all-features
       - name: Test arrow-json with all features
         run: cargo test -p arrow-json --all-features
+      - name: Test arrow-avro with all features
+        run: cargo test -p arrow-avro --all-features
       - name: Test arrow-string with all features
         run: cargo test -p arrow-string --all-features
       - name: Test arrow-ord with all features
@@ -202,6 +205,8 @@ jobs:
         run: cargo clippy -p arrow-csv --all-targets --all-features -- -D 
warnings
       - name: Clippy arrow-json with all features
         run: cargo clippy -p arrow-json --all-targets --all-features -- -D 
warnings
+      - name: Clippy arrow-avro with all features
+        run: cargo clippy -p arrow-avro --all-targets --all-features -- -D 
warnings
       - name: Clippy arrow-string with all features
         run: cargo clippy -p arrow-string --all-targets --all-features -- -D 
warnings
       - name: Clippy arrow-ord with all features
diff --git a/.github/workflows/dev_pr/labeler.yml 
b/.github/workflows/dev_pr/labeler.yml
index e5b86e8bcd..ea5873081f 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -27,6 +27,7 @@ arrow:
   - arrow-integration-testing/**/*
   - arrow-ipc/**/*
   - arrow-json/**/*
+  - arrow-avro/**/*
   - arrow-ord/**/*
   - arrow-row/**/*
   - arrow-schema/**/*
diff --git a/.github/workflows/integration.yml 
b/.github/workflows/integration.yml
index aaf39d22bb..eca51a80c1 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -38,6 +38,7 @@ on:
       - arrow-integration-testing/**
       - arrow-ipc/**
       - arrow-json/**
+      - arrow-avro/**
       - arrow-ord/**
       - arrow-pyarrow-integration-testing/**
       - arrow-schema/**
diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml
index e3704d036a..19b432121b 100644
--- a/.github/workflows/miri.yaml
+++ b/.github/workflows/miri.yaml
@@ -36,6 +36,7 @@ on:
       - arrow-data/**
       - arrow-ipc/**
       - arrow-json/**
+      - arrow-avro/**
       - arrow-schema/**
       - arrow-select/**
       - arrow-string/**
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 7a649e16b1..d664a0dc07 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -40,6 +40,7 @@ on:
       - arrow-ipc/**
       - arrow-csv/**
       - arrow-json/**
+      - arrow-avro/**
       - parquet/**
       - .github/**
 
diff --git a/Cargo.toml b/Cargo.toml
index 936935ec7e..d874e335ee 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
     "arrow",
     "arrow-arith",
     "arrow-array",
+    "arrow-avro",
     "arrow-buffer",
     "arrow-cast",
     "arrow-csv",
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
new file mode 100644
index 0000000000..9575874c41
--- /dev/null
+++ b/arrow-avro/Cargo.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-avro"
+version = { workspace = true }
+description = "Support for parsing Avro format into the Arrow format"
+homepage = { workspace = true }
+repository = { workspace = true }
+authors = { workspace = true }
+license = { workspace = true }
+keywords = { workspace = true }
+include = { workspace = true }
+edition = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "arrow_avro"
+path = "src/lib.rs"
+bench = false
+
+[dependencies]
+arrow-array = { workspace = true  }
+arrow-buffer = { workspace = true  }
+arrow-cast = { workspace = true  }
+arrow-data = { workspace = true  }
+arrow-schema = { workspace = true  }
+serde_json = { version = "1.0", default-features = false, features = ["std"] }
+serde = { version = "1.0.188", features = ["derive"] }
+
+[dev-dependencies]
+
diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs
new file mode 100644
index 0000000000..a1a44fc22b
--- /dev/null
+++ b/arrow-avro/src/compression.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
+pub const CODEC_METADATA_KEY: &str = "avro.codec";
+
+/// A compression codec, as stored under [`CODEC_METADATA_KEY`]
+///
+/// Variant names are (de)serialized by serde in lowercase, e.g. `Deflate` as
+/// `"deflate"` and `ZStandard` as `"zstandard"`
+#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum CompressionCodec {
+    /// Serialized as `"null"`
+    Null,
+    /// Serialized as `"deflate"`
+    Deflate,
+    /// Serialized as `"bzip2"`
+    BZip2,
+    /// Serialized as `"snappy"`
+    Snappy,
+    /// Serialized as `"xz"`
+    XZ,
+    /// Serialized as `"zstandard"`
+    ZStandard,
+}
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
new file mode 100644
index 0000000000..e134d9d798
--- /dev/null
+++ b/arrow-avro/src/lib.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro]
+//!
+//! [Apache Arrow]: https://arrow.apache.org
+//! [Apache Avro]: https://avro.apache.org/
+
+#![allow(unused)] // Temporary
+
+pub mod reader;
+mod schema;
+
+mod compression;
diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs
new file mode 100644
index 0000000000..479f0ef909
--- /dev/null
+++ b/arrow-avro/src/reader/block.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decoder for [`Block`]
+
+use crate::reader::vlq::VLQDecoder;
+use arrow_schema::ArrowError;
+
+/// A file data block
+///
+/// Produced by [`BlockDecoder::flush`] once a complete block has been decoded
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
+#[derive(Debug, Default)]
+pub struct Block {
+    /// The number of objects in this block
+    pub count: usize,
+    /// The serialized objects within this block
+    ///
+    /// The decoder copies these bytes from the input without interpreting them
+    pub data: Vec<u8>,
+    /// The sync marker
+    ///
+    /// These are the 16 bytes that follow `data` in the input
+    pub sync: [u8; 16],
+}
+
+/// A decoder for [`Block`]
+///
+/// This is a push-based state machine: input is supplied in arbitrary chunks
+/// and the position within the block is remembered between calls
+#[derive(Debug)]
+pub struct BlockDecoder {
+    /// The part of the block that is currently being decoded
+    state: BlockDecoderState,
+    /// The block being assembled, handed out by `flush`
+    in_progress: Block,
+    /// Decoder for the variable length block count and block size
+    vlq_decoder: VLQDecoder,
+    /// The number of bytes still to be read for the current data payload or
+    /// sync marker
+    bytes_remaining: usize,
+}
+
+/// The states of the [`BlockDecoder`] state machine, in the order they are visited
+#[derive(Debug)]
+enum BlockDecoderState {
+    /// Decoding the number of objects in the block
+    Count,
+    /// Decoding the byte length of the block data
+    Size,
+    /// Copying the block data
+    Data,
+    /// Decoding the 16 byte sync marker
+    Sync,
+    /// A complete block has been decoded and is waiting to be flushed
+    Finished,
+}
+
+impl Default for BlockDecoder {
+    /// Creates a decoder positioned at the start of a block, holding no data
+    fn default() -> Self {
+        let in_progress = Block::default();
+        let vlq_decoder = VLQDecoder::default();
+        Self {
+            bytes_remaining: 0,
+            vlq_decoder,
+            in_progress,
+            // A block always begins with its object count
+            state: BlockDecoderState::Count,
+        }
+    }
+}
+
+impl BlockDecoder {
+    /// Parse [`Block`] from `buf`, returning the number of bytes read
+    ///
+    /// This method can be called multiple times with consecutive chunks of data,
+    /// allowing integration with chunked IO systems like [`BufRead::fill_buf`]
+    ///
+    /// All errors should be considered fatal, and decoding aborted
+    ///
+    /// Once an entire [`Block`] has been decoded this method will not read any
+    /// further input bytes, until [`Self::flush`] is called. Afterwards
+    /// [`Self::decode`] can then be used again to read the next block, if any
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ArrowError::ParseError`] if the block count or block size is
+    /// negative, or if memory for the declared block size cannot be reserved
+    ///
+    /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
+    pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
+        let max_read = buf.len();
+        while !buf.is_empty() {
+            match self.state {
+                BlockDecoderState::Count => {
+                    if let Some(c) = self.vlq_decoder.long(&mut buf) {
+                        self.in_progress.count = c.try_into().map_err(|_| {
+                            ArrowError::ParseError(format!(
+                                "Block count cannot be negative, got {c}"
+                            ))
+                        })?;
+
+                        self.state = BlockDecoderState::Size;
+                    }
+                }
+                BlockDecoderState::Size => {
+                    if let Some(c) = self.vlq_decoder.long(&mut buf) {
+                        let size: usize = c.try_into().map_err(|_| {
+                            ArrowError::ParseError(format!(
+                                "Block size cannot be negative, got {c}"
+                            ))
+                        })?;
+
+                        // The size comes from untrusted input: report an
+                        // impossible allocation as an error instead of
+                        // panicking or aborting inside `reserve`
+                        self.in_progress.data.try_reserve(size).map_err(|e| {
+                            ArrowError::ParseError(format!(
+                                "Unable to reserve {size} bytes for block data: {e}"
+                            ))
+                        })?;
+
+                        self.bytes_remaining = size;
+                        self.state = BlockDecoderState::Data;
+                    }
+                }
+                BlockDecoderState::Data => {
+                    let to_read = self.bytes_remaining.min(buf.len());
+                    self.in_progress.data.extend_from_slice(&buf[..to_read]);
+                    buf = &buf[to_read..];
+                    self.bytes_remaining -= to_read;
+                    if self.bytes_remaining == 0 {
+                        self.bytes_remaining = 16;
+                        self.state = BlockDecoderState::Sync;
+                    }
+                }
+                BlockDecoderState::Sync => {
+                    let to_decode = buf.len().min(self.bytes_remaining);
+                    // Continue writing where the previous chunk stopped, so a
+                    // marker split across several calls is assembled in order
+                    let start = 16 - self.bytes_remaining;
+                    let end = start + to_decode;
+                    self.in_progress.sync[start..end].copy_from_slice(&buf[..to_decode]);
+                    self.bytes_remaining -= to_decode;
+                    buf = &buf[to_decode..];
+                    if self.bytes_remaining == 0 {
+                        self.state = BlockDecoderState::Finished;
+                    }
+                }
+                // Leave any remaining input untouched until the block is flushed
+                BlockDecoderState::Finished => return Ok(max_read - buf.len()),
+            }
+        }
+        Ok(max_read)
+    }
+
+    /// Flush this decoder returning the parsed [`Block`] if any
+    ///
+    /// Returns `None` if a complete block has not yet been decoded, in which
+    /// case the partially decoded state is kept. On success the decoder is
+    /// reset so that [`Self::decode`] starts reading the next block
+    pub fn flush(&mut self) -> Option<Block> {
+        match self.state {
+            BlockDecoderState::Finished => {
+                self.state = BlockDecoderState::Count;
+                Some(std::mem::take(&mut self.in_progress))
+            }
+            _ => None,
+        }
+    }
+}
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
new file mode 100644
index 0000000000..92db8b1dc7
--- /dev/null
+++ b/arrow-avro/src/reader/header.rs
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decoder for [`Header`]
+
+use crate::reader::vlq::VLQDecoder;
+use crate::schema::Schema;
+use arrow_schema::ArrowError;
+
+/// The states of the [`HeaderDecoder`] state machine
+#[derive(Debug)]
+enum HeaderDecoderState {
+    /// Decoding the [`MAGIC`] prefix
+    Magic,
+    /// Decoding a block count
+    ///
+    /// A count of zero ends the metadata and is followed by the sync marker, a
+    /// negative count is followed by a block byte length
+    BlockCount,
+    /// Decoding a block byte length
+    ///
+    /// The decoded value is read and then discarded
+    BlockLen,
+    /// Decoding a key length
+    KeyLen,
+    /// Decoding a key string
+    Key,
+    /// Decoding a value length
+    ValueLen,
+    /// Decoding a value payload
+    Value,
+    /// Decoding sync marker
+    Sync,
+    /// Finished decoding
+    Finished,
+}
+
+/// A decoded header for an [Object Container File](https://avro.apache.org/docs/1.11.1/specification/#object-container-files)
+#[derive(Debug, Clone)]
+pub struct Header {
+    /// End offsets into `meta_buf`: for each metadata entry the end of its
+    /// key followed by the end of its value
+    meta_offsets: Vec<usize>,
+    /// The concatenated raw bytes of all metadata keys and values
+    meta_buf: Vec<u8>,
+    /// The 16 byte sync marker
+    sync: [u8; 16],
+}
+
+impl Header {
+    /// Returns an iterator over the `(key, value)` metadata pairs in this header
+    ///
+    /// Keys and values are returned as the raw bytes stored in the header, in
+    /// the order in which they were decoded
+    pub fn metadata(&self) -> impl Iterator<Item = (&[u8], &[u8])> {
+        let mut last = 0;
+        // `meta_offsets` holds one (key end, value end) pair per entry, so it
+        // has to be walked in non-overlapping pairs. Overlapping windows would
+        // also yield a bogus (empty key, next key) item between real entries
+        self.meta_offsets.chunks_exact(2).map(move |w| {
+            let start = last;
+            last = w[1];
+            (&self.meta_buf[start..w[0]], &self.meta_buf[w[0]..w[1]])
+        })
+    }
+
+    /// Returns the value for a given metadata key if present
+    ///
+    /// If the key occurs more than once the first matching value is returned
+    pub fn get(&self, key: impl AsRef<[u8]>) -> Option<&[u8]> {
+        self.metadata()
+            .find_map(|(k, v)| (k == key.as_ref()).then_some(v))
+    }
+
+    /// Returns the sync token for this file
+    pub fn sync(&self) -> [u8; 16] {
+        self.sync
+    }
+}
+
+/// A decoder for [`Header`]
+///
+/// The avro file format does not encode the length of the header, and so it
+/// is necessary to provide a push-based decoder that can be used with streams
+#[derive(Debug)]
+pub struct HeaderDecoder {
+    /// The part of the header that is currently being decoded
+    state: HeaderDecoderState,
+    /// Decoder for the variable length counts and lengths in the header
+    vlq_decoder: VLQDecoder,
+
+    /// The end offsets of strings in `meta_buf`
+    meta_offsets: Vec<usize>,
+    /// The raw binary data of the metadata map
+    meta_buf: Vec<u8>,
+
+    /// The decoded sync marker
+    sync_marker: [u8; 16],
+
+    /// The number of remaining tuples in the current block
+    tuples_remaining: usize,
+    /// The number of bytes remaining in the current string/bytes payload
+    ///
+    /// Also counts the bytes still expected for the magic prefix and for the
+    /// sync marker while those are being decoded
+    bytes_remaining: usize,
+}
+
+impl Default for HeaderDecoder {
+    /// Creates a decoder that expects the magic prefix as its first input
+    fn default() -> Self {
+        let vlq_decoder = VLQDecoder::default();
+        Self {
+            // Decoding starts with the magic prefix, all of which is outstanding
+            state: HeaderDecoderState::Magic,
+            bytes_remaining: MAGIC.len(),
+            tuples_remaining: 0,
+            vlq_decoder,
+            sync_marker: [0; 16],
+            meta_buf: Vec::new(),
+            meta_offsets: Vec::new(),
+        }
+    }
+}
+
+/// The four byte prefix that [`HeaderDecoder`] requires at the start of its input
+const MAGIC: &[u8; 4] = b"Obj\x01";
+
+impl HeaderDecoder {
+    /// Parse [`Header`] from `buf`, returning the number of bytes read
+    ///
+    /// This method can be called multiple times with consecutive chunks of data,
+    /// allowing integration with chunked IO systems like [`BufRead::fill_buf`]
+    ///
+    /// All errors should be considered fatal, and decoding aborted
+    ///
+    /// Once the entire [`Header`] has been decoded this method will not read any
+    /// further input bytes, and the header can be obtained with [`Self::flush`]
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ArrowError::ParseError`] if the input does not start with the
+    /// avro magic bytes, or if a metadata key or value length is not a valid
+    /// non-negative length
+    ///
+    /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
+    pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
+        let max_read = buf.len();
+        while !buf.is_empty() {
+            match self.state {
+                HeaderDecoderState::Magic => {
+                    // Only compare against the part of the magic not yet seen
+                    let remaining = &MAGIC[MAGIC.len() - self.bytes_remaining..];
+                    let to_decode = buf.len().min(remaining.len());
+                    if !buf.starts_with(&remaining[..to_decode]) {
+                        return Err(ArrowError::ParseError(
+                            "Incorrect avro magic".to_string(),
+                        ));
+                    }
+                    self.bytes_remaining -= to_decode;
+                    buf = &buf[to_decode..];
+                    if self.bytes_remaining == 0 {
+                        self.state = HeaderDecoderState::BlockCount;
+                    }
+                }
+                HeaderDecoderState::BlockCount => {
+                    if let Some(block_count) = self.vlq_decoder.long(&mut buf) {
+                        match block_count.try_into() {
+                            // A zero count terminates the metadata
+                            Ok(0) => {
+                                self.state = HeaderDecoderState::Sync;
+                                self.bytes_remaining = 16;
+                            }
+                            Ok(remaining) => {
+                                self.tuples_remaining = remaining;
+                                self.state = HeaderDecoderState::KeyLen;
+                            }
+                            // A negative count gives the number of tuples as its
+                            // absolute value and is followed by a byte length
+                            Err(_) => {
+                                self.tuples_remaining = block_count.unsigned_abs() as _;
+                                self.state = HeaderDecoderState::BlockLen;
+                            }
+                        }
+                    }
+                }
+                HeaderDecoderState::BlockLen => {
+                    // The byte length is not needed, so it is read and discarded
+                    if self.vlq_decoder.long(&mut buf).is_some() {
+                        self.state = HeaderDecoderState::KeyLen
+                    }
+                }
+                HeaderDecoderState::KeyLen => {
+                    if let Some(len) = self.vlq_decoder.long(&mut buf) {
+                        self.bytes_remaining = Self::payload_len(len, "key")?;
+                        self.state = HeaderDecoderState::Key;
+                    }
+                }
+                HeaderDecoderState::Key => {
+                    let to_read = self.bytes_remaining.min(buf.len());
+                    self.meta_buf.extend_from_slice(&buf[..to_read]);
+                    self.bytes_remaining -= to_read;
+                    buf = &buf[to_read..];
+                    if self.bytes_remaining == 0 {
+                        self.meta_offsets.push(self.meta_buf.len());
+                        self.state = HeaderDecoderState::ValueLen;
+                    }
+                }
+                HeaderDecoderState::ValueLen => {
+                    if let Some(len) = self.vlq_decoder.long(&mut buf) {
+                        self.bytes_remaining = Self::payload_len(len, "value")?;
+                        self.state = HeaderDecoderState::Value;
+                    }
+                }
+                HeaderDecoderState::Value => {
+                    let to_read = self.bytes_remaining.min(buf.len());
+                    self.meta_buf.extend_from_slice(&buf[..to_read]);
+                    self.bytes_remaining -= to_read;
+                    buf = &buf[to_read..];
+                    if self.bytes_remaining == 0 {
+                        self.meta_offsets.push(self.meta_buf.len());
+
+                        self.tuples_remaining -= 1;
+                        match self.tuples_remaining {
+                            0 => self.state = HeaderDecoderState::BlockCount,
+                            _ => self.state = HeaderDecoderState::KeyLen,
+                        }
+                    }
+                }
+                HeaderDecoderState::Sync => {
+                    let to_decode = buf.len().min(self.bytes_remaining);
+                    // Continue writing where the previous chunk stopped, so a
+                    // marker split across several calls is assembled in order
+                    let start = 16 - self.bytes_remaining;
+                    let end = start + to_decode;
+                    self.sync_marker[start..end].copy_from_slice(&buf[..to_decode]);
+                    self.bytes_remaining -= to_decode;
+                    buf = &buf[to_decode..];
+                    if self.bytes_remaining == 0 {
+                        self.state = HeaderDecoderState::Finished;
+                    }
+                }
+                // Leave any remaining input untouched once the header is complete
+                HeaderDecoderState::Finished => return Ok(max_read - buf.len()),
+            }
+        }
+        Ok(max_read)
+    }
+
+    /// Flush this decoder returning the parsed [`Header`] if any
+    ///
+    /// Returns `None` if a complete header has not yet been decoded. On success
+    /// the decoder is returned to its initial state and can be reused
+    pub fn flush(&mut self) -> Option<Header> {
+        match self.state {
+            HeaderDecoderState::Finished => {
+                // Restore the initial state in full, so that a reused decoder
+                // validates the magic prefix of the next header as well
+                self.state = HeaderDecoderState::Magic;
+                self.bytes_remaining = MAGIC.len();
+                Some(Header {
+                    meta_offsets: std::mem::take(&mut self.meta_offsets),
+                    meta_buf: std::mem::take(&mut self.meta_buf),
+                    sync: self.sync_marker,
+                })
+            }
+            _ => None,
+        }
+    }
+
+    /// Converts a decoded metadata key or value length into a `usize`
+    ///
+    /// `what` names the payload in the error message. A length that does not
+    /// fit in a `usize`, which includes every negative length, is an error
+    fn payload_len(len: i64, what: &str) -> Result<usize, ArrowError> {
+        len.try_into().map_err(|_| {
+            ArrowError::ParseError(format!("Invalid metadata {what} length: {len}"))
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::reader::read_header;
+    use crate::schema::SCHEMA_METADATA_KEY;
+    use std::fs::File;
+    use std::io::{BufRead, BufReader};
+
+    #[test]
+    fn test_header_decode() {
+        let mut decoder = HeaderDecoder::default();
+        for m in MAGIC {
+            decoder.decode(std::slice::from_ref(m)).unwrap();
+        }
+
+        let mut decoder = HeaderDecoder::default();
+        assert_eq!(decoder.decode(MAGIC).unwrap(), 4);
+
+        let mut decoder = HeaderDecoder::default();
+        decoder.decode(b"Ob").unwrap();
+        let err = decoder.decode(b"s").unwrap_err().to_string();
+        assert_eq!(err, "Parser error: Incorrect avro magic");
+    }
+
+    fn decode_file(file: &str) -> Header {
+        let file = File::open(file).unwrap();
+        read_header(BufReader::with_capacity(100, file)).unwrap()
+    }
+
+    #[test]
+    fn test_header() {
+        let header = decode_file("../testing/data/avro/alltypes_plain.avro");
+        let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
+        let expected = 
br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":["int","null"]},{"name":"bool_col","type":["boolean","null"]},{"name":"tinyint_col","type":["int","null"]},{"name":"smallint_col","type":["int","null"]},{"name":"int_col","type":["int","null"]},{"name":"bigint_col","type":["long","null"]},{"name":"float_col","type":["float","null"]},{"name":"double_col","type":["double","null"]},{"name":"date_string_col","type":["bytes","null"]},{"name":"str
 [...]
+        assert_eq!(schema_json, expected);
+        let _schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap();
+        assert_eq!(
+            u128::from_le_bytes(header.sync()),
+            226966037233754408753420635932530907102
+        );
+
+        let header = 
decode_file("../testing/data/avro/fixed_length_decimal.avro");
+        let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
+        let expected = 
br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"value","type":[{"type":"fixed","name":"fixed","namespace":"topLevelRecord.value","size":11,"logicalType":"decimal","precision":25,"scale":2},"null"]}]}"#;
+        assert_eq!(schema_json, expected);
+        let _schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap();
+        assert_eq!(
+            u128::from_le_bytes(header.sync()),
+            325166208089902833952788552656412487328
+        );
+    }
+}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
new file mode 100644
index 0000000000..a42011e3b2
--- /dev/null
+++ b/arrow-avro/src/reader/mod.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read Avro data to Arrow
+
+use crate::reader::block::{Block, BlockDecoder};
+use crate::reader::header::{Header, HeaderDecoder};
+use arrow_schema::ArrowError;
+use std::io::BufRead;
+
+mod header;
+
+mod block;
+
+mod vlq;
+
+/// Read a [`Header`] from the provided [`BufRead`]
+///
+/// Buffered input is fed to a [`HeaderDecoder`] until it stops consuming bytes
+/// or the reader is exhausted. Only the bytes that belong to the header are
+/// consumed from `reader`
+///
+/// Returns a parse error if the input ends before the header is complete
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let (available, consumed) = {
+            let chunk = reader.fill_buf()?;
+            if chunk.is_empty() {
+                // End of input
+                break;
+            }
+            (chunk.len(), decoder.decode(chunk)?)
+        };
+        reader.consume(consumed);
+        if consumed != available {
+            // The decoder left bytes behind, so the header is complete
+            break;
+        }
+    }
+
+    match decoder.flush() {
+        Some(header) => Ok(header),
+        None => Err(ArrowError::ParseError("Unexpected EOF".to_string())),
+    }
+}
+
+/// Return an iterator of [`Block`] from the provided [`BufRead`]
+///
+/// Each call to `next` feeds buffered input to a [`BlockDecoder`] until it stops
+/// consuming bytes or the reader is exhausted, then yields the flushed block.
+/// The iterator ends when no complete block could be flushed
+fn read_blocks<R: BufRead>(
+    mut reader: R,
+) -> impl Iterator<Item = Result<Block, ArrowError>> {
+    let mut decoder = BlockDecoder::default();
+
+    std::iter::from_fn(move || {
+        loop {
+            let chunk = match reader.fill_buf() {
+                Ok(chunk) => chunk,
+                Err(e) => return Some(Err(ArrowError::from(e))),
+            };
+            if chunk.is_empty() {
+                // End of input
+                break;
+            }
+            let available = chunk.len();
+            let consumed = match decoder.decode(chunk) {
+                Ok(consumed) => consumed,
+                Err(e) => return Some(Err(e)),
+            };
+            reader.consume(consumed);
+            if consumed != available {
+                // The decoder left bytes behind, so a block is complete
+                break;
+            }
+        }
+        decoder.flush().map(Ok)
+    })
+}
+
+#[cfg(test)]
+mod test {
+    use crate::reader::{read_blocks, read_header};
+    use std::fs::File;
+    use std::io::BufReader;
+
+    /// Reads the header and then every block from the same reader, checking
+    /// that each block carries the sync marker announced by the header
+    #[test]
+    fn test_mux() {
+        let path = "../testing/data/avro/alltypes_plain.avro";
+        let mut reader = BufReader::new(File::open(path).unwrap());
+        let header = read_header(&mut reader).unwrap();
+        let expected_sync = header.sync();
+        for block in read_blocks(reader).map(Result::unwrap) {
+            assert_eq!(block.sync, expected_sync);
+        }
+    }
+}
diff --git a/arrow-avro/src/reader/vlq.rs b/arrow-avro/src/reader/vlq.rs
new file mode 100644
index 0000000000..80f1c60eec
--- /dev/null
+++ b/arrow-avro/src/reader/vlq.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Decoder for zig-zag encoded variable length (VLQ) integers
+///
+/// The decoder keeps the partially decoded value between calls, so an integer
+/// may be split across several input buffers
+///
+/// See also:
+/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types-1>
+/// <https://protobuf.dev/programming-guides/encoding/#varints>
+#[derive(Debug, Default)]
+pub struct VLQDecoder {
+    /// Scratch space for decoding VLQ integers
+    in_progress: u64,
+    /// The bit position at which the next 7 bit group is placed in `in_progress`
+    shift: u32,
+}
+
+impl VLQDecoder {
+    /// Decode a signed long from `buf`
+    ///
+    /// Bytes are consumed from the front of `buf`, which is advanced past them.
+    /// Returns `None` if `buf` is exhausted before the final byte of the
+    /// integer; the partial value is kept and decoding continues on the next call
+    ///
+    /// Bits that do not fit in 64 bits are discarded, so an over-long encoding
+    /// yields a truncated value instead of overflowing the shift
+    pub fn long(&mut self, buf: &mut &[u8]) -> Option<i64> {
+        while let Some(byte) = buf.first().copied() {
+            *buf = &buf[1..];
+            // The low 7 bits carry data, the high bit marks a continuation.
+            // `checked_shl` is `None` once the shift reaches 64, which only
+            // happens for malformed input with more than 10 bytes
+            let bits = (byte & 0x7F) as u64;
+            self.in_progress |= bits.checked_shl(self.shift).unwrap_or(0);
+            self.shift = self.shift.saturating_add(7);
+            if byte & 0x80 == 0 {
+                let val = self.in_progress;
+                self.in_progress = 0;
+                self.shift = 0;
+                // Undo the zig-zag encoding
+                return Some((val >> 1) as i64 ^ -((val & 1) as i64));
+            }
+        }
+        None
+    }
+}
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
new file mode 100644
index 0000000000..839ba65bd5
--- /dev/null
+++ b/arrow-avro/src/schema.rs
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// The metadata key used for storing the JSON encoded [`Schema`]
+pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
+
+/// Either a [`PrimitiveType`] or a reference to a previously defined named type
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#names>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum TypeName<'a> {
+    Primitive(PrimitiveType),
+    Ref(&'a str),
+}
+
+/// A primitive type
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum PrimitiveType {
+    Null,
+    Boolean,
+    Int,
+    Long,
+    Float,
+    Double,
+    Bytes,
+    String,
+}
+
+/// Additional attributes within a [`Schema`]
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
+#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Attributes<'a> {
+    /// A logical type name
+    ///
+    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
+    #[serde(default)]
+    pub logical_type: Option<&'a str>,
+
+    /// Additional JSON attributes
+    #[serde(flatten)]
+    pub additional: HashMap<&'a str, serde_json::Value>,
+}
+
+/// A type definition that is not a variant of [`ComplexType`]
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Type<'a> {
+    #[serde(borrow)]
+    pub r#type: TypeName<'a>,
+    #[serde(flatten)]
+    pub attributes: Attributes<'a>,
+}
+
+/// An Avro schema
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum Schema<'a> {
+    #[serde(borrow)]
+    TypeName(TypeName<'a>),
+    #[serde(borrow)]
+    Union(Vec<Schema<'a>>),
+    #[serde(borrow)]
+    Complex(ComplexType<'a>),
+    #[serde(borrow)]
+    Type(Type<'a>),
+}
+
+/// A complex type
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub enum ComplexType<'a> {
+    #[serde(borrow)]
+    Union(Vec<Schema<'a>>),
+    #[serde(borrow)]
+    Record(Record<'a>),
+    #[serde(borrow)]
+    Enum(Enum<'a>),
+    #[serde(borrow)]
+    Array(Array<'a>),
+    #[serde(borrow)]
+    Map(Map<'a>),
+    #[serde(borrow)]
+    Fixed(Fixed<'a>),
+}
+
+/// A record
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Record<'a> {
+    #[serde(borrow)]
+    pub name: &'a str,
+    #[serde(borrow, default)]
+    pub namespace: Option<&'a str>,
+    #[serde(borrow, default)]
+    pub doc: Option<&'a str>,
+    #[serde(borrow, default)]
+    pub aliases: Vec<&'a str>,
+    #[serde(borrow)]
+    pub fields: Vec<Field<'a>>,
+    #[serde(flatten)]
+    pub attributes: Attributes<'a>,
+}
+
+/// A field within a [`Record`]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Field<'a> {
+    #[serde(borrow)]
+    pub name: &'a str,
+    #[serde(borrow, default)]
+    pub doc: Option<&'a str>,
+    #[serde(borrow)]
+    pub r#type: Schema<'a>,
+    #[serde(borrow, default)]
+    pub default: Option<&'a str>,
+}
+
+/// An enumeration
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Enum<'a> {
+    #[serde(borrow)]
+    pub name: &'a str,
+    #[serde(borrow, default)]
+    pub namespace: Option<&'a str>,
+    #[serde(borrow, default)]
+    pub doc: Option<&'a str>,
+    #[serde(borrow, default)]
+    pub aliases: Vec<&'a str>,
+    #[serde(borrow)]
+    pub symbols: Vec<&'a str>,
+    #[serde(borrow, default)]
+    pub default: Option<&'a str>,
+    #[serde(flatten)]
+    pub attributes: Attributes<'a>,
+}
+
+/// An array
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Array<'a> {
+    #[serde(borrow)]
+    pub items: Box<Schema<'a>>,
+    #[serde(flatten)]
+    pub attributes: Attributes<'a>,
+}
+
+/// A map
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Map<'a> {
+    #[serde(borrow)]
+    pub values: Box<Schema<'a>>,
+    #[serde(flatten)]
+    pub attributes: Attributes<'a>,
+}
+
+/// A fixed length binary array
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Fixed<'a> {
+    #[serde(borrow)]
+    pub name: &'a str,
+    #[serde(borrow, default)]
+    pub namespace: Option<&'a str>,
+    #[serde(borrow, default)]
+    pub aliases: Vec<&'a str>,
+    pub size: usize,
+    #[serde(flatten)]
+    pub attributes: Attributes<'a>,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+    #[test]
+    fn test_deserialize() {
+        let t: Schema = serde_json::from_str("\"string\"").unwrap();
+        assert_eq!(
+            t,
+            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
+        );
+
+        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
+        assert_eq!(
+            t,
+            Schema::Union(vec![
+                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+            ])
+        );
+
+        let t: Type = serde_json::from_str(
+            r#"{
+                   "type":"long",
+                   "logicalType":"timestamp-micros"
+                }"#,
+        )
+        .unwrap();
+
+        let timestamp = Type {
+            r#type: TypeName::Primitive(PrimitiveType::Long),
+            attributes: Attributes {
+                logical_type: Some("timestamp-micros"),
+                additional: Default::default(),
+            },
+        };
+
+        assert_eq!(t, timestamp);
+
+        let t: ComplexType = serde_json::from_str(
+            r#"{
+                   "type":"fixed",
+                   "name":"fixed",
+                   "namespace":"topLevelRecord.value",
+                   "size":11,
+                   "logicalType":"decimal",
+                   "precision":25,
+                   "scale":2
+                }"#,
+        )
+        .unwrap();
+
+        let decimal = ComplexType::Fixed(Fixed {
+            name: "fixed",
+            namespace: Some("topLevelRecord.value"),
+            aliases: vec![],
+            size: 11,
+            attributes: Attributes {
+                logical_type: Some("decimal"),
+                additional: vec![("precision", json!(25)), ("scale", json!(2))]
+                    .into_iter()
+                    .collect(),
+            },
+        });
+
+        assert_eq!(t, decimal);
+
+        let schema: Schema = serde_json::from_str(
+            r#"{
+               "type":"record",
+               "name":"topLevelRecord",
+               "fields":[
+                  {
+                     "name":"value",
+                     "type":[
+                        {
+                           "type":"fixed",
+                           "name":"fixed",
+                           "namespace":"topLevelRecord.value",
+                           "size":11,
+                           "logicalType":"decimal",
+                           "precision":25,
+                           "scale":2
+                        },
+                        "null"
+                     ]
+                  }
+               ]
+            }"#,
+        )
+        .unwrap();
+
+        assert_eq!(
+            schema,
+            Schema::Complex(ComplexType::Record(Record {
+                name: "topLevelRecord",
+                namespace: None,
+                doc: None,
+                aliases: vec![],
+                fields: vec![Field {
+                    name: "value",
+                    doc: None,
+                    r#type: Schema::Union(vec![
+                        Schema::Complex(decimal),
+                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                    ]),
+                    default: None,
+                },],
+                attributes: Default::default(),
+            }))
+        );
+
+        let schema: Schema = serde_json::from_str(
+            r#"{
+                  "type": "record",
+                  "name": "LongList",
+                  "aliases": ["LinkedLongs"],
+                  "fields" : [
+                    {"name": "value", "type": "long"},
+                    {"name": "next", "type": ["null", "LongList"]}
+                  ]
+                }"#,
+        )
+        .unwrap();
+
+        assert_eq!(
+            schema,
+            Schema::Complex(ComplexType::Record(Record {
+                name: "LongList",
+                namespace: None,
+                doc: None,
+                aliases: vec!["LinkedLongs"],
+                fields: vec![
+                    Field {
+                        name: "value",
+                        doc: None,
+                        r#type: Schema::TypeName(TypeName::Primitive(
+                            PrimitiveType::Long
+                        )),
+                        default: None,
+                    },
+                    Field {
+                        name: "next",
+                        doc: None,
+                        r#type: Schema::Union(vec![
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                            Schema::TypeName(TypeName::Ref("LongList")),
+                        ]),
+                        default: None,
+                    }
+                ],
+                attributes: Attributes::default(),
+            }))
+        );
+
+        let schema: Schema = serde_json::from_str(
+            r#"{
+               "type":"record",
+               "name":"topLevelRecord",
+               "fields":[
+                  {
+                     "name":"id",
+                     "type":[
+                        "int",
+                        "null"
+                     ]
+                  },
+                  {
+                     "name":"timestamp_col",
+                     "type":[
+                        {
+                           "type":"long",
+                           "logicalType":"timestamp-micros"
+                        },
+                        "null"
+                     ]
+                  }
+               ]
+            }"#,
+        )
+        .unwrap();
+
+        assert_eq!(
+            schema,
+            Schema::Complex(ComplexType::Record(Record {
+                name: "topLevelRecord",
+                namespace: None,
+                doc: None,
+                aliases: vec![],
+                fields: vec![
+                    Field {
+                        name: "id",
+                        doc: None,
+                        r#type: Schema::Union(vec![
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                        ]),
+                        default: None,
+                    },
+                    Field {
+                        name: "timestamp_col",
+                        doc: None,
+                        r#type: Schema::Union(vec![
+                            Schema::Type(timestamp),
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                        ]),
+                        default: None,
+                    }
+                ],
+                attributes: Default::default(),
+            }))
+        );
+
+        let schema: Schema = serde_json::from_str(
+            r#"{
+                  "type": "record",
+                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
+                  "fields": [
+                    {"name": "clientHash",
+                     "type": {"type": "fixed", "name": "MD5", "size": 16}},
+                    {"name": "clientProtocol", "type": ["null", "string"]},
+                    {"name": "serverHash", "type": "MD5"},
+                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
+                  ]
+            }"#,
+        )
+        .unwrap();
+
+        assert_eq!(
+            schema,
+            Schema::Complex(ComplexType::Record(Record {
+                name: "HandshakeRequest",
+                namespace: Some("org.apache.avro.ipc"),
+                doc: None,
+                aliases: vec![],
+                fields: vec![
+                    Field {
+                        name: "clientHash",
+                        doc: None,
+                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
+                            name: "MD5",
+                            namespace: None,
+                            aliases: vec![],
+                            size: 16,
+                            attributes: Default::default(),
+                        })),
+                        default: None,
+                    },
+                    Field {
+                        name: "clientProtocol",
+                        doc: None,
+                        r#type: Schema::Union(vec![
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+                        ]),
+                        default: None,
+                    },
+                    Field {
+                        name: "serverHash",
+                        doc: None,
+                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
+                        default: None,
+                    },
+                    Field {
+                        name: "meta",
+                        doc: None,
+                        r#type: Schema::Union(vec![
+                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                            Schema::Complex(ComplexType::Map(Map {
+                                values: Box::new(Schema::TypeName(TypeName::Primitive(
+                                    PrimitiveType::Bytes
+                                ))),
+                                attributes: Default::default(),
+                            })),
+                        ]),
+                        default: None,
+                    }
+                ],
+                attributes: Default::default(),
+            }))
+        );
+    }
+}
diff --git a/dev/release/README.md b/dev/release/README.md
index 30b3a4a8a5..177f33bcbb 100644
--- a/dev/release/README.md
+++ b/dev/release/README.md
@@ -258,6 +258,7 @@ Rust Arrow Crates:
 (cd arrow-ipc && cargo publish)
 (cd arrow-csv && cargo publish)
 (cd arrow-json && cargo publish)
+(cd arrow-avro && cargo publish)
 (cd arrow-ord && cargo publish)
 (cd arrow-arith && cargo publish)
 (cd arrow-string && cargo publish)


Reply via email to