This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new c47d14ce9 add column index and offset index (#1935)
c47d14ce9 is described below
commit c47d14ce9bdd91d68661e67a843d60e2c5799061
Author: Kun Liu <[email protected]>
AuthorDate: Thu Jun 30 17:51:30 2022 +0800
add column index and offset index (#1935)
---
parquet/src/column/writer.rs | 172 +++++++++++++++++++++++++++++++++---
parquet/src/file/metadata.rs | 106 ++++++++++++++++++++++-
parquet/src/file/writer.rs | 201 +++++++++++++++++++++++++++++++++++--------
3 files changed, 427 insertions(+), 52 deletions(-)
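In short: closing a typed column writer now also yields the optional thrift
column index and offset index for that column chunk. A minimal sketch of the
new call-site shape, using only the tuple order introduced below (error
handling elided):

    let (bytes_written, rows_written, metadata, column_index, offset_index) =
        writer.close()?;
    // Both indexes are Option<_>: None when the index builder was invalidated,
    // e.g. because a data page carried no statistics.
    if let (Some(column_index), Some(offset_index)) = (column_index, offset_index) {
        // column_index: parquet_format::ColumnIndex
        // offset_index: parquet_format::OffsetIndex
    }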
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e..d589aef5a 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -16,6 +16,7 @@
// under the License.
//! Contains column writer API.
+use parquet_format::{ColumnIndex, OffsetIndex};
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
@@ -29,6 +30,7 @@ use crate::encodings::{
levels::{max_buffer_size, LevelEncoder},
};
use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
@@ -162,6 +164,14 @@ pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
})
}
+type ColumnCloseResult = (
+ u64,
+ u64,
+ ColumnChunkMetaData,
+ Option<ColumnIndex>,
+ Option<OffsetIndex>,
+);
+
/// Typed column writer for a primitive column.
pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
@@ -198,6 +208,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,
_phantom: PhantomData<T>,
+ // column index and offset index
+ column_index_builder: ColumnIndexBuilder,
+ offset_index_builder: OffsetIndexBuilder,
}
impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +274,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
num_column_nulls: 0,
column_distinct_count: None,
_phantom: PhantomData,
+ column_index_builder: ColumnIndexBuilder::new(),
+ offset_index_builder: OffsetIndexBuilder::new(),
}
}
@@ -416,7 +431,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
/// Finalises writes and closes the column writer.
/// Returns total bytes written, total rows written and column chunk metadata.
- pub fn close(mut self) -> Result<(u64, u64, ColumnChunkMetaData)> {
+ pub fn close(mut self) -> Result<ColumnCloseResult> {
if self.dict_encoder.is_some() {
self.write_dictionary_page()?;
}
@@ -425,7 +440,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.dict_encoder = None;
self.page_writer.close()?;
- Ok((self.total_bytes_written, self.total_rows_written, metadata))
+ let (column_index, offset_index) = if self.column_index_builder.valid() {
+ // build the column and offset index
+ let column_index = self.column_index_builder.build_to_thrift();
+ let offset_index = self.offset_index_builder.build_to_thrift();
+ (Some(column_index), Some(offset_index))
+ } else {
+ (None, None)
+ };
+
+ Ok((
+ self.total_bytes_written,
+ self.total_rows_written,
+ metadata,
+ column_index,
+ offset_index,
+ ))
}
/// Writes mini batch of values, definition and repetition levels.
@@ -593,6 +623,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
Ok(())
}
+ /// Update the column index and offset index when adding a data page
+ fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
+ // update the column index
+ let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+ // a page contains only null values,
+ // and writers have to set the corresponding entries in min_values and max_values to byte[0]
+ if null_page && self.column_index_builder.valid() {
+ self.column_index_builder.append(
+ null_page,
+ &[0; 1],
+ &[0; 1],
+ self.num_page_nulls as i64,
+ );
+ } else if self.column_index_builder.valid() {
+ // from page statistics
+ // If the page statistics can't be obtained, ignore the column/offset index for this column chunk
+ match &page_statistics {
+ None => {
+ self.column_index_builder.to_invalid();
+ }
+ Some(stat) => {
+ self.column_index_builder.append(
+ null_page,
+ stat.min_bytes(),
+ stat.max_bytes(),
+ self.num_page_nulls as i64,
+ );
+ }
+ }
+ }
+
+ // update the offset index
+ self.offset_index_builder
+ .append_row_count(self.num_buffered_rows as i64);
+ }
+
/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +688,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
None
};
+ // update column and offset index
+ self.update_column_offset_index(&page_statistics);
+
let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
let mut buffer = vec![];
@@ -700,8 +769,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
buf: ByteBufferPtr::new(buffer),
num_values: self.num_buffered_values,
encoding,
- num_nulls: self.num_buffered_values
- - self.num_buffered_encoded_values,
+ num_nulls: self.num_page_nulls as u32,
num_rows: self.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +898,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
+ // update offset index
+ // compressed_size = header_size + compressed_data_size
+ self.offset_index_builder.append_offset_and_size(
+ page_spec.offset as i64,
+ page_spec.compressed_size as i32,
+ );
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -865,6 +939,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
let page_spec = self.page_writer.write_page(compressed_page)?;
self.update_metrics_for_page(page_spec);
+ // For the dictionary page, there's no need to update the column/offset index.
Ok(())
}
@@ -1133,6 +1208,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
#[cfg(test)]
mod tests {
+ use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;
@@ -1256,7 +1332,7 @@ mod tests {
.write_batch(&[true, false, true, false], None, None)
.unwrap();
- let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+ let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
// PlainEncoder uses bit writer to write boolean values, which all fit into 1
// byte.
assert_eq!(bytes_written, 1);
@@ -1529,7 +1605,7 @@ mod tests {
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
- let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+ let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
assert_eq!(bytes_written, 20);
assert_eq!(rows_written, 4);
assert_eq!(
@@ -1586,7 +1662,7 @@ mod tests {
None,
)
.unwrap();
- let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+ let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::ByteArray(stats) = stats {
@@ -1620,7 +1696,7 @@ mod tests {
Int32Type,
>(page_writer, 0, 0, props);
writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
- let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+ let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
@@ -1651,7 +1727,7 @@ mod tests {
)
.unwrap();
- let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+ let (bytes_written, rows_written, metadata, _, _) = writer.close().unwrap();
assert_eq!(bytes_written, 20);
assert_eq!(rows_written, 4);
assert_eq!(
@@ -1835,7 +1911,7 @@ mod tests {
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(data, None, None).unwrap();
- let (bytes_written, _, _) = writer.close().unwrap();
+ let (bytes_written, _, _, _, _) = writer.close().unwrap();
// Read pages and check the sequence
let source = FileSource::new(&file, 0, bytes_written as usize);
@@ -2068,6 +2144,75 @@ mod tests {
),);
}
+ #[test]
+ fn test_column_offset_index_metadata() {
+ // write data
+ // and check the offset index and column index
+ let page_writer = get_test_page_writer();
+ let props = Arc::new(WriterProperties::builder().build());
+ let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+ writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+ // first page
+ writer.flush_data_pages().unwrap();
+ // second page
+ writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+ let (_, rows_written, metadata, column_index, offset_index) =
+ writer.close().unwrap();
+ let column_index = match column_index {
+ None => {
+ panic!("Can't fine the column index");
+ }
+ Some(column_index) => column_index,
+ };
+ let offset_index = match offset_index {
+ None => {
+ panic!("Can't find the offset index");
+ }
+ Some(offset_index) => offset_index,
+ };
+
+ assert_eq!(8, rows_written);
+
+ // column index
+ assert_eq!(2, column_index.null_pages.len());
+ assert_eq!(2, offset_index.page_locations.len());
+ assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+ for idx in 0..2 {
+ assert!(!column_index.null_pages[idx]);
+ assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
+ }
+
+ if let Some(stats) = metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 0);
+ assert_eq!(stats.distinct_count(), None);
+ if let Statistics::Int32(stats) = stats {
+ // first page is [1,2,3,4]
+ // second page is [-5,2,4,8]
+ assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
+ assert_eq!(
+ stats.max_bytes(),
+ column_index.max_values.get(1).unwrap().as_slice()
+ );
+ } else {
+ panic!("expecting Statistics::Int32");
+ }
+ } else {
+ panic!("metadata missing statistics");
+ }
+
+ // page location
+ assert_eq!(
+ 0,
+ offset_index.page_locations.get(0).unwrap().first_row_index
+ );
+ assert_eq!(
+ 4,
+ offset_index.page_locations.get(1).unwrap().first_row_index
+ );
+ }
+
/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
@@ -2149,7 +2294,8 @@ mod tests {
let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
assert_eq!(values_written, values.len());
- let (bytes_written, rows_written, column_metadata) = writer.close().unwrap();
+ let (bytes_written, rows_written, column_metadata, _, _) =
+ writer.close().unwrap();
let source = FileSource::new(&file, 0, bytes_written as usize);
let page_reader = Box::new(
@@ -2215,7 +2361,7 @@ mod tests {
let props = Arc::new(props);
let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();
- let (_, _, metadata) = writer.close().unwrap();
+ let (_, _, metadata, _, _) = writer.close().unwrap();
metadata
}
@@ -2327,7 +2473,7 @@ mod tests {
let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();
- let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
+ let (_bytes_written, _rows_written, metadata, _, _) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
stats.clone()
} else {
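To make the builder flow above concrete, here is a hedged sketch of what
update_column_offset_index and write_data_page accumulate for a two-page chunk
like the one in test_column_offset_index_metadata. Page offsets and compressed
sizes are illustrative, the min/max bytes assume plain little-endian i32
encoding (what Statistics::min_bytes/max_bytes return for Int32), and the
builders are assumed reachable at parquet::file::metadata, as the internal
import suggests:

    use parquet::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};

    let mut column_index_builder = ColumnIndexBuilder::new();
    let mut offset_index_builder = OffsetIndexBuilder::new();

    // page 1: [1, 2, 3, 4] -- not a null page, no nulls
    column_index_builder.append(false, &1i32.to_le_bytes(), &4i32.to_le_bytes(), 0);
    offset_index_builder.append_row_count(4);
    offset_index_builder.append_offset_and_size(4, 37); // offset, compressed size

    // page 2: [4, 8, 2, -5]
    column_index_builder.append(false, &(-5i32).to_le_bytes(), &8i32.to_le_bytes(), 0);
    offset_index_builder.append_row_count(4);
    offset_index_builder.append_offset_and_size(41, 37);

    // A page without statistics would call to_invalid() instead, dropping the
    // column index for the whole chunk.
    if column_index_builder.valid() {
        let column_index = column_index_builder.build_to_thrift();
        let offset_index = offset_index_builder.build_to_thrift();
        assert_eq!(column_index.null_pages, vec![false, false]);
        assert_eq!(offset_index.page_locations[1].first_row_index, 4);
    }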
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 4d9842c0e..7ec29de01 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,7 +35,10 @@
use std::sync::Arc;
-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{
+ BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation,
+ RowGroup,
+};
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
@@ -794,6 +797,107 @@ impl ColumnChunkMetaDataBuilder {
}
}
+/// Builder for column index
+pub struct ColumnIndexBuilder {
+ null_pages: Vec<bool>,
+ min_values: Vec<Vec<u8>>,
+ max_values: Vec<Vec<u8>>,
+ // TODO: calc the order for all pages in this column
+ boundary_order: BoundaryOrder,
+ null_counts: Vec<i64>,
+ // If the index can't be built for one page, the whole index for this column must be ignored
+ valid: bool,
+}
+
+impl ColumnIndexBuilder {
+ pub fn new() -> Self {
+ ColumnIndexBuilder {
+ null_pages: Vec::new(),
+ min_values: Vec::new(),
+ max_values: Vec::new(),
+ boundary_order: BoundaryOrder::Unordered,
+ null_counts: Vec::new(),
+ valid: true,
+ }
+ }
+
+ pub fn append(
+ &mut self,
+ null_page: bool,
+ min_value: &[u8],
+ max_value: &[u8],
+ null_count: i64,
+ ) {
+ self.null_pages.push(null_page);
+ self.min_values.push(min_value.to_vec());
+ self.max_values.push(max_value.to_vec());
+ self.null_counts.push(null_count);
+ }
+
+ pub fn to_invalid(&mut self) {
+ self.valid = false;
+ }
+
+ pub fn valid(&self) -> bool {
+ self.valid
+ }
+
+ /// Build and return the thrift metadata of the column index
+ pub fn build_to_thrift(self) -> ColumnIndex {
+ ColumnIndex::new(
+ self.null_pages,
+ self.min_values,
+ self.max_values,
+ self.boundary_order,
+ self.null_counts,
+ )
+ }
+}
+
+/// Builder for offset index
+pub struct OffsetIndexBuilder {
+ offset_array: Vec<i64>,
+ compressed_page_size_array: Vec<i32>,
+ first_row_index_array: Vec<i64>,
+ current_first_row_index: i64,
+}
+
+impl OffsetIndexBuilder {
+ pub fn new() -> Self {
+ OffsetIndexBuilder {
+ offset_array: Vec::new(),
+ compressed_page_size_array: Vec::new(),
+ first_row_index_array: Vec::new(),
+ current_first_row_index: 0,
+ }
+ }
+
+ pub fn append_row_count(&mut self, row_count: i64) {
+ let current_page_row_index = self.current_first_row_index;
+ self.first_row_index_array.push(current_page_row_index);
+ self.current_first_row_index += row_count;
+ }
+
+ pub fn append_offset_and_size(&mut self, offset: i64, compressed_page_size: i32) {
+ self.offset_array.push(offset);
+ self.compressed_page_size_array.push(compressed_page_size);
+ }
+
+ /// Build and return the thrift metadata of the offset index
+ pub fn build_to_thrift(self) -> OffsetIndex {
+ let locations = self
+ .offset_array
+ .iter()
+ .zip(self.compressed_page_size_array.iter())
+ .zip(self.first_row_index_array.iter())
+ .map(|((offset, size), row_index)| {
+ PageLocation::new(*offset, *size, *row_index)
+ })
+ .collect::<Vec<_>>();
+ OffsetIndex::new(locations)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
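Since build_to_thrift zips the three arrays positionally, N matched
append_row_count/append_offset_and_size calls yield exactly N page locations.
An equivalent direct construction with the parquet_format types, reusing the
same illustrative offsets/sizes as above:

    use parquet_format::{OffsetIndex, PageLocation};

    // PageLocation::new(offset, compressed_page_size, first_row_index)
    let locations = vec![
        PageLocation::new(4, 37, 0),
        PageLocation::new(41, 37, 4),
    ];
    let offset_index = OffsetIndex::new(locations);
    assert_eq!(offset_index.page_locations.len(), 2);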
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b503c264d..10983c741 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};
use byteorder::{ByteOrder, LittleEndian};
use parquet_format as parquet;
+use parquet_format::{ColumnIndex, OffsetIndex, RowGroup};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use crate::basic::PageType;
@@ -78,14 +79,33 @@ impl<W: Write> Write for TrackedWrite<W> {
/// - the number of bytes written
/// - the number of rows written
/// - the column chunk metadata
+/// - the column index
+/// - the offset index
///
-pub type OnCloseColumnChunk<'a> =
- Box<dyn FnOnce(u64, u64, ColumnChunkMetaData) -> Result<()> + 'a>;
+pub type OnCloseColumnChunk<'a> = Box<
+ dyn FnOnce(
+ u64,
+ u64,
+ ColumnChunkMetaData,
+ Option<ColumnIndex>,
+ Option<OffsetIndex>,
+ ) -> Result<()>
+ + 'a,
+>;
/// Callback invoked on closing a row group, arguments are:
///
/// - the row group metadata
-pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> Result<()> + 'a>;
+/// - the column index for each column chunk
+/// - the offset index for each column chunk
+pub type OnCloseRowGroup<'a> = Box<
+ dyn FnOnce(
+ RowGroupMetaDataPtr,
+ Vec<Option<ColumnIndex>>,
+ Vec<Option<OffsetIndex>>,
+ ) -> Result<()>
+ + 'a,
+>;
#[deprecated = "use std::io::Write"]
pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
@@ -110,6 +130,8 @@ pub struct SerializedFileWriter<W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaDataPtr>,
+ column_indexes: Vec<Vec<Option<ColumnIndex>>>,
+ offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
row_group_index: usize,
}
@@ -124,6 +146,8 @@ impl<W: Write> SerializedFileWriter<W> {
descr: Arc::new(SchemaDescriptor::new(schema)),
props: properties,
row_groups: vec![],
+ column_indexes: Vec::new(),
+ offset_indexes: Vec::new(),
row_group_index: 0,
})
}
@@ -139,8 +163,12 @@ impl<W: Write> SerializedFileWriter<W> {
self.row_group_index += 1;
let row_groups = &mut self.row_groups;
- let on_close = |metadata| {
+ let row_column_indexes = &mut self.column_indexes;
+ let row_offset_indexes = &mut self.offset_indexes;
+ let on_close = |metadata, row_group_column_index, row_group_offset_index| {
row_groups.push(metadata);
+ row_column_indexes.push(row_group_column_index);
+ row_offset_indexes.push(row_group_offset_index);
Ok(())
};
@@ -177,16 +205,74 @@ impl<W: Write> SerializedFileWriter<W> {
Ok(())
}
+ /// Serialize all the offset indexes to the file
+ fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+ // iter row group
+ // iter each column
+ // write offset index to the file
+ for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+ for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+ {
+ match &self.offset_indexes[row_group_idx][column_idx] {
+ Some(offset_index) => {
+ let start_offset = self.buf.bytes_written();
+ let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+ offset_index.write_to_out_protocol(&mut protocol)?;
+ protocol.flush()?;
+ let end_offset = self.buf.bytes_written();
+ // set the offset and length for the offset index
+ column_metadata.offset_index_offset = Some(start_offset as i64);
+ column_metadata.offset_index_length =
+ Some((end_offset - start_offset) as i32);
+ }
+ None => {}
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Serialize all the column indexes to the file
+ fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+ // iter row group
+ // iter each column
+ // write column index to the file
+ for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+ for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+ {
+ match &self.column_indexes[row_group_idx][column_idx] {
+ Some(column_index) => {
+ let start_offset = self.buf.bytes_written();
+ let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+ column_index.write_to_out_protocol(&mut protocol)?;
+ protocol.flush()?;
+ let end_offset = self.buf.bytes_written();
+ // set the offset and length for the column index
+ column_metadata.column_index_offset = Some(start_offset as i64);
+ column_metadata.column_index_length =
+ Some((end_offset - start_offset) as i32);
+ }
+ None => {}
+ }
+ }
+ }
+ Ok(())
+ }
+
/// Assembles and writes metadata at the end of the file.
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
- let row_groups = self
+ let mut row_groups = self
.row_groups
.as_slice()
.iter()
.map(|v| v.to_thrift())
- .collect();
+ .collect::<Vec<_>>();
+
+ // Write column indexes and offset indexes
+ self.write_column_indexes(&mut row_groups)?;
+ self.write_offset_indexes(&mut row_groups)?;
let file_metadata = parquet::FileMetaData {
num_rows,
@@ -247,6 +333,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
column_index: usize,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
+ column_indexes: Vec<Option<ColumnIndex>>,
+ offset_indexes: Vec<Option<OffsetIndex>>,
on_close: Option<OnCloseRowGroup<'a>>,
}
@@ -273,6 +361,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
column_index: 0,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
+ column_indexes: Vec::with_capacity(num_columns),
+ offset_indexes: Vec::with_capacity(num_columns),
total_bytes_written: 0,
}
}
@@ -297,25 +387,31 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
let total_bytes_written = &mut self.total_bytes_written;
let total_rows_written = &mut self.total_rows_written;
let column_chunks = &mut self.column_chunks;
-
- let on_close = |bytes_written, rows_written, metadata| {
- // Update row group writer metrics
- *total_bytes_written += bytes_written;
- column_chunks.push(metadata);
- if let Some(rows) = *total_rows_written {
- if rows != rows_written {
- return Err(general_err!(
- "Incorrect number of rows, expected {} != {} rows",
- rows,
- rows_written
- ));
+ let column_indexes = &mut self.column_indexes;
+ let offset_indexes = &mut self.offset_indexes;
+
+ let on_close =
+ |bytes_written, rows_written, metadata, column_index, offset_index| {
+ // Update row group writer metrics
+ *total_bytes_written += bytes_written;
+ column_chunks.push(metadata);
+ column_indexes.push(column_index);
+ offset_indexes.push(offset_index);
+
+ if let Some(rows) = *total_rows_written {
+ if rows != rows_written {
+ return Err(general_err!(
+ "Incorrect number of rows, expected {} != {} rows",
+ rows,
+ rows_written
+ ));
+ }
+ } else {
+ *total_rows_written = Some(rows_written);
}
- } else {
- *total_rows_written = Some(rows_written);
- }
- Ok(())
- };
+ Ok(())
+ };
Ok(Some(SerializedColumnWriter::new(
column_writer,
@@ -343,7 +439,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
self.row_group_metadata = Some(metadata.clone());
if let Some(on_close) = self.on_close.take() {
- on_close(metadata)?
+ on_close(
+ metadata,
+ self.column_indexes.clone(),
+ self.offset_indexes.clone(),
+ )?
}
}
@@ -389,19 +489,26 @@ impl<'a> SerializedColumnWriter<'a> {
/// Close this [`SerializedColumnWriter`]
pub fn close(mut self) -> Result<()> {
- let (bytes_written, rows_written, metadata) = match self.inner {
- ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
- ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
- ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
- ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
- ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
- };
+ let (bytes_written, rows_written, metadata, column_index, offset_index) =
+ match self.inner {
+ ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
+ ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
+ ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
+ ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
+ ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
+ ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
+ ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
+ ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
+ };
if let Some(on_close) = self.on_close.take() {
- on_close(bytes_written, rows_written, metadata)?
+ on_close(
+ bytes_written,
+ rows_written,
+ metadata,
+ column_index,
+ offset_index,
+ )?
}
Ok(())
@@ -521,7 +628,6 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
Ok(spec)
}
-
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
metadata
@@ -982,7 +1088,10 @@ mod tests {
/// File write-read roundtrip.
/// `data` consists of arrays of values for each row group.
- fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) {
+ fn test_file_roundtrip(
+ file: File,
+ data: Vec<Vec<i32>>,
+ ) -> parquet_format::FileMetaData {
let schema = Arc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![Arc::new(
@@ -1014,7 +1123,7 @@ mod tests {
assert_eq!(flushed.len(), idx + 1);
assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
}
- file_writer.close().unwrap();
+ let file_metadata = file_writer.close().unwrap();
let reader = assert_send(SerializedFileReader::new(file).unwrap());
assert_eq!(reader.num_row_groups(), data.len());
@@ -1031,6 +1140,7 @@ mod tests {
.collect::<Vec<i32>>();
assert_eq!(res, *item);
}
+ file_metadata
}
fn assert_send<T: Send>(t: T) -> T {
@@ -1111,4 +1221,19 @@ mod tests {
assert_eq!(res, *item);
}
}
+
+ #[test]
+ fn test_column_offset_index_file() {
+ let file = tempfile::tempfile().unwrap();
+ let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
+ file_metadata.row_groups.iter().for_each(|row_group| {
+ row_group.columns.iter().for_each(|column_chunk| {
+ assert_ne!(None, column_chunk.column_index_offset);
+ assert_ne!(None, column_chunk.column_index_length);
+
+ assert_ne!(None, column_chunk.offset_index_offset);
+ assert_ne!(None, column_chunk.offset_index_length);
+ })
+ });
+ }
}
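For orientation, write_metadata above serializes all column indexes, then all
offset indexes, immediately before the footer, recording each start offset and
length back into the corresponding ColumnChunk. A simplified sketch of the
resulting tail of the file (offsets are whatever buf.bytes_written() reports
at each step):

    ... column chunk data ...
    ColumnIndex, row group 0, column 0    <- column_index_offset/length
    ColumnIndex, row group 0, column 1
    ... all column indexes, row group by row group ...
    OffsetIndex, row group 0, column 0    <- offset_index_offset/length
    ... all offset indexes, row group by row group ...
    FileMetaData (thrift footer), footer length, "PAR1" magic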