This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch 53.0.0-dev
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/53.0.0-dev by this push:
new 2e7f7ef9b6 Revert "Revert "Write Bloom filters between row groups instead of the end (#…" (#5933)
2e7f7ef9b6 is described below
commit 2e7f7ef9b68f69e8707d01dbbf5ad0e57f190979
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jul 16 18:30:13 2024 -0400
Revert "Revert "Write Bloom filters between row groups instead of the end
(#…" (#5933)
This reverts commit 22e0b4432c9838f2536284015271d3de9a165135.
---
parquet/Cargo.toml | 8 +++
parquet/examples/write_parquet.rs | 131 ++++++++++++++++++++++++++++++++++
parquet/src/arrow/arrow_writer/mod.rs | 28 +++++++-
parquet/src/arrow/async_writer/mod.rs | 4 +-
parquet/src/file/metadata/mod.rs | 5 ++
parquet/src/file/properties.rs | 36 ++++++++++
parquet/src/file/writer.rs | 117 ++++++++++++++++++------------
7 files changed, 277 insertions(+), 52 deletions(-)
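For context, the user-visible surface of this change is the new `BloomFilterPosition` writer property. Below is a minimal sketch of a writer opting into the new behavior explicitly; the schema, values, and output path are illustrative, not part of this commit:

    use std::fs::File;
    use std::sync::Arc;

    use arrow::array::UInt64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::{BloomFilterPosition, WriterProperties};

    fn main() -> parquet::errors::Result<()> {
        // Flush each row group's Bloom filters right after the row group
        // (the new default) instead of buffering them until the end of the file.
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .set_bloom_filter_position(BloomFilterPosition::AfterRowGroup)
            .build();

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt64, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(UInt64Array::from(vec![1, 2, 3]))],
        )?;

        let file = File::create("/tmp/example.parquet")?; // illustrative path
        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }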
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 2cc12a81de..7391d09646 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
+sysinfo = { version = "0.30.12", optional = true, default-features = false }
[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -114,12 +115,19 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd", "zstd-sys"]
+# Display memory in example/write_parquet.rs
+sysinfo = ["dep:sysinfo"]
[[example]]
name = "read_parquet"
required-features = ["arrow"]
path = "./examples/read_parquet.rs"
+[[example]]
+name = "write_parquet"
+required-features = ["cli", "sysinfo"]
+path = "./examples/write_parquet.rs"
+
[[example]]
name = "async_read_parquet"
required-features = ["arrow", "async"]
diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs
new file mode 100644
index 0000000000..d2ef550df8
--- /dev/null
+++ b/parquet/examples/write_parquet.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use arrow::array::{StructArray, UInt64Builder};
+use arrow::datatypes::DataType::UInt64;
+use arrow::datatypes::{Field, Schema};
+use clap::{Parser, ValueEnum};
+use parquet::arrow::ArrowWriter as ParquetWriter;
+use parquet::basic::Encoding;
+use parquet::errors::Result;
+use parquet::file::properties::{BloomFilterPosition, WriterProperties};
+use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
+
+#[derive(ValueEnum, Clone)]
+enum BloomFilterPositionArg {
+ End,
+ AfterRowGroup,
+}
+
+#[derive(Parser)]
+#[command(version)]
+/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
+struct Args {
+ #[arg(long, default_value_t = 1000)]
+ /// Number of batches to write
+ iterations: u64,
+
+ #[arg(long, default_value_t = 1000000)]
+ /// Number of rows in each batch
+ batch: u64,
+
+    #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)]
+ /// Where to write Bloom Filters
+ bloom_filter_position: BloomFilterPositionArg,
+
+ /// Path to the file to write
+ path: PathBuf,
+}
+
+fn now() -> String {
+ chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
+}
+
+fn mem(system: &mut System) -> String {
+ let pid = Pid::from(std::process::id() as usize);
+    system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory());
+ system
+ .process(pid)
+ .map(|proc| format!("{}MB", proc.memory() / 1_000_000))
+ .unwrap_or("N/A".to_string())
+}
+
+fn main() -> Result<()> {
+ let args = Args::parse();
+
+ let bloom_filter_position = match args.bloom_filter_position {
+ BloomFilterPositionArg::End => BloomFilterPosition::End,
+        BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
+ };
+
+ let properties = WriterProperties::builder()
+ .set_column_bloom_filter_enabled("id".into(), true)
+ .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
+ .set_bloom_filter_position(bloom_filter_position)
+ .build();
+ let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
+    // Create the parquet file that will be written to.
+ let file = File::create(args.path).unwrap();
+    let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;
+
+    let mut system =
+        System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
+ eprintln!(
+ "{} Writing {} batches of {} rows. RSS = {}",
+ now(),
+ args.iterations,
+ args.batch,
+ mem(&mut system)
+ );
+
+ let mut array_builder = UInt64Builder::new();
+ let mut last_log = Instant::now();
+ for i in 0..args.iterations {
+ if Instant::now() - last_log > Duration::new(10, 0) {
+ last_log = Instant::now();
+ eprintln!(
+ "{} Iteration {}/{}. RSS = {}",
+ now(),
+ i + 1,
+ args.iterations,
+ mem(&mut system)
+ );
+ }
+ for j in 0..args.batch {
+ array_builder.append_value(i + j);
+ }
+ writer.write(
+ &StructArray::new(
+ schema.fields().clone(),
+ vec![Arc::new(array_builder.finish())],
+ None,
+ )
+ .into(),
+ )?;
+ }
+ writer.flush()?;
+ writer.close()?;
+
+ eprintln!("{} Done. RSS = {}", now(), mem(&mut system));
+
+ Ok(())
+}
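The example above is gated behind the `cli` and `sysinfo` features declared in Cargo.toml. Assuming a checkout of arrow-rs, it can be run with something along these lines (the output path is arbitrary):

    cargo run -p parquet --example write_parquet --features cli,sysinfo -- /tmp/test.parquet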
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index cf46f3b64a..070d740094 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -204,7 +204,7 @@ impl<W: Write + Send> ArrowWriter<W> {
}
/// Returns metadata for any flushed row groups
- pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+ pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}
@@ -1097,7 +1097,9 @@ mod tests {
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
+    use crate::file::properties::{
+        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
+    };
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
@@ -1745,6 +1747,7 @@ mod tests {
values: ArrayRef,
schema: SchemaRef,
bloom_filter: bool,
+ bloom_filter_position: BloomFilterPosition,
}
impl RoundTripOptions {
@@ -1755,6 +1758,7 @@ mod tests {
values,
schema: Arc::new(schema),
bloom_filter: false,
+ bloom_filter_position: BloomFilterPosition::AfterRowGroup,
}
}
}
@@ -1774,6 +1778,7 @@ mod tests {
values,
schema,
bloom_filter,
+ bloom_filter_position,
} = options;
let encodings = match values.data_type() {
@@ -1814,6 +1819,7 @@ mod tests {
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
+ .set_bloom_filter_position(bloom_filter_position)
.build();
files.push(roundtrip_opts(&expected_batch, props))
@@ -2171,6 +2177,22 @@ mod tests {
values_required::<BinaryViewArray, _>(many_vecs_iter);
}
+ #[test]
+ fn i32_column_bloom_filter_at_end() {
+ let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
+ let mut options = RoundTripOptions::new(array, false);
+ options.bloom_filter = true;
+ options.bloom_filter_position = BloomFilterPosition::End;
+
+ let files = one_column_roundtrip_with_options(options);
+ check_bloom_filter(
+ files,
+ "col".to_string(),
+ (0..SMALL_SIZE as i32).collect(),
+ (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+ );
+ }
+
#[test]
fn i32_column_bloom_filter() {
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index edeb0fec00..274d8fef89 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -54,7 +54,7 @@ use crate::{
arrow::arrow_writer::ArrowWriterOptions,
arrow::ArrowWriter,
errors::{ParquetError, Result},
- file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
+ file::{metadata::RowGroupMetaData, properties::WriterProperties},
format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
}
/// Returns metadata for any flushed row groups
- pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+ pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.sync_writer.flushed_row_groups()
}
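Note that `flushed_row_groups` on both the sync and async writers now returns `&[RowGroupMetaData]` rather than `&[RowGroupMetaDataPtr]` (an `Arc`), since a row group's metadata may now be rewritten after flushing, when its Bloom filters are written out. Read-only callers only see the type change; a small sketch (the helper name is hypothetical):

    use std::io::Write;

    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::RowGroupMetaData;

    // Hypothetical helper: inspection works as before, only the element type
    // changed from RowGroupMetaDataPtr (Arc<RowGroupMetaData>) to RowGroupMetaData.
    fn report_flushed<W: Write + Send>(writer: &ArrowWriter<W>) {
        let flushed: &[RowGroupMetaData] = writer.flushed_row_groups();
        for row_group in flushed {
            eprintln!("flushed row group with {} rows", row_group.num_rows());
        }
    }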
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 278d1e464e..39b6de41fa 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -358,6 +358,11 @@ impl RowGroupMetaData {
&self.columns
}
+ /// Returns mutable slice of column chunk metadata.
+ pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
+ &mut self.columns
+ }
+
/// Number of rows in this row group.
pub fn num_rows(&self) -> i64 {
self.num_rows
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 654b5e23f9..8454625ff7 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -45,6 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// Default value for [`WriterProperties::bloom_filter_position`]
+pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -88,6 +90,24 @@ impl FromStr for WriterVersion {
}
}
+/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
+/// write Bloom filters
+///
+/// Basic constant, which is not part of the Thrift definition.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BloomFilterPosition {
+ /// Write Bloom Filters of each row group right after the row group
+ ///
+ /// This saves memory by writing it as soon as it is computed, at the cost
+ /// of data locality for readers
+ AfterRowGroup,
+ /// Write Bloom Filters at the end of the file
+ ///
+    /// This allows better data locality for readers, at the cost of memory usage
+    /// for writers.
+ End,
+}
+
/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;
@@ -132,6 +152,7 @@ pub struct WriterProperties {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
+ bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -219,6 +240,11 @@ impl WriterProperties {
self.max_row_group_size
}
+    /// Returns where in the final file Bloom Filters are written.
+ pub fn bloom_filter_position(&self) -> BloomFilterPosition {
+ self.bloom_filter_position
+ }
+
/// Returns configured writer version.
pub fn writer_version(&self) -> WriterVersion {
self.writer_version
@@ -340,6 +366,7 @@ pub struct WriterPropertiesBuilder {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
+ bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
key_value_metadata: Option<Vec<KeyValue>>,
@@ -359,6 +386,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
+ bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
@@ -378,6 +406,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
+ bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
key_value_metadata: self.key_value_metadata,
@@ -489,6 +518,12 @@ impl WriterPropertiesBuilder {
self
}
+    /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`)
+    pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
+ self.bloom_filter_position = value;
+ self
+ }
+
/// Sets "created by" property (defaults to `parquet-rs version
<VERSION>`).
pub fn set_created_by(mut self, value: String) -> Self {
self.created_by = value;
@@ -1054,6 +1089,7 @@ mod tests {
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
+        assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);
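Since `DEFAULT_BLOOM_FILTER_POSITION` is `AfterRowGroup`, writers that enable Bloom filters pick up the lower-memory behavior without code changes. A sketch of checking the default, using only the constants and accessors added above:

    use parquet::file::properties::{
        BloomFilterPosition, WriterProperties, DEFAULT_BLOOM_FILTER_POSITION,
    };

    fn main() {
        // Builder defaults now place Bloom filters immediately after each row group.
        let props = WriterProperties::builder().build();
        assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
        assert_eq!(props.bloom_filter_position(), BloomFilterPosition::AfterRowGroup);
    }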
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 7806384cdb..eb633f31c4 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -34,8 +34,9 @@ use crate::column::{
};
use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
+use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
-use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
+use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
/// A wrapper around a [`Write`] that keeps track of the number
@@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box<dyn
FnOnce(ColumnCloseResult) -> Result<()
/// - the row group metadata
/// - the column index for each column chunk
/// - the offset index for each column chunk
-pub type OnCloseRowGroup<'a> = Box<
+pub type OnCloseRowGroup<'a, W> = Box<
dyn FnOnce(
- RowGroupMetaDataPtr,
+ &'a mut TrackedWrite<W>,
+ RowGroupMetaData,
Vec<Option<Sbbf>>,
Vec<Option<ColumnIndex>>,
Vec<Option<OffsetIndex>>,
@@ -143,7 +145,7 @@ pub struct SerializedFileWriter<W: Write> {
schema: TypePtr,
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
- row_groups: Vec<RowGroupMetaDataPtr>,
+ row_groups: Vec<RowGroupMetaData>,
bloom_filters: Vec<Vec<Option<Sbbf>>>,
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
@@ -197,18 +199,29 @@ impl<W: Write + Send> SerializedFileWriter<W> {
self.row_group_index += 1;
+ let bloom_filter_position = self.properties().bloom_filter_position();
let row_groups = &mut self.row_groups;
let row_bloom_filters = &mut self.bloom_filters;
let row_column_indexes = &mut self.column_indexes;
let row_offset_indexes = &mut self.offset_indexes;
- let on_close =
-        |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| {
- row_groups.push(metadata);
- row_bloom_filters.push(row_group_bloom_filter);
- row_column_indexes.push(row_group_column_index);
- row_offset_indexes.push(row_group_offset_index);
- Ok(())
+ let on_close = move |buf,
+ mut metadata,
+ row_group_bloom_filter,
+ row_group_column_index,
+ row_group_offset_index| {
+ row_bloom_filters.push(row_group_bloom_filter);
+ row_column_indexes.push(row_group_column_index);
+ row_offset_indexes.push(row_group_offset_index);
+            // write bloom filters out immediately after the row group if requested
+ match bloom_filter_position {
+ BloomFilterPosition::AfterRowGroup => {
+ write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
+ }
+ BloomFilterPosition::End => (),
+            };
+ row_groups.push(metadata);
+ Ok(())
+ };
let row_group_writer = SerializedRowGroupWriter::new(
self.descr.clone(),
@@ -221,7 +234,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
}
/// Returns metadata for any flushed row groups
- pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+ pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
&self.row_groups
}
@@ -273,34 +286,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Ok(())
}
- /// Serialize all the bloom filter to the file
- fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) ->
Result<()> {
- // iter row group
- // iter each column
- // write bloom filter to the file
- for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
-            for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
- match &self.bloom_filters[row_group_idx][column_idx] {
- Some(bloom_filter) => {
- let start_offset = self.buf.bytes_written();
- bloom_filter.write(&mut self.buf)?;
- let end_offset = self.buf.bytes_written();
- // set offset and index for bloom filter
- let column_chunk_meta = column_chunk
- .meta_data
- .as_mut()
- .expect("can't have bloom filter without column
metadata");
-                        column_chunk_meta.bloom_filter_offset = Some(start_offset as i64);
- column_chunk_meta.bloom_filter_length =
- Some((end_offset - start_offset) as i32);
- }
- None => {}
- }
- }
- }
- Ok(())
- }
-
/// Serialize all the column index to the file
fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) ->
Result<()> {
// iter row group
@@ -331,6 +316,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
self.finished = true;
let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
+ // write out any remaining bloom filters after all row groups
+ for row_group in &mut self.row_groups {
+ write_bloom_filters(&mut self.buf, &mut self.bloom_filters,
row_group)?;
+ }
+
let mut row_groups = self
.row_groups
.as_slice()
@@ -338,7 +328,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();
- self.write_bloom_filters(&mut row_groups)?;
// Write column indexes and offset indexes
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
@@ -443,6 +432,40 @@ impl<W: Write + Send> SerializedFileWriter<W> {
}
}
+/// Serialize all the bloom filters of the given row group to the given buffer,
+/// updating the bloom filter offsets and lengths in the row group metadata in place.
+fn write_bloom_filters<W: Write + Send>(
+ buf: &mut TrackedWrite<W>,
+ bloom_filters: &mut [Vec<Option<Sbbf>>],
+ row_group: &mut RowGroupMetaData,
+) -> Result<()> {
+    // Look up this row group's bloom filters by ordinal, write each one to the
+    // buffer, and record its offset and length in the column chunk metadata.
+
+ let row_group_idx: u16 = row_group
+ .ordinal()
+ .expect("Missing row group ordinal")
+ .try_into()
+ .expect("Negative row group ordinal");
+ let row_group_idx = row_group_idx as usize;
+    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
+        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
+ let start_offset = buf.bytes_written();
+ bloom_filter.write(&mut *buf)?;
+ let end_offset = buf.bytes_written();
+ // set offset and index for bloom filter
+ *column_chunk = column_chunk
+ .clone()
+ .into_builder()
+ .set_bloom_filter_offset(Some(start_offset as i64))
+            .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
+ .build()?;
+ }
+ }
+ Ok(())
+}
+
/// Parquet row group writer API.
/// Provides methods to access column writers in an iterator-like fashion, order is
/// guaranteed to match the order of schema leaves (column descriptors).
@@ -468,7 +491,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
offset_indexes: Vec<Option<OffsetIndex>>,
row_group_index: i16,
file_offset: i64,
- on_close: Option<OnCloseRowGroup<'a>>,
+ on_close: Option<OnCloseRowGroup<'a, W>>,
}
impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
@@ -485,7 +508,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
properties: WriterPropertiesPtr,
buf: &'a mut TrackedWrite<W>,
row_group_index: i16,
- on_close: Option<OnCloseRowGroup<'a>>,
+ on_close: Option<OnCloseRowGroup<'a, W>>,
) -> Self {
let num_columns = schema_descr.num_columns();
let file_offset = buf.bytes_written() as i64;
@@ -669,12 +692,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W>
{
.set_file_offset(self.file_offset)
.build()?;
- let metadata = Arc::new(row_group_metadata);
- self.row_group_metadata = Some(metadata.clone());
+        self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
if let Some(on_close) = self.on_close.take() {
on_close(
- metadata,
+ self.buf,
+ row_group_metadata,
self.bloom_filters,
self.column_indexes,
self.offset_indexes,
@@ -1446,7 +1469,7 @@ mod tests {
assert_eq!(flushed.len(), idx + 1);
assert_eq!(Some(idx as i16), last_group.ordinal());
assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
- assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
+ assert_eq!(&flushed[idx], last_group.as_ref());
}
let file_metadata = file_writer.close().unwrap();