This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ea0089224 Improve `ArrowWriter` memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) (#4280)
ea0089224 is described below

commit ea008922445d84d957cf3f89df793187c22d82d8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon May 29 13:04:46 2023 +0100

    Improve `ArrowWriter` memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) (#4280)
    
    * Buffer Pages in ArrowWriter instead of RecordBatch (#3871)
    
    * Review feedback
    
    * Improved memory accounting
    
    * Clippy
---
 parquet/src/arrow/arrow_writer/byte_array.rs |  57 +--
 parquet/src/arrow/arrow_writer/mod.rs        | 673 ++++++++++++++++-----------
 parquet/src/column/page.rs                   |  69 +++
 parquet/src/column/writer/encoder.rs         |   2 +-
 parquet/src/column/writer/mod.rs             |  42 ++
 parquet/src/file/writer.rs                   | 102 +---
 parquet/src/util/memory.rs                   |   6 +
 7 files changed, 535 insertions(+), 416 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs 
b/parquet/src/arrow/arrow_writer/byte_array.rs
index 77f9598b2..6dbc83dd0 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -15,25 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::arrow_writer::levels::LevelInfo;
 use crate::basic::Encoding;
 use crate::bloom_filter::Sbbf;
-use crate::column::page::PageWriter;
 use crate::column::writer::encoder::{
     ColumnValueEncoder, DataPageValues, DictionaryPage,
 };
-use crate::column::writer::GenericColumnWriter;
 use crate::data_type::{AsBytes, ByteArray, Int32Type};
 use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
-use crate::file::properties::{WriterProperties, WriterPropertiesPtr, 
WriterVersion};
-use crate::file::writer::OnCloseColumnChunk;
+use crate::file::properties::{WriterProperties, WriterVersion};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
 use arrow_array::{
-    Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, 
LargeBinaryArray,
+    Array, ArrayAccessor, BinaryArray, DictionaryArray, LargeBinaryArray,
     LargeStringArray, StringArray,
 };
 use arrow_schema::DataType;
@@ -94,49 +90,6 @@ macro_rules! downcast_op {
     };
 }
 
-/// A writer for byte array types
-pub(super) struct ByteArrayWriter<'a> {
-    writer: GenericColumnWriter<'a, ByteArrayEncoder>,
-    on_close: Option<OnCloseColumnChunk<'a>>,
-}
-
-impl<'a> ByteArrayWriter<'a> {
-    /// Returns a new [`ByteArrayWriter`]
-    pub fn new(
-        descr: ColumnDescPtr,
-        props: WriterPropertiesPtr,
-        page_writer: Box<dyn PageWriter + 'a>,
-        on_close: OnCloseColumnChunk<'a>,
-    ) -> Result<Self> {
-        Ok(Self {
-            writer: GenericColumnWriter::new(descr, props, page_writer),
-            on_close: Some(on_close),
-        })
-    }
-
-    pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> 
{
-        self.writer.write_batch_internal(
-            array,
-            Some(levels.non_null_indices()),
-            levels.def_levels(),
-            levels.rep_levels(),
-            None,
-            None,
-            None,
-        )?;
-        Ok(())
-    }
-
-    pub fn close(self) -> Result<()> {
-        let r = self.writer.close()?;
-
-        if let Some(on_close) = self.on_close {
-            on_close(r)?;
-        }
-        Ok(())
-    }
-}
-
 /// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
 struct FallbackEncoder {
     encoder: FallbackEncoderImpl,
@@ -427,7 +380,7 @@ impl DictEncoder {
     }
 }
 
