This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4cd2d1487c AvroError enum for arrow-avro crate (#8759)
4cd2d1487c is described below
commit 4cd2d1487c924ccb53f6a811817ae02780fae7c4
Author: nathaniel-d-ef <[email protected]>
AuthorDate: Mon Jan 26 23:12:57 2026 +0100
AvroError enum for arrow-avro crate (#8759)
# Which issue does this PR close?
- Closes #8746
# Rationale for this change
The arrow-avro crate currently uses `ArrowError` throughout. This lacks the
level of precision that other crates in the project, such as Parquet, provide.
# What changes are included in this PR?
- A new `AvroError` enum
- Application of `AvroError` to all internal methods where `ArrowError` was previously used.
# Are these changes tested?
No new functionality has been introduced; all existing tests pass.
# Are there any user-facing changes?
Errors from the crate are now `AvroError` variants rather than `ArrowError` variants.
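For downstream code, the migration is mostly a matter of matching on the new
variants or converting back. A minimal, hypothetical sketch (the `handle`
function below is illustrative only; the variants and the
`From<AvroError> for ArrowError` impl are part of this PR's diff):
```rust
use arrow_avro::errors::AvroError;
use arrow_schema::ArrowError;

// Hypothetical downstream handler, not part of this PR.
fn handle(result: Result<(), AvroError>) {
    match result {
        Ok(()) => {}
        // EOF signals that more bytes are needed rather than corrupt data.
        Err(AvroError::EOF(msg)) => eprintln!("need more data: {msg}"),
        Err(AvroError::ParseError(msg)) => eprintln!("malformed Avro: {msg}"),
        // Code that still wants an ArrowError can use the provided From impl.
        Err(other) => {
            let arrow_err: ArrowError = other.into();
            eprintln!("{arrow_err}");
        }
    }
}
```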
---------
Co-authored-by: Connor Sanders <[email protected]>
---
arrow-avro/src/compression.rs | 29 ++--
arrow-avro/src/errors.rs | 148 ++++++++++++++++
arrow-avro/src/lib.rs | 3 +
arrow-avro/src/reader/block.rs | 10 +-
arrow-avro/src/reader/cursor.rs | 51 +++---
arrow-avro/src/reader/header.rs | 22 +--
arrow-avro/src/reader/mod.rs | 72 ++++----
arrow-avro/src/reader/record.rs | 228 ++++++++++++------------
arrow-avro/src/writer/encoder.rs | 367 +++++++++++++++++++--------------------
arrow-avro/src/writer/format.rs | 31 ++--
10 files changed, 543 insertions(+), 418 deletions(-)
diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs
index 0cb2878a13..7c6a62564a 100644
--- a/arrow-avro/src/compression.rs
+++ b/arrow-avro/src/compression.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::errors::AvroError;
use arrow_schema::ArrowError;
#[cfg(any(
feature = "deflate",
@@ -47,7 +48,7 @@ pub enum CompressionCodec {
impl CompressionCodec {
#[allow(unused_variables)]
- pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
+ pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, AvroError> {
match self {
#[cfg(feature = "deflate")]
CompressionCodec::Deflate => {
@@ -57,7 +58,7 @@ impl CompressionCodec {
Ok(out)
}
#[cfg(not(feature = "deflate"))]
- CompressionCodec::Deflate => Err(ArrowError::ParseError(
+ CompressionCodec::Deflate => Err(AvroError::ParseError(
"Deflate codec requires deflate feature".to_string(),
)),
#[cfg(feature = "snappy")]
@@ -70,16 +71,16 @@ impl CompressionCodec {
let mut decoder = snap::raw::Decoder::new();
let decoded = decoder
.decompress_vec(block)
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+ .map_err(|e| AvroError::External(Box::new(e)))?;
let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
- return Err(ArrowError::ParseError("Snappy CRC
mismatch".to_string()));
+ return Err(AvroError::ParseError("Snappy CRC
mismatch".to_string()));
}
Ok(decoded)
}
#[cfg(not(feature = "snappy"))]
- CompressionCodec::Snappy => Err(ArrowError::ParseError(
+ CompressionCodec::Snappy => Err(AvroError::ParseError(
"Snappy codec requires snappy feature".to_string(),
)),
@@ -87,33 +88,39 @@ impl CompressionCodec {
CompressionCodec::ZStandard => {
let mut decoder = zstd::Decoder::new(block)?;
let mut out = Vec::new();
- decoder.read_to_end(&mut out)?;
+ decoder
+ .read_to_end(&mut out)
+ .map_err(|e| AvroError::External(Box::new(e)))?;
Ok(out)
}
#[cfg(not(feature = "zstd"))]
- CompressionCodec::ZStandard => Err(ArrowError::ParseError(
+ CompressionCodec::ZStandard => Err(AvroError::ParseError(
"ZStandard codec requires zstd feature".to_string(),
)),
#[cfg(feature = "bzip2")]
CompressionCodec::Bzip2 => {
let mut decoder = bzip2::read::BzDecoder::new(block);
let mut out = Vec::new();
- decoder.read_to_end(&mut out)?;
+ decoder
+ .read_to_end(&mut out)
+ .map_err(|e| AvroError::External(Box::new(e)))?;
Ok(out)
}
#[cfg(not(feature = "bzip2"))]
- CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
+ CompressionCodec::Bzip2 => Err(AvroError::ParseError(
"Bzip2 codec requires bzip2 feature".to_string(),
)),
#[cfg(feature = "xz")]
CompressionCodec::Xz => {
let mut decoder = xz::read::XzDecoder::new(block);
let mut out = Vec::new();
- decoder.read_to_end(&mut out)?;
+ decoder
+ .read_to_end(&mut out)
+ .map_err(|e| AvroError::External(Box::new(e)))?;
Ok(out)
}
#[cfg(not(feature = "xz"))]
- CompressionCodec::Xz => Err(ArrowError::ParseError(
+ CompressionCodec::Xz => Err(AvroError::ParseError(
"XZ codec requires xz feature".to_string(),
)),
}
diff --git a/arrow-avro/src/errors.rs b/arrow-avro/src/errors.rs
new file mode 100644
index 0000000000..7e4d1c585e
--- /dev/null
+++ b/arrow-avro/src/errors.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common Avro errors and macros.
+
+use arrow_schema::ArrowError;
+use core::num::TryFromIntError;
+use std::error::Error;
+use std::string::FromUtf8Error;
+use std::{io, str};
+
+/// Avro error enumeration
+
+#[derive(Debug)]
+#[non_exhaustive]
+pub enum AvroError {
+ /// General Avro error.
+ /// Returned when code violates normal workflow of working with Avro data.
+ General(String),
+ /// "Not yet implemented" Avro error.
+ /// Returned when functionality is not yet available.
+ NYI(String),
+ /// "End of file" Avro error.
+ /// Returned when IO related failures occur, e.g. when there are not enough bytes to
+ /// decode.
+ EOF(String),
+ /// Arrow error.
+ /// Returned when reading into arrow or writing from arrow.
+ ArrowError(Box<ArrowError>),
+ /// Error when the requested index is more than the
+ /// number of items expected
+ IndexOutOfBound(usize, usize),
+ /// Error indicating that an unexpected or bad argument was passed to a function.
+ InvalidArgument(String),
+ /// Error indicating that a value could not be parsed.
+ ParseError(String),
+ /// Error indicating that a schema is invalid.
+ SchemaError(String),
+ /// An external error variant
+ External(Box<dyn Error + Send + Sync>),
+ /// Error during IO operations
+ IoError(String, io::Error),
+ /// Returned when a function needs more data to complete properly. The `usize` field indicates
+ /// the total number of bytes required, not the number of additional bytes.
+ NeedMoreData(usize),
+ /// Returned when a function needs more data to complete properly.
+ /// The `Range<u64>` indicates the range of bytes that are needed.
+ NeedMoreDataRange(std::ops::Range<u64>),
+}
+
+impl std::fmt::Display for AvroError {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+ match &self {
+ AvroError::General(message) => {
+ write!(fmt, "Avro error: {message}")
+ }
+ AvroError::NYI(message) => write!(fmt, "NYI: {message}"),
+ AvroError::EOF(message) => write!(fmt, "EOF: {message}"),
+ AvroError::ArrowError(message) => write!(fmt, "Arrow: {message}"),
+ AvroError::IndexOutOfBound(index, bound) => {
+ write!(fmt, "Index {index} out of bound: {bound}")
+ }
+ AvroError::InvalidArgument(message) => {
+ write!(fmt, "Invalid argument: {message}")
+ }
+ AvroError::ParseError(message) => write!(fmt, "Parser error: {message}"),
+ AvroError::SchemaError(message) => write!(fmt, "Schema error: {message}"),
+ AvroError::External(e) => write!(fmt, "External: {e}"),
+ AvroError::IoError(message, e) => write!(fmt, "I/O Error: {message}: {e}"),
+ AvroError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
+ AvroError::NeedMoreDataRange(range) => {
+ write!(fmt, "NeedMoreDataRange: {}..{}", range.start, range.end)
+ }
+ }
+ }
+}
+
+impl Error for AvroError {
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ match self {
+ AvroError::External(e) => Some(e.as_ref()),
+ AvroError::ArrowError(e) => Some(e.as_ref()),
+ AvroError::IoError(_, e) => Some(e),
+ _ => None,
+ }
+ }
+}
+
+impl From<TryFromIntError> for AvroError {
+ fn from(e: TryFromIntError) -> AvroError {
+ AvroError::General(format!("Integer overflow: {e}"))
+ }
+}
+
+impl From<io::Error> for AvroError {
+ fn from(e: io::Error) -> AvroError {
+ AvroError::External(Box::new(e))
+ }
+}
+
+impl From<str::Utf8Error> for AvroError {
+ fn from(e: str::Utf8Error) -> AvroError {
+ AvroError::External(Box::new(e))
+ }
+}
+
+impl From<FromUtf8Error> for AvroError {
+ fn from(e: FromUtf8Error) -> AvroError {
+ AvroError::External(Box::new(e))
+ }
+}
+
+impl From<ArrowError> for AvroError {
+ fn from(e: ArrowError) -> Self {
+ AvroError::ArrowError(Box::new(e))
+ }
+}
+
+impl From<AvroError> for io::Error {
+ fn from(e: AvroError) -> Self {
+ io::Error::other(e)
+ }
+}
+
+impl From<AvroError> for ArrowError {
+ fn from(e: AvroError) -> Self {
+ match e {
+ AvroError::External(inner) => ArrowError::from_external_error(inner),
+ AvroError::IoError(msg, err) => ArrowError::IoError(msg, err),
+ AvroError::ArrowError(inner) => *inner,
+ other => ArrowError::AvroError(other.to_string()),
+ }
+ }
+}
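The `From` impls above are what allow the rest of the crate to drop explicit
`map_err` calls and rely on `?`. A minimal sketch under that assumption (the
`read_magic` helper and its 4-byte buffer are illustrative, not part of the
crate):
```rust
use arrow_avro::errors::AvroError;
use std::io::Read;

// Illustrative only: io::Error converts into AvroError::External via the
// From impl above, so `?` needs no map_err.
fn read_magic<R: Read>(mut reader: R) -> Result<[u8; 4], AvroError> {
    let mut magic = [0u8; 4];
    reader.read_exact(&mut magic)?;
    Ok(magic)
}
```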
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index 032ad683ff..eb04ee8fa6 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -195,6 +195,9 @@ pub mod compression;
/// Avro data types and Arrow data types.
pub mod codec;
+/// AvroError variants
+pub mod errors;
+
/// Extension trait for AvroField to add Utf8View support
///
/// This trait adds methods for working with Utf8View support to the AvroField struct.
diff --git a/arrow-avro/src/reader/block.rs b/arrow-avro/src/reader/block.rs
index 479f0ef909..540aedee52 100644
--- a/arrow-avro/src/reader/block.rs
+++ b/arrow-avro/src/reader/block.rs
@@ -17,8 +17,8 @@
//! Decoder for [`Block`]
+use crate::errors::AvroError;
use crate::reader::vlq::VLQDecoder;
-use arrow_schema::ArrowError;
/// A file data block
///
@@ -75,14 +75,14 @@ impl BlockDecoder {
/// can then be used again to read the next block, if any
///
/// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
- pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
+ pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, AvroError> {
let max_read = buf.len();
while !buf.is_empty() {
match self.state {
BlockDecoderState::Count => {
if let Some(c) = self.vlq_decoder.long(&mut buf) {
self.in_progress.count = c.try_into().map_err(|_| {
- ArrowError::ParseError(format!(
+ AvroError::ParseError(format!(
"Block count cannot be negative, got {c}"
))
})?;
@@ -93,9 +93,7 @@ impl BlockDecoder {
BlockDecoderState::Size => {
if let Some(c) = self.vlq_decoder.long(&mut buf) {
self.bytes_remaining = c.try_into().map_err(|_| {
- ArrowError::ParseError(format!(
- "Block size cannot be negative, got {c}"
- ))
+ AvroError::ParseError(format!("Block size cannot
be negative, got {c}"))
})?;
self.in_progress.data.reserve(self.bytes_remaining);
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index 23d9e50333..fb5eb66306 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use crate::errors::AvroError;
use crate::reader::vlq::read_varint;
-use arrow_schema::ArrowError;
/// A wrapper around a byte slice, providing low-level decoding for Avro
///
@@ -43,52 +43,51 @@ impl<'a> AvroCursor<'a> {
/// Read a single `u8`
#[inline]
- pub(crate) fn get_u8(&mut self) -> Result<u8, ArrowError> {
+ pub(crate) fn get_u8(&mut self) -> Result<u8, AvroError> {
match self.buf.first().copied() {
Some(x) => {
self.buf = &self.buf[1..];
Ok(x)
}
- None => Err(ArrowError::ParseError("Unexpected EOF".to_string())),
+ None => Err(AvroError::EOF("Unexpected EOF".to_string())),
}
}
#[inline]
- pub(crate) fn get_bool(&mut self) -> Result<bool, ArrowError> {
+ pub(crate) fn get_bool(&mut self) -> Result<bool, AvroError> {
Ok(self.get_u8()? != 0)
}
- pub(crate) fn read_vlq(&mut self) -> Result<u64, ArrowError> {
- let (val, offset) = read_varint(self.buf)
- .ok_or_else(|| ArrowError::ParseError("bad varint".to_string()))?;
+ pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
+ let (val, offset) =
+ read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
self.buf = &self.buf[offset..];
Ok(val)
}
#[inline]
- pub(crate) fn get_int(&mut self) -> Result<i32, ArrowError> {
+ pub(crate) fn get_int(&mut self) -> Result<i32, AvroError> {
let varint = self.read_vlq()?;
let val: u32 = varint
.try_into()
- .map_err(|_| ArrowError::ParseError("varint
overflow".to_string()))?;
+ .map_err(|_| AvroError::ParseError("varint
overflow".to_string()))?;
Ok((val >> 1) as i32 ^ -((val & 1) as i32))
}
#[inline]
- pub(crate) fn get_long(&mut self) -> Result<i64, ArrowError> {
+ pub(crate) fn get_long(&mut self) -> Result<i64, AvroError> {
let val = self.read_vlq()?;
Ok((val >> 1) as i64 ^ -((val & 1) as i64))
}
- pub(crate) fn get_bytes(&mut self) -> Result<&'a [u8], ArrowError> {
- let len: usize = self.get_long()?.try_into().map_err(|_| {
- ArrowError::ParseError("offset overflow reading avro
bytes".to_string())
- })?;
+ pub(crate) fn get_bytes(&mut self) -> Result<&'a [u8], AvroError> {
+ let len: usize = self
+ .get_long()?
+ .try_into()
+ .map_err(|_| AvroError::ParseError("offset overflow reading avro
bytes".to_string()))?;
if self.buf.len() < len {
- return Err(ArrowError::ParseError(
- "Unexpected EOF reading bytes".to_string(),
- ));
+ return Err(AvroError::EOF("Unexpected EOF reading
bytes".to_string()));
}
let ret = &self.buf[..len];
self.buf = &self.buf[len..];
@@ -96,11 +95,9 @@ impl<'a> AvroCursor<'a> {
}
#[inline]
- pub(crate) fn get_float(&mut self) -> Result<f32, ArrowError> {
+ pub(crate) fn get_float(&mut self) -> Result<f32, AvroError> {
if self.buf.len() < 4 {
- return Err(ArrowError::ParseError(
- "Unexpected EOF reading float".to_string(),
- ));
+ return Err(AvroError::EOF("Unexpected EOF reading
float".to_string()));
}
let ret = f32::from_le_bytes(self.buf[..4].try_into().unwrap());
self.buf = &self.buf[4..];
@@ -108,11 +105,9 @@ impl<'a> AvroCursor<'a> {
}
#[inline]
- pub(crate) fn get_double(&mut self) -> Result<f64, ArrowError> {
+ pub(crate) fn get_double(&mut self) -> Result<f64, AvroError> {
if self.buf.len() < 8 {
- return Err(ArrowError::ParseError(
- "Unexpected EOF reading float".to_string(),
- ));
+ return Err(AvroError::EOF("Unexpected EOF reading
double".to_string()));
}
let ret = f64::from_le_bytes(self.buf[..8].try_into().unwrap());
self.buf = &self.buf[8..];
@@ -120,11 +115,9 @@ impl<'a> AvroCursor<'a> {
}
/// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`).
- pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> {
+ pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], AvroError> {
if self.buf.len() < n {
- return Err(ArrowError::ParseError(
- "Unexpected EOF reading fixed".to_string(),
- ));
+ return Err(AvroError::EOF("Unexpected EOF reading
fixed".to_string()));
}
let ret = &self.buf[..n];
self.buf = &self.buf[n..];
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index aac267f50e..b5efd8bcdb 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -18,13 +18,13 @@
//! Decoder for [`Header`]
use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
+use crate::errors::AvroError;
use crate::reader::vlq::VLQDecoder;
use crate::schema::{SCHEMA_METADATA_KEY, Schema};
-use arrow_schema::ArrowError;
use std::io::BufRead;
/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
-pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, AvroError> {
let mut decoder = HeaderDecoder::default();
loop {
let buf = reader.fill_buf()?;
@@ -38,9 +38,9 @@ pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowErro
break;
}
}
- decoder.flush().ok_or_else(|| {
- ArrowError::ParseError("Unexpected EOF while reading Avro
header".to_string())
- })
+ decoder
+ .flush()
+ .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".to_string()))
}
#[derive(Debug)]
@@ -96,7 +96,7 @@ impl Header {
}
/// Returns the [`CompressionCodec`] if any
- pub fn compression(&self) -> Result<Option<CompressionCodec>, ArrowError> {
+ pub fn compression(&self) -> Result<Option<CompressionCodec>, AvroError> {
let v = self.get(CODEC_METADATA_KEY);
match v {
None | Some(b"null") => Ok(None),
@@ -105,7 +105,7 @@ impl Header {
Some(b"zstandard") => Ok(Some(CompressionCodec::ZStandard)),
Some(b"bzip2") => Ok(Some(CompressionCodec::Bzip2)),
Some(b"xz") => Ok(Some(CompressionCodec::Xz)),
- Some(v) => Err(ArrowError::ParseError(format!(
+ Some(v) => Err(AvroError::ParseError(format!(
"Unrecognized compression codec \'{}\'",
String::from_utf8_lossy(v)
))),
@@ -113,11 +113,11 @@ impl Header {
}
/// Returns the `Schema` if any
- pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
+ pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, AvroError> {
self.get(SCHEMA_METADATA_KEY)
.map(|x| {
serde_json::from_slice(x).map_err(|e| {
- ArrowError::ParseError(format!("Failed to parse Avro
schema JSON: {e}"))
+ AvroError::ParseError(format!("Failed to parse Avro schema
JSON: {e}"))
})
})
.transpose()
@@ -175,7 +175,7 @@ impl HeaderDecoder {
/// input bytes, and the header can be obtained with [`Self::flush`]
///
/// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
- pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
+ pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, AvroError> {
let max_read = buf.len();
while !buf.is_empty() {
match self.state {
@@ -183,7 +183,7 @@ impl HeaderDecoder {
let remaining = &MAGIC[MAGIC.len() - self.bytes_remaining..];
let to_decode = buf.len().min(remaining.len());
if !buf.starts_with(&remaining[..to_decode]) {
- return Err(ArrowError::ParseError("Incorrect avro
magic".to_string()));
+ return Err(AvroError::ParseError("Incorrect avro
magic".to_string()));
}
self.bytes_remaining -= to_decode;
buf = &buf[to_decode..];
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index b040aa9d42..55763aeb62 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -119,14 +119,13 @@
//! use futures::{Stream, StreamExt};
//! use std::task::{Poll, ready};
//! use arrow_array::RecordBatch;
-//! use arrow_schema::ArrowError;
-//! use arrow_avro::reader::Decoder;
+//! use arrow_avro::{reader::Decoder, errors::AvroError};
//!
//! /// Decode a stream of Avro-framed bytes into RecordBatch values.
//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
//! mut decoder: Decoder,
//! mut input: S,
-//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
+//! ) -> impl Stream<Item = Result<RecordBatch, AvroError>> {
//! let mut buffered = Bytes::new();
//! futures::stream::poll_fn(move |cx| {
//! loop {
@@ -480,6 +479,7 @@
//!
//! ---
use crate::codec::AvroFieldBuilder;
+use crate::errors::AvroError;
use crate::reader::header::read_header;
use crate::schema::{
AvroSchema, CONFLUENT_MAGIC, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY,
@@ -499,11 +499,10 @@ mod header;
mod record;
mod vlq;
-fn is_incomplete_data(err: &ArrowError) -> bool {
+fn is_incomplete_data(err: &AvroError) -> bool {
matches!(
err,
- ArrowError::ParseError(msg)
- if msg.contains("Unexpected EOF")
+ AvroError::EOF(_) | AvroError::NeedMoreData(_) | AvroError::NeedMoreDataRange(_)
)
}
@@ -675,7 +674,7 @@ impl Decoder {
/// `SchemaStore`;
/// * The Avro body is malformed;
/// * A strict‑mode union rule is violated (see `ReaderBuilder::with_strict_mode`).
- pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+ pub fn decode(&mut self, data: &[u8]) -> Result<usize, AvroError> {
let mut total_consumed = 0usize;
while total_consumed < data.len() && self.remaining_capacity > 0 {
if self.awaiting_body {
@@ -687,7 +686,7 @@ impl Decoder {
continue;
}
Err(ref e) if is_incomplete_data(e) => break,
- err => return err,
+ Err(e) => return Err(e),
};
}
match self.handle_prefix(&data[total_consumed..])? {
@@ -698,7 +697,7 @@ impl Decoder {
self.awaiting_body = true;
}
None => {
- return Err(ArrowError::ParseError(
+ return Err(AvroError::ParseError(
"Missing magic bytes and fingerprint".to_string(),
));
}
@@ -711,7 +710,7 @@ impl Decoder {
// * Ok(None) – buffer does not start with the prefix.
// * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes.
// * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint).
- fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError> {
+ fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, AvroError> {
match self.fingerprint_algorithm {
FingerprintAlgorithm::Rabin => {
self.handle_prefix_common(buf, &SINGLE_OBJECT_MAGIC, |bytes| {
@@ -749,7 +748,7 @@ impl Decoder {
buf: &[u8],
magic: &[u8; MAGIC_LEN],
fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
- ) -> Result<Option<usize>, ArrowError> {
+ ) -> Result<Option<usize>, AvroError> {
// Need at least the magic bytes to decide
// 2 bytes for Avro Spec and 1 byte for Confluent Wire Protocol.
if buf.len() < MAGIC_LEN {
@@ -774,7 +773,7 @@ impl Decoder {
&mut self,
buf: &[u8],
fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
- ) -> Result<Option<usize>, ArrowError> {
+ ) -> Result<Option<usize>, AvroError> {
// Need enough bytes to get fingerprint (next N bytes)
let Some(fingerprint_bytes) = buf.get(..N) else {
return Ok(None); // insufficient bytes
@@ -784,7 +783,7 @@ impl Decoder {
// If the fingerprint indicates a schema change, prepare to switch decoders.
if self.active_fingerprint != Some(new_fingerprint) {
let Some(new_decoder) = self.cache.shift_remove(&new_fingerprint) else {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Unknown fingerprint: {new_fingerprint:?}"
)));
};
@@ -816,7 +815,7 @@ impl Decoder {
}
}
- fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+ fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, AvroError> {
if self.batch_is_empty() {
return Ok(None);
}
@@ -831,7 +830,7 @@ impl Decoder {
/// If a schema change was detected while decoding rows for the current batch, the
/// schema switch is applied **after** flushing this batch, so the **next** batch
/// (if any) may have a different schema.
- pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+ pub fn flush(&mut self) -> Result<Option<RecordBatch>, AvroError> {
// We must flush the active decoder before switching to the pending one.
let batch = self.flush_and_reset();
self.apply_pending_schema();
@@ -856,7 +855,7 @@ impl Decoder {
// Decode either the block count or remaining capacity from `data` (an OCF block payload).
//
// Returns the number of bytes consumed from `data` along with the number of records decoded.
- fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, usize), ArrowError> {
+ fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, usize), AvroError> {
// OCF decoding never interleaves records across blocks, so no chunking.
let to_decode = std::cmp::min(count, self.remaining_capacity);
if to_decode == 0 {
@@ -869,7 +868,7 @@ impl Decoder {
// Produce a `RecordBatch` if at least one row is fully decoded, returning
// `Ok(None)` if no new rows are available.
- fn flush_block(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+ fn flush_block(&mut self) -> Result<Option<RecordBatch>, AvroError> {
self.flush_and_reset()
}
}
@@ -979,7 +978,7 @@ impl ReaderBuilder {
&self,
writer_schema: &Schema,
reader_schema: Option<&Schema>,
- ) -> Result<RecordDecoder, ArrowError> {
+ ) -> Result<RecordDecoder, AvroError> {
let mut builder = AvroFieldBuilder::new(writer_schema);
if let Some(reader_schema) = reader_schema {
builder = builder.with_reader_schema(reader_schema);
@@ -995,7 +994,7 @@ impl ReaderBuilder {
&self,
writer_schema: &Schema,
reader_schema: Option<&AvroSchema>,
- ) -> Result<RecordDecoder, ArrowError> {
+ ) -> Result<RecordDecoder, AvroError> {
let reader_schema_raw = reader_schema.map(|s| s.schema()).transpose()?;
self.make_record_decoder(writer_schema, reader_schema_raw.as_ref())
}
@@ -1023,14 +1022,11 @@ impl ReaderBuilder {
&self,
header: Option<&Header>,
reader_schema: Option<&AvroSchema>,
- ) -> Result<Decoder, ArrowError> {
+ ) -> Result<Decoder, AvroError> {
if let Some(hdr) = header {
- let writer_schema = hdr
- .schema()
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
- .ok_or_else(|| {
- ArrowError::ParseError("No Avro schema present in file
header".into())
- })?;
+ let writer_schema = hdr.schema()?.ok_or_else(|| {
+ AvroError::ParseError("No Avro schema present in file
header".into())
+ })?;
let projected_reader_schema = self
.projection
.as_deref()
@@ -1039,13 +1035,13 @@ impl ReaderBuilder {
reader_schema.clone()
} else {
let raw = hdr.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
- ArrowError::ParseError(
+ AvroError::ParseError(
"No Avro schema present in file
header".to_string(),
)
})?;
let json_string = std::str::from_utf8(raw)
.map_err(|e| {
- ArrowError::ParseError(format!(
+ AvroError::ParseError(format!(
"Invalid UTF-8 in Avro schema header: {e}"
))
})?
@@ -1066,11 +1062,11 @@ impl ReaderBuilder {
));
}
let store = self.writer_schema_store.as_ref().ok_or_else(|| {
- ArrowError::ParseError("Writer schema store required for raw
Avro".into())
+ AvroError::ParseError("Writer schema store required for raw
Avro".into())
})?;
let fingerprints = store.fingerprints();
if fingerprints.is_empty() {
- return Err(ArrowError::ParseError(
+ return Err(AvroError::ParseError(
"Writer schema store must contain at least one schema".into(),
));
}
@@ -1078,7 +1074,7 @@ impl ReaderBuilder {
.active_fingerprint
.or_else(|| fingerprints.first().copied())
.ok_or_else(|| {
- ArrowError::ParseError("Could not determine initial schema
fingerprint".into())
+ AvroError::ParseError("Could not determine initial schema
fingerprint".into())
})?;
let projection = self.projection.as_deref();
let projected_reader_schema = match (projection, reader_schema) {
@@ -1091,7 +1087,7 @@ impl ReaderBuilder {
let avro_schema = match store.lookup(&fingerprint) {
Some(schema) => schema,
None => {
- return Err(ArrowError::ComputeError(format!(
+ return Err(AvroError::General(format!(
"Fingerprint {fingerprint:?} not found in schema
store",
)));
}
@@ -1121,7 +1117,7 @@ impl ReaderBuilder {
}
}
let active_decoder = active_decoder.ok_or_else(|| {
- ArrowError::ComputeError(format!(
+ AvroError::General(format!(
"Initial fingerprint {start_fingerprint:?} not found in schema
store"
))
})?;
@@ -1296,6 +1292,7 @@ impl ReaderBuilder {
));
}
self.make_decoder(None, self.reader_schema.as_ref())
+ .map_err(ArrowError::from)
}
}
@@ -1336,7 +1333,7 @@ impl<R: BufRead> Reader<R> {
///
/// Batches are bounded by `batch_size`; a single OCF block may yield multiple batches,
/// and a batch may also span multiple blocks.
- fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+ fn read(&mut self) -> Result<Option<RecordBatch>, AvroError> {
'outer: while !self.finished && !self.decoder.batch_is_full() {
while self.block_cursor == self.block_data.len() {
let buf = self.reader.fill_buf()?;
@@ -1350,7 +1347,8 @@ impl<R: BufRead> Reader<R> {
if let Some(block) = self.block_decoder.flush() {
// Successfully decoded a block.
self.block_data = if let Some(ref codec) = self.header.compression()? {
- codec.decompress(&block.data)?
+ let decompressed: Vec<u8> = codec.decompress(&block.data)?;
+ decompressed
} else {
block.data
};
@@ -1358,7 +1356,7 @@ impl<R: BufRead> Reader<R> {
self.block_cursor = 0;
} else if consumed == 0 {
// The block decoder made no progress on a non-empty buffer.
- return Err(ArrowError::ParseError(
+ return Err(AvroError::ParseError(
"Could not decode next Avro block from partial
data".to_string(),
));
}
@@ -1380,7 +1378,7 @@ impl<R: BufRead> Iterator for Reader<R> {
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
- self.read().transpose()
+ self.read().map_err(ArrowError::from).transpose()
}
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index ec69e7788c..9c118193b8 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -21,6 +21,7 @@ use crate::codec::{
AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo, ResolvedRecord,
ResolvedUnion,
};
+use crate::errors::AvroError;
use crate::reader::cursor::AvroCursor;
use crate::schema::Nullability;
#[cfg(feature = "small_decimals")]
@@ -29,12 +30,12 @@ use arrow_array::builder::{Decimal128Builder, Decimal256Builder, IntervalMonthDa
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::*;
-use arrow_schema::{
- ArrowError, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField,
- FieldRef, Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
-};
#[cfg(feature = "small_decimals")]
use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
+use arrow_schema::{
+ DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DataType, Field as ArrowField, FieldRef,
+ Fields, Schema as ArrowSchema, SchemaRef, UnionFields, UnionMode,
+};
#[cfg(feature = "avro_custom_types")]
use arrow_select::take::{TakeOptions, take};
use std::cmp::Ordering;
@@ -67,8 +68,7 @@ macro_rules! flush_decimal {
($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{
let (_, vals, _) = $builder.finish().into_parts();
let dec = <$ArrayTy>::try_new(vals, $nulls)?
- .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)
- .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8)?;
Arc::new(dec) as ArrayRef
}};
}
@@ -84,7 +84,7 @@ macro_rules! append_decimal_default {
$builder.append_value(val);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
concat!(
"Default for ",
$name,
@@ -115,7 +115,7 @@ impl RecordDecoder {
///
/// # Errors
/// This function will return an error if the provided `data_type` is not
a `Record`.
- pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
+ pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, AvroError> {
match data_type.codec() {
Codec::Struct(reader_fields) => {
// Build Arrow schema fields and per-child decoders
@@ -137,7 +137,7 @@ impl RecordDecoder {
projector,
})
}
- other => Err(ArrowError::ParseError(format!(
+ other => Err(AvroError::ParseError(format!(
"Expected record got {other:?}"
))),
}
@@ -149,7 +149,7 @@ impl RecordDecoder {
}
/// Decode `count` records from `buf`
- pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
+ pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, AvroError> {
let mut cursor = AvroCursor::new(buf);
match self.projector.as_mut() {
Some(proj) => {
@@ -169,13 +169,13 @@ impl RecordDecoder {
}
/// Flush the decoded records into a [`RecordBatch`]
- pub(crate) fn flush(&mut self) -> Result<RecordBatch, ArrowError> {
+ pub(crate) fn flush(&mut self) -> Result<RecordBatch, AvroError> {
let arrays = self
.fields
.iter_mut()
.map(|x| x.flush(None))
.collect::<Result<Vec<_>, _>>()?;
- RecordBatch::try_new(self.schema.clone(), arrays)
+ RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
}
}
@@ -246,7 +246,7 @@ enum Decoder {
}
impl Decoder {
- fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
+ fn try_new(data_type: &AvroDataType) -> Result<Self, AvroError> {
if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
if info.writer_is_union && !info.reader_is_union {
let mut clone = data_type.clone();
@@ -264,7 +264,7 @@ impl Decoder {
Self::try_new_internal(data_type)
}
- fn try_new_internal(data_type: &AvroDataType) -> Result<Self, ArrowError> {
+ fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
// Extract just the Promotion (if any) to simplify pattern matching
let promotion = match data_type.resolution.as_ref() {
Some(ResolutionInfo::Promotion(p)) => Some(p),
@@ -369,7 +369,7 @@ impl Decoder {
.with_precision_and_scale(prec, scl)?;
Self::Decimal256(p, s, *size, builder)
} else {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Decimal precision {p} exceeds maximum supported"
)));
}
@@ -385,7 +385,7 @@ impl Decoder {
.with_precision_and_scale(prec, scl)?;
Self::Decimal256(p, s, *size, builder)
} else {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Decimal precision {p} exceeds maximum supported"
)));
}
@@ -452,7 +452,7 @@ impl Decoder {
.map(Self::try_new_internal)
.collect::<Result<Vec<_>, _>>()?;
if fields.len() != decoders.len() {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Union has {} fields but {} decoders",
fields.len(),
decoders.len()
@@ -463,7 +463,7 @@ impl Decoder {
let branch_count = decoders.len();
let max_addr = (i32::MAX as usize) + 1;
if branch_count > max_addr {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Union has {branch_count} branches, which exceeds the
maximum addressable \
branches by an Avro int tag ({} + 1).",
i32::MAX
@@ -480,7 +480,7 @@ impl Decoder {
Self::Union(builder.build()?)
}
(Codec::Union(_, _, _), _) => {
- return Err(ArrowError::NotYetImplemented(
+ return Err(AvroError::NYI(
"Sparse Arrow unions are not yet supported".to_string(),
));
}
@@ -493,7 +493,7 @@ impl Decoder {
32 => 4,
64 => 8,
other => {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Unsupported run-end width {other} for
RunEndEncoded; \
expected 16/32/64 bits or 2/4/8 bytes"
)));
@@ -525,7 +525,7 @@ impl Decoder {
}
/// Append a null record
- fn append_null(&mut self) -> Result<(), ArrowError> {
+ fn append_null(&mut self) -> Result<(), AvroError> {
match self {
Self::Null(count) => *count += 1,
Self::Boolean(b) => b.append(false),
@@ -593,7 +593,7 @@ impl Decoder {
}
/// Append a single default literal into the decoder's buffers
- fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+ fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
match self {
Self::Nullable(_, nb, inner, _) => {
if matches!(lit, AvroLiteral::Null) {
@@ -609,7 +609,7 @@ impl Decoder {
*count += 1;
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Non-null default for null type".to_string(),
)),
},
@@ -618,7 +618,7 @@ impl Decoder {
b.append(*v);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for boolean must be boolean".to_string(),
)),
},
@@ -627,7 +627,7 @@ impl Decoder {
v.push(*i);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for int32/date32/time-millis must be
int".to_string(),
)),
},
@@ -640,7 +640,7 @@ impl Decoder {
v.push(*i);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for duration long must be long".to_string(),
)),
},
@@ -658,7 +658,7 @@ impl Decoder {
v.push(*i as i64);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for long/time-micros/timestamp must be long or
int".to_string(),
)),
},
@@ -667,7 +667,7 @@ impl Decoder {
v.push(*f);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for float must be float".to_string(),
)),
},
@@ -679,7 +679,7 @@ impl Decoder {
v.push(*f);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for double must be double".to_string(),
)),
},
@@ -689,7 +689,7 @@ impl Decoder {
values.extend_from_slice(b);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for bytes must be bytes".to_string(),
)),
},
@@ -702,26 +702,26 @@ impl Decoder {
values.extend_from_slice(b);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for string must be string".to_string(),
)),
},
Self::Uuid(values) => match lit {
AvroLiteral::String(s) => {
let uuid = Uuid::try_parse(s).map_err(|e| {
- ArrowError::InvalidArgumentError(format!("Invalid UUID
default: {s} ({e})"))
+ AvroError::InvalidArgument(format!("Invalid UUID
default: {s} ({e})"))
})?;
values.extend_from_slice(uuid.as_bytes());
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for uuid must be string".to_string(),
)),
},
Self::Fixed(sz, accum) => match lit {
AvroLiteral::Bytes(b) => {
if b.len() != *sz as usize {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Fixed default length {} does not match size {sz}",
b.len(),
)));
@@ -729,7 +729,7 @@ impl Decoder {
accum.extend_from_slice(b);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for fixed must be bytes".to_string(),
)),
},
@@ -750,7 +750,7 @@ impl Decoder {
Self::Duration(builder) => match lit {
AvroLiteral::Bytes(b) => {
if b.len() != 12 {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Duration default must be exactly 12 bytes, got
{}",
b.len()
)));
@@ -766,7 +766,7 @@ impl Decoder {
));
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for duration must be 12-byte little-endian
months/days/millis"
.to_string(),
)),
@@ -779,7 +779,7 @@ impl Decoder {
}
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for array must be an array literal".to_string(),
)),
},
@@ -794,21 +794,21 @@ impl Decoder {
}
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for map must be a map/object literal".to_string(),
)),
},
Self::Enum(indices, symbols, _) => match lit {
AvroLiteral::Enum(sym) => {
let pos = symbols.iter().position(|s| s == sym).ok_or_else(|| {
- ArrowError::InvalidArgumentError(format!(
+ AvroError::InvalidArgument(format!(
"Enum default symbol {sym:?} not in reader symbols"
))
})?;
indices.push(pos as i32);
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for enum must be a symbol".to_string(),
)),
},
@@ -842,7 +842,7 @@ impl Decoder {
}
Ok(())
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Default for record must be a map/object or
null".to_string(),
)),
},
@@ -850,7 +850,7 @@ impl Decoder {
}
/// Decode a single record from `buf`
- fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+ fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
match self {
Self::Null(x) => *x += 1,
Self::Boolean(values) => values.append(buf.get_bool()?),
@@ -887,10 +887,10 @@ impl Decoder {
Self::Uuid(values) => {
let s_bytes = buf.get_bytes()?;
let s = std::str::from_utf8(s_bytes).map_err(|e| {
- ArrowError::ParseError(format!("UUID bytes are not valid
UTF-8: {e}"))
+ AvroError::ParseError(format!("UUID bytes are not valid
UTF-8: {e}"))
})?;
let uuid = Uuid::try_parse(s)
- .map_err(|e| ArrowError::ParseError(format!("Failed to
parse uuid: {e}")))?;
+ .map_err(|e| AvroError::ParseError(format!("Failed to
parse uuid: {e}")))?;
values.extend_from_slice(uuid.as_bytes());
}
Self::Array(_, off, encoding) => {
@@ -945,7 +945,7 @@ impl Decoder {
if resolved >= 0 {
indices.push(resolved);
} else {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Enum symbol index {raw} not resolvable and no default
provided",
)));
}
@@ -994,7 +994,7 @@ impl Decoder {
&mut self,
buf: &mut AvroCursor<'_>,
promotion: Promotion,
- ) -> Result<(), ArrowError> {
+ ) -> Result<(), AvroError> {
#[cfg(feature = "avro_custom_types")]
if let Self::RunEndEncoded(_, len, inner) = self {
*len += 1;
@@ -1009,7 +1009,7 @@ impl Decoder {
v.push(x as $to);
Ok(())
}
- other => Err(ArrowError::ParseError(format!(
+ other => Err(AvroError::ParseError(format!(
"Promotion {promotion} target mismatch: expected {},
got {}",
stringify!($variant),
<Self as ::std::convert::AsRef<str>>::as_ref(other)
@@ -1032,7 +1032,7 @@ impl Decoder {
values.extend_from_slice(data);
Ok(())
}
- other => Err(ArrowError::ParseError(format!(
+ other => Err(AvroError::ParseError(format!(
"Promotion {promotion} target mismatch: expected bytes
(Binary/StringToBytes), got {}",
<Self as AsRef<str>>::as_ref(other)
))),
@@ -1046,7 +1046,7 @@ impl Decoder {
values.extend_from_slice(data);
Ok(())
}
- other => Err(ArrowError::ParseError(format!(
+ other => Err(AvroError::ParseError(format!(
"Promotion {promotion} target mismatch: expected string
(String/StringView/BytesToString), got {}",
<Self as AsRef<str>>::as_ref(other)
))),
@@ -1055,7 +1055,7 @@ impl Decoder {
}
/// Flush decoded records to an [`ArrayRef`]
- fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
+ fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
Ok(match self {
Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
@@ -1152,7 +1152,7 @@ impl Decoder {
let val_arr = valdec.flush(None)?;
let key_arr = StringArray::try_new(koff, kd, None)?;
if key_arr.len() != val_arr.len() {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Map keys length ({}) != map values length ({})",
key_arr.len(),
val_arr.len()
@@ -1161,7 +1161,7 @@ impl Decoder {
let final_len = moff.len() - 1;
if let Some(n) = &nulls {
if n.len() != final_len {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Map array null buffer length {} != final map
length {final_len}",
n.len()
)));
@@ -1170,7 +1170,7 @@ impl Decoder {
let entries_fields = match map_field.data_type() {
DataType::Struct(fields) => fields.clone(),
other => {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Map entries field must be a Struct, got {other:?}"
)));
}
@@ -1184,12 +1184,12 @@ impl Decoder {
Self::Fixed(sz, accum) => {
let b: Buffer = flush_values(accum).into();
let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
- .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ .map_err(|e| AvroError::ParseError(e.to_string()))?;
Arc::new(arr)
}
Self::Uuid(values) => {
let arr = FixedSizeBinaryArray::try_new(16, std::mem::take(values).into(), nulls)
- .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ .map_err(|e| AvroError::ParseError(e.to_string()))?;
Arc::new(arr)
}
#[cfg(feature = "small_decimals")]
@@ -1210,7 +1210,7 @@ impl Decoder {
Self::Duration(builder) => {
let (_, vals, _) = builder.finish().into_parts();
let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
- .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ .map_err(|e| AvroError::ParseError(e.to_string()))?;
Arc::new(vals)
}
#[cfg(feature = "avro_custom_types")]
@@ -1228,7 +1228,7 @@ impl Decoder {
}
}
if n > (u32::MAX as usize) {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"RunEndEncoded length {n} exceeds maximum supported by
UInt32 indices for take",
)));
}
@@ -1239,7 +1239,7 @@ impl Decoder {
values.slice(0, 0)
} else {
take(arr, &take_idx, Option::from(TakeOptions::default())).map_err(|e| {
- ArrowError::ParseError(format!("take() for REE values failed: {e}"))
+ AvroError::ParseError(format!("take() for REE values failed: {e}"))
})?
};
@@ -1256,14 +1256,14 @@ impl Decoder {
}
let ends: PrimitiveArray<$ArrowTy> = ends.into_iter().collect();
let run_arr = RunArray::<$ArrowTy>::try_new(&ends, per_run_values.as_ref())
- .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ .map_err(|e| AvroError::ParseError(e.to_string()))?;
Arc::new(run_arr) as ArrayRef
}};
}
match *width {
2 => {
if n > i16::MAX as usize {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"RunEndEncoded length {n} exceeds i16::MAX for run end width 2"
)));
}
@@ -1272,7 +1272,7 @@ impl Decoder {
4 => build_run_array!(i32, Int32Type),
8 => build_run_array!(i64, Int64Type),
other => {
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Unsupported run-end width {other} for
RunEndEncoded"
)));
}
@@ -1316,14 +1316,14 @@ const NO_SOURCE: i8 = -1;
impl DispatchLookupTable {
fn from_writer_to_reader(
promotion_map: &[Option<(usize, Promotion)>],
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
let mut to_reader = Vec::with_capacity(promotion_map.len());
let mut promotion = Vec::with_capacity(promotion_map.len());
for map in promotion_map {
match *map {
Some((idx, promo)) => {
let idx_i8 = i8::try_from(idx).map_err(|_| {
- ArrowError::SchemaError(format!(
+ AvroError::SchemaError(format!(
"Reader branch index {idx} exceeds i8 range (max
{})",
i8::MAX
))
@@ -1401,7 +1401,7 @@ impl UnionDecoder {
fields: UnionFields,
branches: Vec<Decoder>,
resolved: Option<ResolvedUnion>,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
let null_branch = branches.iter().position(|b| matches!(b, Decoder::Null(_)));
let default_emit_idx = 0;
@@ -1410,7 +1410,7 @@ impl UnionDecoder {
// Guard against impractically large unions that cannot be indexed by an Avro int
let max_addr = (i32::MAX as usize) + 1;
if branches.len() > max_addr {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Reader union has {} branches, which exceeds the maximum
addressable \
branches by an Avro int tag ({} + 1).",
branches.len(),
@@ -1433,7 +1433,7 @@ impl UnionDecoder {
fn try_new_from_writer_union(
info: ResolvedUnion,
target: Box<Decoder>,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
// This constructor is only for writer-union to single-type resolution
debug_assert!(info.writer_is_union && !info.reader_is_union);
let lookup_table = DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
@@ -1446,7 +1446,7 @@ impl UnionDecoder {
})
}
- fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, ArrowError> {
+ fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> Result<UnionReadPlan, AvroError> {
let Some(info) = resolved else {
return Ok(UnionReadPlan::Passthrough);
};
@@ -1460,7 +1460,7 @@ impl UnionDecoder {
let Some(&(reader_idx, promotion)) = info.writer_to_reader.first().and_then(Option::as_ref)
else {
- return Err(ArrowError::SchemaError(
+ return Err(AvroError::SchemaError(
"Writer type does not match any reader union
branch".to_string(),
));
};
@@ -1469,12 +1469,12 @@ impl UnionDecoder {
promotion,
})
}
- (true, false) => Err(ArrowError::InvalidArgumentError(
+ (true, false) => Err(AvroError::InvalidArgument(
"UnionDecoder::try_new cannot build writer-union to single;
use UnionDecoderBuilder with a target"
.to_string(),
)),
// (false, false) is invalid and should never be constructed by the resolver.
- _ => Err(ArrowError::SchemaError(
+ _ => Err(AvroError::SchemaError(
"ResolvedUnion constructed for non-union sides; resolver
should return None"
.to_string(),
)),
@@ -1482,19 +1482,19 @@ impl UnionDecoder {
}
#[inline]
- fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+ fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
// Avro unions are encoded by first writing the zero-based branch index.
// In Avro 1.11.1 this is specified as an *int*; older specs said *long*,
// but both use zig-zag varint encoding, so decoding as long is compatible
// with either form and widely used in practice.
let raw = buf.get_long()?;
if raw < 0 {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Negative union branch index {raw}"
)));
}
usize::try_from(raw).map_err(|_| {
- ArrowError::ParseError(format!(
+ AvroError::ParseError(format!(
"Union branch index {raw} does not fit into usize on this
platform ({}-bit)",
(usize::BITS as usize)
))
@@ -1502,10 +1502,10 @@ impl UnionDecoder {
}
#[inline]
- fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, ArrowError> {
+ fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
let branches_len = self.branches.len();
let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Union branch index {reader_idx} out of range ({branches_len}
branches)"
)));
};
@@ -1516,9 +1516,9 @@ impl UnionDecoder {
}
#[inline]
- fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), ArrowError>
+ fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), AvroError>
where
- F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+ F: FnOnce(&mut Decoder) -> Result<(), AvroError>,
{
if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
return action(target);
@@ -1530,21 +1530,21 @@ impl UnionDecoder {
self.emit_to(reader_idx).and_then(action)
}
- fn append_null(&mut self) -> Result<(), ArrowError> {
+ fn append_null(&mut self) -> Result<(), AvroError> {
self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
}
- fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+ fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
self.on_decoder(self.default_emit_idx, |decoder| decoder.append_default(lit))
}
- fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+ fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
let (reader_idx, promotion) = match &mut self.plan {
UnionReadPlan::Passthrough => (Self::read_tag(buf)?, Promotion::Direct),
UnionReadPlan::ReaderUnion { lookup_table } => {
let idx = Self::read_tag(buf)?;
lookup_table.resolve(idx).ok_or_else(|| {
- ArrowError::ParseError(format!(
+ AvroError::ParseError(format!(
"Union branch index {idx} not resolvable by reader
schema"
))
})?
@@ -1560,7 +1560,7 @@ impl UnionDecoder {
let idx = Self::read_tag(buf)?;
return match lookup_table.resolve(idx) {
Some((_, promotion)) => target.decode_with_promotion(buf, promotion),
- None => Err(ArrowError::ParseError(format!(
+ None => Err(AvroError::ParseError(format!(
"Writer union branch {idx} does not resolve to reader
type"
))),
};
@@ -1570,7 +1570,7 @@ impl UnionDecoder {
decoder.decode_with_promotion(buf, promotion)
}
- fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
+ fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, AvroError> {
if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
return target.flush(nulls);
}
@@ -1590,7 +1590,7 @@ impl UnionDecoder {
Some(flush_values(&mut self.offsets).into_iter().collect()),
children,
)
- .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ .map_err(|e| AvroError::ParseError(e.to_string()))?;
Ok(Arc::new(arr))
}
}
@@ -1628,7 +1628,7 @@ impl UnionDecoderBuilder {
self
}
- fn build(self) -> Result<UnionDecoder, ArrowError> {
+ fn build(self) -> Result<UnionDecoder, AvroError> {
match (self.resolved, self.fields, self.branches, self.target) {
(resolved, Some(fields), Some(branches), None) => {
UnionDecoder::try_new(fields, branches, resolved)
@@ -1638,7 +1638,7 @@ impl UnionDecoderBuilder {
{
UnionDecoder::try_new_from_writer_union(info, target)
}
- _ => Err(ArrowError::InvalidArgumentError(
+ _ => Err(AvroError::InvalidArgument(
"Invalid UnionDecoderBuilder configuration: expected either \
(fields + branches + resolved) with no target for
reader-unions, or \
(resolved + target) with no fields/branches for writer-union
to single."
@@ -1657,8 +1657,8 @@ enum NegativeBlockBehavior {
#[inline]
fn skip_blocks(
buf: &mut AvroCursor,
- mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
-) -> Result<usize, ArrowError> {
+ mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
+) -> Result<usize, AvroError> {
process_blockwise(
buf,
move |c| skip_item(c),
@@ -1671,30 +1671,30 @@ fn flush_dict(
indices: &mut Vec<i32>,
symbols: &[String],
nulls: Option<NullBuffer>,
-) -> Result<ArrayRef, ArrowError> {
+) -> Result<ArrayRef, AvroError> {
let keys = flush_primitive::<Int32Type>(indices, nulls);
let values = Arc::new(StringArray::from_iter_values(
symbols.iter().map(|s| s.as_str()),
));
DictionaryArray::try_new(keys, values)
- .map_err(|e| ArrowError::ParseError(e.to_string()))
+ .map_err(Into::into)
.map(|arr| Arc::new(arr) as ArrayRef)
}
#[inline]
fn read_blocks(
buf: &mut AvroCursor,
- decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
-) -> Result<usize, ArrowError> {
+ decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
+) -> Result<usize, AvroError> {
process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
}
#[inline]
fn process_blockwise(
buf: &mut AvroCursor,
- mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
+ mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), AvroError>,
negative_behavior: NegativeBlockBehavior,
-) -> Result<usize, ArrowError> {
+) -> Result<usize, AvroError> {
let mut total = 0usize;
loop {
// Read the block count
@@ -1756,7 +1756,7 @@ fn flush_primitive<T: ArrowPrimitiveType>(
fn read_decimal_bytes_be<const N: usize>(
buf: &mut AvroCursor<'_>,
size: &Option<usize>,
-) -> Result<[u8; N], ArrowError> {
+) -> Result<[u8; N], AvroError> {
match size {
Some(n) if *n == N => {
let raw = buf.get_fixed(N)?;
@@ -1784,7 +1784,7 @@ fn read_decimal_bytes_be<const N: usize>(
/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte
/// and the MSB of the first kept byte must match the sign (to avoid silent overflow).
#[inline]
-fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
+fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], AvroError> {
let len = raw.len();
// Fast path: exact width, just copy
if len == N {
@@ -1803,7 +1803,7 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
let extra = len - N;
// Any non-sign byte in the truncated prefix indicates overflow
if raw[..extra].iter().any(|&b| b != sign_byte) {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Decimal value with {} bytes cannot be represented in {} bytes
without overflow",
len, N
)));
@@ -1812,7 +1812,7 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
let first_kept = raw[extra];
let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0;
if sign_bit_mismatch {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Decimal value with {} bytes cannot be represented in {}
bytes without overflow",
len, N
)));
@@ -1863,7 +1863,7 @@ impl<'a> ProjectorBuilder<'a> {
}
#[inline]
- fn build(self) -> Result<Projector, ArrowError> {
+ fn build(self) -> Result<Projector, AvroError> {
let reader_fields = self.reader_fields;
let mut field_defaults: Vec<Option<AvroLiteral>> = Vec::with_capacity(reader_fields.len());
for avro_field in reader_fields.as_ref() {
@@ -1904,7 +1904,7 @@ impl<'a> ProjectorBuilder<'a> {
impl Projector {
#[inline]
- fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), ArrowError> {
+ fn project_default(&self, decoder: &mut Decoder, index: usize) -> Result<(), AvroError> {
// SAFETY: `index` is obtained by listing the reader's record fields (i.e., from
// `decoders.iter_mut().enumerate()`), and `field_defaults` was built in
// `ProjectorBuilder::build` to have exactly one element per reader field.
@@ -1923,7 +1923,7 @@ impl Projector {
&mut self,
buf: &mut AvroCursor<'_>,
encodings: &mut [Decoder],
- ) -> Result<(), ArrowError> {
+ ) -> Result<(), AvroError> {
debug_assert_eq!(
self.writer_to_reader.len(),
self.skip_decoders.len(),
@@ -1939,7 +1939,7 @@ impl Projector {
(Some(reader_index), _) => encodings[*reader_index].decode(buf)?,
(None, Some(skipper)) => skipper.skip(buf)?,
(None, None) => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"No skipper available for writer-only field at index {i}",
)));
}
@@ -1986,7 +1986,7 @@ enum Skipper {
}
impl Skipper {
- fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
+ fn from_avro(dt: &AvroDataType) -> Result<Self, AvroError> {
let mut base = match dt.codec() {
Codec::Null => Self::Null,
Codec::Boolean => Self::Boolean,
@@ -2021,7 +2021,7 @@ impl Skipper {
Codec::Union(encodings, _, _) => {
let max_addr = (i32::MAX as usize) + 1;
if encodings.len() > max_addr {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Writer union has {} branches, which exceeds the maximum addressable \
branches by an Avro int tag ({} + 1).",
encodings.len(),
@@ -2046,7 +2046,7 @@ impl Skipper {
Ok(base)
}
- fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+ fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
match self {
Self::Null => Ok(()),
Self::Boolean => {
@@ -2118,18 +2118,18 @@ impl Skipper {
// Union tag must be ZigZag-decoded
let raw = buf.get_long()?;
if raw < 0 {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Negative union branch index {raw}"
)));
}
let idx: usize = usize::try_from(raw).map_err(|_| {
- ArrowError::ParseError(format!(
+ AvroError::ParseError(format!(
"Union branch index {raw} does not fit into usize on this platform ({}-bit)",
(usize::BITS as usize)
))
})?;
let Some(encoding) = encodings.get_mut(idx) else {
- return Err(ArrowError::ParseError(format!(
+ return Err(AvroError::ParseError(format!(
"Union branch index {idx} out of range for skipper ({} branches)",
encodings.len()
)));
@@ -4374,7 +4374,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
+ fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), AvroError> {
for codec in [
Codec::DurationNanos,
Codec::DurationMicros,
@@ -4393,7 +4393,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
+ fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), AvroError> {
let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
for codec in [
Codec::DurationNanos,
@@ -4421,7 +4421,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
+ fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), AvroError> {
let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
let mut s = Skipper::from_avro(&dt)?;
match &s {
@@ -4450,7 +4450,7 @@ mod tests {
#[cfg(feature = "avro_custom_types")]
#[test]
- fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
+ fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), AvroError> {
let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
let mut s = Skipper::from_avro(&dt)?;
match &s {
@@ -4481,7 +4481,7 @@ mod tests {
}
#[test]
- fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
+ fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), AvroError> {
let dt = make_avro_dt(Codec::Interval, None);
let mut s = Skipper::from_avro(&dt)?;
match s {
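The record-reader changes above (and the writer changes below) construct the new error type directly; the added arrow-avro/src/errors.rs is not reproduced in this excerpt. As orientation only, the variants used throughout this diff imply an enum roughly shaped like the sketch below; payload types, derives, and the Display/Error impls are assumptions rather than the committed definition.

// Sketch only -- inferred from the call sites in this patch, not copied from errors.rs.
#[derive(Debug)]
pub enum AvroError {
    ParseError(String),
    SchemaError(String),
    InvalidArgument(String),
    NYI(String),
    IoError(String, std::io::Error),
}

// `out.write_all(..)?` and `.map_err(Into::into)` in the diff imply conversions
// along these lines (an analogous From<ArrowError> conversion is implied as well):
impl From<std::io::Error> for AvroError {
    fn from(e: std::io::Error) -> Self {
        AvroError::IoError(e.to_string(), e)
    }
}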
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index ef9e02c8fa..651d998fea 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -18,6 +18,7 @@
//! Avro Encoder for Arrow types.
use crate::codec::{AvroDataType, AvroField, Codec};
+use crate::errors::AvroError;
use crate::schema::{Fingerprint, Nullability, Prefix};
use arrow_array::cast::AsArray;
use arrow_array::types::{
@@ -40,9 +41,7 @@ use arrow_array::{
#[cfg(feature = "small_decimals")]
use arrow_array::{Decimal32Array, Decimal64Array};
use arrow_buffer::{ArrowNativeType, NullBuffer};
-use arrow_schema::{
- ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
-};
+use arrow_schema::{DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode};
use std::io::Write;
use std::sync::Arc;
use uuid::Uuid;
@@ -51,7 +50,7 @@ use uuid::Uuid;
///
/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
#[inline]
-pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), ArrowError> {
+pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(), AvroError> {
let mut zz = ((value << 1) ^ (value >> 63)) as u64;
// At most 10 bytes for 64-bit varint
let mut buf = [0u8; 10];
@@ -64,25 +63,25 @@ pub(crate) fn write_long<W: Write + ?Sized>(out: &mut W, value: i64) -> Result<(
buf[i] = (zz & 0x7F) as u8;
i += 1;
out.write_all(&buf[..i])
- .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
+ .map_err(|e| AvroError::IoError(format!("write long: {e}"), e))
}
#[inline]
-fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), ArrowError> {
+fn write_int<W: Write + ?Sized>(out: &mut W, value: i32) -> Result<(), AvroError> {
write_long(out, value as i64)
}
#[inline]
-fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
+fn write_len_prefixed<W: Write + ?Sized>(out: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
write_long(out, bytes.len() as i64)?;
out.write_all(bytes)
- .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
+ .map_err(|e| AvroError::IoError(format!("write bytes: {e}"), e))
}
#[inline]
-fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), ArrowError> {
+fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) -> Result<(), AvroError> {
out.write_all(&[if v { 1 } else { 0 }])
- .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
+ .map_err(|e| AvroError::IoError(format!("write bool: {e}"), e))
}
/// Minimal two's-complement big-endian representation helper for Avro decimal (bytes).
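For background on the varint helpers in the hunk above: an Avro `long` is ZigZag-mapped and then emitted as a little-endian base-128 varint, which is exactly what `write_long` does with its 10-byte stack buffer. A standalone illustration of the same encoding (not the crate's code):

// Illustration of the Avro long encoding used by `write_long`: ZigZag-map the
// value, then emit 7 bits per byte, least-significant group first, setting the
// continuation bit (0x80) on every byte except the last.
fn avro_long_bytes(value: i64) -> Vec<u8> {
    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
    let mut out = Vec::with_capacity(10);
    while zz >= 0x80 {
        out.push((zz as u8 & 0x7F) | 0x80);
        zz >>= 7;
    }
    out.push(zz as u8);
    out
}
// e.g. 0 -> [0x00], -1 -> [0x01], 1 -> [0x02], -64 -> [0x7F], 64 -> [0x80, 0x01]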
@@ -133,12 +132,11 @@ fn write_sign_extended<W: Write + ?Sized>(
out: &mut W,
src_be: &[u8],
n: usize,
-) -> Result<(), ArrowError> {
+) -> Result<(), AvroError> {
let len = src_be.len();
if len == n {
- return out
- .write_all(src_be)
- .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
+ out.write_all(src_be)?;
+ return Ok(());
}
let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
0xFF
@@ -155,13 +153,13 @@ fn write_sign_extended<W: Write + ?Sized>(
if src_be[..extra].iter().any(|&b| b != sign_byte)
|| ((src_be[extra] ^ sign_byte) & 0x80) != 0
{
- return Err(ArrowError::InvalidArgumentError(format!(
+ return Err(AvroError::InvalidArgument(format!(
"Decimal value with {len} bytes cannot be represented in {n} bytes without overflow",
)));
}
return out
.write_all(&src_be[extra..])
- .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e));
+ .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e));
}
// len < n: prepend sign bytes (sign extension) then the payload
let pad_len = n - len;
@@ -178,15 +176,15 @@ fn write_sign_extended<W: Write + ?Sized>(
let mut rem = pad_len;
while rem >= pad.len() {
out.write_all(pad)
- .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
+ .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
rem -= pad.len();
}
if rem > 0 {
out.write_all(&pad[..rem])
- .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))?;
+ .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))?;
}
out.write_all(src_be)
- .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
+ .map_err(|e| AvroError::IoError(format!("write decimal fixed: {e}"), e))
}
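The truncation/padding rule enforced by `write_sign_extended` above (and checked by `sign_cast_to` on the read path earlier in this patch) can be summarised with a small standalone sketch; the helper below is illustrative only, not the crate's code:

// Pad a big-endian two's-complement value out to `n` bytes with the sign byte, or
// drop leading sign bytes when shrinking; reject anything that would change the
// value (the same rule as `write_sign_extended`, returning None instead of an error).
fn sign_extend_to(src_be: &[u8], n: usize) -> Option<Vec<u8>> {
    let sign: u8 = if src_be.first().is_some_and(|&b| b & 0x80 != 0) { 0xFF } else { 0x00 };
    if src_be.len() <= n {
        let mut out = vec![sign; n - src_be.len()];
        out.extend_from_slice(src_be);
        return Some(out);
    }
    let extra = src_be.len() - n;
    // All dropped bytes must equal the sign byte and the kept MSB must agree with it.
    if src_be[..extra].iter().any(|&b| b != sign) || (src_be[extra] ^ sign) & 0x80 != 0 {
        return None; // would overflow the target width
    }
    Some(src_be[extra..].to_vec())
}
// e.g. sign_extend_to(&[0xFF, 0x85], 4) == Some(vec![0xFF, 0xFF, 0xFF, 0x85]),
//      sign_extend_to(&[0x01, 0x00], 1) == None (the overflow case the tests below exercise)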
/// Write the union branch index for an optional field.
@@ -198,10 +196,10 @@ fn write_optional_index<W: Write + ?Sized>(
out: &mut W,
is_null: bool,
null_order: Nullability,
-) -> Result<(), ArrowError> {
+) -> Result<(), AvroError> {
let byte = union_value_branch_byte(null_order, is_null);
out.write_all(&[byte])
- .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
+ .map_err(|e| AvroError::IoError(format!("write union branch: {e}"), e))
}
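`write_optional_index` above can emit a single byte because the branch index of a two-branch union is an Avro `int`, and ZigZag keeps indices 0 and 1 to one byte each. `union_value_branch_byte` itself is not shown in this diff; the sketch below assumes the conventional ["null", T] / [T, "null"] layouts behind `Nullability::NullFirst` and `NullSecond`:

// Assumed mapping for an optional field's union branch byte: with ["null", T]
// the null branch is index 0, with [T, "null"] it is index 1; branch indices are
// Avro ints, so ZigZag turns index 0 into 0x00 and index 1 into 0x02.
fn optional_branch_byte(null_first: bool, is_null: bool) -> u8 {
    let index: u8 = if is_null == null_first { 0 } else { 1 };
    index << 1 // ZigZag of a small non-negative int is just 2 * index
}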
#[derive(Debug, Clone)]
@@ -229,7 +227,7 @@ impl<'a> FieldEncoder<'a> {
array: &'a dyn Array,
plan: &FieldPlan,
nullability: Option<Nullability>,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
let encoder = match plan {
FieldPlan::Scalar => match array.data_type() {
DataType::Null => Encoder::Null,
@@ -244,25 +242,21 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<StringViewArray>()
- .ok_or_else(|| {
- ArrowError::SchemaError("Expected StringViewArray".into())
- })?;
+ .ok_or_else(|| AvroError::SchemaError("Expected StringViewArray".into()))?;
Encoder::Utf8View(Utf8ViewEncoder(arr))
}
DataType::BinaryView => {
let arr = array
.as_any()
.downcast_ref::<BinaryViewArray>()
- .ok_or_else(|| {
- ArrowError::SchemaError("Expected BinaryViewArray".into())
- })?;
+ .ok_or_else(|| AvroError::SchemaError("Expected BinaryViewArray".into()))?;
Encoder::BinaryView(BinaryViewEncoder(arr))
}
DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
DataType::Date64 => {
- return Err(ArrowError::NotYetImplemented(
+ return Err(AvroError::NYI(
"Avro logical type 'date' is days since epoch (int). Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a Timestamp."
.into(),
));
@@ -274,13 +268,13 @@ impl<'a> FieldEncoder<'a> {
Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
}
DataType::Time32(TimeUnit::Microsecond) => {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
.into(),
));
}
DataType::Time32(TimeUnit::Nanosecond) => {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
.into(),
));
@@ -289,19 +283,19 @@ impl<'a> FieldEncoder<'a> {
array.as_primitive::<Time64MicrosecondType>(),
)),
DataType::Time64(TimeUnit::Nanosecond) => {
- return Err(ArrowError::NotYetImplemented(
+ return Err(AvroError::NYI(
"Avro writer does not support time-nanos; cast to Time64(Microsecond)."
.into(),
));
}
DataType::Time64(TimeUnit::Millisecond) => {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
.into(),
));
}
DataType::Time64(TimeUnit::Second) => {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
.into(),
));
@@ -321,7 +315,7 @@ impl<'a> FieldEncoder<'a> {
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.ok_or_else(|| {
- ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
+ AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
})?;
Encoder::Fixed(FixedEncoder(arr))
}
@@ -367,7 +361,7 @@ impl<'a> FieldEncoder<'a> {
)),
},
other => {
- return Err(ArrowError::NotYetImplemented(format!(
+ return Err(AvroError::NYI(format!(
"Avro scalar type not yet supported: {other:?}"
)));
}
@@ -376,7 +370,7 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<StructArray>()
- .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected StructArray".into()))?;
Encoder::Struct(Box::new(StructEncoder::try_new(arr, bindings)?))
}
FieldPlan::List {
@@ -387,7 +381,7 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<ListArray>()
- .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected ListArray".into()))?;
Encoder::List(Box::new(ListEncoder32::try_new(
arr,
*items_nullability,
@@ -398,7 +392,7 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<LargeListArray>()
- .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected LargeListArray".into()))?;
Encoder::LargeList(Box::new(ListEncoder64::try_new(
arr,
*items_nullability,
@@ -409,7 +403,7 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<ListViewArray>()
- .ok_or_else(|| ArrowError::SchemaError("Expected ListViewArray".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected ListViewArray".into()))?;
Encoder::ListView(Box::new(ListViewEncoder32::try_new(
arr,
*items_nullability,
@@ -421,7 +415,7 @@ impl<'a> FieldEncoder<'a> {
.as_any()
.downcast_ref::<LargeListViewArray>()
.ok_or_else(|| {
- ArrowError::SchemaError("Expected LargeListViewArray".into())
+ AvroError::SchemaError("Expected LargeListViewArray".into())
})?;
Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
arr,
@@ -434,7 +428,7 @@ impl<'a> FieldEncoder<'a> {
.as_any()
.downcast_ref::<FixedSizeListArray>()
.ok_or_else(|| {
- ArrowError::SchemaError("Expected FixedSizeListArray".into())
+ AvroError::SchemaError("Expected FixedSizeListArray".into())
})?;
Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
arr,
@@ -443,7 +437,7 @@ impl<'a> FieldEncoder<'a> {
)?))
}
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro array site requires Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
)));
}
@@ -454,7 +448,7 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<Decimal32Array>()
- .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected Decimal32Array".into()))?;
Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
}
#[cfg(feature = "small_decimals")]
@@ -462,29 +456,25 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<Decimal64Array>()
- .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected Decimal64Array".into()))?;
Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
}
DataType::Decimal128(_, _) => {
let arr = array
.as_any()
.downcast_ref::<Decimal128Array>()
- .ok_or_else(|| {
- ArrowError::SchemaError("Expected Decimal128Array".into())
- })?;
+ .ok_or_else(|| AvroError::SchemaError("Expected Decimal128Array".into()))?;
Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
}
DataType::Decimal256(_, _) => {
let arr = array
.as_any()
.downcast_ref::<Decimal256Array>()
- .ok_or_else(|| {
- ArrowError::SchemaError("Expected Decimal256Array".into())
- })?;
+ .ok_or_else(|| AvroError::SchemaError("Expected Decimal256Array".into()))?;
Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
}
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro decimal site requires Arrow Decimal 32, 64, 128, or 256, found: {other:?}"
)));
}
@@ -494,7 +484,7 @@ impl<'a> FieldEncoder<'a> {
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.ok_or_else(|| {
- ArrowError::SchemaError("Expected FixedSizeBinaryArray".into())
+ AvroError::SchemaError("Expected FixedSizeBinaryArray".into())
})?;
Encoder::Uuid(UuidEncoder(arr))
}
@@ -505,7 +495,7 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<MapArray>()
- .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected MapArray".into()))?;
Encoder::Map(Box::new(MapEncoder::try_new(
arr,
*values_nullability,
@@ -515,7 +505,7 @@ impl<'a> FieldEncoder<'a> {
FieldPlan::Enum { symbols } => match array.data_type() {
DataType::Dictionary(key_dt, value_dt) => {
if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
- return Err(ArrowError::SchemaError(
+ return Err(AvroError::SchemaError(
"Avro enum requires Dictionary<Int32, Utf8>".into(),
));
}
@@ -523,17 +513,17 @@ impl<'a> FieldEncoder<'a> {
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.ok_or_else(|| {
- ArrowError::SchemaError("Expected DictionaryArray<Int32>".into())
+ AvroError::SchemaError("Expected DictionaryArray<Int32>".into())
})?;
let values = dict
.values()
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
- ArrowError::SchemaError("Dictionary values must be Utf8".into())
+ AvroError::SchemaError("Dictionary values must be Utf8".into())
})?;
if values.len() != symbols.len() {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Enum symbol length {} != dictionary size {}",
symbols.len(),
values.len()
@@ -541,7 +531,7 @@ impl<'a> FieldEncoder<'a> {
}
for i in 0..values.len() {
if values.value(i) != symbols[i].as_str() {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Enum symbol mismatch at {i}: schema='{}' dict='{}'",
symbols[i],
values.value(i)
@@ -552,7 +542,7 @@ impl<'a> FieldEncoder<'a> {
Encoder::Enum(EnumEncoder { keys })
}
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro enum site requires DataType::Dictionary, found: {other:?}"
)));
}
@@ -561,7 +551,8 @@ impl<'a> FieldEncoder<'a> {
let arr = array
.as_any()
.downcast_ref::<UnionArray>()
- .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;
+ .ok_or_else(|| AvroError::SchemaError("Expected UnionArray".into()))?;
+
Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
}
FieldPlan::RunEndEncoded {
@@ -569,7 +560,7 @@ impl<'a> FieldEncoder<'a> {
value_plan,
} => {
// Helper closure to build a typed RunEncodedEncoder<R>
- let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
+ let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, AvroError> {
if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
Int16Type,
@@ -606,7 +597,7 @@ impl<'a> FieldEncoder<'a> {
)?,
))));
}
- Err(ArrowError::SchemaError(
+ Err(AvroError::SchemaError(
"Unsupported run-ends index type for RunEndEncoded; expected Int16/Int32/Int64"
.into(),
))
@@ -635,12 +626,12 @@ impl<'a> FieldEncoder<'a> {
})
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
match &self.null_state {
NullState::NonNullable => {}
NullState::NullableNoNulls { union_value_byte } => out
.write_all(&[*union_value_byte])
- .map_err(|e| ArrowError::IoError(format!("write union value branch: {e}"), e))?,
+ .map_err(|e| AvroError::IoError(format!("write union value branch: {e}"), e))?,
NullState::Nullable { nulls, null_order } if nulls.is_null(idx) => {
return write_optional_index(out, true, *null_order); // no value to write
}
@@ -727,10 +718,10 @@ impl<'a> RecordEncoderBuilder<'a> {
/// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
/// resolving each field to an Arrow index by name.
- pub(crate) fn build(self) -> Result<RecordEncoder, ArrowError> {
+ pub(crate) fn build(self) -> Result<RecordEncoder, AvroError> {
let avro_root_dt = self.avro_root.data_type();
let Codec::Struct(root_fields) = avro_root_dt.codec() else {
- return Err(ArrowError::SchemaError(
+ return Err(AvroError::SchemaError(
"Top-level Avro schema must be a record/struct".into(),
));
};
@@ -738,7 +729,7 @@ impl<'a> RecordEncoderBuilder<'a> {
for root_field in root_fields.as_ref() {
let name = root_field.name();
let arrow_index = self.arrow_schema.index_of(name).map_err(|e| {
- ArrowError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
+ AvroError::SchemaError(format!("Schema mismatch for field '{name}': {e}"))
})?;
columns.push(FieldBinding {
arrow_index,
@@ -772,13 +763,13 @@ impl RecordEncoder {
fn prepare_for_batch<'a>(
&'a self,
batch: &'a RecordBatch,
- ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
+ ) -> Result<Vec<FieldEncoder<'a>>, AvroError> {
let arrays = batch.columns();
let mut out = Vec::with_capacity(self.columns.len());
for col_plan in self.columns.iter() {
let arrow_index = col_plan.arrow_index;
let array = arrays.get(arrow_index).ok_or_else(|| {
- ArrowError::SchemaError(format!("Column index {arrow_index} out of range"))
+ AvroError::SchemaError(format!("Column index {arrow_index} out of range"))
})?;
#[cfg(not(feature = "avro_custom_types"))]
let site_nullability = match &col_plan.plan {
@@ -803,14 +794,13 @@ impl RecordEncoder {
&self,
out: &mut W,
batch: &RecordBatch,
- ) -> Result<(), ArrowError> {
+ ) -> Result<(), AvroError> {
let mut column_encoders = self.prepare_for_batch(batch)?;
let n = batch.num_rows();
match self.prefix {
Some(prefix) => {
for row in 0..n {
- out.write_all(prefix.as_slice())
- .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
+ out.write_all(prefix.as_slice())?;
for enc in column_encoders.iter_mut() {
enc.encode(out, row)?;
}
@@ -840,7 +830,7 @@ fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option<usize> {
}
impl FieldPlan {
- fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, ArrowError> {
+ fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result<Self, AvroError> {
#[cfg(not(feature = "avro_custom_types"))]
if let DataType::RunEndEncoded(_re_field, values_field) = arrow_field.data_type() {
let values_nullability = avro_dt.nullability();
@@ -849,7 +839,7 @@ impl FieldPlan {
.iter()
.find(|b| !matches!(b.codec(), Codec::Null))
.ok_or_else(|| {
- ArrowError::SchemaError(
+ AvroError::SchemaError(
"Avro union at RunEndEncoded site has no non-null branch".into(),
)
})?,
@@ -882,7 +872,7 @@ impl FieldPlan {
== Some("uuid");
if ext_is_uuid || md_is_uuid {
if *len != 16 {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"logicalType=uuid requires FixedSizeBinary(16)".into(),
));
}
@@ -894,7 +884,7 @@ impl FieldPlan {
let fields = match arrow_field.data_type() {
DataType::Struct(struct_fields) => struct_fields,
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro struct maps to Arrow Struct, found: {other:?}"
)));
}
@@ -903,7 +893,7 @@ impl FieldPlan {
for avro_field in avro_fields.iter() {
let name = avro_field.name().to_string();
let idx = find_struct_child_index(fields, &name).ok_or_else(|| {
- ArrowError::SchemaError(format!(
+ AvroError::SchemaError(format!(
"Struct field '{name}' not present in Arrow field '{}'",
arrow_field.name()
))
@@ -928,7 +918,7 @@ impl FieldPlan {
items_nullability: items_dt.nullability(),
item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
}),
- other => Err(ArrowError::SchemaError(format!(
+ other => Err(AvroError::SchemaError(format!(
"Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
))),
},
@@ -936,7 +926,7 @@ impl FieldPlan {
let entries_field = match arrow_field.data_type() {
DataType::Map(entries, _sorted) => entries.as_ref(),
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro map maps to Arrow DataType::Map, found: {other:?}"
)));
}
@@ -944,14 +934,14 @@ impl FieldPlan {
let entries_struct_fields = match entries_field.data_type() {
DataType::Struct(fs) => fs,
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Arrow Map entries must be Struct, found: {other:?}"
)));
}
};
let value_idx =
find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
- ArrowError::SchemaError("Map entries struct missing value field".into())
+ AvroError::SchemaError("Map entries struct missing value field".into())
})?;
let value_field = entries_struct_fields[value_idx].as_ref();
let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?;
@@ -963,12 +953,12 @@ impl FieldPlan {
Codec::Enum(symbols) => match arrow_field.data_type() {
DataType::Dictionary(key_dt, value_dt) => {
if **key_dt != DataType::Int32 {
- return Err(ArrowError::SchemaError(
+ return Err(AvroError::SchemaError(
"Avro enum requires Dictionary<Int32, Utf8>".into(),
));
}
if **value_dt != DataType::Utf8 {
- return Err(ArrowError::SchemaError(
+ return Err(AvroError::SchemaError(
"Avro enum requires Dictionary<Int32, Utf8>".into(),
));
}
@@ -976,7 +966,7 @@ impl FieldPlan {
symbols: symbols.clone(),
})
}
- other => Err(ArrowError::SchemaError(format!(
+ other => Err(AvroError::SchemaError(format!(
"Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
))),
},
@@ -990,7 +980,7 @@ impl FieldPlan {
DataType::Decimal128(p, s) => (*p as usize, *s as i32),
DataType::Decimal256(p, s) => (*p as usize, *s as i32),
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
arrow_field.name()
)));
@@ -998,7 +988,7 @@ impl FieldPlan {
};
let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
if ap != *precision || as_ != sc {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
arrow_field.name()
)));
@@ -1011,7 +1001,7 @@ impl FieldPlan {
DataType::Interval(
IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
) => Ok(FieldPlan::Scalar),
- other => Err(ArrowError::SchemaError(format!(
+ other => Err(AvroError::SchemaError(format!(
"Avro duration logical type requires Arrow Interval(MonthDayNano), found: {other:?}"
))),
},
@@ -1019,18 +1009,18 @@ impl FieldPlan {
let arrow_union_fields = match arrow_field.data_type() {
DataType::Union(fields, UnionMode::Dense) => fields,
DataType::Union(_, UnionMode::Sparse) => {
- return Err(ArrowError::NotYetImplemented(
+ return Err(AvroError::NYI(
"Sparse Arrow unions are not yet supported".to_string(),
));
}
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro union maps to Arrow Union, found: {other:?}"
)));
}
};
if avro_branches.len() != arrow_union_fields.len() {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Mismatched number of branches between Avro union ({}) and Arrow union ({}) for field '{}'",
avro_branches.len(),
arrow_union_fields.len(),
@@ -1048,10 +1038,10 @@ impl FieldPlan {
plan: FieldPlan::build(avro_branch, arrow_child_field)?,
})
})
- .collect::<Result<Vec<_>, ArrowError>>()?;
+ .collect::<Result<Vec<_>, AvroError>>()?;
Ok(FieldPlan::Union { bindings })
}
- Codec::Union(_, _, UnionMode::Sparse) => Err(ArrowError::NotYetImplemented(
+ Codec::Union(_, _, UnionMode::Sparse) => Err(AvroError::NYI(
"Sparse Arrow unions are not yet supported".to_string(),
)),
#[cfg(feature = "avro_custom_types")]
@@ -1059,7 +1049,7 @@ impl FieldPlan {
let values_field = match arrow_field.data_type() {
DataType::RunEndEncoded(_run_ends_field, values_field) =>
values_field.as_ref(),
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro RunEndEncoded maps to Arrow DataType::RunEndEncoded, found: {other:?}"
)));
}
@@ -1133,7 +1123,7 @@ enum Encoder<'a> {
impl<'a> Encoder<'a> {
/// Encode the value at `idx`.
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
match self {
Encoder::Boolean(e) => e.encode(out, idx),
Encoder::Int(e) => e.encode(out, idx),
@@ -1188,7 +1178,7 @@ impl<'a> Encoder<'a> {
struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
impl BooleanEncoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_bool(out, self.0.value(idx))
}
}
@@ -1196,7 +1186,7 @@ impl BooleanEncoder<'_> {
/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_int(out, self.0.value(idx))
}
}
@@ -1204,7 +1194,7 @@ impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_long(out, self.0.value(idx))
}
}
@@ -1213,11 +1203,11 @@ impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
impl<'a> Time32SecondsToMillisEncoder<'a> {
#[inline]
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let secs = self.0.value(idx);
- let millis = secs.checked_mul(1000).ok_or_else(|| {
- ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
- })?;
+ let millis = secs
+ .checked_mul(1000)
+ .ok_or_else(|| AvroError::InvalidArgument("time32(secs) * 1000 overflowed".into()))?;
write_int(out, millis)
}
}
@@ -1226,10 +1216,10 @@ impl<'a> Time32SecondsToMillisEncoder<'a> {
struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
impl<'a> TimestampSecondsToMillisEncoder<'a> {
#[inline]
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let secs = self.0.value(idx);
let millis = secs.checked_mul(1000).ok_or_else(|| {
- ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
+ AvroError::InvalidArgument("timestamp(secs) * 1000 overflowed".into())
})?;
write_long(out, millis)
}
@@ -1238,7 +1228,7 @@ impl<'a> TimestampSecondsToMillisEncoder<'a> {
/// Unified binary encoder generic over offset size (i32/i64).
struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_len_prefixed(out, self.0.value(idx))
}
}
@@ -1246,7 +1236,7 @@ impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
/// BinaryView (byte view) encoder.
struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
impl BinaryViewEncoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_len_prefixed(out, self.0.value(idx))
}
}
@@ -1254,35 +1244,34 @@ impl BinaryViewEncoder<'_> {
/// StringView encoder.
struct Utf8ViewEncoder<'a>(&'a StringViewArray);
impl Utf8ViewEncoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_len_prefixed(out, self.0.value(idx).as_bytes())
}
}
struct F32Encoder<'a>(&'a arrow_array::Float32Array);
impl F32Encoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
// Avro float: 4 bytes, IEEE-754 little-endian
let bits = self.0.value(idx).to_bits();
- out.write_all(&bits.to_le_bytes())
- .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
+ out.write_all(&bits.to_le_bytes())?;
+ Ok(())
}
}
struct F64Encoder<'a>(&'a arrow_array::Float64Array);
impl F64Encoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
// Avro double: 8 bytes, IEEE-754 little-endian
let bits = self.0.value(idx).to_bits();
- out.write_all(&bits.to_le_bytes())
- .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
+ out.write_all(&bits.to_le_bytes()).map_err(Into::into)
}
}
struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
write_len_prefixed(out, self.0.value(idx).as_bytes())
}
}
@@ -1308,13 +1297,13 @@ impl<'a> MapEncoder<'a> {
map: &'a MapArray,
values_nullability: Option<Nullability>,
value_plan: &FieldPlan,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
let keys_arr = map.keys();
let keys_kind = match keys_arr.data_type() {
DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::<i32>()),
DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::<i64>()),
other => {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}"
)));
}
@@ -1338,8 +1327,8 @@ impl<'a> MapEncoder<'a> {
keys_offset: usize,
start: usize,
end: usize,
- mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>,
- ) -> Result<(), ArrowError>
+ mut write_item: impl FnMut(&mut W, usize) -> Result<(), AvroError>,
+ ) -> Result<(), AvroError>
where
W: Write + ?Sized,
O: OffsetSizeTrait,
@@ -1351,7 +1340,7 @@ impl<'a> MapEncoder<'a> {
})
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let offsets = self.map.offsets();
let start = offsets[idx] as usize;
let end = offsets[idx + 1] as usize;
@@ -1390,7 +1379,7 @@ struct EnumEncoder<'a> {
keys: &'a PrimitiveArray<Int32Type>,
}
impl EnumEncoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, row: usize) -> Result<(), AvroError> {
write_int(out, self.keys.value(row))
}
}
@@ -1402,12 +1391,12 @@ struct UnionEncoder<'a> {
}
impl<'a> UnionEncoder<'a> {
- fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, ArrowError> {
+ fn try_new(array: &'a UnionArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
- return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
+ return Err(AvroError::SchemaError("Expected Dense UnionArray".into()));
};
if fields.len() != field_bindings.len() {
- return Err(ArrowError::SchemaError(format!(
+ return Err(AvroError::SchemaError(format!(
"Mismatched number of union branches between Arrow array ({}) and encoding plan ({})",
fields.len(),
field_bindings.len()
@@ -1420,7 +1409,7 @@ impl<'a> UnionEncoder<'a> {
for (i, (type_id, _)) in fields.iter().enumerate() {
let binding = field_bindings
.get(i)
- .ok_or_else(|| ArrowError::SchemaError("Binding and field mismatch".to_string()))?;
+ .ok_or_else(|| AvroError::SchemaError("Binding and field mismatch".to_string()))?;
encoders.push(FieldEncoder::make_encoder(
array.child(type_id).as_ref(),
&binding.plan,
@@ -1435,7 +1424,7 @@ impl<'a> UnionEncoder<'a> {
})
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
// SAFETY: `idx` is always in bounds because:
// 1. The encoder is called from `RecordEncoder::encode,` which iterates over `0..batch.num_rows()`
// 2. `self.array` is a column from the same batch, so its length equals `batch.num_rows()`
@@ -1445,10 +1434,10 @@ impl<'a> UnionEncoder<'a> {
.type_id_to_encoder_index
.get(type_id as usize)
.and_then(|opt| *opt)
- .ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id {type_id}")))?;
+ .ok_or_else(|| AvroError::SchemaError(format!("Invalid type_id {type_id}")))?;
write_int(out, encoder_index as i32)?;
let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
- ArrowError::SchemaError(format!("Invalid encoder index {encoder_index}"))
+ AvroError::SchemaError(format!("Invalid encoder index {encoder_index}"))
})?;
encoder.encode(out, self.array.value_offset(idx))
}
@@ -1459,15 +1448,12 @@ struct StructEncoder<'a> {
}
impl<'a> StructEncoder<'a> {
- fn try_new(
- array: &'a StructArray,
- field_bindings: &[FieldBinding],
- ) -> Result<Self, ArrowError> {
+ fn try_new(array: &'a StructArray, field_bindings: &[FieldBinding]) -> Result<Self, AvroError> {
let mut encoders = Vec::with_capacity(field_bindings.len());
for field_binding in field_bindings {
let idx = field_binding.arrow_index;
let column = array.columns().get(idx).ok_or_else(|| {
- ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
+ AvroError::SchemaError(format!("Struct child index {idx} out of range"))
})?;
let encoder = FieldEncoder::make_encoder(
column.as_ref(),
@@ -1479,7 +1465,7 @@ impl<'a> StructEncoder<'a> {
Ok(Self { encoders })
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
for encoder in self.encoders.iter_mut() {
encoder.encode(out, idx)?;
}
@@ -1495,9 +1481,9 @@ fn encode_blocked_range<W: Write + ?Sized, F>(
start: usize,
end: usize,
mut write_item: F,
-) -> Result<(), ArrowError>
+) -> Result<(), AvroError>
where
- F: FnMut(&mut W, usize) -> Result<(), ArrowError>,
+ F: FnMut(&mut W, usize) -> Result<(), AvroError>,
{
let len = end.saturating_sub(start);
if len == 0 {
@@ -1528,7 +1514,7 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
list: &'a GenericListArray<O>,
items_nullability: Option<Nullability>,
item_plan: &FieldPlan,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
Ok(Self {
list,
values: FieldEncoder::make_encoder(
@@ -1545,23 +1531,20 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
out: &mut W,
start: usize,
end: usize,
- ) -> Result<(), ArrowError> {
+ ) -> Result<(), AvroError> {
encode_blocked_range(out, start, end, |out, row| {
self.values
.encode(out, row.saturating_sub(self.values_offset))
})
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let offsets = self.list.offsets();
let start = offsets[idx].to_usize().ok_or_else(|| {
- ArrowError::InvalidArgumentError(format!("Error converting offset[{idx}] to usize"))
+ AvroError::InvalidArgument(format!("Error converting offset[{idx}] to usize"))
})?;
let end = offsets[idx + 1].to_usize().ok_or_else(|| {
- ArrowError::InvalidArgumentError(format!(
- "Error converting offset[{}] to usize",
- idx + 1
- ))
+ AvroError::InvalidArgument(format!("Error converting offset[{}] to usize", idx + 1))
})?;
self.encode_list_range(out, start, end)
}
@@ -1581,7 +1564,7 @@ impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
list: &'a GenericListViewArray<O>,
items_nullability: Option<Nullability>,
item_plan: &FieldPlan,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
Ok(Self {
list,
values: FieldEncoder::make_encoder(
@@ -1593,14 +1576,12 @@ impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
})
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
- ArrowError::InvalidArgumentError(format!(
- "Error converting value_offset[{idx}] to usize"
- ))
+ AvroError::InvalidArgument(format!("Error converting value_offset[{idx}] to usize"))
})?;
let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
- ArrowError::InvalidArgumentError(format!("Error converting value_size[{idx}] to usize"))
+ AvroError::InvalidArgument(format!("Error converting value_size[{idx}] to usize"))
})?;
let start = start + self.values_offset;
let end = start + len;
@@ -1624,7 +1605,7 @@ impl<'a> FixedSizeListEncoder<'a> {
list: &'a FixedSizeListArray,
items_nullability: Option<Nullability>,
item_plan: &FieldPlan,
- ) -> Result<Self, ArrowError> {
+ ) -> Result<Self, AvroError> {
Ok(Self {
list,
values: FieldEncoder::make_encoder(
@@ -1637,7 +1618,7 @@ impl<'a> FixedSizeListEncoder<'a> {
})
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
// Starting index is relative to values() start
let rel = self.list.value_offset(idx) as usize;
let start = self.values_offset + rel;
@@ -1653,10 +1634,10 @@ impl<'a> FixedSizeListEncoder<'a> {
/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
impl FixedEncoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let v = self.0.value(idx); // &[u8] of fixed width
- out.write_all(v)
- .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e))
+ out.write_all(v)?;
+ Ok(())
}
}
@@ -1664,15 +1645,15 @@ impl FixedEncoder<'_> {
/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
impl UuidEncoder<'_> {
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
buf[0] = 0x48;
let v = self.0.value(idx);
let u = Uuid::from_slice(v)
- .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
+ .map_err(|e| AvroError::InvalidArgument(format!("Invalid UUID bytes: {e}")))?;
let _ = u.hyphenated().encode_lower(&mut buf[1..]);
- out.write_all(&buf)
- .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
+ out.write_all(&buf)?;
+ Ok(())
}
}
@@ -1684,25 +1665,25 @@ struct DurationParts {
}
/// Trait mapping an Arrow interval native value to Avro duration `(months, days, millis)`.
trait IntervalToDurationParts: ArrowPrimitiveType {
- fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError>;
+ fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError>;
}
impl IntervalToDurationParts for IntervalMonthDayNanoType {
- fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
+ fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
if months < 0 || days < 0 || nanos < 0 {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
));
}
if nanos % 1_000_000 != 0 {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
.into(),
));
}
let millis = nanos / 1_000_000;
if millis > u32::MAX as i64 {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Avro 'duration' milliseconds exceed u32::MAX".into(),
));
}
@@ -1714,9 +1695,9 @@ impl IntervalToDurationParts for IntervalMonthDayNanoType {
}
}
impl IntervalToDurationParts for IntervalYearMonthType {
- fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
+ fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
if native < 0 {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Avro 'duration' cannot encode negative months".into(),
));
}
@@ -1728,10 +1709,10 @@ impl IntervalToDurationParts for IntervalYearMonthType {
}
}
impl IntervalToDurationParts for IntervalDayTimeType {
- fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
+ fn duration_parts(native: Self::Native) -> Result<DurationParts, AvroError> {
let (days, millis) = IntervalDayTimeType::to_parts(native);
if days < 0 || millis < 0 {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Avro 'duration' cannot encode negative days or milliseconds".into(),
));
}
@@ -1748,7 +1729,7 @@ impl IntervalToDurationParts for IntervalDayTimeType {
struct DurationEncoder<'a, P: ArrowPrimitiveType + IntervalToDurationParts>(&'a PrimitiveArray<P>);
impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P> {
#[inline(always)]
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let parts = P::duration_parts(self.0.value(idx))?;
let months = parts.months.to_le_bytes();
let days = parts.days.to_le_bytes();
@@ -1764,16 +1745,16 @@ impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts> DurationEncoder<'a, P>
// indices. [std docs; Rust Performance Book on bounds-check elimination]
// - Memory safety: The `[u8; 12]` array is built on the stack by value, with no
// aliasing and no uninitialized memory. There is no `unsafe`.
- // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated and mapped
- // into `ArrowError`, so I/O errors are reported, not panicked.
+ // - I/O: `write_all(&buf)` is fallible and its `Result` is propagated as an AvroError,
+ // so I/O errors are reported, not panicked.
// Consequently, constructing `buf` with the constant indices below is safe and
// panic-free under these validated preconditions.
let buf = [
months[0], months[1], months[2], months[3], days[0], days[1], days[2], days[3], ms[0],
ms[1], ms[2], ms[3],
];
- out.write_all(&buf)
- .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
+ out.write_all(&buf)?;
+ Ok(())
}
}
@@ -1821,7 +1802,7 @@ impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
Self { arr, fixed_size }
}
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
let be = self.arr.value_be_bytes(idx);
match self.fixed_size {
Some(n) => write_sign_extended(out, &be, n),
@@ -1875,7 +1856,7 @@ impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
/// Advance `cur_run` so that `idx` is within the run ending at `cur_end`.
/// Uses the REE invariant: run ends are strictly increasing, positive, and 1-based.
#[inline(always)]
- fn advance_to_row(&mut self, idx: usize) -> Result<(), ArrowError> {
+ fn advance_to_row(&mut self, idx: usize) -> Result<(), AvroError> {
if idx < self.cur_end {
return Ok(());
}
@@ -1887,7 +1868,7 @@ impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
if idx < self.cur_end {
Ok(())
} else {
- Err(ArrowError::InvalidArgumentError(format!(
+ Err(AvroError::InvalidArgument(format!(
"row index {idx} out of bounds for run-ends ({} runs)",
self.len
)))
@@ -1895,7 +1876,7 @@ impl<'a, R: RunEndIndexType> RunEncodedEncoder<'a, R> {
}
#[inline(always)]
- fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), AvroError> {
self.advance_to_row(idx)?;
// For REE values, the value for any logical row within a run is at
// the physical index of that run.
@@ -2288,10 +2269,10 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- ArrowError::InvalidArgumentError(msg) => {
+ AvroError::InvalidArgument(msg) => {
assert!(msg.contains("Invalid UUID bytes"))
}
- other => panic!("expected InvalidArgumentError, got {other:?}"),
+ other => panic!("expected InvalidArgument, got {other:?}"),
}
}
@@ -2656,10 +2637,10 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- ArrowError::InvalidArgumentError(msg) => {
+ AvroError::InvalidArgument(msg) => {
assert!(msg.contains("cannot encode negative months"))
}
- other => panic!("expected InvalidArgumentError, got {other:?}"),
+ other => panic!("expected InvalidArgument, got {other:?}"),
}
}
@@ -2683,10 +2664,10 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- ArrowError::InvalidArgumentError(msg) => {
+ AvroError::InvalidArgument(msg) => {
assert!(msg.contains("cannot encode negative days"))
}
- other => panic!("expected InvalidArgumentError, got {other:?}"),
+ other => panic!("expected InvalidArgument, got {other:?}"),
}
}
@@ -2710,10 +2691,10 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- ArrowError::InvalidArgumentError(msg) => {
+ AvroError::InvalidArgument(msg) => {
assert!(msg.contains("requires whole milliseconds") || msg.contains("divisible"))
}
- other => panic!("expected InvalidArgumentError, got {other:?}"),
+ other => panic!("expected InvalidArgument, got {other:?}"),
}
}
@@ -2743,8 +2724,8 @@ mod tests {
// truncation overflow
let err = write_sign_extended(&mut out, &[0x01, 0x00], 1).unwrap_err();
match err {
- ArrowError::InvalidArgumentError(_) => {}
- _ => panic!("expected InvalidArgumentError"),
+ AvroError::InvalidArgument(_) => {}
+ _ => panic!("expected InvalidArgument"),
}
}
@@ -2758,8 +2739,8 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- ArrowError::InvalidArgumentError(msg) => assert!(msg.contains("exceed u32::MAX")),
- _ => panic!("expected InvalidArgumentError"),
+ AvroError::InvalidArgument(msg) => assert!(msg.contains("exceed u32::MAX")),
+ _ => panic!("expected InvalidArgument"),
}
}
@@ -2772,7 +2753,7 @@ mod tests {
let avro_dt = AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
let err = FieldPlan::build(&avro_dt, &arrow_field).unwrap_err();
match err {
- ArrowError::SchemaError(msg) => {
+ AvroError::SchemaError(msg) => {
assert!(msg.contains("Decimal precision/scale mismatch"))
}
_ => panic!("expected SchemaError"),
@@ -2924,13 +2905,13 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- arrow_schema::ArrowError::InvalidArgumentError(msg) => {
+ AvroError::InvalidArgument(msg) => {
assert!(
msg.contains("overflowed") || msg.contains("overflow"),
"unexpected message: {msg}"
)
}
- other => panic!("expected InvalidArgumentError, got {other:?}"),
+ other => panic!("expected InvalidArgument, got {other:?}"),
}
}
@@ -2956,13 +2937,13 @@ mod tests {
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
- arrow_schema::ArrowError::InvalidArgumentError(msg) => {
+ AvroError::InvalidArgument(msg) => {
assert!(
msg.contains("overflowed") || msg.contains("overflow"),
"unexpected message: {msg}"
)
}
- other => panic!("expected InvalidArgumentError, got {other:?}"),
+ other => panic!("expected InvalidArgument, got {other:?}"),
}
}
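The test updates above show the adjustment downstream code needs after this change: match on `AvroError` variants instead of `ArrowError` ones. A hedged sketch of the same pattern in caller code (the `report` helper and the `Result` it receives are hypothetical, and the `arrow_avro::errors` path is an assumption about how the new module is exported):

use arrow_avro::errors::AvroError; // assumed public path for the new module

// Hypothetical caller-side handling; the Result comes from whatever writer or
// reader entry point the application uses.
fn report(result: Result<Vec<u8>, AvroError>) {
    match result {
        Ok(bytes) => println!("encoded {} bytes", bytes.len()),
        Err(AvroError::SchemaError(msg)) => eprintln!("schema mismatch: {msg}"),
        Err(AvroError::InvalidArgument(msg)) => eprintln!("bad input: {msg}"),
        Err(other) => eprintln!("avro error: {other:?}"),
    }
}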
diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs
index ba2a0b8564..e8e93a62c7 100644
--- a/arrow-avro/src/writer/format.rs
+++ b/arrow-avro/src/writer/format.rs
@@ -18,9 +18,10 @@
//! Avro Writer Formats for Arrow.
use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
+use crate::errors::AvroError;
use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
use crate::writer::encoder::write_long;
-use arrow_schema::{ArrowError, Schema};
+use arrow_schema::Schema;
use rand::RngCore;
use std::fmt::Debug;
use std::io::Write;
@@ -40,7 +41,7 @@ pub trait AvroFormat: Debug + Default {
writer: &mut W,
schema: &Schema,
compression: Option<CompressionCodec>,
- ) -> Result<(), ArrowError>;
+ ) -> Result<(), AvroError>;
/// Return the 16‑byte sync marker (OCF) or `None` (binary stream).
fn sync_marker(&self) -> Option<&[u8; 16]>;
@@ -59,7 +60,7 @@ impl AvroFormat for AvroOcfFormat {
writer: &mut W,
schema: &Schema,
compression: Option<CompressionCodec>,
- ) -> Result<(), ArrowError> {
+ ) -> Result<(), AvroError> {
let mut rng = rand::rng();
rng.fill_bytes(&mut self.sync_marker);
// Choose the Avro schema JSON that the file will advertise.
@@ -71,11 +72,10 @@ impl AvroFormat for AvroOcfFormat {
null_order: None,
strip_metadata: true,
}),
- )?;
+ )
+ .map_err(|e| AvroError::SchemaError(format!("{:?}", e)))?;
// Magic
- writer
- .write_all(b"Obj\x01")
- .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?;
+ writer.write_all(b"Obj\x01")?;
// File metadata map: { "avro.schema": <json>, "avro.codec": <codec> }
let codec_str = match compression {
Some(CompressionCodec::Deflate) => "deflate",
@@ -93,9 +93,7 @@ impl AvroFormat for AvroOcfFormat {
write_bytes(writer, codec_str.as_bytes())?;
write_long(writer, 0)?;
// Sync marker (16 bytes)
- writer
- .write_all(&self.sync_marker)
- .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?;
+ writer.write_all(&self.sync_marker)?;
Ok(())
}
@@ -121,9 +119,9 @@ impl AvroFormat for AvroSoeFormat {
_writer: &mut W,
_schema: &Schema,
compression: Option<CompressionCodec>,
- ) -> Result<(), ArrowError> {
+ ) -> Result<(), AvroError> {
if compression.is_some() {
- return Err(ArrowError::InvalidArgumentError(
+ return Err(AvroError::InvalidArgument(
"Compression not supported for Avro SOE streaming".to_string(),
));
}
@@ -136,14 +134,13 @@ impl AvroFormat for AvroSoeFormat {
}
#[inline]
-fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
+fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), AvroError> {
write_bytes(writer, s.as_bytes())
}
#[inline]
-fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
+fn write_bytes<W: Write>(writer: &mut W, bytes: &[u8]) -> Result<(), AvroError> {
write_long(writer, bytes.len() as i64)?;
- writer
- .write_all(bytes)
- .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
+ writer.write_all(bytes)?;
+ Ok(())
}
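For orientation, the header assembled by `write_header` above follows the Avro 1.11 object container file layout: the magic bytes, a file-metadata map carrying `avro.schema` and `avro.codec`, a zero terminator for the map, then the 16-byte sync marker. A standalone sketch of that byte layout (illustrative only, not the crate's writer; `schema_json` and `codec` are placeholders):

// Illustrative OCF header per the Avro container-file spec; strings/bytes are
// length-prefixed with ZigZag longs, and the metadata map is one block of two
// entries followed by a zero block count.
fn ocf_header(schema_json: &str, codec: &str, sync: [u8; 16]) -> Vec<u8> {
    fn put_long(out: &mut Vec<u8>, v: i64) {
        let mut zz = ((v << 1) ^ (v >> 63)) as u64;
        while zz >= 0x80 {
            out.push((zz as u8 & 0x7F) | 0x80);
            zz >>= 7;
        }
        out.push(zz as u8);
    }
    fn put_bytes(out: &mut Vec<u8>, b: &[u8]) {
        put_long(out, b.len() as i64);
        out.extend_from_slice(b);
    }
    let mut out = b"Obj\x01".to_vec();
    put_long(&mut out, 2); // metadata map: one block with two entries
    put_bytes(&mut out, b"avro.schema");
    put_bytes(&mut out, schema_json.as_bytes());
    put_bytes(&mut out, b"avro.codec");
    put_bytes(&mut out, codec.as_bytes());
    put_long(&mut out, 0); // end of metadata map
    out.extend_from_slice(&sync);
    out
}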