This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new c47d14ce9 add column index and offset index (#1935)
c47d14ce9 is described below

commit c47d14ce9bdd91d68661e67a843d60e2c5799061
Author: Kun Liu <[email protected]>
AuthorDate: Thu Jun 30 17:51:30 2022 +0800

    add column index and offset index (#1935)
---
 parquet/src/column/writer.rs | 172 +++++++++++++++++++++++++++++++++---
 parquet/src/file/metadata.rs | 106 ++++++++++++++++++++++-
 parquet/src/file/writer.rs   | 201 +++++++++++++++++++++++++++++++++++--------
 3 files changed, 427 insertions(+), 52 deletions(-)

diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index d80cafe0e..d589aef5a 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Contains column writer API.
+use parquet_format::{ColumnIndex, OffsetIndex};
 use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};
 
 use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, 
PageType, Type};
@@ -29,6 +30,7 @@ use crate::encodings::{
     levels::{max_buffer_size, LevelEncoder},
 };
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::statistics::Statistics;
 use crate::file::{
     metadata::ColumnChunkMetaData,
@@ -162,6 +164,14 @@ pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: 
DataType>(
     })
 }
 
+type ColumnCloseResult = (
+    u64,
+    u64,
+    ColumnChunkMetaData,
+    Option<ColumnIndex>,
+    Option<OffsetIndex>,
+);
+
 /// Typed column writer for a primitive column.
 pub struct ColumnWriterImpl<'a, T: DataType> {
     // Column writer properties
@@ -198,6 +208,9 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
     _phantom: PhantomData<T>,
+    // column index and offset index
+    column_index_builder: ColumnIndexBuilder,
+    offset_index_builder: OffsetIndexBuilder,
 }
 
 impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
@@ -261,6 +274,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             num_column_nulls: 0,
             column_distinct_count: None,
             _phantom: PhantomData,
+            column_index_builder: ColumnIndexBuilder::new(),
+            offset_index_builder: OffsetIndexBuilder::new(),
         }
     }
 
@@ -416,7 +431,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
 
     /// Finalises writes and closes the column writer.
     /// Returns total bytes written, total rows written and column chunk 
metadata.
-    pub fn close(mut self) -> Result<(u64, u64, ColumnChunkMetaData)> {
+    pub fn close(mut self) -> Result<ColumnCloseResult> {
         if self.dict_encoder.is_some() {
             self.write_dictionary_page()?;
         }
@@ -425,7 +440,22 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         self.dict_encoder = None;
         self.page_writer.close()?;
 
-        Ok((self.total_bytes_written, self.total_rows_written, metadata))
+        let (column_index, offset_index) = if 
self.column_index_builder.valid() {
+            // build the column and offset index
+            let column_index = self.column_index_builder.build_to_thrift();
+            let offset_index = self.offset_index_builder.build_to_thrift();
+            (Some(column_index), Some(offset_index))
+        } else {
+            (None, None)
+        };
+
+        Ok((
+            self.total_bytes_written,
+            self.total_rows_written,
+            metadata,
+            column_index,
+            offset_index,
+        ))
     }
 
     /// Writes mini batch of values, definition and repetition levels.
@@ -593,6 +623,42 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         Ok(())
     }
 
+    /// Update the column index and offset index when adding the data page
+    fn update_column_offset_index(&mut self, page_statistics: 
&Option<Statistics>) {
+        // update the column index
+        let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+        // a page contains only null values,
+        // and writers have to set the corresponding entries in min_values and 
max_values to byte[0]
+        if null_page && self.column_index_builder.valid() {
+            self.column_index_builder.append(
+                null_page,
+                &[0; 1],
+                &[0; 1],
+                self.num_page_nulls as i64,
+            );
+        } else if self.column_index_builder.valid() {
+            // from page statistics
+            // If can't get the page statistics, ignore this column/offset 
index for this column chunk
+            match &page_statistics {
+                None => {
+                    self.column_index_builder.to_invalid();
+                }
+                Some(stat) => {
+                    self.column_index_builder.append(
+                        null_page,
+                        stat.min_bytes(),
+                        stat.max_bytes(),
+                        self.num_page_nulls as i64,
+                    );
+                }
+            }
+        }
+
+        // update the offset index
+        self.offset_index_builder
+            .append_row_count(self.num_buffered_rows as i64);
+    }
+
     /// Adds data page.
     /// Data page is either buffered in case of dictionary encoding or written 
directly.
     fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
@@ -622,6 +688,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
             None
         };
 
