This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/avro-3683-multiple-schemas by
this push:
new 0c2e2183d AVRO-3683: WIP
0c2e2183d is described below
commit 0c2e2183da7b4a89f5a33447d7a9611e33986b03
Author: Martin Tzvetanov Grigorov <[email protected]>
AuthorDate: Thu Dec 15 00:42:50 2022 +0200
AVRO-3683: WIP
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
lang/rust/avro/src/encode.rs | 15 ++++++++++++--
lang/rust/avro/src/schema.rs | 39 ++++++++++++++++++++++-------------
lang/rust/avro/src/types.rs | 28 +++++++++++++++----------
lang/rust/avro/src/writer.rs | 49 ++++++++++++++++++++++++++------------------
4 files changed, 84 insertions(+), 47 deletions(-)
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 5f0819b60..3cdb067fe 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -37,9 +37,20 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
encode_internal(value, schema, rs.get_names(), &None, buffer)
}
-pub fn encode_schemata(value: &Value, schemata: &[&Schema], buffer: &mut Vec<u8>) -> AvroResult<()> {
+pub fn encode_schemata(
+ value: &Value,
+ schemata: &[&Schema],
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
let rs = ResolvedSchema::try_from(schemata)?;
- encode_internal(value, schema, rs.get_names(), &None, buffer)
+ for schema in schemata {
+ if value.validate(schema) {
+ encode_internal(value, schema, rs.get_names(), &None, buffer)?;
+ break;
+ }
+ }
+
+ todo!("Err(None of the provided schemata matched the value)")
}
fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 3e73e59a2..5784f1901 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -401,12 +401,26 @@ impl Serialize for Alias {
pub(crate) struct ResolvedSchema<'s> {
names_ref: NamesRef<'s>,
- schemata: &[&'s Schema],
+ schemata: &'s [&'s Schema],
}
impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
type Error = Error;
+ fn try_from(schema: &'s Schema) -> AvroResult<Self> {
+ let names = HashMap::new();
+ let mut rs = ResolvedSchema {
+ names_ref: names,
+ schemata: &[schema],
+ };
+ Self::from_internal(rs.schemata, &mut rs.names_ref, &None)?;
+ Ok(rs)
+ }
+}
+
+impl<'s> TryFrom<&'s [&'s Schema]> for ResolvedSchema<'s> {
+ type Error = Error;
+
fn try_from(schemata: &[&'s Schema]) -> AvroResult<Self> {
let names = HashMap::new();
let mut rs = ResolvedSchema {
@@ -419,28 +433,27 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
}
impl<'s> ResolvedSchema<'s> {
- pub(crate) fn get_root_schema(&self) -> &'s Schema {
- self.root_schema
+ pub(crate) fn get_schemata(&self) -> &'s [&'s Schema] {
+ self.schemata
}
pub(crate) fn get_names(&self) -> &NamesRef<'s> {
&self.names_ref
}
fn from_internal(
- schemata: &[&'s Schema],
+ schemata: &'s [&'s Schema],
names_ref: &mut NamesRef<'s>,
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
for schema in schemata {
match schema {
Schema::Array(schema) | Schema::Map(schema) => {
- Self::from_internal(schema, names_ref, enclosing_namespace)
+ Self::from_internal(&[schema], names_ref, enclosing_namespace)?
}
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
- Self::from_internal(schema, names_ref, enclosing_namespace)?
+ Self::from_internal(&[schema], names_ref, enclosing_namespace)?
}
- Ok(())
}
Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
@@ -448,9 +461,7 @@ impl<'s> ResolvedSchema<'s> {
.insert(fully_qualified_name.clone(), schema)
.is_some()
{
- Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
- } else {
- Ok(())
+ return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
}
}
Schema::Record { name, fields, .. } => {
@@ -459,13 +470,12 @@ impl<'s> ResolvedSchema<'s> {
.insert(fully_qualified_name.clone(), schema)
.is_some()
{
- Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
} else {
let record_namespace = fully_qualified_name.namespace;
for field in fields {
- Self::from_internal(&field.schema, names_ref, &record_namespace)?
+ Self::from_internal(&[&field.schema], names_ref, &record_namespace)?
}
- Ok(())
}
}
Schema::Ref { name } => {
@@ -473,11 +483,12 @@ impl<'s> ResolvedSchema<'s> {
names_ref
.get(&fully_qualified_name)
.map(|_| ())
- .ok_or(Error::SchemaResolutionError(fully_qualified_name))
+ .ok_or_else(|| return Error::SchemaResolutionError(fully_qualified_name))
}
_ => Ok(()),
}
}
+ Ok(())
}
}
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 940ca17aa..ba18f7714 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -339,19 +339,25 @@ impl Value {
/// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
/// for the full set of rules of schema validation.
pub fn validate(&self, schema: &Schema) -> bool {
- let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
- let enclosing_namespace = schema.namespace();
+ self.validate_schemata(&[schema])
+ }
- match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
- Some(error_msg) => {
- error!(
- "Invalid value: {:?} for schema: {:?}. Reason: {}",
- self, schema, error_msg
- );
- false
+ pub fn validate_schemata(&self, schemata: &[&Schema]) -> bool {
+ let rs = ResolvedSchema::try_from(schemata).expect("Schemata didn't successfully resolve");
+ schemata.iter().any(|schema| {
+ let enclosing_namespace = schema.namespace();
+
+ match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
+ Some(error_msg) => {
+ error!(
+ "Invalid value: {:?} for schema: {:?}. Reason: {}",
+ self, schema, error_msg
+ );
+ false
+ }
+ None => true,
}
- None => true,
- }
+ })
}
fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 6ed93fbc7..c777e31cd 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -16,6 +16,7 @@
// under the License.
//! Logic handling writing in Avro format at user level.
+use crate::encode::encode_schemata;
use crate::{
encode::{encode, encode_internal, encode_to_vec},
rabin::Rabin,
@@ -382,10 +383,10 @@ fn write_avro_datum_schemata<T: Into<Value>>(
buffer: &mut Vec<u8>,
) -> Result<(), Error> {
let avro = value.into();
- if !avro.validate(schemata) {
+ if !avro.validate_schemata(schemata) {
return Err(Error::Validation);
}
- encode(&avro, schemata, buffer)?;
+ encode_schemata(&avro, schemata, buffer)?;
Ok(())
}
@@ -501,21 +502,22 @@ fn write_value_ref_resolved(
value: &Value,
buffer: &mut Vec<u8>,
) -> AvroResult<()> {
- let root_schema = resolved_schema.get_root_schema();
- if let Some(err) = value.validate_internal(
- root_schema,
- resolved_schema.get_names(),
- &root_schema.namespace(),
- ) {
- return Err(Error::ValidationWithReason(err));
+ let schemata = resolved_schema.get_schemata();
+ for schema in schemata {
+ if let Some(err) =
+ value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace())
+ {
+ return Err(Error::ValidationWithReason(err));
+ } else {
+ encode_internal(
+ value,
+ schema,
+ resolved_schema.get_names(),
+ &schema.namespace(),
+ buffer,
+ )?;
+ }
}
- encode_internal(
- value,
- root_schema,
- resolved_schema.get_names(),
- &root_schema.namespace(),
- buffer,
- )?;
Ok(())
}
@@ -554,9 +556,12 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
Ok(buffer)
}
-pub fn to_avro_datum_schemata<T: Into<Value>>(schemata: &[&Schema], value: T) -> AvroResult<Vec<u8>> {
+pub fn to_avro_datum_schemata<T: Into<Value>>(
+ schemata: &[&Schema],
+ value: T,
+) -> AvroResult<Vec<u8>> {
let mut buffer = Vec::new();
- write_avro_datum(schemata, value, &mut buffer)?;
+ write_avro_datum_schemata(schemata, value, &mut buffer)?;
Ok(buffer)
}
@@ -1264,7 +1269,7 @@ mod tests {
}
#[test]
- fn test_multiple_schemata_to_avro_datum() {
+ fn test_avro_3683_multiple_schemata_to_avro_datum() {
let schema_a_str = r#"{
"name": "A",
"type": "record",
@@ -1286,6 +1291,10 @@ mod tests {
Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
)]);
- assert_eq!(to_avro_datum(&schema, record).unwrap(), expected);
+ let expected: Vec<u8> = Vec::new();
+ assert_eq!(
+ to_avro_datum_schemata(&schemata[..], record).unwrap(),
+ expected
+ );
}
}