martin-g commented on a change in pull request #1602:
URL: https://github.com/apache/avro/pull/1602#discussion_r828854088
##########
File path: lang/rust/avro/src/decode.rs
##########
@@ -369,9 +387,372 @@ mod tests {
));
let mut buffer = Vec::<u8>::new();
- encode(&value, &schema, &mut buffer);
+ encode(&value, &schema, &mut buffer).expect("message should encode");
Review comment:
```suggestion
encode(&value, &schema, &mut buffer).expect(success(&value,
&schema));
```
and something like
```
fn success(value, schema) -> String {
format!("Value: {:?}\n should encode with schema:\n{:?}", &value, &schema)
}
```
##########
File path: lang/rust/avro/src/decode.rs
##########
@@ -68,10 +68,21 @@ fn decode_seq_len<R: Read>(reader: &mut R) ->
AvroResult<usize> {
/// Decode a `Value` from avro format given its `Schema`.
pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
- fn decode0<R: Read>(
+ let rs = ResolvedSchema::try_from(schema)?;
+ decode_internal(schema, rs.get_names(), &None, reader)
+}
+
+fn decode_internal<R: Read>(
Review comment:
Is there still a need for `decode_internal0` now?
##########
File path: lang/rust/avro/src/encode.rs
##########
@@ -209,7 +236,8 @@ mod tests {
&Value::Array(empty),
&Schema::Array(Box::new(Schema::Int)),
&mut buf,
- );
+ )
+ .expect("message should encode");
Review comment:
Let's print the value and the schema here too, as I suggested in `decode.rs` with
`expect(success())`.
##########
File path: lang/rust/avro/src/schema.rs
##########
@@ -317,17 +327,86 @@ impl From<&str> for Name {
}
}
-impl Hash for Name {
- fn hash<H: Hasher>(&self, state: &mut H) {
- self.fullname(None).hash(state);
+impl fmt::Display for Name {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str(&self.fullname(None)[..])
+ }
+}
+
+pub(crate) struct ResolvedSchema<'s> {
+ names_ref: HashMap<Name, &'s Schema>,
+ root_schema: &'s Schema,
+}
+
+impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
+ type Error = Error;
+
+ fn try_from(schema: &'s Schema) -> AvroResult<Self> {
+ let names = HashMap::new();
+ let mut rs = ResolvedSchema {
+ names_ref: names,
+ root_schema: schema,
+ };
+ Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?;
+ Ok(rs)
}
}
-impl Eq for Name {}
+impl<'s> ResolvedSchema<'s> {
+ pub fn get_names(&self) -> &HashMap<Name, &'s Schema> {
+ &self.names_ref
+ }
-impl PartialEq for Name {
- fn eq(&self, other: &Name) -> bool {
- self.fullname(None).eq(&other.fullname(None))
+ fn from_internal(
+ schema: &'s Schema,
+ names_ref: &mut HashMap<Name, &'s Schema>,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<()> {
+ match schema {
+ Schema::Array(schema) | Schema::Map(schema) => {
+ Self::from_internal(schema, names_ref, enclosing_namespace)
+ }
+ Schema::Union(UnionSchema { schemas, .. }) => {
+ for schema in schemas {
+ Self::from_internal(schema, names_ref,
enclosing_namespace)?
+ }
+ Ok(())
+ }
+ Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if names_ref
+ .insert(fully_qualified_name.clone(), schema)
+ .is_some()
+ {
+ Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ } else {
+ Ok(())
+ }
+ }
+ Schema::Record { name, fields, .. } => {
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+ if names_ref
+ .insert(fully_qualified_name.clone(), schema)
+ .is_some()
+ {
+ Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
+ } else {
+ let record_namespace =
name.fully_qualified_name(enclosing_namespace).namespace;
Review comment:
Can we reuse `fully_qualified_name` from line 387 instead of calling `name.fully_qualified_name(enclosing_namespace)` again?
##########
File path: lang/rust/avro/src/encode.rs
##########
@@ -45,155 +50,177 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
zig_i32(i, buffer)
}
-/// Encode a `Value` into avro format.
-///
-/// **NOTE** This will not perform schema validation. The value is assumed to
-/// be valid with regards to the schema. Schema are needed only to guide the
-/// encoding for complex type values.
-pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- fn encode_ref0(
- value: &Value,
- schema: &Schema,
- buffer: &mut Vec<u8>,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) {
- match &schema {
- Schema::Ref { ref name } => {
- let resolved = schemas_by_name.get(name).unwrap();
- return encode_ref0(value, resolved, buffer, &mut
schemas_by_name.clone());
- }
- Schema::Record { ref name, .. }
- | Schema::Enum { ref name, .. }
- | Schema::Fixed { ref name, .. } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- }
- _ => (),
- }
+fn encode_internal(
+ value: &Value,
+ schema: &Schema,
+ names: &HashMap<Name, &Schema>,
Review comment:
```suggestion
names: &Names,
```
##########
File path: lang/rust/avro/src/encode.rs
##########
@@ -45,155 +50,177 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
zig_i32(i, buffer)
}
-/// Encode a `Value` into avro format.
-///
-/// **NOTE** This will not perform schema validation. The value is assumed to
-/// be valid with regards to the schema. Schema are needed only to guide the
-/// encoding for complex type values.
-pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- fn encode_ref0(
- value: &Value,
- schema: &Schema,
- buffer: &mut Vec<u8>,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) {
- match &schema {
- Schema::Ref { ref name } => {
- let resolved = schemas_by_name.get(name).unwrap();
- return encode_ref0(value, resolved, buffer, &mut
schemas_by_name.clone());
- }
- Schema::Record { ref name, .. }
- | Schema::Enum { ref name, .. }
- | Schema::Fixed { ref name, .. } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- }
- _ => (),
- }
+fn encode_internal(
+ value: &Value,
+ schema: &Schema,
+ names: &HashMap<Name, &Schema>,
+ enclosing_namespace: &Namespace,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+ if let Schema::Ref { ref name } = schema {
+ let resolved = *names
+ .get(&name.fully_qualified_name(enclosing_namespace))
+ .ok_or_else(|| {
+
Error::SchemaResolutionError(name.fully_qualified_name(enclosing_namespace))
+ })?;
+ return encode_internal(value, resolved, names, enclosing_namespace,
buffer);
+ }
- match value {
- Value::Null => (),
- Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
- // Pattern | Pattern here to signify that these _must_ have the
same encoding.
- Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) =>
encode_int(*i, buffer),
- Value::Long(i)
- | Value::TimestampMillis(i)
- | Value::TimestampMicros(i)
- | Value::TimeMicros(i) => encode_long(*i, buffer),
- Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Decimal(decimal) => match schema {
- Schema::Decimal { inner, .. } => match *inner.clone() {
- Schema::Fixed { size, .. } => {
- let bytes =
decimal.to_sign_extended_bytes_with_len(size).unwrap();
- let num_bytes = bytes.len();
- if num_bytes != size {
- panic!(
- "signed decimal bytes length {} not equal to
fixed schema size {}",
- num_bytes, size
- );
- }
- encode(&Value::Fixed(size, bytes), inner, buffer)
+ match value {
+ Value::Null => (),
+ Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+ // Pattern | Pattern here to signify that these _must_ have the same
encoding.
+ Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) =>
encode_int(*i, buffer),
+ Value::Long(i)
+ | Value::TimestampMillis(i)
+ | Value::TimestampMicros(i)
+ | Value::TimeMicros(i) => encode_long(*i, buffer),
+ Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Decimal(decimal) => match schema {
+ Schema::Decimal { inner, .. } => match *inner.clone() {
+ Schema::Fixed { size, .. } => {
+ let bytes =
decimal.to_sign_extended_bytes_with_len(size).unwrap();
+ let num_bytes = bytes.len();
+ if num_bytes != size {
+ return Err(Error::EncodeDecimalAsFixedError(num_bytes,
size));
}
- Schema::Bytes => {
- encode(&Value::Bytes(decimal.try_into().unwrap()),
inner, buffer)
- }
- _ => panic!("invalid inner type for decimal: {:?}", inner),
- },
- _ => panic!("invalid schema type for decimal: {:?}", schema),
- },
- &Value::Duration(duration) => {
- let slice: [u8; 12] = duration.into();
- buffer.extend_from_slice(&slice);
- }
- Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
- Value::Bytes(bytes) => match *schema {
- Schema::Bytes => encode_bytes(bytes, buffer),
- Schema::Fixed { .. } => buffer.extend(bytes),
- _ => error!("invalid schema type for bytes: {:?}", schema),
- },
- Value::String(s) => match *schema {
- Schema::String => {
- encode_bytes(s, buffer);
+ encode(&Value::Fixed(size, bytes), inner, buffer)?
}
- Schema::Enum { ref symbols, .. } => {
- if let Some(index) = symbols.iter().position(|item| item
== s) {
- encode_int(index as i32, buffer);
- }
+ Schema::Bytes => encode(&Value::Bytes(decimal.try_into()?),
inner, buffer)?,
+ _ => {
+ return Err(Error::ResolveDecimalSchema(SchemaKind::from(
+ *inner.clone(),
+ )));
}
- _ => error!("invalid schema type for String: {:?}", schema),
},
- Value::Fixed(_, bytes) => buffer.extend(bytes),
- Value::Enum(i, _) => encode_int(*i as i32, buffer),
- Value::Union(idx, item) => {
- if let Schema::Union(ref inner) = *schema {
- inner.schemas.iter().for_each(|s| match s {
- Schema::Record { name, .. }
- | Schema::Enum { name, .. }
- | Schema::Fixed { name, .. } => {
- schemas_by_name.insert(name.clone(), s.clone());
- }
- _ => (),
- });
-
- let inner_schema = inner
- .schemas
- .get(*idx as usize)
- .expect("Invalid Union validation occurred");
- encode_long(*idx as i64, buffer);
- encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Decimal,
+ supported_schema: vec![SchemaKind::Decimal],
+ });
+ }
+ },
+ &Value::Duration(duration) => {
+ let slice: [u8; 12] = duration.into();
+ buffer.extend_from_slice(&slice);
+ }
+ Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+ Value::Bytes(bytes) => match *schema {
+ Schema::Bytes => encode_bytes(bytes, buffer),
+ Schema::Fixed { .. } => buffer.extend(bytes),
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Bytes,
+ supported_schema: vec![SchemaKind::Bytes,
SchemaKind::Fixed],
+ });
+ }
+ },
+ Value::String(s) => match *schema {
+ Schema::String => {
+ encode_bytes(s, buffer);
+ }
+ Schema::Enum { ref symbols, .. } => {
+ if let Some(index) = symbols.iter().position(|item| item == s)
{
+ encode_int(index as i32, buffer);
} else {
- error!("invalid schema type for Union: {:?}", schema);
+ error!("Invalid symbol string");
Review comment:
Print `s` too, so the log message shows which symbol was not found.
##########
File path: lang/rust/avro/src/schema.rs
##########
@@ -259,56 +263,62 @@ impl Name {
}
/// Parse a `serde_json::Value` into a `Name`.
- fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
+ pub fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
Review comment:
Could this be `pub(crate)`?
##########
File path: lang/rust/avro/src/encode.rs
##########
@@ -45,155 +50,177 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
zig_i32(i, buffer)
}
-/// Encode a `Value` into avro format.
-///
-/// **NOTE** This will not perform schema validation. The value is assumed to
-/// be valid with regards to the schema. Schema are needed only to guide the
-/// encoding for complex type values.
-pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- fn encode_ref0(
- value: &Value,
- schema: &Schema,
- buffer: &mut Vec<u8>,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) {
- match &schema {
- Schema::Ref { ref name } => {
- let resolved = schemas_by_name.get(name).unwrap();
- return encode_ref0(value, resolved, buffer, &mut
schemas_by_name.clone());
- }
- Schema::Record { ref name, .. }
- | Schema::Enum { ref name, .. }
- | Schema::Fixed { ref name, .. } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- }
- _ => (),
- }
+fn encode_internal(
+ value: &Value,
+ schema: &Schema,
+ names: &HashMap<Name, &Schema>,
+ enclosing_namespace: &Namespace,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+ if let Schema::Ref { ref name } = schema {
+ let resolved = *names
+ .get(&name.fully_qualified_name(enclosing_namespace))
Review comment:
`name.fully_qualified_name(enclosing_namespace)` could be extracted into
a local variable and reused at line 64.
##########
File path: lang/rust/avro/src/encode.rs
##########
@@ -45,155 +50,177 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
zig_i32(i, buffer)
}
-/// Encode a `Value` into avro format.
-///
-/// **NOTE** This will not perform schema validation. The value is assumed to
-/// be valid with regards to the schema. Schema are needed only to guide the
-/// encoding for complex type values.
-pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
- fn encode_ref0(
- value: &Value,
- schema: &Schema,
- buffer: &mut Vec<u8>,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) {
- match &schema {
- Schema::Ref { ref name } => {
- let resolved = schemas_by_name.get(name).unwrap();
- return encode_ref0(value, resolved, buffer, &mut
schemas_by_name.clone());
- }
- Schema::Record { ref name, .. }
- | Schema::Enum { ref name, .. }
- | Schema::Fixed { ref name, .. } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- }
- _ => (),
- }
+fn encode_internal(
+ value: &Value,
+ schema: &Schema,
+ names: &HashMap<Name, &Schema>,
+ enclosing_namespace: &Namespace,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+ if let Schema::Ref { ref name } = schema {
+ let resolved = *names
+ .get(&name.fully_qualified_name(enclosing_namespace))
+ .ok_or_else(|| {
+
Error::SchemaResolutionError(name.fully_qualified_name(enclosing_namespace))
+ })?;
+ return encode_internal(value, resolved, names, enclosing_namespace,
buffer);
+ }
- match value {
- Value::Null => (),
- Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
- // Pattern | Pattern here to signify that these _must_ have the
same encoding.
- Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) =>
encode_int(*i, buffer),
- Value::Long(i)
- | Value::TimestampMillis(i)
- | Value::TimestampMicros(i)
- | Value::TimeMicros(i) => encode_long(*i, buffer),
- Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
- Value::Decimal(decimal) => match schema {
- Schema::Decimal { inner, .. } => match *inner.clone() {
- Schema::Fixed { size, .. } => {
- let bytes =
decimal.to_sign_extended_bytes_with_len(size).unwrap();
- let num_bytes = bytes.len();
- if num_bytes != size {
- panic!(
- "signed decimal bytes length {} not equal to
fixed schema size {}",
- num_bytes, size
- );
- }
- encode(&Value::Fixed(size, bytes), inner, buffer)
+ match value {
+ Value::Null => (),
+ Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+ // Pattern | Pattern here to signify that these _must_ have the same
encoding.
+ Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) =>
encode_int(*i, buffer),
+ Value::Long(i)
+ | Value::TimestampMillis(i)
+ | Value::TimestampMicros(i)
+ | Value::TimeMicros(i) => encode_long(*i, buffer),
+ Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+ Value::Decimal(decimal) => match schema {
+ Schema::Decimal { inner, .. } => match *inner.clone() {
+ Schema::Fixed { size, .. } => {
+ let bytes =
decimal.to_sign_extended_bytes_with_len(size).unwrap();
+ let num_bytes = bytes.len();
+ if num_bytes != size {
+ return Err(Error::EncodeDecimalAsFixedError(num_bytes,
size));
}
- Schema::Bytes => {
- encode(&Value::Bytes(decimal.try_into().unwrap()),
inner, buffer)
- }
- _ => panic!("invalid inner type for decimal: {:?}", inner),
- },
- _ => panic!("invalid schema type for decimal: {:?}", schema),
- },
- &Value::Duration(duration) => {
- let slice: [u8; 12] = duration.into();
- buffer.extend_from_slice(&slice);
- }
- Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
- Value::Bytes(bytes) => match *schema {
- Schema::Bytes => encode_bytes(bytes, buffer),
- Schema::Fixed { .. } => buffer.extend(bytes),
- _ => error!("invalid schema type for bytes: {:?}", schema),
- },
- Value::String(s) => match *schema {
- Schema::String => {
- encode_bytes(s, buffer);
+ encode(&Value::Fixed(size, bytes), inner, buffer)?
}
- Schema::Enum { ref symbols, .. } => {
- if let Some(index) = symbols.iter().position(|item| item
== s) {
- encode_int(index as i32, buffer);
- }
+ Schema::Bytes => encode(&Value::Bytes(decimal.try_into()?),
inner, buffer)?,
+ _ => {
+ return Err(Error::ResolveDecimalSchema(SchemaKind::from(
+ *inner.clone(),
+ )));
}
- _ => error!("invalid schema type for String: {:?}", schema),
},
- Value::Fixed(_, bytes) => buffer.extend(bytes),
- Value::Enum(i, _) => encode_int(*i as i32, buffer),
- Value::Union(idx, item) => {
- if let Schema::Union(ref inner) = *schema {
- inner.schemas.iter().for_each(|s| match s {
- Schema::Record { name, .. }
- | Schema::Enum { name, .. }
- | Schema::Fixed { name, .. } => {
- schemas_by_name.insert(name.clone(), s.clone());
- }
- _ => (),
- });
-
- let inner_schema = inner
- .schemas
- .get(*idx as usize)
- .expect("Invalid Union validation occurred");
- encode_long(*idx as i64, buffer);
- encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Decimal,
+ supported_schema: vec![SchemaKind::Decimal],
+ });
+ }
+ },
+ &Value::Duration(duration) => {
+ let slice: [u8; 12] = duration.into();
+ buffer.extend_from_slice(&slice);
+ }
+ Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+ Value::Bytes(bytes) => match *schema {
+ Schema::Bytes => encode_bytes(bytes, buffer),
+ Schema::Fixed { .. } => buffer.extend(bytes),
+ _ => {
+ return Err(Error::EncodeValueAsSchemaError {
+ value_kind: ValueKind::Bytes,
+ supported_schema: vec![SchemaKind::Bytes,
SchemaKind::Fixed],
+ });
+ }
+ },
+ Value::String(s) => match *schema {
+ Schema::String => {
+ encode_bytes(s, buffer);
+ }
+ Schema::Enum { ref symbols, .. } => {
+ if let Some(index) = symbols.iter().position(|item| item == s)
{
+ encode_int(index as i32, buffer);
} else {
- error!("invalid schema type for Union: {:?}", schema);
+ error!("Invalid symbol string");
+ return Err(Error::GetEnumSymbol);
Review comment:
Here as well: include the symbol `s` in the returned error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]