This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch avro-3630-append-to-preexisting-bytes in repository https://gitbox.apache.org/repos/asf/avro.git
commit fabb82b969d182ef315db60ff26d09efea4bd72b Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Tue Sep 27 15:43:36 2022 +0300 AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes Make it possible to pass a block marker to Writer, so that it could append to pre-existing bytes (i.e. bytes created by another Writer) Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --- lang/rust/avro/src/lib.rs | 4 +- lang/rust/avro/src/writer.rs | 42 +++++++++++++-- lang/rust/avro/tests/append_to_existing.rs | 87 ++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 5 deletions(-) diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs index 679eb1cf5..9edb3cb31 100644 --- a/lang/rust/avro/src/lib.rs +++ b/lang/rust/avro/src/lib.rs @@ -746,7 +746,9 @@ pub use reader::{from_avro_datum, GenericSingleObjectReader, Reader, SpecificSin pub use schema::{AvroSchema, Schema}; pub use ser::to_value; pub use util::max_allocation_bytes; -pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer}; +pub use writer::{ + read_marker, to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, +}; #[cfg(feature = "derive")] pub use apache_avro_derive::*; diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 98ceafe46..83b58c927 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -47,7 +47,7 @@ pub struct Writer<'a, W> { serializer: Serializer, #[builder(default = 0, setter(skip))] num_values: usize, - #[builder(default = generate_sync_marker(), setter(skip))] + #[builder(default = generate_sync_marker())] marker: Vec<u8>, #[builder(default = false, setter(skip))] has_header: bool, @@ -60,9 +60,7 @@ impl<'a, W: Write> Writer<'a, W> { /// to. /// No compression `Codec` will be used. 
pub fn new(schema: &'a Schema, writer: W) -> Self { - let mut w = Self::builder().schema(schema).writer(writer).build(); - w.resolved_schema = ResolvedSchema::try_from(schema).ok(); - w + Writer::with_codec(schema, writer, Codec::Null) } /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the @@ -77,6 +75,33 @@ impl<'a, W: Write> Writer<'a, W> { w } + + /// Creates a `Writer` that will append values to an already populated + /// `std::io::Write` using the provided `marker`. + /// No compression `Codec` will be used. + pub fn extend_from(schema: &'a Schema, writer: W, marker: Vec<u8>) -> Self { + Writer::extend_from_with_codec(schema, writer, Codec::Null, marker) + } + + /// Creates a `Writer` that will append values to an already populated + /// `std::io::Write` using the provided `marker`. + pub fn extend_from_with_codec( + schema: &'a Schema, + writer: W, + codec: Codec, + marker: Vec<u8>, + ) -> Self { + assert_eq!(marker.len(), 16); + let mut w = Self::builder() + .schema(schema) + .writer(writer) + .codec(codec) + .marker(marker) + .build(); + w.has_header = true; + w.resolved_schema = ResolvedSchema::try_from(schema).ok(); + w + } + + /// Get a reference to the `Schema` associated to a `Writer`. 
pub fn schema(&self) -> &'a Schema { self.schema @@ -513,6 +538,15 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve Ok(buffer) } +/// Reads the marker bytes from Avro bytes generated earlier by a `Writer` +pub fn read_marker(bytes: &[u8]) -> Vec<u8> { + assert!( + bytes.len() > 16, + "The bytes are too short to read a marker from them" + ); + bytes[(bytes.len() - 16)..].to_vec() +} + #[cfg(not(target_arch = "wasm32"))] fn generate_sync_marker() -> Vec<u8> { std::iter::repeat_with(rand::random).take(16).collect() diff --git a/lang/rust/avro/tests/append_to_existing.rs b/lang/rust/avro/tests/append_to_existing.rs new file mode 100644 index 000000000..7d2a31733 --- /dev/null +++ b/lang/rust/avro/tests/append_to_existing.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use apache_avro::read_marker; +use apache_avro::{ + types::{Record, Value}, + AvroResult, Reader, Schema, Writer, +}; + +#[test] +fn append_to_an_existing_file() { + let schema_str = r#" + { + "type": "record", + "name": "append_to_existing_file", + "fields": [ + {"name": "a", "type": "int"} + ] + } + "#; + + let schema = Schema::parse_str(schema_str).expect("Cannot parse the schema"); + + let bytes = get_avro_bytes(&schema); + + let marker = read_marker(&bytes); + + let mut writer = Writer::extend_from(&schema, bytes, marker); + + writer + .append(create_datum(&schema, 2)) + .expect("An error occurred while appending more data"); + + let new_bytes = writer.into_inner().expect("Cannot get the new bytes"); + + let reader = Reader::new(&*new_bytes).expect("Cannot read the new bytes"); + let mut i = 1; + for value in reader { + check(value, i); + i += 1 + } +} + +/// Simulates reading from a pre-existing .avro file and returns its bytes +fn get_avro_bytes(schema: &Schema) -> Vec<u8> { + let mut writer = Writer::new(&schema, Vec::new()); + writer + .append(create_datum(&schema, 1)) + .expect("An error while appending data"); + let bytes = writer.into_inner().expect("Cannot get the Avro bytes"); + bytes +} + +/// Creates a new datum to write +fn create_datum(schema: &Schema, value: i32) -> Record { + let mut datum = Record::new(schema).unwrap(); + datum.put("a", value); + datum +} + +/// Checks the read values +fn check(v: AvroResult<Value>, expected: i32) { + match v { + Ok(val) => match val { + Value::Record(fields) => match &fields[0] { + (_, Value::Int(actual)) => assert_eq!(&expected, actual), + _ => unreachable!("The field value type must be an Int!"), + }, + _ => unreachable!("The value type must be a Record!"), + }, + Err(e) => eprintln!("Error while reading the data: {:?}", e), + } +}
