tustvold commented on code in PR #1935:
URL: https://github.com/apache/arrow-rs/pull/1935#discussion_r908320788


##########
parquet/src/file/metadata.rs:
##########
@@ -386,6 +394,9 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    // column index and offset index
+    column_index: Option<ColumnIndex>,
+    offset_index: Option<OffsetIndex>,

Review Comment:
   I'm not sure about this, as it conflates the file-level metadata with the 
index metadata, which is stored separately. It also leaks a type from 
parquet-format out into the public interface, which this crate typically avoids 
doing.
   
   I'm not entirely sure why this needs to be here?



##########
parquet/src/file/writer.rs:
##########
@@ -177,10 +182,66 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the offset index to the file
+    fn write_offset_indexes(&mut self) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write offset index to the file
+        for row_group in &mut self.row_groups {
+            for column_metdata in row_group.columns_mut() {
+                match column_metdata.offset_index() {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut 
self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metdata.set_offset_index_offset(start_offset as 
i64);
+                        column_metdata
+                            .set_offset_index_length((end_offset - 
start_offset) as i32);

Review Comment:
   As part of `SerializedFileWriter::write_metadata` we call 
`RowGroupMetaData::to_thrift`. Perhaps we could store these offsets and apply 
them directly to the returned thrift message, instead of needing to clone 
`RowGroupMetaData`?



##########
parquet/src/column/writer.rs:
##########
@@ -2068,6 +2129,80 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());
+            assert_eq!(
+                &0,
+                column_index.null_counts.as_ref().unwrap().get(idx).unwrap()
+            );
+        }
+
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                // first page is [1,2,3,4]
+                // second page is [-5,2,4,8]
+                assert_eq!(
+                    stats.min_bytes(),
+                    column_index.min_values.get(1).unwrap().as_slice()

Review Comment:
   ```suggestion
                       column_index.min_values[1].as_slice()
   ```



##########
parquet/src/column/writer.rs:
##########
@@ -2068,6 +2129,80 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());

Review Comment:
   ```suggestion
               assert!(!column_index.null_pages[idx]);
   ```



##########
parquet/src/file/writer.rs:
##########
@@ -339,11 +400,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
                 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                 .build()?;
 
-            let metadata = Arc::new(row_group_metadata);
-            self.row_group_metadata = Some(metadata.clone());
+            let clone_row_group_metadata = row_group_metadata.clone();
+            self.row_group_metadata = Some(Arc::new(row_group_metadata));

Review Comment:
   I think I would prefer that we clone at the location where the mutation 
happens, i.e. ParquetWriter, as opposed to modifying this interface.



##########
parquet/src/column/writer.rs:
##########
@@ -2068,6 +2129,80 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());
+            assert_eq!(
+                &0,
+                column_index.null_counts.as_ref().unwrap().get(idx).unwrap()
+            );

Review Comment:
   ```suggestion
               assert_eq!(
                   0,
                   column_index.null_counts.as_ref().unwrap()[idx]
               );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to