This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new b9cbf9a09 AVRO-3847: [Rust] Support default value of pre-defined name for Union type field (#2468)
b9cbf9a09 is described below
commit b9cbf9a090000d6a30ffb3b956aee9195b9d8047
Author: Kousuke Saruta <[email protected]>
AuthorDate: Thu Aug 31 19:24:49 2023 +0900
AVRO-3847: [Rust] Support default value of pre-defined name for Union type field (#2468)
---
lang/rust/avro/src/schema.rs | 14 +-
lang/rust/avro/src/types.rs | 25 +-
lang/rust/avro/tests/schema.rs | 621 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 645 insertions(+), 15 deletions(-)
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 2eb7ba06e..9cae97cff 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -1610,9 +1610,17 @@ impl Parser {
.and_then(|schemas| {
if let Some(default_value) = default.cloned() {
let avro_value = types::Value::from(default_value);
- let resolved = schemas
- .iter()
- .any(|schema| avro_value.to_owned().resolve(schema).is_ok());
+ let resolved = schemas.iter().any(|schema| {
+ avro_value
+ .to_owned()
+ .resolve_internal(
+ schema,
+ &self.parsed_schemas,
+ &schema.namespace(),
+ &None,
+ )
+ .is_ok()
+ });
if !resolved {
let schema: Option<&Schema> = schemas.get(0);
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 07a51c377..e9009d03b 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -20,13 +20,14 @@ use crate::{
decimal::Decimal,
duration::Duration,
schema::{
- DecimalSchema, EnumSchema, FixedSchema, Name, NamesRef, Namespace, Precision, RecordField,
+ DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
RecordSchema, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
},
AvroResult, Error,
};
use serde_json::{Number, Value as JsonValue};
use std::{
+ borrow::Borrow,
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
@@ -605,10 +606,10 @@ impl Value {
self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
}
- pub(crate) fn resolve_internal(
+ pub(crate) fn resolve_internal<S: Borrow<Schema> + Debug>(
mut self,
schema: &Schema,
- names: &NamesRef,
+ names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
field_default: &Option<JsonValue>,
) -> AvroResult<Self> {
@@ -630,7 +631,7 @@ impl Value {
if let Some(resolved) = names.get(&name) {
debug!("Resolved {:?}", name);
- self.resolve_internal(resolved, names, &name.namespace, field_default)
+ self.resolve_internal(resolved.borrow(), names, &name.namespace, field_default)
} else {
error!("Failed to resolve schema {:?}", name);
Err(Error::SchemaResolutionError(name.clone()))
@@ -907,10 +908,10 @@ impl Value {
}
}
- fn resolve_union(
+ fn resolve_union<S: Borrow<Schema> + Debug>(
self,
schema: &UnionSchema,
- names: &NamesRef,
+ names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
field_default: &Option<JsonValue>,
) -> Result<Self, Error> {
@@ -930,10 +931,10 @@ impl Value {
))
}
- fn resolve_array(
+ fn resolve_array<S: Borrow<Schema> + Debug>(
self,
schema: &Schema,
- names: &NamesRef,
+ names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
) -> Result<Self, Error> {
match self {
@@ -950,10 +951,10 @@ impl Value {
}
}
- fn resolve_map(
+ fn resolve_map<S: Borrow<Schema> + Debug>(
self,
schema: &Schema,
- names: &NamesRef,
+ names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
) -> Result<Self, Error> {
match self {
@@ -974,10 +975,10 @@ impl Value {
}
}
- fn resolve_record(
+ fn resolve_record<S: Borrow<Schema> + Debug>(
self,
fields: &[RecordField],
- names: &NamesRef,
+ names: &HashMap<Name, S>,
enclosing_namespace: &Namespace,
) -> Result<Self, Error> {
let mut items = match self {
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index 223305876..f0d11e954 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -1604,3 +1604,624 @@ fn test_avro_3785_deserialize_namespace_with_nullable_type_containing_reference_
Ok(())
}
+
+#[test]
+fn test_avro_3847_union_field_with_default_value_of_ref() -> TestResult {
+ // Test for reference to Record
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "record2",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "int"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "record2",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "int"
+ }
+ ]
+ }
+ }, {
+ "name": "f2",
+ "type": ["record2", "int"],
+ "default": {
+ "f1_1": 100
+ }
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ (
+ "f1".to_string(),
+ Value::Record(vec![("f1_1".to_string(), 10.into())]),
+ ),
+ (
+ "f2".to_string(),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![("f1_1".to_string(), 100.into())])),
+ ),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ // Test for reference to Enum
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "enum1",
+ "type": "enum",
+ "symbols": ["a", "b"]
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Enum(1, "b".to_string()));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "enum1",
+ "type": "enum",
+ "symbols": ["a", "b"]
+ }
+ }, {
+ "name": "f2",
+ "type": ["enum1", "int"],
+ "default": "a"
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ ("f1".to_string(), Value::Enum(1, "b".to_string())),
+ (
+ "f2".to_string(),
+ Value::Union(0, Box::new(Value::Enum(0, "a".to_string()))),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ // Test for reference to Fixed
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "fixed1",
+ "type": "fixed",
+ "size": 3
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Fixed(3, vec![0, 1, 2]));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "fixed1",
+ "type": "fixed",
+ "size": 3
+ }
+ }, {
+ "name": "f2",
+ "type": ["fixed1", "int"],
+ "default": "abc"
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ ("f1".to_string(), Value::Fixed(3, vec![0, 1, 2])),
+ (
+ "f2".to_string(),
+ Value::Union(0, Box::new(Value::Fixed(3, vec![b'a', b'b', b'c']))),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ Ok(())
+}
+
+#[test]
+fn test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> TestResult {
+ // Test for reference to Record
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "record2",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "int"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "record2",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "int"
+ }
+ ]
+ }
+ }, {
+ "name": "f2",
+ "type": ["ns.record2", "int"],
+ "default": {
+ "f1_1": 100
+ }
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ (
+ "f1".to_string(),
+ Value::Record(vec![("f1_1".to_string(), 10.into())]),
+ ),
+ (
+ "f2".to_string(),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![("f1_1".to_string(), 100.into())])),
+ ),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ // Test for reference to Enum
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "enum1",
+ "namespace": "ns",
+ "type": "enum",
+ "symbols": ["a", "b"]
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Enum(1, "b".to_string()));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "enum1",
+ "namespace": "ns",
+ "type": "enum",
+ "symbols": ["a", "b"]
+ }
+ }, {
+ "name": "f2",
+ "type": ["ns.enum1", "int"],
+ "default": "a"
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ ("f1".to_string(), Value::Enum(1, "b".to_string())),
+ (
+ "f2".to_string(),
+ Value::Union(0, Box::new(Value::Enum(0, "a".to_string()))),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ // Test for reference to Fixed
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "fixed1",
+ "namespace": "ns",
+ "type": "fixed",
+ "size": 3
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Fixed(3, vec![0, 1, 2]));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "fixed1",
+ "namespace": "ns",
+ "type": "fixed",
+ "size": 3
+ }
+ }, {
+ "name": "f2",
+ "type": ["ns.fixed1", "int"],
+ "default": "abc"
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ ("f1".to_string(), Value::Fixed(3, vec![0, 1, 2])),
+ (
+ "f2".to_string(),
+ Value::Union(0, Box::new(Value::Fixed(3, vec![b'a', b'b', b'c']))),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ Ok(())
+}
+
+#[test]
+fn test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace() -> TestResult {
+ // Test for reference to Record
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "record2",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "int"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "record2",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "int"
+ }
+ ]
+ }
+ }, {
+ "name": "f2",
+ "type": ["ns.record2", "int"],
+ "default": {
+ "f1_1": 100
+ }
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ (
+ "f1".to_string(),
+ Value::Record(vec![("f1_1".to_string(), 10.into())]),
+ ),
+ (
+ "f2".to_string(),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![("f1_1".to_string(), 100.into())])),
+ ),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ // Test for reference to Enum
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "enum1",
+ "type": "enum",
+ "symbols": ["a", "b"]
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Enum(1, "b".to_string()));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "enum1",
+ "type": "enum",
+ "symbols": ["a", "b"]
+ }
+ }, {
+ "name": "f2",
+ "type": ["ns.enum1", "int"],
+ "default": "a"
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ ("f1".to_string(), Value::Enum(1, "b".to_string())),
+ (
+ "f2".to_string(),
+ Value::Union(0, Box::new(Value::Enum(0, "a".to_string()))),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ // Test for reference to Fixed
+ let writer_schema_str = r#"
+ {
+ "name": "record1",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "fixed1",
+ "type": "fixed",
+ "size": 3
+ }
+ }
+ ]
+ }
+ "#;
+ let writer_schema = Schema::parse_str(writer_schema_str)?;
+ let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?;
+ record.put("f1", Value::Fixed(3, vec![0, 1, 2]));
+ writer.append(record)?;
+
+ let reader_schema_str = r#"
+ {
+ "name": "record1",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {
+ "name": "f1",
+ "type": {
+ "name": "fixed1",
+ "type": "fixed",
+ "size": 3
+ }
+ }, {
+ "name": "f2",
+ "type": ["ns.fixed1", "int"],
+ "default": "abc"
+ }
+ ]
+ }
+ "#;
+ let reader_schema = Schema::parse_str(reader_schema_str)?;
+ let input = writer.into_inner()?;
+ let reader = Reader::with_schema(&reader_schema, &input[..])?;
+ let result = reader.collect::<Result<Vec<_>, _>>()?;
+
+ assert_eq!(1, result.len());
+
+ let expected = Value::Record(vec![
+ ("f1".to_string(), Value::Fixed(3, vec![0, 1, 2])),
+ (
+ "f2".to_string(),
+ Value::Union(0, Box::new(Value::Fixed(3, vec![b'a', b'b', b'c']))),
+ ),
+ ]);
+
+ assert_eq!(expected, result[0]);
+
+ Ok(())
+}