etseidl commented on code in PR #8956:
URL: https://github.com/apache/arrow-rs/pull/8956#discussion_r2593472408


##########
parquet/benches/parquet_round_trip.rs:
##########
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    ArrayRef, ArrowPrimitiveType, GenericStringArray, OffsetSizeTrait, 
PrimitiveArray, RecordBatch,
+};
+use arrow::datatypes::{DataType, Field, Float32Type, Float64Type, Int32Type, 
Int64Type, Schema};
+use arrow_array::FixedSizeBinaryArray;
+use bytes::Bytes;
+use criterion::{Criterion, criterion_group, criterion_main};
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, 
compute_leaves};
+use parquet::basic::Encoding;
+use parquet::file::properties::WriterProperties;
+use rand::{
+    Rng, SeedableRng,
+    distr::{Alphanumeric, Distribution, StandardUniform},
+    prelude::StdRng,
+};
+use std::sync::Arc;
+
+/// Kind of column generated for the benchmark files; every column in a file shares it.
+#[derive(Copy, Clone)]
+pub enum ColumnType {
+    /// Nullable UTF-8 string column. The payload is passed as `max_str_len` to
+    /// `create_string_array_with_max_len`.
+    Binary(usize),
+    /// Nullable fixed-size binary column. The payload is the byte width of each value.
+    FixedLen(i32),
+    /// Nullable 32-bit integer column.
+    Int32,
+    /// Nullable 64-bit integer column.
+    Int64,
+    /// Nullable 32-bit floating point column.
+    Float,
+    /// Nullable 64-bit floating point column.
+    Double,
+}
+
+/// Builds a [`PrimitiveArray`] holding `size` slots of random values.
+///
+/// Generation is driven by a [`StdRng`] seeded with `seed`. For every slot an `f32` is
+/// drawn first; the slot is null when that draw is below `null_density`, otherwise a
+/// second draw supplies the value.
+pub fn create_primitive_array_with_seed<T>(
+    size: usize,
+    null_density: f32,
+    seed: u64,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+    StandardUniform: Distribution<T::Native>,
+{
+    let mut generator = StdRng::seed_from_u64(seed);
+
+    let slots = (0..size).map(|_| {
+        let is_null = generator.random::<f32>() < null_density;
+        // Only non-null slots consume a value draw from the generator.
+        (!is_null).then(|| generator.random::<T::Native>())
+    });
+    PrimitiveArray::from_iter(slots)
+}
+
+/// Creates a [`GenericStringArray`] of `size` random alphanumeric strings, generated
+/// from an RNG seeded with `seed`.
+///
+/// A slot is null when an `f32` draw falls below `null_density`. Each non-null string
+/// gets a length drawn from the half-open range `max_str_len / 2..max_str_len`, so it is
+/// always shorter than `max_str_len`. When `max_str_len` is 0 that range is empty and
+/// sampling it would panic, so every non-null string is empty instead.
+pub fn create_string_array_with_max_len<Offset: OffsetSizeTrait>(
+    size: usize,
+    null_density: f32,
+    max_str_len: usize,
+    seed: u64,
+) -> GenericStringArray<Offset> {
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    // `sample_iter` takes the generator by value; calling it through a `&mut` binding
+    // only consumes a reborrow, so the same RNG keeps serving the following rows.
+    let rng = &mut rng;
+    (0..size)
+        .map(|_| {
+            if rng.random::<f32>() < null_density {
+                None
+            } else {
+                // Guard the degenerate case: `0..0` is an empty range.
+                let str_len = if max_str_len == 0 {
+                    0
+                } else {
+                    rng.random_range(max_str_len / 2..max_str_len)
+                };
+                let value: Vec<u8> = rng.sample_iter(&Alphanumeric).take(str_len).collect();
+                // `Alphanumeric` samples ASCII letters and digits, which are valid UTF-8.
+                let value = String::from_utf8(value).unwrap();
+                Some(value)
+            }
+        })
+        .collect()
+}
+
+/// Creates a [`FixedSizeBinaryArray`] with `size` slots, each non-null value being
+/// `fixed_len` random bytes produced by an RNG seeded with `seed`.
+///
+/// A slot is null when an `f32` draw falls below `null_density`. The `Offset` type
+/// parameter is not used by the body; it remains part of the signature because callers
+/// name it explicitly.
+pub fn create_string_array_with_fixed_len<Offset: OffsetSizeTrait>(
+    size: usize,
+    null_density: f32,
+    fixed_len: i32,
+    seed: u64,
+) -> FixedSizeBinaryArray {
+    let mut seeded = StdRng::seed_from_u64(seed);
+    let rng = &mut seeded;
+
+    let values = (0..size).map(|_| {
+        if rng.random::<f32>() < null_density {
+            return None;
+        }
+        let bytes: Vec<u8> = rng
+            .sample_iter::<u8, _>(StandardUniform)
+            .take(fixed_len as usize)
+            .collect();
+        Some(bytes)
+    });
+
+    FixedSizeBinaryArray::try_from_sparse_iter_with_size(values, fixed_len).unwrap()
+}
+
+/// Builds a schema of `num_columns` nullable fields named `col_0`, `col_1`, ... that all
+/// share the Arrow data type corresponding to `column_type`.
+pub fn schema(column_type: ColumnType, num_columns: usize) -> Arc<Schema> {
+    let data_type = match column_type {
+        ColumnType::Int32 => DataType::Int32,
+        ColumnType::Int64 => DataType::Int64,
+        ColumnType::Float => DataType::Float32,
+        ColumnType::Double => DataType::Float64,
+        ColumnType::Binary(_) => DataType::Utf8,
+        ColumnType::FixedLen(width) => DataType::FixedSizeBinary(width),
+    };
+
+    let mut fields: Vec<Field> = Vec::with_capacity(num_columns);
+    for index in 0..num_columns {
+        fields.push(Field::new(format!("col_{index}"), data_type.clone(), true));
+    }
+    Arc::new(Schema::new(fields))
+}
+
+/// Builds a [`RecordBatch`] with `num_columns` randomly generated columns of `num_rows`
+/// rows each.
+///
+/// Every column has the type selected by `column_type` and is generated with a null
+/// density of 0.0001. Column `i` uses the seed `seed * num_columns + i`, so each column
+/// of a batch gets its own seed.
+///
+/// Panics if the generated arrays cannot be assembled into a batch for `schema`.
+pub fn create_batch(
+    schema: &Arc<Schema>,
+    column_type: ColumnType,
+    seed: usize,
+    num_columns: usize,
+    num_rows: usize,
+) -> RecordBatch {
+    let null_density = 0.0001;
+    // A single pass over the column indices; only the array constructor differs per
+    // column type.
+    let arrays: Vec<ArrayRef> = (0..num_columns)
+        .map(|i| {
+            let array_seed = (seed * num_columns + i) as u64;
+            match column_type {
+                ColumnType::Binary(max_str_len) => {
+                    Arc::new(create_string_array_with_max_len::<i32>(
+                        num_rows,
+                        null_density,
+                        max_str_len,
+                        array_seed,
+                    )) as ArrayRef
+                }
+                ColumnType::FixedLen(size) => {
+                    Arc::new(create_string_array_with_fixed_len::<i32>(
+                        num_rows,
+                        null_density,
+                        size,
+                        array_seed,
+                    )) as ArrayRef
+                }
+                ColumnType::Int32 => Arc::new(create_primitive_array_with_seed::<Int32Type>(
+                    num_rows,
+                    null_density,
+                    array_seed,
+                )) as ArrayRef,
+                ColumnType::Int64 => Arc::new(create_primitive_array_with_seed::<Int64Type>(
+                    num_rows,
+                    null_density,
+                    array_seed,
+                )) as ArrayRef,
+                ColumnType::Float => Arc::new(create_primitive_array_with_seed::<Float32Type>(
+                    num_rows,
+                    null_density,
+                    array_seed,
+                )) as ArrayRef,
+                ColumnType::Double => Arc::new(create_primitive_array_with_seed::<Float64Type>(
+                    num_rows,
+                    null_density,
+                    array_seed,
+                )) as ArrayRef,
+            }
+        })
+        .collect();
+    RecordBatch::try_new(schema.clone(), arrays).unwrap()
+}
+
+/// Description of a synthetic Parquet file to generate for the benchmarks.
+#[derive(Copy, Clone)]
+pub struct ParquetFileSpec {
+    /// Type shared by every column in the file.
+    column_type: ColumnType,
+    /// Number of columns in the schema.
+    num_columns: usize,
+    /// Number of row groups written to the file.
+    num_row_groups: usize,
+    /// Rows written into each row group; also set as the writer's maximum row group size.
+    rows_per_row_group: usize,
+    /// Value passed to the writer's data page row count limit.
+    rows_per_page: usize,
+    /// Encoding passed to the writer properties.
+    encoding: Encoding,
+    /// Whether dictionary encoding is enabled in the writer properties.
+    use_dict: bool,
+}
+
+/// Column count used by `ParquetFileSpec::new`.
+const DEFAULT_NUM_COLUMNS: usize = 10;
+/// Row group count used by `ParquetFileSpec::new`.
+const DEFAULT_NUM_ROWGROUPS: usize = 10;
+/// Rows-per-page value used by `ParquetFileSpec::new`.
+const DEFAULT_ROWS_PER_PAGE: usize = 2_000;
+/// Rows-per-row-group value used by `ParquetFileSpec::new`.
+const DEFAULT_ROWS_PER_ROWGROUP: usize = 10_000;
+
+impl ParquetFileSpec {
+    /// Creates a spec for `column_type` with the `DEFAULT_*` sizes, `PLAIN` encoding and
+    /// dictionary encoding enabled.
+    pub fn new(column_type: ColumnType) -> Self {
+        Self {
+            column_type,
+            encoding: Encoding::PLAIN,
+            use_dict: true,
+            num_columns: DEFAULT_NUM_COLUMNS,
+            num_row_groups: DEFAULT_NUM_ROWGROUPS,
+            rows_per_page: DEFAULT_ROWS_PER_PAGE,
+            rows_per_row_group: DEFAULT_ROWS_PER_ROWGROUP,
+        }
+    }
+
+    /// Returns the spec with its column count replaced by `num_columns`.
+    pub fn with_num_columns(mut self, num_columns: usize) -> Self {
+        self.num_columns = num_columns;
+        self
+    }
+
+    /// Returns the spec with its row group count replaced by `num_row_groups`.
+    pub fn with_num_row_groups(mut self, num_row_groups: usize) -> Self {
+        self.num_row_groups = num_row_groups;
+        self
+    }
+
+    /// Returns the spec with its rows-per-row-group replaced by `rows_per_row_group`.
+    pub fn with_rows_per_row_group(mut self, rows_per_row_group: usize) -> Self {
+        self.rows_per_row_group = rows_per_row_group;
+        self
+    }
+
+    /// Returns the spec with its rows-per-page replaced by `rows_per_page`.
+    pub fn with_rows_per_page(mut self, rows_per_page: usize) -> Self {
+        self.rows_per_page = rows_per_page;
+        self
+    }
+
+    /// Returns the spec with its encoding replaced by `encoding`.
+    pub fn with_encoding(mut self, encoding: Encoding) -> Self {
+        self.encoding = encoding;
+        self
+    }
+
+    /// Returns the spec with dictionary encoding switched on or off per `use_dict`.
+    pub fn with_use_dict(mut self, use_dict: bool) -> Self {
+        self.use_dict = use_dict;
+        self
+    }
+}
+
+/// Writes `spec.rows_per_row_group` rows through `column_writers`, closes the writers and
+/// returns the resulting column chunks in writer order.
+///
+/// One batch of at most 100 rows is generated up front (seed 0) and written repeatedly;
+/// the last write is sliced when fewer rows than a full batch remain.
+pub fn encode_row_group(
+    schema: &Arc<Schema>,
+    spec: &ParquetFileSpec,
+    mut column_writers: Vec<ArrowColumnWriter>,
+) -> Vec<ArrowColumnChunk> {
+    let target_rows = spec.rows_per_row_group;
+    // Reuse one small batch for every write; generating fresh data each time would
+    // dominate the measured time.
+    let template = create_batch(
+        schema,
+        spec.column_type,
+        0,
+        spec.num_columns,
+        target_rows.min(100),
+    );
+
+    let mut written = 0;
+    while written < target_rows {
+        let remaining = target_rows - written;
+        let chunk = if remaining >= template.num_rows() {
+            template.clone()
+        } else {
+            template.slice(0, remaining)
+        };
+
+        let chunk_schema = chunk.schema();
+        // Leaves are handed to the writers in order, one writer per leaf column.
+        let mut writer_iter = column_writers.iter_mut();
+        for (field, column) in chunk_schema.fields().iter().zip(chunk.columns()) {
+            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
+                writer_iter.next().unwrap().write(&leaf).unwrap();
+            }
+        }
+        written += chunk.num_rows();
+    }
+
+    let mut chunks = Vec::with_capacity(column_writers.len());
+    for writer in column_writers {
+        chunks.push(writer.close().unwrap());
+    }
+    chunks
+}
+
+/// Encodes an uncompressed Parquet file described by `spec` into memory and returns its
+/// bytes.
+///
+/// `buf_size` is the initial capacity of the output buffer; 1_000_000 is used when it is
+/// `None`. Each of the `spec.num_row_groups` row groups is encoded by `encode_row_group`
+/// and then appended to the file.
+pub fn file_from_spec(spec: ParquetFileSpec, buf_size: Option<usize>) -> Bytes {
+    let capacity = buf_size.unwrap_or(1_000_000);
+    let mut output: Vec<u8> = Vec::with_capacity(capacity);
+    let arrow_schema = schema(spec.column_type, spec.num_columns);
+    let writer_props = WriterProperties::builder()
+        .set_max_row_group_size(spec.rows_per_row_group)
+        .set_data_page_row_count_limit(spec.rows_per_page)
+        .set_encoding(spec.encoding)
+        .set_dictionary_enabled(spec.use_dict)
+        .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+        .build();
+
+    // Scope the writers so they are gone before `output` is converted into `Bytes`.
+    {
+        let arrow_writer =
+            ArrowWriter::try_new(&mut output, arrow_schema.clone(), Some(writer_props)).unwrap();
+        let (mut file_writer, row_group_factory) = arrow_writer.into_serialized_writer().unwrap();
+
+        for row_group_index in 0..spec.num_row_groups {
+            let column_writers = row_group_factory
+                .create_column_writers(row_group_index)
+                .unwrap();
+            let column_chunks = encode_row_group(&arrow_schema, &spec, column_writers);
+
+            let mut row_group_writer = file_writer.next_row_group().unwrap();
+            for column_chunk in column_chunks {
+                column_chunk
+                    .append_to_row_group(&mut row_group_writer)
+                    .unwrap();
+            }
+            row_group_writer.close().unwrap();
+        }
+        file_writer.close().unwrap();
+    }
+
+    Bytes::from(output)
+}
+
+fn read_write(c: &mut Criterion, spec: ParquetFileSpec, msg: &str) {
+    let f = file_from_spec(spec, None);
+    let buf_size = Some(f.len());
+
+    c.bench_function(&format!("write {msg}"), |b| {
+        b.iter(|| file_from_spec(spec, buf_size))
+    });
+
+    c.bench_function(&format!("read {msg}"), |b| {
+        b.iter(|| {
+            let record_reader = 
ParquetRecordBatchReaderBuilder::try_new(f.clone())

Review Comment:
   Done in 
https://github.com/apache/arrow-rs/pull/8956/commits/f622a3d283f62dbeeea6655d7dfb164f30a83898



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to