alamb commented on code in PR #6000:
URL: https://github.com/apache/arrow-rs/pull/6000#discussion_r1685719334


##########
parquet/src/file/writer.rs:
##########
@@ -814,20 +734,289 @@ impl<'a, W: Write + Send> PageWriter for 
SerializedPageWriter<'a, W> {
     }
 }
 
+struct ThriftMetadataWriter<'a, W: Write> {

Review Comment:
   I was trying to articulate how this differs from the `to_thrift` functions 
on the structures
   
   Perhaps its difference is something like
   
   ```suggestion
   /// Writes `crate::file::metadata` structures to a thrift encdoded byte 
streams
   ///
   /// This structure handles the details of writing the various parts of 
parquet
   /// metadata into a byte stream. It is used to write the metadata into a 
   /// parquet file and can also write metadata into other locations (such as a 
   /// store of bytes).
   ///
   /// This is somewhat trickey because the metadata is not store as a single 
inline
   /// thrift struture. It can have several "out of band" structures such as 
the OffsetIndex
   /// and BloomFilters which are stored separately whose locations are stored 
as offsets 
   struct ThriftMetadataWriter<'a, W: Write> {
   ```



##########
parquet/src/file/page_index/index.rs:
##########
@@ -168,6 +168,38 @@ impl<T: ParquetValueType> NativeIndex<T> {
             boundary_order: index.boundary_order,
         })
     }
+
+    pub(crate) fn to_column_index(&self) -> ColumnIndex {

Review Comment:
   I think calling this method  `to_thrift` might be more consistent with other 
APIs like
   
   
https://docs.rs/parquet/latest/parquet/file/metadata/struct.RowGroupMetaData.html#method.to_thrift
 
   
   The naming is already pretty confusing



##########
parquet/src/file/writer.rs:
##########
@@ -814,20 +734,289 @@ impl<'a, W: Write + Send> PageWriter for 
SerializedPageWriter<'a, W> {
     }
 }
 
+struct ThriftMetadataWriter<'a, W: Write> {
+    buf: &'a mut TrackedWrite<W>,
+    schema: &'a TypePtr,
+    schema_descr: &'a SchemaDescPtr,
+    row_groups: Vec<RowGroup>,
+    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
+    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
+    key_value_metadata: Option<Vec<KeyValue>>,
+    created_by: Option<String>,
+    writer_version: i32,
+}
+
+impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
+    /// Serialize all the offset index to the file
+    fn write_offset_indexes(&mut self, offset_indexes: 
&[Vec<Option<OffsetIndex>>]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write offset index to the file
+        for (row_group_idx, row_group) in 
self.row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in 
row_group.columns.iter_mut().enumerate() {
+                match &offset_indexes[row_group_idx][column_idx] {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut 
self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metadata.offset_index_offset = 
Some(start_offset as i64);
+                        column_metadata.offset_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Serialize all the column index to the file
+    fn write_column_indexes(&mut self, column_indexes: 
&[Vec<Option<ColumnIndex>>]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write column index to the file
+        for (row_group_idx, row_group) in 
self.row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in 
row_group.columns.iter_mut().enumerate() {
+                match &column_indexes[row_group_idx][column_idx] {
+                    Some(column_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut 
self.buf);
+                        column_index.write_to_out_protocol(&mut protocol)?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metadata.column_index_offset = 
Some(start_offset as i64);
+                        column_metadata.column_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Assembles and writes metadata at the end of the file.

Review Comment:
   ```suggestion
       /// Assembles and writes the final metadata to self.buf
   ```



##########
parquet/src/file/writer.rs:
##########
@@ -814,20 +734,289 @@ impl<'a, W: Write + Send> PageWriter for 
SerializedPageWriter<'a, W> {
     }
 }
 
+struct ThriftMetadataWriter<'a, W: Write> {
+    buf: &'a mut TrackedWrite<W>,
+    schema: &'a TypePtr,
+    schema_descr: &'a SchemaDescPtr,
+    row_groups: Vec<RowGroup>,

Review Comment:
   Rather than storing these fields separately, I wonder if it would be 
possible simply to store a 
https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html
   
   (can be done as a follow on PR)
   
   Something like
   ```rust
   struct ThriftMetadataWriter<'a, W: Write> {
       buf: &'a mut TrackedWrite<W>,
       parquet_metadata: &ParquetMetadata, // or maybe Arc<ParquetMetadata> 🤔 
   }
   ```
   
   And then add the various builder APIs like `with_key_value_metadata` 
directly to `ParquetMetadata` 
   
   This would make it easier to work / manipulate `ParquetMetadata` in general 
and would make the responsibility of the writer clearer (handle the details of 
coordinating the writing of the thrift encoded structures and their indexes)
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to