This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4e186db chore: Extract Block from reader/mod.rs into reader/block.rs (#475)
4e186db is described below
commit 4e186dbd710a31197f1faa2a7ec063c0d7e6c47c
Author: Martin Grigorov <[email protected]>
AuthorDate: Fri Feb 20 11:51:45 2026 +0200
chore: Extract Block from reader/mod.rs into reader/block.rs (#475)
---
avro/src/reader/block.rs | 307 +++++++++++++++++++++++++++++++++++++++++++++++
avro/src/reader/mod.rs | 300 ++-------------------------------------------
2 files changed, 317 insertions(+), 290 deletions(-)
diff --git a/avro/src/reader/block.rs b/avro/src/reader/block.rs
new file mode 100644
index 0000000..1571c06
--- /dev/null
+++ b/avro/src/reader/block.rs
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+ AvroResult, Codec, Error,
+ decode::{decode, decode_internal},
+ error::Details,
+ schema::{Names, Schema, resolve_names, resolve_names_with_schemata},
+ types::Value,
+ util,
+};
+use log::warn;
+use serde_json::from_slice;
+use std::{
+ collections::HashMap,
+ io::{ErrorKind, Read},
+ str::FromStr,
+};
+
+/// Internal Block reader.
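+/// Wraps a `Read` source, decodes the Avro object container header, and then yields
+/// decoded values block by block via `read_next`.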
+#[derive(Debug, Clone)]
+pub(super) struct Block<'r, R> {
+ reader: R,
+ /// Internal buffering to reduce allocation.
+ buf: Vec<u8>,
+ buf_idx: usize,
+ /// Number of elements expected to exist within this block.
+ message_count: usize,
+ marker: [u8; 16],
+ codec: Codec,
+ pub(super) writer_schema: Schema,
+ schemata: Vec<&'r Schema>,
+ pub(super) user_metadata: HashMap<String, Vec<u8>>,
+ names_refs: Names,
+}
+
+impl<'r, R: Read> Block<'r, R> {
+ pub(super) fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
+ let mut block = Block {
+ reader,
+ codec: Codec::Null,
+ writer_schema: Schema::Null,
+ schemata,
+ buf: vec![],
+ buf_idx: 0,
+ message_count: 0,
+ marker: [0; 16],
+ user_metadata: Default::default(),
+ names_refs: Default::default(),
+ };
+
+ block.read_header()?;
+ Ok(block)
+ }
+
+ /// Try to read the header and set the writer `Schema`, the `Codec` and the marker based on
+ /// its content.
+ fn read_header(&mut self) -> AvroResult<()> {
+ let mut buf = [0u8; 4];
+ self.reader
+ .read_exact(&mut buf)
+ .map_err(Details::ReadHeader)?;
+
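+ // The Avro object container format begins with the 4-byte magic: "Obj" followed by the version byte 0x01.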
+ if buf != [b'O', b'b', b'j', 1u8] {
+ return Err(Details::HeaderMagic.into());
+ }
+
+ let meta_schema = Schema::map(Schema::Bytes).build();
+ match decode(&meta_schema, &mut self.reader)? {
+ Value::Map(metadata) => {
+ self.read_writer_schema(&metadata)?;
+ self.codec = read_codec(&metadata)?;
+
+ for (key, value) in metadata {
+ if key == "avro.schema"
+ || key == "avro.codec"
+ || key == "avro.codec.compression_level"
+ {
+ // already processed
+ } else if key.starts_with("avro.") {
+ warn!("Ignoring unknown metadata key: {key}");
+ } else {
+ self.read_user_metadata(key, value);
+ }
+ }
+ }
+ _ => {
+ return Err(Details::GetHeaderMetadata.into());
+ }
+ }
+
+ self.reader
+ .read_exact(&mut self.marker)
+ .map_err(|e| Details::ReadMarker(e).into())
+ }
+
+ fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
+ // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
+ // invalid bytes.
+ //
+ // There are two cases to handle here:
+ //
+ // 1. `n > self.buf.len()`:
+ // In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
+ // 2. `n < self.buf.len()`:
+ // We need to resize to ensure that the buffer len is safe to read `n` elements.
+ //
+ // TODO: Figure out a way to avoid having to truncate for the second case.
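+ // For example, if the previous block left `self.buf.len() == 10` and the next block needs
+ // `n == 4`, the resize truncates the buffer to 4 bytes so `read_exact` fills exactly `n` bytes.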
+ self.buf.resize(util::safe_len(n)?, 0);
+ self.reader
+ .read_exact(&mut self.buf)
+ .map_err(Details::ReadIntoBuf)?;
+ self.buf_idx = 0;
+ Ok(())
+ }
+
+ /// Try to read a data block, also performing schema resolution for the objects contained in
+ /// the block. The objects are stored in an internal buffer of the `Reader`.
+ fn read_block_next(&mut self) -> AvroResult<()> {
+ assert!(self.is_empty(), "Expected self to be empty!");
+ match util::read_long(&mut self.reader).map_err(Error::into_details) {
+ Ok(block_len) => {
+ self.message_count = block_len as usize;
+ let block_bytes = util::read_long(&mut self.reader)?;
+ self.fill_buf(block_bytes as usize)?;
+ let mut marker = [0u8; 16];
+ self.reader
+ .read_exact(&mut marker)
+ .map_err(Details::ReadBlockMarker)?;
+
+ if marker != self.marker {
+ return Err(Details::GetBlockMarker.into());
+ }
+
+ // NOTE (JAB): This doesn't fit this Reader pattern very well.
+ // `self.buf` is a growable buffer that is reused as the reader is iterated.
+ // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
+ // and replace `buf` with the new one, instead of reusing the same buffer.
+ // We can address this by using some "limited read" type to decode directly
+ // into the buffer. But this is fine, for now.
+ self.codec.decompress(&mut self.buf)
+ }
+ Err(Details::ReadVariableIntegerBytes(io_err)) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ // do not return an error if we simply reached a clean end of the stream
+ Ok(())
+ } else {
+ Err(Details::ReadVariableIntegerBytes(io_err).into())
+ }
+ }
+ Err(e) => Err(Error::new(e)),
+ }
+ }
+
+ fn len(&self) -> usize {
+ self.message_count
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
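+ /// Read the next value from the current block, reading a new block first if the current one
+ /// is exhausted; returns `Ok(None)` once the underlying stream is fully consumed.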
+ pub(super) fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
+ if self.is_empty() {
+ self.read_block_next()?;
+ if self.is_empty() {
+ return Ok(None);
+ }
+ }
+
+ let mut block_bytes = &self.buf[self.buf_idx..];
+ let b_original = block_bytes.len();
+
+ let item = decode_internal(
+ &self.writer_schema,
+ &self.names_refs,
+ &None,
+ &mut block_bytes,
+ )?;
+ let item = match read_schema {
+ Some(schema) => item.resolve(schema)?,
+ None => item,
+ };
+
+ if b_original != 0 && b_original == block_bytes.len() {
+ // decoding did not consume any bytes, so return an error to avoid an infinite loop
+ return Err(Details::ReadBlock.into());
+ }
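+ // Advance the read cursor by the number of bytes consumed from the block buffer.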
+ self.buf_idx += b_original - block_bytes.len();
+ self.message_count -= 1;
+ Ok(Some(item))
+ }
+
+ fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
+ let json: serde_json::Value = metadata
+ .get("avro.schema")
+ .and_then(|bytes| {
+ if let Value::Bytes(ref bytes) = *bytes {
+ from_slice(bytes.as_ref()).ok()
+ } else {
+ None
+ }
+ })
+ .ok_or(Details::GetAvroSchemaFromMap)?;
+ if !self.schemata.is_empty() {
+ let mut names = HashMap::new();
+ resolve_names_with_schemata(
+ self.schemata.iter().copied(),
+ &mut names,
+ &None,
+ &HashMap::new(),
+ )?;
+ self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
+ self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
+ } else {
+ self.writer_schema = Schema::parse(&json)?;
+ let mut names = HashMap::new();
+ resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())?;
+ self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
+ }
+ Ok(())
+ }
+
+ fn read_user_metadata(&mut self, key: String, value: Value) {
+ match value {
+ Value::Bytes(ref vec) => {
+ self.user_metadata.insert(key, vec.clone());
+ }
+ wrong => {
+ warn!("User metadata values must be Value::Bytes, found
{wrong:?}");
+ }
+ }
+ }
+}
+
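+/// Determine the `Codec` from the `avro.codec` header entry, honoring an optional
+/// `avro.codec.compression_level` for codecs that support one. Falls back to
+/// `Codec::Null` when the entry is absent.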
+fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
+ let result = metadata
+ .get("avro.codec")
+ .map(|codec| {
+ if let Value::Bytes(ref bytes) = *codec {
+ match std::str::from_utf8(bytes.as_ref()) {
+ Ok(utf8) => Ok(utf8),
+ Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
+ }
+ } else {
+ Err(Details::BadCodecMetadata.into())
+ }
+ })
+ .map(|codec_res| match codec_res {
+ Ok(codec) => match Codec::from_str(codec) {
+ Ok(codec) => match codec {
+ #[cfg(feature = "bzip")]
+ Codec::Bzip2(_) => {
+ use crate::Bzip2Settings;
+ if let Some(Value::Bytes(bytes)) = metadata.get("avro.codec.compression_level") {
+ Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
+ } else {
+ Ok(codec)
+ }
+ }
+ #[cfg(feature = "xz")]
+ Codec::Xz(_) => {
+ use crate::XzSettings;
+ if let Some(Value::Bytes(bytes)) = metadata.get("avro.codec.compression_level") {
+ Ok(Codec::Xz(XzSettings::new(bytes[0])))
+ } else {
+ Ok(codec)
+ }
+ }
+ #[cfg(feature = "zstandard")]
+ Codec::Zstandard(_) => {
+ use crate::ZstandardSettings;
+ if let Some(Value::Bytes(bytes)) = metadata.get("avro.codec.compression_level") {
+ Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
+ } else {
+ Ok(codec)
+ }
+ }
+ _ => Ok(codec),
+ },
+ Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
+ },
+ Err(err) => Err(err),
+ });
+
+ result.unwrap_or(Ok(Codec::Null))
+}
diff --git a/avro/src/reader/mod.rs b/avro/src/reader/mod.rs
index c9e355f..21924b0 100644
--- a/avro/src/reader/mod.rs
+++ b/avro/src/reader/mod.rs
@@ -16,305 +16,23 @@
// under the License.
//! Logic handling reading from Avro format at user level.
+
+mod block;
+
use crate::{
- AvroResult, Codec, Error,
+ AvroResult,
decode::{decode, decode_internal},
error::Details,
from_value,
headers::{HeaderBuilder, RabinFingerprintHeader},
- schema::{
- Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
- resolve_names_with_schemata,
- },
+ schema::{ResolvedOwnedSchema, ResolvedSchema, Schema},
serde::AvroSchema,
types::Value,
- util,
};
+use block::Block;
use bon::bon;
-use log::warn;
use serde::de::DeserializeOwned;
-use serde_json::from_slice;
-use std::{
- collections::HashMap,
- io::{ErrorKind, Read},
- marker::PhantomData,
- str::FromStr,
-};
-
-/// Internal Block reader.
-#[derive(Debug, Clone)]
-struct Block<'r, R> {
- reader: R,
- /// Internal buffering to reduce allocation.
- buf: Vec<u8>,
- buf_idx: usize,
- /// Number of elements expected to exist within this block.
- message_count: usize,
- marker: [u8; 16],
- codec: Codec,
- writer_schema: Schema,
- schemata: Vec<&'r Schema>,
- user_metadata: HashMap<String, Vec<u8>>,
- names_refs: Names,
-}
-
-impl<'r, R: Read> Block<'r, R> {
- fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
- let mut block = Block {
- reader,
- codec: Codec::Null,
- writer_schema: Schema::Null,
- schemata,
- buf: vec![],
- buf_idx: 0,
- message_count: 0,
- marker: [0; 16],
- user_metadata: Default::default(),
- names_refs: Default::default(),
- };
-
- block.read_header()?;
- Ok(block)
- }
-
- /// Try to read the header and set the writer `Schema`, the `Codec` and the marker based on
- /// its content.
- fn read_header(&mut self) -> AvroResult<()> {
- let mut buf = [0u8; 4];
- self.reader
- .read_exact(&mut buf)
- .map_err(Details::ReadHeader)?;
-
- if buf != [b'O', b'b', b'j', 1u8] {
- return Err(Details::HeaderMagic.into());
- }
-
- let meta_schema = Schema::map(Schema::Bytes).build();
- match decode(&meta_schema, &mut self.reader)? {
- Value::Map(metadata) => {
- self.read_writer_schema(&metadata)?;
- self.codec = read_codec(&metadata)?;
-
- for (key, value) in metadata {
- if key == "avro.schema"
- || key == "avro.codec"
- || key == "avro.codec.compression_level"
- {
- // already processed
- } else if key.starts_with("avro.") {
- warn!("Ignoring unknown metadata key: {key}");
- } else {
- self.read_user_metadata(key, value);
- }
- }
- }
- _ => {
- return Err(Details::GetHeaderMetadata.into());
- }
- }
-
- self.reader
- .read_exact(&mut self.marker)
- .map_err(|e| Details::ReadMarker(e).into())
- }
-
- fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
- // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
- // invalid bytes.
- //
- // There are two cases to handle here:
- //
- // 1. `n > self.buf.len()`:
- // In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
- // 2. `n < self.buf.len()`:
- // We need to resize to ensure that the buffer len is safe to read `n` elements.
- //
- // TODO: Figure out a way to avoid having to truncate for the second case.
- self.buf.resize(util::safe_len(n)?, 0);
- self.reader
- .read_exact(&mut self.buf)
- .map_err(Details::ReadIntoBuf)?;
- self.buf_idx = 0;
- Ok(())
- }
-
- /// Try to read a data block, also performing schema resolution for the objects contained in
- /// the block. The objects are stored in an internal buffer of the `Reader`.
- fn read_block_next(&mut self) -> AvroResult<()> {
- assert!(self.is_empty(), "Expected self to be empty!");
- match util::read_long(&mut self.reader).map_err(Error::into_details) {
- Ok(block_len) => {
- self.message_count = block_len as usize;
- let block_bytes = util::read_long(&mut self.reader)?;
- self.fill_buf(block_bytes as usize)?;
- let mut marker = [0u8; 16];
- self.reader
- .read_exact(&mut marker)
- .map_err(Details::ReadBlockMarker)?;
-
- if marker != self.marker {
- return Err(Details::GetBlockMarker.into());
- }
-
- // NOTE (JAB): This doesn't fit this Reader pattern very well.
- // `self.buf` is a growable buffer that is reused as the reader is iterated.
- // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
- // and replace `buf` with the new one, instead of reusing the same buffer.
- // We can address this by using some "limited read" type to decode directly
- // into the buffer. But this is fine, for now.
- self.codec.decompress(&mut self.buf)
- }
- Err(Details::ReadVariableIntegerBytes(io_err)) => {
- if let ErrorKind::UnexpectedEof = io_err.kind() {
- // do not return an error if we simply reached a clean end of the stream
- Ok(())
- } else {
- Err(Details::ReadVariableIntegerBytes(io_err).into())
- }
- }
- Err(e) => Err(Error::new(e)),
- }
- }
-
- fn len(&self) -> usize {
- self.message_count
- }
-
- fn is_empty(&self) -> bool {
- self.len() == 0
- }
-
- fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
- if self.is_empty() {
- self.read_block_next()?;
- if self.is_empty() {
- return Ok(None);
- }
- }
-
- let mut block_bytes = &self.buf[self.buf_idx..];
- let b_original = block_bytes.len();
-
- let item = decode_internal(
- &self.writer_schema,
- &self.names_refs,
- &None,
- &mut block_bytes,
- )?;
- let item = match read_schema {
- Some(schema) => item.resolve(schema)?,
- None => item,
- };
-
- if b_original != 0 && b_original == block_bytes.len() {
- // decoding did not consume any bytes, so return an error to avoid an infinite loop
- return Err(Details::ReadBlock.into());
- }
- self.buf_idx += b_original - block_bytes.len();
- self.message_count -= 1;
- Ok(Some(item))
- }
-
- fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
- let json: serde_json::Value = metadata
- .get("avro.schema")
- .and_then(|bytes| {
- if let Value::Bytes(ref bytes) = *bytes {
- from_slice(bytes.as_ref()).ok()
- } else {
- None
- }
- })
- .ok_or(Details::GetAvroSchemaFromMap)?;
- if !self.schemata.is_empty() {
- let mut names = HashMap::new();
- resolve_names_with_schemata(
- self.schemata.iter().copied(),
- &mut names,
- &None,
- &HashMap::new(),
- )?;
- self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
- self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
- } else {
- self.writer_schema = Schema::parse(&json)?;
- let mut names = HashMap::new();
- resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())?;
- self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
- }
- Ok(())
- }
-
- fn read_user_metadata(&mut self, key: String, value: Value) {
- match value {
- Value::Bytes(ref vec) => {
- self.user_metadata.insert(key, vec.clone());
- }
- wrong => {
- warn!("User metadata values must be Value::Bytes, found
{wrong:?}");
- }
- }
- }
-}
-
-fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
- let result = metadata
- .get("avro.codec")
- .map(|codec| {
- if let Value::Bytes(ref bytes) = *codec {
- match std::str::from_utf8(bytes.as_ref()) {
- Ok(utf8) => Ok(utf8),
- Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
- }
- } else {
- Err(Details::BadCodecMetadata.into())
- }
- })
- .map(|codec_res| match codec_res {
- Ok(codec) => match Codec::from_str(codec) {
- Ok(codec) => match codec {
- #[cfg(feature = "bzip")]
- Codec::Bzip2(_) => {
- use crate::Bzip2Settings;
- if let Some(Value::Bytes(bytes)) = metadata.get("avro.codec.compression_level") {
- Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
- } else {
- Ok(codec)
- }
- }
- #[cfg(feature = "xz")]
- Codec::Xz(_) => {
- use crate::XzSettings;
- if let Some(Value::Bytes(bytes)) = metadata.get("avro.codec.compression_level") {
- Ok(Codec::Xz(XzSettings::new(bytes[0])))
- } else {
- Ok(codec)
- }
- }
- #[cfg(feature = "zstandard")]
- Codec::Zstandard(_) => {
- use crate::ZstandardSettings;
- if let Some(Value::Bytes(bytes)) = metadata.get("avro.codec.compression_level") {
- Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
- } else {
- Ok(codec)
- }
- }
- _ => Ok(codec),
- },
- Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
- },
- Err(err) => Err(err),
- });
-
- result.unwrap_or(Ok(Codec::Null))
-}
+use std::{collections::HashMap, io::Read, marker::PhantomData};
/// Main interface for reading Avro formatted values.
///
@@ -588,7 +306,9 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
+ use crate::{
+ Error, encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record,
+ };
use apache_avro_test_helper::TestResult;
use pretty_assertions::assert_eq;
use serde::Deserialize;