This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ea0089224 Improve `ArrowWriter` memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) (#4280)
ea0089224 is described below
commit ea008922445d84d957cf3f89df793187c22d82d8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon May 29 13:04:46 2023 +0100
Improve `ArrowWriter` memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) (#4280)
* Buffer Pages in ArrowWriter instead of RecordBatch (#3871)
* Review feedback
* Improved memory accounting
* Clippy
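
The change replaces the writer's queue of buffered `RecordBatch` with per-column page buffers, exposed through two new accessors. A minimal sketch of the resulting API, assuming an in-memory `Vec<u8>` sink (it mirrors the `in_progress_accounting` test in the diff below):

    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // Writing into an in-memory Vec<u8>; any `Write + Send` sink works
        let mut writer = ArrowWriter::try_new(vec![], schema, None)?;
        writer.write(&batch)?;

        // New in this commit: the size reflects the encoded pages buffered
        // in the writer, not the arrow arrays themselves
        println!(
            "{} rows, ~{} bytes buffered",
            writer.in_progress_rows(),
            writer.in_progress_size()
        );

        writer.flush()?; // flush buffered rows into a row group
        writer.close()?;
        Ok(())
    }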
---
parquet/src/arrow/arrow_writer/byte_array.rs | 57 +--
parquet/src/arrow/arrow_writer/mod.rs | 673 ++++++++++++++++-----------
parquet/src/column/page.rs | 69 +++
parquet/src/column/writer/encoder.rs | 2 +-
parquet/src/column/writer/mod.rs | 42 ++
parquet/src/file/writer.rs | 102 +---
parquet/src/util/memory.rs | 6 +
7 files changed, 535 insertions(+), 416 deletions(-)
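
Most of the rework is in parquet/src/arrow/arrow_writer/mod.rs: `write` now splits incoming batches on the `max_row_group_size` boundary using zero-copy `RecordBatch::slice` calls. A standalone sketch of that splitting step (the helper name is invented for illustration):

    use arrow_array::RecordBatch;

    // Hypothetical helper mirroring the recursive slicing inside
    // `ArrowWriter::write`: no piece may push a row group past `max` rows,
    // where `buffered` rows already sit in the current row group
    fn split_at_boundary(batch: &RecordBatch, buffered: usize, max: usize) -> Vec<RecordBatch> {
        assert!(max > 0);
        if buffered + batch.num_rows() <= max {
            return vec![batch.clone()];
        }
        let to_write = max - buffered;
        // `slice` is zero-copy: both halves share the original buffers
        let head = batch.slice(0, to_write);
        let tail = batch.slice(to_write, batch.num_rows() - to_write);
        let mut out = vec![head];
        out.extend(split_at_boundary(&tail, 0, max));
        out
    }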
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 77f9598b2..6dbc83dd0 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -15,25 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::arrow_writer::levels::LevelInfo;
use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
-use crate::column::page::PageWriter;
use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage,
};
-use crate::column::writer::GenericColumnWriter;
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
-use crate::file::properties::{WriterProperties, WriterPropertiesPtr, WriterVersion};
-use crate::file::writer::OnCloseColumnChunk;
+use crate::file::properties::{WriterProperties, WriterVersion};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::{
- Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray,
+ Array, ArrayAccessor, BinaryArray, DictionaryArray, LargeBinaryArray,
LargeStringArray, StringArray,
};
use arrow_schema::DataType;
@@ -94,49 +90,6 @@ macro_rules! downcast_op {
};
}
-/// A writer for byte array types
-pub(super) struct ByteArrayWriter<'a> {
- writer: GenericColumnWriter<'a, ByteArrayEncoder>,
- on_close: Option<OnCloseColumnChunk<'a>>,
-}
-
-impl<'a> ByteArrayWriter<'a> {
- /// Returns a new [`ByteArrayWriter`]
- pub fn new(
- descr: ColumnDescPtr,
- props: WriterPropertiesPtr,
- page_writer: Box<dyn PageWriter + 'a>,
- on_close: OnCloseColumnChunk<'a>,
- ) -> Result<Self> {
- Ok(Self {
- writer: GenericColumnWriter::new(descr, props, page_writer),
- on_close: Some(on_close),
- })
- }
-
- pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
- self.writer.write_batch_internal(
- array,
- Some(levels.non_null_indices()),
- levels.def_levels(),
- levels.rep_levels(),
- None,
- None,
- None,
- )?;
- Ok(())
- }
-
- pub fn close(self) -> Result<()> {
- let r = self.writer.close()?;
-
- if let Some(on_close) = self.on_close {
- on_close(r)?;
- }
- Ok(())
- }
-}
-
/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
encoder: FallbackEncoderImpl,
@@ -427,7 +380,7 @@ impl DictEncoder {
}
}
-struct ByteArrayEncoder {
+pub struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
min_value: Option<ByteArray>,
@@ -437,11 +390,11 @@ struct ByteArrayEncoder {
impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
- type Values = ArrayRef;
+ type Values = dyn Array;
fn min_max(
&self,
- values: &ArrayRef,
+ values: &dyn Array,
value_indices: Option<&[usize]>,
) -> Option<(Self::T, Self::T)> {
match value_indices {
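
The byte_array.rs change narrows `ColumnValueEncoder::Values` from `ArrayRef` to `dyn Array`, so the encoder can borrow an array without cloning an `Arc`. A small sketch of the pattern (the `describe` helper is hypothetical):

    use std::sync::Arc;

    use arrow_array::{Array, ArrayRef, StringArray};

    // `&dyn Array` accepts a borrow of any array, whether it is owned
    // directly or shared behind an `Arc`
    fn describe(values: &dyn Array) -> String {
        format!("{} values of type {}", values.len(), values.data_type())
    }

    fn main() {
        let concrete = StringArray::from(vec!["a", "b"]);
        let shared: ArrayRef = Arc::new(StringArray::from(vec!["c"]));

        println!("{}", describe(&concrete));       // no Arc involved
        println!("{}", describe(shared.as_ref())); // borrow out of the Arc
    }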
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 616968bf6..bde21ae85 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -17,16 +17,21 @@
//! Contains writer which writes arrow data into parquet data.
-use std::collections::VecDeque;
+use bytes::Bytes;
use std::fmt::Debug;
-use std::io::Write;
-use std::sync::Arc;
+use std::io::{Read, Write};
+use std::iter::Peekable;
+use std::slice::Iter;
+use std::sync::{Arc, Mutex};
+use std::vec::IntoIter;
+use thrift::protocol::{TCompactOutputProtocol, TSerializable};
use arrow_array::cast::AsArray;
-use arrow_array::types::{Decimal128Type, Int32Type, Int64Type, UInt32Type, UInt64Type};
-use arrow_array::{
- types, Array, ArrayRef, FixedSizeListArray, RecordBatch, RecordBatchWriter,
+use arrow_array::types::{
+ Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, UInt32Type,
+ UInt64Type,
};
+use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};
use super::schema::{
@@ -34,14 +39,19 @@ use super::schema::{
decimal_length_from_precision,
};
-use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
-use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
-use crate::data_type::{ByteArray, DataType, FixedLenByteArray};
+use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
+use crate::column::writer::encoder::ColumnValueEncoder;
+use crate::column::writer::{
+ get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
+};
+use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{KeyValue, RowGroupMetaDataPtr};
-use crate::file::properties::WriterProperties;
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
+use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
+use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::SerializedFileWriter;
-use crate::file::writer::SerializedRowGroupWriter;
+use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use levels::{calculate_array_levels, LevelInfo};
mod byte_array;
@@ -49,8 +59,8 @@ mod levels;
/// Arrow writer
///
-/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order
-/// to produce row groups with `max_row_group_size` rows. Any remaining rows will be
+/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
+/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, leading the final row group in the output file to potentially
/// contain fewer than `max_row_group_size` rows
///
@@ -78,11 +88,8 @@ pub struct ArrowWriter<W: Write> {
/// Underlying Parquet writer
writer: SerializedFileWriter<W>,
- /// For each column, maintain an ordered queue of arrays to write
- buffer: Vec<VecDeque<ArrayRef>>,
-
- /// The total number of rows currently buffered
- buffered_rows: usize,
+ /// The in-progress row group if any
+ in_progress: Option<ArrowRowGroupWriter>,
/// A copy of the Arrow schema.
///
@@ -93,24 +100,13 @@ pub struct ArrowWriter<W: Write> {
max_row_group_size: usize,
}
-impl<W: Write> Debug for ArrowWriter<W> {
+impl<W: Write + Send> Debug for ArrowWriter<W> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let buffered_batches = self.buffer.len();
- let mut buffered_memory = 0;
-
- for batch in self.buffer.iter() {
- for arr in batch.iter() {
- buffered_memory += arr.get_array_memory_size()
- }
- }
-
+ let buffered_memory = self.in_progress_size();
f.debug_struct("ArrowWriter")
.field("writer", &self.writer)
- .field(
- "buffer",
- &format!("{buffered_batches} , {buffered_memory} bytes"),
- )
- .field("buffered_rows", &self.buffered_rows)
+ .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
+ .field("in_progress_rows", &self.in_progress_rows())
.field("arrow_schema", &self.arrow_schema)
.field("max_row_group_size", &self.max_row_group_size)
.finish()
@@ -140,8 +136,7 @@ impl<W: Write + Send> ArrowWriter<W> {
Ok(Self {
writer: file_writer,
- buffer: vec![Default::default(); arrow_schema.fields().len()],
- buffered_rows: 0,
+ in_progress: None,
arrow_schema,
max_row_group_size,
})
@@ -152,43 +147,75 @@ impl<W: Write + Send> ArrowWriter<W> {
self.writer.flushed_row_groups()
}
- /// Enqueues the provided `RecordBatch` to be written
+ /// Returns the estimated length in bytes of the current in progress row group
+ pub fn in_progress_size(&self) -> usize {
+ match &self.in_progress {
+ Some(in_progress) => in_progress
+ .writers
+ .iter()
+ .map(|(_, x)| x.get_estimated_total_bytes() as usize)
+ .sum(),
+ None => 0,
+ }
+ }
+
+ /// Returns the number of rows buffered in the in progress row group
+ pub fn in_progress_rows(&self) -> usize {
+ self.in_progress
+ .as_ref()
+ .map(|x| x.buffered_rows)
+ .unwrap_or_default()
+ }
+
+ /// Encodes the provided [`RecordBatch`]
///
- /// If following this there are more than `max_row_group_size` rows buffered,
- /// this will flush out one or more row groups with `max_row_group_size` rows,
- /// and drop any fully written `RecordBatch`
+ /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+ /// rows, the contents of `batch` will be written to one or more row groups such that all but
+ /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
- // validate batch schema against writer's supplied schema
- let batch_schema = batch.schema();
- if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
- || self.arrow_schema.contains(&batch_schema))
- {
- return Err(ParquetError::ArrowError(
- "Record batch schema does not match writer schema".to_string(),
- ));
+ if batch.num_rows() == 0 {
+ return Ok(());
}
- for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
- buffer.push_back(column.clone())
- }
+ let in_progress = match &mut self.in_progress {
+ Some(in_progress) => in_progress,
+ x => x.insert(ArrowRowGroupWriter::new(
+ self.writer.schema_descr(),
+ self.writer.properties(),
+ &self.arrow_schema,
+ )?),
+ };
- self.buffered_rows += batch.num_rows();
- self.flush_completed()?;
+ // If would exceed max_row_group_size, split batch
+ if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
+ let to_write = self.max_row_group_size - in_progress.buffered_rows;
+ let a = batch.slice(0, to_write);
+ let b = batch.slice(to_write, batch.num_rows() - to_write);
+ self.write(&a)?;
+ return self.write(&b);
+ }
- Ok(())
- }
+ in_progress.write(batch)?;
- /// Flushes buffered data until there are less than `max_row_group_size` rows buffered
- fn flush_completed(&mut self) -> Result<()> {
- while self.buffered_rows >= self.max_row_group_size {
- self.flush_rows(self.max_row_group_size)?;
+ if in_progress.buffered_rows >= self.max_row_group_size {
+ self.flush()?
}
Ok(())
}
/// Flushes all buffered rows into a new row group
pub fn flush(&mut self) -> Result<()> {
- self.flush_rows(self.buffered_rows)
+ let in_progress = match self.in_progress.take() {
+ Some(in_progress) => in_progress,
+ None => return Ok(()),
+ };
+
+ let mut row_group_writer = self.writer.next_row_group()?;
+ for (chunk, close) in in_progress.close()? {
+ row_group_writer.append_column(&chunk, close)?;
+ }
+ row_group_writer.close()?;
+ Ok(())
}
/// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
@@ -198,68 +225,6 @@ impl<W: Write + Send> ArrowWriter<W> {
self.writer.append_key_value_metadata(kv_metadata)
}
- /// Flushes `num_rows` from the buffer into a new row group
- fn flush_rows(&mut self, num_rows: usize) -> Result<()> {
- if num_rows == 0 {
- return Ok(());
- }
-
- assert!(
- num_rows <= self.buffered_rows,
- "cannot flush {} rows only have {}",
- num_rows,
- self.buffered_rows
- );
-
- assert!(
- num_rows <= self.max_row_group_size,
- "cannot flush {} rows would exceed max row group size of {}",
- num_rows,
- self.max_row_group_size
- );
-
- let mut row_group_writer = self.writer.next_row_group()?;
-
- for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields())
- {
- // Collect the number of arrays to append
- let mut remaining = num_rows;
- let mut arrays = Vec::with_capacity(col_buffer.len());
- while remaining != 0 {
- match col_buffer.pop_front() {
- Some(next) if next.len() > remaining => {
- col_buffer
- .push_front(next.slice(remaining, next.len() - remaining));
- arrays.push(next.slice(0, remaining));
- remaining = 0;
- }
- Some(next) => {
- remaining -= next.len();
- arrays.push(next);
- }
- _ => break,
- }
- }
-
- let mut levels = arrays
- .iter()
- .map(|array| {
- let mut levels = calculate_array_levels(array, field)?;
- // Reverse levels as we pop() them when writing arrays
- levels.reverse();
- Ok(levels)
- })
- .collect::<Result<Vec<_>>>()?;
-
- write_leaves(&mut row_group_writer, &arrays, &mut levels)?;
- }
-
- row_group_writer.close()?;
- self.buffered_rows -= num_rows;
-
- Ok(())
- }
-
/// Flushes any outstanding data and returns the underlying writer.
pub fn into_inner(mut self) -> Result<W> {
self.flush()?;
@@ -284,156 +249,284 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
}
}
-fn write_leaves<W: Write + Send>(
- row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
- arrays: &[ArrayRef],
- levels: &mut [Vec<LevelInfo>],
-) -> Result<()> {
- assert_eq!(arrays.len(), levels.len());
- assert!(!arrays.is_empty());
-
- let data_type = arrays.first().unwrap().data_type().clone();
- assert!(arrays.iter().all(|a| a.data_type() == &data_type));
-
- match &data_type {
- ArrowDataType::Null
- | ArrowDataType::Boolean
- | ArrowDataType::Int8
- | ArrowDataType::Int16
- | ArrowDataType::Int32
- | ArrowDataType::Int64
- | ArrowDataType::UInt8
- | ArrowDataType::UInt16
- | ArrowDataType::UInt32
- | ArrowDataType::UInt64
- | ArrowDataType::Float32
- | ArrowDataType::Float64
- | ArrowDataType::Timestamp(_, _)
- | ArrowDataType::Date32
- | ArrowDataType::Date64
- | ArrowDataType::Time32(_)
- | ArrowDataType::Time64(_)
- | ArrowDataType::Duration(_)
- | ArrowDataType::Interval(_)
- | ArrowDataType::Decimal128(_, _)
- | ArrowDataType::Decimal256(_, _)
- | ArrowDataType::FixedSizeBinary(_) => {
- let mut col_writer = row_group_writer.next_column()?.unwrap();
- for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
- write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
- }
- col_writer.close()
- }
- ArrowDataType::LargeBinary
- | ArrowDataType::Binary
- | ArrowDataType::Utf8
- | ArrowDataType::LargeUtf8 => {
- let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
- for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
- col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
+/// A list of [`Bytes`] comprising a single column chunk
+#[derive(Default)]
+struct ArrowColumnChunk {
+ length: usize,
+ data: Vec<Bytes>,
+}
+
+impl Length for ArrowColumnChunk {
+ fn len(&self) -> u64 {
+ self.length as _
+ }
+}
+
+impl ChunkReader for ArrowColumnChunk {
+ type T = ChainReader;
+
+ fn get_read(&self, start: u64) -> Result<Self::T> {
+ assert_eq!(start, 0); // Assume append_column writes all data in one-shot
+ Ok(ChainReader(self.data.clone().into_iter().peekable()))
+ }
+
+ fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
+ unimplemented!()
+ }
+}
+
+/// A [`Read`] for an iterator of [`Bytes`]
+struct ChainReader(Peekable<IntoIter<Bytes>>);
+
+impl Read for ChainReader {
+ fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
+ let buffer = loop {
+ match self.0.peek_mut() {
+ Some(b) if b.is_empty() => {
+ self.0.next();
+ continue;
+ }
+ Some(b) => break b,
+ None => return Ok(0),
}
- col_writer.close()
+ };
+
+ let len = buffer.len().min(out.len());
+ let b = buffer.split_to(len);
+ out[..len].copy_from_slice(&b);
+ Ok(len)
+ }
+}
+
+/// A shared [`ArrowColumnChunk`]
+///
+/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
+/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
+type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
+
+#[derive(Default)]
+struct ArrowPageWriter {
+ buffer: SharedColumnChunk,
+}
+
+impl PageWriter for ArrowPageWriter {
+ fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
+ let mut buf = self.buffer.try_lock().unwrap();
+ let page_header = page.to_thrift_header();
+ let header = {
+ let mut header = Vec::with_capacity(1024);
+ let mut protocol = TCompactOutputProtocol::new(&mut header);
+ page_header.write_to_out_protocol(&mut protocol)?;
+ Bytes::from(header)
+ };
+
+ let data = page.compressed_page().buffer().clone();
+ let compressed_size = data.len() + header.len();
+
+ let mut spec = PageWriteSpec::new();
+ spec.page_type = page.page_type();
+ spec.num_values = page.num_values();
+ spec.uncompressed_size = page.uncompressed_size() + header.len();
+ spec.offset = buf.length as u64;
+ spec.compressed_size = compressed_size;
+ spec.bytes_written = compressed_size as u64;
+
+ buf.length += compressed_size;
+ buf.data.push(header);
+ buf.data.push(data.into());
+
+ Ok(spec)
+ }
+
+ fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
+ // Skip writing metadata as it won't be copied anyway
+ Ok(())
+ }
+
+ fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
+
+/// Encodes a leaf column to [`ArrowPageWriter`]
+enum ArrowColumnWriter {
+ ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
+ Column(ColumnWriter<'static>),
+}
+
+impl ArrowColumnWriter {
+ /// Returns the estimated total bytes for this column writer
+ fn get_estimated_total_bytes(&self) -> u64 {
+ match self {
+ ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(),
+ ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(),
}
- ArrowDataType::List(_) => {
- let arrays: Vec<_> = arrays.iter().map(|array|{
- array.as_list::<i32>().values().clone()
- }).collect();
+ }
+}
+
+/// Encodes [`RecordBatch`] to a parquet row group
+struct ArrowRowGroupWriter {
+ writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+ schema: SchemaRef,
+ buffered_rows: usize,
+}
- write_leaves(row_group_writer, &arrays, levels)?;
- Ok(())
+impl ArrowRowGroupWriter {
+ fn new(
+ parquet: &SchemaDescriptor,
+ props: &WriterPropertiesPtr,
+ arrow: &SchemaRef,
+ ) -> Result<Self> {
+ let mut writers = Vec::with_capacity(arrow.fields.len());
+ let mut leaves = parquet.columns().iter();
+ for field in &arrow.fields {
+ get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
}
- ArrowDataType::LargeList(_) => {
- let arrays: Vec<_> = arrays.iter().map(|array|{
- array.as_list::<i64>().values().clone()
- }).collect();
- write_leaves(row_group_writer, &arrays, levels)?;
- Ok(())
+ Ok(Self {
+ writers,
+ schema: arrow.clone(),
+ buffered_rows: 0,
+ })
+ }
+
+ fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ self.buffered_rows += batch.num_rows();
+ let mut writers = self.writers.iter_mut().map(|(_, x)| x);
+ for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
+ let mut levels = calculate_array_levels(array, field)?.into_iter();
+ write_leaves(&mut writers, &mut levels, array.as_ref())?;
}
- ArrowDataType::Struct(fields) => {
- // Groups child arrays by field
- let mut field_arrays = vec![Vec::with_capacity(arrays.len()); fields.len()];
+ Ok(())
+ }
- for array in arrays {
- let struct_array: &arrow_array::StructArray = array
- .as_any()
- .downcast_ref::<arrow_array::StructArray>()
- .expect("Unable to get struct array");
+ fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
+ self.writers
+ .into_iter()
+ .map(|(chunk, writer)| {
+ let close_result = match writer {
+ ArrowColumnWriter::ByteArray(c) => c.close()?,
+ ArrowColumnWriter::Column(c) => c.close()?,
+ };
+
+ let chunk = Arc::try_unwrap(chunk).ok().unwrap().into_inner().unwrap();
+ Ok((chunk, close_result))
+ })
+ .collect()
+ }
+}
- assert_eq!(struct_array.columns().len(), fields.len());
+/// Get an [`ArrowColumnWriter`] along with a reference to its [`SharedColumnChunk`]
+fn get_arrow_column_writer(
+ data_type: &ArrowDataType,
+ props: &WriterPropertiesPtr,
+ leaves: &mut Iter<'_, ColumnDescPtr>,
+ out: &mut Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+) -> Result<()> {
+ let col = |desc: &ColumnDescPtr| {
+ let page_writer = Box::<ArrowPageWriter>::default();
+ let chunk = page_writer.buffer.clone();
+ let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
+ (chunk, ArrowColumnWriter::Column(writer))
+ };
- for (child_array, field) in field_arrays.iter_mut().zip(struct_array.columns()) {
- child_array.push(field.clone())
- }
- }
+ let bytes = |desc: &ColumnDescPtr| {
+ let page_writer = Box::<ArrowPageWriter>::default();
+ let chunk = page_writer.buffer.clone();
+ let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
+ (chunk, ArrowColumnWriter::ByteArray(writer))
+ };
- for field in field_arrays {
- write_leaves(row_group_writer, &field, levels)?;
+ match data_type {
+ _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
+ ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())),
+ ArrowDataType::LargeBinary
+ | ArrowDataType::Binary
+ | ArrowDataType::Utf8
+ | ArrowDataType::LargeUtf8 => {
+ out.push(bytes(leaves.next().unwrap()))
+ }
+ ArrowDataType::List(f)
+ | ArrowDataType::LargeList(f)
+ | ArrowDataType::FixedSizeList(f, _) => {
+ get_arrow_column_writer(f.data_type(), props, leaves, out)?
+ }
+ ArrowDataType::Struct(fields) => {
+ for field in fields {
+ get_arrow_column_writer(field.data_type(), props, leaves, out)?
}
-
- Ok(())
}
- ArrowDataType::Map(_, _) => {
- let mut keys = Vec::with_capacity(arrays.len());
- let mut values = Vec::with_capacity(arrays.len());
- for array in arrays {
- let map_array: &arrow_array::MapArray = array
- .as_any()
- .downcast_ref::<arrow_array::MapArray>()
- .expect("Unable to get map array");
- keys.push(map_array.keys().clone());
- values.push(map_array.values().clone());
+ ArrowDataType::Map(f, _) => match f.data_type() {
+ ArrowDataType::Struct(f) => {
+ get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
+ get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
}
-
- write_leaves(row_group_writer, &keys, levels)?;
- write_leaves(row_group_writer, &values, levels)?;
- Ok(())
+ _ => unreachable!("invalid map type"),
}
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
- let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
- for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
- col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
- }
- col_writer.close()
+ out.push(bytes(leaves.next().unwrap()))
}
_ => {
- let mut col_writer = row_group_writer.next_column()?.unwrap();
- for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
- write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
- }
- col_writer.close()
+ out.push(col(leaves.next().unwrap()))
}
}
- ArrowDataType::Float16 => Err(ParquetError::ArrowError(
- "Float16 arrays not supported".to_string(),
- )),
+ _ => return Err(ParquetError::NYI(
+ format!(
+ "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
+ )
+ ))
+ }
+ Ok(())
+}
+
+/// Write the leaves of `array` in depth-first order to `writers` with `levels`
+fn write_leaves<'a, W>(
+ writers: &mut W,
+ levels: &mut IntoIter<LevelInfo>,
+ array: &(dyn Array + 'static),
+) -> Result<()>
+where
+ W: Iterator<Item = &'a mut ArrowColumnWriter>,
+{
+ match array.data_type() {
+ ArrowDataType::List(_) => {
+ write_leaves(writers, levels, array.as_list::<i32>().values().as_ref())?
+ }
+ ArrowDataType::LargeList(_) => {
+ write_leaves(writers, levels, array.as_list::<i64>().values().as_ref())?
+ }
ArrowDataType::FixedSizeList(_, _) => {
- let arrays: Vec<_> = arrays.iter().map(|array|{
- array.as_any().downcast_ref::<FixedSizeListArray>()
- .expect("unable to get fixed-size list array")
- .values()
- .clone()
- }).collect();
- write_leaves(row_group_writer, &arrays, levels)?;
- Ok(())
- },
- ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
- Err(ParquetError::NYI(
- format!(
- "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
- )
- ))
+ let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
+ write_leaves(writers, levels, array.values().as_ref())?
+ }
+ ArrowDataType::Struct(_) => {
+ for column in array.as_struct().columns() {
+ write_leaves(writers, levels, column.as_ref())?
+ }
+ }
+ ArrowDataType::Map(_, _) => {
+ let map = array.as_map();
+ write_leaves(writers, levels, map.keys().as_ref())?;
+ write_leaves(writers, levels, map.values().as_ref())?
+ }
+ _ => {
+ let levels = levels.next().unwrap();
+ match writers.next().unwrap() {
+ ArrowColumnWriter::Column(c) => write_leaf(c, array, levels)?,
+ ArrowColumnWriter::ByteArray(c) => write_primitive(c, array, levels)?,
+ };
}
}
+ Ok(())
}
fn write_leaf(
writer: &mut ColumnWriter<'_>,
- column: &ArrayRef,
+ column: &dyn Array,
levels: LevelInfo,
-) -> Result<i64> {
+) -> Result<usize> {
let indices = levels.non_null_indices();
- let written = match writer {
+ match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
match column.data_type() {
ArrowDataType::Date64 => {
@@ -442,26 +535,26 @@ fn write_leaf(
let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
let array = array.as_primitive::<Int32Type>();
- write_primitive(typed, array.values(), levels)?
+ write_primitive(typed, array.values(), levels)
}
ArrowDataType::UInt32 => {
let values = column.as_primitive::<UInt32Type>().values();
// follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
// `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
let array = values.inner().typed_data::<i32>();
- write_primitive(typed, array, levels)?
+ write_primitive(typed, array, levels)
}
ArrowDataType::Decimal128(_, _) => {
// use the int32 to represent the decimal with low precision
let array = column
.as_primitive::<Decimal128Type>()
- .unary::<_, types::Int32Type>(|v| v as i32);
- write_primitive(typed, array.values(), levels)?
+ .unary::<_, Int32Type>(|v| v as i32);
+ write_primitive(typed, array.values(), levels)
}
_ => {
let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
let array = array.as_primitive::<Int32Type>();
- write_primitive(typed, array.values(), levels)?
+ write_primitive(typed, array.values(), levels)
}
}
}
@@ -471,32 +564,32 @@ fn write_leaf(
get_bool_array_slice(array, indices).as_slice(),
levels.def_levels(),
levels.rep_levels(),
- )?
+ )
}
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
match column.data_type() {
ArrowDataType::Int64 => {
let array = column.as_primitive::<Int64Type>();
- write_primitive(typed, array.values(), levels)?
+ write_primitive(typed, array.values(), levels)
}
ArrowDataType::UInt64 => {
let values = column.as_primitive::<UInt64Type>().values();
// follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
// `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
let array = values.inner().typed_data::<i64>();
- write_primitive(typed, array, levels)?
+ write_primitive(typed, array, levels)
}
ArrowDataType::Decimal128(_, _) => {
// use the int64 to represent the decimal with low precision
let array = column
.as_primitive::<Decimal128Type>()
- .unary::<_, types::Int64Type>(|v| v as i64);
- write_primitive(typed, array.values(), levels)?
+ .unary::<_, Int64Type>(|v| v as i64);
+ write_primitive(typed, array.values(), levels)
}
_ => {
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
let array = array.as_primitive::<Int64Type>();
- write_primitive(typed, array.values(), levels)?
+ write_primitive(typed, array.values(), levels)
}
}
}
@@ -504,18 +597,12 @@ fn write_leaf(
unreachable!("Currently unreachable because data type not supported")
}
ColumnWriter::FloatColumnWriter(ref mut typed) => {
- let array = column
- .as_any()
- .downcast_ref::<arrow_array::Float32Array>()
- .expect("Unable to get Float32 array");
- write_primitive(typed, array.values(), levels)?
+ let array = column.as_primitive::<Float32Type>();
+ write_primitive(typed, array.values(), levels)
}
ColumnWriter::DoubleColumnWriter(ref mut typed) => {
- let array = column
- .as_any()
- .downcast_ref::<arrow_array::Float64Array>()
- .expect("Unable to get Float64 array");
- write_primitive(typed, array.values(), levels)?
+ let array = column.as_primitive::<Float64Type>();
+ write_primitive(typed, array.values(), levels)
}
ColumnWriter::ByteArrayColumnWriter(_) => {
unreachable!("should use ByteArrayWriter")
@@ -553,10 +640,7 @@ fn write_leaf(
get_fsb_array_slice(array, indices)
}
ArrowDataType::Decimal128(_, _) => {
- let array = column
- .as_any()
- .downcast_ref::<arrow_array::Decimal128Array>()
- .unwrap();
+ let array = column.as_primitive::<Decimal128Type>();
get_decimal_array_slice(array, indices)
}
_ => {
@@ -566,19 +650,14 @@ fn write_leaf(
));
}
};
- typed.write_batch(
- bytes.as_slice(),
- levels.def_levels(),
- levels.rep_levels(),
- )?
+ typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
}
- };
- Ok(written as i64)
+ }
}
-fn write_primitive<T: DataType>(
- writer: &mut ColumnWriterImpl<'_, T>,
- values: &[T::T],
+fn write_primitive<E: ColumnValueEncoder>(
+ writer: &mut GenericColumnWriter<E>,
+ values: &E::Values,
levels: LevelInfo,
) -> Result<usize> {
writer.write_batch_internal(
@@ -2462,4 +2541,40 @@ mod tests {
assert_ne!(back.schema(), batch.schema());
assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
}
+
+ #[test]
+ fn in_progress_accounting() {
+ // define schema
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+
+ // create some data
+ let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+
+ // build a record batch
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+ let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
+
+ // starts empty
+ assert_eq!(writer.in_progress_size(), 0);
+ assert_eq!(writer.in_progress_rows(), 0);
+ writer.write(&batch).unwrap();
+
+ // updated on write
+ let initial_size = writer.in_progress_size();
+ assert!(initial_size > 0);
+ assert_eq!(writer.in_progress_rows(), 5);
+
+ // updated on second write
+ writer.write(&batch).unwrap();
+ assert!(writer.in_progress_size() > initial_size);
+ assert_eq!(writer.in_progress_rows(), 10);
+
+ // cleared on flush
+ writer.flush().unwrap();
+ assert_eq!(writer.in_progress_size(), 0);
+ assert_eq!(writer.in_progress_rows(), 0);
+
+ writer.close().unwrap();
+ }
}
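
The `SharedColumnChunk` introduced above is a plain `Arc<Mutex<_>>` handed to each `ArrowPageWriter`, with the second handle retained for flush time. A reduced sketch of that ownership pattern, with names simplified from the diff:

    use std::sync::{Arc, Mutex};

    use bytes::Bytes;

    #[derive(Default)]
    struct ColumnChunk {
        length: usize,
        data: Vec<Bytes>,
    }

    #[derive(Default)]
    struct PageSink {
        buffer: Arc<Mutex<ColumnChunk>>,
    }

    impl PageSink {
        // The page writer appends encoded pages through its handle
        fn write_page(&mut self, page: Bytes) {
            let mut buf = self.buffer.lock().unwrap();
            buf.length += page.len();
            buf.data.push(page);
        }
    }

    fn main() {
        let mut sink = PageSink::default();
        let shared = sink.buffer.clone(); // second handle, kept for flush

        sink.write_page(Bytes::from_static(b"page-1"));
        sink.write_page(Bytes::from_static(b"page-2"));

        // Once the writer is dropped, sole ownership is reclaimed without
        // copying, much like ArrowRowGroupWriter::close in the diff
        drop(sink);
        let chunk = Arc::try_unwrap(shared).ok().unwrap().into_inner().unwrap();
        assert_eq!(chunk.length, 12);
        assert_eq!(chunk.data.len(), 2);
    }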
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index f854e5cac..57a0278e2 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -162,6 +162,75 @@ impl CompressedPage {
pub fn data(&self) -> &[u8] {
self.compressed_page.buffer().data()
}
+
+ /// Returns the thrift page header
+ pub(crate) fn to_thrift_header(&self) -> PageHeader {
+ let uncompressed_size = self.uncompressed_size();
+ let compressed_size = self.compressed_size();
+ let num_values = self.num_values();
+ let encoding = self.encoding();
+ let page_type = self.page_type();
+
+ let mut page_header = PageHeader {
+ type_: page_type.into(),
+ uncompressed_page_size: uncompressed_size as i32,
+ compressed_page_size: compressed_size as i32,
+ // TODO: Add support for crc checksum
+ crc: None,
+ data_page_header: None,
+ index_page_header: None,
+ dictionary_page_header: None,
+ data_page_header_v2: None,
+ };
+
+ match self.compressed_page {
+ Page::DataPage {
+ def_level_encoding,
+ rep_level_encoding,
+ ref statistics,
+ ..
+ } => {
+ let data_page_header = crate::format::DataPageHeader {
+ num_values: num_values as i32,
+ encoding: encoding.into(),
+ definition_level_encoding: def_level_encoding.into(),
+ repetition_level_encoding: rep_level_encoding.into(),
+ statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
+ };
+ page_header.data_page_header = Some(data_page_header);
+ }
+ Page::DataPageV2 {
+ num_nulls,
+ num_rows,
+ def_levels_byte_len,
+ rep_levels_byte_len,
+ is_compressed,
+ ref statistics,
+ ..
+ } => {
+ let data_page_header_v2 = crate::format::DataPageHeaderV2 {
+ num_values: num_values as i32,
+ num_nulls: num_nulls as i32,
+ num_rows: num_rows as i32,
+ encoding: encoding.into(),
+ definition_levels_byte_length: def_levels_byte_len as i32,
+ repetition_levels_byte_length: rep_levels_byte_len as i32,
+ is_compressed: Some(is_compressed),
+ statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
+ };
+ page_header.data_page_header_v2 = Some(data_page_header_v2);
+ }
+ Page::DictionaryPage { is_sorted, .. } => {
+ let dictionary_page_header = crate::format::DictionaryPageHeader {
+ num_values: num_values as i32,
+ encoding: encoding.into(),
+ is_sorted: Some(is_sorted),
+ };
+ page_header.dictionary_page_header = Some(dictionary_page_header);
+ }
+ }
+ page_header
+ }
}
/// Contains page write metrics.
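
`to_thrift_header` only assembles the header struct; `ArrowPageWriter` then encodes it with the thrift compact protocol, as shown in the mod.rs diff above. A hypothetical helper capturing that encoding step for any thrift-generated message:

    use thrift::protocol::{TCompactOutputProtocol, TSerializable};

    // Encode a thrift message to bytes with the compact protocol, the
    // same steps ArrowPageWriter applies to the page header
    fn serialize_compact<T: TSerializable>(message: &T) -> thrift::Result<Vec<u8>> {
        let mut buffer = Vec::with_capacity(1024);
        {
            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
            message.write_to_out_protocol(&mut protocol)?;
        }
        Ok(buffer)
    }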
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index c343f1d6c..fb5889b78 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -36,7 +36,7 @@ pub trait ColumnValues {
}
#[cfg(feature = "arrow")]
-impl<T: arrow_array::Array> ColumnValues for T {
+impl ColumnValues for dyn arrow_array::Array {
fn len(&self) -> usize {
arrow_array::Array::len(self)
}
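
Replacing the blanket `impl<T: arrow_array::Array> ColumnValues for T` with an impl for `dyn arrow_array::Array` is what lets the encoder's associated type be unsized. A toy version of the pattern with invented trait names (the real `ColumnValueEncoder` additionally bounds `Values` by `ColumnValues`):

    use std::fmt::Debug;

    // The associated type is declared `?Sized`, so an implementation may
    // pick a trait object such as `dyn Debug` and borrow it from any owner
    trait Encoder {
        type Values: ?Sized;
        fn encode(&mut self, values: &Self::Values);
    }

    struct DebugEncoder(Vec<String>);

    impl Encoder for DebugEncoder {
        type Values = dyn Debug; // unsized, like `dyn Array` in the diff
        fn encode(&mut self, values: &Self::Values) {
            self.0.push(format!("{values:?}"));
        }
    }

    fn main() {
        let mut enc = DebugEncoder(Vec::new());
        let owned: Box<dyn Debug> = Box::new(42);
        enc.encode(owned.as_ref()); // borrow the trait object, no clone
        enc.encode(&"hello");       // or coerce a concrete value directly
        assert_eq!(enc.0, vec!["42", "\"hello\""]);
    }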
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index bf77b2b32..5e623d281 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -43,6 +43,21 @@ use crate::util::memory::ByteBufferPtr;
pub(crate) mod encoder;
+macro_rules! downcast_writer {
+ ($e:expr, $i:ident, $b:expr) => {
+ match $e {
+ Self::BoolColumnWriter($i) => $b,
+ Self::Int32ColumnWriter($i) => $b,
+ Self::Int64ColumnWriter($i) => $b,
+ Self::Int96ColumnWriter($i) => $b,
+ Self::FloatColumnWriter($i) => $b,
+ Self::DoubleColumnWriter($i) => $b,
+ Self::ByteArrayColumnWriter($i) => $b,
+ Self::FixedLenByteArrayColumnWriter($i) => $b,
+ }
+ };
+}
+
/// Column writer for a Parquet type.
pub enum ColumnWriter<'a> {
BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -55,6 +70,19 @@ pub enum ColumnWriter<'a> {
FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
+impl<'a> ColumnWriter<'a> {
+ /// Returns the estimated total bytes for this column writer
+ #[cfg(feature = "arrow")]
+ pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
+ downcast_writer!(self, typed, typed.get_estimated_total_bytes())
+ }
+
+ /// Close this [`ColumnWriter`]
+ pub fn close(self) -> Result<ColumnCloseResult> {
+ downcast_writer!(self, typed, typed.close())
+ }
+}
+
pub enum Level {
Page,
Column,
@@ -421,10 +449,24 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Returns total number of bytes written by this column writer so far.
/// This value is also returned when column writer is closed.
+ ///
+ /// Note: this value does not include any buffered data that has not
+ /// yet been flushed to a page.
pub fn get_total_bytes_written(&self) -> u64 {
self.column_metrics.total_bytes_written
}
+ /// Returns the estimated total bytes for this column writer
+ ///
+ /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
+ /// of any data that has not yet been flushed to a page
+ #[cfg(feature = "arrow")]
+ pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
+ self.column_metrics.total_bytes_written
+ + self.encoder.estimated_data_page_size() as u64
+ self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
+ }
+
/// Returns total number of rows written by this column writer so far.
/// This value is also returned when column writer is closed.
pub fn get_total_rows_written(&self) -> u64 {
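
`downcast_writer!` is straightforward enum dispatch: every arm binds its typed writer to the same identifier, so a single expression body serves all variants. A cut-down illustration with two invented variants:

    // Cut-down illustration of the downcast_writer! dispatch pattern
    macro_rules! dispatch {
        ($e:expr, $i:ident, $b:expr) => {
            match $e {
                Writer::Int($i) => $b,
                Writer::Float($i) => $b,
            }
        };
    }

    struct TypedWriter<T> {
        written: Vec<T>,
    }

    impl<T> TypedWriter<T> {
        fn count(&self) -> usize {
            self.written.len()
        }
    }

    enum Writer {
        Int(TypedWriter<i32>),
        Float(TypedWriter<f64>),
    }

    impl Writer {
        // One body services every variant, whatever the element type
        fn count(&self) -> usize {
            dispatch!(self, typed, typed.count())
        }
    }

    fn main() {
        let w = Writer::Int(TypedWriter { written: vec![1, 2, 3] });
        assert_eq!(w.count(), 3);
    }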
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 4f15c9f4b..defdaad32 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -30,16 +30,13 @@ use crate::column::writer::{
get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl,
};
use crate::column::{
- page::{CompressedPage, Page, PageWriteSpec, PageWriter},
+ page::{CompressedPage, PageWriteSpec, PageWriter},
writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
use crate::file::reader::ChunkReader;
-use crate::file::{
- metadata::*, properties::WriterPropertiesPtr,
- statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC,
-};
+use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
use crate::schema::types::{
self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr,
};
@@ -370,6 +367,16 @@ impl<W: Write + Send> SerializedFileWriter<W> {
self.kv_metadatas.push(kv_metadata);
}
+ /// Returns a reference to schema descriptor.
+ pub fn schema_descr(&self) -> &SchemaDescriptor {
+ &self.descr
+ }
+
+ /// Returns a reference to the writer properties
+ pub fn properties(&self) -> &WriterPropertiesPtr {
+ &self.props
+ }
+
/// Writes the file footer and returns the underlying writer.
pub fn into_inner(mut self) -> Result<W> {
self.assert_previous_writer_closed()?;
@@ -653,17 +660,7 @@ impl<'a> SerializedColumnWriter<'a> {
/// Close this [`SerializedColumnWriter`]
pub fn close(mut self) -> Result<()> {
- let r = match self.inner {
- ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
- ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
- ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
- ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
- ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
- ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
- };
-
+ let r = self.inner.close()?;
if let Some(on_close) = self.on_close.take() {
on_close(r)?
}
@@ -701,83 +698,20 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
- let uncompressed_size = page.uncompressed_size();
- let compressed_size = page.compressed_size();
- let num_values = page.num_values();
- let encoding = page.encoding();
let page_type = page.page_type();
-
- let mut page_header = parquet::PageHeader {
- type_: page_type.into(),
- uncompressed_page_size: uncompressed_size as i32,
- compressed_page_size: compressed_size as i32,
- // TODO: Add support for crc checksum
- crc: None,
- data_page_header: None,
- index_page_header: None,
- dictionary_page_header: None,
- data_page_header_v2: None,
- };
-
- match *page.compressed_page() {
- Page::DataPage {
- def_level_encoding,
- rep_level_encoding,
- ref statistics,
- ..
- } => {
- let data_page_header = parquet::DataPageHeader {
- num_values: num_values as i32,
- encoding: encoding.into(),
- definition_level_encoding: def_level_encoding.into(),
- repetition_level_encoding: rep_level_encoding.into(),
- statistics: statistics_to_thrift(statistics.as_ref()),
- };
- page_header.data_page_header = Some(data_page_header);
- }
- Page::DataPageV2 {
- num_nulls,
- num_rows,
- def_levels_byte_len,
- rep_levels_byte_len,
- is_compressed,
- ref statistics,
- ..
- } => {
- let data_page_header_v2 = parquet::DataPageHeaderV2 {
- num_values: num_values as i32,
- num_nulls: num_nulls as i32,
- num_rows: num_rows as i32,
- encoding: encoding.into(),
- definition_levels_byte_length: def_levels_byte_len as i32,
- repetition_levels_byte_length: rep_levels_byte_len as i32,
- is_compressed: Some(is_compressed),
- statistics: statistics_to_thrift(statistics.as_ref()),
- };
- page_header.data_page_header_v2 = Some(data_page_header_v2);
- }
- Page::DictionaryPage { is_sorted, .. } => {
- let dictionary_page_header = parquet::DictionaryPageHeader {
- num_values: num_values as i32,
- encoding: encoding.into(),
- is_sorted: Some(is_sorted),
- };
- page_header.dictionary_page_header = Some(dictionary_page_header);
- }
- }
-
let start_pos = self.sink.bytes_written() as u64;
+ let page_header = page.to_thrift_header();
let header_size = self.serialize_page_header(page_header)?;
self.sink.write_all(page.data())?;
let mut spec = PageWriteSpec::new();
spec.page_type = page_type;
- spec.uncompressed_size = uncompressed_size + header_size;
- spec.compressed_size = compressed_size + header_size;
+ spec.uncompressed_size = page.uncompressed_size() + header_size;
+ spec.compressed_size = page.compressed_size() + header_size;
spec.offset = start_pos;
spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
- spec.num_values = num_values;
+ spec.num_values = page.num_values();
Ok(spec)
}
@@ -804,7 +738,7 @@ mod tests {
use std::fs::File;
use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
- use crate::column::page::PageReader;
+ use crate::column::page::{Page, PageReader};
use crate::column::reader::get_typed_column_reader;
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::{BoolType, Int32Type};
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 909878a6d..25d15dd4f 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -114,6 +114,12 @@ impl From<Bytes> for ByteBufferPtr {
}
}
+impl From<ByteBufferPtr> for Bytes {
+ fn from(value: ByteBufferPtr) -> Self {
+ value.data
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
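
The new `From<ByteBufferPtr> for Bytes` completes the zero-copy round trip with the existing `From<Bytes>` impl above; `Bytes` is a reference-counted view, so these conversions and the page buffering built on them move no data. A short sketch of that property using plain `Bytes`:

    use bytes::Bytes;

    fn main() {
        let data = Bytes::from(vec![0u8; 1024]);

        // Both share the original allocation; no bytes are copied
        let view = data.clone();
        let tail = data.slice(512..);

        assert_eq!(view.as_ptr(), data.as_ptr()); // same backing memory
        assert_eq!(tail.len(), 512);
    }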