+        // update column and offset index
+        self.update_column_offset_index(&page_statistics);
+
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
                 let mut buffer = vec![];
@@ -700,8 +769,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
                     buf: ByteBufferPtr::new(buffer),
                     num_values: self.num_buffered_values,
                     encoding,
-                    num_nulls: self.num_buffered_values
-                        - self.num_buffered_encoded_values,
+                    num_nulls: self.num_page_nulls as u32,
                     num_rows: self.num_buffered_rows,
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
@@ -830,6 +898,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
         let page_spec = self.page_writer.write_page(page)?;
+        // update offset index
+        // compressed_size = header_size + compressed_data_size
+        self.offset_index_builder.append_offset_and_size(
+            page_spec.offset as i64,
+            page_spec.compressed_size as i32,
+        );
         self.update_metrics_for_page(page_spec);
         Ok(())
     }
@@ -865,6 +939,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
 
         let page_spec = self.page_writer.write_page(compressed_page)?;
         self.update_metrics_for_page(page_spec);
+        // For the directory page, don't need to update column/offset index.
         Ok(())
     }
 
@@ -1133,6 +1208,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use parquet_format::BoundaryOrder;
     use rand::distributions::uniform::SampleUniform;
     use std::sync::Arc;
 
@@ -1256,7 +1332,7 @@ mod tests {
             .write_batch(&[true, false, true, false], None, None)
             .unwrap();
 
-        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        let (bytes_written, rows_written, metadata, _, _) = 
writer.close().unwrap();
         // PlainEncoder uses bit writer to write boolean values, which all fit 
into 1
         // byte.
         assert_eq!(bytes_written, 1);
@@ -1529,7 +1605,7 @@ mod tests {
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
         writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
 
-        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        let (bytes_written, rows_written, metadata, _, _) = 
writer.close().unwrap();
         assert_eq!(bytes_written, 20);
         assert_eq!(rows_written, 4);
         assert_eq!(
@@ -1586,7 +1662,7 @@ mod tests {
                 None,
             )
             .unwrap();
-        let (_bytes_written, _rows_written, metadata) = 
writer.close().unwrap();
+        let (_bytes_written, _rows_written, metadata, _, _) = 
writer.close().unwrap();
         if let Some(stats) = metadata.statistics() {
             assert!(stats.has_min_max_set());
             if let Statistics::ByteArray(stats) = stats {
@@ -1620,7 +1696,7 @@ mod tests {
             Int32Type,
         >(page_writer, 0, 0, props);
         writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
-        let (_bytes_written, _rows_written, metadata) = 
writer.close().unwrap();
+        let (_bytes_written, _rows_written, metadata, _, _) = 
writer.close().unwrap();
         if let Some(stats) = metadata.statistics() {
             assert!(stats.has_min_max_set());
             if let Statistics::Int32(stats) = stats {
@@ -1651,7 +1727,7 @@ mod tests {
             )
             .unwrap();
 
-        let (bytes_written, rows_written, metadata) = writer.close().unwrap();
+        let (bytes_written, rows_written, metadata, _, _) = 
writer.close().unwrap();
         assert_eq!(bytes_written, 20);
         assert_eq!(rows_written, 4);
         assert_eq!(
@@ -1835,7 +1911,7 @@ mod tests {
         let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
         writer.write_batch(data, None, None).unwrap();
-        let (bytes_written, _, _) = writer.close().unwrap();
+        let (bytes_written, _, _, _, _) = writer.close().unwrap();
 
         // Read pages and check the sequence
         let source = FileSource::new(&file, 0, bytes_written as usize);
@@ -2068,6 +2144,75 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata, column_index, offset_index) =
+            writer.close().unwrap();
+        let column_index = match column_index {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match offset_index {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert!(!column_index.null_pages[idx]);
+            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
+        }
+
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                // first page is [1,2,3,4]
+                // second page is [-5,2,4,8]
+                assert_eq!(stats.min_bytes(), 
column_index.min_values[1].as_slice());
+                assert_eq!(
+                    stats.max_bytes(),
+                    column_index.max_values.get(1).unwrap().as_slice()
+                );
+            } else {
+                panic!("expecting Statistics::Int32");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+
+        // page location
+        assert_eq!(
+            0,
+            offset_index.page_locations.get(0).unwrap().first_row_index
+        );
+        assert_eq!(
+            4,
+            offset_index.page_locations.get(1).unwrap().first_row_index
+        );
+    }
+
     /// Performs write-read roundtrip with randomly generated values and 
levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 
0) to write
     /// for a column.
@@ -2149,7 +2294,8 @@ mod tests {
 
         let values_written = writer.write_batch(values, def_levels, 
rep_levels).unwrap();
         assert_eq!(values_written, values.len());
-        let (bytes_written, rows_written, column_metadata) = 
writer.close().unwrap();
+        let (bytes_written, rows_written, column_metadata, _, _) =
+            writer.close().unwrap();
 
         let source = FileSource::new(&file, 0, bytes_written as usize);
         let page_reader = Box::new(
@@ -2215,7 +2361,7 @@ mod tests {
         let props = Arc::new(props);
         let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
         writer.write_batch(values, None, None).unwrap();
-        let (_, _, metadata) = writer.close().unwrap();
+        let (_, _, metadata, _, _) = writer.close().unwrap();
         metadata
     }
 
@@ -2327,7 +2473,7 @@ mod tests {
         let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
         writer.write_batch(values, None, None).unwrap();
 
-        let (_bytes_written, _rows_written, metadata) = 
writer.close().unwrap();
+        let (_bytes_written, _rows_written, metadata, _, _) = 
writer.close().unwrap();
         if let Some(stats) = metadata.statistics() {
             stats.clone()
         } else {
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 4d9842c0e..7ec29de01 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,7 +35,10 @@
 
 use std::sync::Arc;
 
-use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
+use parquet_format::{
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, 
PageLocation,
+    RowGroup,
+};
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
 use crate::errors::{ParquetError, Result};
@@ -794,6 +797,107 @@ impl ColumnChunkMetaDataBuilder {
     }
 }
 
+/// Builder for column index
+pub struct ColumnIndexBuilder {
+    null_pages: Vec<bool>,
+    min_values: Vec<Vec<u8>>,
+    max_values: Vec<Vec<u8>>,
+    // TODO: calc the order for all pages in this column
+    boundary_order: BoundaryOrder,
+    null_counts: Vec<i64>,
+    // If one page can't get build index, need to ignore all index in this 
column
+    valid: bool,
+}
+
+impl ColumnIndexBuilder {
+    pub fn new() -> Self {
+        ColumnIndexBuilder {
+            null_pages: Vec::new(),
+            min_values: Vec::new(),
+            max_values: Vec::new(),
+            boundary_order: BoundaryOrder::Unordered,
+            null_counts: Vec::new(),
+            valid: true,
+        }
+    }
+
+    pub fn append(
+        &mut self,
+        null_page: bool,
+        min_value: &[u8],
+        max_value: &[u8],
+        null_count: i64,
+    ) {
+        self.null_pages.push(null_page);
+        self.min_values.push(min_value.to_vec());
+        self.max_values.push(max_value.to_vec());
+        self.null_counts.push(null_count);
+    }
+
+    pub fn to_invalid(&mut self) {
+        self.valid = false;
+    }
+
+    pub fn valid(&self) -> bool {
+        self.valid
+    }
+
+    /// Build and get the thrift metadata of column index
+    pub fn build_to_thrift(self) -> ColumnIndex {
+        ColumnIndex::new(
+            self.null_pages,
+            self.min_values,
+            self.max_values,
+            self.boundary_order,
+            self.null_counts,
+        )
+    }
+}
+
+/// Builder for offset index
+pub struct OffsetIndexBuilder {
+    offset_array: Vec<i64>,
+    compressed_page_size_array: Vec<i32>,
+    first_row_index_array: Vec<i64>,
+    current_first_row_index: i64,
+}
+
+impl OffsetIndexBuilder {
+    pub fn new() -> Self {
+        OffsetIndexBuilder {
+            offset_array: Vec::new(),
+            compressed_page_size_array: Vec::new(),
+            first_row_index_array: Vec::new(),
+            current_first_row_index: 0,
+        }
+    }
+
+    pub fn append_row_count(&mut self, row_count: i64) {
+        let current_page_row_index = self.current_first_row_index;
+        self.first_row_index_array.push(current_page_row_index);
+        self.current_first_row_index += row_count;
+    }
+
+    pub fn append_offset_and_size(&mut self, offset: i64, 
compressed_page_size: i32) {
+        self.offset_array.push(offset);
+        self.compressed_page_size_array.push(compressed_page_size);
+    }
+
+    /// Build and get the thrift metadata of offset index
+    pub fn build_to_thrift(self) -> OffsetIndex {
+        let locations = self
+            .offset_array
+            .iter()
+            .zip(self.compressed_page_size_array.iter())
+            .zip(self.first_row_index_array.iter())
+            .map(|((offset, size), row_index)| {
+                PageLocation::new(*offset, *size, *row_index)
+            })
+            .collect::<Vec<_>>();
+        OffsetIndex::new(locations)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b503c264d..10983c741 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -22,6 +22,7 @@ use std::{io::Write, sync::Arc};
 
 use byteorder::{ByteOrder, LittleEndian};
 use parquet_format as parquet;
+use parquet_format::{ColumnIndex, OffsetIndex, RowGroup};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 
 use crate::basic::PageType;
@@ -78,14 +79,33 @@ impl<W: Write> Write for TrackedWrite<W> {
 /// - the number of bytes written
 /// - the number of rows written
 /// - the column chunk metadata
+/// - the column index
+/// - the offset index
 ///
-pub type OnCloseColumnChunk<'a> =
-    Box<dyn FnOnce(u64, u64, ColumnChunkMetaData) -> Result<()> + 'a>;
+pub type OnCloseColumnChunk<'a> = Box<
+    dyn FnOnce(
+            u64,
+            u64,
+            ColumnChunkMetaData,
+            Option<ColumnIndex>,
+            Option<OffsetIndex>,
+        ) -> Result<()>
+        + 'a,
+>;
 
 /// Callback invoked on closing a row group, arguments are:
 ///
 /// - the row group metadata
-pub type OnCloseRowGroup<'a> = Box<dyn FnOnce(RowGroupMetaDataPtr) -> 
Result<()> + 'a>;
+/// - the column index for each column chunk
+/// - the offset index for each column chunk
+pub type OnCloseRowGroup<'a> = Box<
+    dyn FnOnce(
+            RowGroupMetaDataPtr,
+            Vec<Option<ColumnIndex>>,
+            Vec<Option<OffsetIndex>>,
+        ) -> Result<()>
+        + 'a,
+>;
 
 #[deprecated = "use std::io::Write"]
 pub trait ParquetWriter: Write + std::io::Seek + TryClone {}
@@ -110,6 +130,8 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
+    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
 }
 
@@ -124,6 +146,8 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            column_indexes: Vec::new(),
+            offset_indexes: Vec::new(),
             row_group_index: 0,
         })
     }
@@ -139,8 +163,12 @@ impl<W: Write> SerializedFileWriter<W> {
         self.row_group_index += 1;
 
         let row_groups = &mut self.row_groups;
-        let on_close = |metadata| {
+        let row_column_indexes = &mut self.column_indexes;
+        let row_offset_indexes = &mut self.offset_indexes;
+        let on_close = |metadata, row_group_column_index, 
row_group_offset_index| {
             row_groups.push(metadata);
+            row_column_indexes.push(row_group_column_index);
+            row_offset_indexes.push(row_group_offset_index);
             Ok(())
         };
 
@@ -177,16 +205,74 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the offset index to the file
+    fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> 
Result<()> {
+        // iter row group
+        // iter each column
+        // write offset index to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in 
row_group.columns.iter_mut().enumerate()
+            {
+                match &self.offset_indexes[row_group_idx][column_idx] {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut 
self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metadata.offset_index_offset = 
Some(start_offset as i64);
+                        column_metadata.offset_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Serialize all the column index to the file
+    fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> 
Result<()> {
+        // iter row group
+        // iter each column
+        // write column index to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in 
row_group.columns.iter_mut().enumerate()
+            {
+                match &self.column_indexes[row_group_idx][column_idx] {
+                    Some(column_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut 
self.buf);
+                        column_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metadata.column_index_offset = 
Some(start_offset as i64);
+                        column_metadata.column_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
-        let row_groups = self
+        let mut row_groups = self
             .row_groups
             .as_slice()
             .iter()
             .map(|v| v.to_thrift())
-            .collect();
+            .collect::<Vec<_>>();
+
+        // Write column indexes and offset indexes
+        self.write_column_indexes(&mut row_groups)?;
+        self.write_offset_indexes(&mut row_groups)?;
 
         let file_metadata = parquet::FileMetaData {
             num_rows,
@@ -247,6 +333,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    column_indexes: Vec<Option<ColumnIndex>>,
+    offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
 }
 
@@ -273,6 +361,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            column_indexes: Vec::with_capacity(num_columns),
+            offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
         }
     }
@@ -297,25 +387,31 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let total_bytes_written = &mut self.total_bytes_written;
         let total_rows_written = &mut self.total_rows_written;
         let column_chunks = &mut self.column_chunks;
-
-        let on_close = |bytes_written, rows_written, metadata| {
-            // Update row group writer metrics
-            *total_bytes_written += bytes_written;
-            column_chunks.push(metadata);
-            if let Some(rows) = *total_rows_written {
-                if rows != rows_written {
-                    return Err(general_err!(
-                        "Incorrect number of rows, expected {} != {} rows",
-                        rows,
-                        rows_written
-                    ));
+        let column_indexes = &mut self.column_indexes;
+        let offset_indexes = &mut self.offset_indexes;
+
+        let on_close =
+            |bytes_written, rows_written, metadata, column_index, 
offset_index| {
+                // Update row group writer metrics
+                *total_bytes_written += bytes_written;
+                column_chunks.push(metadata);
+                column_indexes.push(column_index);
+                offset_indexes.push(offset_index);
+
+                if let Some(rows) = *total_rows_written {
+                    if rows != rows_written {
+                        return Err(general_err!(
+                            "Incorrect number of rows, expected {} != {} rows",
+                            rows,
+                            rows_written
+                        ));
+                    }
+                } else {
+                    *total_rows_written = Some(rows_written);
                 }
-            } else {
-                *total_rows_written = Some(rows_written);
-            }
 
-            Ok(())
-        };
+                Ok(())
+            };
 
         Ok(Some(SerializedColumnWriter::new(
             column_writer,
@@ -343,7 +439,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             self.row_group_metadata = Some(metadata.clone());
 
             if let Some(on_close) = self.on_close.take() {
-                on_close(metadata)?
+                on_close(
+                    metadata,
+                    self.column_indexes.clone(),
+                    self.offset_indexes.clone(),
+                )?
             }
         }
 
@@ -389,19 +489,26 @@ impl<'a> SerializedColumnWriter<'a> {
 
     /// Close this [`SerializedColumnWriter]
     pub fn close(mut self) -> Result<()> {
-        let (bytes_written, rows_written, metadata) = match self.inner {
-            ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
-            ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
-            ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FixedLenByteArrayColumnWriter(typed) => 
typed.close()?,
-        };
+        let (bytes_written, rows_written, metadata, column_index, 
offset_index) =
+            match self.inner {
+                ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
+                ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
+                ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
+                ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
+                ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
+                ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
+                ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
+                ColumnWriter::FixedLenByteArrayColumnWriter(typed) => 
typed.close()?,
+            };
 
         if let Some(on_close) = self.on_close.take() {
-            on_close(bytes_written, rows_written, metadata)?
+            on_close(
+                bytes_written,
+                rows_written,
+                metadata,
+                column_index,
+                offset_index,
+            )?
         }
 
         Ok(())
@@ -521,7 +628,6 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, 
W> {
 
         Ok(spec)
     }
-
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> 
{
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata
@@ -982,7 +1088,10 @@ mod tests {
 
     /// File write-read roundtrip.
     /// `data` consists of arrays of values for each row group.
-    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) {
+    fn test_file_roundtrip(
+        file: File,
+        data: Vec<Vec<i32>>,
+    ) -> parquet_format::FileMetaData {
         let schema = Arc::new(
             types::Type::group_type_builder("schema")
                 .with_fields(&mut vec![Arc::new(
@@ -1014,7 +1123,7 @@ mod tests {
             assert_eq!(flushed.len(), idx + 1);
             assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
         }
-        file_writer.close().unwrap();
+        let file_metadata = file_writer.close().unwrap();
 
         let reader = assert_send(SerializedFileReader::new(file).unwrap());
         assert_eq!(reader.num_row_groups(), data.len());
@@ -1031,6 +1140,7 @@ mod tests {
                 .collect::<Vec<i32>>();
             assert_eq!(res, *item);
         }
+        file_metadata
     }
 
     fn assert_send<T: Send>(t: T) -> T {
@@ -1111,4 +1221,19 @@ mod tests {
             assert_eq!(res, *item);
         }
     }
+
+    #[test]
+    fn test_column_offset_index_file() {
+        let file = tempfile::tempfile().unwrap();
+        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 
5]]);
+        file_metadata.row_groups.iter().for_each(|row_group| {
+            row_group.columns.iter().for_each(|column_chunk| {
+                assert_ne!(None, column_chunk.column_index_offset);
+                assert_ne!(None, column_chunk.column_index_length);
+
+                assert_ne!(None, column_chunk.offset_index_offset);
+                assert_ne!(None, column_chunk.offset_index_length);
+            })
+        });
+    }
 }

Reply via email to