This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f0455d12dd Support Parsing Avro File Headers (#4888)
f0455d12dd is described below
commit f0455d12ddcb174f1f8d2bbfd5874f7b708c9a74
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Oct 4 12:42:49 2023 +0100
Support Parsing Avro File Headers (#4888)
* Add arrow-avro
* Add HeaderDecoder
* Add schema parsing
* Add BlockDecoder
* Further docs
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* Review feedback
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.github/workflows/arrow.yml | 5 +
.github/workflows/dev_pr/labeler.yml | 1 +
.github/workflows/integration.yml | 1 +
.github/workflows/miri.yaml | 1 +
.github/workflows/parquet.yml | 1 +
Cargo.toml | 1 +
arrow-avro/Cargo.toml | 46 ++++
arrow-avro/src/compression.rs | 32 +++
arrow-avro/src/lib.rs | 28 ++
arrow-avro/src/reader/block.rs | 141 ++++++++++
arrow-avro/src/reader/header.rs | 289 +++++++++++++++++++++
arrow-avro/src/reader/mod.rs | 92 +++++++
arrow-avro/src/reader/vlq.rs | 46 ++++
arrow-avro/src/schema.rs | 484 +++++++++++++++++++++++++++++++++++
dev/release/README.md | 1 +
15 files changed, 1169 insertions(+)
diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index cde931c3c6..da56c23b5c 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -39,6 +39,7 @@ on:
- arrow-integration-test/**
- arrow-ipc/**
- arrow-json/**
+ - arrow-avro/**
- arrow-ord/**
- arrow-row/**
- arrow-schema/**
@@ -78,6 +79,8 @@ jobs:
run: cargo test -p arrow-csv --all-features
- name: Test arrow-json with all features
run: cargo test -p arrow-json --all-features
+ - name: Test arrow-avro with all features
+ run: cargo test -p arrow-avro --all-features
- name: Test arrow-string with all features
run: cargo test -p arrow-string --all-features
- name: Test arrow-ord with all features
@@ -202,6 +205,8 @@ jobs:
run: cargo clippy -p arrow-csv --all-targets --all-features -- -D
warnings
- name: Clippy arrow-json with all features
run: cargo clippy -p arrow-json --all-targets --all-features -- -D
warnings
+ - name: Clippy arrow-avro with all features
+ run: cargo clippy -p arrow-avro --all-targets --all-features -- -D
warnings
- name: Clippy arrow-string with all features
run: cargo clippy -p arrow-string --all-targets --all-features -- -D
warnings
- name: Clippy arrow-ord with all features
diff --git a/.github/workflows/dev_pr/labeler.yml
b/.github/workflows/dev_pr/labeler.yml
index e5b86e8bcd..ea5873081f 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -27,6 +27,7 @@ arrow:
- arrow-integration-testing/**/*
- arrow-ipc/**/*
- arrow-json/**/*
+ - arrow-avro/**/*
- arrow-ord/**/*
- arrow-row/**/*
- arrow-schema/**/*
diff --git a/.github/workflows/integration.yml
b/.github/workflows/integration.yml
index aaf39d22bb..eca51a80c1 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -38,6 +38,7 @@ on:
- arrow-integration-testing/**
- arrow-ipc/**
- arrow-json/**
+ - arrow-avro/**
- arrow-ord/**
- arrow-pyarrow-integration-testing/**
- arrow-schema/**
diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml
index e3704d036a..19b432121b 100644
--- a/.github/workflows/miri.yaml
+++ b/.github/workflows/miri.yaml
@@ -36,6 +36,7 @@ on:
- arrow-data/**
- arrow-ipc/**
- arrow-json/**
+ - arrow-avro/**
- arrow-schema/**
- arrow-select/**
- arrow-string/**
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 7a649e16b1..d664a0dc07 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -40,6 +40,7 @@ on:
- arrow-ipc/**
- arrow-csv/**
- arrow-json/**
+ - arrow-avro/**
- parquet/**
- .github/**
diff --git a/Cargo.toml b/Cargo.toml
index 936935ec7e..d874e335ee 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
"arrow",
"arrow-arith",
"arrow-array",
+ "arrow-avro",
"arrow-buffer",
"arrow-cast",
"arrow-csv",
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
new file mode 100644
index 0000000000..9575874c41
--- /dev/null
+++ b/arrow-avro/Cargo.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-avro"
+version = { workspace = true }
+description = "Support for parsing Avro format into the Arrow format"
+homepage = { workspace = true }
+repository = { workspace = true }
+authors = { workspace = true }
+license = { workspace = true }
+keywords = { workspace = true }
+include = { workspace = true }
+edition = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "arrow_avro"
+path = "src/lib.rs"
+bench = false
+
+[dependencies]
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-data = { workspace = true }
+arrow-schema = { workspace = true }
+serde_json = { version = "1.0", default-features = false, features = ["std"] }
+serde = { version = "1.0.188", features = ["derive"] }
+
+[dev-dependencies]
+
diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs
new file mode 100644
index 0000000000..a1a44fc22b
--- /dev/null
+++ b/arrow-avro/src/compression.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
pub const CODEC_METADATA_KEY: &str = "avro.codec";

/// A compression codec for the data blocks of an avro object container file,
/// serialized under the `avro.codec` metadata key using the lowercase names
/// below (per the `rename_all` attribute)
///
/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionCodec {
    /// Serialized as `"null"` - no compression
    Null,
    /// Serialized as `"deflate"`
    Deflate,
    /// Serialized as `"bzip2"`
    BZip2,
    /// Serialized as `"snappy"`
    Snappy,
    /// Serialized as `"xz"`
    XZ,
    /// Serialized as `"zstandard"`
    ZStandard,
}
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
new file mode 100644
index 0000000000..e134d9d798
--- /dev/null
+++ b/arrow-avro/src/lib.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro]
+//!
+//! [Apache Arrow]: https://arrow.apache.org
+//! [Apache Avro]: https://avro.apache.org/
+
+#![allow(unused)] // Temporary
+
+pub mod reader;
+mod schema;
+
+mod compression;
diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs
new file mode 100644
index 0000000000..479f0ef909
--- /dev/null
+++ b/arrow-avro/src/reader/block.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decoder for [`Block`]
+
+use crate::reader::vlq::VLQDecoder;
+use arrow_schema::ArrowError;
+
/// A file data block
///
/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Debug, Default)]
pub struct Block {
    /// The number of objects in this block
    pub count: usize,
    /// The serialized objects within this block
    ///
    /// NOTE(review): this is the raw block payload as read from the file -
    /// whether it is compressed depends on the file's codec metadata, which
    /// is not consulted during block decoding
    pub data: Vec<u8>,
    /// The 16-byte sync marker that terminates this block in the file
    pub sync: [u8; 16],
}
+
/// A decoder for [`Block`]
#[derive(Debug)]
pub struct BlockDecoder {
    /// Which part of the block is expected next
    state: BlockDecoderState,
    /// The partially decoded block, handed out by [`BlockDecoder::flush`]
    in_progress: Block,
    /// Decoder for the zig-zag encoded `count` and `size` prefixes
    vlq_decoder: VLQDecoder,
    /// Bytes still expected for the current `Data` or `Sync` section
    bytes_remaining: usize,
}

/// The state machine for [`BlockDecoder`], matching the on-disk layout of a
/// block: count, size, data, then the sync marker
#[derive(Debug)]
enum BlockDecoderState {
    /// Decoding the zig-zag encoded object count
    Count,
    /// Decoding the zig-zag encoded byte size of the block data
    Size,
    /// Decoding the block data itself
    Data,
    /// Decoding the trailing 16-byte sync marker
    Sync,
    /// A complete block has been decoded and awaits [`BlockDecoder::flush`]
    Finished,
}

impl Default for BlockDecoder {
    fn default() -> Self {
        Self {
            // A fresh decoder expects the object count first
            state: BlockDecoderState::Count,
            in_progress: Default::default(),
            vlq_decoder: Default::default(),
            bytes_remaining: 0,
        }
    }
}
+
+impl BlockDecoder {
+ /// Parse [`Block`] from `buf`, returning the number of bytes read
+ ///
+ /// This method can be called multiple times with consecutive chunks of
data, allowing
+ /// integration with chunked IO systems like [`BufRead::fill_buf`]
+ ///
+ /// All errors should be considered fatal, and decoding aborted
+ ///
+ /// Once an entire [`Block`] has been decoded this method will not read
any further
+ /// input bytes, until [`Self::flush`] is called. Afterwards
[`Self::decode`]
+ /// can then be used again to read the next block, if any
+ ///
+ /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
+ pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
+ let max_read = buf.len();
+ while !buf.is_empty() {
+ match self.state {
+ BlockDecoderState::Count => {
+ if let Some(c) = self.vlq_decoder.long(&mut buf) {
+ self.in_progress.count = c.try_into().map_err(|_| {
+ ArrowError::ParseError(format!(
+ "Block count cannot be negative, got {c}"
+ ))
+ })?;
+
+ self.state = BlockDecoderState::Size;
+ }
+ }
+ BlockDecoderState::Size => {
+ if let Some(c) = self.vlq_decoder.long(&mut buf) {
+ self.bytes_remaining = c.try_into().map_err(|_| {
+ ArrowError::ParseError(format!(
+ "Block size cannot be negative, got {c}"
+ ))
+ })?;
+
+ self.in_progress.data.reserve(self.bytes_remaining);
+ self.state = BlockDecoderState::Data;
+ }
+ }
+ BlockDecoderState::Data => {
+ let to_read = self.bytes_remaining.min(buf.len());
+ self.in_progress.data.extend_from_slice(&buf[..to_read]);
+ buf = &buf[to_read..];
+ self.bytes_remaining -= to_read;
+ if self.bytes_remaining == 0 {
+ self.bytes_remaining = 16;
+ self.state = BlockDecoderState::Sync;
+ }
+ }
+ BlockDecoderState::Sync => {
+ let to_decode = buf.len().min(self.bytes_remaining);
+ let write = &mut self.in_progress.sync[16 - to_decode..];
+ write[..to_decode].copy_from_slice(&buf[..to_decode]);
+ self.bytes_remaining -= to_decode;
+ buf = &buf[to_decode..];
+ if self.bytes_remaining == 0 {
+ self.state = BlockDecoderState::Finished;
+ }
+ }
+ BlockDecoderState::Finished => return Ok(max_read - buf.len()),
+ }
+ }
+ Ok(max_read)
+ }
+
+ /// Flush this decoder returning the parsed [`Block`] if any
+ pub fn flush(&mut self) -> Option<Block> {
+ match self.state {
+ BlockDecoderState::Finished => {
+ self.state = BlockDecoderState::Count;
+ Some(std::mem::take(&mut self.in_progress))
+ }
+ _ => None,
+ }
+ }
+}
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
new file mode 100644
index 0000000000..92db8b1dc7
--- /dev/null
+++ b/arrow-avro/src/reader/header.rs
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decoder for [`Header`]
+
+use crate::reader::vlq::VLQDecoder;
+use crate::schema::Schema;
+use arrow_schema::ArrowError;
+
/// The state machine for [`HeaderDecoder`]
#[derive(Debug)]
enum HeaderDecoderState {
    /// Decoding the [`MAGIC`] prefix
    Magic,
    /// Decoding a block count of the metadata map
    BlockCount,
    /// Decoding a block byte length (only present when the block count is
    /// negative)
    BlockLen,
    /// Decoding a key length
    KeyLen,
    /// Decoding a key string
    Key,
    /// Decoding a value length
    ValueLen,
    /// Decoding a value payload
    Value,
    /// Decoding the trailing 16-byte sync marker
    Sync,
    /// Finished decoding
    Finished,
}
+
/// A decoded header for an [Object Container File](https://avro.apache.org/docs/1.11.1/specification/#object-container-files)
#[derive(Debug, Clone)]
pub struct Header {
    /// End offsets of the metadata strings in `meta_buf`, alternating
    /// `[key_end, value_end, key_end, value_end, ...]`
    meta_offsets: Vec<usize>,
    /// The concatenated bytes of the metadata keys and values
    meta_buf: Vec<u8>,
    /// The 16-byte sync marker
    sync: [u8; 16],
}

impl Header {
    /// Returns an iterator over the `(key, value)` metadata pairs in this header
    pub fn metadata(&self) -> impl Iterator<Item = (&[u8], &[u8])> {
        let mut last = 0;
        // Fix: `meta_offsets` alternates key/value end offsets, so each
        // *non-overlapping* pair of offsets delimits one entry. The previous
        // `windows(2)` additionally yielded the spurious
        // (value_end, key_end) spans between entries, which made every key
        // after the first unreachable via `get`.
        self.meta_offsets.chunks_exact(2).map(move |w| {
            let start = last;
            last = w[1];
            (&self.meta_buf[start..w[0]], &self.meta_buf[w[0]..w[1]])
        })
    }

    /// Returns the value for a given metadata key if present
    pub fn get(&self, key: impl AsRef<[u8]>) -> Option<&[u8]> {
        self.metadata()
            .find_map(|(k, v)| (k == key.as_ref()).then_some(v))
    }

    /// Returns the sync token for this file
    pub fn sync(&self) -> [u8; 16] {
        self.sync
    }
}
+
/// A decoder for [`Header`]
///
/// The avro file format does not encode the length of the header, and so it
/// is necessary to provide a push-based decoder that can be used with streams
#[derive(Debug)]
pub struct HeaderDecoder {
    /// Which part of the header is expected next
    state: HeaderDecoderState,
    /// Decoder for the zig-zag encoded block counts and string lengths
    vlq_decoder: VLQDecoder,

    /// The end offsets of strings in `meta_buf`
    ///
    /// Offsets alternate between key ends and value ends
    meta_offsets: Vec<usize>,
    /// The raw binary data of the metadata map
    meta_buf: Vec<u8>,

    /// The decoded sync marker
    sync_marker: [u8; 16],

    /// The number of remaining tuples in the current block
    tuples_remaining: usize,
    /// The number of bytes remaining in the current string/bytes payload
    bytes_remaining: usize,
}

impl Default for HeaderDecoder {
    fn default() -> Self {
        Self {
            state: HeaderDecoderState::Magic,
            meta_offsets: vec![],
            meta_buf: vec![],
            sync_marker: [0; 16],
            vlq_decoder: Default::default(),
            tuples_remaining: 0,
            // The magic prefix is consumed first, byte by byte if necessary
            bytes_remaining: MAGIC.len(),
        }
    }
}

/// The four byte magic prefix of an avro object container file
const MAGIC: &[u8; 4] = b"Obj\x01";
+
+impl HeaderDecoder {
+ /// Parse [`Header`] from `buf`, returning the number of bytes read
+ ///
+ /// This method can be called multiple times with consecutive chunks of
data, allowing
+ /// integration with chunked IO systems like [`BufRead::fill_buf`]
+ ///
+ /// All errors should be considered fatal, and decoding aborted
+ ///
+ /// Once the entire [`Header`] has been decoded this method will not read
any further
+ /// input bytes, and the header can be obtained with [`Self::flush`]
+ ///
+ /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
+ pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
+ let max_read = buf.len();
+ while !buf.is_empty() {
+ match self.state {
+ HeaderDecoderState::Magic => {
+ let remaining = &MAGIC[MAGIC.len() -
self.bytes_remaining..];
+ let to_decode = buf.len().min(remaining.len());
+ if !buf.starts_with(&remaining[..to_decode]) {
+ return Err(ArrowError::ParseError(
+ "Incorrect avro magic".to_string(),
+ ));
+ }
+ self.bytes_remaining -= to_decode;
+ buf = &buf[to_decode..];
+ if self.bytes_remaining == 0 {
+ self.state = HeaderDecoderState::BlockCount;
+ }
+ }
+ HeaderDecoderState::BlockCount => {
+ if let Some(block_count) = self.vlq_decoder.long(&mut buf)
{
+ match block_count.try_into() {
+ Ok(0) => {
+ self.state = HeaderDecoderState::Sync;
+ self.bytes_remaining = 16;
+ }
+ Ok(remaining) => {
+ self.tuples_remaining = remaining;
+ self.state = HeaderDecoderState::KeyLen;
+ }
+ Err(_) => {
+ self.tuples_remaining =
block_count.unsigned_abs() as _;
+ self.state = HeaderDecoderState::BlockLen;
+ }
+ }
+ }
+ }
+ HeaderDecoderState::BlockLen => {
+ if self.vlq_decoder.long(&mut buf).is_some() {
+ self.state = HeaderDecoderState::KeyLen
+ }
+ }
+ HeaderDecoderState::Key => {
+ let to_read = self.bytes_remaining.min(buf.len());
+ self.meta_buf.extend_from_slice(&buf[..to_read]);
+ self.bytes_remaining -= to_read;
+ buf = &buf[to_read..];
+ if self.bytes_remaining == 0 {
+ self.meta_offsets.push(self.meta_buf.len());
+ self.state = HeaderDecoderState::ValueLen;
+ }
+ }
+ HeaderDecoderState::Value => {
+ let to_read = self.bytes_remaining.min(buf.len());
+ self.meta_buf.extend_from_slice(&buf[..to_read]);
+ self.bytes_remaining -= to_read;
+ buf = &buf[to_read..];
+ if self.bytes_remaining == 0 {
+ self.meta_offsets.push(self.meta_buf.len());
+
+ self.tuples_remaining -= 1;
+ match self.tuples_remaining {
+ 0 => self.state = HeaderDecoderState::BlockCount,
+ _ => self.state = HeaderDecoderState::KeyLen,
+ }
+ }
+ }
+ HeaderDecoderState::KeyLen => {
+ if let Some(len) = self.vlq_decoder.long(&mut buf) {
+ self.bytes_remaining = len as _;
+ self.state = HeaderDecoderState::Key;
+ }
+ }
+ HeaderDecoderState::ValueLen => {
+ if let Some(len) = self.vlq_decoder.long(&mut buf) {
+ self.bytes_remaining = len as _;
+ self.state = HeaderDecoderState::Value;
+ }
+ }
+ HeaderDecoderState::Sync => {
+ let to_decode = buf.len().min(self.bytes_remaining);
+ let write = &mut self.sync_marker[16 - to_decode..];
+ write[..to_decode].copy_from_slice(&buf[..to_decode]);
+ self.bytes_remaining -= to_decode;
+ buf = &buf[to_decode..];
+ if self.bytes_remaining == 0 {
+ self.state = HeaderDecoderState::Finished;
+ }
+ }
+ HeaderDecoderState::Finished => return Ok(max_read -
buf.len()),
+ }
+ }
+ Ok(max_read)
+ }
+
+ /// Flush this decoder returning the parsed [`Header`] if any
+ pub fn flush(&mut self) -> Option<Header> {
+ match self.state {
+ HeaderDecoderState::Finished => {
+ self.state = HeaderDecoderState::Magic;
+ Some(Header {
+ meta_offsets: std::mem::take(&mut self.meta_offsets),
+ meta_buf: std::mem::take(&mut self.meta_buf),
+ sync: self.sync_marker,
+ })
+ }
+ _ => None,
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::reader::read_header;
+ use crate::schema::SCHEMA_METADATA_KEY;
+ use std::fs::File;
+ use std::io::{BufRead, BufReader};
+
+ #[test]
+ fn test_header_decode() {
+ let mut decoder = HeaderDecoder::default();
+ for m in MAGIC {
+ decoder.decode(std::slice::from_ref(m)).unwrap();
+ }
+
+ let mut decoder = HeaderDecoder::default();
+ assert_eq!(decoder.decode(MAGIC).unwrap(), 4);
+
+ let mut decoder = HeaderDecoder::default();
+ decoder.decode(b"Ob").unwrap();
+ let err = decoder.decode(b"s").unwrap_err().to_string();
+ assert_eq!(err, "Parser error: Incorrect avro magic");
+ }
+
+ fn decode_file(file: &str) -> Header {
+ let file = File::open(file).unwrap();
+ read_header(BufReader::with_capacity(100, file)).unwrap()
+ }
+
+ #[test]
+ fn test_header() {
+ let header = decode_file("../testing/data/avro/alltypes_plain.avro");
+ let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
+ let expected =
br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":["int","null"]},{"name":"bool_col","type":["boolean","null"]},{"name":"tinyint_col","type":["int","null"]},{"name":"smallint_col","type":["int","null"]},{"name":"int_col","type":["int","null"]},{"name":"bigint_col","type":["long","null"]},{"name":"float_col","type":["float","null"]},{"name":"double_col","type":["double","null"]},{"name":"date_string_col","type":["bytes","null"]},{"name":"str
[...]
+ assert_eq!(schema_json, expected);
+ let _schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap();
+ assert_eq!(
+ u128::from_le_bytes(header.sync()),
+ 226966037233754408753420635932530907102
+ );
+
+ let header =
decode_file("../testing/data/avro/fixed_length_decimal.avro");
+ let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
+ let expected =
br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"value","type":[{"type":"fixed","name":"fixed","namespace":"topLevelRecord.value","size":11,"logicalType":"decimal","precision":25,"scale":2},"null"]}]}"#;
+ assert_eq!(schema_json, expected);
+ let _schema: Schema<'_> = serde_json::from_slice(schema_json).unwrap();
+ assert_eq!(
+ u128::from_le_bytes(header.sync()),
+ 325166208089902833952788552656412487328
+ );
+ }
+}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
new file mode 100644
index 0000000000..a42011e3b2
--- /dev/null
+++ b/arrow-avro/src/reader/mod.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read Avro data to Arrow
+
+use crate::reader::block::{Block, BlockDecoder};
+use crate::reader::header::{Header, HeaderDecoder};
+use arrow_schema::ArrowError;
+use std::io::BufRead;
+
+mod header;
+
+mod block;
+
+mod vlq;
+
+/// Read a [`Header`] from the provided [`BufRead`]
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+ let mut decoder = HeaderDecoder::default();
+ loop {
+ let buf = reader.fill_buf()?;
+ if buf.is_empty() {
+ break;
+ }
+ let read = buf.len();
+ let decoded = decoder.decode(buf)?;
+ reader.consume(decoded);
+ if decoded != read {
+ break;
+ }
+ }
+
+ decoder
+ .flush()
+ .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+}
+
/// Return an iterator of [`Block`] from the provided [`BufRead`]
fn read_blocks<R: BufRead>(
    mut reader: R,
) -> impl Iterator<Item = Result<Block, ArrowError>> {
    let mut decoder = BlockDecoder::default();

    // Decode at most one block per invocation; the decoder retains partial
    // state between invocations
    let mut try_next = move || {
        loop {
            let buf = reader.fill_buf()?;
            if buf.is_empty() {
                // EOF - a truncated trailing block yields `None` from `flush`
                // below rather than an error
                break;
            }
            let read = buf.len();
            let decoded = decoder.decode(buf)?;
            reader.consume(decoded);
            if decoded != read {
                // The decoder stopped consuming - a complete block is ready
                break;
            }
        }
        Ok(decoder.flush())
    };
    // `transpose` turns `Ok(None)` into `None`, ending the iterator at EOF
    // while surfacing `Err` as `Some(Err)`
    std::iter::from_fn(move || try_next().transpose())
}
+
#[cfg(test)]
mod test {
    use crate::reader::{read_blocks, read_header};
    use std::fs::File;
    use std::io::BufReader;

    #[test]
    fn test_mux() {
        // Reads a real avro file from the arrow-testing submodule, checking
        // that every decoded block's sync marker matches the one declared in
        // the file header
        let file = File::open("../testing/data/avro/alltypes_plain.avro").unwrap();
        let mut reader = BufReader::new(file);
        let header = read_header(&mut reader).unwrap();
        for result in read_blocks(reader) {
            let block = result.unwrap();
            assert_eq!(block.sync, header.sync());
        }
    }
}
diff --git a/arrow-avro/src/reader/vlq.rs b/arrow-avro/src/reader/vlq.rs
new file mode 100644
index 0000000000..80f1c60eec
--- /dev/null
+++ b/arrow-avro/src/reader/vlq.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
/// Decoder for zig-zag encoded variable-length quantity (VLQ) integers
///
/// See also:
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types-1>
/// <https://protobuf.dev/programming-guides/encoding/#varints>
#[derive(Debug, Default)]
pub struct VLQDecoder {
    /// Scratch space for decoding VLQ integers
    in_progress: u64,
    /// Bit position at which the next 7-bit group will be accumulated
    shift: u32,
}

impl VLQDecoder {
    /// Decode a signed long from `buf`, advancing it past the consumed bytes
    ///
    /// Returns `None` if `buf` was exhausted mid-value; the bytes seen so far
    /// are retained in `self` and decoding resumes on the next call
    pub fn long(&mut self, buf: &mut &[u8]) -> Option<i64> {
        let mut remaining = *buf;
        let result = loop {
            match remaining.split_first() {
                // Ran out of input before the terminating byte
                None => break None,
                Some((&byte, tail)) => {
                    remaining = tail;
                    // Accumulate the low 7 bits, least-significant group first
                    self.in_progress |= u64::from(byte & 0x7F) << self.shift;
                    self.shift += 7;
                    // A clear continuation (high) bit terminates the value
                    if byte & 0x80 == 0 {
                        let unsigned = std::mem::take(&mut self.in_progress);
                        self.shift = 0;
                        // Undo the zig-zag encoding to recover the sign
                        break Some(((unsigned >> 1) as i64) ^ -((unsigned & 1) as i64));
                    }
                }
            }
        };
        *buf = remaining;
        result
    }
}
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
new file mode 100644
index 0000000000..839ba65bd5
--- /dev/null
+++ b/arrow-avro/src/schema.rs
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
/// The metadata key used for storing the JSON encoded [`Schema`]
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

/// Either a [`PrimitiveType`] or a reference to a previously defined named type
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TypeName<'a> {
    /// One of the built-in primitive type names, e.g. `"string"`
    Primitive(PrimitiveType),
    /// A reference, by name, to a previously defined named type
    Ref(&'a str),
}
+
/// A primitive type
///
/// Variants serialize to their lowercase Avro spelling, e.g. `Null` as
/// `"null"`, via the `camelCase` rename
///
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum PrimitiveType {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
}
+
/// Additional attributes within a [`Schema`]
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration>
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Attributes<'a> {
    /// A logical type name
    ///
    /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types>
    #[serde(default)]
    pub logical_type: Option<&'a str>,

    /// Additional JSON attributes
    ///
    /// Any keys not otherwise recognised (e.g. `precision` and `scale` on a
    /// decimal) are collected here by `#[serde(flatten)]`
    #[serde(flatten)]
    pub additional: HashMap<&'a str, serde_json::Value>,
}
+
/// A type definition that is not a variant of [`ComplexType`]
///
/// Pairs a [`TypeName`] with any additional [`Attributes`], e.g. a primitive
/// `"long"` annotated with a `"timestamp-micros"` logical type
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Type<'a> {
    /// The underlying type name, taken from the `"type"` key
    #[serde(borrow)]
    pub r#type: TypeName<'a>,
    /// Any further attributes, such as a logical type
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
+
/// An Avro schema
///
/// Deserialized via `#[serde(untagged)]`: serde tries the variants in
/// declaration order and picks the first that matches the JSON
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Schema<'a> {
    /// A bare type name, e.g. `"string"` or a reference such as `"LongList"`
    #[serde(borrow)]
    TypeName(TypeName<'a>),
    /// A union of schemas, encoded as a JSON array
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A complex type: record, enum, array, map or fixed
    #[serde(borrow)]
    Complex(ComplexType<'a>),
    /// A JSON object whose `"type"` is not one of the complex types,
    /// e.g. a primitive with a logical type annotation
    #[serde(borrow)]
    Type(Type<'a>),
}
+
/// A complex type
///
/// Internally tagged on the `"type"` key of the JSON object
///
/// <https://avro.apache.org/docs/1.11.1/specification/#complex-types>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ComplexType<'a> {
    /// A union of schemas
    #[serde(borrow)]
    Union(Vec<Schema<'a>>),
    /// A record with named fields
    #[serde(borrow)]
    Record(Record<'a>),
    /// An enumeration of symbols
    #[serde(borrow)]
    Enum(Enum<'a>),
    /// A variable-length list of values
    #[serde(borrow)]
    Array(Array<'a>),
    /// A mapping from string keys to values
    #[serde(borrow)]
    Map(Map<'a>),
    /// A fixed-length binary value
    #[serde(borrow)]
    Fixed(Fixed<'a>),
}
+
/// A record
///
/// <https://avro.apache.org/docs/1.11.1/specification/#schema-record>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record<'a> {
    /// The name of the record
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional namespace qualifying [`Self::name`]
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional documentation string
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// Alternative names for this record
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The record's fields, in declaration order
    #[serde(borrow)]
    pub fields: Vec<Field<'a>>,
    /// Any additional attributes
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
+
/// A field within a [`Record`]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Field<'a> {
    /// The name of the field
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional documentation string
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// The schema of this field's values
    #[serde(borrow)]
    pub r#type: Schema<'a>,
    /// Optional default value
    ///
    /// NOTE(review): captured as a borrowed string, so only JSON string
    /// defaults are representable here — confirm against callers
    #[serde(borrow, default)]
    pub default: Option<&'a str>,
}
+
/// An enumeration
///
/// <https://avro.apache.org/docs/1.11.1/specification/#enums>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Enum<'a> {
    /// The name of the enum
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional namespace qualifying [`Self::name`]
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Optional documentation string
    #[serde(borrow, default)]
    pub doc: Option<&'a str>,
    /// Alternative names for this enum
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The enum's symbols, in declaration order
    #[serde(borrow)]
    pub symbols: Vec<&'a str>,
    /// Optional default symbol
    #[serde(borrow, default)]
    pub default: Option<&'a str>,
    /// Any additional attributes
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
+
/// An array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#arrays>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Array<'a> {
    /// The schema of the array's elements (boxed as [`Schema`] is recursive)
    #[serde(borrow)]
    pub items: Box<Schema<'a>>,
    /// Any additional attributes
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
+
/// A map
///
/// <https://avro.apache.org/docs/1.11.1/specification/#maps>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Map<'a> {
    /// The schema of the map's values (boxed as [`Schema`] is recursive)
    #[serde(borrow)]
    pub values: Box<Schema<'a>>,
    /// Any additional attributes
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
+
/// A fixed length binary array
///
/// <https://avro.apache.org/docs/1.11.1/specification/#fixed>
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Fixed<'a> {
    /// The name of the fixed type
    #[serde(borrow)]
    pub name: &'a str,
    /// Optional namespace qualifying [`Self::name`]
    #[serde(borrow, default)]
    pub namespace: Option<&'a str>,
    /// Alternative names for this type
    #[serde(borrow, default)]
    pub aliases: Vec<&'a str>,
    /// The number of bytes per value
    pub size: usize,
    /// Any additional attributes, e.g. a `decimal` logical type with
    /// `precision` and `scale`
    #[serde(flatten)]
    pub attributes: Attributes<'a>,
}
+
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Exercises deserialization of each [`Schema`] representation: bare
    /// primitive names, unions, annotated types, complex types, named type
    /// references and nested records
    #[test]
    fn test_deserialize() {
        // A bare JSON string is a primitive type name
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A JSON array is a union of its elements
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // A primitive with a logical type annotation parses as a Type, with
        // the annotation captured in its attributes
        let t: Type = serde_json::from_str(
            r#"{
               "type":"long",
               "logicalType":"timestamp-micros"
            }"#,
        )
        .unwrap();

        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // Unrecognised keys (precision, scale) are collected into
        // Attributes::additional
        let t: ComplexType = serde_json::from_str(
            r#"{
               "type":"fixed",
               "name":"fixed",
               "namespace":"topLevelRecord.value",
               "size":11,
               "logicalType":"decimal",
               "precision":25,
               "scale":2
            }"#,
        )
        .unwrap();

        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // A record whose field is a union nesting the decimal fixed above
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"value",
                     "type":[
                        {
                           "type":"fixed",
                           "name":"fixed",
                           "namespace":"topLevelRecord.value",
                           "size":11,
                           "logicalType":"decimal",
                           "precision":25,
                           "scale":2
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                },],
                attributes: Default::default(),
            }))
        );

        // A self-referential record: "LongList" is not a primitive, so it
        // parses as TypeName::Ref
        let schema: Schema = serde_json::from_str(
            r#"{
              "type": "record",
              "name": "LongList",
              "aliases": ["LinkedLongs"],
              "fields" : [
                {"name": "value", "type": "long"},
                {"name": "next", "type": ["null", "LongList"]}
              ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(
                            PrimitiveType::Long
                        )),
                        default: None,
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // A record mixing a primitive union and an annotated-type union
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"id",
                     "type":[
                        "int",
                        "null"
                     ]
                  },
                  {
                     "name":"timestamp_col",
                     "type":[
                        {
                           "type":"long",
                           "logicalType":"timestamp-micros"
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );

        // The Avro RPC HandshakeRequest schema: an inline named type (MD5)
        // is later referred to by name
        let schema: Schema = serde_json::from_str(
            r#"{
              "type": "record",
              "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
              "fields": [
                {"name": "clientHash",
                 "type": {"type": "fixed", "name": "MD5", "size": 16}},
                {"name": "clientProtocol", "type": ["null", "string"]},
                {"name": "serverHash", "type": "MD5"},
                {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
              ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
}
diff --git a/dev/release/README.md b/dev/release/README.md
index 30b3a4a8a5..177f33bcbb 100644
--- a/dev/release/README.md
+++ b/dev/release/README.md
@@ -258,6 +258,7 @@ Rust Arrow Crates:
(cd arrow-ipc && cargo publish)
(cd arrow-csv && cargo publish)
(cd arrow-json && cargo publish)
+(cd arrow-avro && cargo publish)
(cd arrow-ord && cargo publish)
(cd arrow-arith && cargo publish)
(cd arrow-string && cargo publish)