This is an automated email from the ASF dual-hosted git repository.
mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 973e6fca9b Add `ArrowError::AvroError`, remaining types and roundtrip tests to `arrow-avro`, (#8595)
973e6fca9b is described below
commit 973e6fca9bb3b416623a488eac16c7da954fbc84
Author: Connor Sanders <[email protected]>
AuthorDate: Wed Oct 15 03:24:17 2025 -0500
Add `ArrowError::AvroError`, remaining types and roundtrip tests to `arrow-avro`, (#8595)
# Which issue does this PR close?
- Closes #4886
- Stacked on #8584
# Rationale for this change
This PR brings Arrow-Avro round‑trip coverage up to date with modern
Arrow types and the latest Avro logical types. In particular, Avro 1.12
adds `timestamp-nanos` and `local-timestamp-nanos`. Enabling these
logical types and filling in missing Avro writer encoders for Arrow’s
newer *view* and list families allows lossless read/write and simpler
pipelines.
It also hardens timestamp/time scaling in the writer to avoid silent
overflow when converting seconds to milliseconds, surfacing a clear
error instead; a sketch of the guard follows below.
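For context, the guard mirrors the `checked_mul` pattern used by the new
`Time32SecondsToMillisEncoder` / `TimestampSecondsToMillisEncoder` in
`writer/encoder.rs`. A minimal standalone sketch (the free function here is
illustrative only, not an API added by this PR):

```rust
use arrow_schema::ArrowError;

// Sketch of the seconds -> milliseconds guard added in `writer/encoder.rs`:
// scale with `checked_mul` and surface a clear error instead of wrapping.
fn seconds_to_millis(secs: i64) -> Result<i64, ArrowError> {
    secs.checked_mul(1000).ok_or_else(|| {
        ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
    })
}

// For example, `seconds_to_millis(i64::MAX / 1000 + 1)` now yields
// Err(InvalidArgumentError) rather than a silently wrapped value.
```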
# What changes are included in this PR?
* **Nanosecond timestamps**: Introduces a `TimestampNanos(bool)` codec
in `arrow-avro` that maps Avro `timestamp-nanos` /
`local-timestamp-nanos` to Arrow `Timestamp(Nanosecond, tz)`. The
reader/decoder, union field kinds, and Arrow `DataType` mapping are all
extended accordingly. Logical type detection is wired through both
`logicalType` and the `arrowTimeUnit="nanosecond"` attribute. (A
round-trip sketch follows this list.)
* **UUID logical type round‑trip fix**: When reading Avro
`logicalType="uuid"` fields, preserve that logical type in Arrow field
metadata so writers can round‑trip it back to Avro.
* **Avro writer encoders**: Add the missing array encoders and coverage
for Arrow’s `ListView`, `LargeListView`, and `FixedSizeList`, and extend
array encoder support to `BinaryView` and `Utf8View`. (See large
additions in `writer/encoder.rs`.)
* **Safer time/timestamp scaling**: Guard second to millisecond
conversions in `Time32`/`Timestamp` encoders to prevent overflow;
encoding now returns a clear `InvalidArgument` error in those cases.
* **Schema utilities**: Add `AvroSchemaOptions` with `null_order` and
`strip_metadata` flags so Avro JSON can be built while optionally
omitting internal Arrow keys during round‑trip schema generation.
* **Tests & round‑trip coverage**: Add unit tests for nanosecond
timestamp decoding (UTC, local, and with nulls) and additional
end‑to‑end/round‑trip tests for the updated writer paths.
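Taken together, these changes let a nanosecond-timestamp batch survive a
full write/read cycle. A minimal sketch, based on the round-trip tests in
this PR (`arrow-avro` is not yet published, so the `arrow_avro::...` import
paths are assumptions):

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow_array::{RecordBatch, TimestampNanosecondArray};
use arrow_avro::reader::ReaderBuilder; // assumed public path
use arrow_avro::writer::AvroWriter; // assumed public path
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};

fn roundtrip_timestamp_nanos() -> Result<(), ArrowError> {
    // One UTC nanosecond-timestamp column; maps to Avro `timestamp-nanos`.
    let schema = Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
        false,
    )]);
    let col = TimestampNanosecondArray::from(vec![0_i64, 1, -1, 1_234_567_890])
        .with_timezone("+00:00");
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(col)])?;

    // Write an OCF into memory, then read it back, as the new tests do.
    let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    let bytes = writer.into_inner();

    let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
    let round_trip = reader.collect::<Result<Vec<_>, _>>()?;
    assert_eq!(round_trip[0], batch, "lossless nanosecond round-trip");
    Ok(())
}
```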
# Are these changes tested?
Yes.
* New decoder tests validate `Timestamp(Nanosecond, tz)` behavior for
UTC and local timestamps and for nullable unions.
* Writer tests validate the nanosecond encoder and exercise an overflow
path for second→millisecond conversion that now returns an error.
* Additional round‑trip tests were added alongside the new encoders.
# Are there any user-facing changes?
No; `arrow-avro` is not public yet, so there are no user-facing changes.
---
arrow-avro/src/codec.rs | 39 ++-
arrow-avro/src/reader/mod.rs | 16 +-
arrow-avro/src/reader/record.rs | 114 ++++++++-
arrow-avro/src/schema.rs | 289 ++++++++++++++++++----
arrow-avro/src/writer/encoder.rs | 521 +++++++++++++++++++++++++++++++++++++--
arrow-avro/src/writer/format.rs | 12 +-
arrow-avro/src/writer/mod.rs | 326 +++++++++++++++++++++++-
arrow-schema/src/error.rs | 3 +
8 files changed, 1234 insertions(+), 86 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index a6a495bdf3..4f4669270d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -349,7 +349,8 @@ impl AvroDataType {
Codec::Int64
| Codec::TimeMicros
| Codec::TimestampMillis(_)
- | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
+ | Codec::TimestampMicros(_)
+ | Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
#[cfg(feature = "avro_custom_types")]
Codec::DurationNanos
| Codec::DurationMicros
@@ -652,6 +653,11 @@ pub(crate) enum Codec {
/// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
/// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
TimestampMicros(bool),
+ /// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
+ ///
+ /// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
+ /// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
+ TimestampNanos(bool),
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
@@ -715,6 +721,9 @@ impl Codec {
Self::TimestampMicros(is_utc) => {
DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
}
+ Self::TimestampNanos(is_utc) => {
+ DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
+ }
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
Self::Decimal(precision, scale, _size) => {
@@ -917,6 +926,8 @@ enum UnionFieldKind {
TimestampMillisLocal,
TimestampMicrosUtc,
TimestampMicrosLocal,
+ TimestampNanosUtc,
+ TimestampNanosLocal,
Duration,
Fixed,
Decimal,
@@ -946,6 +957,8 @@ impl From<&Codec> for UnionFieldKind {
Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
+ Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
+ Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
Codec::Interval => Self::Duration,
Codec::Fixed(_) => Self::Fixed,
Codec::Decimal(..) => Self::Decimal,
@@ -1399,7 +1412,17 @@ impl<'a> Maker<'a> {
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
*c = Codec::TimestampMicros(false)
}
- (Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
+ (Some("timestamp-nanos"), c @ Codec::Int64) => *c =
Codec::TimestampNanos(true),
+ (Some("local-timestamp-nanos"), c @ Codec::Int64) => {
+ *c = Codec::TimestampNanos(false)
+ }
+ (Some("uuid"), c @ Codec::Utf8) => {
+ // Map Avro string+logicalType=uuid into the UUID Codec,
+ // and preserve the logicalType in Arrow field metadata
+ // so writers can round-trip it correctly.
+ *c = Codec::Uuid;
+ field.metadata.insert("logicalType".into(), "uuid".into());
+ }
#[cfg(feature = "avro_custom_types")]
(Some("arrow.duration-nanos"), c @ Codec::Int64) => *c =
Codec::DurationNanos,
#[cfg(feature = "avro_custom_types")]
@@ -1437,6 +1460,18 @@ impl<'a> Maker<'a> {
}
(None, _) => {}
}
+ if matches!(field.codec, Codec::Int64) {
+ if let Some(unit) = t
+ .attributes
+ .additional
+ .get("arrowTimeUnit")
+ .and_then(|v| v.as_str())
+ {
+ if unit == "nanosecond" {
+ field.codec = Codec::TimestampNanos(false);
+ }
+ }
+ }
if !t.attributes.additional.is_empty() {
for (k, v) in &t.attributes.additional {
field.metadata.insert(k.to_string(), v.to_string());
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index cd1833d35c..7e0bfad0ce 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -7437,7 +7437,6 @@ mod test {
"entire RecordBatch mismatch (schema, all columns, all rows)"
);
}
-
#[test]
fn comprehensive_e2e_resolution_test() {
use serde_json::Value;
@@ -7593,6 +7592,7 @@ mod test {
let batch = read_alltypes_with_reader_schema(path, reader_schema.clone());
const UUID_EXT_KEY: &str = "ARROW:extension:name";
+ const UUID_LOGICAL_KEY: &str = "logicalType";
let uuid_md_top: Option<HashMap<String, String>> = batch
.schema()
@@ -7600,7 +7600,12 @@ mod test {
.ok()
.and_then(|f| {
let md = f.metadata();
- if md.get(UUID_EXT_KEY).is_some() {
+ let has_ext = md.get(UUID_EXT_KEY).is_some();
+ let is_uuid_logical = md
+ .get(UUID_LOGICAL_KEY)
+ .map(|v| v.trim_matches('"') == "uuid")
+ .unwrap_or(false);
+ if has_ext || is_uuid_logical {
Some(md.clone())
} else {
None
@@ -7617,7 +7622,12 @@ mod test {
.find(|(_, child)| child.name() == "uuid")
.and_then(|(_, child)| {
let md = child.metadata();
- if md.get(UUID_EXT_KEY).is_some() {
+ let has_ext = md.get(UUID_EXT_KEY).is_some();
+ let is_uuid_logical = md
+ .get(UUID_LOGICAL_KEY)
+ .map(|v| v.trim_matches('"') == "uuid")
+ .unwrap_or(false);
+ if has_ext || is_uuid_logical {
Some(md.clone())
} else {
None
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 7eac382d9f..0412a3e754 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -206,6 +206,7 @@ enum Decoder {
TimeMicros(Vec<i64>),
TimestampMillis(bool, Vec<i64>),
TimestampMicros(bool, Vec<i64>),
+ TimestampNanos(bool, Vec<i64>),
Int32ToInt64(Vec<i64>),
Int32ToFloat32(Vec<f32>),
Int32ToFloat64(Vec<f64>),
@@ -324,6 +325,9 @@ impl Decoder {
(Codec::TimestampMicros(is_utc), _) => {
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
+ (Codec::TimestampNanos(is_utc), _) => {
+ Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
+ }
#[cfg(feature = "avro_custom_types")]
(Codec::DurationNanos, _) => {
Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
@@ -530,7 +534,8 @@ impl Decoder {
| Self::Int32ToInt64(v)
| Self::TimeMicros(v)
| Self::TimestampMillis(_, v)
- | Self::TimestampMicros(_, v) => v.push(0),
+ | Self::TimestampMicros(_, v)
+ | Self::TimestampNanos(_, v) => v.push(0),
#[cfg(feature = "avro_custom_types")]
Self::DurationSecond(v)
| Self::DurationMillisecond(v)
@@ -643,7 +648,8 @@ impl Decoder {
| Self::Int32ToInt64(v)
| Self::TimeMicros(v)
| Self::TimestampMillis(_, v)
- | Self::TimestampMicros(_, v) => match lit {
+ | Self::TimestampMicros(_, v)
+ | Self::TimestampNanos(_, v) => match lit {
AvroLiteral::Long(i) => {
v.push(*i);
Ok(())
@@ -854,7 +860,8 @@ impl Decoder {
Self::Int64(values)
| Self::TimeMicros(values)
| Self::TimestampMillis(_, values)
- | Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
+ | Self::TimestampMicros(_, values)
+ | Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
#[cfg(feature = "avro_custom_types")]
Self::DurationSecond(values)
| Self::DurationMillisecond(values)
@@ -1070,6 +1077,10 @@ impl Decoder {
flush_primitive::<TimestampMicrosecondType>(values, nulls)
.with_timezone_opt(is_utc.then(|| "+00:00")),
),
+ Self::TimestampNanos(is_utc, values) => Arc::new(
+ flush_primitive::<TimestampNanosecondType>(values, nulls)
+ .with_timezone_opt(is_utc.then(|| "+00:00")),
+ ),
#[cfg(feature = "avro_custom_types")]
Self::DurationSecond(values) => {
Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
@@ -1959,6 +1970,7 @@ enum Skipper {
TimeMicros,
TimestampMillis,
TimestampMicros,
+ TimestampNanos,
Fixed(usize),
Decimal(Option<usize>),
UuidString,
@@ -1983,6 +1995,7 @@ impl Skipper {
Codec::TimeMicros => Self::TimeMicros,
Codec::TimestampMillis(_) => Self::TimestampMillis,
Codec::TimestampMicros(_) => Self::TimestampMicros,
+ Codec::TimestampNanos(_) => Self::TimestampNanos,
#[cfg(feature = "avro_custom_types")]
Codec::DurationNanos
| Codec::DurationMicros
@@ -2044,7 +2057,11 @@ impl Skipper {
buf.get_int()?;
Ok(())
}
- Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
+ Self::Int64
+ | Self::TimeMicros
+ | Self::TimestampMillis
+ | Self::TimestampMicros
+ | Self::TimestampNanos => {
buf.get_long()?;
Ok(())
}
@@ -4647,4 +4664,93 @@ mod tests {
.expect("Int32Array");
assert_eq!(a.values(), &[1, 2, 3]);
}
+
+ #[test]
+ fn test_timestamp_nanos_decoding_utc() {
+ let avro_type = avro_from_codec(Codec::TimestampNanos(true));
+ let mut decoder = Decoder::try_new(&avro_type).expect("create
TimestampNanos decoder");
+ let mut data = Vec::new();
+ for v in [0_i64, 1_i64, -1_i64, 1_234_567_890_i64] {
+ data.extend_from_slice(&encode_avro_long(v));
+ }
+ let mut cur = AvroCursor::new(&data);
+ for _ in 0..4 {
+ decoder.decode(&mut cur).expect("decode nanos ts");
+ }
+ let array = decoder.flush(None).expect("flush nanos ts");
+ let ts = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("TimestampNanosecondArray");
+ assert_eq!(ts.values(), &[0, 1, -1, 1_234_567_890]);
+ match ts.data_type() {
+ DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+ assert_eq!(tz.as_deref(), Some("+00:00"));
+ }
+ other => panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")),
got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn test_timestamp_nanos_decoding_local() {
+ let avro_type = avro_from_codec(Codec::TimestampNanos(false));
+ let mut decoder = Decoder::try_new(&avro_type).expect("create
TimestampNanos decoder");
+ let mut data = Vec::new();
+ for v in [10_i64, 20_i64, -30_i64] {
+ data.extend_from_slice(&encode_avro_long(v));
+ }
+ let mut cur = AvroCursor::new(&data);
+ for _ in 0..3 {
+ decoder.decode(&mut cur).expect("decode nanos ts");
+ }
+ let array = decoder.flush(None).expect("flush nanos ts");
+ let ts = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("TimestampNanosecondArray");
+ assert_eq!(ts.values(), &[10, 20, -30]);
+ match ts.data_type() {
+ DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+ assert_eq!(tz.as_deref(), None);
+ }
+ other => panic!("expected Timestamp(Nanosecond, None), got
{other:?}"),
+ }
+ }
+
+ #[test]
+ fn test_timestamp_nanos_decoding_with_nulls() {
+ let avro_type = AvroDataType::new(
+ Codec::TimestampNanos(false),
+ Default::default(),
+ Some(Nullability::NullFirst),
+ );
+ let mut decoder = Decoder::try_new(&avro_type).expect("create nullable
TimestampNanos");
+ let mut data = Vec::new();
+ data.extend_from_slice(&encode_avro_long(1));
+ data.extend_from_slice(&encode_avro_long(42));
+ data.extend_from_slice(&encode_avro_long(0));
+ data.extend_from_slice(&encode_avro_long(1));
+ data.extend_from_slice(&encode_avro_long(-7));
+ let mut cur = AvroCursor::new(&data);
+ for _ in 0..3 {
+ decoder.decode(&mut cur).expect("decode nullable nanos ts");
+ }
+ let array = decoder.flush(None).expect("flush nullable nanos ts");
+ let ts = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("TimestampNanosecondArray");
+ assert_eq!(ts.len(), 3);
+ assert!(ts.is_valid(0));
+ assert!(ts.is_null(1));
+ assert!(ts.is_valid(2));
+ assert_eq!(ts.value(0), 42);
+ assert_eq!(ts.value(2), -7);
+ match ts.data_type() {
+ DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+ assert_eq!(tz.as_deref(), None);
+ }
+ other => panic!("expected Timestamp(Nanosecond, None), got
{other:?}"),
+ }
+ }
}
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index cff8ee1937..819ea1f16e 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -329,6 +329,12 @@ pub(crate) struct Fixed<'a> {
pub(crate) attributes: Attributes<'a>,
}
+#[derive(Debug, Copy, Clone, PartialEq, Default)]
+pub(crate) struct AvroSchemaOptions {
+ pub(crate) null_order: Option<Nullability>,
+ pub(crate) strip_metadata: bool,
+}
+
/// A wrapper for an Avro schema in its JSON string representation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AvroSchema {
@@ -428,7 +434,7 @@ impl AvroSchema {
build_canonical(schema, None)
}
- /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order.
+ /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given null‑union order and optionally stripping internal Arrow metadata.
///
/// If the input Arrow schema already contains Avro JSON in
/// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
@@ -436,17 +442,21 @@ impl AvroSchema {
/// honoring `null_union_order` at **all nullable sites**.
pub(crate) fn from_arrow_with_options(
schema: &ArrowSchema,
- null_order: Option<Nullability>,
+ options: Option<AvroSchemaOptions>,
) -> Result<AvroSchema, ArrowError> {
- if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
- return Ok(AvroSchema::new(json.clone()));
+ let opts = options.unwrap_or_default();
+ let order = opts.null_order.unwrap_or_default();
+ let strip = opts.strip_metadata;
+ if !strip {
+ if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+ return Ok(AvroSchema::new(json.clone()));
+ }
}
- let order = null_order.unwrap_or_default();
let mut name_gen = NameGenerator::default();
let fields_json = schema
.fields()
.iter()
- .map(|f| arrow_field_to_avro(f, &mut name_gen, order))
+ .map(|f| arrow_field_to_avro(f, &mut name_gen, order, strip))
.collect::<Result<Vec<_>, _>>()?;
let record_name = schema
.metadata
@@ -1170,7 +1180,14 @@ fn wrap_nullable(inner: Value, null_order: Nullability) -> Value {
let null = Value::String("null".into());
match inner {
Value::Array(mut union) => {
- union.retain(|v| !is_avro_json_null(v));
+ // If this site is already a union and already contains "null",
+ // preserve the branch order exactly. Reordering "null" breaks
+ // the correspondence between Arrow union child order (type_ids)
+ // and the Avro branch index written on the wire.
+ if union.iter().any(is_avro_json_null) {
+ return Value::Array(union);
+ }
+ // Otherwise, inject "null" without reordering existing branches.
match null_order {
Nullability::NullFirst => union.insert(0, null),
Nullability::NullSecond => union.push(null),
@@ -1234,6 +1251,7 @@ fn datatype_to_avro(
metadata: &HashMap<String, String>,
name_gen: &mut NameGenerator,
null_order: Nullability,
+ strip: bool,
) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
let mut extras = JsonMap::new();
let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
@@ -1288,22 +1306,24 @@ fn datatype_to_avro(
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
DataType::BinaryView => {
- extras.insert("arrowBinaryView".into(), Value::Bool(true));
+ if !strip {
+ extras.insert("arrowBinaryView".into(), Value::Bool(true));
+ }
Value::String("bytes".into())
}
DataType::FixedSizeBinary(len) => {
- // UUID handling:
- // - When the canonical extension feature is ON *and* this field is the Arrow canonical UUID
- //   (extension name "arrow.uuid" or legacy "uuid"), emit Avro string with logicalType "uuid".
- // - Otherwise, fall back to a named fixed of size = len.
- #[cfg(not(feature = "canonical_extension_types"))]
- let is_uuid = false;
+ let md_is_uuid = metadata
+ .get("logicalType")
+ .map(|s| s.trim_matches('"') == "uuid")
+ .unwrap_or(false);
#[cfg(feature = "canonical_extension_types")]
- let is_uuid = (*len == 16)
- && metadata
- .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
- .map(|value| value == arrow_schema::extension::Uuid::NAME || value == "uuid")
- .unwrap_or(false);
+ let ext_is_uuid = metadata
+ .get(arrow_schema::extension::EXTENSION_TYPE_NAME_KEY)
+ .map(|v| v == arrow_schema::extension::Uuid::NAME || v == "uuid")
+ .unwrap_or(false);
+ #[cfg(not(feature = "canonical_extension_types"))]
+ let ext_is_uuid = false;
+ let is_uuid = (*len == 16) && (md_is_uuid || ext_is_uuid);
if is_uuid {
json!({ "type": "string", "logicalType": "uuid" })
} else {
@@ -1334,7 +1354,9 @@ fn datatype_to_avro(
DataType::Time32(unit) => match unit {
TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
TimeUnit::Second => {
- extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+ if !strip {
+ extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+ }
Value::String("int".into())
}
_ => Value::String("int".into()),
@@ -1342,7 +1364,9 @@ fn datatype_to_avro(
DataType::Time64(unit) => match unit {
TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
TimeUnit::Nanosecond => {
- extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+ if !strip {
+ extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+ }
Value::String("long".into())
}
_ => Value::String("long".into()),
@@ -1353,15 +1377,18 @@ fn datatype_to_avro(
(TimeUnit::Millisecond, false) => "local-timestamp-millis",
(TimeUnit::Microsecond, true) => "timestamp-micros",
(TimeUnit::Microsecond, false) => "local-timestamp-micros",
+ (TimeUnit::Nanosecond, true) => "timestamp-nanos",
+ (TimeUnit::Nanosecond, false) => "local-timestamp-nanos",
(TimeUnit::Second, _) => {
- extras.insert("arrowTimeUnit".into(),
Value::String("second".into()));
- return Ok((Value::String("long".into()), extras));
- }
- (TimeUnit::Nanosecond, _) => {
- extras.insert("arrowTimeUnit".into(),
Value::String("nanosecond".into()));
+ if !strip {
+ extras.insert("arrowTimeUnit".into(),
Value::String("second".into()));
+ }
return Ok((Value::String("long".into()), extras));
}
};
+ if !strip && matches!(unit, TimeUnit::Nanosecond) {
+ extras.insert("arrowTimeUnit".into(),
Value::String("nanosecond".into()));
+ }
json!({ "type": "long", "logicalType": logical_type })
}
#[cfg(not(feature = "avro_custom_types"))]
@@ -1396,18 +1423,22 @@ fn datatype_to_avro(
json!(obj)
}
DataType::Interval(IntervalUnit::YearMonth) => {
- extras.insert(
- "arrowIntervalUnit".into(),
- Value::String("yearmonth".into()),
- );
+ if !strip {
+ extras.insert(
+ "arrowIntervalUnit".into(),
+ Value::String("yearmonth".into()),
+ );
+ }
Value::String("long".into())
}
DataType::Interval(IntervalUnit::DayTime) => {
- extras.insert("arrowIntervalUnit".into(),
Value::String("daytime".into()));
+ if !strip {
+ extras.insert("arrowIntervalUnit".into(),
Value::String("daytime".into()));
+ }
Value::String("long".into())
}
DataType::List(child) | DataType::LargeList(child) => {
- if matches!(dt, DataType::LargeList(_)) {
+ if matches!(dt, DataType::LargeList(_)) && !strip {
extras.insert("arrowLargeList".into(), Value::Bool(true));
}
let items_schema = process_datatype(
@@ -1417,6 +1448,7 @@ fn datatype_to_avro(
name_gen,
null_order,
child.is_nullable(),
+ strip,
)?;
json!({
"type": "array",
@@ -1424,10 +1456,12 @@ fn datatype_to_avro(
})
}
DataType::ListView(child) | DataType::LargeListView(child) => {
- if matches!(dt, DataType::LargeListView(_)) {
+ if matches!(dt, DataType::LargeListView(_)) && !strip {
extras.insert("arrowLargeList".into(), Value::Bool(true));
}
- extras.insert("arrowListView".into(), Value::Bool(true));
+ if !strip {
+ extras.insert("arrowListView".into(), Value::Bool(true));
+ }
let items_schema = process_datatype(
child.data_type(),
child.name(),
@@ -1435,6 +1469,7 @@ fn datatype_to_avro(
name_gen,
null_order,
child.is_nullable(),
+ strip,
)?;
json!({
"type": "array",
@@ -1442,7 +1477,9 @@ fn datatype_to_avro(
})
}
DataType::FixedSizeList(child, len) => {
- extras.insert("arrowFixedSize".into(), json!(len));
+ if !strip {
+ extras.insert("arrowFixedSize".into(), json!(len));
+ }
let items_schema = process_datatype(
child.data_type(),
child.name(),
@@ -1450,6 +1487,7 @@ fn datatype_to_avro(
name_gen,
null_order,
child.is_nullable(),
+ strip,
)?;
json!({
"type": "array",
@@ -1472,6 +1510,7 @@ fn datatype_to_avro(
name_gen,
null_order,
value_field.is_nullable(),
+ strip,
)?;
json!({
"type": "map",
@@ -1481,7 +1520,7 @@ fn datatype_to_avro(
DataType::Struct(fields) => {
let avro_fields = fields
.iter()
- .map(|field| arrow_field_to_avro(field, name_gen, null_order))
+ .map(|field| arrow_field_to_avro(field, name_gen, null_order, strip))
.collect::<Result<Vec<_>, _>>()?;
// Prefer avro.name/avro.namespace when provided on the struct field metadata
let chosen_name = metadata
@@ -1524,6 +1563,7 @@ fn datatype_to_avro(
name_gen,
null_order,
false,
+ strip,
)?
}
}
@@ -1546,6 +1586,7 @@ fn datatype_to_avro(
values.metadata(),
name_gen,
null_order,
+ strip,
)?;
let mut merged = merge_extras(value_schema, value_extras);
if values.is_nullable() {
@@ -1564,6 +1605,7 @@ fn datatype_to_avro(
values.metadata(),
name_gen,
null_order,
+ strip,
)?;
return Ok((value_schema, JsonMap::new()));
}
@@ -1578,6 +1620,7 @@ fn datatype_to_avro(
field_ref.metadata(),
name_gen,
null_order,
+ strip,
)?;
// Avro unions cannot immediately contain another union
if matches!(branch_schema, Value::Array(_)) {
@@ -1597,21 +1640,22 @@ fn datatype_to_avro(
));
}
}
- extras.insert(
- "arrowUnionMode".into(),
- Value::String(
- match mode {
- UnionMode::Sparse => "sparse",
- UnionMode::Dense => "dense",
- }
- .to_string(),
- ),
- );
- extras.insert(
- "arrowUnionTypeIds".into(),
- Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
- );
-
+ if !strip {
+ extras.insert(
+ "arrowUnionMode".into(),
+ Value::String(
+ match mode {
+ UnionMode::Sparse => "sparse",
+ UnionMode::Dense => "dense",
+ }
+ .to_string(),
+ ),
+ );
+ extras.insert(
+ "arrowUnionTypeIds".into(),
+ Value::Array(type_ids.into_iter().map(|id| json!(id)).collect()),
+ );
+ }
Value::Array(branches)
}
#[cfg(not(feature = "small_decimals"))]
@@ -1631,8 +1675,9 @@ fn process_datatype(
name_gen: &mut NameGenerator,
null_order: Nullability,
is_nullable: bool,
+ strip: bool,
) -> Result<Value, ArrowError> {
- let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order)?;
+ let (schema, extras) = datatype_to_avro(dt, field_name, metadata, name_gen, null_order, strip)?;
let mut merged = merge_extras(schema, extras);
if is_nullable {
merged = wrap_nullable(merged, null_order)
@@ -1644,6 +1689,7 @@ fn arrow_field_to_avro(
field: &ArrowField,
name_gen: &mut NameGenerator,
null_order: Nullability,
+ strip: bool,
) -> Result<Value, ArrowError> {
let avro_name = sanitise_avro_name(field.name());
let schema_value = process_datatype(
@@ -1653,6 +1699,7 @@ fn arrow_field_to_avro(
name_gen,
null_order,
field.is_nullable(),
+ strip,
)?;
// Build the field map
let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
@@ -2952,4 +2999,142 @@ mod tests {
"expected missing-default error, got: {err}"
);
}
+
+ #[test]
+ fn test_from_arrow_with_options_respects_schema_metadata_when_not_stripping() {
+ let field = ArrowField::new("x", DataType::Int32, true);
+ let injected_json =
+ r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
+ .to_string();
+ let mut md = HashMap::new();
+ md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json.clone());
+ md.insert("custom".to_string(), "123".to_string());
+ let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
+ let opts = AvroSchemaOptions {
+ null_order: Some(Nullability::NullSecond),
+ strip_metadata: false,
+ };
+ let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+ assert_eq!(
+ out.json_string, injected_json,
+ "When strip_metadata=false and avro.schema is present, return the
embedded JSON verbatim"
+ );
+ let v: Value = serde_json::from_str(&out.json_string).unwrap();
+ assert_eq!(v.get("type").and_then(|t| t.as_str()), Some("record"));
+ assert_eq!(v.get("name").and_then(|n| n.as_str()), Some("Injected"));
+ }
+
+ #[test]
+ fn test_from_arrow_with_options_ignores_schema_metadata_when_stripping_and_keeps_passthrough() {
+ let field = ArrowField::new("x", DataType::Int32, true);
+ let injected_json =
+ r#"{"type":"record","name":"Injected","fields":[{"name":"ignored","type":"int"}]}"#
+ .to_string();
+ let mut md = HashMap::new();
+ md.insert(SCHEMA_METADATA_KEY.to_string(), injected_json);
+ md.insert("custom_meta".to_string(), "7".to_string());
+ let arrow_schema = ArrowSchema::new_with_metadata(vec![field], md);
+ let opts = AvroSchemaOptions {
+ null_order: Some(Nullability::NullFirst),
+ strip_metadata: true,
+ };
+ let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+ assert_json_contains(&out.json_string, "\"type\":\"record\"");
+ assert_json_contains(&out.json_string, "\"name\":\"topLevelRecord\"");
+ assert_json_contains(&out.json_string, "\"custom_meta\":7");
+ }
+
+ #[test]
+ fn test_from_arrow_with_options_null_first_for_nullable_primitive() {
+ let field = ArrowField::new("s", DataType::Utf8, true);
+ let arrow_schema = single_field_schema(field);
+ let opts = AvroSchemaOptions {
+ null_order: Some(Nullability::NullFirst),
+ strip_metadata: true,
+ };
+ let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+ let v: Value = serde_json::from_str(&out.json_string).unwrap();
+ let arr = v["fields"][0]["type"]
+ .as_array()
+ .expect("nullable primitive should be Avro union array");
+ assert_eq!(arr[0], Value::String("null".into()));
+ assert_eq!(arr[1], Value::String("string".into()));
+ }
+
+ #[test]
+ fn test_from_arrow_with_options_null_second_for_nullable_primitive() {
+ let field = ArrowField::new("s", DataType::Utf8, true);
+ let arrow_schema = single_field_schema(field);
+ let opts = AvroSchemaOptions {
+ null_order: Some(Nullability::NullSecond),
+ strip_metadata: true,
+ };
+ let out = AvroSchema::from_arrow_with_options(&arrow_schema, Some(opts)).unwrap();
+ let v: Value = serde_json::from_str(&out.json_string).unwrap();
+ let arr = v["fields"][0]["type"]
+ .as_array()
+ .expect("nullable primitive should be Avro union array");
+ assert_eq!(arr[0], Value::String("string".into()));
+ assert_eq!(arr[1], Value::String("null".into()));
+ }
+
+ #[test]
+ fn test_from_arrow_with_options_union_extras_respected_by_strip_metadata() {
+ let uf: UnionFields = vec![
+ (2i8, Arc::new(ArrowField::new("a", DataType::Int32, false))),
+ (7i8, Arc::new(ArrowField::new("b", DataType::Utf8, false))),
+ ]
+ .into_iter()
+ .collect();
+ let union_dt = DataType::Union(uf, UnionMode::Dense);
+ let arrow_schema = single_field_schema(ArrowField::new("u", union_dt,
true));
+ let with_extras = AvroSchema::from_arrow_with_options(
+ &arrow_schema,
+ Some(AvroSchemaOptions {
+ null_order: Some(Nullability::NullFirst),
+ strip_metadata: false,
+ }),
+ )
+ .unwrap();
+ let v_with: Value = serde_json::from_str(&with_extras.json_string).unwrap();
+ let union_arr = v_with["fields"][0]["type"].as_array().expect("union array");
+ let first_obj = union_arr
+ .iter()
+ .find(|b| b.is_object())
+ .expect("expected an object branch with extras");
+ let obj = first_obj.as_object().unwrap();
+ assert_eq!(obj.get("type").and_then(|t| t.as_str()), Some("int"));
+ assert_eq!(
+ obj.get("arrowUnionMode").and_then(|m| m.as_str()),
+ Some("dense")
+ );
+ let type_ids: Vec<i64> = obj["arrowUnionTypeIds"]
+ .as_array()
+ .expect("arrowUnionTypeIds array")
+ .iter()
+ .map(|n| n.as_i64().expect("i64"))
+ .collect();
+ assert_eq!(type_ids, vec![2, 7]);
+ let stripped = AvroSchema::from_arrow_with_options(
+ &arrow_schema,
+ Some(AvroSchemaOptions {
+ null_order: Some(Nullability::NullFirst),
+ strip_metadata: true,
+ }),
+ )
+ .unwrap();
+ let v_stripped: Value = serde_json::from_str(&stripped.json_string).unwrap();
+ let union_arr2 = v_stripped["fields"][0]["type"]
+ .as_array()
+ .expect("union array");
+ assert!(
+ !union_arr2.iter().any(|b| b
+ .as_object()
+ .is_some_and(|m| m.contains_key("arrowUnionMode"))),
+ "extras must be removed when strip_metadata=true"
+ );
+ assert_eq!(union_arr2[0], Value::String("null".into()));
+ assert_eq!(union_arr2[1], Value::String("int".into()));
+ assert_eq!(union_arr2[2], Value::String("string".into()));
+ }
}
diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 5ddb798846..79aee4fae0 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -20,7 +20,6 @@
use crate::codec::{AvroDataType, AvroField, Codec};
use crate::schema::{Fingerprint, Nullability, Prefix};
use arrow_array::cast::AsArray;
-use arrow_array::types::RunEndIndexType;
use arrow_array::types::{
ArrowPrimitiveType, Date32Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int16Type, Int32Type,
@@ -28,10 +27,15 @@ use arrow_array::types::{
Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType, TimestampMillisecondType,
};
+use arrow_array::types::{
+ RunEndIndexType, Time32SecondType, TimestampNanosecondType, TimestampSecondType,
+};
use arrow_array::{
- Array, Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray,
- GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, MapArray,
- OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray, UnionArray,
+ Array, BinaryViewArray, Decimal128Array, Decimal256Array, DictionaryArray,
+ FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
+ GenericListViewArray, GenericStringArray, LargeListArray, LargeListViewArray, ListArray,
+ ListViewArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray,
+ StringViewArray, StructArray, UnionArray,
};
#[cfg(feature = "small_decimals")]
use arrow_array::{Decimal32Array, Decimal64Array};
@@ -237,15 +241,72 @@ impl<'a> FieldEncoder<'a> {
DataType::LargeUtf8 => {
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
}
+ DataType::Utf8View => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StringViewArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
StringViewArray".into())
+ })?;
+ Encoder::Utf8View(Utf8ViewEncoder(arr))
+ }
+ DataType::BinaryView => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<BinaryViewArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
BinaryViewArray".into())
+ })?;
+ Encoder::BinaryView(BinaryViewEncoder(arr))
+ }
DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
DataType::Date32 => Encoder::Date32(IntEncoder(array.as_primitive::<Date32Type>())),
+ DataType::Date64 => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro logical type 'date' is days since epoch (int).
Arrow Date64 (ms) has no direct Avro logical type; cast to Date32 or to a
Timestamp."
+ .into(),
+ ));
+ }
+ DataType::Time32(TimeUnit::Second) =>
Encoder::Time32SecsToMillis(
+
Time32SecondsToMillisEncoder(array.as_primitive::<Time32SecondType>()),
+ ),
DataType::Time32(TimeUnit::Millisecond) => {
Encoder::Time32Millis(IntEncoder(array.as_primitive::<Time32MillisecondType>()))
}
+ DataType::Time32(TimeUnit::Microsecond) => {
+ return Err(ArrowError::InvalidArgumentError(
+ "Arrow Time32 only supports Second or Millisecond. Use Time64 for microseconds."
+ .into(),
+ ));
+ }
+ DataType::Time32(TimeUnit::Nanosecond) => {
+ return Err(ArrowError::InvalidArgumentError(
+ "Arrow Time32 only supports Second or Millisecond. Use Time64 for nanoseconds."
+ .into(),
+ ));
+ }
DataType::Time64(TimeUnit::Microsecond) => Encoder::Time64Micros(LongEncoder(
array.as_primitive::<Time64MicrosecondType>(),
)),
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer does not support time-nanos; cast to Time64(Microsecond)."
+ .into(),
+ ));
+ }
+ DataType::Time64(TimeUnit::Millisecond) => {
+ return Err(ArrowError::InvalidArgumentError(
+ "Arrow Time64 with millisecond unit is not a valid Arrow type (use Time32 for millis)."
+ .into(),
+ ));
+ }
+ DataType::Time64(TimeUnit::Second) => {
+ return Err(ArrowError::InvalidArgumentError(
+ "Arrow Time64 with second unit is not a valid Arrow type (use Time32 for seconds)."
+ .into(),
+ ));
+ }
DataType::Float32 => {
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
}
@@ -266,17 +327,20 @@ impl<'a> FieldEncoder<'a> {
Encoder::Fixed(FixedEncoder(arr))
}
DataType::Timestamp(unit, _) => match unit {
+ TimeUnit::Second => {
+ Encoder::TimestampSecsToMillis(TimestampSecondsToMillisEncoder(
+ array.as_primitive::<TimestampSecondType>(),
+ ))
+ }
TimeUnit::Millisecond => Encoder::TimestampMillis(LongEncoder(
array.as_primitive::<TimestampMillisecondType>(),
)),
- TimeUnit::Microsecond => Encoder::Timestamp(LongEncoder(
+ TimeUnit::Microsecond => Encoder::TimestampMicros(LongEncoder(
array.as_primitive::<TimestampMicrosecondType>(),
)),
- other => {
- return Err(ArrowError::NotYetImplemented(format!(
- "Avro writer does not support Timestamp with unit {other:?}"
- )));
- }
+ TimeUnit::Nanosecond => Encoder::TimestampNanos(LongEncoder(
+ array.as_primitive::<TimestampNanosecondType>(),
+ )),
},
DataType::Interval(unit) => match unit {
IntervalUnit::MonthDayNano => Encoder::IntervalMonthDayNano(DurationEncoder(
@@ -342,9 +406,46 @@ impl<'a> FieldEncoder<'a> {
item_plan.as_ref(),
)?))
}
+ DataType::ListView(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListViewArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListViewArray".into()))?;
+ Encoder::ListView(Box::new(ListViewEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeListView(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListViewArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
LargeListViewArray".into())
+ })?;
+ Encoder::LargeListView(Box::new(ListViewEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::FixedSizeList(_, _) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
FixedSizeListArray".into())
+ })?;
+
Encoder::FixedSizeList(Box::new(FixedSizeListEncoder::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
other => {
return Err(ArrowError::SchemaError(format!(
- "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ "Avro array site requires Arrow
List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
)));
}
},
@@ -845,16 +946,19 @@ impl FieldPlan {
Ok(FieldPlan::Struct { bindings })
}
Codec::List(items_dt) => match arrow_field.data_type() {
- DataType::List(field_ref) => Ok(FieldPlan::List {
+ DataType::List(field_ref)
+ | DataType::LargeList(field_ref)
+ | DataType::ListView(field_ref)
+ | DataType::LargeListView(field_ref) => Ok(FieldPlan::List {
items_nullability: items_dt.nullability(),
item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
}),
- DataType::LargeList(field_ref) => Ok(FieldPlan::List {
+ DataType::FixedSizeList(field_ref, _len) => Ok(FieldPlan::List {
items_nullability: items_dt.nullability(),
item_plan: Box::new(FieldPlan::build(items_dt.as_ref(), field_ref.as_ref())?),
}),
other => Err(ArrowError::SchemaError(format!(
- "Avro array maps to Arrow List/LargeList, found: {other:?}"
+ "Avro array maps to Arrow List/LargeList/ListView/LargeListView/FixedSizeList, found: {other:?}"
))),
},
Codec::Map(values_dt) => {
@@ -1003,9 +1107,12 @@ enum Encoder<'a> {
Boolean(BooleanEncoder<'a>),
Int(IntEncoder<'a, Int32Type>),
Long(LongEncoder<'a, Int64Type>),
- Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
+ TimestampMicros(LongEncoder<'a, TimestampMicrosecondType>),
TimestampMillis(LongEncoder<'a, TimestampMillisecondType>),
+ TimestampNanos(LongEncoder<'a, TimestampNanosecondType>),
+ TimestampSecsToMillis(TimestampSecondsToMillisEncoder<'a>),
Date32(IntEncoder<'a, Date32Type>),
+ Time32SecsToMillis(Time32SecondsToMillisEncoder<'a>),
Time32Millis(IntEncoder<'a, Time32MillisecondType>),
Time64Micros(LongEncoder<'a, Time64MicrosecondType>),
DurationSeconds(LongEncoder<'a, DurationSecondType>),
@@ -1018,8 +1125,13 @@ enum Encoder<'a> {
LargeBinary(BinaryEncoder<'a, i64>),
Utf8(Utf8Encoder<'a>),
Utf8Large(Utf8LargeEncoder<'a>),
+ Utf8View(Utf8ViewEncoder<'a>),
+ BinaryView(BinaryViewEncoder<'a>),
List(Box<ListEncoder32<'a>>),
LargeList(Box<ListEncoder64<'a>>),
+ ListView(Box<ListViewEncoder32<'a>>),
+ LargeListView(Box<ListViewEncoder64<'a>>),
+ FixedSizeList(Box<FixedSizeListEncoder<'a>>),
Struct(Box<StructEncoder<'a>>),
/// Avro `fixed` encoder (raw bytes, no length)
Fixed(FixedEncoder<'a>),
@@ -1055,9 +1167,12 @@ impl<'a> Encoder<'a> {
Encoder::Boolean(e) => e.encode(out, idx),
Encoder::Int(e) => e.encode(out, idx),
Encoder::Long(e) => e.encode(out, idx),
- Encoder::Timestamp(e) => e.encode(out, idx),
+ Encoder::TimestampMicros(e) => e.encode(out, idx),
Encoder::TimestampMillis(e) => e.encode(out, idx),
+ Encoder::TimestampNanos(e) => e.encode(out, idx),
+ Encoder::TimestampSecsToMillis(e) => e.encode(out, idx),
Encoder::Date32(e) => e.encode(out, idx),
+ Encoder::Time32SecsToMillis(e) => e.encode(out, idx),
Encoder::Time32Millis(e) => e.encode(out, idx),
Encoder::Time64Micros(e) => e.encode(out, idx),
Encoder::DurationSeconds(e) => e.encode(out, idx),
@@ -1070,8 +1185,13 @@ impl<'a> Encoder<'a> {
Encoder::LargeBinary(e) => e.encode(out, idx),
Encoder::Utf8(e) => e.encode(out, idx),
Encoder::Utf8Large(e) => e.encode(out, idx),
+ Encoder::Utf8View(e) => e.encode(out, idx),
+ Encoder::BinaryView(e) => e.encode(out, idx),
Encoder::List(e) => e.encode(out, idx),
Encoder::LargeList(e) => e.encode(out, idx),
+ Encoder::ListView(e) => e.encode(out, idx),
+ Encoder::LargeListView(e) => e.encode(out, idx),
+ Encoder::FixedSizeList(e) => e.encode(out, idx),
Encoder::Struct(e) => e.encode(out, idx),
Encoder::Fixed(e) => (e).encode(out, idx),
Encoder::Uuid(e) => (e).encode(out, idx),
@@ -1118,6 +1238,32 @@ impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
}
}
+/// Time32(Second) to Avro time-millis (int), via safe scaling by 1000
+struct Time32SecondsToMillisEncoder<'a>(&'a PrimitiveArray<Time32SecondType>);
+impl<'a> Time32SecondsToMillisEncoder<'a> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ let secs = self.0.value(idx);
+ let millis = secs.checked_mul(1000).ok_or_else(|| {
+ ArrowError::InvalidArgumentError("time32(secs) * 1000 overflowed".into())
+ })?;
+ write_int(out, millis)
+ }
+}
+
+/// Timestamp(Second) to Avro timestamp-millis (long), via safe scaling by 1000
+struct TimestampSecondsToMillisEncoder<'a>(&'a PrimitiveArray<TimestampSecondType>);
+impl<'a> TimestampSecondsToMillisEncoder<'a> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ let secs = self.0.value(idx);
+ let millis = secs.checked_mul(1000).ok_or_else(|| {
+ ArrowError::InvalidArgumentError("timestamp(secs) * 1000 overflowed".into())
+ })?;
+ write_long(out, millis)
+ }
+}
+
/// Unified binary encoder generic over offset size (i32/i64).
struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
@@ -1126,6 +1272,22 @@ impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
}
}
+/// BinaryView (byte view) encoder.
+struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
+impl BinaryViewEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx))
+ }
+}
+
+/// StringView encoder.
+struct Utf8ViewEncoder<'a>(&'a StringViewArray);
+impl Utf8ViewEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
struct F32Encoder<'a>(&'a arrow_array::Float32Array);
impl F32Encoder<'_> {
fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
@@ -1470,6 +1632,109 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
}
}
+/// ListView encoder using `(offset, size)` buffers.
+struct ListViewEncoder<'a, O: OffsetSizeTrait> {
+ list: &'a GenericListViewArray<O>,
+ values: FieldEncoder<'a>,
+ values_offset: usize,
+}
+type ListViewEncoder32<'a> = ListViewEncoder<'a, i32>;
+type ListViewEncoder64<'a> = ListViewEncoder<'a, i64>;
+
+impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
+ fn try_new(
+ list: &'a GenericListViewArray<O>,
+ items_nullability: Option<Nullability>,
+ item_plan: &FieldPlan,
+ ) -> Result<Self, ArrowError> {
+ let child_field = match list.data_type() {
+ DataType::ListView(field) => field.as_ref(),
+ DataType::LargeListView(field) => field.as_ref(),
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Expected ListView or LargeListView for
ListViewEncoder".into(),
+ ));
+ }
+ };
+ let values_enc = prepare_value_site_encoder(
+ list.values().as_ref(),
+ child_field,
+ items_nullability,
+ item_plan,
+ )?;
+ Ok(Self {
+ list,
+ values: values_enc,
+ values_offset: list.values().offset(),
+ })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ let start = self.list.value_offset(idx).to_usize().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Error converting value_offset[{idx}] to usize"
+ ))
+ })?;
+ let len = self.list.value_size(idx).to_usize().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!("Error converting
value_size[{idx}] to usize"))
+ })?;
+ let start = start + self.values_offset;
+ let end = start + len;
+ encode_blocked_range(out, start, end, |out, row| {
+ self.values
+ .encode(out, row.saturating_sub(self.values_offset))
+ })
+ }
+}
+
+/// FixedSizeList encoder.
+struct FixedSizeListEncoder<'a> {
+ list: &'a FixedSizeListArray,
+ values: FieldEncoder<'a>,
+ values_offset: usize,
+ elem_len: usize,
+}
+
+impl<'a> FixedSizeListEncoder<'a> {
+ fn try_new(
+ list: &'a FixedSizeListArray,
+ items_nullability: Option<Nullability>,
+ item_plan: &FieldPlan,
+ ) -> Result<Self, ArrowError> {
+ let child_field = match list.data_type() {
+ DataType::FixedSizeList(field, _len) => field.as_ref(),
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Expected FixedSizeList for FixedSizeListEncoder".into(),
+ ));
+ }
+ };
+ let values_enc = prepare_value_site_encoder(
+ list.values().as_ref(),
+ child_field,
+ items_nullability,
+ item_plan,
+ )?;
+ Ok(Self {
+ list,
+ values: values_enc,
+ values_offset: list.values().offset(),
+ elem_len: list.value_length() as usize,
+ })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+ // Starting index is relative to values() start
+ let rel = self.list.value_offset(idx) as usize;
+ let start = self.values_offset + rel;
+ let end = start + self.elem_len;
+ encode_blocked_range(out, start, end, |out, row| {
+ self.values
+ .encode(out, row.saturating_sub(self.values_offset))
+ })
+ }
+}
+
fn prepare_value_site_encoder<'a>(
values_array: &'a dyn Array,
value_field: &Field,
@@ -1491,7 +1756,7 @@ impl FixedEncoder<'_> {
}
}
-/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string (UUID).
+/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) to Avro string (UUID).
/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form.
struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
impl UuidEncoder<'_> {
@@ -2614,4 +2879,226 @@ mod tests {
_ => panic!("expected SchemaError"),
}
}
+
+ #[test]
+ fn timestamp_micros_encoder() {
+ // Mirrors the style used by `timestamp_millis_encoder`
+ test_scalar_primitive_encoding::<TimestampMicrosecondType>(
+ &[
+ 1_704_067_200_000_000, // 2024-01-01T00:00:00Z in micros
+ 0, // epoch
+ -123_456_789, // pre-epoch
+ ],
+ &[None, Some(1_704_067_200_000_000)],
+ );
+ }
+
+ #[test]
+ fn list_encoder_nullable_items_null_first() {
+ // One List row with three elements: [Some(1), None, Some(2)]
+ let values = Int32Array::from(vec![Some(1), None, Some(2)]);
+ let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 3].into());
+ let list = ListArray::new(
+ Field::new("item", DataType::Int32, true).into(),
+ offsets,
+ Arc::new(values) as ArrayRef,
+ None,
+ );
+
+ let plan = FieldPlan::List {
+ items_nullability: Some(Nullability::NullFirst),
+ item_plan: Box::new(FieldPlan::Scalar),
+ };
+
+ // Avro array encoding per row: one positive block, then 0 terminator.
+ // For NullFirst: Some(v) => branch 1 (0x02) then the value; None => branch 0 (0x00)
+ let mut expected = Vec::new();
+ expected.extend(avro_long_bytes(3)); // block of 3
+ expected.extend(avro_long_bytes(1)); // union branch=1 (value)
+ expected.extend(avro_long_bytes(1)); // value 1
+ expected.extend(avro_long_bytes(0)); // union branch=0 (null)
+ expected.extend(avro_long_bytes(1)); // union branch=1 (value)
+ expected.extend(avro_long_bytes(2)); // value 2
+ expected.extend(avro_long_bytes(0)); // block terminator
+
+ let got = encode_all(&list, &plan, None);
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn large_list_encoder_nullable_items_null_first() {
+ // LargeList single row: [Some(10), None]
+ let values = Int32Array::from(vec![Some(10), None]);
+ let offsets = arrow_buffer::OffsetBuffer::new(vec![0i64, 2].into());
+ let list = LargeListArray::new(
+ Field::new("item", DataType::Int32, true).into(),
+ offsets,
+ Arc::new(values) as ArrayRef,
+ None,
+ );
+
+ let plan = FieldPlan::List {
+ items_nullability: Some(Nullability::NullFirst),
+ item_plan: Box::new(FieldPlan::Scalar),
+ };
+
+ let mut expected = Vec::new();
+ expected.extend(avro_long_bytes(2)); // block of 2
+ expected.extend(avro_long_bytes(1)); // union branch=1 (value)
+ expected.extend(avro_long_bytes(10)); // value 10
+ expected.extend(avro_long_bytes(0)); // union branch=0 (null)
+ expected.extend(avro_long_bytes(0)); // block terminator
+
+ let got = encode_all(&list, &plan, None);
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn map_encoder_string_keys_nullable_int_values_null_first() {
+ // One map row: {"k1": Some(7), "k2": None}
+ let keys = StringArray::from(vec!["k1", "k2"]);
+ let values = Int32Array::from(vec![Some(7), None]);
+
+ let entries_fields = Fields::from(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Int32, true),
+ ]);
+ let entries = StructArray::new(
+ entries_fields,
+ vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+ None,
+ );
+
+ // Single row -> offsets [0, 2]
+ let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2].into());
+ let map = MapArray::new(
+ Field::new("entries", entries.data_type().clone(), false).into(),
+ offsets,
+ entries,
+ None,
+ false,
+ );
+
+ let plan = FieldPlan::Map {
+ values_nullability: Some(Nullability::NullFirst),
+ value_plan: Box::new(FieldPlan::Scalar),
+ };
+
+ // Expected:
+ // - one positive block (len=2)
+ // - "k1", branch=1 + value=7
+ // - "k2", branch=0 (null)
+ // - end-of-block marker 0
+ let mut expected = Vec::new();
+ expected.extend(avro_long_bytes(2)); // block length 2
+ expected.extend(avro_len_prefixed_bytes(b"k1")); // key "k1"
+ expected.extend(avro_long_bytes(1)); // union branch 1 (value)
+ expected.extend(avro_long_bytes(7)); // value 7
+ expected.extend(avro_len_prefixed_bytes(b"k2")); // key "k2"
+ expected.extend(avro_long_bytes(0)); // union branch 0 (null)
+ expected.extend(avro_long_bytes(0)); // block terminator
+
+ let got = encode_all(&map, &plan, None);
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn time32_seconds_to_millis_encoder() {
+ // Time32(Second) must encode as Avro time-millis (ms since midnight).
+ let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
+ vec![0i32, 1, -2, 12_345].into();
+
+ let got = encode_all(&arr, &FieldPlan::Scalar, None);
+
+ let mut expected = Vec::new();
+ for secs in [0i32, 1, -2, 12_345] {
+ let millis = (secs as i64) * 1000;
+ expected.extend_from_slice(&avro_long_bytes(millis));
+ }
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn time32_seconds_to_millis_overflow() {
+ // Choose a value that will overflow i32 when multiplied by 1000.
+ let overflow_secs: i32 = i32::MAX / 1000 + 1;
+ let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
+ vec![overflow_secs].into();
+
+ let field = arrow_schema::Field::new(
+ "f",
+ arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second),
+ true,
+ );
+ let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+
+ let mut out = Vec::new();
+ let err = enc.encode(&mut out, 0).unwrap_err();
+ match err {
+ arrow_schema::ArrowError::InvalidArgumentError(msg) => {
+ assert!(
+ msg.contains("overflowed") || msg.contains("overflow"),
+ "unexpected message: {msg}"
+ )
+ }
+ other => panic!("expected InvalidArgumentError, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn timestamp_seconds_to_millis_encoder() {
+ // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
+ let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
+ vec![0i64, 1, -1, 1_234_567_890].into();
+
+ let got = encode_all(&arr, &FieldPlan::Scalar, None);
+
+ let mut expected = Vec::new();
+ for secs in [0i64, 1, -1, 1_234_567_890] {
+ let millis = secs * 1000;
+ expected.extend_from_slice(&avro_long_bytes(millis));
+ }
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn timestamp_seconds_to_millis_overflow() {
+ // Overflow i64 when multiplied by 1000.
+ let overflow_secs: i64 = i64::MAX / 1000 + 1;
+ let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
+ vec![overflow_secs].into();
+
+ let field = arrow_schema::Field::new(
+ "f",
+ arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, None),
+ true,
+ );
+ let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
+
+ let mut out = Vec::new();
+ let err = enc.encode(&mut out, 0).unwrap_err();
+ match err {
+ arrow_schema::ArrowError::InvalidArgumentError(msg) => {
+ assert!(
+ msg.contains("overflowed") || msg.contains("overflow"),
+ "unexpected message: {msg}"
+ )
+ }
+ other => panic!("expected InvalidArgumentError, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn timestamp_nanos_encoder() {
+ let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampNanosecondType> =
+ vec![0i64, 1, -1, 123].into();
+
+ let got = encode_all(&arr, &FieldPlan::Scalar, None);
+
+ let mut expected = Vec::new();
+ for ns in [0i64, 1, -1, 123] {
+ expected.extend_from_slice(&avro_long_bytes(ns));
+ }
+ assert_bytes_eq(&got, &expected);
+ }
}
diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs
index f885fb86a6..07534c960a 100644
--- a/arrow-avro/src/writer/format.rs
+++ b/arrow-avro/src/writer/format.rs
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+//! Avro Writer Formats for Arrow.
+
use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
-use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
+use crate::schema::{AvroSchema, AvroSchemaOptions, SCHEMA_METADATA_KEY};
use crate::writer::encoder::write_long;
use arrow_schema::{ArrowError, Schema};
use rand::RngCore;
@@ -63,7 +65,13 @@ impl AvroFormat for AvroOcfFormat {
// Choose the Avro schema JSON that the file will advertise.
// If `schema.metadata[SCHEMA_METADATA_KEY]` exists, AvroSchema::try_from
// uses it verbatim; otherwise it is generated from the Arrow schema.
- let avro_schema = AvroSchema::try_from(schema)?;
+ let avro_schema = AvroSchema::from_arrow_with_options(
+ schema,
+ Some(AvroSchemaOptions {
+ null_order: None,
+ strip_metadata: true,
+ }),
+ )?;
// Magic
writer
.write_all(b"Obj\x01")
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index c1ec61c7fa..1d4699865b 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -395,17 +395,23 @@ mod tests {
use crate::reader::ReaderBuilder;
use crate::schema::{AvroSchema, SchemaStore};
use crate::test_util::arrow_test_data;
+ use arrow::datatypes::TimeUnit;
#[cfg(feature = "avro_custom_types")]
use arrow_array::types::{Int16Type, Int32Type, Int64Type};
+ use arrow_array::types::{
+ Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
+ TimestampMillisecondType, TimestampNanosecondType,
+ };
use arrow_array::{
- Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StructArray, UnionArray,
+ Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
+ StructArray, UnionArray,
};
#[cfg(feature = "avro_custom_types")]
use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
#[cfg(not(feature = "avro_custom_types"))]
use arrow_schema::{DataType, Field, Schema};
#[cfg(feature = "avro_custom_types")]
- use arrow_schema::{DataType, Field, Schema, TimeUnit};
+ use arrow_schema::{DataType, Field, Schema};
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs::File;
@@ -758,8 +764,8 @@ mod tests {
}
// Strict equality (schema + values) only when canonical extension types are enabled
- #[cfg(feature = "canonical_extension_types")]
#[test]
+ #[cfg(feature = "canonical_extension_types")]
fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
use arrow_schema::{DataType, IntervalUnit};
let in_file =
@@ -806,8 +812,8 @@ mod tests {
}
// Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
- #[cfg(not(feature = "canonical_extension_types"))]
#[test]
+ #[cfg(not(feature = "canonical_extension_types"))]
fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError> {
use arrow::datatypes::{DataType, IntervalUnit};
@@ -869,8 +875,9 @@ mod tests {
let uuid_rt = rt_schema.field_with_name("uuid_field")?;
assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
assert_eq!(
- uuid_rt.metadata().get("avro.name").map(|s| s.as_str()),
- Some("uuid_field")
+ uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
+ Some("uuid"),
+ "expected `logicalType = \"uuid\"` on round-tripped field metadata"
);
// 3) Duration remains Interval(MonthDayNano)
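
The assertion above expects the reader to stamp `logicalType = "uuid"` into the Arrow field metadata. A sketch of constructing such a field by hand (the metadata key is taken from the assertion; everything else is illustrative):

    use std::collections::HashMap;
    use arrow_schema::{DataType, Field};

    let uuid_field = Field::new("uuid_field", DataType::FixedSizeBinary(16), false)
        .with_metadata(HashMap::from([(
            "logicalType".to_string(),
            "uuid".to_string(),
        )]));
    // A writer that sees this metadata can emit Avro fixed(16) with
    // logicalType "uuid" instead of a plain fixed(16).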
@@ -1859,4 +1866,311 @@ mod tests {
assert_eq!(got, &expected);
Ok(())
}
+
+ #[test]
+ // TODO: avoid requiring snappy for this file
+ #[cfg(feature = "snappy")]
+ fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
+ let path = arrow_test_data("avro/nullable.impala.avro");
+ let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(rdr_file))
+ .expect("build reader for nullable.impala.avro");
+ let in_schema = reader.schema();
+ assert!(
+ in_schema.fields().iter().any(|f| f.is_nullable()),
+ "expected at least one nullable field in avro/nullable.impala.avro"
+ );
+ let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+ let buffer: Vec<u8> = Vec::new();
+ let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let out_bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(out_bytes))
+ .expect("build reader for round-tripped in-memory OCF");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let roundtrip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+ assert_eq!(
+ roundtrip, original,
+ "Round-trip Avro data mismatch for nullable.impala.avro"
+ );
+ Ok(())
+ }
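
The OCF round-trip tests added in this hunk all share the same read -> write -> re-read -> compare shape; a hypothetical helper (not part of this PR) distilling that pattern, using only APIs already exercised above:

    fn assert_ocf_roundtrip(path: &str) -> Result<(), ArrowError> {
        let reader = ReaderBuilder::new()
            .build(BufReader::new(File::open(path).expect("open input")))
            .expect("build reader");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let rt = ReaderBuilder::new()
            .build(Cursor::new(writer.into_inner()))
            .expect("build round-trip reader");
        let rt_schema = rt.schema();
        let rt_batches = rt.collect::<Result<Vec<_>, _>>()?;
        let roundtrip = arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "round-trip mismatch for {path}");
        Ok(())
    }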
+
+ #[test]
+ #[cfg(feature = "snappy")]
+ fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
+ let path = arrow_test_data("avro/datapage_v2.snappy.avro");
+ let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(rdr_file))
+ .expect("build reader for datapage_v2.snappy.avro");
+ let in_schema = reader.schema();
+ let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build round-trip reader");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let round_trip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+ assert_eq!(
+ round_trip, original,
+ "Round-trip batch mismatch for datapage_v2.snappy.avro"
+ );
+ Ok(())
+ }
+
+ #[test]
+ #[cfg(feature = "snappy")]
+ fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
+ let path = arrow_test_data("avro/single_nan.avro");
+ let in_file = File::open(&path).expect("open avro/single_nan.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(in_file))
+ .expect("build reader for single_nan.avro");
+ let in_schema = reader.schema();
+ let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build round_trip reader");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let round_trip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+ assert_eq!(
+ round_trip, original,
+ "Round-trip batch mismatch for avro/single_nan.avro"
+ );
+ Ok(())
+ }
+
+ #[test]
+ // TODO: avoid requiring snappy for this file
+ #[cfg(feature = "snappy")]
+ fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
+ let path = arrow_test_data("avro/dict-page-offset-zero.avro");
+ let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(rdr_file))
+ .expect("build reader for dict-page-offset-zero.avro");
+ let in_schema = reader.schema();
+ let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+ let buffer: Vec<u8> = Vec::new();
+ let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build reader for round-trip");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let roundtrip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+ assert_eq!(
+ roundtrip, original,
+ "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
+ );
+ Ok(())
+ }
+
+ #[test]
+ #[cfg(feature = "snappy")]
+ fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
+ let path = arrow_test_data("avro/repeated_no_annotation.avro");
+ let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(in_file))
+ .expect("build reader for repeated_no_annotation.avro");
+ let in_schema = reader.schema();
+ let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build reader for round-trip buffer");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let round_trip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
+ assert_eq!(
+ round_trip, original,
+ "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
+ let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+ .join("test/data/nested_record_reuse.avro")
+ .to_string_lossy()
+ .into_owned();
+ let in_file = File::open(&path).expect("open test/data/nested_record_reuse.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(in_file))
+ .expect("build reader for nested_record_reuse.avro");
+ let in_schema = reader.schema();
+ let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let input =
+ arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
+ writer.write(&input)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build round_trip reader");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let round_trip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+ assert_eq!(
+ round_trip, input,
+ "Round-trip batch mismatch for nested_record_reuse.avro"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
+ let path =
+ std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
+ let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
+ let reader = ReaderBuilder::new()
+ .build(std::io::BufReader::new(rdr_file))
+ .expect("build reader for enum_reuse.avro");
+ let in_schema = reader.schema();
+ let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(std::io::Cursor::new(bytes))
+ .expect("build round_trip reader");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let round_trip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+ assert_eq!(
+ round_trip, original,
+ "Avro enum type reuse round-trip mismatch"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
+ let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+ .join("test/data/comprehensive_e2e.avro");
+ let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
+ let reader = ReaderBuilder::new()
+ .build(BufReader::new(rdr_file))
+ .expect("build reader for comprehensive_e2e.avro");
+ let in_schema = reader.schema();
+ let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
+ let original =
+ arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+ let sink: Vec<u8> = Vec::new();
+ let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
+ writer.write(&original)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(Cursor::new(bytes))
+ .expect("build round-trip reader");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let roundtrip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+ assert_eq!(
+ roundtrip, original,
+ "Round-trip batch mismatch for comprehensive_e2e.avro"
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
+ let schema = Schema::new(vec![
+ Field::new("d32", DataType::Date32, false),
+ Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond),
false),
+ Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond),
false),
+ Field::new(
+ "ts_ms",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ Field::new(
+ "ts_us",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ Field::new(
+ "ts_ns",
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ false,
+ ),
+ ]);
+ let d32 = Date32Array::from(vec![0, 1, -1]);
+ let t32_ms: PrimitiveArray<Time32MillisecondType> =
+ vec![0_i32, 12_345_i32, 86_399_999_i32].into();
+ let t64_us: PrimitiveArray<Time64MicrosecondType> =
+ vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
+ let ts_ms: PrimitiveArray<TimestampMillisecondType> =
+ vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
+ let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
+ let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(d32) as ArrayRef,
+ Arc::new(t32_ms) as ArrayRef,
+ Arc::new(t64_us) as ArrayRef,
+ Arc::new(ts_ms) as ArrayRef,
+ Arc::new(ts_us) as ArrayRef,
+ Arc::new(ts_ns) as ArrayRef,
+ ],
+ )?;
+ let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+ writer.write(&batch)?;
+ writer.finish()?;
+ let bytes = writer.into_inner();
+ let rt_reader = ReaderBuilder::new()
+ .build(std::io::Cursor::new(bytes))
+ .expect("build reader for round-trip of new time encoders");
+ let rt_schema = rt_reader.schema();
+ let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+ let roundtrip =
+ arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+ assert_eq!(roundtrip, batch);
+ Ok(())
+ }
}
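
For reference, these are the Avro logical types the columns in the last test are expected to travel through, with names per the Avro 1.12 spec; mapping the `None`-timezone timestamps to the local-timestamp family is an assumption about this writer, not something shown in the diff:

    use arrow_schema::{DataType, TimeUnit};

    // Sketch of the Arrow -> Avro logical-type mapping exercised above.
    fn expected_logical_type(dt: &DataType) -> Option<&'static str> {
        match dt {
            DataType::Date32 => Some("date"), // Avro int
            DataType::Time32(TimeUnit::Millisecond) => Some("time-millis"), // Avro int
            DataType::Time64(TimeUnit::Microsecond) => Some("time-micros"), // Avro long
            DataType::Timestamp(TimeUnit::Millisecond, None) => Some("local-timestamp-millis"),
            DataType::Timestamp(TimeUnit::Microsecond, None) => Some("local-timestamp-micros"),
            DataType::Timestamp(TimeUnit::Nanosecond, None) => Some("local-timestamp-nanos"),
            _ => None,
        }
    }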
diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs
index e8f367143d..8c113cba86 100644
--- a/arrow-schema/src/error.rs
+++ b/arrow-schema/src/error.rs
@@ -46,6 +46,8 @@ pub enum ArrowError {
CsvError(String),
/// Error during JSON-related operations.
JsonError(String),
+ /// Error during Avro-related operations.
+ AvroError(String),
/// Error during IO operations.
IoError(String, std::io::Error),
/// Error during IPC operations in `arrow-ipc` or `arrow-flight`.
@@ -109,6 +111,7 @@ impl Display for ArrowError {
ArrowError::ComputeError(desc) => write!(f, "Compute error: {desc}"),
ArrowError::ArithmeticOverflow(desc) => write!(f, "Arithmetic overflow: {desc}"),
ArrowError::DivideByZero => write!(f, "Divide by zero error"),
+ ArrowError::AvroError(desc) => write!(f, "Avro error: {desc}"),
ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
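
A minimal usage sketch for the new variant (the call site and message are hypothetical; only the variant and its Display arm come from this diff):

    use arrow_schema::ArrowError;

    fn unsupported_codec(name: &str) -> ArrowError {
        // Surfaces an Avro-specific failure with the new variant.
        ArrowError::AvroError(format!("unsupported compression codec: {name}"))
    }

    // With the Display arm above, this renders as:
    // "Avro error: unsupported compression codec: zstd"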