-struct ByteArrayEncoder {
+pub struct ByteArrayEncoder {
     fallback: FallbackEncoder,
     dict_encoder: Option<DictEncoder>,
     min_value: Option<ByteArray>,
@@ -437,11 +390,11 @@ struct ByteArrayEncoder {
 
 impl ColumnValueEncoder for ByteArrayEncoder {
     type T = ByteArray;
-    type Values = ArrayRef;
+    type Values = dyn Array;
 
     fn min_max(
         &self,
-        values: &ArrayRef,
+        values: &dyn Array,
         value_indices: Option<&[usize]>,
     ) -> Option<(Self::T, Self::T)> {
         match value_indices {
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 616968bf6..bde21ae85 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -17,16 +17,21 @@
 
 //! Contains writer which writes arrow data into parquet data.
 
-use std::collections::VecDeque;
+use bytes::Bytes;
 use std::fmt::Debug;
-use std::io::Write;
-use std::sync::Arc;
+use std::io::{Read, Write};
+use std::iter::Peekable;
+use std::slice::Iter;
+use std::sync::{Arc, Mutex};
+use std::vec::IntoIter;
+use thrift::protocol::{TCompactOutputProtocol, TSerializable};
 
 use arrow_array::cast::AsArray;
-use arrow_array::types::{Decimal128Type, Int32Type, Int64Type, UInt32Type, 
UInt64Type};
-use arrow_array::{
-    types, Array, ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchWriter,
+use arrow_array::types::{
+    Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type,
+    UInt64Type,
 };
+use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, 
SchemaRef};
 
 use super::schema::{
@@ -34,14 +39,19 @@ use super::schema::{
     decimal_length_from_precision,
 };
 
-use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
-use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
-use crate::data_type::{ByteArray, DataType, FixedLenByteArray};
+use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
+use crate::column::writer::encoder::ColumnValueEncoder;
+use crate::column::writer::{
+    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
+};
+use crate::data_type::{ByteArray, FixedLenByteArray};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{KeyValue, RowGroupMetaDataPtr};
-use crate::file::properties::WriterProperties;
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, 
RowGroupMetaDataPtr};
+use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
+use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::SerializedFileWriter;
-use crate::file::writer::SerializedRowGroupWriter;
+use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
 use levels::{calculate_array_levels, LevelInfo};
 
 mod byte_array;
@@ -49,8 +59,8 @@ mod levels;
 
 /// Arrow writer
 ///
-/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up 
`RecordBatch` in order
-/// to produce row groups with `max_row_group_size` rows. Any remaining rows 
will be
+/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] 
will be encoded
+/// to the same row group, up to `max_row_group_size` rows. Any remaining rows 
will be
 /// flushed on close, leading the final row group in the output file to 
potentially
 /// contain fewer than `max_row_group_size` rows
 ///
@@ -78,11 +88,8 @@ pub struct ArrowWriter<W: Write> {
     /// Underlying Parquet writer
     writer: SerializedFileWriter<W>,
 
-    /// For each column, maintain an ordered queue of arrays to write
-    buffer: Vec<VecDeque<ArrayRef>>,
-
-    /// The total number of rows currently buffered
-    buffered_rows: usize,
+    /// The in-progress row group if any
+    in_progress: Option<ArrowRowGroupWriter>,
 
     /// A copy of the Arrow schema.
     ///
@@ -93,24 +100,13 @@ pub struct ArrowWriter<W: Write> {
     max_row_group_size: usize,
 }
 
-impl<W: Write> Debug for ArrowWriter<W> {
+impl<W: Write + Send> Debug for ArrowWriter<W> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let buffered_batches = self.buffer.len();
-        let mut buffered_memory = 0;
-
-        for batch in self.buffer.iter() {
-            for arr in batch.iter() {
-                buffered_memory += arr.get_array_memory_size()
-            }
-        }
-
+        let buffered_memory = self.in_progress_size();
         f.debug_struct("ArrowWriter")
             .field("writer", &self.writer)
-            .field(
-                "buffer",
-                &format!("{buffered_batches} , {buffered_memory} bytes"),
-            )
-            .field("buffered_rows", &self.buffered_rows)
+            .field("in_progress_size", &format_args!("{buffered_memory} 
bytes"))
+            .field("in_progress_rows", &self.in_progress_rows())
             .field("arrow_schema", &self.arrow_schema)
             .field("max_row_group_size", &self.max_row_group_size)
             .finish()
@@ -140,8 +136,7 @@ impl<W: Write + Send> ArrowWriter<W> {
 
         Ok(Self {
             writer: file_writer,
-            buffer: vec![Default::default(); arrow_schema.fields().len()],
-            buffered_rows: 0,
+            in_progress: None,
             arrow_schema,
             max_row_group_size,
         })
@@ -152,43 +147,75 @@ impl<W: Write + Send> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the estimated length in bytes of the current in progress row 
group
+    pub fn in_progress_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress
+                .writers
+                .iter()
+                .map(|(_, x)| x.get_estimated_total_bytes() as usize)
+                .sum(),
+            None => 0,
+        }
+    }
+
+    /// Returns the number of rows buffered in the in progress row group
+    pub fn in_progress_rows(&self) -> usize {
+        self.in_progress
+            .as_ref()
+            .map(|x| x.buffered_rows)
+            .unwrap_or_default()
+    }
+
+    /// Encodes the provided [`RecordBatch`]
     ///
-    /// If following this there are more than `max_row_group_size` rows 
buffered,
-    /// this will flush out one or more row groups with `max_row_group_size` 
rows,
-    /// and drop any fully written `RecordBatch`
+    /// If this would cause the current row group to exceed 
[`WriterProperties::max_row_group_size`]
+    /// rows, the contents of `batch` will be written to one or more row 
groups such that all but
+    /// the final row group in the file contain 
[`WriterProperties::max_row_group_size`] rows
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        // validate batch schema against writer's supplied schema
-        let batch_schema = batch.schema();
-        if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
-            || self.arrow_schema.contains(&batch_schema))
-        {
-            return Err(ParquetError::ArrowError(
-                "Record batch schema does not match writer schema".to_string(),
-            ));
+        if batch.num_rows() == 0 {
+            return Ok(());
         }
 
-        for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
-            buffer.push_back(column.clone())
-        }
+        let in_progress = match &mut self.in_progress {
+            Some(in_progress) => in_progress,
+            x => x.insert(ArrowRowGroupWriter::new(
+                self.writer.schema_descr(),
+                self.writer.properties(),
+                &self.arrow_schema,
+            )?),
+        };
 
-        self.buffered_rows += batch.num_rows();
-        self.flush_completed()?;
+        // If would exceed max_row_group_size, split batch
+        if in_progress.buffered_rows + batch.num_rows() > 
self.max_row_group_size {
+            let to_write = self.max_row_group_size - in_progress.buffered_rows;
+            let a = batch.slice(0, to_write);
+            let b = batch.slice(to_write, batch.num_rows() - to_write);
+            self.write(&a)?;
+            return self.write(&b);
+        }
 
-        Ok(())
-    }
+        in_progress.write(batch)?;
 
-    /// Flushes buffered data until there are less than `max_row_group_size` 
rows buffered
-    fn flush_completed(&mut self) -> Result<()> {
-        while self.buffered_rows >= self.max_row_group_size {
-            self.flush_rows(self.max_row_group_size)?;
+        if in_progress.buffered_rows >= self.max_row_group_size {
+            self.flush()?
         }
         Ok(())
     }
 
     /// Flushes all buffered rows into a new row group
     pub fn flush(&mut self) -> Result<()> {
-        self.flush_rows(self.buffered_rows)
+        let in_progress = match self.in_progress.take() {
+            Some(in_progress) => in_progress,
+            None => return Ok(()),
+        };
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+        for (chunk, close) in in_progress.close()? {
+            row_group_writer.append_column(&chunk, close)?;
+        }
+        row_group_writer.close()?;
+        Ok(())
     }
 
     /// Additional [`KeyValue`] metadata to be written in addition to those 
from [`WriterProperties`]
@@ -198,68 +225,6 @@ impl<W: Write + Send> ArrowWriter<W> {
         self.writer.append_key_value_metadata(kv_metadata)
     }
 
-    /// Flushes `num_rows` from the buffer into a new row group
-    fn flush_rows(&mut self, num_rows: usize) -> Result<()> {
-        if num_rows == 0 {
-            return Ok(());
-        }
-
-        assert!(
-            num_rows <= self.buffered_rows,
-            "cannot flush {} rows only have {}",
-            num_rows,
-            self.buffered_rows
-        );
-
-        assert!(
-            num_rows <= self.max_row_group_size,
-            "cannot flush {} rows would exceed max row group size of {}",
-            num_rows,
-            self.max_row_group_size
-        );
-
-        let mut row_group_writer = self.writer.next_row_group()?;
-
-        for (col_buffer, field) in 
self.buffer.iter_mut().zip(self.arrow_schema.fields())
-        {
-            // Collect the number of arrays to append
-            let mut remaining = num_rows;
-            let mut arrays = Vec::with_capacity(col_buffer.len());
-            while remaining != 0 {
-                match col_buffer.pop_front() {
-                    Some(next) if next.len() > remaining => {
-                        col_buffer
-                            .push_front(next.slice(remaining, next.len() - 
remaining));
-                        arrays.push(next.slice(0, remaining));
-                        remaining = 0;
-                    }
-                    Some(next) => {
-                        remaining -= next.len();
-                        arrays.push(next);
-                    }
-                    _ => break,
-                }
-            }
-
-            let mut levels = arrays
-                .iter()
-                .map(|array| {
-                    let mut levels = calculate_array_levels(array, field)?;
-                    // Reverse levels as we pop() them when writing arrays
-                    levels.reverse();
-                    Ok(levels)
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
-        }
-
-        row_group_writer.close()?;
-        self.buffered_rows -= num_rows;
-
-        Ok(())
-    }
-
     /// Flushes any outstanding data and returns the underlying writer.
     pub fn into_inner(mut self) -> Result<W> {
         self.flush()?;
@@ -284,156 +249,284 @@ impl<W: Write + Send> RecordBatchWriter for 
ArrowWriter<W> {
     }
 }
 
-fn write_leaves<W: Write + Send>(
-    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
-    arrays: &[ArrayRef],
-    levels: &mut [Vec<LevelInfo>],
-) -> Result<()> {
-    assert_eq!(arrays.len(), levels.len());
-    assert!(!arrays.is_empty());
-
-    let data_type = arrays.first().unwrap().data_type().clone();
-    assert!(arrays.iter().all(|a| a.data_type() == &data_type));
-
-    match &data_type {
-        ArrowDataType::Null
-        | ArrowDataType::Boolean
-        | ArrowDataType::Int8
-        | ArrowDataType::Int16
-        | ArrowDataType::Int32
-        | ArrowDataType::Int64
-        | ArrowDataType::UInt8
-        | ArrowDataType::UInt16
-        | ArrowDataType::UInt32
-        | ArrowDataType::UInt64
-        | ArrowDataType::Float32
-        | ArrowDataType::Float64
-        | ArrowDataType::Timestamp(_, _)
-        | ArrowDataType::Date32
-        | ArrowDataType::Date64
-        | ArrowDataType::Time32(_)
-        | ArrowDataType::Time64(_)
-        | ArrowDataType::Duration(_)
-        | ArrowDataType::Interval(_)
-        | ArrowDataType::Decimal128(_, _)
-        | ArrowDataType::Decimal256(_, _)
-        | ArrowDataType::FixedSizeBinary(_) => {
-            let mut col_writer = row_group_writer.next_column()?.unwrap();
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                write_leaf(col_writer.untyped(), array, 
levels.pop().expect("Levels exhausted"))?;
-            }
-            col_writer.close()
-        }
-        ArrowDataType::LargeBinary
-        | ArrowDataType::Binary
-        | ArrowDataType::Utf8
-        | ArrowDataType::LargeUtf8 => {
-            let mut col_writer = 
row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                col_writer.write(array, levels.pop().expect("Levels 
exhausted"))?;
+/// A list of [`Bytes`] comprising a single column chunk
+#[derive(Default)]
+struct ArrowColumnChunk {
+    length: usize,
+    data: Vec<Bytes>,
+}
+
+impl Length for ArrowColumnChunk {
+    fn len(&self) -> u64 {
+        self.length as _
+    }
+}
+
+impl ChunkReader for ArrowColumnChunk {
+    type T = ChainReader;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        assert_eq!(start, 0); // Assume append_column writes all data in 
one-shot
+        Ok(ChainReader(self.data.clone().into_iter().peekable()))
+    }
+
+    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
+        unimplemented!()
+    }
+}
+
+/// A [`Read`] for an iterator of [`Bytes`]
+struct ChainReader(Peekable<IntoIter<Bytes>>);
+
+impl Read for ChainReader {
+    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
+        let buffer = loop {
+            match self.0.peek_mut() {
+                Some(b) if b.is_empty() => {
+                    self.0.next();
+                    continue;
+                }
+                Some(b) => break b,
+                None => return Ok(0),
             }
-            col_writer.close()
+        };
+
+        let len = buffer.len().min(out.len());
+        let b = buffer.split_to(len);
+        out[..len].copy_from_slice(&b);
+        Ok(len)
+    }
+}
+
+/// A shared [`ArrowColumnChunk`]
+///
+/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access 
via
+/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential 
borrows
+type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
+
+#[derive(Default)]
+struct ArrowPageWriter {
+    buffer: SharedColumnChunk,
+}
+
+impl PageWriter for ArrowPageWriter {
+    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
+        let mut buf = self.buffer.try_lock().unwrap();
+        let page_header = page.to_thrift_header();
+        let header = {
+            let mut header = Vec::with_capacity(1024);
+            let mut protocol = TCompactOutputProtocol::new(&mut header);
+            page_header.write_to_out_protocol(&mut protocol)?;
+            Bytes::from(header)
+        };
+
+        let data = page.compressed_page().buffer().clone();
+        let compressed_size = data.len() + header.len();
+
+        let mut spec = PageWriteSpec::new();
+        spec.page_type = page.page_type();
+        spec.num_values = page.num_values();
+        spec.uncompressed_size = page.uncompressed_size() + header.len();
+        spec.offset = buf.length as u64;
+        spec.compressed_size = compressed_size;
+        spec.bytes_written = compressed_size as u64;
+
+        buf.length += compressed_size;
+        buf.data.push(header);
+        buf.data.push(data.into());
+
+        Ok(spec)
+    }
+
+    fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> 
Result<()> {
+        // Skip writing metadata as won't be copied anyway
+        Ok(())
+    }
+
+    fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+/// Encodes a leaf column to [`ArrowPageWriter`]
+enum ArrowColumnWriter {
+    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
+    Column(ColumnWriter<'static>),
+}
+
+impl ArrowColumnWriter {
+    /// Returns the estimated total bytes for this column writer
+    fn get_estimated_total_bytes(&self) -> u64 {
+        match self {
+            ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(),
+            ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(),
         }
-        ArrowDataType::List(_) => {
-            let arrays: Vec<_> = arrays.iter().map(|array|{
-                array.as_list::<i32>().values().clone()
-            }).collect();
+    }
+}
+
+/// Encodes [`RecordBatch`] to a parquet row group
+struct ArrowRowGroupWriter {
+    writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+    schema: SchemaRef,
+    buffered_rows: usize,
+}
 
-            write_leaves(row_group_writer, &arrays, levels)?;
-            Ok(())
+impl ArrowRowGroupWriter {
+    fn new(
+        parquet: &SchemaDescriptor,
+        props: &WriterPropertiesPtr,
+        arrow: &SchemaRef,
+    ) -> Result<Self> {
+        let mut writers = Vec::with_capacity(arrow.fields.len());
+        let mut leaves = parquet.columns().iter();
+        for field in &arrow.fields {
+            get_arrow_column_writer(field.data_type(), props, &mut leaves, 
&mut writers)?;
         }
-        ArrowDataType::LargeList(_) => {
-            let arrays: Vec<_> = arrays.iter().map(|array|{
-                array.as_list::<i64>().values().clone()
-            }).collect();
-            write_leaves(row_group_writer, &arrays, levels)?;
-            Ok(())
+        Ok(Self {
+            writers,
+            schema: arrow.clone(),
+            buffered_rows: 0,
+        })
+    }
+
+    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.buffered_rows += batch.num_rows();
+        let mut writers = self.writers.iter_mut().map(|(_, x)| x);
+        for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
+            let mut levels = calculate_array_levels(array, field)?.into_iter();
+            write_leaves(&mut writers, &mut levels, array.as_ref())?;
         }
-        ArrowDataType::Struct(fields) => {
-            // Groups child arrays by field
-            let mut field_arrays = vec![Vec::with_capacity(arrays.len()); 
fields.len()];
+        Ok(())
+    }
 
-            for array in arrays {
-                let struct_array: &arrow_array::StructArray = array
-                    .as_any()
-                    .downcast_ref::<arrow_array::StructArray>()
-                    .expect("Unable to get struct array");
+    fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
+        self.writers
+            .into_iter()
+            .map(|(chunk, writer)| {
+                let close_result = match writer {
+                    ArrowColumnWriter::ByteArray(c) => c.close()?,
+                    ArrowColumnWriter::Column(c) => c.close()?,
+                };
+
+                let chunk = 
Arc::try_unwrap(chunk).ok().unwrap().into_inner().unwrap();
+                Ok((chunk, close_result))
+            })
+            .collect()
+    }
+}
 
-                assert_eq!(struct_array.columns().len(), fields.len());
+/// Get an [`ArrowColumnWriter`] along with a reference to its 
[`SharedColumnChunk`]
+fn get_arrow_column_writer(
+    data_type: &ArrowDataType,
+    props: &WriterPropertiesPtr,
+    leaves: &mut Iter<'_, ColumnDescPtr>,
+    out: &mut Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+) -> Result<()> {
+    let col = |desc: &ColumnDescPtr| {
+        let page_writer = Box::<ArrowPageWriter>::default();
+        let chunk = page_writer.buffer.clone();
+        let writer = get_column_writer(desc.clone(), props.clone(), 
page_writer);
+        (chunk, ArrowColumnWriter::Column(writer))
+    };
 
-                for (child_array, field) in 
field_arrays.iter_mut().zip(struct_array.columns()) {
-                    child_array.push(field.clone())
-                }
-            }
+    let bytes = |desc: &ColumnDescPtr| {
+        let page_writer = Box::<ArrowPageWriter>::default();
+        let chunk = page_writer.buffer.clone();
+        let writer = GenericColumnWriter::new(desc.clone(), props.clone(), 
page_writer);
+        (chunk, ArrowColumnWriter::ByteArray(writer))
+    };
 
-            for field in field_arrays {
-                write_leaves(row_group_writer, &field, levels)?;
+    match data_type {
+        _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
+        ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | 
ArrowDataType::Null => out.push(col(leaves.next().unwrap())),
+        ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8 => {
+            out.push(bytes(leaves.next().unwrap()))
+        }
+        ArrowDataType::List(f)
+        | ArrowDataType::LargeList(f)
+        | ArrowDataType::FixedSizeList(f, _) => {
+            get_arrow_column_writer(f.data_type(), props, leaves, out)?
+        }
+        ArrowDataType::Struct(fields) => {
+            for field in fields {
+                get_arrow_column_writer(field.data_type(), props, leaves, out)?
             }
-
-            Ok(())
         }
-        ArrowDataType::Map(_, _) => {
-            let mut keys = Vec::with_capacity(arrays.len());
-            let mut values = Vec::with_capacity(arrays.len());
-            for array in arrays {
-                let map_array: &arrow_array::MapArray = array
-                    .as_any()
-                    .downcast_ref::<arrow_array::MapArray>()
-                    .expect("Unable to get map array");
-                keys.push(map_array.keys().clone());
-                values.push(map_array.values().clone());
+        ArrowDataType::Map(f, _) => match f.data_type() {
+            ArrowDataType::Struct(f) => {
+                get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
+                get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
             }
-
-            write_leaves(row_group_writer, &keys, levels)?;
-            write_leaves(row_group_writer, &values, levels)?;
-            Ok(())
+            _ => unreachable!("invalid map type"),
         }
         ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
             ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | 
ArrowDataType::Binary | ArrowDataType::LargeBinary => {
-                let mut col_writer = 
row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
-                for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                    col_writer.write(array, levels.pop().expect("Levels 
exhausted"))?;
-                }
-                col_writer.close()
+                out.push(bytes(leaves.next().unwrap()))
             }
             _ => {
-                let mut col_writer = row_group_writer.next_column()?.unwrap();
-                for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                    write_leaf(col_writer.untyped(), array, 
levels.pop().expect("Levels exhausted"))?;
-                }
-                col_writer.close()
+                out.push(col(leaves.next().unwrap()))
             }
         }
-        ArrowDataType::Float16 => Err(ParquetError::ArrowError(
-            "Float16 arrays not supported".to_string(),
-        )),
+       _ => return Err(ParquetError::NYI(
+           format!(
+               "Attempting to write an Arrow type {data_type:?} to parquet 
that is not yet implemented"
+           )
+       ))
+    }
+    Ok(())
+}
+
+/// Write the leaves of `array` in depth-first order to `writers` with `levels`
+fn write_leaves<'a, W>(
+    writers: &mut W,
+    levels: &mut IntoIter<LevelInfo>,
+    array: &(dyn Array + 'static),
+) -> Result<()>
+where
+    W: Iterator<Item = &'a mut ArrowColumnWriter>,
+{
+    match array.data_type() {
+        ArrowDataType::List(_) => {
+            write_leaves(writers, levels, 
array.as_list::<i32>().values().as_ref())?
+        }
+        ArrowDataType::LargeList(_) => {
+            write_leaves(writers, levels, 
array.as_list::<i64>().values().as_ref())?
+        }
         ArrowDataType::FixedSizeList(_, _) => {
-            let arrays: Vec<_> = arrays.iter().map(|array|{
-                array.as_any().downcast_ref::<FixedSizeListArray>()
-                    .expect("unable to get fixed-size list array")
-                    .values()
-                    .clone()
-            }).collect();
-            write_leaves(row_group_writer, &arrays, levels)?;
-            Ok(())
-        },
-        ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
-            Err(ParquetError::NYI(
-                format!(
-                    "Attempting to write an Arrow type {data_type:?} to 
parquet that is not yet implemented"
-                )
-            ))
+            let array = 
array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
+            write_leaves(writers, levels, array.values().as_ref())?
+        }
+        ArrowDataType::Struct(_) => {
+            for column in array.as_struct().columns() {
+                write_leaves(writers, levels, column.as_ref())?
+            }
+        }
+        ArrowDataType::Map(_, _) => {
+            let map = array.as_map();
+            write_leaves(writers, levels, map.keys().as_ref())?;
+            write_leaves(writers, levels, map.values().as_ref())?
+        }
+        _ => {
+            let levels = levels.next().unwrap();
+            match writers.next().unwrap() {
+                ArrowColumnWriter::Column(c) => write_leaf(c, array, levels)?,
+                ArrowColumnWriter::ByteArray(c) => write_primitive(c, array, 
levels)?,
+            };
         }
     }
+    Ok(())
 }
 
 fn write_leaf(
     writer: &mut ColumnWriter<'_>,
-    column: &ArrayRef,
+    column: &dyn Array,
     levels: LevelInfo,
-) -> Result<i64> {
+) -> Result<usize> {
     let indices = levels.non_null_indices();
-    let written = match writer {
+    match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
             match column.data_type() {
                 ArrowDataType::Date64 => {
@@ -442,26 +535,26 @@ fn write_leaf(
                     let array = arrow_cast::cast(&array, 
&ArrowDataType::Int32)?;
 
                     let array = array.as_primitive::<Int32Type>();
-                    write_primitive(typed, array.values(), levels)?
+                    write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::UInt32 => {
                     let values = column.as_primitive::<UInt32Type>().values();
                     // follow C++ implementation and use overflow/reinterpret 
cast from  u32 to i32 which will map
                     // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
                     let array = values.inner().typed_data::<i32>();
-                    write_primitive(typed, array, levels)?
+                    write_primitive(typed, array, levels)
                 }
                 ArrowDataType::Decimal128(_, _) => {
                     // use the int32 to represent the decimal with low 
precision
                     let array = column
                         .as_primitive::<Decimal128Type>()
-                        .unary::<_, types::Int32Type>(|v| v as i32);
-                    write_primitive(typed, array.values(), levels)?
+                        .unary::<_, Int32Type>(|v| v as i32);
+                    write_primitive(typed, array.values(), levels)
                 }
                 _ => {
                     let array = arrow_cast::cast(column, 
&ArrowDataType::Int32)?;
                     let array = array.as_primitive::<Int32Type>();
-                    write_primitive(typed, array.values(), levels)?
+                    write_primitive(typed, array.values(), levels)
                 }
             }
         }
@@ -471,32 +564,32 @@ fn write_leaf(
                 get_bool_array_slice(array, indices).as_slice(),
                 levels.def_levels(),
                 levels.rep_levels(),
-            )?
+            )
         }
         ColumnWriter::Int64ColumnWriter(ref mut typed) => {
             match column.data_type() {
                 ArrowDataType::Int64 => {
                     let array = column.as_primitive::<Int64Type>();
-                    write_primitive(typed, array.values(), levels)?
+                    write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::UInt64 => {
                     let values = column.as_primitive::<UInt64Type>().values();
                     // follow C++ implementation and use overflow/reinterpret 
cast from  u64 to i64 which will map
                     // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
                     let array = values.inner().typed_data::<i64>();
-                    write_primitive(typed, array, levels)?
+                    write_primitive(typed, array, levels)
                 }
                 ArrowDataType::Decimal128(_, _) => {
                     // use the int64 to represent the decimal with low 
precision
                     let array = column
                         .as_primitive::<Decimal128Type>()
-                        .unary::<_, types::Int64Type>(|v| v as i64);
-                    write_primitive(typed, array.values(), levels)?
+                        .unary::<_, Int64Type>(|v| v as i64);
+                    write_primitive(typed, array.values(), levels)
                 }
                 _ => {
                     let array = arrow_cast::cast(column, 
&ArrowDataType::Int64)?;
                     let array = array.as_primitive::<Int64Type>();
-                    write_primitive(typed, array.values(), levels)?
+                    write_primitive(typed, array.values(), levels)
                 }
             }
         }
@@ -504,18 +597,12 @@ fn write_leaf(
             unreachable!("Currently unreachable because data type not 
supported")
         }
         ColumnWriter::FloatColumnWriter(ref mut typed) => {
-            let array = column
-                .as_any()
-                .downcast_ref::<arrow_array::Float32Array>()
-                .expect("Unable to get Float32 array");
-            write_primitive(typed, array.values(), levels)?
+            let array = column.as_primitive::<Float32Type>();
+            write_primitive(typed, array.values(), levels)
         }
         ColumnWriter::DoubleColumnWriter(ref mut typed) => {
-            let array = column
-                .as_any()
-                .downcast_ref::<arrow_array::Float64Array>()
-                .expect("Unable to get Float64 array");
-            write_primitive(typed, array.values(), levels)?
+            let array = column.as_primitive::<Float64Type>();
+            write_primitive(typed, array.values(), levels)
         }
         ColumnWriter::ByteArrayColumnWriter(_) => {
             unreachable!("should use ByteArrayWriter")
@@ -553,10 +640,7 @@ fn write_leaf(
                     get_fsb_array_slice(array, indices)
                 }
                 ArrowDataType::Decimal128(_, _) => {
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<arrow_array::Decimal128Array>()
-                        .unwrap();
+                    let array = column.as_primitive::<Decimal128Type>();
                     get_decimal_array_slice(array, indices)
                 }
                 _ => {
@@ -566,19 +650,14 @@ fn write_leaf(
                     ));
                 }
             };
-            typed.write_batch(
-                bytes.as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            typed.write_batch(bytes.as_slice(), levels.def_levels(), 
levels.rep_levels())
         }
-    };
-    Ok(written as i64)
+    }
 }
 
-fn write_primitive<T: DataType>(
-    writer: &mut ColumnWriterImpl<'_, T>,
-    values: &[T::T],
+fn write_primitive<E: ColumnValueEncoder>(
+    writer: &mut GenericColumnWriter<E>,
+    values: &E::Values,
     levels: LevelInfo,
 ) -> Result<usize> {
     writer.write_batch_internal(
@@ -2462,4 +2541,40 @@ mod tests {
         assert_ne!(back.schema(), batch.schema());
         assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
     }
+
+    #[test]
+    fn in_progress_accounting() {
+        // define schema
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(a)]).unwrap();
+
+        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), 
None).unwrap();
+
+        // starts empty
+        assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.in_progress_rows(), 0);
+        writer.write(&batch).unwrap();
+
+        // updated on write
+        let initial_size = writer.in_progress_size();
+        assert!(initial_size > 0);
+        assert_eq!(writer.in_progress_rows(), 5);
+
+        // updated on second write
+        writer.write(&batch).unwrap();
+        assert!(writer.in_progress_size() > initial_size);
+        assert_eq!(writer.in_progress_rows(), 10);
+
+        // cleared on flush
+        writer.flush().unwrap();
+        assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.in_progress_rows(), 0);
+
+        writer.close().unwrap();
+    }
 }
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index f854e5cac..57a0278e2 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -162,6 +162,75 @@ impl CompressedPage {
     pub fn data(&self) -> &[u8] {
         self.compressed_page.buffer().data()
     }
+
+    /// Returns the thrift page header
+    pub(crate) fn to_thrift_header(&self) -> PageHeader {
+        let uncompressed_size = self.uncompressed_size();
+        let compressed_size = self.compressed_size();
+        let num_values = self.num_values();
+        let encoding = self.encoding();
+        let page_type = self.page_type();
+
+        let mut page_header = PageHeader {
+            type_: page_type.into(),
+            uncompressed_page_size: uncompressed_size as i32,
+            compressed_page_size: compressed_size as i32,
+            // TODO: Add support for crc checksum
+            crc: None,
+            data_page_header: None,
+            index_page_header: None,
+            dictionary_page_header: None,
+            data_page_header_v2: None,
+        };
+
+        match self.compressed_page {
+            Page::DataPage {
+                def_level_encoding,
+                rep_level_encoding,
+                ref statistics,
+                ..
+            } => {
+                let data_page_header = crate::format::DataPageHeader {
+                    num_values: num_values as i32,
+                    encoding: encoding.into(),
+                    definition_level_encoding: def_level_encoding.into(),
+                    repetition_level_encoding: rep_level_encoding.into(),
+                    statistics: 
crate::file::statistics::to_thrift(statistics.as_ref()),
+                };
+                page_header.data_page_header = Some(data_page_header);
+            }
+            Page::DataPageV2 {
+                num_nulls,
+                num_rows,
+                def_levels_byte_len,
+                rep_levels_byte_len,
+                is_compressed,
+                ref statistics,
+                ..
+            } => {
+                let data_page_header_v2 = crate::format::DataPageHeaderV2 {
+                    num_values: num_values as i32,
+                    num_nulls: num_nulls as i32,
+                    num_rows: num_rows as i32,
+                    encoding: encoding.into(),
+                    definition_levels_byte_length: def_levels_byte_len as i32,
+                    repetition_levels_byte_length: rep_levels_byte_len as i32,
+                    is_compressed: Some(is_compressed),
+                    statistics: 
crate::file::statistics::to_thrift(statistics.as_ref()),
+                };
+                page_header.data_page_header_v2 = Some(data_page_header_v2);
+            }
+            Page::DictionaryPage { is_sorted, .. } => {
+                let dictionary_page_header = 
crate::format::DictionaryPageHeader {
+                    num_values: num_values as i32,
+                    encoding: encoding.into(),
+                    is_sorted: Some(is_sorted),
+                };
+                page_header.dictionary_page_header = 
Some(dictionary_page_header);
+            }
+        }
+        page_header
+    }
 }
 
 /// Contains page write metrics.
diff --git a/parquet/src/column/writer/encoder.rs 
b/parquet/src/column/writer/encoder.rs
index c343f1d6c..fb5889b78 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -36,7 +36,7 @@ pub trait ColumnValues {
 }
 
 #[cfg(feature = "arrow")]
-impl<T: arrow_array::Array> ColumnValues for T {
+impl ColumnValues for dyn arrow_array::Array {
     fn len(&self) -> usize {
         arrow_array::Array::len(self)
     }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index bf77b2b32..5e623d281 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -43,6 +43,21 @@ use crate::util::memory::ByteBufferPtr;
 
 pub(crate) mod encoder;
 
+macro_rules! downcast_writer {
+    ($e:expr, $i:ident, $b:expr) => {
+        match $e {
+            Self::BoolColumnWriter($i) => $b,
+            Self::Int32ColumnWriter($i) => $b,
+            Self::Int64ColumnWriter($i) => $b,
+            Self::Int96ColumnWriter($i) => $b,
+            Self::FloatColumnWriter($i) => $b,
+            Self::DoubleColumnWriter($i) => $b,
+            Self::ByteArrayColumnWriter($i) => $b,
+            Self::FixedLenByteArrayColumnWriter($i) => $b,
+        }
+    };
+}
+
 /// Column writer for a Parquet type.
 pub enum ColumnWriter<'a> {
     BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -55,6 +70,19 @@ pub enum ColumnWriter<'a> {
     FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
 }
 
+impl<'a> ColumnWriter<'a> {
+    /// Returns the estimated total bytes for this column writer
+    #[cfg(feature = "arrow")]
+    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
+        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
+    }
+
+    /// Close this [`ColumnWriter`]
+    pub fn close(self) -> Result<ColumnCloseResult> {
+        downcast_writer!(self, typed, typed.close())
+    }
+}
+
 pub enum Level {
     Page,
     Column,
@@ -421,10 +449,24 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
     /// Returns total number of bytes written by this column writer so far.
     /// This value is also returned when column writer is closed.
+    ///
+    /// Note: this value does not include any buffered data that has not
+    /// yet been flushed to a page.
     pub fn get_total_bytes_written(&self) -> u64 {
         self.column_metrics.total_bytes_written
     }
 
+    /// Returns the estimated total bytes for this column writer
+    ///
+    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
+    /// of any data that has not yet been flushed to a page
+    #[cfg(feature = "arrow")]
+    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
+        self.column_metrics.total_bytes_written
+            + self.encoder.estimated_data_page_size() as u64
+            + self.encoder.estimated_dict_page_size().unwrap_or_default() as 
u64
+    }
+
     /// Returns total number of rows written by this column writer so far.
     /// This value is also returned when column writer is closed.
     pub fn get_total_rows_written(&self) -> u64 {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 4f15c9f4b..defdaad32 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -30,16 +30,13 @@ use crate::column::writer::{
     get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl,
 };
 use crate::column::{
-    page::{CompressedPage, Page, PageWriteSpec, PageWriter},
+    page::{CompressedPage, PageWriteSpec, PageWriter},
     writer::{get_column_writer, ColumnWriter},
 };
 use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
 use crate::file::reader::ChunkReader;
-use crate::file::{
-    metadata::*, properties::WriterPropertiesPtr,
-    statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC,
-};
+use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
 use crate::schema::types::{
     self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr,
 };
@@ -370,6 +367,16 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         self.kv_metadatas.push(kv_metadata);
     }
 
+    /// Returns a reference to schema descriptor.
+    pub fn schema_descr(&self) -> &SchemaDescriptor {
+        &self.descr
+    }
+
+    /// Returns a reference to the writer properties
+    pub fn properties(&self) -> &WriterPropertiesPtr {
+        &self.props
+    }
+
     /// Writes the file footer and returns the underlying writer.
     pub fn into_inner(mut self) -> Result<W> {
         self.assert_previous_writer_closed()?;
@@ -653,17 +660,7 @@ impl<'a> SerializedColumnWriter<'a> {
 
     /// Close this [`SerializedColumnWriter`]
     pub fn close(mut self) -> Result<()> {
-        let r = match self.inner {
-            ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
-            ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
-            ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
-            ColumnWriter::FixedLenByteArrayColumnWriter(typed) => 
typed.close()?,
-        };
-
+        let r = self.inner.close()?;
         if let Some(on_close) = self.on_close.take() {
             on_close(r)?
         }
@@ -701,83 +698,20 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
 
 impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
-        let uncompressed_size = page.uncompressed_size();
-        let compressed_size = page.compressed_size();
-        let num_values = page.num_values();
-        let encoding = page.encoding();
         let page_type = page.page_type();
-
-        let mut page_header = parquet::PageHeader {
-            type_: page_type.into(),
-            uncompressed_page_size: uncompressed_size as i32,
-            compressed_page_size: compressed_size as i32,
-            // TODO: Add support for crc checksum
-            crc: None,
-            data_page_header: None,
-            index_page_header: None,
-            dictionary_page_header: None,
-            data_page_header_v2: None,
-        };
-
-        match *page.compressed_page() {
-            Page::DataPage {
-                def_level_encoding,
-                rep_level_encoding,
-                ref statistics,
-                ..
-            } => {
-                let data_page_header = parquet::DataPageHeader {
-                    num_values: num_values as i32,
-                    encoding: encoding.into(),
-                    definition_level_encoding: def_level_encoding.into(),
-                    repetition_level_encoding: rep_level_encoding.into(),
-                    statistics: statistics_to_thrift(statistics.as_ref()),
-                };
-                page_header.data_page_header = Some(data_page_header);
-            }
-            Page::DataPageV2 {
-                num_nulls,
-                num_rows,
-                def_levels_byte_len,
-                rep_levels_byte_len,
-                is_compressed,
-                ref statistics,
-                ..
-            } => {
-                let data_page_header_v2 = parquet::DataPageHeaderV2 {
-                    num_values: num_values as i32,
-                    num_nulls: num_nulls as i32,
-                    num_rows: num_rows as i32,
-                    encoding: encoding.into(),
-                    definition_levels_byte_length: def_levels_byte_len as i32,
-                    repetition_levels_byte_length: rep_levels_byte_len as i32,
-                    is_compressed: Some(is_compressed),
-                    statistics: statistics_to_thrift(statistics.as_ref()),
-                };
-                page_header.data_page_header_v2 = Some(data_page_header_v2);
-            }
-            Page::DictionaryPage { is_sorted, .. } => {
-                let dictionary_page_header = parquet::DictionaryPageHeader {
-                    num_values: num_values as i32,
-                    encoding: encoding.into(),
-                    is_sorted: Some(is_sorted),
-                };
-                page_header.dictionary_page_header = 
Some(dictionary_page_header);
-            }
-        }
-
         let start_pos = self.sink.bytes_written() as u64;
 
+        let page_header = page.to_thrift_header();
         let header_size = self.serialize_page_header(page_header)?;
         self.sink.write_all(page.data())?;
 
         let mut spec = PageWriteSpec::new();
         spec.page_type = page_type;
-        spec.uncompressed_size = uncompressed_size + header_size;
-        spec.compressed_size = compressed_size + header_size;
+        spec.uncompressed_size = page.uncompressed_size() + header_size;
+        spec.compressed_size = page.compressed_size() + header_size;
         spec.offset = start_pos;
         spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
-        spec.num_values = num_values;
+        spec.num_values = page.num_values();
 
         Ok(spec)
     }
@@ -804,7 +738,7 @@ mod tests {
     use std::fs::File;
 
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
-    use crate::column::page::PageReader;
+    use crate::column::page::{Page, PageReader};
     use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::{BoolType, Int32Type};
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 909878a6d..25d15dd4f 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -114,6 +114,12 @@ impl From<Bytes> for ByteBufferPtr {
     }
 }
 
+impl From<ByteBufferPtr> for Bytes {
+    fn from(value: ByteBufferPtr) -> Self {
+        value.data
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to