This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a776837c94 Implement ArrowSchema to AvroSchema conversion logic in
arrow-avro (#8075)
a776837c94 is described below
commit a776837c94a672bebab9dab466c155cc14c98436
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Aug 12 12:18:49 2025 -0500
Implement ArrowSchema to AvroSchema conversion logic in arrow-avro (#8075)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/4886
# Rationale for this change
This change introduces functionality to convert an `ArrowSchema` into an
`AvroSchema`. This is a crucial feature for improving interoperability
between Arrow and Avro. Creating `AvroSchema` instances directly from an
arrow-rs `Schema` simplifies schema evolution support. Additionally,
these updates are foundational for the upcoming `arrow-avro` `Writer`.
# What changes are included in this PR?
- **`TryFrom<&ArrowSchema> for AvroSchema`**: The core of this PR is the
implementation of the `TryFrom` trait to allow a fallible conversion
from an `ArrowSchema` reference to a new `AvroSchema`.
- **Type Mapping Logic**: Added comprehensive logic to map Arrow
`DataType` variants to their corresponding Avro type representations.
This includes:
- Primitive types (`Boolean`, `Int`, `Float`, `Binary`, `Utf8`).
- Logical types (e.g., `Timestamp`, `Date`, `Decimal`, `UUID`).
- Complex types (`Struct`, `List`, `Map`, `Dictionary`). Dictionaries
are converted to Avro `enum` types.
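- **Name Sanitization**: Implemented `sanitise_avro_name` so that field
and type names derived from the `ArrowSchema` are valid according to Avro
naming conventions, and a `NameGenerator` so that the named types
generated for fields (`record`, `enum`, `fixed`) receive unique names
within the schema. Field names themselves are sanitized but not
de-duplicated.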
- **Metadata Handling**: The conversion preserves relevant metadata from
the Arrow schema.
- **Metadata Constants**: Added `arrow-avro` metadata key constants (for
example `AVRO_ENUM_SYMBOLS_METADATA_KEY`) to simplify working with Avro
metadata stored on Arrow fields and schemas.
# Are these changes tested?
Yes, this change is accompanied by new tests in `schema.rs`. The tests
cover:
- Correct mapping of all supported primitive, temporal, and logical
types.
- Conversion of complex and nested structures like `Struct`, `List`, and
`Map`.
- Proper handling of dictionary-encoded fields to Avro enums.
- Validation of name sanitization logic.
- Round-trip conversion tests for various data types to ensure
correctness.
# Are there any user-facing changes?
N/A
---
arrow-avro/src/codec.rs | 9 +-
arrow-avro/src/reader/mod.rs | 8 +-
arrow-avro/src/schema.rs | 655 ++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 660 insertions(+), 12 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index dcd3984501..a10e3a238d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::schema::{Attributes, AvroSchema, ComplexType, PrimitiveType,
Record, Schema, TypeName};
+use crate::schema::{
+ Attributes, AvroSchema, ComplexType, PrimitiveType, Record, Schema,
TypeName,
+ AVRO_ENUM_SYMBOLS_METADATA_KEY,
+};
use arrow_schema::{
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit,
DECIMAL128_MAX_PRECISION,
DECIMAL128_MAX_SCALE,
@@ -623,7 +626,7 @@ fn make_data_type<'a>(
let symbols_json =
serde_json::to_string(&e.symbols).map_err(|e| {
ArrowError::ParseError(format!("Failed to serialize enum
symbols: {e}"))
})?;
- metadata.insert("avro.enum.symbols".to_string(), symbols_json);
+ metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
symbols_json);
let field = AvroDataType {
nullability: None,
metadata,
@@ -780,11 +783,9 @@ mod tests {
#[test]
fn test_uuid_type() {
let mut codec = Codec::Fixed(16);
-
if let c @ Codec::Fixed(16) = &mut codec {
*c = Codec::Uuid;
}
-
assert!(matches!(codec, Codec::Uuid));
}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index e9bf7af61e..1f741d6d53 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -595,7 +595,7 @@ mod test {
use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
use crate::schema::{
AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema
as AvroRaw,
- SchemaStore, SINGLE_OBJECT_MAGIC,
+ SchemaStore, AVRO_ENUM_SYMBOLS_METADATA_KEY, SINGLE_OBJECT_MAGIC,
};
use crate::test_util::arrow_test_data;
use arrow::array::ArrayDataBuilder;
@@ -1420,19 +1420,19 @@ mod test {
DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8));
let mut md_f1 = HashMap::new();
md_f1.insert(
- "avro.enum.symbols".to_string(),
+ AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
r#"["a","b","c","d"]"#.to_string(),
);
let f1_field = Field::new("f1", dict_type.clone(),
false).with_metadata(md_f1);
let mut md_f2 = HashMap::new();
md_f2.insert(
- "avro.enum.symbols".to_string(),
+ AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
r#"["e","f","g","h"]"#.to_string(),
);
let f2_field = Field::new("f2", dict_type.clone(),
false).with_metadata(md_f2);
let mut md_f3 = HashMap::new();
md_f3.insert(
- "avro.enum.symbols".to_string(),
+ AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
r#"["i","j","k"]"#.to_string(),
);
let f3_field = Field::new("f3", dict_type.clone(),
true).with_metadata(md_f3);
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 539e7b02f3..2f1c0a2bcf 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use arrow_schema::ArrowError;
+use arrow_schema::{
+ ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as
ArrowSchema, TimeUnit,
+};
use serde::{Deserialize, Serialize};
-use serde_json::{json, Value};
+use serde_json::{json, Map as JsonMap, Value};
use std::cmp::PartialEq;
use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use strum_macros::AsRefStr;
/// The metadata key used for storing the JSON encoded [`Schema`]
@@ -29,6 +31,21 @@ pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
+/// Metadata key used to represent Avro enum symbols in an Arrow schema.
+pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";
+
+/// Metadata key used to store the default value of a field in an Avro schema.
+pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";
+
+/// Metadata key used to store the name of a type in an Avro schema.
+pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";
+
+/// Metadata key used to store the namespace of a type in an Avro schema.
+pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";
+
+/// Metadata key used to store the documentation for a type in an Avro schema.
+pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";
+
/// Compare two Avro schemas for equality (identical schemas).
/// Returns true if the schemas have the same parsing canonical form (i.e.,
logically identical).
pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool,
ArrowError> {
@@ -284,6 +301,57 @@ pub struct AvroSchema {
pub json_string: String,
}
+impl TryFrom<&ArrowSchema> for AvroSchema {
+ type Error = ArrowError;
+
+ fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
+ // Fast‑path: schema already contains Avro JSON
+ if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+ return Ok(AvroSchema::new(json.clone()));
+ }
+ let mut name_gen = NameGenerator::default();
+ let fields_json = schema
+ .fields()
+ .iter()
+ .map(|f| arrow_field_to_avro(f, &mut name_gen))
+ .collect::<Result<Vec<_>, _>>()?;
+ // Assemble top‑level record
+ let record_name = schema
+ .metadata
+ .get(AVRO_NAME_METADATA_KEY)
+ .map_or("topLevelRecord", |s| s.as_str());
+ let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
+ record.insert("type".into(), Value::String("record".into()));
+ record.insert(
+ "name".into(),
+ Value::String(sanitise_avro_name(record_name)),
+ );
+ if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
+ record.insert("namespace".into(), Value::String(ns.clone()));
+ }
+ if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
+ record.insert("doc".into(), Value::String(doc.clone()));
+ }
+ record.insert("fields".into(), Value::Array(fields_json));
+ let schema_prefix = format!("{SCHEMA_METADATA_KEY}.");
+ for (meta_key, meta_val) in &schema.metadata {
+ // Skip keys already handled or internal
+ if meta_key.starts_with("avro.")
+ || meta_key.starts_with(schema_prefix.as_str())
+ || is_internal_arrow_key(meta_key)
+ {
+ continue;
+ }
+ let json_val =
+ serde_json::from_str(meta_val).unwrap_or_else(|_|
Value::String(meta_val.clone()));
+ record.insert(meta_key.clone(), json_val);
+ }
+ let json_string = serde_json::to_string(&Value::Object(record))
+ .map_err(|e| ArrowError::SchemaError(format!("Serialising Avro
JSON failed: {e}")))?;
+ Ok(AvroSchema::new(json_string))
+ }
+}
+
impl AvroSchema {
/// Creates a new `AvroSchema` from a JSON string.
pub fn new(json_string: String) -> Self {
@@ -647,12 +715,336 @@ pub(crate) fn compute_fingerprint_rabin(canonical_form:
&str) -> u64 {
fp
}
+#[inline]
+fn is_internal_arrow_key(key: &str) -> bool {
+ key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY
+}
+
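+// Sanitize an arbitrary string so it is a valid Avro field or type name:
+// every character other than an ASCII letter, digit, or `_` is replaced by
+// `_`, a leading digit is prefixed with `_`, and an empty input yields `_`.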
+fn sanitise_avro_name(base_name: &str) -> String {
+ if base_name.is_empty() {
+ return "_".to_owned();
+ }
+ let mut out: String = base_name
+ .chars()
+ .map(|char| {
+ if char.is_ascii_alphanumeric() || char == '_' {
+ char
+ } else {
+ '_'
+ }
+ })
+ .collect();
+ if out.as_bytes()[0].is_ascii_digit() {
+ out.insert(0, '_');
+ }
+ out
+}
+
+#[derive(Default)]
+struct NameGenerator {
+ used: HashSet<String>,
+ counters: HashMap<String, usize>,
+}
+
+impl NameGenerator {
+ fn make_unique(&mut self, field_name: &str) -> String {
+ let field_name = sanitise_avro_name(field_name);
+ if self.used.insert(field_name.clone()) {
+ self.counters.insert(field_name.clone(), 1);
+ return field_name;
+ }
+ let counter = self.counters.entry(field_name.clone()).or_insert(1);
+ loop {
+ let candidate = format!("{field_name}_{}", *counter);
+ if self.used.insert(candidate.clone()) {
+ return candidate;
+ }
+ *counter += 1;
+ }
+ }
+}
+
+fn merge_extras(schema: Value, mut extras: JsonMap<String, Value>) -> Value {
+ if extras.is_empty() {
+ return schema;
+ }
+ match schema {
+ Value::Object(mut map) => {
+ map.extend(extras);
+ Value::Object(map)
+ }
+ Value::Array(mut union) => {
+ if let Some(non_null) = union.iter_mut().find(|val| val.as_str()
!= Some("null")) {
+ let original = std::mem::take(non_null);
+ *non_null = merge_extras(original, extras);
+ }
+ Value::Array(union)
+ }
+ primitive => {
+ let mut map = JsonMap::with_capacity(extras.len() + 1);
+ map.insert("type".into(), primitive);
+ map.extend(extras);
+ Value::Object(map)
+ }
+ }
+}
+
+// Convert an Arrow `DataType` into an Avro schema `Value`.
+fn datatype_to_avro(
+ dt: &DataType,
+ field_name: &str,
+ metadata: &HashMap<String, String>,
+ name_gen: &mut NameGenerator,
+) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
+ let mut extras = JsonMap::new();
+ let val = match dt {
+ DataType::Null => Value::String("null".into()),
+ DataType::Boolean => Value::String("boolean".into()),
+ DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16
| DataType::Int32 => {
+ Value::String("int".into())
+ }
+ DataType::UInt32 | DataType::Int64 | DataType::UInt64 =>
Value::String("long".into()),
+ DataType::Float16 | DataType::Float32 => Value::String("float".into()),
+ DataType::Float64 => Value::String("double".into()),
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View =>
Value::String("string".into()),
+ DataType::Binary | DataType::LargeBinary =>
Value::String("bytes".into()),
+ DataType::FixedSizeBinary(len) => {
+ let is_uuid = metadata
+ .get("logicalType")
+ .is_some_and(|value| value == "uuid")
+ || (*len == 16
+ && metadata
+ .get("ARROW:extension:name")
+ .is_some_and(|value| value == "uuid"));
+ if is_uuid {
+ json!({ "type": "string", "logicalType": "uuid" })
+ } else {
+ json!({
+ "type": "fixed",
+ "name": name_gen.make_unique(field_name),
+ "size": len
+ })
+ }
+ }
+ DataType::Decimal128(precision, scale) |
DataType::Decimal256(precision, scale) => {
+ // Prefer fixed if original size info present
+ let mut meta = JsonMap::from_iter([
+ ("logicalType".into(), json!("decimal")),
+ ("precision".into(), json!(*precision)),
+ ("scale".into(), json!(*scale)),
+ ]);
+ if let Some(size) = metadata
+ .get("size")
+ .and_then(|val| val.parse::<usize>().ok())
+ {
+ meta.insert("type".into(), json!("fixed"));
+ meta.insert("size".into(), json!(size));
+ meta.insert("name".into(),
json!(name_gen.make_unique(field_name)));
+ } else {
+ meta.insert("type".into(), json!("bytes"));
+ }
+ Value::Object(meta)
+ }
+ DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
+ DataType::Date64 => json!({ "type": "long", "logicalType":
"local-timestamp-millis" }),
+ DataType::Time32(unit) => match unit {
+ TimeUnit::Millisecond => json!({ "type": "int", "logicalType":
"time-millis" }),
+ TimeUnit::Second => {
+ extras.insert("arrowTimeUnit".into(),
Value::String("second".into()));
+ Value::String("int".into())
+ }
+ _ => Value::String("int".into()),
+ },
+ DataType::Time64(unit) => match unit {
+ TimeUnit::Microsecond => json!({ "type": "long", "logicalType":
"time-micros" }),
+ TimeUnit::Nanosecond => {
+ extras.insert("arrowTimeUnit".into(),
Value::String("nanosecond".into()));
+ Value::String("long".into())
+ }
+ _ => Value::String("long".into()),
+ },
+ DataType::Timestamp(unit, tz) => {
+ let logical_type = match (unit, tz.is_some()) {
+ (TimeUnit::Millisecond, true) => "timestamp-millis",
+ (TimeUnit::Millisecond, false) => "local-timestamp-millis",
+ (TimeUnit::Microsecond, true) => "timestamp-micros",
+ (TimeUnit::Microsecond, false) => "local-timestamp-micros",
+ (TimeUnit::Second, _) => {
+ extras.insert("arrowTimeUnit".into(),
Value::String("second".into()));
+ return Ok((Value::String("long".into()), extras));
+ }
+ (TimeUnit::Nanosecond, _) => {
+ extras.insert("arrowTimeUnit".into(),
Value::String("nanosecond".into()));
+ return Ok((Value::String("long".into()), extras));
+ }
+ };
+ json!({ "type": "long", "logicalType": logical_type })
+ }
+ DataType::Duration(unit) => {
+ extras.insert(
+ "arrowDurationUnit".into(),
+ Value::String(format!("{unit:?}").to_lowercase()),
+ );
+ Value::String("long".into())
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => json!({
+ "type": "fixed",
+ "name": name_gen.make_unique(&format!("{field_name}_duration")),
+ "size": 12,
+ "logicalType": "duration"
+ }),
+ DataType::Interval(IntervalUnit::YearMonth) => {
+ extras.insert(
+ "arrowIntervalUnit".into(),
+ Value::String("yearmonth".into()),
+ );
+ Value::String("long".into())
+ }
+ DataType::Interval(IntervalUnit::DayTime) => {
+ extras.insert("arrowIntervalUnit".into(),
Value::String("daytime".into()));
+ Value::String("long".into())
+ }
+ DataType::List(child) | DataType::LargeList(child) => {
+ if matches!(dt, DataType::LargeList(_)) {
+ extras.insert("arrowLargeList".into(), Value::Bool(true));
+ }
+ let (items, ie) =
+ datatype_to_avro(child.data_type(), child.name(),
child.metadata(), name_gen)?;
+ json!({
+ "type": "array",
+ "items": merge_extras(items, ie)
+ })
+ }
+ DataType::FixedSizeList(child, len) => {
+ extras.insert("arrowFixedSize".into(), json!(len));
+ let (items, ie) =
+ datatype_to_avro(child.data_type(), child.name(),
child.metadata(), name_gen)?;
+ json!({
+ "type": "array",
+ "items": merge_extras(items, ie)
+ })
+ }
+ DataType::Map(entries, _) => {
+ let value_field = match entries.data_type() {
+ DataType::Struct(fs) => &fs[1],
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Map 'entries' field must be Struct(key,value)".into(),
+ ))
+ }
+ };
+ let (val_schema, value_entry) = datatype_to_avro(
+ value_field.data_type(),
+ value_field.name(),
+ value_field.metadata(),
+ name_gen,
+ )?;
+ json!({
+ "type": "map",
+ "values": merge_extras(val_schema, value_entry)
+ })
+ }
+ DataType::Struct(fields) => {
+ let avro_fields = fields
+ .iter()
+ .map(|field| arrow_field_to_avro(field, name_gen))
+ .collect::<Result<Vec<_>, _>>()?;
+ json!({
+ "type": "record",
+ "name": name_gen.make_unique(field_name),
+ "fields": avro_fields
+ })
+ }
+ DataType::Dictionary(_, value) => {
+ if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
+ let symbols: Vec<&str> =
+ serde_json::from_str(j).map_err(|e|
ArrowError::ParseError(e.to_string()))?;
+ json!({
+ "type": "enum",
+ "name": name_gen.make_unique(field_name),
+ "symbols": symbols
+ })
+ } else {
+ let (inner, ie) = datatype_to_avro(value.as_ref(), field_name,
metadata, name_gen)?;
+ merge_extras(inner, ie)
+ }
+ }
+ DataType::RunEndEncoded(_, values) => {
+ let (inner, ie) = datatype_to_avro(
+ values.data_type(),
+ values.name(),
+ values.metadata(),
+ name_gen,
+ )?;
+ merge_extras(inner, ie)
+ }
+ DataType::Union(_, _) => {
+ return Err(ArrowError::NotYetImplemented(
+ "Arrow Union to Avro Union not yet supported".into(),
+ ))
+ }
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Arrow type {other:?} has no Avro representation"
+ )))
+ }
+ };
+ Ok((val, extras))
+}
+
+fn arrow_field_to_avro(
+ field: &ArrowField,
+ name_gen: &mut NameGenerator,
+) -> Result<Value, ArrowError> {
+ // Sanitize field name to ensure Avro validity. NOTE: the original
+ // (unsanitized) name is not preserved anywhere in the output.
+ let avro_name = sanitise_avro_name(field.name());
+ let (schema, extras) =
+ datatype_to_avro(field.data_type(), &avro_name, field.metadata(),
name_gen)?;
+ // If nullable, wrap `[ "null", <type> ]`, NOTE: second order nullability
to be added in a follow-up
+ let mut schema = if field.is_nullable() {
+ Value::Array(vec![
+ Value::String("null".into()),
+ merge_extras(schema, extras),
+ ])
+ } else {
+ merge_extras(schema, extras)
+ };
+ // Build the field map
+ let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
+ map.insert("name".into(), Value::String(avro_name));
+ map.insert("type".into(), schema);
+ // Transfer selected metadata
+ for (meta_key, meta_val) in field.metadata() {
+ if is_internal_arrow_key(meta_key) {
+ continue;
+ }
+ match meta_key.as_str() {
+ AVRO_DOC_METADATA_KEY => {
+ map.insert("doc".into(), Value::String(meta_val.clone()));
+ }
+ AVRO_FIELD_DEFAULT_METADATA_KEY => {
+ let default_value = serde_json::from_str(meta_val)
+ .unwrap_or_else(|_| Value::String(meta_val.clone()));
+ map.insert("default".into(), default_value);
+ }
+ _ => {
+ let json_val = serde_json::from_str(meta_val)
+ .unwrap_or_else(|_| Value::String(meta_val.clone()));
+ map.insert(meta_key.clone(), json_val);
+ }
+ }
+ }
+ Ok(Value::Object(map))
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::codec::{AvroDataType, AvroField};
- use arrow_schema::{DataType, Fields, TimeUnit};
+ use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit};
use serde_json::json;
+ use std::sync::Arc;
fn int_schema() -> Schema<'static> {
Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
@@ -682,6 +1074,19 @@ mod tests {
}))
}
+ fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
+ let mut sb = SchemaBuilder::new();
+ sb.push(field);
+ sb.finish()
+ }
+
+ fn assert_json_contains(avro_json: &str, needle: &str) {
+ assert!(
+ avro_json.contains(needle),
+ "JSON did not contain `{needle}` : {avro_json}"
+ )
+ }
+
#[test]
fn test_deserialize() {
let t: Schema = serde_json::from_str("\"string\"").unwrap();
@@ -1120,4 +1525,246 @@ mod tests {
let canonical_form =
generate_canonical_form(&schema_with_attrs).unwrap();
assert_eq!(canonical_form, expected_canonical_form);
}
+
+ #[test]
+ fn test_primitive_mappings() {
+ let cases = vec![
+ (DataType::Boolean, "\"boolean\""),
+ (DataType::Int8, "\"int\""),
+ (DataType::Int16, "\"int\""),
+ (DataType::Int32, "\"int\""),
+ (DataType::Int64, "\"long\""),
+ (DataType::UInt8, "\"int\""),
+ (DataType::UInt16, "\"int\""),
+ (DataType::UInt32, "\"long\""),
+ (DataType::UInt64, "\"long\""),
+ (DataType::Float16, "\"float\""),
+ (DataType::Float32, "\"float\""),
+ (DataType::Float64, "\"double\""),
+ (DataType::Utf8, "\"string\""),
+ (DataType::Binary, "\"bytes\""),
+ ];
+ for (dt, avro_token) in cases {
+ let field = ArrowField::new("col", dt.clone(), false);
+ let arrow_schema = single_field_schema(field);
+ let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+ assert_json_contains(&avro.json_string, avro_token);
+ }
+ }
+
+ #[test]
+ fn test_temporal_mappings() {
+ let cases = vec![
+ (DataType::Date32, "\"logicalType\":\"date\""),
+ (
+ DataType::Time32(TimeUnit::Millisecond),
+ "\"logicalType\":\"time-millis\"",
+ ),
+ (
+ DataType::Time64(TimeUnit::Microsecond),
+ "\"logicalType\":\"time-micros\"",
+ ),
+ (
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ "\"logicalType\":\"local-timestamp-millis\"",
+ ),
+ (
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ "\"logicalType\":\"timestamp-micros\"",
+ ),
+ ];
+ for (dt, needle) in cases {
+ let field = ArrowField::new("ts", dt.clone(), true);
+ let arrow_schema = single_field_schema(field);
+ let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+ assert_json_contains(&avro.json_string, needle);
+ }
+ }
+
+ #[test]
+ fn test_decimal_and_uuid() {
+ let decimal_field = ArrowField::new("amount", DataType::Decimal128(25,
2), false);
+ let dec_schema = single_field_schema(decimal_field);
+ let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
+ assert_json_contains(&avro_dec.json_string,
"\"logicalType\":\"decimal\"");
+ assert_json_contains(&avro_dec.json_string, "\"precision\":25");
+ assert_json_contains(&avro_dec.json_string, "\"scale\":2");
+ let mut md = HashMap::new();
+ md.insert("logicalType".into(), "uuid".into());
+ let uuid_field =
+ ArrowField::new("id", DataType::FixedSizeBinary(16),
false).with_metadata(md);
+ let uuid_schema = single_field_schema(uuid_field);
+ let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
+ assert_json_contains(&avro_uuid.json_string,
"\"logicalType\":\"uuid\"");
+ }
+
+ #[test]
+ fn test_interval_duration() {
+ let interval_field = ArrowField::new(
+ "span",
+ DataType::Interval(IntervalUnit::MonthDayNano),
+ false,
+ );
+ let s = single_field_schema(interval_field);
+ let avro = AvroSchema::try_from(&s).unwrap();
+ assert_json_contains(&avro.json_string,
"\"logicalType\":\"duration\"");
+ assert_json_contains(&avro.json_string, "\"size\":12");
+ let dur_field = ArrowField::new("latency",
DataType::Duration(TimeUnit::Nanosecond), false);
+ let s2 = single_field_schema(dur_field);
+ let avro2 = AvroSchema::try_from(&s2).unwrap();
+ assert_json_contains(&avro2.json_string, "\"arrowDurationUnit\"");
+ }
+
+ #[test]
+ fn test_complex_types() {
+ let list_dt = DataType::List(Arc::new(ArrowField::new("item",
DataType::Int32, true)));
+ let list_schema = single_field_schema(ArrowField::new("numbers",
list_dt, false));
+ let avro_list = AvroSchema::try_from(&list_schema).unwrap();
+ assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
+ assert_json_contains(&avro_list.json_string, "\"items\"");
+ let value_field = ArrowField::new("value", DataType::Boolean, true);
+ let entries_struct = ArrowField::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ ArrowField::new("key", DataType::Utf8, false),
+ value_field.clone(),
+ ])),
+ false,
+ );
+ let map_dt = DataType::Map(Arc::new(entries_struct), false);
+ let map_schema = single_field_schema(ArrowField::new("props", map_dt,
false));
+ let avro_map = AvroSchema::try_from(&map_schema).unwrap();
+ assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
+ assert_json_contains(&avro_map.json_string, "\"values\"");
+ let struct_dt = DataType::Struct(Fields::from(vec![
+ ArrowField::new("f1", DataType::Int64, false),
+ ArrowField::new("f2", DataType::Utf8, true),
+ ]));
+ let struct_schema = single_field_schema(ArrowField::new("person",
struct_dt, true));
+ let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
+ assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
+ assert_json_contains(&avro_struct.json_string, "\"null\"");
+ }
+
+ #[test]
+ fn test_enum_dictionary() {
+ let mut md = HashMap::new();
+ md.insert(
+ AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
+ "[\"OPEN\",\"CLOSED\"]".into(),
+ );
+ let enum_dt = DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8));
+ let field = ArrowField::new("status", enum_dt,
false).with_metadata(md);
+ let schema = single_field_schema(field);
+ let avro = AvroSchema::try_from(&schema).unwrap();
+ assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
+ assert_json_contains(&avro.json_string,
"\"symbols\":[\"OPEN\",\"CLOSED\"]");
+ }
+
+ #[test]
+ fn test_run_end_encoded() {
+ let ree_dt = DataType::RunEndEncoded(
+ Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
+ Arc::new(ArrowField::new("values", DataType::Utf8, false)),
+ );
+ let s = single_field_schema(ArrowField::new("text", ree_dt, false));
+ let avro = AvroSchema::try_from(&s).unwrap();
+ assert_json_contains(&avro.json_string, "\"string\"");
+ }
+
+ #[test]
+ fn test_dense_union_error() {
+ use arrow_schema::UnionFields;
+ let uf: UnionFields = vec![(0i8, Arc::new(ArrowField::new("a",
DataType::Int32, false)))]
+ .into_iter()
+ .collect();
+ let union_dt = DataType::Union(uf, arrow_schema::UnionMode::Dense);
+ let s = single_field_schema(ArrowField::new("u", union_dt, false));
+ let err = AvroSchema::try_from(&s).unwrap_err();
+ assert!(err
+ .to_string()
+ .contains("Arrow Union to Avro Union not yet supported"));
+ }
+
+ #[test]
+ fn round_trip_primitive() {
+ let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1",
DataType::Int32, false)]);
+ let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
+ let decoded = avro_schema.schema().unwrap();
+ assert!(matches!(decoded, Schema::Complex(_)));
+ }
+
+ #[test]
+ fn test_name_generator_sanitization_and_uniqueness() {
+ let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8),
false);
+ let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8),
false);
+ let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8),
false);
+ let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
+ let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+ assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
+ assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
+ assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
+ }
+
+ #[test]
+ fn test_date64_logical_type_mapping() {
+ let field = ArrowField::new("d", DataType::Date64, true);
+ let schema = single_field_schema(field);
+ let avro = AvroSchema::try_from(&schema).unwrap();
+ assert_json_contains(
+ &avro.json_string,
+ "\"logicalType\":\"local-timestamp-millis\"",
+ );
+ }
+
+ #[test]
+ fn test_duration_list_extras_propagated() {
+ let child = ArrowField::new("lat",
DataType::Duration(TimeUnit::Microsecond), false);
+ let list_dt = DataType::List(Arc::new(child));
+ let arrow_schema = single_field_schema(ArrowField::new("durations",
list_dt, false));
+ let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+ assert_json_contains(&avro.json_string,
"\"arrowDurationUnit\":\"microsecond\"");
+ }
+
+ #[test]
+ fn test_interval_yearmonth_extra() {
+ let field = ArrowField::new("iv",
DataType::Interval(IntervalUnit::YearMonth), false);
+ let schema = single_field_schema(field);
+ let avro = AvroSchema::try_from(&schema).unwrap();
+ assert_json_contains(&avro.json_string,
"\"arrowIntervalUnit\":\"yearmonth\"");
+ }
+
+ #[test]
+ fn test_interval_daytime_extra() {
+ let field = ArrowField::new("iv_dt",
DataType::Interval(IntervalUnit::DayTime), false);
+ let schema = single_field_schema(field);
+ let avro = AvroSchema::try_from(&schema).unwrap();
+ assert_json_contains(&avro.json_string,
"\"arrowIntervalUnit\":\"daytime\"");
+ }
+
+ #[test]
+ fn test_fixed_size_list_extra() {
+ let child = ArrowField::new("item", DataType::Int32, false);
+ let dt = DataType::FixedSizeList(Arc::new(child), 3);
+ let schema = single_field_schema(ArrowField::new("triples", dt,
false));
+ let avro = AvroSchema::try_from(&schema).unwrap();
+ assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
+ }
+
+ #[test]
+ fn test_map_duration_value_extra() {
+ let val_field = ArrowField::new("value",
DataType::Duration(TimeUnit::Second), true);
+ let entries_struct = ArrowField::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ ArrowField::new("key", DataType::Utf8, false),
+ val_field,
+ ])),
+ false,
+ );
+ let map_dt = DataType::Map(Arc::new(entries_struct), false);
+ let schema = single_field_schema(ArrowField::new("metrics", map_dt,
false));
+ let avro = AvroSchema::try_from(&schema).unwrap();
+ assert_json_contains(&avro.json_string,
"\"arrowDurationUnit\":\"second\"");
+ }
}