This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 442e1b8d95 perf: optimize skipper for varint values used when projecting Avro record types (#9397)
442e1b8d95 is described below
commit 442e1b8d952f5f15cc0922165e56a8f42bd1e716
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Mon Feb 16 13:27:44 2026 +0200
perf: optimize skipper for varint values used when projecting Avro record types (#9397)
# Rationale for this change
The `Skipper` implementation, used to skip over unneeded fields when
projecting an Avro record type to a reader schema, delegates to the
`read_vlq` cursor method for variable-length integer types. Besides
validating the encoding, the decoding method also performs the
shift-and-accumulate arithmetic needed to reconstruct the value, only
for the result to be discarded at the skipper call site.
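To make the distinction concrete, here is a minimal sketch of the two
code paths (the helper shapes below are illustrative, not the exact
code in this patch; the real skip path also rejects ten-byte encodings
whose final byte overflows 64 bits). Avro writes int/long values as a
little-endian base-128 varint: seven value bits per byte, with the high
bit set on every byte except the last.

    // Full decode: must shift and accumulate the payload bits of every byte.
    fn read_varint(buf: &[u8]) -> Option<(u64, usize)> {
        let mut value = 0u64;
        for (i, &b) in buf.iter().take(10).enumerate() {
            value |= u64::from(b & 0x7f) << (7 * i);
            if b & 0x80 == 0 {
                return Some((value, i + 1));
            }
        }
        None // unterminated, or longer than a u64 permits
    }

    // Skip: only scan for the terminator byte (high bit clear).
    fn skip_varint(buf: &[u8]) -> Option<usize> {
        buf.iter().take(10).position(|&b| b & 0x80 == 0).map(|i| i + 1)
    }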
# What changes are included in this PR?
Provide a dedicated code path to skip over an encoded variable-length
integer, and use it to implement `Skipper` for the types that use this
encoding.
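All of these types share Avro's zig-zag varint encoding, which maps a
signed value to an unsigned one so that small magnitudes stay short on
the wire. A brief illustrative sketch of that mapping (not code from
this patch):

    // Zig-zag mapping applied before varint encoding (illustrative).
    fn zigzag64(v: i64) -> u64 {
        ((v << 1) ^ (v >> 63)) as u64
    }

    #[test]
    fn zigzag_examples() {
        assert_eq!(zigzag64(0), 0); // one varint byte: 0x00
        assert_eq!(zigzag64(-1), 1); // one varint byte: 0x01
        assert_eq!(zigzag64(i64::MIN), u64::MAX); // ten bytes, last 0x01
    }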
# Are these changes tested?
A benchmark is added to evaluate the performance improvement.
It shows roughly a 7% improvement in my testing on an 11th Gen Intel
Core i5-1135G7.
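The new benchmark should be runnable with the usual Criterion
invocation (assuming no extra cargo features are required):

    cargo bench -p arrow-avro --bench project_record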
# Are there any user-facing changes?
No
---
arrow-avro/Cargo.toml | 4 +
arrow-avro/benches/project_record.rs | 239 +++++++++++++++++++++++++++++++++++
arrow-avro/src/reader/cursor.rs | 29 ++++-
arrow-avro/src/reader/mod.rs | 191 ++++++++++++++++++++++------
arrow-avro/src/reader/record.rs | 6 +-
arrow-avro/src/reader/vlq.rs | 32 +++++
6 files changed, 453 insertions(+), 48 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 91633e6062..b7cd7eeb19 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -113,3 +113,7 @@ harness = false
[[bench]]
name = "encoder"
harness = false
+
+[[bench]]
+name = "project_record"
+harness = false
diff --git a/arrow-avro/benches/project_record.rs b/arrow-avro/benches/project_record.rs
new file mode 100644
index 0000000000..9bddfea93b
--- /dev/null
+++ b/arrow-avro/benches/project_record.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::types::Value;
+use apache_avro::{Schema as ApacheSchema, to_avro_datum};
+use arrow_avro::reader::{Decoder, ReaderBuilder};
+use arrow_avro::schema::{
+ AvroSchema, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, SchemaStore,
+};
+use criterion::{BatchSize, BenchmarkId, Criterion, Throughput};
+use criterion::{criterion_group, criterion_main};
+use rand::Rng;
+use rand::rngs::ThreadRng;
+use std::hint::black_box;
+
+const BATCH_SIZE: usize = 8192;
+
+const NUM_ROWS: usize = 10_000;
+
+fn make_prefix(fp: Fingerprint) -> Vec<u8> {
+ match fp {
+ Fingerprint::Rabin(val) => {
+ let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of::<u64>());
+ buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
+ buf.extend_from_slice(&val.to_le_bytes()); // little-endian
+ buf
+ }
+ other => panic!("Unexpected fingerprint {other:?}"),
+ }
+}
+
+fn encode_records_with_prefix(
+ schema: &ApacheSchema,
+ prefix: &[u8],
+ rows: impl Iterator<Item = Value>,
+) -> Vec<u8> {
+ let mut out = Vec::new();
+ for v in rows {
+ out.extend_from_slice(prefix);
+ out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
+ }
+ out
+}
+
+fn gen_avro_data_with<F>(schema_json: &str, num_rows: usize, gen_fn: F) -> Vec<u8>
+where
+ F: FnOnce(ThreadRng, &ApacheSchema, usize, &[u8]) -> Vec<u8>,
+{
+ let schema = ApacheSchema::parse_str(schema_json).expect("invalid schema for generator");
+ let arrow_schema = AvroSchema::new(schema_json.to_owned());
+ let fingerprint = arrow_schema
+ .fingerprint(FingerprintAlgorithm::Rabin)
+ .expect("fingerprint failed");
+ let prefix = make_prefix(fingerprint);
+ gen_fn(rand::rng(), &schema, num_rows, &prefix)
+}
+
+fn gen_int(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
+ sc,
+ prefix,
+ (0..n).map(|i| {
+ Value::Record(vec![
+ ("id".into(), Value::Int(i as i32)),
+ ("field1".into(), Value::Int(rng.random())),
+ ])
+ }),
+ )
+}
+
+fn gen_long(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
+ sc,
+ prefix,
+ (0..n).map(|i| {
+ Value::Record(vec![
+ ("id".into(), Value::Int(i as i32)),
+ ("field1".into(), Value::Long(rng.random())),
+ ])
+ }),
+ )
+}
+
+fn gen_float(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
+ sc,
+ prefix,
+ (0..n).map(|i| {
+ Value::Record(vec![
+ ("id".into(), Value::Int(i as i32)),
+ ("field1".into(), Value::Float(rng.random())),
+ ])
+ }),
+ )
+}
+
+fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
+ sc,
+ prefix,
+ (0..n).map(|i| {
+ Value::Record(vec![
+ ("id".into(), Value::Int(i as i32)),
+ ("field1".into(), Value::Double(rng.random())),
+ ])
+ }),
+ )
+}
+
+const READER_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"table",
+ "fields": [
+ { "name": "id", "type": "int" }
+ ]
+ }
+ "#;
+
+const INT_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"table",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "field1", "type": "int" }
+ ]
+ }
+ "#;
+
+const LONG_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"table",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "field1", "type": "long" }
+ ]
+ }
+ "#;
+
+const FLOAT_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"table",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "field1", "type": "float" }
+ ]
+ }
+ "#;
+
+const DOUBLE_SCHEMA: &str = r#"
+ {
+ "type":"record",
+ "name":"table",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "field1", "type": "double" }
+ ]
+ }
+ "#;
+
+fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
+ let schema = AvroSchema::new(schema_json.to_owned());
+ let mut store = SchemaStore::new();
+ store.register(schema).unwrap();
+ let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
+ ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .with_batch_size(batch_size)
+ .with_reader_schema(reader_schema)
+ .build_decoder()
+ .expect("failed to build decoder")
+}
+
+fn bench_with_decoder<F>(
+ c: &mut Criterion,
+ name: &str,
+ data: &[u8],
+ num_rows: usize,
+ mut new_decoder: F,
+) where
+ F: FnMut() -> Decoder,
+{
+ let mut group = c.benchmark_group(name);
+ group.throughput(Throughput::Bytes(data.len() as u64));
+ group.bench_function(BenchmarkId::from_parameter(num_rows), |b| {
+ b.iter_batched_ref(
+ &mut new_decoder,
+ |decoder| {
+ black_box(decoder.decode(data).unwrap());
+ black_box(decoder.flush().unwrap().unwrap());
+ },
+ BatchSize::SmallInput,
+ )
+ });
+ group.finish();
+}
+
+fn criterion_benches(c: &mut Criterion) {
+ let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
+ bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
+ new_decoder(INT_SCHEMA, BATCH_SIZE)
+ });
+ let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
+ bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
+ new_decoder(LONG_SCHEMA, BATCH_SIZE)
+ });
+ let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
+ bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
+ new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
+ });
+ let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
+ bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
+ new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
+ });
+}
+
+criterion_group! {
+ name = avro_project_record;
+ config = Criterion::default().configure_from_args();
+ targets = criterion_benches
+}
+criterion_main!(avro_project_record);
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index fb5eb66306..b1199dad20 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::errors::AvroError;
-use crate::reader::vlq::read_varint;
+use crate::reader::vlq;
/// A wrapper around a byte slice, providing low-level decoding for Avro
///
@@ -59,8 +59,8 @@ impl<'a> AvroCursor<'a> {
}
pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
- let (val, offset) =
- read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+ let (val, offset) = vlq::read_varint(self.buf)
+ .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
self.buf = &self.buf[offset..];
Ok(val)
}
@@ -123,4 +123,27 @@ impl<'a> AvroCursor<'a> {
self.buf = &self.buf[n..];
Ok(ret)
}
+
+ pub(crate) fn skip_int(&mut self) -> Result<(), AvroError> {
+ let offset = vlq::skip_varint(self.buf)
+ .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+ // Check if the skipped encoded value would fail a conversion to i32;
+ // skip_varint only cares about fitting in a 64-bit value.
+ match offset {
+ ..5 => {}
+ 5 if self.buf[4] < 0x10 => {}
+ _ => return Err(AvroError::ParseError("varint overflow".to_owned())),
+ }
+ self.buf = &self.buf[offset..];
+ Ok(())
+ }
+
+ pub(crate) fn skip_long(&mut self) -> Result<(), AvroError> {
+ let offset = vlq::skip_varint(self.buf)
+ .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+ // skip_varint invalidates encodings that are out of range for i64,
+ // so we are good.
+ self.buf = &self.buf[offset..];
+ Ok(())
+ }
}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index a60bc26b49..aa01f272bf 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -1401,6 +1401,7 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
#[cfg(test)]
mod test {
use crate::codec::AvroFieldBuilder;
+ use crate::reader::header::HeaderDecoder;
use crate::reader::record::RecordDecoder;
use crate::reader::{Decoder, Reader, ReaderBuilder};
use crate::schema::{
@@ -6823,6 +6824,154 @@ mod test {
);
}
+ #[test]
+ fn test_bad_varint_bug_nullable_array_items() {
+ use flate2::read::GzDecoder;
+ use std::io::Read;
+ let manifest_dir = env!("CARGO_MANIFEST_DIR");
+ let gz_path = format!("{manifest_dir}/test/data/bad-varint-bug.avro.gz");
+ let gz_file = File::open(&gz_path).expect("test file should exist");
+ let mut decoder = GzDecoder::new(gz_file);
+ let mut avro_bytes = Vec::new();
+ decoder
+ .read_to_end(&mut avro_bytes)
+ .expect("should decompress");
+ let reader_arrow_schema = Schema::new(vec![Field::new(
+ "int_array",
+ DataType::List(Arc::new(Field::new("element", DataType::Int32, true))),
+ true,
+ )])
+ .with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
+ let reader_schema = AvroSchema::try_from(&reader_arrow_schema)
+ .expect("should convert Arrow schema to Avro");
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(reader_schema)
+ .build(Cursor::new(avro_bytes))
+ .expect("should build reader");
+ let batch = reader
+ .next()
+ .expect("should have one batch")
+ .expect("reading should succeed without bad varint error");
+ assert_eq!(batch.num_rows(), 1);
+ let list_col = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .expect("should be ListArray");
+ assert_eq!(list_col.len(), 1);
+ let values = list_col.values();
+ let int_values = values.as_primitive::<Int32Type>();
+ assert_eq!(int_values.len(), 2);
+ assert_eq!(int_values.value(0), 1);
+ assert_eq!(int_values.value(1), 2);
+ }
+
+ fn corrupt_first_block_payload_byte(
+ mut bytes: Vec<u8>,
+ field_offset: usize,
+ expected_original: u8,
+ replacement: u8,
+ ) -> Vec<u8> {
+ let mut header_decoder = HeaderDecoder::default();
+ let header_len = header_decoder.decode(&bytes).expect("decode header");
+ assert!(header_decoder.flush().is_some(), "decode complete header");
+
+ let mut cursor = &bytes[header_len..];
+ let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
+ cursor = &cursor[count_len..];
+ let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
+ let data_start = header_len + count_len + size_len;
+ let target = data_start + field_offset;
+
+ assert!(
+ target < bytes.len(),
+ "target byte offset {target} out of bounds for input length {}",
+ bytes.len()
+ );
+ assert_eq!(
+ bytes[target], expected_original,
+ "unexpected original byte at payload offset {field_offset}"
+ );
+ bytes[target] = replacement;
+ bytes
+ }
+
+ #[test]
+ fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
+ // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is encoded as
+ // 10-byte VLQ ending in 0x01. Flipping that terminator to 0x02 creates an overflow
+ // varint that must fail.
+ let writer_schema = Schema::new(vec![
+ Field::new("bad_long", DataType::Int64, false),
+ Field::new("keep", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(writer_schema.clone()),
+ vec![
+ Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
+ Arc::new(Int32Array::from(vec![7])) as ArrayRef,
+ ],
+ )
+ .expect("build writer batch");
+ let bytes = write_ocf(&writer_schema, &[batch]);
+ let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);
+
+ let err = ReaderBuilder::new()
+ .build(Cursor::new(mutated.clone()))
+ .expect("build full reader")
+ .collect::<Result<Vec<_>, _>>()
+ .expect_err("full decode should reject malformed varint");
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("bad varint"));
+
+ let err = ReaderBuilder::new()
+ .with_projection(vec![1])
+ .build(Cursor::new(mutated))
+ .expect("build projected reader")
+ .collect::<Result<Vec<_>, _>>()
+ .expect_err("projection must also reject malformed skipped varint");
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("bad varint"));
+ }
+
+ #[test]
+ fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
+ // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field encodes to
+ // ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically valid varint, but now
+ // its value exceeds u32::MAX and must fail Int32 validation even when projected out.
+ let writer_schema = Schema::new(vec![
+ Field::new("bad_int", DataType::Int32, false),
+ Field::new("keep", DataType::Int64, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(writer_schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
+ Arc::new(Int64Array::from(vec![11])) as ArrayRef,
+ ],
+ )
+ .expect("build writer batch");
+ let bytes = write_ocf(&writer_schema, &[batch]);
+ let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);
+
+ let err = ReaderBuilder::new()
+ .build(Cursor::new(mutated.clone()))
+ .expect("build full reader")
+ .collect::<Result<Vec<_>, _>>()
+ .expect_err("full decode should reject int overflow");
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("varint overflow"));
+
+ let err = ReaderBuilder::new()
+ .with_projection(vec![1])
+ .build(Cursor::new(mutated))
+ .expect("build projected reader")
+ .collect::<Result<Vec<_>, _>>()
+ .expect_err("projection must also reject skipped int overflow");
+ assert!(matches!(err, ArrowError::AvroError(_)));
+ assert!(err.to_string().contains("varint overflow"));
+ }
+
#[test]
fn comprehensive_e2e_test() {
let path = "test/data/comprehensive_e2e.avro";
@@ -9089,46 +9238,4 @@ mod test {
"entire RecordBatch mismatch (schema, all columns, all rows)"
);
}
-
- #[test]
- fn test_bad_varint_bug_nullable_array_items() {
- use flate2::read::GzDecoder;
- use std::io::Read;
- let manifest_dir = env!("CARGO_MANIFEST_DIR");
- let gz_path = format!("{manifest_dir}/test/data/bad-varint-bug.avro.gz");
- let gz_file = File::open(&gz_path).expect("test file should exist");
- let mut decoder = GzDecoder::new(gz_file);
- let mut avro_bytes = Vec::new();
- decoder
- .read_to_end(&mut avro_bytes)
- .expect("should decompress");
- let reader_arrow_schema = Schema::new(vec![Field::new(
- "int_array",
- DataType::List(Arc::new(Field::new("element", DataType::Int32, true))),
- true,
- )])
- .with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
- let reader_schema = AvroSchema::try_from(&reader_arrow_schema)
- .expect("should convert Arrow schema to Avro");
- let mut reader = ReaderBuilder::new()
- .with_reader_schema(reader_schema)
- .build(Cursor::new(avro_bytes))
- .expect("should build reader");
- let batch = reader
- .next()
- .expect("should have one batch")
- .expect("reading should succeed without bad varint error");
- assert_eq!(batch.num_rows(), 1);
- let list_col = batch
- .column(0)
- .as_any()
- .downcast_ref::<ListArray>()
- .expect("should be ListArray");
- assert_eq!(list_col.len(), 1);
- let values = list_col.values();
- let int_values = values.as_primitive::<Int32Type>();
- assert_eq!(int_values.len(), 2);
- assert_eq!(int_values.value(0), 1);
- assert_eq!(int_values.value(1), 2);
- }
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 022789102a..7701eeea72 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -2467,7 +2467,7 @@ impl Skipper {
Ok(())
}
Self::Int32 => {
- buf.get_int()?;
+ buf.skip_int()?;
Ok(())
}
Self::Int64
@@ -2475,7 +2475,7 @@ impl Skipper {
| Self::TimestampMillis
| Self::TimestampMicros
| Self::TimestampNanos => {
- buf.get_long()?;
+ buf.skip_long()?;
Ok(())
}
Self::Float32 => {
@@ -2503,7 +2503,7 @@ impl Skipper {
Ok(())
}
Self::Enum => {
- buf.get_int()?;
+ buf.skip_int()?;
Ok(())
}
Self::DurationFixed12 => {
diff --git a/arrow-avro/src/reader/vlq.rs b/arrow-avro/src/reader/vlq.rs
index c0b471b466..26bf656159 100644
--- a/arrow-avro/src/reader/vlq.rs
+++ b/arrow-avro/src/reader/vlq.rs
@@ -97,6 +97,38 @@ fn read_varint_slow(buf: &[u8]) -> Option<(u64, usize)> {
None
}
+pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
+ if let Some(array) = buf.get(..10) {
+ return skip_varint_array(array.try_into().unwrap());
+ }
+ skip_varint_slow(buf)
+}
+
+fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
+ // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
+ #[allow(clippy::needless_range_loop)]
+ for idx in 0..9 {
+ if buf[idx] < 0x80 {
+ return Some(idx + 1);
+ }
+ }
+ (buf[9] < 0x02).then_some(10)
+}
+
+#[cold]
+fn skip_varint_slow(buf: &[u8]) -> Option<usize> {
+ debug_assert!(
+ buf.len() < 10,
+ "should be only called on buffers too short for the fast path"
+ );
+ for (idx, &byte) in buf.iter().enumerate() {
+ if byte < 0x80 {
+ return Some(idx + 1);
+ }
+ }
+ None
+}
+
#[cfg(test)]
mod tests {
use super::*;