This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch branch-1.11 in repository https://gitbox.apache.org/repos/asf/avro.git
commit e8ce233387929144ae2a8ca78865956b443624b1
Author: Martin Grigorov <[email protected]>
AuthorDate: Wed Jan 19 09:44:47 2022 +0200

AVRO-3302: Add interop tests for Rust (#1456)

* AVRO-3302: Add interop tests for Rust module

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302: Implement test_interop_data for Rust

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302: Run JS interop tests to read .avro files created by Rust

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Fix the path to lang/js

Add debug output for the failing Maven build

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302: git checkout & install Rust stable

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302: Fix formatting

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Remove debug statement

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Revert removed debug statement

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302: Improve documentation

First check 'parsed_schemas', then 'resolving_schemas' and finally 'input_schemas'.
Enable a test case that is working now.

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Seems to work?!

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Code formatting

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Properly encode/decode Schema::Ref

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Remove debug statements

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Print messages for successful reads

Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>

* AVRO-3302 Rename Codec::Zstd to Codec::Zstandard for consistency

This is the name used by the other Avro modules (e.g.
Java & Perl) Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3302 Fix formatting Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3302 Collect all errors during interop test and panic at the end Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3302 Format imports for +nightly Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3302 Use Perl to read the interop .avro files created by Rust Perl support most of the optional codecs Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3302 Code cleanup Revert changes which are not really needed. Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3302 Remove a FIXME that cannot be addressed Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> (cherry picked from commit 98f4f4a87bb61cacf91f50253f8154c8b36de690) --- .github/workflows/test-lang-rust-ci.yml | 71 ++++++ lang/rust/build.sh | 35 ++- lang/rust/examples/generate_interop_data.rs | 99 ++++++++ lang/rust/examples/test_interop_data.rs | 59 +++++ lang/rust/src/codec.rs | 16 +- lang/rust/src/decode.rs | 338 +++++++++++++++------------- lang/rust/src/encode.rs | 199 +++++++++------- lang/rust/src/error.rs | 4 + lang/rust/src/schema.rs | 164 +++++++++++++- lang/rust/src/schema_compatibility.rs | 1 - lang/rust/src/types.rs | 116 ++++++---- lang/rust/src/util.rs | 43 +++- lang/rust/tests/schema.rs | 86 ++++++- 13 files changed, 916 insertions(+), 315 deletions(-) diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml index dacf461..91364f9 100644 --- a/.github/workflows/test-lang-rust-ci.yml +++ b/.github/workflows/test-lang-rust-ci.yml @@ -77,3 +77,74 @@ jobs: with: command: test args: --manifest-path lang/rust/Cargo.toml --doc + + interop: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Rust Toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: 
stable + override: true + + - name: Cache Local Maven Repository + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Install Java Avro for Interop Test + working-directory: . + run: mvn -B install -DskipTests + + - name: Create Interop Data Directory + working-directory: . + run: mkdir -p build/interop/data + + - name: Generate Interop Resources + working-directory: lang/java/avro + run: mvn -B -P interop-data-generate generate-resources + + - name: Generate interop data + run: ./build.sh interop-data-generate + + - name: Rust reads interop files created by Java and Rust + run: ./build.sh interop-data-test + + - uses: shogo82148/actions-setup-perl@v1 + with: + perl-version: 5.32 + + - name: Install Dependencies + run: | + sudo apt-get -qqy install --no-install-recommends libcompress-raw-zlib-perl \ + libcpan-uploader-perl \ + libencode-perl \ + libio-string-perl \ + libjansson-dev \ + libjson-xs-perl \ + libmodule-install-perl \ + libmodule-install-readmefrompod-perl \ + libobject-tiny-perl \ + libsnappy-dev \ + libtest-exception-perl \ + libtest-pod-perl + cpanm --mirror https://www.cpan.org/ install Compress::Zstd \ + Error::Simple \ + Module::Install::Repository \ + Object::Tiny \ + Regexp::Common \ + Try::Tiny \ + inc::Module::Install + + + - name: Perl reads interop files created by Java and Rust + working-directory: lang/perl + run: ./build.sh interop-data-test diff --git a/lang/rust/build.sh b/lang/rust/build.sh index d9a2484..2f0a824 100755 --- a/lang/rust/build.sh +++ b/lang/rust/build.sh @@ -15,7 +15,25 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-set -e +set -e # exit on error + +root_dir=$(pwd) +build_dir="../../build/rust" +dist_dir="../../dist/rust" + + +function clean { + if [ -d $build_dir ]; then + find $build_dir | xargs chmod 755 + rm -rf $build_dir + fi +} + + +function prepare_build { + clean + mkdir -p $build_dir +} cd `dirname "$0"` @@ -35,10 +53,21 @@ do cargo build --release --lib --all-features cargo package mkdir -p ../../dist/rust - cp target/package/avro-rs-*.crate ../../dist/rust + cp target/package/avro-rs-*.crate $dist_dir + ;; + interop-data-generate) + prepare_build + export RUST_LOG=avro_rs=debug + export RUST_BACKTRACE=1 + cargo run --all-features --example generate_interop_data + ;; + + interop-data-test) + prepare_build + cargo run --all-features --example test_interop_data ;; *) - echo "Usage: $0 {lint|test|dist|clean}" >&2 + echo "Usage: $0 {lint|test|dist|clean|interop-data-generate|interop-data-test}" >&2 exit 1 esac done diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs new file mode 100644 index 0000000..cb8efda --- /dev/null +++ b/lang/rust/examples/generate_interop_data.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use avro_rs::{ + schema::Schema, + types::{Record, Value}, + Codec, Writer, +}; +use std::collections::HashMap; +use strum::IntoEnumIterator; + +fn create_datum(schema: &Schema) -> Record { + let mut datum = Record::new(schema).unwrap(); + datum.put("intField", 12_i32); + datum.put("longField", 15234324_i64); + datum.put("stringField", "hey"); + datum.put("boolField", true); + datum.put("floatField", 1234.0_f32); + datum.put("doubleField", -1234.0_f64); + datum.put("bytesField", b"12312adf".to_vec()); + datum.put("nullField", Value::Null); + datum.put( + "arrayField", + Value::Array(vec![ + Value::Double(5.0), + Value::Double(0.0), + Value::Double(12.0), + ]), + ); + let mut map = HashMap::new(); + map.insert( + "a".into(), + Value::Record(vec![("label".into(), Value::String("a".into()))]), + ); + map.insert( + "bee".into(), + Value::Record(vec![("label".into(), Value::String("cee".into()))]), + ); + datum.put("mapField", Value::Map(map)); + datum.put("unionField", Value::Union(Box::new(Value::Double(12.0)))); + datum.put("enumField", Value::Enum(2, "C".to_owned())); + datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec())); + datum.put( + "recordField", + Value::Record(vec![ + ("label".into(), Value::String("outer".into())), + ( + "children".into(), + Value::Array(vec![Value::Record(vec![ + ("label".into(), Value::String("inner".into())), + ("children".into(), Value::Array(vec![])), + ])]), + ), + ]), + ); + + datum +} + +fn main() -> anyhow::Result<()> { + let schema_str = std::fs::read_to_string("../../share/test/schemas/interop.avsc") + .expect("Unable to read the interop Avro schema"); + let schema = Schema::parse_str(schema_str.as_str())?; + + for codec in Codec::iter() { + let mut writer = Writer::with_codec(&schema, Vec::new(), codec); + let datum = create_datum(&schema); + writer.append(datum)?; + let bytes = writer.into_inner()?; + + let codec_name = <&str>::from(codec); + let suffix = if codec_name == "null" { + "".to_owned() + } else 
{ + format!("_{}", codec_name) + }; + + std::fs::write( + format!("../../build/interop/data/rust{}.avro", suffix), + bytes, + )?; + } + + Ok(()) +} diff --git a/lang/rust/examples/test_interop_data.rs b/lang/rust/examples/test_interop_data.rs new file mode 100644 index 0000000..f86c6c4 --- /dev/null +++ b/lang/rust/examples/test_interop_data.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use avro_rs::Reader; +use std::ffi::OsStr; + +fn main() -> anyhow::Result<()> { + let data_dir = std::fs::read_dir("../../build/interop/data/") + .expect("Unable to list the interop data directory"); + + let mut errors = Vec::new(); + + for entry in data_dir { + let path = entry + .expect("Unable to read the interop data directory's files") + .path(); + + if path.is_file() { + let ext = path.extension().and_then(OsStr::to_str).unwrap(); + + if ext == "avro" { + println!("Checking {:?}", &path); + let content = std::fs::File::open(&path)?; + let reader = Reader::new(&content)?; + for value in reader { + if let Err(e) = value { + errors.push(format!( + "There is a problem with reading of '{:?}', \n {:?}\n", + &path, e + )); + } + } + } + } + } + + if errors.is_empty() { + Ok(()) + } else { + panic!( + "There were errors reading some .avro files:\n{}", + errors.join(", ") + ); + } +} diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs index 0ba8abe..c9a584e 100644 --- a/lang/rust/src/codec.rs +++ b/lang/rust/src/codec.rs @@ -19,7 +19,7 @@ use crate::{types::Value, AvroResult, Error}; use libflate::deflate::{Decoder, Encoder}; use std::io::{Read, Write}; -use strum_macros::{EnumString, IntoStaticStr}; +use strum_macros::{EnumIter, EnumString, IntoStaticStr}; #[cfg(feature = "bzip")] use bzip2::{ @@ -34,7 +34,7 @@ use crc32fast::Hasher; use xz2::read::{XzDecoder, XzEncoder}; /// The compression codec used to compress blocks. -#[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)] +#[derive(Clone, Copy, Debug, PartialEq, EnumIter, EnumString, IntoStaticStr)] #[strum(serialize_all = "kebab_case")] pub enum Codec { /// The `Null` codec simply passes through data uncompressed. @@ -49,7 +49,7 @@ pub enum Codec { /// CRC32 checksum of the uncompressed data in the block. Snappy, #[cfg(feature = "zstandard")] - Zstd, + Zstandard, #[cfg(feature = "bzip")] /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/) /// compression library. 
@@ -98,7 +98,7 @@ impl Codec { *stream = encoded; } #[cfg(feature = "zstandard")] - Codec::Zstd => { + Codec::Zstandard => { let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap(); encoder.write_all(stream).map_err(Error::ZstdCompress)?; *stream = encoder.finish().unwrap(); @@ -157,7 +157,7 @@ impl Codec { decoded } #[cfg(feature = "zstandard")] - Codec::Zstd => { + Codec::Zstandard => { let mut decoded = Vec::new(); let mut decoder = zstd::Decoder::new(&stream[..]).unwrap(); std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?; @@ -212,7 +212,7 @@ mod tests { #[cfg(feature = "zstandard")] #[test] fn zstd_compress_and_decompress() { - compress_and_decompress(Codec::Zstd); + compress_and_decompress(Codec::Zstandard); } #[cfg(feature = "bzip")] @@ -245,7 +245,7 @@ mod tests { assert_eq!(<&str>::from(Codec::Snappy), "snappy"); #[cfg(feature = "zstandard")] - assert_eq!(<&str>::from(Codec::Zstd), "zstd"); + assert_eq!(<&str>::from(Codec::Zstandard), "zstandard"); #[cfg(feature = "bzip")] assert_eq!(<&str>::from(Codec::Bzip2), "bzip2"); @@ -265,7 +265,7 @@ mod tests { assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy); #[cfg(feature = "zstandard")] - assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd); + assert_eq!(Codec::from_str("zstandard").unwrap(), Codec::Zstandard); #[cfg(feature = "bzip")] assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2); diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs index 5639d28..26678bf 100644 --- a/lang/rust/src/decode.rs +++ b/lang/rust/src/decode.rs @@ -68,183 +68,217 @@ fn decode_seq_len<R: Read>(reader: &mut R) -> AvroResult<usize> { /// Decode a `Value` from avro format given its `Schema`. 
pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> { - match *schema { - Schema::Null => Ok(Value::Null), - Schema::Boolean => { - let mut buf = [0u8; 1]; - match reader.read_exact(&mut buf[..]) { - Ok(_) => match buf[0] { - 0u8 => Ok(Value::Boolean(false)), - 1u8 => Ok(Value::Boolean(true)), - _ => Err(Error::BoolValue(buf[0])), - }, - Err(io_err) => { - if let ErrorKind::UnexpectedEof = io_err.kind() { - Ok(Value::Null) - } else { - Err(Error::ReadBoolean(io_err)) + fn decode0<R: Read>( + schema: &Schema, + reader: &mut R, + schemas_by_name: &mut HashMap<String, Schema>, + ) -> AvroResult<Value> { + match *schema { + Schema::Null => Ok(Value::Null), + Schema::Boolean => { + let mut buf = [0u8; 1]; + match reader.read_exact(&mut buf[..]) { + Ok(_) => match buf[0] { + 0u8 => Ok(Value::Boolean(false)), + 1u8 => Ok(Value::Boolean(true)), + _ => Err(Error::BoolValue(buf[0])), + }, + Err(io_err) => { + if let ErrorKind::UnexpectedEof = io_err.kind() { + Ok(Value::Null) + } else { + Err(Error::ReadBoolean(io_err)) + } } } } - } - Schema::Decimal { ref inner, .. } => match &**inner { - Schema::Fixed { .. } => match decode(inner, reader)? { - Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))), - value => Err(Error::FixedValue(value.into())), - }, - Schema::Bytes => match decode(inner, reader)? { - Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))), - value => Err(Error::BytesValue(value.into())), + Schema::Decimal { ref inner, .. } => match &**inner { + Schema::Fixed { .. } => match decode0(inner, reader, schemas_by_name)? { + Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))), + value => Err(Error::FixedValue(value.into())), + }, + Schema::Bytes => match decode0(inner, reader, schemas_by_name)? 
{ + Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))), + value => Err(Error::BytesValue(value.into())), + }, + schema => Err(Error::ResolveDecimalSchema(schema.into())), }, - schema => Err(Error::ResolveDecimalSchema(schema.into())), - }, - Schema::Uuid => Ok(Value::Uuid( - Uuid::from_str(match decode(&Schema::String, reader)? { - Value::String(ref s) => s, - value => return Err(Error::GetUuidFromStringValue(value.into())), - }) - .map_err(Error::ConvertStrToUuid)?, - )), - Schema::Int => decode_int(reader), - Schema::Date => zag_i32(reader).map(Value::Date), - Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis), - Schema::Long => decode_long(reader), - Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros), - Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis), - Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros), - Schema::Duration => { - let mut buf = [0u8; 12]; - reader.read_exact(&mut buf).map_err(Error::ReadDuration)?; - Ok(Value::Duration(Duration::from(buf))) - } - Schema::Float => { - let mut buf = [0u8; std::mem::size_of::<f32>()]; - reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?; - Ok(Value::Float(f32::from_le_bytes(buf))) - } - Schema::Double => { - let mut buf = [0u8; std::mem::size_of::<f64>()]; - reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?; - Ok(Value::Double(f64::from_le_bytes(buf))) - } - Schema::Bytes => { - let len = decode_len(reader)?; - let mut buf = vec![0u8; len]; - reader.read_exact(&mut buf).map_err(Error::ReadBytes)?; - Ok(Value::Bytes(buf)) - } - Schema::String => { - let len = decode_len(reader)?; - let mut buf = vec![0u8; len]; - match reader.read_exact(&mut buf) { - Ok(_) => Ok(Value::String( - String::from_utf8(buf).map_err(Error::ConvertToUtf8)?, - )), - Err(io_err) => { - if let ErrorKind::UnexpectedEof = io_err.kind() { - Ok(Value::Null) - } else { - Err(Error::ReadString(io_err)) + Schema::Uuid => Ok(Value::Uuid( + Uuid::from_str(match 
decode0(&Schema::String, reader, schemas_by_name)? { + Value::String(ref s) => s, + value => return Err(Error::GetUuidFromStringValue(value.into())), + }) + .map_err(Error::ConvertStrToUuid)?, + )), + Schema::Int => decode_int(reader), + Schema::Date => zag_i32(reader).map(Value::Date), + Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis), + Schema::Long => decode_long(reader), + Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros), + Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis), + Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros), + Schema::Duration => { + let mut buf = [0u8; 12]; + reader.read_exact(&mut buf).map_err(Error::ReadDuration)?; + Ok(Value::Duration(Duration::from(buf))) + } + Schema::Float => { + let mut buf = [0u8; std::mem::size_of::<f32>()]; + reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?; + Ok(Value::Float(f32::from_le_bytes(buf))) + } + Schema::Double => { + let mut buf = [0u8; std::mem::size_of::<f64>()]; + reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?; + Ok(Value::Double(f64::from_le_bytes(buf))) + } + Schema::Bytes => { + let len = decode_len(reader)?; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf).map_err(Error::ReadBytes)?; + Ok(Value::Bytes(buf)) + } + Schema::String => { + let len = decode_len(reader)?; + let mut buf = vec![0u8; len]; + match reader.read_exact(&mut buf) { + Ok(_) => Ok(Value::String( + String::from_utf8(buf).map_err(Error::ConvertToUtf8)?, + )), + Err(io_err) => { + if let ErrorKind::UnexpectedEof = io_err.kind() { + Ok(Value::Null) + } else { + Err(Error::ReadString(io_err)) + } } } } - } - Schema::Fixed { size, .. } => { - let mut buf = vec![0u8; size]; - reader - .read_exact(&mut buf) - .map_err(|e| Error::ReadFixed(e, size))?; - Ok(Value::Fixed(size, buf)) - } - Schema::Array(ref inner) => { - let mut items = Vec::new(); + Schema::Fixed { ref name, size, .. 
} => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + let mut buf = vec![0u8; size]; + reader + .read_exact(&mut buf) + .map_err(|e| Error::ReadFixed(e, size))?; + Ok(Value::Fixed(size, buf)) + } + Schema::Array(ref inner) => { + let mut items = Vec::new(); - loop { - let len = decode_seq_len(reader)?; - if len == 0 { - break; - } + loop { + let len = decode_seq_len(reader)?; + if len == 0 { + break; + } - items.reserve(len); - for _ in 0..len { - items.push(decode(inner, reader)?); + items.reserve(len); + for _ in 0..len { + items.push(decode0(inner, reader, schemas_by_name)?); + } } - } - Ok(Value::Array(items)) - } - Schema::Map(ref inner) => { - let mut items = HashMap::new(); + Ok(Value::Array(items)) + } + Schema::Map(ref inner) => { + let mut items = HashMap::new(); - loop { - let len = decode_seq_len(reader)?; - if len == 0 { - break; - } + loop { + let len = decode_seq_len(reader)?; + if len == 0 { + break; + } - items.reserve(len); - for _ in 0..len { - match decode(&Schema::String, reader)? { - Value::String(key) => { - let value = decode(inner, reader)?; - items.insert(key, value); + items.reserve(len); + for _ in 0..len { + match decode0(&Schema::String, reader, schemas_by_name)? { + Value::String(key) => { + let value = decode0(inner, reader, schemas_by_name)?; + items.insert(key, value); + } + value => return Err(Error::MapKeyType(value.into())), } - value => return Err(Error::MapKeyType(value.into())), } } - } - Ok(Value::Map(items)) - } - Schema::Union(ref inner) => match zag_i64(reader) { - Ok(index) => { - let variants = inner.variants(); - let variant = variants - .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?) 
- .ok_or_else(|| Error::GetUnionVariant { - index, - num_variants: variants.len(), - })?; - let value = decode(variant, reader)?; - Ok(Value::Union(Box::new(value))) + Ok(Value::Map(items)) } - Err(Error::ReadVariableIntegerBytes(io_err)) => { - if let ErrorKind::UnexpectedEof = io_err.kind() { - Ok(Value::Union(Box::new(Value::Null))) - } else { - Err(Error::ReadVariableIntegerBytes(io_err)) + Schema::Union(ref inner) => match zag_i64(reader) { + Ok(index) => { + let variants = inner.variants(); + let variant = variants + .get( + usize::try_from(index) + .map_err(|e| Error::ConvertI64ToUsize(e, index))?, + ) + .ok_or_else(|| Error::GetUnionVariant { + index, + num_variants: variants.len(), + })?; + let value = decode0(variant, reader, schemas_by_name)?; + Ok(Value::Union(Box::new(value))) } - } - Err(io_err) => Err(io_err), - }, + Err(Error::ReadVariableIntegerBytes(io_err)) => { + if let ErrorKind::UnexpectedEof = io_err.kind() { + Ok(Value::Union(Box::new(Value::Null))) + } else { + Err(Error::ReadVariableIntegerBytes(io_err)) + } + } + Err(io_err) => Err(io_err), + }, - Schema::Record { ref fields, .. } => { - // Benchmarks indicate ~10% improvement using this method. - let mut items = Vec::with_capacity(fields.len()); - for field in fields { - // TODO: This clone is also expensive. See if we can do away with it... - items.push((field.name.clone(), decode(&field.schema, reader)?)); + Schema::Record { + ref name, + ref fields, + .. + } => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + // Benchmarks indicate ~10% improvement using this method. + let mut items = Vec::with_capacity(fields.len()); + for field in fields { + // TODO: This clone is also expensive. See if we can do away with it... + items.push(( + field.name.clone(), + decode0(&field.schema, reader, schemas_by_name)?, + )); + } + Ok(Value::Record(items)) } - Ok(Value::Record(items)) - } - Schema::Enum { ref symbols, .. } => { - Ok(if let Value::Int(raw_index) = decode_int(reader)? 
{ - let index = usize::try_from(raw_index) - .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?; - if (0..=symbols.len()).contains(&index) { - let symbol = symbols[index].clone(); - Value::Enum(raw_index, symbol) + Schema::Enum { + ref name, + ref symbols, + .. + } => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + Ok(if let Value::Int(raw_index) = decode_int(reader)? { + let index = usize::try_from(raw_index) + .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?; + if (0..=symbols.len()).contains(&index) { + let symbol = symbols[index].clone(); + Value::Enum(raw_index, symbol) + } else { + return Err(Error::GetEnumValue { + index, + nsymbols: symbols.len(), + }); + } + } else { + return Err(Error::GetEnumSymbol); + }) + } + Schema::Ref { ref name } => { + let name = &name.name; + if let Some(resolved) = schemas_by_name.get(name.as_str()) { + decode0(resolved, reader, &mut schemas_by_name.clone()) } else { - return Err(Error::GetEnumValue { - index, - nsymbols: symbols.len(), - }); + Err(Error::SchemaResolutionError(name.clone())) } - } else { - return Err(Error::GetEnumSymbol); - }) + } } } + + let mut schemas_by_name: HashMap<String, Schema> = HashMap::new(); + decode0(schema, reader, &mut schemas_by_name) } #[cfg(test)] diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs index 5c68051..088def9 100644 --- a/lang/rust/src/encode.rs +++ b/lang/rust/src/encode.rs @@ -20,7 +20,7 @@ use crate::{ types::Value, util::{zig_i32, zig_i64}, }; -use std::convert::TryInto; +use std::{collections::HashMap, convert::TryInto}; /// Encode a `Value` into avro format. /// @@ -51,104 +51,135 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) { /// be valid with regards to the schema. Schema are needed only to guide the /// encoding for complex type values. 
pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) { - match value { - Value::Null => (), - Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }), - // Pattern | Pattern here to signify that these _must_ have the same encoding. - Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer), - Value::Long(i) - | Value::TimestampMillis(i) - | Value::TimestampMicros(i) - | Value::TimeMicros(i) => encode_long(*i, buffer), - Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()), - Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()), - Value::Decimal(decimal) => match schema { - Schema::Decimal { inner, .. } => match *inner.clone() { - Schema::Fixed { size, .. } => { - let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap(); - let num_bytes = bytes.len(); - if num_bytes != size { - panic!( - "signed decimal bytes length {} not equal to fixed schema size {}", - num_bytes, size - ); + fn encode_ref0( + value: &Value, + schema: &Schema, + buffer: &mut Vec<u8>, + schemas_by_name: &mut HashMap<String, Schema>, + ) { + match &schema { + Schema::Ref { ref name } => { + let resolved = schemas_by_name.get(name.name.as_str()).unwrap(); + return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone()); + } + Schema::Record { ref name, .. } + | Schema::Enum { ref name, .. } + | Schema::Fixed { ref name, .. } => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + } + _ => (), + } + + match value { + Value::Null => (), + Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }), + // Pattern | Pattern here to signify that these _must_ have the same encoding. 
+ Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer), + Value::Long(i) + | Value::TimestampMillis(i) + | Value::TimestampMicros(i) + | Value::TimeMicros(i) => encode_long(*i, buffer), + Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()), + Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()), + Value::Decimal(decimal) => match schema { + Schema::Decimal { inner, .. } => match *inner.clone() { + Schema::Fixed { size, .. } => { + let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap(); + let num_bytes = bytes.len(); + if num_bytes != size { + panic!( + "signed decimal bytes length {} not equal to fixed schema size {}", + num_bytes, size + ); + } + encode(&Value::Fixed(size, bytes), inner, buffer) } - encode(&Value::Fixed(size, bytes), inner, buffer) - } - Schema::Bytes => encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer), - _ => panic!("invalid inner type for decimal: {:?}", inner), + Schema::Bytes => { + encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer) + } + _ => panic!("invalid inner type for decimal: {:?}", inner), + }, + _ => panic!("invalid schema type for decimal: {:?}", schema), }, - _ => panic!("invalid type for decimal: {:?}", schema), - }, - &Value::Duration(duration) => { - let slice: [u8; 12] = duration.into(); - buffer.extend_from_slice(&slice); - } - Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer), - Value::Bytes(bytes) => match *schema { - Schema::Bytes => encode_bytes(bytes, buffer), - Schema::Fixed { .. } => buffer.extend(bytes), - _ => (), - }, - Value::String(s) => match *schema { - Schema::String => { - encode_bytes(s, buffer); + &Value::Duration(duration) => { + let slice: [u8; 12] = duration.into(); + buffer.extend_from_slice(&slice); } - Schema::Enum { ref symbols, .. 
} => { - if let Some(index) = symbols.iter().position(|item| item == s) { - encode_int(index as i32, buffer); + Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer), + Value::Bytes(bytes) => match *schema { + Schema::Bytes => encode_bytes(bytes, buffer), + Schema::Fixed { .. } => buffer.extend(bytes), + _ => error!("invalid schema type for bytes: {:?}", schema), + }, + Value::String(s) => match *schema { + Schema::String => { + encode_bytes(s, buffer); + } + Schema::Enum { ref symbols, .. } => { + if let Some(index) = symbols.iter().position(|item| item == s) { + encode_int(index as i32, buffer); + } + } + _ => error!("invalid schema type for String: {:?}", schema), + }, + Value::Fixed(_, bytes) => buffer.extend(bytes), + Value::Enum(i, _) => encode_int(*i, buffer), + Value::Union(item) => { + if let Schema::Union(ref inner) = *schema { + // Find the schema that is matched here. Due to validation, this should always + // return a value. + let (idx, inner_schema) = inner + .find_schema(item) + .expect("Invalid Union validation occurred"); + encode_long(idx as i64, buffer); + encode_ref0(&*item, inner_schema, buffer, schemas_by_name); + } else { + error!("invalid schema type for Union: {:?}", schema); } } - _ => (), - }, - Value::Fixed(_, bytes) => buffer.extend(bytes), - Value::Enum(i, _) => encode_int(*i, buffer), - Value::Union(item) => { - if let Schema::Union(ref inner) = *schema { - // Find the schema that is matched here. Due to validation, this should always - // return a value. 
- let (idx, inner_schema) = inner - .find_schema(item) - .expect("Invalid Union validation occurred"); - encode_long(idx as i64, buffer); - encode_ref(&*item, inner_schema, buffer); - } - } - Value::Array(items) => { - if let Schema::Array(ref inner) = *schema { - if !items.is_empty() { - encode_long(items.len() as i64, buffer); - for item in items.iter() { - encode_ref(item, inner, buffer); + Value::Array(items) => { + if let Schema::Array(ref inner) = *schema { + if !items.is_empty() { + encode_long(items.len() as i64, buffer); + for item in items.iter() { + encode_ref0(item, inner, buffer, schemas_by_name); + } } + buffer.push(0u8); + } else { + error!("invalid schema type for Array: {:?}", schema); } - buffer.push(0u8); } - } - Value::Map(items) => { - if let Schema::Map(ref inner) = *schema { - if !items.is_empty() { - encode_long(items.len() as i64, buffer); - for (key, value) in items { - encode_bytes(key, buffer); - encode_ref(value, inner, buffer); + Value::Map(items) => { + if let Schema::Map(ref inner) = *schema { + if !items.is_empty() { + encode_long(items.len() as i64, buffer); + for (key, value) in items { + encode_bytes(key, buffer); + encode_ref0(value, inner, buffer, schemas_by_name); + } } + buffer.push(0u8); + } else { + error!("invalid schema type for Map: {:?}", schema); } - buffer.push(0u8); } - } - Value::Record(fields) => { - if let Schema::Record { - fields: ref schema_fields, - .. - } = *schema - { - for (i, &(_, ref value)) in fields.iter().enumerate() { - encode_ref(value, &schema_fields[i].schema, buffer); + Value::Record(fields) => { + if let Schema::Record { + fields: ref schema_fields, + .. 
+ } = *schema + { + for (i, &(_, ref value)) in fields.iter().enumerate() { + encode_ref0(value, &schema_fields[i].schema, buffer, schemas_by_name); + } } } } } + + let mut schemas_by_name = HashMap::new(); + encode_ref0(value, schema, buffer, &mut schemas_by_name) } pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> { diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs index f4b1fb5..f65ec72 100644 --- a/lang/rust/src/error.rs +++ b/lang/rust/src/error.rs @@ -369,6 +369,10 @@ pub enum Error { /// Error while converting float to json value #[error("failed to convert avro float to json: {0}")] ConvertF64ToJson(f64), + + /// Error while resolving Schema::Ref + #[error("Unresolved schema reference: {0}")] + SchemaResolutionError(String), } impl serde::ser::Error for Error { diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs index 3617bb1..2697328 100644 --- a/lang/rust/src/schema.rs +++ b/lang/rust/src/schema.rs @@ -21,6 +21,7 @@ use digest::Digest; use lazy_static::lazy_static; use regex::Regex; use serde::{ + ser, ser::{SerializeMap, SerializeSeq}, Deserialize, Serialize, Serializer, }; @@ -31,6 +32,7 @@ use std::{ convert::TryInto, fmt, str::FromStr, + sync::{Arc, Mutex}, }; use strum_macros::{EnumDiscriminants, EnumString}; @@ -141,6 +143,10 @@ pub enum Schema { TimestampMicros, /// An amount of time defined by a number of months, days and milliseconds. Duration, + // A reference to another schema. 
+ Ref { + name: Name, + }, } impl PartialEq for Schema { @@ -234,6 +240,11 @@ impl Name { fn parse(complex: &Map<String, Value>) -> AvroResult<Self> { let name = complex.name().ok_or(Error::GetNameField)?; + let type_name = match complex.get("type") { + Some(Value::Object(complex_type)) => complex_type.name().or(None), + _ => None, + }; + let namespace = complex.string("namespace"); let aliases: Option<Vec<String>> = complex @@ -248,7 +259,7 @@ impl Name { }); Ok(Name { - name, + name: type_name.unwrap_or(name), namespace, aliases, }) @@ -420,11 +431,24 @@ fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalM #[derive(Default)] struct Parser { input_schemas: HashMap<String, Value>, + // A map of name -> Schema::Ref + // Used to resolve cyclic references, i.e. when a + // field's type is a reference to its record's type + resolving_schemas: HashMap<String, Schema>, input_order: Vec<String>, + // A map of name -> fully parsed Schema + // Used to avoid parsing the same schema twice parsed_schemas: HashMap<String, Schema>, } impl Schema { + // Used to help resolve cyclic references while serializing Schema to JSON. + // Needed because serde[_json] does not support using contexts. + // TODO: See whether alternatives like + // https://users.rust-lang.org/t/serde-question-access-to-a-shared-context-data-within-serialize-and-deserialize/39546 + // can be used + thread_local!(static SCHEMAS_BY_NAME: Arc<Mutex<HashMap<String, Schema>>> = Arc::new(Mutex::new(HashMap::new()))); + /// Converts `self` into its [Parsing Canonical Form]. 
/// /// [Parsing Canonical Form]: @@ -480,6 +504,7 @@ impl Schema { } let mut parser = Parser { input_schemas, + resolving_schemas: HashMap::default(), input_order, parsed_schemas: HashMap::with_capacity(input.len()), }; @@ -515,7 +540,8 @@ impl Parser { .remove_entry(&next_name) .expect("Key unexpectedly missing"); let parsed = self.parse(&value)?; - self.parsed_schemas.insert(name, parsed); + self.parsed_schemas + .insert(get_schema_type_name(name, value), parsed); } let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len()); @@ -558,9 +584,11 @@ impl Parser { } /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`. - /// If a parsed schema is not found, it checks if a json with that name exists - /// in `input_schemas` and then parses it (removing it from `input_schemas`) - /// and adds the parsed schema to `parsed_schemas` + /// If a parsed schema is not found, it checks if a currently resolving + /// schema with that name exists. + /// If a resolving schema is not found, it checks if a json with that name exists + /// in `input_schemas` and then parses it (removing it from `input_schemas`) + /// and adds the parsed schema to `parsed_schemas`. /// /// This method allows schemas definitions that depend on other types to /// parse their dependencies (or look them up if already parsed). 
@@ -568,12 +596,20 @@ impl Parser { if let Some(parsed) = self.parsed_schemas.get(name) { return Ok(parsed.clone()); } + if let Some(resolving_schema) = self.resolving_schemas.get(name) { + return Ok(resolving_schema.clone()); + } + let value = self .input_schemas .remove(name) .ok_or_else(|| Error::ParsePrimitive(name.into()))?; + let parsed = self.parse(&value)?; - self.parsed_schemas.insert(name.to_string(), parsed.clone()); + self.parsed_schemas.insert( + get_schema_type_name(name.to_string(), value), + parsed.clone(), + ); Ok(parsed) } @@ -772,6 +808,10 @@ impl Parser { let mut lookup = HashMap::new(); + let resolving_schema = Schema::Ref { name: name.clone() }; + self.resolving_schemas + .insert(name.name.clone(), resolving_schema); + let fields: Vec<RecordField> = complex .get("fields") .and_then(|fields| fields.as_array()) @@ -798,6 +838,7 @@ impl Parser { self.parsed_schemas .insert(name.fullname(None), schema.clone()); + self.resolving_schemas.remove(name.name.as_str()); Ok(schema) } @@ -893,12 +934,45 @@ impl Parser { } } +fn get_schema_type_name(name: String, value: Value) -> String { + match value.get("type") { + Some(Value::Object(complex_type)) => match complex_type.name() { + Some(name) => name, + _ => name, + }, + _ => name, + } +} + impl Serialize for Schema { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer, { + fn remember_schema(name: &Name, schema: &Schema) { + Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| match schemas_by_name.try_lock() { + Ok(mut schemas) => { + schemas.insert((&name.name).clone(), schema.clone()); + } + Err(poisoned) => { + error!("Wasn't able to lock schemas_by_name {:?}", poisoned); + } + }); + } + match *self { + Schema::Ref { ref name } => { + let name = &name.name; + Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| { + let schemas = schemas_by_name.lock().unwrap(); + if schemas.contains_key(name.as_str()) { + serializer.serialize_str(name) + } else { + 
Err(ser::Error::custom(format!("Could not serialize Schema::Ref('{}') because it cannot be found in ({})", + name, schemas.keys().cloned().collect::<Vec<_>>().join(", ")))) + } + }) + } Schema::Null => serializer.serialize_str("null"), Schema::Boolean => serializer.serialize_str("boolean"), Schema::Int => serializer.serialize_str("int"), @@ -933,6 +1007,7 @@ impl Serialize for Schema { ref fields, .. } => { + remember_schema(name, self); let mut map = serializer.serialize_map(None)?; map.serialize_entry("type", "record")?; if let Some(ref n) = name.namespace { @@ -953,6 +1028,7 @@ impl Serialize for Schema { ref symbols, .. } => { + remember_schema(name, self); let mut map = serializer.serialize_map(None)?; map.serialize_entry("type", "enum")?; map.serialize_entry("name", &name.name)?; @@ -964,6 +1040,7 @@ impl Serialize for Schema { ref doc, ref size, } => { + remember_schema(name, self); let mut map = serializer.serialize_map(None)?; map.serialize_entry("type", "fixed")?; map.serialize_entry("name", &name.name)?; @@ -1240,7 +1317,7 @@ mod tests { #[test] fn test_record_schema() { - let schema = Schema::parse_str( + let parsed = Schema::parse_str( r#" { "type": "record", @@ -1282,7 +1359,78 @@ mod tests { lookup, }; - assert_eq!(expected, schema); + assert_eq!(parsed, expected); + } + + // AVRO-3302 + #[test] + fn test_record_schema_with_currently_parsing_schema() { + let schema = Schema::parse_str( + r#" + { + "type": "record", + "name": "test", + "fields": [{ + "name": "recordField", + "type": { + "type": "record", + "name": "Node", + "fields": [ + {"name": "label", "type": "string"}, + {"name": "children", "type": {"type": "array", "items": "Node"}} + ] + } + }] + } + "#, + ) + .unwrap(); + + let mut lookup = HashMap::new(); + lookup.insert("recordField".to_owned(), 0); + + let mut node_lookup = HashMap::new(); + node_lookup.insert("children".to_owned(), 1); + node_lookup.insert("label".to_owned(), 0); + + let expected = Schema::Record { + name: 
Name::new("test"), + doc: None, + fields: vec![RecordField { + name: "recordField".to_string(), + doc: None, + default: None, + schema: Schema::Record { + name: Name::new("Node"), + doc: None, + fields: vec![ + RecordField { + name: "label".to_string(), + doc: None, + default: None, + schema: Schema::String, + order: RecordFieldOrder::Ascending, + position: 0, + }, + RecordField { + name: "children".to_string(), + doc: None, + default: None, + schema: Schema::Array(Box::new(Schema::Ref { + name: Name::new("Node"), + })), + order: RecordFieldOrder::Ascending, + position: 1, + }, + ], + lookup: node_lookup, + }, + order: RecordFieldOrder::Ascending, + position: 0, + }], + lookup, + }; + assert_eq!(schema, expected); } #[test] diff --git a/lang/rust/src/schema_compatibility.rs b/lang/rust/src/schema_compatibility.rs index 5ba46e1..7c815e0 100644 --- a/lang/rust/src/schema_compatibility.rs +++ b/lang/rust/src/schema_compatibility.rs @@ -433,7 +433,6 @@ mod tests { .map(|s| s.canonical_form()) .collect::<Vec<String>>() .join(","); - dbg!(&schema_string); Schema::parse_str(&format!("[{}]", schema_string)).unwrap() } diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs index 75f0509..48a1813 100644 --- a/lang/rust/src/types.rs +++ b/lang/rust/src/types.rs @@ -323,6 +323,7 @@ impl Value { /// for the full set of rules of schema validation. pub fn validate(&self, schema: &Schema) -> bool { match (self, schema) { + (_, &Schema::Ref { name: _ }) => true, (&Value::Null, &Schema::Null) => true, (&Value::Boolean(_), &Schema::Boolean) => true, (&Value::Int(_), &Schema::Int) => true, @@ -383,7 +384,10 @@ impl Value { } }) } - _ => false, + (v, s) => { + error!("Unsupported value-schema combination:\n{:?}\n{:?}", v, s); + false + } } } @@ -394,45 +398,79 @@ impl Value { /// in the Avro specification for the full set of rules of schema /// resolution. 
pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> { - // Check if this schema is a union, and if the reader schema is not. - if SchemaKind::from(&self) == SchemaKind::Union - && SchemaKind::from(schema) != SchemaKind::Union - { - // Pull out the Union, and attempt to resolve against it. - let v = match self { - Value::Union(b) => *b, - _ => unreachable!(), - }; - self = v; - } - match *schema { - Schema::Null => self.resolve_null(), - Schema::Boolean => self.resolve_boolean(), - Schema::Int => self.resolve_int(), - Schema::Long => self.resolve_long(), - Schema::Float => self.resolve_float(), - Schema::Double => self.resolve_double(), - Schema::Bytes => self.resolve_bytes(), - Schema::String => self.resolve_string(), - Schema::Fixed { size, .. } => self.resolve_fixed(size), - Schema::Union(ref inner) => self.resolve_union(inner), - Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols), - Schema::Array(ref inner) => self.resolve_array(inner), - Schema::Map(ref inner) => self.resolve_map(inner), - Schema::Record { ref fields, .. } => self.resolve_record(fields), - Schema::Decimal { - scale, - precision, - ref inner, - } => self.resolve_decimal(precision, scale, inner), - Schema::Date => self.resolve_date(), - Schema::TimeMillis => self.resolve_time_millis(), - Schema::TimeMicros => self.resolve_time_micros(), - Schema::TimestampMillis => self.resolve_timestamp_millis(), - Schema::TimestampMicros => self.resolve_timestamp_micros(), - Schema::Duration => self.resolve_duration(), - Schema::Uuid => self.resolve_uuid(), + pub fn resolve0( + value: &mut Value, + schema: &Schema, + schemas_by_name: &mut HashMap<String, Schema>, + ) -> AvroResult<Value> { + // Check if this schema is a union, and if the reader schema is not. + if SchemaKind::from(&value.clone()) == SchemaKind::Union + && SchemaKind::from(schema) != SchemaKind::Union + { + // Pull out the Union, and attempt to resolve against it. 
+ let v = match value { + Value::Union(b) => &**b, + _ => unreachable!(), + }; + *value = v.clone(); + } + let val: Value = value.clone(); + match *schema { + Schema::Ref { ref name } => { + if let Some(resolved) = schemas_by_name.get(name.name.as_str()) { + resolve0(value, resolved, &mut schemas_by_name.clone()) + } else { + Err(Error::SchemaResolutionError(name.name.clone())) + } + } + Schema::Null => val.resolve_null(), + Schema::Boolean => val.resolve_boolean(), + Schema::Int => val.resolve_int(), + Schema::Long => val.resolve_long(), + Schema::Float => val.resolve_float(), + Schema::Double => val.resolve_double(), + Schema::Bytes => val.resolve_bytes(), + Schema::String => val.resolve_string(), + Schema::Fixed { ref name, size, .. } => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + val.resolve_fixed(size) + } + Schema::Union(ref inner) => val.resolve_union(inner), + Schema::Enum { + ref name, + ref symbols, + .. + } => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + val.resolve_enum(symbols) + } + Schema::Array(ref inner) => val.resolve_array(inner), + Schema::Map(ref inner) => val.resolve_map(inner), + Schema::Record { + ref name, + ref fields, + .. 
+ } => { + schemas_by_name.insert(name.name.clone(), schema.clone()); + val.resolve_record(fields) + } + Schema::Decimal { + scale, + precision, + ref inner, + } => val.resolve_decimal(precision, scale, inner), + Schema::Date => val.resolve_date(), + Schema::TimeMillis => val.resolve_time_millis(), + Schema::TimeMicros => val.resolve_time_micros(), + Schema::TimestampMillis => val.resolve_timestamp_millis(), + Schema::TimestampMicros => val.resolve_timestamp_micros(), + Schema::Duration => val.resolve_duration(), + Schema::Uuid => val.resolve_uuid(), + } } + + let mut schemas_by_name: HashMap<String, Schema> = HashMap::new(); + resolve0(&mut self, schema, &mut schemas_by_name) } fn resolve_uuid(self) -> Result<Self, Error> { diff --git a/lang/rust/src/util.rs b/lang/rust/src/util.rs index f9daf28..e2b353b 100644 --- a/lang/rust/src/util.rs +++ b/lang/rust/src/util.rs @@ -21,7 +21,7 @@ use std::{convert::TryFrom, i64, io::Read, sync::Once}; /// Maximum number of bytes that can be allocated when decoding /// Avro-encoded values. This is a protection against ill-formed -/// data, whose length field might be interpreted as enourmous. +/// data, whose length field might be interpreted as enormous. /// See max_allocation_bytes to change this limit. 
pub static mut MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024; static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new(); @@ -153,19 +153,40 @@ mod tests { #[test] fn test_zig_i64() { let mut s = Vec::new(); - zig_i64(std::i32::MAX as i64, &mut s); + + zig_i64(0, &mut s); + assert_eq!(s, [0]); + + s.clear(); + zig_i64(-1, &mut s); + assert_eq!(s, [1]); + + s.clear(); + zig_i64(1, &mut s); + assert_eq!(s, [2]); + + s.clear(); + zig_i64(-64, &mut s); + assert_eq!(s, [127]); + + s.clear(); + zig_i64(64, &mut s); + assert_eq!(s, [128, 1]); + + s.clear(); + zig_i64(i32::MAX as i64, &mut s); assert_eq!(s, [254, 255, 255, 255, 15]); s.clear(); - zig_i64(std::i32::MAX as i64 + 1, &mut s); + zig_i64(i32::MAX as i64 + 1, &mut s); assert_eq!(s, [128, 128, 128, 128, 16]); s.clear(); - zig_i64(std::i32::MIN as i64, &mut s); + zig_i64(i32::MIN as i64, &mut s); assert_eq!(s, [255, 255, 255, 255, 15]); s.clear(); - zig_i64(std::i32::MIN as i64 - 1, &mut s); + zig_i64(i32::MIN as i64 - 1, &mut s); assert_eq!(s, [129, 128, 128, 128, 16]); s.clear(); @@ -180,27 +201,27 @@ mod tests { #[test] fn test_zig_i32() { let mut s = Vec::new(); - zig_i32(std::i32::MAX / 2, &mut s); + zig_i32(i32::MAX / 2, &mut s); assert_eq!(s, [254, 255, 255, 255, 7]); s.clear(); - zig_i32(std::i32::MIN / 2, &mut s); + zig_i32(i32::MIN / 2, &mut s); assert_eq!(s, [255, 255, 255, 255, 7]); s.clear(); - zig_i32(-(std::i32::MIN / 2), &mut s); + zig_i32(-(i32::MIN / 2), &mut s); assert_eq!(s, [128, 128, 128, 128, 8]); s.clear(); - zig_i32(std::i32::MIN / 2 - 1, &mut s); + zig_i32(i32::MIN / 2 - 1, &mut s); assert_eq!(s, [129, 128, 128, 128, 8]); s.clear(); - zig_i32(std::i32::MAX, &mut s); + zig_i32(i32::MAX, &mut s); assert_eq!(s, [254, 255, 255, 255, 15]); s.clear(); - zig_i32(std::i32::MIN, &mut s); + zig_i32(i32::MIN, &mut s); assert_eq!(s, [255, 255, 255, 255, 15]); } diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs index cc96429..77b6569 100644 --- a/lang/rust/tests/schema.rs +++ 
b/lang/rust/tests/schema.rs @@ -17,9 +17,11 @@ use avro_rs::{ schema::{Name, RecordField}, - Error, Schema, + types::{Record, Value}, + Codec, Error, Reader, Schema, Writer, }; use lazy_static::lazy_static; +use log::debug; fn init() { let _ = env_logger::builder() @@ -739,29 +741,28 @@ fn test_parse_list_with_cross_deps_basic() { } #[test] -/// Test that if a cycle of dependencies occurs in the input schema jsons, the algorithm terminates -/// and returns an error. N.B. In the future, when recursive types are supported, this should be -/// revisited. -fn test_parse_list_recursive_type_error() { +fn test_parse_list_recursive_type() { init(); let schema_str_1 = r#"{ "name": "A", + "doc": "A's schema", "type": "record", "fields": [ - {"name": "field_one", "type": "B"} + {"name": "a_field_one", "type": "B"} ] }"#; let schema_str_2 = r#"{ "name": "B", + "doc": "B's schema", "type": "record", "fields": [ - {"name": "field_one", "type": "A"} + {"name": "b_field_one", "type": "A"} ] }"#; let schema_strs_first = [schema_str_1, schema_str_2]; let schema_strs_second = [schema_str_2, schema_str_1]; - let _ = Schema::parse_list(&schema_strs_first).expect_err("Test failed"); - let _ = Schema::parse_list(&schema_strs_second).expect_err("Test failed"); + let _ = Schema::parse_list(&schema_strs_first).expect("Test failed"); + let _ = Schema::parse_list(&schema_strs_second).expect("Test failed"); } #[test] @@ -1306,6 +1307,73 @@ fn test_root_error_is_not_swallowed_on_parse_error() -> Result<(), String> { } } +// AVRO-3302 +#[test] +fn test_record_schema_with_cyclic_references() { + let schema = Schema::parse_str( + r#" + { + "type": "record", + "name": "test", + "fields": [{ + "name": "recordField", + "type": { + "type": "record", + "name": "Node", + "fields": [ + {"name": "label", "type": "string"}, + {"name": "children", "type": {"type": "array", "items": "Node"}} + ] + } + }] + } + "#, + ) + .unwrap(); + + let mut datum = Record::new(&schema).unwrap(); + datum.put( + 
"recordField", + Value::Record(vec![ + ("label".into(), Value::String("level_1".into())), + ( + "children".into(), + Value::Array(vec![Value::Record(vec![ + ("label".into(), Value::String("level_2".into())), + ( + "children".into(), + Value::Array(vec![Value::Record(vec![ + ("label".into(), Value::String("level_3".into())), + ( + "children".into(), + Value::Array(vec![Value::Record(vec![ + ("label".into(), Value::String("level_4".into())), + ("children".into(), Value::Array(vec![])), + ])]), + ), + ])]), + ), + ])]), + ), + ]), + ); + + let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null); + if let Err(err) = writer.append(datum) { + panic!("An error occurred while writing datum: {:?}", err) + } + let bytes = writer.into_inner().unwrap(); + assert_eq!(316, bytes.len()); + + match Reader::new(&mut bytes.as_slice()) { + Ok(mut reader) => match reader.next() { + Some(value) => debug!("{:?}", value.unwrap()), + None => panic!("No value was read!"), + }, + Err(err) => panic!("An error occurred while reading datum: {:?}", err), + } +} + /* // TODO: (#93) add support for logical type and attributes and uncomment (may need some tweaks to compile) #[test]
