martin-g commented on code in PR #1672: URL: https://github.com/apache/avro/pull/1672#discussion_r858331400
########## lang/java/avro/src/test/java/org/apache/avro/message/TestInteropMessageData.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.logging.Logger; + +public class TestInteropMessageData { + private static String inDir = System.getProperty("share.dir", "../../../share") + "/test/data/messageV1"; Review Comment: `final` ? ########## lang/java/avro/src/test/java/org/apache/avro/message/TestInteropMessageData.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.logging.Logger; + +public class TestInteropMessageData { + private static String inDir = System.getProperty("share.dir", "../../../share") + "/test/data/messageV1"; + private static File SCHEMA_FILE = new File(inDir + "/test_schema.json"); + private static File MESSAGE_FILE = new File(inDir + "/test_message.bin"); + private static final Schema SCHEMA; + private static final GenericRecordBuilder BUILDER; + + static { + try { + SCHEMA = new Schema.Parser().parse(new FileInputStream(SCHEMA_FILE)); + BUILDER = new GenericRecordBuilder(SCHEMA); Review Comment: Let's use JUnit's before/after, because now FileInputStream is not closed at the end. ########## lang/rust/avro/src/writer.rs: ########## @@ -352,6 +353,110 @@ fn write_avro_datum<T: Into<Value>>( Ok(()) } +/// Writer that encodes messages according to the single object encoding v1 spec +/// Uses an API similar to the current File Writer +/// Writes all object bytes at once, and drains internal buffer +pub struct GenericSingleObjectWriter { + buffer: Vec<u8>, + resolved: ResolvedOwnedSchema, +} + +impl GenericSingleObjectWriter { + pub fn new_with_capacity( + schema: &Schema, + initial_buffer_cap: usize, + ) -> AvroResult<GenericSingleObjectWriter> { + let fingerprint = schema.fingerprint::<Rabin>(); + let mut buffer = Vec::with_capacity(initial_buffer_cap); + let header = [ + 0xC3, + 0x01, + fingerprint.bytes[0], + fingerprint.bytes[1], + fingerprint.bytes[2], + fingerprint.bytes[3], + fingerprint.bytes[4], + fingerprint.bytes[5], + fingerprint.bytes[6], + fingerprint.bytes[7], + ]; + buffer.extend_from_slice(&header); + + Ok(GenericSingleObjectWriter { + buffer, + resolved: ResolvedOwnedSchema::try_from(schema.clone())?, + }) + } + + /// Wrtite the referenced Value to the provided Write object. Returns a result with the number of bytes writtern including the header Review Comment: ```suggestion /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header ``` ########## lang/rust/avro/src/writer.rs: ########## @@ -352,6 +353,110 @@ fn write_avro_datum<T: Into<Value>>( Ok(()) } +/// Writer that encodes messages according to the single object encoding v1 spec +/// Uses an API similar to the current File Writer +/// Writes all object bytes at once, and drains internal buffer +pub struct GenericSingleObjectWriter { + buffer: Vec<u8>, + resolved: ResolvedOwnedSchema, +} + +impl GenericSingleObjectWriter { + pub fn new_with_capacity( + schema: &Schema, + initial_buffer_cap: usize, + ) -> AvroResult<GenericSingleObjectWriter> { + let fingerprint = schema.fingerprint::<Rabin>(); + let mut buffer = Vec::with_capacity(initial_buffer_cap); + let header = [ + 0xC3, + 0x01, + fingerprint.bytes[0], + fingerprint.bytes[1], + fingerprint.bytes[2], + fingerprint.bytes[3], + fingerprint.bytes[4], + fingerprint.bytes[5], + fingerprint.bytes[6], + fingerprint.bytes[7], + ]; + buffer.extend_from_slice(&header); + + Ok(GenericSingleObjectWriter { + buffer, + resolved: ResolvedOwnedSchema::try_from(schema.clone())?, + }) + } + + /// Wrtite the referenced Value to the provided Write object. Returns a result with the number of bytes writtern including the header + pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> { + if self.buffer.len() != 10 { + Err(Error::IllegalSingleObjectWriterState) + } else { + write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; + writer.write_all(&self.buffer).map_err(Error::WriteBytes)?; + let len = self.buffer.len(); + self.buffer.truncate(10); + Ok(len) + } + } + + /// Wrtite the Value to the provided Write object. Returns a result with the number of bytes writtern including the header + pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> { + self.write_value_ref(&v, writer) + } +} + +/// Writer that encodes messages according to the single object encoding v1 spec +pub struct SingleObjectWriter<T> +where + T: AvroSchema, +{ + inner: GenericSingleObjectWriter, + _model: PhantomData<T>, +} + +impl<T> SingleObjectWriter<T> +where + T: AvroSchema, +{ + pub fn with_capacity(buffer_cap: usize) -> AvroResult<SingleObjectWriter<T>> { + let schema = T::get_schema(); + Ok(SingleObjectWriter { + inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?, + _model: PhantomData, + }) + } +} + +impl<T> SingleObjectWriter<T> +where + T: AvroSchema + Into<Value>, +{ + /// Wrtite the Into<Value> to the provided Write object. Returns a result with the number of bytes writtern including the header Review Comment: ```suggestion /// Write the Into<Value> to the provided Write object. Returns a result with the number of bytes written including the header ``` ########## lang/rust/avro/src/writer.rs: ########## @@ -352,6 +353,110 @@ fn write_avro_datum<T: Into<Value>>( Ok(()) } +/// Writer that encodes messages according to the single object encoding v1 spec +/// Uses an API similar to the current File Writer +/// Writes all object bytes at once, and drains internal buffer +pub struct GenericSingleObjectWriter { + buffer: Vec<u8>, + resolved: ResolvedOwnedSchema, +} + +impl GenericSingleObjectWriter { + pub fn new_with_capacity( + schema: &Schema, + initial_buffer_cap: usize, + ) -> AvroResult<GenericSingleObjectWriter> { + let fingerprint = schema.fingerprint::<Rabin>(); + let mut buffer = Vec::with_capacity(initial_buffer_cap); + let header = [ + 0xC3, + 0x01, + fingerprint.bytes[0], + fingerprint.bytes[1], + fingerprint.bytes[2], + fingerprint.bytes[3], + fingerprint.bytes[4], + fingerprint.bytes[5], + fingerprint.bytes[6], + fingerprint.bytes[7], + ]; + buffer.extend_from_slice(&header); + + Ok(GenericSingleObjectWriter { + buffer, + resolved: ResolvedOwnedSchema::try_from(schema.clone())?, + }) + } + + /// Wrtite the referenced Value to the provided Write object. Returns a result with the number of bytes writtern including the header + pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> { + if self.buffer.len() != 10 { + Err(Error::IllegalSingleObjectWriterState) + } else { + write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; + writer.write_all(&self.buffer).map_err(Error::WriteBytes)?; + let len = self.buffer.len(); + self.buffer.truncate(10); + Ok(len) + } + } + + /// Wrtite the Value to the provided Write object. Returns a result with the number of bytes writtern including the header Review Comment: ```suggestion /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header ``` ########## lang/rust/avro/src/writer.rs: ########## @@ -943,4 +1069,123 @@ mod tests { assert_eq!(writer.user_metadata, user_meta_data); } + + #[derive(Serialize, Clone)] + struct TestSingleObjectWriter { + a: i64, + b: f64, + c: Vec<String>, + } + + impl AvroSchema for TestSingleObjectWriter { + fn get_schema() -> Schema { + let schema = r#" + { + "type":"record", + "name":"TestSingleObjectWrtierSerialize", + "fields":[ + { + "name":"a", + "type":"long" + }, + { + "name":"b", + "type":"double" + }, + { + "name":"c", + "type":{ + "type":"array", + "items":"string" + } + } + ] + } + "#; + Schema::parse_str(schema).unwrap() + } + } + + impl From<TestSingleObjectWriter> for Value { + fn from(obj: TestSingleObjectWriter) -> Value { + Value::Record(vec![ + ("a".into(), obj.a.into()), + ("b".into(), obj.b.into()), + ( + "c".into(), + Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), + ), + ]) + } + } + + #[test] + fn test_single_object_writer() { + let mut buf: Vec<u8> = Vec::new(); + let obj = TestSingleObjectWriter { + a: 300, + b: 34.555, + c: vec!["cat".into(), "dog".into()], + }; + let mut writer = GenericSingleObjectWriter::new_with_capacity( + &TestSingleObjectWriter::get_schema(), + 1024, + ) + .expect("Should resolve schema"); + let value = obj.into(); + let written_bytes = writer + .write_value_ref(&value, &mut buf) + .expect("Error serializing properly"); + + assert!(buf.len() > 10, "no bytes written"); + assert_eq!(buf.len(), written_bytes); + assert_eq!(buf[0], 0xC3); + assert_eq!(buf[1], 0x01); + assert_eq!( + &buf[2..10], + &TestSingleObjectWriter::get_schema() + .fingerprint::<Rabin>() + .bytes[..] + ); + let mut msg_binary = Vec::new(); + encode( + &value, + &TestSingleObjectWriter::get_schema(), + &mut msg_binary, + ) + .expect("encode should have failed by here as a depndency of any writing"); Review Comment: ```suggestion .expect("encode should have failed by here as a dependency of any writing"); ``` ########## lang/rust/avro/examples/test_interop_message_data.rs: ########## @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use apache_avro::{schema::AvroSchema, types::Value}; + +struct InteropMessage; + +impl AvroSchema for InteropMessage { + fn get_schema() -> apache_avro::Schema { + let schema = std::fs::read_to_string("../../share/test/data/messageV1/test_schema.json") + .expect("File should exist with schema inside"); + apache_avro::Schema::parse_str(schema.as_str()) + .expect("File should exist with schema inside") + } +} + +impl From<InteropMessage> for Value { Review Comment: Why an impl for Value ? Why not a proper struct with fields ? ########## lang/java/avro/src/test/java/org/apache/avro/message/TestInteropMessageData.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.logging.Logger; + +public class TestInteropMessageData { + private static String inDir = System.getProperty("share.dir", "../../../share") + "/test/data/messageV1"; + private static File SCHEMA_FILE = new File(inDir + "/test_schema.json"); Review Comment: Usually schema files use extension `.avsc` ########## lang/java/avro/src/test/java/org/apache/avro/message/TestInteropMessageData.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.logging.Logger; + +public class TestInteropMessageData { + private static String inDir = System.getProperty("share.dir", "../../../share") + "/test/data/messageV1"; + private static File SCHEMA_FILE = new File(inDir + "/test_schema.json"); Review Comment: `final` ########## lang/java/avro/src/test/java/org/apache/avro/message/TestInteropMessageData.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.logging.Logger; + +public class TestInteropMessageData { + private static String inDir = System.getProperty("share.dir", "../../../share") + "/test/data/messageV1"; + private static File SCHEMA_FILE = new File(inDir + "/test_schema.json"); + private static File MESSAGE_FILE = new File(inDir + "/test_message.bin"); Review Comment: `final` ########## share/test/data/messageV1/README.md: ########## @@ -0,0 +1,45 @@ +BinaryMessage data in single object encoding https://avro.apache.org/docs/current/spec.html#single_object_encoding + +Ground truth data generated with Java Code Review Comment: There is no new code to run the Java test in CI. See `.github/workflows/test-lang-rust-ci.yml` ########## lang/java/avro/src/test/java/org/apache/avro/message/TestInteropMessageData.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.logging.Logger; + +public class TestInteropMessageData { + private static String inDir = System.getProperty("share.dir", "../../../share") + "/test/data/messageV1"; + private static File SCHEMA_FILE = new File(inDir + "/test_schema.json"); + private static File MESSAGE_FILE = new File(inDir + "/test_message.bin"); Review Comment: Usually the extension is `.avro` ########## lang/rust/avro/src/writer.rs: ########## @@ -352,6 +353,110 @@ fn write_avro_datum<T: Into<Value>>( Ok(()) } +/// Writer that encodes messages according to the single object encoding v1 spec +/// Uses an API similar to the current File Writer +/// Writes all object bytes at once, and drains internal buffer +pub struct GenericSingleObjectWriter { + buffer: Vec<u8>, + resolved: ResolvedOwnedSchema, +} + +impl GenericSingleObjectWriter { + pub fn new_with_capacity( + schema: &Schema, + initial_buffer_cap: usize, + ) -> AvroResult<GenericSingleObjectWriter> { + let fingerprint = schema.fingerprint::<Rabin>(); + let mut buffer = Vec::with_capacity(initial_buffer_cap); + let header = [ + 0xC3, + 0x01, + fingerprint.bytes[0], + fingerprint.bytes[1], + fingerprint.bytes[2], + fingerprint.bytes[3], + fingerprint.bytes[4], + fingerprint.bytes[5], + fingerprint.bytes[6], + fingerprint.bytes[7], + ]; + buffer.extend_from_slice(&header); + + Ok(GenericSingleObjectWriter { + buffer, + resolved: ResolvedOwnedSchema::try_from(schema.clone())?, + }) + } + + /// Wrtite the referenced Value to the provided Write object. Returns a result with the number of bytes writtern including the header + pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> { + if self.buffer.len() != 10 { + Err(Error::IllegalSingleObjectWriterState) + } else { + write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; + writer.write_all(&self.buffer).map_err(Error::WriteBytes)?; + let len = self.buffer.len(); + self.buffer.truncate(10); + Ok(len) + } + } + + /// Wrtite the Value to the provided Write object. Returns a result with the number of bytes writtern including the header + pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> { + self.write_value_ref(&v, writer) + } +} + +/// Writer that encodes messages according to the single object encoding v1 spec +pub struct SingleObjectWriter<T> +where + T: AvroSchema, +{ + inner: GenericSingleObjectWriter, + _model: PhantomData<T>, +} + +impl<T> SingleObjectWriter<T> +where + T: AvroSchema, +{ + pub fn with_capacity(buffer_cap: usize) -> AvroResult<SingleObjectWriter<T>> { + let schema = T::get_schema(); + Ok(SingleObjectWriter { + inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?, + _model: PhantomData, + }) + } +} + +impl<T> SingleObjectWriter<T> +where + T: AvroSchema + Into<Value>, +{ + /// Wrtite the Into<Value> to the provided Write object. Returns a result with the number of bytes writtern including the header + pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> { + let v: Value = data.into(); + self.inner.write_value_ref(&v, writer) + } +} + +impl<T> SingleObjectWriter<T> +where + T: AvroSchema + Serialize, +{ + /// Wrtite the referenced Serialize object to the provided Write object. Returns a result with the number of bytes writtern including the header Review Comment: ```suggestion /// Write the referenced Serialize object to the provided Write object. Returns a result with the number of bytes written including the header ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
