This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1551893f7a Remove allow unused from arrow-avro lib.rs file (#8493)
1551893f7a is described below
commit 1551893f7ab78b667fc651e33dc02fc057c78189
Author: Connor Sanders <[email protected]>
AuthorDate: Mon Oct 6 08:52:52 2025 -0500
Remove allow unused from arrow-avro lib.rs file (#8493)
# Which issue does this PR close?
- **Related to**: #4886 (“Add Avro Support”)
**NOTE:** This PR is stacked on
https://github.com/apache/arrow-rs/pull/8492
# Rationale for this change
`arrow-avro` has seen significant development since `#![allow(unused)]`
was temporarily added to `lib.rs`. Fast iteration on the code has left
unused methods and imports throughout the crate, which need to be
cleaned up before `arrow-avro` becomes public.
This PR simply removes `#![allow(unused)]` and cleans up the
`arrow-avro` crate's code to comply.
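For context, a minimal standalone sketch of what the crate-level attribute suppresses (illustrative only; the unused import and `helper_never_called` below are hypothetical, not code from the crate):

```rust
// With this inner attribute at the top of lib.rs, rustc silences the
// entire `unused` lint group (unused_imports, unused_variables,
// dead_code, ...), so stale helpers accumulate without warnings.
#![allow(unused)]

use std::collections::HashMap; // unused import: silenced by the attribute

fn helper_never_called() {} // dead code: also silenced

fn main() {}
```

Once the attribute is removed, `cargo build` reports each of these as a warning, which is what drove the cleanup in this PR.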
# What changes are included in this PR?
Deleted the `#![allow(unused)]` in `lib.rs` and updated the crate's code
as needed. This touched almost every file in the crate; however, the
changes in this PR are focused solely on the work of removing
`#![allow(unused)]` (see the sketch below).
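A recurring pattern in the cleanup is gating imports behind the same feature flag as the code that uses them, so every feature combination builds without `unused_imports` warnings. A self-contained sketch of the idea, using a hypothetical `extra` feature in place of the crate's real flags such as `avro_custom_types`:

```rust
// Each import compiles only when the function that needs it does, so
// neither feature configuration trips the `unused_imports` lint.
#[cfg(feature = "extra")]
use std::collections::BTreeMap;
#[cfg(not(feature = "extra"))]
use std::collections::HashMap;

#[cfg(feature = "extra")]
fn index() -> BTreeMap<String, u32> {
    BTreeMap::new()
}

#[cfg(not(feature = "extra"))]
fn index() -> HashMap<String, u32> {
    HashMap::new()
}

fn main() {
    let _ = index();
}
```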
# Are these changes tested?
The changes in this PR are covered by existing tests. No new
functionality or behavior has been added or changed; this PR is simply
cleanup following the removal of `#![allow(unused)]` from `lib.rs`.
# Are there any user-facing changes?
N/A
---
arrow-avro/src/codec.rs | 103 ++---------
arrow-avro/src/compression.rs | 1 -
arrow-avro/src/lib.rs | 1 -
arrow-avro/src/reader/cursor.rs | 6 +-
arrow-avro/src/reader/header.rs | 27 ++-
arrow-avro/src/reader/mod.rs | 141 +++-----------
arrow-avro/src/reader/record.rs | 389 +++++++++++++++++++++++++++++++--------
arrow-avro/src/reader/vlq.rs | 2 +-
arrow-avro/src/schema.rs | 52 +++---
arrow-avro/src/writer/encoder.rs | 13 +-
arrow-avro/src/writer/format.rs | 5 +-
arrow-avro/src/writer/mod.rs | 48 ++---
12 files changed, 434 insertions(+), 354 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 27f5070a61..141faf5640 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -14,11 +14,10 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
use crate::schema::{
- make_full_name, Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability,
- PrimitiveType, Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
- AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_ROOT_RECORD_DEFAULT_NAME,
+ make_full_name, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability, PrimitiveType,
+ Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+ AVRO_FIELD_DEFAULT_METADATA_KEY,
};
use arrow_schema::{
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, UnionFields, UnionMode,
@@ -77,8 +76,6 @@ pub(crate) enum AvroLiteral {
Array(Vec<AvroLiteral>),
/// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
Map(IndexMap<String, AvroLiteral>),
- /// Represents an unsupported literal type.
- Unsupported,
}
/// Contains the necessary information to resolve a writer's record against a reader's record schema.
@@ -208,7 +205,7 @@ impl AvroDataType {
}
/// Returns an arrow [`Field`] with the given name
- pub fn field_with_name(&self, name: &str) -> Field {
+ pub(crate) fn field_with_name(&self, name: &str) -> Field {
let mut nullable = self.nullability.is_some();
if !nullable {
if let Codec::Union(children, _, _) = self.codec() {
@@ -230,7 +227,7 @@ impl AvroDataType {
///
/// The codec determines how Avro data is encoded and mapped to Arrow data types.
/// This is useful when we need to inspect or use the specific encoding of a field.
- pub fn codec(&self) -> &Codec {
+ pub(crate) fn codec(&self) -> &Codec {
&self.codec
}
@@ -524,29 +521,6 @@ impl AvroField {
pub(crate) fn name(&self) -> &str {
&self.name
}
-
- /// Performs schema resolution between a writer and reader schema.
- ///
- /// This is the primary entry point for handling schema evolution. It produces an
- /// `AvroField` that contains all the necessary information to read data written
- /// with the `writer` schema as if it were written with the `reader` schema.
- pub(crate) fn resolve_from_writer_and_reader<'a>(
- writer_schema: &'a Schema<'a>,
- reader_schema: &'a Schema<'a>,
- use_utf8view: bool,
- strict_mode: bool,
- ) -> Result<Self, ArrowError> {
- let top_name = match reader_schema {
- Schema::Complex(ComplexType::Record(r)) => r.name.to_string(),
- _ => AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
- };
- let mut resolver = Maker::new(use_utf8view, strict_mode);
- let data_type = resolver.make_data_type(writer_schema, Some(reader_schema), None)?;
- Ok(Self {
- name: top_name,
- data_type,
- })
- }
}
impl<'a> TryFrom<&Schema<'a>> for AvroField {
@@ -1629,28 +1603,6 @@ impl<'a> Maker<'a> {
Ok(datatype)
}
- fn resolve_nullable_union<'s>(
- &mut self,
- writer_variants: &'s [Schema<'a>],
- reader_variants: &'s [Schema<'a>],
- namespace: Option<&'a str>,
- ) -> Result<AvroDataType, ArrowError> {
- match (
- nullable_union_variants(writer_variants),
- nullable_union_variants(reader_variants),
- ) {
- (Some((write_nb, write_nonnull)), Some((_read_nb, read_nonnull))) => {
- let mut dt = self.make_data_type(write_nonnull, Some(read_nonnull), namespace)?;
- dt.nullability = Some(write_nb);
- Ok(dt)
- }
- _ => Err(ArrowError::NotYetImplemented(
- "Union resolution requires both writer and reader to be 2-branch nullable unions"
- .to_string(),
- )),
- }
- }
-
// Resolve writer vs. reader enum schemas according to Avro 1.11.1.
//
// # How enums resolve (writer to reader)
@@ -1915,9 +1867,11 @@ impl<'a> Maker<'a> {
mod tests {
use super::*;
use crate::schema::{
- Attributes, Field as AvroFieldSchema, Fixed, PrimitiveType, Schema, Type, TypeName,
+ Array, Attributes, ComplexType, Field as AvroFieldSchema, Fixed, PrimitiveType, Record,
+ Schema, Type, TypeName, AVRO_ROOT_RECORD_DEFAULT_NAME,
};
- use serde_json;
+ use indexmap::IndexMap;
+ use serde_json::{self, Value};
fn create_schema_with_logical_type(
primitive_type: PrimitiveType,
@@ -1934,21 +1888,6 @@ mod tests {
})
}
- fn create_fixed_schema(size: usize, logical_type: &'static str) -> Schema<'static> {
- let attributes = Attributes {
- logical_type: Some(logical_type),
- additional: Default::default(),
- };
-
- Schema::Complex(ComplexType::Fixed(Fixed {
- name: "fixed_type",
- namespace: None,
- aliases: Vec::new(),
- size,
- attributes,
- }))
- }
-
fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
@@ -1965,17 +1904,6 @@ mod tests {
Schema::Union(branches)
}
- fn mk_record_name(name: &str) -> Schema<'_> {
- Schema::Complex(ComplexType::Record(Record {
- name,
- namespace: None,
- doc: None,
- aliases: vec![],
- fields: vec![],
- attributes: Attributes::default(),
- }))
- }
-
#[test]
fn test_date_logical_type() {
let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
@@ -2068,7 +1996,7 @@ mod tests {
#[test]
fn test_decimal_logical_type_not_implemented() {
- let mut codec = Codec::Fixed(16);
+ let codec = Codec::Fixed(16);
let process_decimal = || -> Result<(), ArrowError> {
if let Codec::Fixed(_) = codec {
@@ -2556,9 +2484,14 @@ mod tests {
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
let writer_schema =
Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
let reader_schema =
Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
- let field =
- AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
- .expect("resolution should succeed");
+ let mut maker = Maker::new(false, false);
+ let data_type = maker
+ .make_data_type(&writer_schema, Some(&reader_schema), None)
+ .expect("resolution should succeed");
+ let field = AvroField {
+ name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
+ data_type,
+ };
assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
assert!(matches!(field.data_type().codec(), Codec::Utf8));
}
diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs
index 64bacc8fd9..29d1732b9e 100644
--- a/arrow-avro/src/compression.rs
+++ b/arrow-avro/src/compression.rs
@@ -16,7 +16,6 @@
// under the License.
use arrow_schema::ArrowError;
-use std::io;
use std::io::{Read, Write};
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index cf2d5bfbb2..b00bac29ad 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -224,7 +224,6 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
-#![allow(unused)] // Temporary
/// Core functionality for reading Avro data into Arrow arrays
///
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index 1b89ff86c3..23d9e50333 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -85,7 +85,7 @@ impl<'a> AvroCursor<'a> {
ArrowError::ParseError("offset overflow reading avro
bytes".to_string())
})?;
- if (self.buf.len() < len) {
+ if self.buf.len() < len {
return Err(ArrowError::ParseError(
"Unexpected EOF reading bytes".to_string(),
));
@@ -97,7 +97,7 @@ impl<'a> AvroCursor<'a> {
#[inline]
pub(crate) fn get_float(&mut self) -> Result<f32, ArrowError> {
- if (self.buf.len() < 4) {
+ if self.buf.len() < 4 {
return Err(ArrowError::ParseError(
"Unexpected EOF reading float".to_string(),
));
@@ -109,7 +109,7 @@ impl<'a> AvroCursor<'a> {
#[inline]
pub(crate) fn get_double(&mut self) -> Result<f64, ArrowError> {
- if (self.buf.len() < 8) {
+ if self.buf.len() < 8 {
return Err(ArrowError::ParseError(
"Unexpected EOF reading float".to_string(),
));
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 46f025d642..5d4a4924e8 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -21,6 +21,27 @@ use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
use crate::reader::vlq::VLQDecoder;
use crate::schema::{Schema, SCHEMA_METADATA_KEY};
use arrow_schema::ArrowError;
+use std::io::BufRead;
+
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+ let mut decoder = HeaderDecoder::default();
+ loop {
+ let buf = reader.fill_buf()?;
+ if buf.is_empty() {
+ break;
+ }
+ let read = buf.len();
+ let decoded = decoder.decode(buf)?;
+ reader.consume(decoded);
+ if decoded != read {
+ break;
+ }
+ }
+ decoder.flush().ok_or_else(|| {
+ ArrowError::ParseError("Unexpected EOF while reading Avro
header".to_string())
+ })
+}
#[derive(Debug)]
enum HeaderDecoderState {
@@ -265,13 +286,13 @@ impl HeaderDecoder {
#[cfg(test)]
mod test {
use super::*;
- use crate::codec::{AvroDataType, AvroField};
+ use crate::codec::AvroField;
use crate::reader::read_header;
use crate::schema::SCHEMA_METADATA_KEY;
use crate::test_util::arrow_test_data;
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use std::fs::File;
- use std::io::{BufRead, BufReader};
+ use std::io::BufReader;
#[test]
fn test_header_decode() {
@@ -291,7 +312,7 @@ mod test {
fn decode_file(file: &str) -> Header {
let file = File::open(file).unwrap();
- read_header(BufReader::with_capacity(100, file)).unwrap()
+ read_header(BufReader::with_capacity(1000, file)).unwrap()
}
#[test]
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index dc2fb630b2..a5f2681021 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -474,18 +474,18 @@
//! descriptive error. Populate the store up front to avoid this.
//!
//! ---
-use crate::codec::{AvroField, AvroFieldBuilder};
+use crate::codec::AvroFieldBuilder;
+use crate::reader::header::read_header;
use crate::schema::{
AvroSchema, Fingerprint, FingerprintAlgorithm, Schema, SchemaStore, CONFLUENT_MAGIC,
SINGLE_OBJECT_MAGIC,
};
-use arrow_array::{Array, RecordBatch, RecordBatchReader};
+use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use block::BlockDecoder;
-use header::{Header, HeaderDecoder};
+use header::Header;
use indexmap::IndexMap;
use record::RecordDecoder;
-use std::collections::HashMap;
use std::io::BufRead;
mod block;
@@ -494,26 +494,6 @@ mod header;
mod record;
mod vlq;
-/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
- let mut decoder = HeaderDecoder::default();
- loop {
- let buf = reader.fill_buf()?;
- if buf.is_empty() {
- break;
- }
- let read = buf.len();
- let decoded = decoder.decode(buf)?;
- reader.consume(decoded);
- if decoded != read {
- break;
- }
- }
- decoder.flush().ok_or_else(|| {
- ArrowError::ParseError("Unexpected EOF while reading Avro
header".to_string())
- })
-}
-
fn is_incomplete_data(err: &ArrowError) -> bool {
matches!(
err,
@@ -652,8 +632,6 @@ pub struct Decoder {
remaining_capacity: usize,
cache: IndexMap<Fingerprint, RecordDecoder>,
fingerprint_algorithm: FingerprintAlgorithm,
- utf8_view: bool,
- strict_mode: bool,
pending_schema: Option<(Fingerprint, RecordDecoder)>,
awaiting_body: bool,
}
@@ -988,7 +966,7 @@ impl ReaderBuilder {
.with_utf8view(self.utf8_view)
.with_strict_mode(self.strict_mode)
.build()?;
- RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
+ RecordDecoder::try_new_with_options(root.data_type())
}
fn make_record_decoder_from_schemas(
@@ -1013,9 +991,7 @@ impl ReaderBuilder {
active_fingerprint,
active_decoder,
cache,
- utf8_view: self.utf8_view,
fingerprint_algorithm,
- strict_mode: self.strict_mode,
pending_schema: None,
awaiting_body: false,
}
@@ -1287,14 +1263,12 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
#[cfg(test)]
mod test {
- use crate::codec::{AvroDataType, AvroField, AvroFieldBuilder, Codec};
- use crate::compression::CompressionCodec;
+ use crate::codec::AvroFieldBuilder;
use crate::reader::record::RecordDecoder;
- use crate::reader::vlq::VLQDecoder;
- use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
+ use crate::reader::{Decoder, Reader, ReaderBuilder};
use crate::schema::{
- AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema as AvroRaw,
- SchemaStore, AVRO_ENUM_SYMBOLS_METADATA_KEY, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC,
+ AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, SchemaStore,
+ AVRO_ENUM_SYMBOLS_METADATA_KEY, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC,
};
use crate::test_util::arrow_test_data;
use crate::writer::AvroWriter;
@@ -1304,29 +1278,35 @@ mod test {
ListBuilder, MapBuilder, StringBuilder, StructBuilder,
};
use arrow_array::cast::AsArray;
+ #[cfg(not(feature = "avro_custom_types"))]
+ use arrow_array::types::Int64Type;
+ #[cfg(feature = "avro_custom_types")]
use arrow_array::types::{
DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
- DurationSecondType, Int64Type,
+ DurationSecondType,
};
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
use arrow_array::*;
use arrow_buffer::{
i256, Buffer, IntervalMonthDayNano, NullBuffer, OffsetBuffer, ScalarBuffer,
};
+ #[cfg(feature = "avro_custom_types")]
use arrow_schema::{
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, TimeUnit, UnionFields,
UnionMode,
};
- use bytes::{Buf, BufMut, Bytes};
+ #[cfg(not(feature = "avro_custom_types"))]
+ use arrow_schema::{
+ ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, UnionFields, UnionMode,
+ };
+ use bytes::Bytes;
use futures::executor::block_on;
use futures::{stream, Stream, StreamExt, TryStreamExt};
use serde_json::{json, Value};
use std::collections::HashMap;
- use std::fs;
use std::fs::File;
- use std::io::{BufReader, Cursor, Read};
+ use std::io::{BufReader, Cursor};
use std::sync::Arc;
- use std::task::{ready, Poll};
fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch {
let file = File::open(path).unwrap();
@@ -2185,8 +2165,7 @@ mod test {
let mut decoder = make_decoder(&store, fp_int, &schema_long);
let writer_schema_long = schema_long.schema().unwrap();
let root_long = AvroFieldBuilder::new(&writer_schema_long).build().unwrap();
- let long_decoder =
- RecordDecoder::try_new_with_options(root_long.data_type(), decoder.utf8_view).unwrap();
+ let long_decoder = RecordDecoder::try_new_with_options(root_long.data_type()).unwrap();
let _ = decoder.cache.insert(fp_long, long_decoder);
let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
match fp_long {
@@ -2235,7 +2214,6 @@ mod test {
fn test_two_messages_schema_switch() {
let w_int = make_value_schema(PrimitiveType::Int);
let w_long = make_value_schema(PrimitiveType::Long);
- let r_long = w_long.clone();
let mut store = SchemaStore::new();
let fp_int = store.register(w_int).unwrap();
let fp_long = store.register(w_long).unwrap();
@@ -2345,71 +2323,12 @@ mod test {
.with_active_fingerprint(Fingerprint::Id(id))
.build_decoder()
.unwrap();
- let buf = &crate::schema::CONFLUENT_MAGIC[..0]; // empty incomplete magic
+ let buf = &CONFLUENT_MAGIC[..0]; // empty incomplete magic
let res = decoder.handle_prefix(buf).unwrap();
assert_eq!(res, Some(0));
assert!(decoder.pending_schema.is_none());
}
- fn test_split_message_across_chunks() {
- let writer_schema = make_value_schema(PrimitiveType::Int);
- let reader_schema = writer_schema.clone();
- let mut store = SchemaStore::new();
- let fp = store.register(writer_schema).unwrap();
- let msg1 = make_message(fp, 7);
- let msg2 = make_message(fp, 8);
- let msg3 = make_message(fp, 9);
- let (pref2, body2) = msg2.split_at(10);
- let (pref3, body3) = msg3.split_at(10);
- let mut decoder = ReaderBuilder::new()
- .with_batch_size(8)
- .with_reader_schema(reader_schema)
- .with_writer_schema_store(store)
- .with_active_fingerprint(fp)
- .build_decoder()
- .unwrap();
- let _ = decoder.decode(&msg1).unwrap();
- let batch1 = decoder.flush().unwrap().expect("batch1");
- assert_eq!(batch1.num_rows(), 1);
- assert_eq!(
- batch1
- .column(0)
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap()
- .value(0),
- 7
- );
- let _ = decoder.decode(pref2).unwrap();
- assert!(decoder.flush().unwrap().is_none());
- let mut chunk3 = Vec::from(body2);
- chunk3.extend_from_slice(pref3);
- let _ = decoder.decode(&chunk3).unwrap();
- let batch2 = decoder.flush().unwrap().expect("batch2");
- assert_eq!(batch2.num_rows(), 1);
- assert_eq!(
- batch2
- .column(0)
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap()
- .value(0),
- 8
- );
- let _ = decoder.decode(body3).unwrap();
- let batch3 = decoder.flush().unwrap().expect("batch3");
- assert_eq!(batch3.num_rows(), 1);
- assert_eq!(
- batch3
- .column(0)
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap()
- .value(0),
- 9
- );
- }
-
#[test]
fn test_decode_stream_with_schema() {
struct TestCase<'a> {
@@ -2495,20 +2414,6 @@ mod test {
#[test]
fn test_utf8view_support() {
- let schema_json = r#"{
- "type": "record",
- "name": "test",
- "fields": [{
- "name": "str_field",
- "type": "string"
- }]
- }"#;
-
- let schema: crate::schema::Schema = serde_json::from_str(schema_json).unwrap();
- let avro_field = AvroField::try_from(&schema).unwrap();
-
- let data_type = avro_field.data_type();
-
struct TestHelper;
impl TestHelper {
fn with_utf8view(field: &Field) -> Field {
@@ -3009,7 +2914,6 @@ mod test {
let mut tid_rec_a: Option<i8> = None;
let mut tid_rec_b: Option<i8> = None;
let mut tid_array: Option<i8> = None;
- let mut tid_map: Option<i8> = None;
for (tid, f) in fields.iter() {
match f.data_type() {
DataType::Dictionary(_, _) => tid_enum = Some(tid),
@@ -3024,7 +2928,6 @@ mod test {
}
}
DataType::List(_) => tid_array = Some(tid),
- DataType::Map(_, _) => tid_map = Some(tid),
_ => {}
}
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 15b9530fa3..f2376e5ade 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -92,53 +92,15 @@ macro_rules! append_decimal_default {
}};
}
-#[derive(Debug)]
-pub(crate) struct RecordDecoderBuilder<'a> {
- data_type: &'a AvroDataType,
- use_utf8view: bool,
-}
-
-impl<'a> RecordDecoderBuilder<'a> {
- pub(crate) fn new(data_type: &'a AvroDataType) -> Self {
- Self {
- data_type,
- use_utf8view: false,
- }
- }
-
- pub(crate) fn with_utf8_view(mut self, use_utf8view: bool) -> Self {
- self.use_utf8view = use_utf8view;
- self
- }
-
- /// Builds the `RecordDecoder`.
- pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
- RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
- }
-}
-
/// Decodes avro encoded data into [`RecordBatch`]
#[derive(Debug)]
pub(crate) struct RecordDecoder {
schema: SchemaRef,
fields: Vec<Decoder>,
- use_utf8view: bool,
projector: Option<Projector>,
}
impl RecordDecoder {
- /// Creates a new `RecordDecoderBuilder` for configuring a `RecordDecoder`.
- pub(crate) fn new(data_type: &'_ AvroDataType) -> Self {
- RecordDecoderBuilder::new(data_type).build().unwrap()
- }
-
- /// Create a new [`RecordDecoder`] from the provided [`AvroDataType`] with default options
- pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
- RecordDecoderBuilder::new(data_type)
- .with_utf8_view(true)
- .build()
- }
-
/// Creates a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options.
///
/// This method allows you to customize how the Avro data is decoded into Arrow arrays.
@@ -149,10 +111,7 @@ impl RecordDecoder {
///
/// # Errors
/// This function will return an error if the provided `data_type` is not a `Record`.
- pub(crate) fn try_new_with_options(
- data_type: &AvroDataType,
- use_utf8view: bool,
- ) -> Result<Self, ArrowError> {
+ pub(crate) fn try_new_with_options(data_type: &AvroDataType) -> Result<Self, ArrowError> {
match data_type.codec() {
Codec::Struct(reader_fields) => {
// Build Arrow schema fields and per-child decoders
@@ -171,7 +130,6 @@ impl RecordDecoder {
Ok(Self {
schema: Arc::new(ArrowSchema::new(arrow_fields)),
fields: encodings,
- use_utf8view,
projector,
})
}
@@ -575,7 +533,7 @@ impl Decoder {
}
Self::Record(_, e, _) => {
for encoding in e.iter_mut() {
- encoding.append_null();
+ encoding.append_null()?;
}
}
Self::Map(_, _koff, moff, _, _) => {
@@ -595,7 +553,7 @@ impl Decoder {
Self::Union(u) => u.append_null()?,
Self::Nullable(_, null_buffer, inner, _) => {
null_buffer.append(false);
- inner.append_null();
+ inner.append_null()?;
}
}
Ok(())
@@ -830,7 +788,7 @@ impl Decoder {
} else if let Some(proj) = projector.as_ref() {
proj.project_default(dec, i)?;
} else {
- dec.append_null();
+ dec.append_null()?;
}
}
Ok(())
@@ -840,7 +798,7 @@ impl Decoder {
if let Some(proj) = projector.as_ref() {
proj.project_default(dec, i)?;
} else {
- dec.append_null();
+ dec.append_null()?;
}
}
Ok(())
@@ -977,7 +935,7 @@ impl Decoder {
// It is important to decode before appending to null buffer in case of decode error
encoding.decode(buf)?;
} else {
- encoding.append_null();
+ encoding.append_null()?;
}
nb.append(is_not_null);
}
@@ -1281,7 +1239,6 @@ struct UnionDecoder {
branches: Vec<Decoder>,
counts: Vec<i32>,
reader_type_codes: Vec<i8>,
- null_branch: Option<usize>,
default_emit_idx: usize,
null_emit_idx: usize,
plan: UnionReadPlan,
@@ -1296,7 +1253,6 @@ impl Default for UnionDecoder {
branches: Vec::new(),
counts: Vec::new(),
reader_type_codes: Vec::new(),
- null_branch: None,
default_emit_idx: 0,
null_emit_idx: 0,
plan: UnionReadPlan::Passthrough,
@@ -1348,7 +1304,6 @@ impl UnionDecoder {
branches,
counts: vec![0; branch_len],
reader_type_codes,
- null_branch,
default_emit_idx,
null_emit_idx,
plan: Self::plan_from_resolved(resolved)?,
@@ -1878,8 +1833,6 @@ enum Skipper {
Float64,
Bytes,
String,
- Date32,
- TimeMillis,
TimeMicros,
TimestampMillis,
TimestampMicros,
@@ -1905,6 +1858,11 @@ impl Skipper {
Codec::TimeMicros => Self::TimeMicros,
Codec::TimestampMillis(_) => Self::TimestampMillis,
Codec::TimestampMicros(_) => Self::TimestampMicros,
+ #[cfg(feature = "avro_custom_types")]
+ Codec::DurationNanos
+ | Codec::DurationMicros
+ | Codec::DurationMillis
+ | Codec::DurationSeconds => Self::Int64,
Codec::Float32 => Self::Float32,
Codec::Float64 => Self::Float64,
Codec::Binary => Self::Bytes,
@@ -1939,12 +1897,6 @@ impl Skipper {
.collect::<Result<_, _>>()?,
)
}
- _ => {
- return Err(ArrowError::NotYetImplemented(format!(
- "Skipper not implemented for codec {:?}",
- dt.codec()
- )));
- }
};
if let Some(n) = dt.nullability() {
base = Self::Nullable(n, Box::new(base));
@@ -1959,7 +1911,7 @@ impl Skipper {
buf.get_bool()?;
Ok(())
}
- Self::Int32 | Self::Date32 | Self::TimeMillis => {
+ Self::Int32 => {
buf.get_int()?;
Ok(())
}
@@ -2056,10 +2008,11 @@ impl Skipper {
#[cfg(test)]
mod tests {
use super::*;
- use crate::codec::AvroField;
- use crate::schema::{PrimitiveType, Schema, TypeName};
+ use crate::codec::AvroFieldBuilder;
+ use crate::schema::{Attributes, ComplexType, Field, PrimitiveType, Record, Schema, TypeName};
use arrow_array::cast::AsArray;
use indexmap::IndexMap;
+ use std::collections::HashMap;
fn encode_avro_int(value: i32) -> Vec<u8> {
let mut buf = Vec::new();
@@ -2093,6 +2046,58 @@ mod tests {
AvroDataType::new(codec, Default::default(), None)
}
+ fn resolved_root_datatype(
+ writer: Schema<'static>,
+ reader: Schema<'static>,
+ use_utf8view: bool,
+ strict_mode: bool,
+ ) -> AvroDataType {
+ // Wrap writer schema in a single-field record
+ let writer_record = Schema::Complex(ComplexType::Record(Record {
+ name: "Root",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![Field {
+ name: "v",
+ r#type: writer,
+ default: None,
+ doc: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+
+ // Wrap reader schema in a single-field record
+ let reader_record = Schema::Complex(ComplexType::Record(Record {
+ name: "Root",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![Field {
+ name: "v",
+ r#type: reader,
+ default: None,
+ doc: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+
+ // Build resolved record, then extract the inner field's resolved AvroDataType
+ let field = AvroFieldBuilder::new(&writer_record)
+ .with_reader_schema(&reader_record)
+ .with_utf8view(use_utf8view)
+ .with_strict_mode(strict_mode)
+ .build()
+ .expect("schema resolution should succeed");
+
+ match field.data_type().codec() {
+ Codec::Struct(fields) => fields[0].data_type().clone(),
+ other => panic!("expected wrapper struct, got {other:?}"),
+ }
+ }
+
fn decoder_for_promotion(
writer: PrimitiveType,
reader: PrimitiveType,
@@ -2100,9 +2105,23 @@ mod tests {
) -> Decoder {
let ws = Schema::TypeName(TypeName::Primitive(writer));
let rs = Schema::TypeName(TypeName::Primitive(reader));
- let field =
- AvroField::resolve_from_writer_and_reader(&ws, &rs, use_utf8view, false).unwrap();
- Decoder::try_new(field.data_type()).unwrap()
+ let dt = resolved_root_datatype(ws, rs, use_utf8view, false);
+ Decoder::try_new(&dt).unwrap()
+ }
+
+ fn make_avro_dt(codec: Codec, nullability: Option<Nullability>) -> AvroDataType {
+ AvroDataType::new(codec, HashMap::new(), nullability)
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ fn encode_vlq_u64(mut x: u64) -> Vec<u8> {
+ let mut out = Vec::with_capacity(10);
+ while x >= 0x80 {
+ out.push((x as u8) | 0x80);
+ x >>= 7;
+ }
+ out.push(x as u8);
+ out
}
#[test]
@@ -2115,36 +2134,44 @@ mod tests {
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
]);
- let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
- let mut dec = Decoder::try_new(field.data_type()).unwrap();
+
+ let dt = resolved_root_datatype(ws, rs, false, false);
+ let mut dec = Decoder::try_new(&dt).unwrap();
+
let mut rec1 = encode_avro_long(0);
rec1.extend(encode_avro_int(7));
let mut cur1 = AvroCursor::new(&rec1);
dec.decode(&mut cur1).unwrap();
+
let mut rec2 = encode_avro_long(1);
rec2.extend(encode_avro_bytes("abc".as_bytes()));
let mut cur2 = AvroCursor::new(&rec2);
dec.decode(&mut cur2).unwrap();
+
let arr = dec.flush(None).unwrap();
let ua = arr
.as_any()
.downcast_ref::<UnionArray>()
.expect("dense union output");
+
assert_eq!(
ua.type_id(0),
1,
"first value must select reader 'long' branch"
);
assert_eq!(ua.value_offset(0), 0);
+
assert_eq!(
ua.type_id(1),
0,
"second value must select reader 'string' branch"
);
assert_eq!(ua.value_offset(1), 0);
+
let long_child =
ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(long_child.len(), 1);
assert_eq!(long_child.value(0), 7);
+
let str_child =
ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(str_child.len(), 1);
assert_eq!(str_child.value(0), "abc");
@@ -2157,12 +2184,15 @@ mod tests {
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
]);
let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
- let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
- let mut dec = Decoder::try_new(field.data_type()).unwrap();
+
+ let dt = resolved_root_datatype(ws, rs, false, false);
+ let mut dec = Decoder::try_new(&dt).unwrap();
+
let mut data = encode_avro_long(0);
data.extend(encode_avro_int(5));
let mut cur = AvroCursor::new(&data);
dec.decode(&mut cur).unwrap();
+
let arr = dec.flush(None).unwrap();
let out = arr.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(out.len(), 1);
@@ -2176,8 +2206,10 @@ mod tests {
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
]);
let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Long));
- let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
- let mut dec = Decoder::try_new(field.data_type()).unwrap();
+
+ let dt = resolved_root_datatype(ws, rs, false, false);
+ let mut dec = Decoder::try_new(&dt).unwrap();
+
let mut data = encode_avro_long(1);
data.extend(encode_avro_bytes("z".as_bytes()));
let mut cur = AvroCursor::new(&data);
@@ -2195,11 +2227,14 @@ mod tests {
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
]);
- let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
- let mut dec = Decoder::try_new(field.data_type()).unwrap();
+
+ let dt = resolved_root_datatype(ws, rs, false, false);
+ let mut dec = Decoder::try_new(&dt).unwrap();
+
let data = encode_avro_int(6);
let mut cur = AvroCursor::new(&data);
dec.decode(&mut cur).unwrap();
+
let arr = dec.flush(None).unwrap();
let ua = arr
.as_any()
@@ -2212,9 +2247,11 @@ mod tests {
"must resolve to reader 'long' branch (type_id 1)"
);
assert_eq!(ua.value_offset(0), 0);
+
let long_child =
ua.child(1).as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(long_child.len(), 1);
assert_eq!(long_child.value(0), 6);
+
let str_child =
ua.child(0).as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(str_child.len(), 0, "string branch must be empty");
}
@@ -2229,8 +2266,10 @@ mod tests {
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
]);
- let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
- let mut dec = Decoder::try_new(field.data_type()).unwrap();
+
+ let dt = resolved_root_datatype(ws, rs, false, false);
+ let mut dec = Decoder::try_new(&dt).unwrap();
+
let mut data = encode_avro_long(1);
data.push(1);
let mut cur = AvroCursor::new(&data);
@@ -2386,8 +2425,47 @@ mod tests {
fn test_schema_resolution_no_promotion_passthrough_int() {
let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
- let field = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false).unwrap();
- let mut dec = Decoder::try_new(field.data_type()).unwrap();
+ // Wrap both in a synthetic single-field record and resolve with AvroFieldBuilder
+ let writer_record = Schema::Complex(ComplexType::Record(Record {
+ name: "Root",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![Field {
+ name: "v",
+ r#type: ws,
+ default: None,
+ doc: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+ let reader_record = Schema::Complex(ComplexType::Record(Record {
+ name: "Root",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![Field {
+ name: "v",
+ r#type: rs,
+ default: None,
+ doc: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+ let field = AvroFieldBuilder::new(&writer_record)
+ .with_reader_schema(&reader_record)
+ .with_utf8view(false)
+ .with_strict_mode(false)
+ .build()
+ .unwrap();
+ // Extract the resolved inner field's AvroDataType
+ let dt = match field.data_type().codec() {
+ Codec::Struct(fields) => fields[0].data_type().clone(),
+ other => panic!("expected wrapper struct, got {other:?}"),
+ };
+ let mut dec = Decoder::try_new(&dt).unwrap();
assert!(matches!(dec, Decoder::Int32(_)));
for v in [7, -9] {
let data = encode_avro_int(v);
@@ -2404,7 +2482,39 @@ mod tests {
fn test_schema_resolution_illegal_promotion_int_to_boolean_errors() {
let ws = Schema::TypeName(TypeName::Primitive(PrimitiveType::Int));
let rs = Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean));
- let res = AvroField::resolve_from_writer_and_reader(&ws, &rs, false, false);
+ let writer_record = Schema::Complex(ComplexType::Record(Record {
+ name: "Root",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![Field {
+ name: "v",
+ r#type: ws,
+ default: None,
+ doc: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+ let reader_record = Schema::Complex(ComplexType::Record(Record {
+ name: "Root",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![Field {
+ name: "v",
+ r#type: rs,
+ default: None,
+ doc: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+ let res = AvroFieldBuilder::new(&writer_record)
+ .with_reader_schema(&reader_record)
+ .with_utf8view(false)
+ .with_strict_mode(false)
+ .build();
assert!(res.is_err(), "expected error for illegal promotion");
}
@@ -4064,4 +4174,127 @@ mod tests {
let type_ids: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
assert_eq!(type_ids, vec![42_i8, 7_i8]);
}
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn skipper_from_avro_maps_custom_duration_variants_to_int64() -> Result<(), ArrowError> {
+ for codec in [
+ Codec::DurationNanos,
+ Codec::DurationMicros,
+ Codec::DurationMillis,
+ Codec::DurationSeconds,
+ ] {
+ let dt = make_avro_dt(codec.clone(), None);
+ let s = Skipper::from_avro(&dt)?;
+ match s {
+ Skipper::Int64 => {}
+ other => panic!("expected Int64 skipper for {:?}, got {:?}",
codec, other),
+ }
+ }
+ Ok(())
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn skipper_skip_consumes_one_long_for_custom_durations() -> Result<(), ArrowError> {
+ let values: [i64; 7] = [0, 1, -1, 150, -150, i64::MAX / 3, i64::MIN / 3];
+ for codec in [
+ Codec::DurationNanos,
+ Codec::DurationMicros,
+ Codec::DurationMillis,
+ Codec::DurationSeconds,
+ ] {
+ let dt = make_avro_dt(codec.clone(), None);
+ let mut s = Skipper::from_avro(&dt)?;
+ for &v in &values {
+ let bytes = encode_avro_long(v);
+ let mut cursor = AvroCursor::new(&bytes);
+ s.skip(&mut cursor)?;
+ assert_eq!(
+ cursor.position(),
+ bytes.len(),
+ "did not consume all bytes for {:?} value {}",
+ codec,
+ v
+ );
+ }
+ }
+ Ok(())
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn skipper_nullable_custom_duration_respects_null_first() -> Result<(), ArrowError> {
+ let dt = make_avro_dt(Codec::DurationNanos, Some(Nullability::NullFirst));
+ let mut s = Skipper::from_avro(&dt)?;
+ match &s {
+ Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
+ Skipper::Int64 => {}
+ ref other => panic!("expected inner Int64, got {:?}", other),
+ },
+ other => panic!("expected Nullable(NullFirst, Int64), got {:?}",
other),
+ }
+ {
+ let buf = encode_vlq_u64(0);
+ let mut cursor = AvroCursor::new(&buf);
+ s.skip(&mut cursor)?;
+ assert_eq!(cursor.position(), 1, "expected to consume only tag=0");
+ }
+ {
+ let mut buf = encode_vlq_u64(1);
+ buf.extend(encode_avro_long(0));
+ let mut cursor = AvroCursor::new(&buf);
+ s.skip(&mut cursor)?;
+ assert_eq!(cursor.position(), 2, "expected to consume tag=1 + long(0)");
+ }
+
+ Ok(())
+ }
+
+ #[cfg(feature = "avro_custom_types")]
+ #[test]
+ fn skipper_nullable_custom_duration_respects_null_second() -> Result<(), ArrowError> {
+ let dt = make_avro_dt(Codec::DurationMicros, Some(Nullability::NullSecond));
+ let mut s = Skipper::from_avro(&dt)?;
+ match &s {
+ Skipper::Nullable(Nullability::NullSecond, inner) => match **inner {
+ Skipper::Int64 => {}
+ ref other => panic!("expected inner Int64, got {:?}", other),
+ },
+ other => panic!("expected Nullable(NullSecond, Int64), got {:?}",
other),
+ }
+ {
+ let buf = encode_vlq_u64(1);
+ let mut cursor = AvroCursor::new(&buf);
+ s.skip(&mut cursor)?;
+ assert_eq!(cursor.position(), 1, "expected to consume only tag=1");
+ }
+ {
+ let mut buf = encode_vlq_u64(0);
+ buf.extend(encode_avro_long(-1));
+ let mut cursor = AvroCursor::new(&buf);
+ s.skip(&mut cursor)?;
+ assert_eq!(
+ cursor.position(),
+ 1 + encode_avro_long(-1).len(),
+ "expected to consume tag=0 + long(-1)"
+ );
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(), ArrowError> {
+ let dt = make_avro_dt(Codec::Interval, None);
+ let mut s = Skipper::from_avro(&dt)?;
+ match s {
+ Skipper::DurationFixed12 => {}
+ other => panic!("expected DurationFixed12, got {:?}", other),
+ }
+ let payload = vec![0u8; 12];
+ let mut cursor = AvroCursor::new(&payload);
+ s.skip(&mut cursor)?;
+ assert_eq!(cursor.position(), 12, "expected to consume 12 fixed bytes");
+ Ok(())
+ }
}
diff --git a/arrow-avro/src/reader/vlq.rs b/arrow-avro/src/reader/vlq.rs
index b198a0d66f..c0b471b466 100644
--- a/arrow-avro/src/reader/vlq.rs
+++ b/arrow-avro/src/reader/vlq.rs
@@ -84,7 +84,7 @@ fn read_varint_array(buf: [u8; 10]) -> Option<(u64, usize)> {
#[cold]
fn read_varint_slow(buf: &[u8]) -> Option<(u64, usize)> {
let mut value = 0;
- for (count, byte) in buf.iter().take(10).enumerate() {
+ for (count, _byte) in buf.iter().take(10).enumerate() {
let byte = buf[count];
value |= u64::from(byte & 0x7F) << (count * 7);
if byte <= 0x7F {
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 4f48337b78..feaad8c131 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -910,6 +910,10 @@ fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String
.fields
.iter()
.map(|f| {
+ // PCF [STRIP] per Avro spec: keep only attributes relevant to parsing
+ // ("name" and "type" for fields) and **strip others** such as doc,
+ // default, order, and **aliases**. This preserves canonicalization. See:
+ // https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas
let field_type =
build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?;
Ok(format!(
@@ -1098,7 +1102,7 @@ impl NameGenerator {
}
}
-fn merge_extras(schema: Value, mut extras: JsonMap<String, Value>) -> Value {
+fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
if extras.is_empty() {
return schema;
}
@@ -1290,23 +1294,19 @@ fn datatype_to_avro(
};
json!({ "type": "long", "logicalType": logical_type })
}
+ #[cfg(not(feature = "avro_custom_types"))]
+ DataType::Duration(_unit) => Value::String("long".into()),
+ #[cfg(feature = "avro_custom_types")]
DataType::Duration(unit) => {
- #[cfg(feature = "avro_custom_types")]
- {
- // When the feature is enabled, create an Avro schema object
- // with the correct `logicalType` annotation.
- let logical_type = match unit {
- TimeUnit::Second => "arrow.duration-seconds",
- TimeUnit::Millisecond => "arrow.duration-millis",
- TimeUnit::Microsecond => "arrow.duration-micros",
- TimeUnit::Nanosecond => "arrow.duration-nanos",
- };
- json!({ "type": "long", "logicalType": logical_type })
- }
- #[cfg(not(feature = "avro_custom_types"))]
- {
- Value::String("long".into())
- }
+ // When the feature is enabled, create an Avro schema object
+ // with the correct `logicalType` annotation.
+ let logical_type = match unit {
+ TimeUnit::Second => "arrow.duration-seconds",
+ TimeUnit::Millisecond => "arrow.duration-millis",
+ TimeUnit::Microsecond => "arrow.duration-micros",
+ TimeUnit::Nanosecond => "arrow.duration-nanos",
+ };
+ json!({ "type": "long", "logicalType": logical_type })
}
DataType::Interval(IntervalUnit::MonthDayNano) => json!({
"type": "fixed",
@@ -1483,6 +1483,7 @@ fn datatype_to_avro(
Value::Array(branches)
}
+ #[cfg(not(feature = "small_decimals"))]
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Arrow type {other:?} has no Avro representation"
@@ -1553,7 +1554,7 @@ fn arrow_field_to_avro(
#[cfg(test)]
mod tests {
use super::*;
- use crate::codec::{AvroDataType, AvroField, AvroFieldBuilder};
+ use crate::codec::{AvroField, AvroFieldBuilder};
use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit, UnionFields};
use serde_json::json;
use std::sync::Arc;
@@ -2079,15 +2080,15 @@ mod tests {
.lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
.is_none());
}
- Fingerprint::Id(id) => {
+ Fingerprint::Id(_id) => {
unreachable!("This test should only generate Rabin
fingerprints")
}
#[cfg(feature = "md5")]
- Fingerprint::MD5(id) => {
+ Fingerprint::MD5(_id) => {
unreachable!("This test should only generate Rabin
fingerprints")
}
#[cfg(feature = "sha256")]
- Fingerprint::SHA256(id) => {
+ Fingerprint::SHA256(_id) => {
unreachable!("This test should only generate Rabin
fingerprints")
}
}
@@ -2177,8 +2178,7 @@ mod tests {
let mut store = SchemaStore::new();
let schema =
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
let canonical_form =
r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
- let expected_fingerprint =
- Fingerprint::Rabin(super::compute_fingerprint_rabin(canonical_form));
+ let expected_fingerprint = Fingerprint::Rabin(compute_fingerprint_rabin(canonical_form));
let fingerprint = store.register(schema.clone()).unwrap();
assert_eq!(fingerprint, expected_fingerprint);
let looked_up = store.lookup(&fingerprint).cloned();
@@ -2306,6 +2306,7 @@ mod tests {
assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
}
+ #[cfg(feature = "avro_custom_types")]
#[test]
fn test_interval_duration() {
let interval_field = ArrowField::new(
@@ -2320,7 +2321,6 @@ mod tests {
let dur_field = ArrowField::new("latency",
DataType::Duration(TimeUnit::Nanosecond), false);
let s2 = single_field_schema(dur_field);
let avro2 = AvroSchema::try_from(&s2).unwrap();
- #[cfg(feature = "avro_custom_types")]
assert_json_contains(
&avro2.json_string,
"\"logicalType\":\"arrow.duration-nanos\"",
@@ -2459,13 +2459,13 @@ mod tests {
);
}
+ #[cfg(feature = "avro_custom_types")]
#[test]
fn test_duration_list_extras_propagated() {
let child = ArrowField::new("lat",
DataType::Duration(TimeUnit::Microsecond), false);
let list_dt = DataType::List(Arc::new(child));
let arrow_schema = single_field_schema(ArrowField::new("durations",
list_dt, false));
let avro = AvroSchema::try_from(&arrow_schema).unwrap();
- #[cfg(feature = "avro_custom_types")]
assert_json_contains(
&avro.json_string,
"\"logicalType\":\"arrow.duration-micros\"",
@@ -2497,6 +2497,7 @@ mod tests {
assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
}
+ #[cfg(feature = "avro_custom_types")]
#[test]
fn test_map_duration_value_extra() {
let val_field = ArrowField::new("value",
DataType::Duration(TimeUnit::Second), true);
@@ -2511,7 +2512,6 @@ mod tests {
let map_dt = DataType::Map(Arc::new(entries_struct), false);
let schema = single_field_schema(ArrowField::new("metrics", map_dt,
false));
let avro = AvroSchema::try_from(&schema).unwrap();
- #[cfg(feature = "avro_custom_types")]
assert_json_contains(
&avro.json_string,
"\"logicalType\":\"arrow.duration-seconds\"",
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index f90a19f931..300892c056 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -34,7 +34,6 @@ use arrow_array::{
use arrow_array::{Decimal32Array, Decimal64Array};
use arrow_buffer::NullBuffer;
use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit};
-use serde::Serialize;
use std::io::Write;
use std::sync::Arc;
use uuid::Uuid;
@@ -244,7 +243,7 @@ impl<'a> FieldEncoder<'a> {
DataType::LargeBinary => {
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
}
- DataType::FixedSizeBinary(len) => {
+ DataType::FixedSizeBinary(_len) => {
let arr = array
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
@@ -430,11 +429,6 @@ impl<'a> FieldEncoder<'a> {
)))
}
}
- other => {
- return Err(ArrowError::NotYetImplemented(format!(
- "Avro writer: {other:?} not yet supported",
- )));
- }
};
// Compute the effective null state from writer-declared nullability and data nulls.
let null_state = match (nullability, array.null_count() > 0) {
@@ -1048,12 +1042,10 @@ impl<'a> MapEncoder<'a> {
let offsets = self.map.offsets();
let start = offsets[idx] as usize;
let end = offsets[idx + 1] as usize;
-
- let mut write_item = |out: &mut W, j: usize| {
+ let write_item = |out: &mut W, j: usize| {
let j_val = j.saturating_sub(self.values_offset);
self.values.encode(out, j_val)
};
-
match self.keys {
KeyKind::Utf8(arr) => MapEncoder::<'a>::encode_map_entries(
out,
@@ -1425,7 +1417,6 @@ mod tests {
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, StringArray,
- TimestampMicrosecondArray,
};
use arrow_schema::{DataType, Field, Fields};
diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs
index a6ddba38d2..90a2856330 100644
--- a/arrow-avro/src/writer/format.rs
+++ b/arrow-avro/src/writer/format.rs
@@ -16,10 +16,7 @@
// under the License.
use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
-use crate::schema::{
- AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, CONFLUENT_MAGIC,
- SCHEMA_METADATA_KEY, SINGLE_OBJECT_MAGIC,
-};
+use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
use crate::writer::encoder::write_long;
use arrow_schema::{ArrowError, Schema};
use rand::RngCore;
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 471423b6f7..29d2312209 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -392,10 +392,14 @@ mod tests {
use super::*;
use crate::compression::CompressionCodec;
use crate::reader::ReaderBuilder;
- use crate::schema::{AvroSchema, SchemaStore, CONFLUENT_MAGIC};
+ use crate::schema::{AvroSchema, SchemaStore};
use crate::test_util::arrow_test_data;
- use arrow_array::{ArrayRef, BinaryArray, DurationSecondArray, Int32Array, RecordBatch};
- use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
+ use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch};
+ #[cfg(not(feature = "avro_custom_types"))]
+ use arrow_schema::{DataType, Field, Schema};
+ #[cfg(feature = "avro_custom_types")]
+ use arrow_schema::{DataType, Field, Schema, TimeUnit};
+ #[cfg(feature = "avro_custom_types")]
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufReader, Cursor};
@@ -422,7 +426,6 @@ mod tests {
#[test]
fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> {
- let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
@@ -561,7 +564,7 @@ mod tests {
for rel in files {
let path = arrow_test_data(rel);
let rdr_file = File::open(&path).expect("open input avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(rdr_file))
.expect("build reader");
let schema = reader.schema();
@@ -589,7 +592,7 @@ mod tests {
writer.finish()?;
drop(writer);
let rt_file = File::open(&out_path).expect("open roundtrip avro");
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(BufReader::new(rt_file))
.expect("build roundtrip reader");
let rt_schema = rt_reader.schema();
@@ -609,7 +612,7 @@ mod tests {
fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
let path = arrow_test_data("avro/nested_records.avro");
let rdr_file = File::open(&path).expect("open nested_records.avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(rdr_file))
.expect("build reader for nested_records.avro");
let schema = reader.schema();
@@ -624,7 +627,7 @@ mod tests {
writer.finish()?;
}
let rt_file = File::open(&out_path).expect("open round_trip avro");
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(BufReader::new(rt_file))
.expect("build round_trip reader");
let rt_schema = rt_reader.schema();
@@ -642,7 +645,7 @@ mod tests {
fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
let path = arrow_test_data("avro/nested_lists.snappy.avro");
let rdr_file = File::open(&path).expect("open
nested_lists.snappy.avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(rdr_file))
.expect("build reader for nested_lists.snappy.avro");
let schema = reader.schema();
@@ -659,7 +662,7 @@ mod tests {
writer.finish()?;
}
let rt_file = File::open(&out_path).expect("open round_trip avro");
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(BufReader::new(rt_file))
.expect("build round_trip reader");
let rt_schema = rt_reader.schema();
@@ -677,7 +680,7 @@ mod tests {
fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
let path = arrow_test_data("avro/simple_fixed.avro");
let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(rdr_file))
.expect("build avro reader");
let schema = reader.schema();
@@ -691,7 +694,7 @@ mod tests {
writer.finish()?;
drop(writer);
let rt_file = File::open(tmp.path()).expect("open round_trip avro");
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(BufReader::new(rt_file))
.expect("build round_trip reader");
let rt_schema = rt_reader.schema();
@@ -705,9 +708,10 @@ mod tests {
#[cfg(not(feature = "canonical_extension_types"))]
#[test]
fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
+ use arrow_schema::{DataType, IntervalUnit};
let in_file =
File::open("test/data/duration_uuid.avro").expect("open
test/data/duration_uuid.avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(in_file))
.expect("build reader for duration_uuid.avro");
let in_schema = reader.schema();
@@ -740,7 +744,7 @@ mod tests {
writer.finish()?;
}
let rt_file = File::open(tmp.path()).expect("open round_trip avro");
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(BufReader::new(rt_file))
.expect("build round_trip reader");
let rt_schema = rt_reader.schema();
@@ -759,7 +763,7 @@ mod tests {
// Load source Avro with Map fields
let path = arrow_test_data("avro/nonnullable.impala.avro");
let rdr_file = File::open(&path).expect("open
avro/nonnullable.impala.avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(rdr_file))
.expect("build reader for nonnullable.impala.avro");
// Collect all input batches and concatenate to a single RecordBatch
@@ -784,7 +788,7 @@ mod tests {
writer.finish()?;
let out_bytes = writer.into_inner();
// Read the produced bytes back with the Reader
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(Cursor::new(out_bytes))
.expect("build reader for round-tripped in-memory OCF");
let rt_schema = rt_reader.schema();
@@ -824,7 +828,7 @@ mod tests {
};
// Read original file into a single RecordBatch for comparison
let f_in = File::open(&path).expect("open input avro");
- let mut rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
+ let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
let in_schema = rdr.schema();
let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
let original =
@@ -838,7 +842,7 @@ mod tests {
writer.finish()?;
// Read back the file we just wrote and compare equality (schema + data)
let f_rt = File::open(&out_path).expect("open roundtrip avro");
- let mut rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
+ let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
let rt_schema = rt_rdr.schema();
let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
let roundtrip =
@@ -853,7 +857,7 @@ mod tests {
// Read the known-good enum file (same as reader::test_simple)
let path = arrow_test_data("avro/simple_enum.avro");
let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(rdr_file))
.expect("build reader for simple_enum.avro");
// Concatenate all batches to one RecordBatch for a clean equality check
@@ -880,7 +884,7 @@ mod tests {
writer.finish()?;
let bytes = writer.into_inner();
// Read back and compare for exact equality (schema + data)
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(Cursor::new(bytes))
.expect("reader for round-trip");
let rt_schema = rt_reader.schema();
@@ -937,7 +941,7 @@ mod tests {
let in_file = File::open(&file_path)
.unwrap_or_else(|_| panic!("Failed to open test file: {}",
file_path));
- let mut reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new()
.build(BufReader::new(in_file))
.expect("build reader for duration_logical_types.avro");
let in_schema = reader.schema();
@@ -978,7 +982,7 @@ mod tests {
}
let rt_file = File::open(tmp.path()).expect("open round_trip avro");
- let mut rt_reader = ReaderBuilder::new()
+ let rt_reader = ReaderBuilder::new()
.build(BufReader::new(rt_file))
.expect("build round_trip reader");
let rt_schema = rt_reader.schema();