This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 442e1b8d95 perf: optimize skipper for varint values used when projecting Avro record types (#9397)
442e1b8d95 is described below

commit 442e1b8d952f5f15cc0922165e56a8f42bd1e716
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Mon Feb 16 13:27:44 2026 +0200

    perf: optimize skipper for varint values used when projecting Avro record types (#9397)
    
    # Rationale for this change
    
    The `Skipper` implementation, used to skip over unneeded fields when
    projecting an Avro record type to a reader schema, delegates to the
    `read_vlq` cursor method for variable-length integer types. Besides
    validating the encoding, that method also computes the decoded value,
    which the skipper call site immediately discards, so that work is
    wasted.
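    
    For context, here is a minimal sketch of the distinction (an
    illustration only, not the code added by this commit; it omits the
    terminal-byte range checks the real implementation performs):
    decoding must fold every 7-bit group into a value, while skipping
    only needs to locate the first byte without the continuation bit.
    
        // Hypothetical sketch: decoding accumulates the value...
        fn read_varint_sketch(buf: &[u8]) -> Option<(u64, usize)> {
            let mut val: u64 = 0;
            for (i, &b) in buf.iter().take(10).enumerate() {
                val |= u64::from(b & 0x7f) << (7 * i);
                if b & 0x80 == 0 {
                    return Some((val, i + 1));
                }
            }
            None
        }
    
        // ...while skipping merely scans for the terminating byte.
        fn skip_varint_sketch(buf: &[u8]) -> Option<usize> {
            buf.iter().take(10).position(|&b| b & 0x80 == 0).map(|i| i + 1)
        }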
    
    # What changes are included in this PR?
    
    Provide a dedicated code path to skip over an encoded variable-length
    integer, and use it to implement `Skipper` for the types that use
    this encoding.
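    
    For 32-bit types, the skip path still rejects encodings whose value
    would overflow 32 bits, so projecting a field out does not relax the
    validation performed by the decoding path.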
    
    # Are these changes tested?
    
    A benchmark is added to evaluate the performance improvement.
    It shows roughly a 7% improvement in my testing on an 11th Gen
    Intel Core i5-1135G7.
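    
    Assuming a standard repository checkout, the benchmark should be
    runnable with `cargo bench -p arrow-avro --bench project_record`.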
    
    # Are there any user-facing changes?
    
    No
---
 arrow-avro/Cargo.toml                |   4 +
 arrow-avro/benches/project_record.rs | 239 +++++++++++++++++++++++++++++++++++
 arrow-avro/src/reader/cursor.rs      |  29 ++++-
 arrow-avro/src/reader/mod.rs         | 191 ++++++++++++++++++++++------
 arrow-avro/src/reader/record.rs      |   6 +-
 arrow-avro/src/reader/vlq.rs         |  32 +++++
 6 files changed, 453 insertions(+), 48 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 91633e6062..b7cd7eeb19 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -113,3 +113,7 @@ harness = false
 [[bench]]
 name = "encoder"
 harness = false
+
+[[bench]]
+name = "project_record"
+harness = false
diff --git a/arrow-avro/benches/project_record.rs b/arrow-avro/benches/project_record.rs
new file mode 100644
index 0000000000..9bddfea93b
--- /dev/null
+++ b/arrow-avro/benches/project_record.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::types::Value;
+use apache_avro::{Schema as ApacheSchema, to_avro_datum};
+use arrow_avro::reader::{Decoder, ReaderBuilder};
+use arrow_avro::schema::{
+    AvroSchema, Fingerprint, FingerprintAlgorithm, SINGLE_OBJECT_MAGIC, SchemaStore,
+};
+use criterion::{BatchSize, BenchmarkId, Criterion, Throughput};
+use criterion::{criterion_group, criterion_main};
+use rand::Rng;
+use rand::rngs::ThreadRng;
+use std::hint::black_box;
+
+const BATCH_SIZE: usize = 8192;
+
+const NUM_ROWS: usize = 10_000;
+
+fn make_prefix(fp: Fingerprint) -> Vec<u8> {
+    match fp {
+        Fingerprint::Rabin(val) => {
+            let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of::<u64>());
+            buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
+            buf.extend_from_slice(&val.to_le_bytes()); // little-endian
+            buf
+        }
+        other => panic!("Unexpected fingerprint {other:?}"),
+    }
+}
+
+fn encode_records_with_prefix(
+    schema: &ApacheSchema,
+    prefix: &[u8],
+    rows: impl Iterator<Item = Value>,
+) -> Vec<u8> {
+    let mut out = Vec::new();
+    for v in rows {
+        out.extend_from_slice(prefix);
+        out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
+    }
+    out
+}
+
+fn gen_avro_data_with<F>(schema_json: &str, num_rows: usize, gen_fn: F) -> Vec<u8>
+where
+    F: FnOnce(ThreadRng, &ApacheSchema, usize, &[u8]) -> Vec<u8>,
+{
+    let schema = ApacheSchema::parse_str(schema_json).expect("invalid schema for generator");
+    let arrow_schema = AvroSchema::new(schema_json.to_owned());
+    let fingerprint = arrow_schema
+        .fingerprint(FingerprintAlgorithm::Rabin)
+        .expect("fingerprint failed");
+    let prefix = make_prefix(fingerprint);
+    gen_fn(rand::rng(), &schema, num_rows, &prefix)
+}
+
+fn gen_int(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
+        sc,
+        prefix,
+        (0..n).map(|i| {
+            Value::Record(vec![
+                ("id".into(), Value::Int(i as i32)),
+                ("field1".into(), Value::Int(rng.random())),
+            ])
+        }),
+    )
+}
+
+fn gen_long(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
+        sc,
+        prefix,
+        (0..n).map(|i| {
+            Value::Record(vec![
+                ("id".into(), Value::Int(i as i32)),
+                ("field1".into(), Value::Long(rng.random())),
+            ])
+        }),
+    )
+}
+
+fn gen_float(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
+        sc,
+        prefix,
+        (0..n).map(|i| {
+            Value::Record(vec![
+                ("id".into(), Value::Int(i as i32)),
+                ("field1".into(), Value::Float(rng.random())),
+            ])
+        }),
+    )
+}
+
+fn gen_double(mut rng: impl Rng, sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
+        sc,
+        prefix,
+        (0..n).map(|i| {
+            Value::Record(vec![
+                ("id".into(), Value::Int(i as i32)),
+                ("field1".into(), Value::Double(rng.random())),
+            ])
+        }),
+    )
+}
+
+const READER_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"table",
+        "fields": [
+            { "name": "id", "type": "int" }
+        ]
+    }
+    "#;
+
+const INT_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"table",
+        "fields": [
+            { "name": "id", "type": "int" },
+            { "name": "field1", "type": "int" }
+        ]
+    }
+    "#;
+
+const LONG_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"table",
+        "fields": [
+            { "name": "id", "type": "int" },
+            { "name": "field1", "type": "long" }
+        ]
+    }
+    "#;
+
+const FLOAT_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"table",
+        "fields": [
+            { "name": "id", "type": "int" },
+            { "name": "field1", "type": "float" }
+        ]
+    }
+    "#;
+
+const DOUBLE_SCHEMA: &str = r#"
+    {
+        "type":"record",
+        "name":"table",
+        "fields": [
+            { "name": "id", "type": "int" },
+            { "name": "field1", "type": "double" }
+        ]
+    }
+    "#;
+
+fn new_decoder(schema_json: &'static str, batch_size: usize) -> Decoder {
+    let schema = AvroSchema::new(schema_json.to_owned());
+    let mut store = SchemaStore::new();
+    store.register(schema).unwrap();
+    let reader_schema = AvroSchema::new(READER_SCHEMA.to_owned());
+    ReaderBuilder::new()
+        .with_writer_schema_store(store)
+        .with_batch_size(batch_size)
+        .with_reader_schema(reader_schema)
+        .build_decoder()
+        .expect("failed to build decoder")
+}
+
+fn bench_with_decoder<F>(
+    c: &mut Criterion,
+    name: &str,
+    data: &[u8],
+    num_rows: usize,
+    mut new_decoder: F,
+) where
+    F: FnMut() -> Decoder,
+{
+    let mut group = c.benchmark_group(name);
+    group.throughput(Throughput::Bytes(data.len() as u64));
+    group.bench_function(BenchmarkId::from_parameter(num_rows), |b| {
+        b.iter_batched_ref(
+            &mut new_decoder,
+            |decoder| {
+                black_box(decoder.decode(data).unwrap());
+                black_box(decoder.flush().unwrap().unwrap());
+            },
+            BatchSize::SmallInput,
+        )
+    });
+    group.finish();
+}
+
+fn criterion_benches(c: &mut Criterion) {
+    let data = gen_avro_data_with(INT_SCHEMA, NUM_ROWS, gen_int);
+    bench_with_decoder(c, "skip_int", &data, NUM_ROWS, || {
+        new_decoder(INT_SCHEMA, BATCH_SIZE)
+    });
+    let data = gen_avro_data_with(LONG_SCHEMA, NUM_ROWS, gen_long);
+    bench_with_decoder(c, "skip_long", &data, NUM_ROWS, || {
+        new_decoder(LONG_SCHEMA, BATCH_SIZE)
+    });
+    let data = gen_avro_data_with(FLOAT_SCHEMA, NUM_ROWS, gen_float);
+    bench_with_decoder(c, "skip_float", &data, NUM_ROWS, || {
+        new_decoder(FLOAT_SCHEMA, BATCH_SIZE)
+    });
+    let data = gen_avro_data_with(DOUBLE_SCHEMA, NUM_ROWS, gen_double);
+    bench_with_decoder(c, "skip_double", &data, NUM_ROWS, || {
+        new_decoder(DOUBLE_SCHEMA, BATCH_SIZE)
+    });
+}
+
+criterion_group! {
+    name = avro_project_record;
+    config = Criterion::default().configure_from_args();
+    targets = criterion_benches
+}
+criterion_main!(avro_project_record);
diff --git a/arrow-avro/src/reader/cursor.rs b/arrow-avro/src/reader/cursor.rs
index fb5eb66306..b1199dad20 100644
--- a/arrow-avro/src/reader/cursor.rs
+++ b/arrow-avro/src/reader/cursor.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::errors::AvroError;
-use crate::reader::vlq::read_varint;
+use crate::reader::vlq;
 
 /// A wrapper around a byte slice, providing low-level decoding for Avro
 ///
@@ -59,8 +59,8 @@ impl<'a> AvroCursor<'a> {
     }
 
     pub(crate) fn read_vlq(&mut self) -> Result<u64, AvroError> {
-        let (val, offset) =
-            read_varint(self.buf).ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+        let (val, offset) = vlq::read_varint(self.buf)
+            .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
         self.buf = &self.buf[offset..];
         Ok(val)
     }
@@ -123,4 +123,27 @@ impl<'a> AvroCursor<'a> {
         self.buf = &self.buf[n..];
         Ok(ret)
     }
+
+    pub(crate) fn skip_int(&mut self) -> Result<(), AvroError> {
+        let offset = vlq::skip_varint(self.buf)
+            .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+        // skip_varint only checks that the encoding fits 64 bits; also reject
+        // 32-bit overflow: four bytes carry 28 bits, so a fifth must be < 0x10.
+        match offset {
+            ..5 => {}
+            5 if self.buf[4] < 0x10 => {}
+            _ => return Err(AvroError::ParseError("varint overflow".to_owned())),
+        }
+        self.buf = &self.buf[offset..];
+        Ok(())
+    }
+
+    pub(crate) fn skip_long(&mut self) -> Result<(), AvroError> {
+        let offset = vlq::skip_varint(self.buf)
+            .ok_or_else(|| AvroError::ParseError("bad varint".to_string()))?;
+        // skip_varint already rejects encodings out of range for a 64-bit
+        // value, so no further check is needed here.
+        self.buf = &self.buf[offset..];
+        Ok(())
+    }
 }
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index a60bc26b49..aa01f272bf 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -1401,6 +1401,7 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
 #[cfg(test)]
 mod test {
     use crate::codec::AvroFieldBuilder;
+    use crate::reader::header::HeaderDecoder;
     use crate::reader::record::RecordDecoder;
     use crate::reader::{Decoder, Reader, ReaderBuilder};
     use crate::schema::{
@@ -6823,6 +6824,154 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_bad_varint_bug_nullable_array_items() {
+        use flate2::read::GzDecoder;
+        use std::io::Read;
+        let manifest_dir = env!("CARGO_MANIFEST_DIR");
+        let gz_path = format!("{manifest_dir}/test/data/bad-varint-bug.avro.gz");
+        let gz_file = File::open(&gz_path).expect("test file should exist");
+        let mut decoder = GzDecoder::new(gz_file);
+        let mut avro_bytes = Vec::new();
+        decoder
+            .read_to_end(&mut avro_bytes)
+            .expect("should decompress");
+        let reader_arrow_schema = Schema::new(vec![Field::new(
+            "int_array",
+            DataType::List(Arc::new(Field::new("element", DataType::Int32, true))),
+            true,
+        )])
+        .with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
+        let reader_schema = AvroSchema::try_from(&reader_arrow_schema)
+            .expect("should convert Arrow schema to Avro");
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(reader_schema)
+            .build(Cursor::new(avro_bytes))
+            .expect("should build reader");
+        let batch = reader
+            .next()
+            .expect("should have one batch")
+            .expect("reading should succeed without bad varint error");
+        assert_eq!(batch.num_rows(), 1);
+        let list_col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .expect("should be ListArray");
+        assert_eq!(list_col.len(), 1);
+        let values = list_col.values();
+        let int_values = values.as_primitive::<Int32Type>();
+        assert_eq!(int_values.len(), 2);
+        assert_eq!(int_values.value(0), 1);
+        assert_eq!(int_values.value(1), 2);
+    }
+
+    fn corrupt_first_block_payload_byte(
+        mut bytes: Vec<u8>,
+        field_offset: usize,
+        expected_original: u8,
+        replacement: u8,
+    ) -> Vec<u8> {
+        let mut header_decoder = HeaderDecoder::default();
+        let header_len = header_decoder.decode(&bytes).expect("decode header");
+        assert!(header_decoder.flush().is_some(), "decode complete header");
+
+        let mut cursor = &bytes[header_len..];
+        let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
+        cursor = &cursor[count_len..];
+        let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
+        let data_start = header_len + count_len + size_len;
+        let target = data_start + field_offset;
+
+        assert!(
+            target < bytes.len(),
+            "target byte offset {target} out of bounds for input length {}",
+            bytes.len()
+        );
+        assert_eq!(
+            bytes[target], expected_original,
+            "unexpected original byte at payload offset {field_offset}"
+        );
+        bytes[target] = replacement;
+        bytes
+    }
+
+    #[test]
+    fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
+        // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is encoded as
+        // a 10-byte VLQ ending in 0x01. Flipping that terminator to 0x02 creates an
+        // overflowing varint that must fail.
+        let writer_schema = Schema::new(vec![
+            Field::new("bad_long", DataType::Int64, false),
+            Field::new("keep", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
+                Arc::new(Int32Array::from(vec![7])) as ArrayRef,
+            ],
+        )
+        .expect("build writer batch");
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);
+
+        let err = ReaderBuilder::new()
+            .build(Cursor::new(mutated.clone()))
+            .expect("build full reader")
+            .collect::<Result<Vec<_>, _>>()
+            .expect_err("full decode should reject malformed varint");
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("bad varint"));
+
+        let err = ReaderBuilder::new()
+            .with_projection(vec![1])
+            .build(Cursor::new(mutated))
+            .expect("build projected reader")
+            .collect::<Result<Vec<_>, _>>()
+            .expect_err("projection must also reject malformed skipped 
varint");
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("bad varint"));
+    }
+
+    #[test]
+    fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
+        // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field encodes to
+        // ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically valid varint, but now
+        // its value exceeds u32::MAX and must fail Int32 validation even when projected out.
+        let writer_schema = Schema::new(vec![
+            Field::new("bad_int", DataType::Int32, false),
+            Field::new("keep", DataType::Int64, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(writer_schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
+                Arc::new(Int64Array::from(vec![11])) as ArrayRef,
+            ],
+        )
+        .expect("build writer batch");
+        let bytes = write_ocf(&writer_schema, &[batch]);
+        let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);
+
+        let err = ReaderBuilder::new()
+            .build(Cursor::new(mutated.clone()))
+            .expect("build full reader")
+            .collect::<Result<Vec<_>, _>>()
+            .expect_err("full decode should reject int overflow");
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("varint overflow"));
+
+        let err = ReaderBuilder::new()
+            .with_projection(vec![1])
+            .build(Cursor::new(mutated))
+            .expect("build projected reader")
+            .collect::<Result<Vec<_>, _>>()
+            .expect_err("projection must also reject skipped int overflow");
+        assert!(matches!(err, ArrowError::AvroError(_)));
+        assert!(err.to_string().contains("varint overflow"));
+    }
+
     #[test]
     fn comprehensive_e2e_test() {
         let path = "test/data/comprehensive_e2e.avro";
@@ -9089,46 +9238,4 @@ mod test {
             "entire RecordBatch mismatch (schema, all columns, all rows)"
         );
     }
-
-    #[test]
-    fn test_bad_varint_bug_nullable_array_items() {
-        use flate2::read::GzDecoder;
-        use std::io::Read;
-        let manifest_dir = env!("CARGO_MANIFEST_DIR");
-        let gz_path = format!("{manifest_dir}/test/data/bad-varint-bug.avro.gz");
-        let gz_file = File::open(&gz_path).expect("test file should exist");
-        let mut decoder = GzDecoder::new(gz_file);
-        let mut avro_bytes = Vec::new();
-        decoder
-            .read_to_end(&mut avro_bytes)
-            .expect("should decompress");
-        let reader_arrow_schema = Schema::new(vec![Field::new(
-            "int_array",
-            DataType::List(Arc::new(Field::new("element", DataType::Int32, true))),
-            true,
-        )])
-        .with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
-        let reader_schema = AvroSchema::try_from(&reader_arrow_schema)
-            .expect("should convert Arrow schema to Avro");
-        let mut reader = ReaderBuilder::new()
-            .with_reader_schema(reader_schema)
-            .build(Cursor::new(avro_bytes))
-            .expect("should build reader");
-        let batch = reader
-            .next()
-            .expect("should have one batch")
-            .expect("reading should succeed without bad varint error");
-        assert_eq!(batch.num_rows(), 1);
-        let list_col = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<ListArray>()
-            .expect("should be ListArray");
-        assert_eq!(list_col.len(), 1);
-        let values = list_col.values();
-        let int_values = values.as_primitive::<Int32Type>();
-        assert_eq!(int_values.len(), 2);
-        assert_eq!(int_values.value(0), 1);
-        assert_eq!(int_values.value(1), 2);
-    }
 }
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 022789102a..7701eeea72 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -2467,7 +2467,7 @@ impl Skipper {
                 Ok(())
             }
             Self::Int32 => {
-                buf.get_int()?;
+                buf.skip_int()?;
                 Ok(())
             }
             Self::Int64
@@ -2475,7 +2475,7 @@ impl Skipper {
             | Self::TimestampMillis
             | Self::TimestampMicros
             | Self::TimestampNanos => {
-                buf.get_long()?;
+                buf.skip_long()?;
                 Ok(())
             }
             Self::Float32 => {
@@ -2503,7 +2503,7 @@ impl Skipper {
                 Ok(())
             }
             Self::Enum => {
-                buf.get_int()?;
+                buf.skip_int()?;
                 Ok(())
             }
             Self::DurationFixed12 => {
diff --git a/arrow-avro/src/reader/vlq.rs b/arrow-avro/src/reader/vlq.rs
index c0b471b466..26bf656159 100644
--- a/arrow-avro/src/reader/vlq.rs
+++ b/arrow-avro/src/reader/vlq.rs
@@ -97,6 +97,38 @@ fn read_varint_slow(buf: &[u8]) -> Option<(u64, usize)> {
     None
 }
 
+pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
+    if let Some(array) = buf.get(..10) {
+        return skip_varint_array(array.try_into().unwrap());
+    }
+    skip_varint_slow(buf)
+}
+
+fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
+    // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
+    #[allow(clippy::needless_range_loop)]
+    for idx in 0..9 {
+        if buf[idx] < 0x80 {
+            return Some(idx + 1);
+        }
+    }
+    (buf[9] < 0x02).then_some(10)
+}
+
+#[cold]
+fn skip_varint_slow(buf: &[u8]) -> Option<usize> {
+    debug_assert!(
+        buf.len() < 10,
+        "should be only called on buffers too short for the fast path"
+    );
+    for (idx, &byte) in buf.iter().enumerate() {
+        if byte < 0x80 {
+            return Some(idx + 1);
+        }
+    }
+    None
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
