This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 34bc55870 AVRO-3631: [Rust] More efficient (de)serialization using serde_bytes (#3027)
34bc55870 is described below
commit 34bc5587070152a97cf28cb6ebe4cd5c0b11ecc3
Author: Romain Leroux <[email protected]>
AuthorDate: Thu Jul 18 15:36:04 2024 +0200
AVRO-3631: [Rust] More efficient (de)serialization using serde_bytes (#3027)
* AVRO-3631: [Rust] More efficient (de)serialization using serde_bytes
* AVRO-3631: [Rust] Bump MSRV to 1.73.0
* Remove env var that is default since Rust 1.70.0
---------
Co-authored-by: Martin Grigorov <[email protected]>
(cherry picked from commit 17e7994bf0bd63938e7d3cb0f25ffbd2d51424b0)
---
.github/workflows/test-lang-rust-ci.yml | 3 +-
.github/workflows/test-lang-rust-clippy.yml | 2 +-
lang/rust/Cargo.lock | 10 +
lang/rust/Cargo.toml | 3 +-
lang/rust/avro/Cargo.toml | 1 +
lang/rust/avro/README.md | 2 +-
lang/rust/avro/src/bytes.rs | 320 ++++++++++++++++++++++++++++
lang/rust/avro/src/de.rs | 10 +-
lang/rust/avro/src/lib.rs | 6 +-
lang/rust/avro/src/ser.rs | 11 +-
lang/rust/avro/src/types.rs | 2 +-
share/docker/Dockerfile | 2 +-
12 files changed, 360 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml
index 81771c83c..cb86e11d5 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -31,7 +31,6 @@ permissions:
env:
RUSTFLAGS: -Dwarnings
- CARGO_REGISTRIES_CRATES_IO_PROTOCOL: 'git' # TODO: remove this env var once MSRV is 1.70.0+
defaults:
run:
@@ -50,7 +49,7 @@ jobs:
- 'stable'
- 'beta'
- 'nightly'
- - '1.70.0' # MSRV
+ - '1.73.0' # MSRV
target:
- x86_64-unknown-linux-gnu
- wasm32-unknown-unknown
diff --git a/.github/workflows/test-lang-rust-clippy.yml b/.github/workflows/test-lang-rust-clippy.yml
index 78b2d718d..5c0bbafa8 100644
--- a/.github/workflows/test-lang-rust-clippy.yml
+++ b/.github/workflows/test-lang-rust-clippy.yml
@@ -47,7 +47,7 @@ jobs:
matrix:
rust:
- 'stable'
- - '1.70.0' # MSRV
+ - '1.73.0' # MSRV
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
diff --git a/lang/rust/Cargo.lock b/lang/rust/Cargo.lock
index 8fce3d635..572e7bdc0 100644
--- a/lang/rust/Cargo.lock
+++ b/lang/rust/Cargo.lock
@@ -92,6 +92,7 @@ dependencies = [
"regex-lite",
"rstest",
"serde",
+ "serde_bytes",
"serde_json",
"serial_test",
"sha2",
@@ -1175,6 +1176,15 @@ dependencies = [
"serde_derive",
]
+[[package]]
+name = "serde_bytes"
+version = "0.11.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a"
+dependencies = [
+ "serde",
+]
+
[[package]]
name = "serde_derive"
version = "1.0.204"
diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index a6fd2ad48..a50201336 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -34,7 +34,7 @@ authors = ["Apache Avro team <[email protected]>"]
license = "Apache-2.0"
repository = "https://github.com/apache/avro"
edition = "2021"
-rust-version = "1.70.0"
+rust-version = "1.73.0"
keywords = ["avro", "data", "serialization"]
categories = ["encoding"]
documentation = "https://docs.rs/apache-avro"
@@ -43,6 +43,7 @@ documentation = "https://docs.rs/apache-avro"
[workspace.dependencies]
log = { default-features = false, version = "0.4.22" }
serde = { default-features = false, version = "1.0.204", features = ["derive"] }
+serde_bytes = { default-features = false, version = "0.11.15", features = ["std"] }
serde_json = { default-features = false, version = "1.0.120", features = ["std"] }
[profile.release.package.hello-wasm]
diff --git a/lang/rust/avro/Cargo.toml b/lang/rust/avro/Cargo.toml
index 27728bcf6..df5874d4e 100644
--- a/lang/rust/avro/Cargo.toml
+++ b/lang/rust/avro/Cargo.toml
@@ -64,6 +64,7 @@ log = { workspace = true }
num-bigint = { default-features = false, version = "0.4.6", features = ["std", "serde"] }
regex-lite = { default-features = false, version = "0.1.6", features = ["std", "string"] }
serde = { workspace = true }
+serde_bytes = { workspace = true }
serde_json = { workspace = true }
snap = { default-features = false, version = "1.1.0", optional = true }
strum = { default-features = false, version = "0.26.3" }
diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md
index b47e18045..265635731 100644
--- a/lang/rust/avro/README.md
+++ b/lang/rust/avro/README.md
@@ -730,7 +730,7 @@ registered and used!
## Minimal supported Rust version
-1.70.0
+1.73.0
## License
This project is licensed under [Apache License 2.0](https://github.com/apache/avro/blob/master/LICENSE.txt).
diff --git a/lang/rust/avro/src/bytes.rs b/lang/rust/avro/src/bytes.rs
new file mode 100644
index 000000000..c301fc003
--- /dev/null
+++ b/lang/rust/avro/src/bytes.rs
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::Cell;
+
+thread_local! {
+ /// A thread local that is used to decide how to serialize Rust bytes into an Avro
+ /// `types::Value` of type bytes.
+ ///
+ /// Relies on the fact that serde's serialization process is single-threaded.
+ pub(crate) static SER_BYTES_TYPE: Cell<BytesType> = const { Cell::new(BytesType::Bytes) };
+
+ /// A thread local that is used to decide how to deserialize an Avro `types::Value`
+ /// of type bytes into Rust bytes.
+ ///
+ /// Relies on the fact that serde's deserialization process is single-threaded.
+ pub(crate) static DE_BYTES_BORROWED: Cell<bool> = const { Cell::new(false) };
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum BytesType {
+ Bytes,
+ Fixed,
+}
+
+/// Efficient (de)serialization of Avro bytes values.
+///
+/// This module is intended to be used through the Serde `with` attribute. See below
+/// example:
+///
+/// ```rust
+/// use apache_avro::{serde_avro_bytes, serde_avro_fixed};
+/// use serde::{Deserialize, Serialize};
+///
+/// #[derive(Serialize, Deserialize)]
+/// struct StructWithBytes {
+/// #[serde(with = "serde_avro_bytes")]
+/// vec_field: Vec<u8>,
+///
+/// #[serde(with = "serde_avro_fixed")]
+/// fixed_field: [u8; 6],
+/// }
+/// ```
+pub mod serde_avro_bytes {
+ use serde::{Deserializer, Serializer};
+
+ pub fn serialize<S: Serializer>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
+ serde_bytes::serialize(bytes, serializer)
+ }
+
+ pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
+ serde_bytes::deserialize(deserializer)
+ }
+}
+
+/// Efficient (de)serialization of Avro fixed values.
+///
+/// This module is intended to be used through the Serde `with` attribute. See below
+/// example:
+///
+/// ```rust
+/// use apache_avro::{serde_avro_bytes, serde_avro_fixed};
+/// use serde::{Deserialize, Serialize};
+///
+/// #[derive(Serialize, Deserialize)]
+/// struct StructWithBytes {
+/// #[serde(with = "serde_avro_bytes")]
+/// vec_field: Vec<u8>,
+///
+/// #[serde(with = "serde_avro_fixed")]
+/// fixed_field: [u8; 6],
+/// }
+/// ```
+pub mod serde_avro_fixed {
+ use super::{BytesType, SER_BYTES_TYPE};
+ use serde::{Deserializer, Serializer};
+
+ pub fn serialize<S: Serializer>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
+ SER_BYTES_TYPE.set(BytesType::Fixed);
+ let res = serde_bytes::serialize(bytes, serializer);
+ SER_BYTES_TYPE.set(BytesType::Bytes);
+ res
+ }
+
+ pub fn deserialize<'de, D: Deserializer<'de>, const N: usize>(
+ deserializer: D,
+ ) -> Result<[u8; N], D::Error> {
+ serde_bytes::deserialize(deserializer)
+ }
+}
+
+/// Efficient (de)serialization of Avro bytes/fixed borrowed values.
+///
+/// This module is intended to be used through the Serde `with` attribute. Note that
+/// `bytes: &[u8]` are always serialized as
+/// [`Value::Bytes`](crate::types::Value::Bytes). However, both
+/// [`Value::Bytes`](crate::types::Value::Bytes) and
+/// [`Value::Fixed`](crate::types::Value::Fixed) can be deserialized as `bytes:
+/// &[u8]`. See below example:
+///
+/// ```rust
+/// use apache_avro::serde_avro_slice;
+/// use serde::{Deserialize, Serialize};
+///
+/// #[derive(Serialize, Deserialize)]
+/// struct StructWithBytes<'a> {
+/// #[serde(with = "serde_avro_slice")]
+/// slice_field: &'a [u8],
+/// }
+/// ```
+pub mod serde_avro_slice {
+ use super::DE_BYTES_BORROWED;
+ use serde::{Deserializer, Serializer};
+
+ pub fn serialize<S: Serializer>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
+ serde_bytes::serialize(bytes, serializer)
+ }
+
+ pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<&'de [u8], D::Error> {
+ DE_BYTES_BORROWED.set(true);
+ let res = serde_bytes::deserialize(deserializer);
+ DE_BYTES_BORROWED.set(false);
+ res
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{from_value, to_value, types::Value, Schema};
+ use serde::{Deserialize, Serialize};
+
+ #[test]
+ fn avro_3631_validate_schema_for_struct_with_byte_types() {
+ #[derive(Debug, Serialize)]
+ struct TestStructWithBytes<'a> {
+ #[serde(with = "serde_avro_bytes")]
+ vec_field: Vec<u8>,
+
+ #[serde(with = "serde_avro_fixed")]
+ fixed_field: [u8; 6],
+
+ #[serde(with = "serde_avro_slice")]
+ slice_field: &'a [u8],
+ }
+
+ let test = TestStructWithBytes {
+ vec_field: vec![2, 3, 4],
+ fixed_field: [1; 6],
+ slice_field: &[1, 2, 3],
+ };
+ let value: Value = to_value(test).unwrap();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "TestStructFixedField",
+ "fields": [
+ {
+ "name": "vec_field",
+ "type": "bytes"
+ },
+ {
+ "name": "fixed_field",
+ "type": {
+ "name": "fixed_field",
+ "type": "fixed",
+ "size": 6
+ }
+ },
+ {
+ "name": "slice_field",
+ "type": "bytes"
+ }
+ ]
+ }
+ "#,
+ )
+ .unwrap();
+ assert!(value.validate(&schema));
+ }
+
+ #[test]
+ fn avro_3631_deserialize_value_to_struct_with_byte_types() {
+ #[derive(Debug, Deserialize, PartialEq)]
+ struct TestStructWithBytes<'a> {
+ #[serde(with = "serde_avro_bytes")]
+ vec_field: Vec<u8>,
+
+ #[serde(with = "serde_avro_fixed")]
+ fixed_field: [u8; 6],
+
+ #[serde(with = "serde_avro_slice")]
+ slice_field: &'a [u8],
+ #[serde(with = "serde_avro_slice")]
+ slice_field2: &'a [u8],
+ }
+
+ let expected = TestStructWithBytes {
+ vec_field: vec![3, 33],
+ fixed_field: [1; 6],
+ slice_field: &[1, 11, 111],
+ slice_field2: &[2, 22, 222],
+ };
+
+ let value = Value::Record(vec![
+ (
+ "vec_field".to_owned(),
+ Value::Bytes(expected.vec_field.clone()),
+ ),
+ (
+ "fixed_field".to_owned(),
+ Value::Fixed(expected.fixed_field.len(), expected.fixed_field.to_vec()),
+ ),
+ (
+ "slice_field".to_owned(),
+ Value::Bytes(expected.slice_field.to_vec()),
+ ),
+ (
+ "slice_field2".to_owned(),
+ Value::Fixed(expected.slice_field2.len(), expected.slice_field2.to_vec()),
+ ),
+ ]);
+ assert_eq!(expected, from_value(&value).unwrap());
+ }
+
+ #[test]
+ fn avro_3631_serialize_struct_to_value_with_byte_types() {
+ #[derive(Debug, Serialize)]
+ struct TestStructFixedField<'a> {
+ array_field: &'a [u8],
+
+ vec_field: Vec<u8>,
+ #[serde(with = "serde_avro_fixed")]
+ vec_field2: Vec<u8>,
+ #[serde(with = "serde_avro_bytes")]
+ vec_field3: Vec<u8>,
+
+ #[serde(with = "serde_avro_fixed")]
+ fixed_field: [u8; 6],
+ #[serde(with = "serde_avro_fixed")]
+ fixed_field2: &'a [u8],
+
+ #[serde(with = "serde_avro_bytes")]
+ bytes_field: &'a [u8],
+ #[serde(with = "serde_avro_bytes")]
+ bytes_field2: [u8; 6],
+ }
+
+ let test = TestStructFixedField {
+ array_field: &[1, 11, 111],
+ vec_field: vec![3, 33],
+ vec_field2: vec![4, 44],
+ vec_field3: vec![5, 55],
+ fixed_field: [1; 6],
+ fixed_field2: &[6, 66],
+ bytes_field: &[2, 22, 222],
+ bytes_field2: [2; 6],
+ };
+ let expected = Value::Record(vec![
+ (
+ "array_field".to_owned(),
+ Value::Array(
+ test.array_field
+ .iter()
+ .map(|&i| Value::Int(i as i32))
+ .collect(),
+ ),
+ ),
+ (
+ "vec_field".to_owned(),
+ Value::Array(
+ test.vec_field
+ .iter()
+ .map(|&i| Value::Int(i as i32))
+ .collect(),
+ ),
+ ),
+ (
+ "vec_field2".to_owned(),
+ Value::Fixed(2, test.vec_field2.clone()),
+ ),
+ (
+ "vec_field3".to_owned(),
+ Value::Bytes(test.vec_field3.clone()),
+ ),
+ (
+ "fixed_field".to_owned(),
+ Value::Fixed(6, test.fixed_field.to_vec()),
+ ),
+ (
+ "fixed_field2".to_owned(),
+ Value::Fixed(2, test.fixed_field2.to_vec()),
+ ),
+ (
+ "bytes_field".to_owned(),
+ Value::Bytes(test.bytes_field.to_vec()),
+ ),
+ (
+ "bytes_field2".to_owned(),
+ Value::Bytes(test.bytes_field2.to_vec()),
+ ),
+ ]);
+ assert_eq!(expected, to_value(test).unwrap());
+ }
+}
diff --git a/lang/rust/avro/src/de.rs b/lang/rust/avro/src/de.rs
index 4d90d88e8..2dcfc401a 100644
--- a/lang/rust/avro/src/de.rs
+++ b/lang/rust/avro/src/de.rs
@@ -16,7 +16,7 @@
// under the License.
//! Logic for serde-compatible deserialization.
-use crate::{types::Value, Error};
+use crate::{bytes::DE_BYTES_BORROWED, types::Value, Error};
use serde::{
de::{self, DeserializeSeed, Visitor},
forward_to_deserialize_any, Deserialize,
@@ -356,7 +356,13 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
{
match *self.input {
Value::String(ref s) => visitor.visit_bytes(s.as_bytes()),
- Value::Bytes(ref bytes) | Value::Fixed(_, ref bytes) => visitor.visit_bytes(bytes),
+ Value::Bytes(ref bytes) | Value::Fixed(_, ref bytes) => {
+ if DE_BYTES_BORROWED.get() {
+ visitor.visit_borrowed_bytes(bytes)
+ } else {
+ visitor.visit_bytes(bytes)
+ }
+ }
Value::Uuid(ref u) => visitor.visit_bytes(u.as_bytes()),
Value::Decimal(ref d) => visitor.visit_bytes(&d.to_vec()?),
_ => Err(de::Error::custom(format!(
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index 5fc24692b..c5b6beeac 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -841,6 +841,7 @@
//!
mod bigdecimal;
+mod bytes;
mod codec;
mod de;
mod decimal;
@@ -860,7 +861,10 @@ pub mod schema_equality;
pub mod types;
pub mod validator;
-pub use crate::bigdecimal::BigDecimal;
+pub use crate::{
+ bigdecimal::BigDecimal,
+ bytes::{serde_avro_bytes, serde_avro_fixed, serde_avro_slice},
+};
pub use codec::Codec;
pub use de::from_value;
pub use decimal::Decimal;
diff --git a/lang/rust/avro/src/ser.rs b/lang/rust/avro/src/ser.rs
index 9f05836ab..748e2d541 100644
--- a/lang/rust/avro/src/ser.rs
+++ b/lang/rust/avro/src/ser.rs
@@ -16,7 +16,11 @@
// under the License.
//! Logic for serde-compatible serialization.
-use crate::{types::Value, Error};
+use crate::{
+ bytes::{BytesType, SER_BYTES_TYPE},
+ types::Value,
+ Error,
+};
use serde::{ser, Serialize};
use std::{collections::HashMap, iter::once};
@@ -174,7 +178,10 @@ impl<'b> ser::Serializer for &'b mut Serializer {
}
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
- Ok(Value::Bytes(v.to_owned()))
+ match SER_BYTES_TYPE.get() {
+ BytesType::Bytes => Ok(Value::Bytes(v.to_owned())),
+ BytesType::Fixed => Ok(Value::Fixed(v.len(), v.to_owned())),
+ }
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index de6c875ef..d4d0f78c0 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -1853,7 +1853,7 @@ Field with name '"b"' is not a member of the map items"#,
{
"name": "event",
"type": [
- "null",
+ "null",
{
"type": "record",
"name": "event",
diff --git a/share/docker/Dockerfile b/share/docker/Dockerfile
index 6e9e1cb8d..81584b5f8 100644
--- a/share/docker/Dockerfile
+++ b/share/docker/Dockerfile
@@ -193,7 +193,7 @@ RUN gem install bundler -v 2.4.22 --no-document && \
cd /tmp/lang/ruby && bundle install
# Install Rust
-RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.70.0
+RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.73.0
# Note: This "ubertool" container has two JDK versions:
# - OpenJDK 8