This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 3ac0053772 Support Encoding Parquet Columns in Parallel (#4871)
3ac0053772 is described below
commit 3ac0053772660f09483d14649996f73be6d45269
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun Oct 1 10:53:56 2023 +0100
Support Encoding Parquet Columns in Parallel (#4871)
* Facilitate parallel parquet writing
* Revert OnCloseRowGroup Send
* Add example
* Review feedback
* Fix doc
* Further review feedback
* More docs
---
parquet/src/arrow/arrow_writer/levels.rs | 434 ++++++++++++++++++-------------
parquet/src/arrow/arrow_writer/mod.rs | 306 +++++++++++++++-------
parquet/src/file/writer.rs | 3 +-
3 files changed, 461 insertions(+), 282 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 48615dc3d5..4a0bd551e1 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -42,19 +42,20 @@
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait,
StructArray};
-use arrow_buffer::NullBuffer;
+use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow_buffer::{NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field};
use std::ops::Range;
+use std::sync::Arc;
-/// Performs a depth-first scan of the children of `array`, constructing
[`LevelInfo`]
+/// Performs a depth-first scan of the children of `array`, constructing
[`ArrayLevels`]
/// for each leaf column encountered
pub(crate) fn calculate_array_levels(
array: &ArrayRef,
field: &Field,
-) -> Result<Vec<LevelInfo>> {
- let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
- builder.write(array, 0..array.len());
+) -> Result<Vec<ArrayLevels>> {
+ let mut builder = LevelInfoBuilder::try_new(field, Default::default(),
array)?;
+ builder.write(0..array.len());
Ok(builder.finish())
}
@@ -102,31 +103,57 @@ struct LevelContext {
def_level: i16,
}
-/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
enum LevelInfoBuilder {
/// A primitive, leaf array
- Primitive(LevelInfo),
- /// A list array, contains the [`LevelInfoBuilder`] of the child and
- /// the [`LevelContext`] of this list
- List(Box<LevelInfoBuilder>, LevelContext),
- /// A list array, contains the [`LevelInfoBuilder`] of its children and
- /// the [`LevelContext`] of this struct array
- Struct(Vec<LevelInfoBuilder>, LevelContext),
+ Primitive(ArrayLevels),
+ /// A list array
+ List(
+ Box<LevelInfoBuilder>, // Child Values
+ LevelContext, // Context
+ OffsetBuffer<i32>, // Offsets
+ Option<NullBuffer>, // Nulls
+ ),
+ /// A large list array
+ LargeList(
+ Box<LevelInfoBuilder>, // Child Values
+ LevelContext, // Context
+ OffsetBuffer<i64>, // Offsets
+ Option<NullBuffer>, // Nulls
+ ),
+ /// A fixed size list array
+ FixedSizeList(
+ Box<LevelInfoBuilder>, // Values
+ LevelContext, // Context
+ usize, // List Size
+ Option<NullBuffer>, // Nulls
+ ),
+ /// A struct array
+ Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
}
impl LevelInfoBuilder {
/// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent
[`LevelContext`]
- fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
- match field.data_type() {
- d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
- parent_ctx,
- field.is_nullable(),
- ))),
- DataType::Dictionary(_, v) if is_leaf(v.as_ref()) =>
Ok(Self::Primitive(
- LevelInfo::new(parent_ctx, field.is_nullable()),
- )),
+ fn try_new(
+ field: &Field,
+ parent_ctx: LevelContext,
+ array: &ArrayRef,
+ ) -> Result<Self> {
+ assert_eq!(field.data_type(), array.data_type());
+ let is_nullable = field.is_nullable();
+
+ match array.data_type() {
+ d if is_leaf(d) => {
+ let levels = ArrayLevels::new(parent_ctx, is_nullable,
array.clone());
+ Ok(Self::Primitive(levels))
+ }
+ DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+ let levels = ArrayLevels::new(parent_ctx, is_nullable,
array.clone());
+ Ok(Self::Primitive(levels))
+ }
DataType::Struct(children) => {
- let def_level = match field.is_nullable() {
+ let array = array.as_struct();
+ let def_level = match is_nullable {
true => parent_ctx.def_level + 1,
false => parent_ctx.def_level,
};
@@ -138,16 +165,17 @@ impl LevelInfoBuilder {
let children = children
.iter()
- .map(|f| Self::try_new(f, ctx))
+ .zip(array.columns())
+ .map(|(f, a)| Self::try_new(f, ctx, a))
.collect::<Result<_>>()?;
- Ok(Self::Struct(children, ctx))
+ Ok(Self::Struct(children, ctx, array.nulls().cloned()))
}
DataType::List(child)
| DataType::LargeList(child)
| DataType::Map(child, _)
| DataType::FixedSizeList(child, _) => {
- let def_level = match field.is_nullable() {
+ let def_level = match is_nullable {
true => parent_ctx.def_level + 2,
false => parent_ctx.def_level + 1,
};
@@ -157,79 +185,70 @@ impl LevelInfoBuilder {
def_level,
};
- let child = Self::try_new(child.as_ref(), ctx)?;
- Ok(Self::List(Box::new(child), ctx))
+ Ok(match field.data_type() {
+ DataType::List(_) => {
+ let list = array.as_list();
+ let child = Self::try_new(child.as_ref(), ctx,
list.values())?;
+ let offsets = list.offsets().clone();
+ Self::List(Box::new(child), ctx, offsets,
list.nulls().cloned())
+ }
+ DataType::LargeList(_) => {
+ let list = array.as_list();
+ let child = Self::try_new(child.as_ref(), ctx,
list.values())?;
+ let offsets = list.offsets().clone();
+ let nulls = list.nulls().cloned();
+ Self::LargeList(Box::new(child), ctx, offsets, nulls)
+ }
+ DataType::Map(_, _) => {
+ let map = array.as_map();
+ let entries = Arc::new(map.entries().clone()) as
ArrayRef;
+ let child = Self::try_new(child.as_ref(), ctx,
&entries)?;
+ let offsets = map.offsets().clone();
+ Self::List(Box::new(child), ctx, offsets,
map.nulls().cloned())
+ }
+ DataType::FixedSizeList(_, size) => {
+ let list = array.as_fixed_size_list();
+ let child = Self::try_new(child.as_ref(), ctx,
list.values())?;
+ let nulls = list.nulls().cloned();
+ Self::FixedSizeList(Box::new(child), ctx, *size as _,
nulls)
+ }
+ _ => unreachable!(),
+ })
}
d => Err(nyi_err!("Datatype {} is not yet supported", d)),
}
}
- /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the
leaf columns
+ /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the
leaf columns
/// as enumerated by a depth-first search
- fn finish(self) -> Vec<LevelInfo> {
+ fn finish(self) -> Vec<ArrayLevels> {
match self {
LevelInfoBuilder::Primitive(v) => vec![v],
- LevelInfoBuilder::List(v, _) => v.finish(),
- LevelInfoBuilder::Struct(v, _) => {
+ LevelInfoBuilder::List(v, _, _, _)
+ | LevelInfoBuilder::LargeList(v, _, _, _)
+ | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
+ LevelInfoBuilder::Struct(v, _, _) => {
v.into_iter().flat_map(|l| l.finish()).collect()
}
}
}
/// Given an `array`, write the level data for the elements in `range`
- fn write(&mut self, array: &dyn Array, range: Range<usize>) {
- match array.data_type() {
- d if is_leaf(d) => self.write_leaf(array, range),
- DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
- self.write_leaf(array, range)
- }
- DataType::Struct(_) => {
- let array = array.as_struct();
- self.write_struct(array, range)
- }
- DataType::List(_) => {
- let array = array.as_list::<i32>();
- self.write_list(
- array.value_offsets(),
- array.nulls(),
- array.values(),
- range,
- )
+ fn write(&mut self, range: Range<usize>) {
+ match self {
+ LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
+ LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
+ Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
}
- DataType::LargeList(_) => {
- let array = array.as_list::<i64>();
- self.write_list(
- array.value_offsets(),
- array.nulls(),
- array.values(),
- range,
- )
+ LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
+ Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
}
- DataType::Map(_, _) => {
- let array = array.as_map();
- // A Map is just as ListArray<i32> with a StructArray child,
we therefore
- // treat it as such to avoid code duplication
- self.write_list(
- array.value_offsets(),
- array.nulls(),
- array.entries(),
- range,
- )
+ LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
+ Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(),
range)
}
- &DataType::FixedSizeList(_, size) => {
- let array = array
- .as_any()
- .downcast_ref::<FixedSizeListArray>()
- .expect("unable to get fixed-size list array");
-
- self.write_fixed_size_list(
- size as usize,
- array.nulls(),
- array.values(),
- range,
- )
+ LevelInfoBuilder::Struct(children, ctx, nulls) => {
+ Self::write_struct(children, ctx, nulls.as_ref(), range)
}
- _ => unreachable!(),
}
}
@@ -237,22 +256,17 @@ impl LevelInfoBuilder {
///
/// Note: MapArrays are `ListArray<i32>` under the hood and so are
dispatched to this method
fn write_list<O: OffsetSizeTrait>(
- &mut self,
+ child: &mut LevelInfoBuilder,
+ ctx: &LevelContext,
offsets: &[O],
nulls: Option<&NullBuffer>,
- values: &dyn Array,
range: Range<usize>,
) {
- let (child, ctx) = match self {
- Self::List(child, ctx) => (child, ctx),
- _ => unreachable!(),
- };
-
let offsets = &offsets[range.start..range.end + 1];
let write_non_null_slice =
|child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
- child.write(values, start_idx..end_idx);
+ child.write(start_idx..end_idx);
child.visit_leaves(|leaf| {
let rep_levels = leaf.rep_levels.as_mut().unwrap();
let mut rev = rep_levels.iter_mut().rev();
@@ -324,12 +338,12 @@ impl LevelInfoBuilder {
}
/// Write `range` elements from StructArray `array`
- fn write_struct(&mut self, array: &StructArray, range: Range<usize>) {
- let (children, ctx) = match self {
- Self::Struct(children, ctx) => (children, ctx),
- _ => unreachable!(),
- };
-
+ fn write_struct(
+ children: &mut [LevelInfoBuilder],
+ ctx: &LevelContext,
+ nulls: Option<&NullBuffer>,
+ range: Range<usize>,
+ ) {
let write_null = |children: &mut [LevelInfoBuilder], range:
Range<usize>| {
for child in children {
child.visit_leaves(|info| {
@@ -346,12 +360,12 @@ impl LevelInfoBuilder {
};
let write_non_null = |children: &mut [LevelInfoBuilder], range:
Range<usize>| {
- for (child_array, child) in array.columns().iter().zip(children) {
- child.write(child_array, range.clone())
+ for child in children {
+ child.write(range.clone())
}
};
- match array.nulls() {
+ match nulls {
Some(validity) => {
let mut last_non_null_idx = None;
let mut last_null_idx = None;
@@ -388,22 +402,17 @@ impl LevelInfoBuilder {
/// Write `range` elements from FixedSizeListArray with child data
`values` and null bitmap `nulls`.
fn write_fixed_size_list(
- &mut self,
+ child: &mut LevelInfoBuilder,
+ ctx: &LevelContext,
fixed_size: usize,
nulls: Option<&NullBuffer>,
- values: &dyn Array,
range: Range<usize>,
) {
- let (child, ctx) = match self {
- Self::List(child, ctx) => (child, ctx),
- _ => unreachable!(),
- };
-
let write_non_null =
|child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
let values_start = start_idx * fixed_size;
let values_end = end_idx * fixed_size;
- child.write(values, values_start..values_end);
+ child.write(values_start..values_end);
child.visit_leaves(|leaf| {
let rep_levels = leaf.rep_levels.as_mut().unwrap();
@@ -481,12 +490,7 @@ impl LevelInfoBuilder {
}
/// Write a primitive array, as defined by [`is_leaf`]
- fn write_leaf(&mut self, array: &dyn Array, range: Range<usize>) {
- let info = match self {
- Self::Primitive(info) => info,
- _ => unreachable!(),
- };
-
+ fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
let len = range.end - range.start;
match &mut info.def_levels {
@@ -494,7 +498,7 @@ impl LevelInfoBuilder {
def_levels.reserve(len);
info.non_null_indices.reserve(len);
- match array.logical_nulls() {
+ match info.array.logical_nulls() {
Some(nulls) => {
// TODO: Faster bitmask iteration (#1757)
for i in range {
@@ -523,11 +527,13 @@ impl LevelInfoBuilder {
}
/// Visits all children of this node in depth first order
- fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) {
+ fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
match self {
LevelInfoBuilder::Primitive(info) => visit(info),
- LevelInfoBuilder::List(c, _) => c.visit_leaves(visit),
- LevelInfoBuilder::Struct(children, _) => {
+ LevelInfoBuilder::List(c, _, _, _)
+ | LevelInfoBuilder::LargeList(c, _, _, _)
+ | LevelInfoBuilder::FixedSizeList(c, _, _, _) =>
c.visit_leaves(visit),
+ LevelInfoBuilder::Struct(children, _, _) => {
for c in children {
c.visit_leaves(visit)
}
@@ -537,8 +543,8 @@ impl LevelInfoBuilder {
}
/// The data necessary to write a primitive Arrow array to parquet, taking
into account
/// any non-primitive parents it may have in the arrow representation
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub(crate) struct LevelInfo {
+#[derive(Debug, Clone)]
+pub(crate) struct ArrayLevels {
/// Array's definition levels
///
/// Present if `max_def_level != 0`
@@ -558,10 +564,25 @@ pub(crate) struct LevelInfo {
/// The maximum repetition for this leaf column
max_rep_level: i16,
+
+ /// The arrow array
+ array: ArrayRef,
}
-impl LevelInfo {
- fn new(ctx: LevelContext, is_nullable: bool) -> Self {
+impl PartialEq for ArrayLevels {
+ fn eq(&self, other: &Self) -> bool {
+ self.def_levels == other.def_levels
+ && self.rep_levels == other.rep_levels
+ && self.non_null_indices == other.non_null_indices
+ && self.max_def_level == other.max_def_level
+ && self.max_rep_level == other.max_rep_level
+ && self.array.as_ref() == other.array.as_ref()
+ }
+}
+impl Eq for ArrayLevels {}
+
+impl ArrayLevels {
+ fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self {
let max_rep_level = ctx.rep_level;
let max_def_level = match is_nullable {
true => ctx.def_level + 1,
@@ -574,9 +595,14 @@ impl LevelInfo {
non_null_indices: vec![],
max_def_level,
max_rep_level,
+ array,
}
}
+ pub fn array(&self) -> &ArrayRef {
+ &self.array
+ }
+
pub fn def_levels(&self) -> Option<&[i16]> {
self.def_levels.as_deref()
}
@@ -597,6 +623,7 @@ mod tests {
use std::sync::Arc;
use arrow_array::builder::*;
+ use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::*;
use arrow_buffer::{Buffer, ToByteSlice};
@@ -622,7 +649,7 @@ mod tests {
let inner_list = ArrayDataBuilder::new(inner_type)
.len(4)
.add_buffer(offsets)
- .add_child_data(primitives.into_data())
+ .add_child_data(primitives.to_data())
.build()
.unwrap();
@@ -638,12 +665,13 @@ mod tests {
let levels = calculate_array_levels(&outer_list,
&outer_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected = LevelInfo {
+ let expected = ArrayLevels {
def_levels: Some(vec![2; 10]),
rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
max_def_level: 2,
max_rep_level: 2,
+ array: Arc::new(primitives),
};
assert_eq!(&levels[0], &expected);
}
@@ -657,12 +685,13 @@ mod tests {
let levels = calculate_array_levels(&array, &field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: None,
rep_levels: None,
non_null_indices: (0..10).collect(),
max_def_level: 0,
max_rep_level: 0,
+ array,
};
assert_eq!(&levels[0], &expected_levels);
}
@@ -682,12 +711,13 @@ mod tests {
let levels = calculate_array_levels(&array, &field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![1, 0, 1, 1, 0]),
rep_levels: None,
non_null_indices: vec![0, 2, 3],
max_def_level: 1,
max_rep_level: 0,
+ array,
};
assert_eq!(&levels[0], &expected_levels);
}
@@ -706,7 +736,7 @@ mod tests {
let list = ArrayDataBuilder::new(list_type.clone())
.len(5)
.add_buffer(offsets)
- .add_child_data(leaf_array.into_data())
+ .add_child_data(leaf_array.to_data())
.build()
.unwrap();
let list = make_array(list);
@@ -715,12 +745,13 @@ mod tests {
let levels = calculate_array_levels(&list, &list_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![1; 5]),
rep_levels: Some(vec![0; 5]),
non_null_indices: (0..5).collect(),
max_def_level: 1,
max_rep_level: 1,
+ array: Arc::new(leaf_array),
};
assert_eq!(&levels[0], &expected_levels);
@@ -737,7 +768,7 @@ mod tests {
let list = ArrayDataBuilder::new(list_type.clone())
.len(5)
.add_buffer(offsets)
- .add_child_data(leaf_array.into_data())
+ .add_child_data(leaf_array.to_data())
.null_bit_buffer(Some(Buffer::from([0b00011101])))
.build()
.unwrap();
@@ -747,12 +778,13 @@ mod tests {
let levels = calculate_array_levels(&list, &list_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
non_null_indices: (0..11).collect(),
max_def_level: 2,
max_rep_level: 1,
+ array: Arc::new(leaf_array),
};
assert_eq!(&levels[0], &expected_levels);
}
@@ -778,7 +810,7 @@ mod tests {
let list_type = DataType::List(Arc::new(leaf_field));
let list = ArrayData::builder(list_type.clone())
.len(5)
- .add_child_data(leaf.into_data())
+ .add_child_data(leaf.to_data())
.add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11]))
.build()
.unwrap();
@@ -795,12 +827,13 @@ mod tests {
let levels = calculate_array_levels(&array, &struct_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
non_null_indices: (4..11).collect(),
max_def_level: 3,
max_rep_level: 1,
+ array: Arc::new(leaf),
};
assert_eq!(&levels[0], &expected_levels);
@@ -820,7 +853,7 @@ mod tests {
let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16,
18, 20, 22]);
let l1 = ArrayData::builder(l1_type.clone())
.len(11)
- .add_child_data(leaf.into_data())
+ .add_child_data(leaf.to_data())
.add_buffer(offsets)
.build()
.unwrap();
@@ -840,7 +873,7 @@ mod tests {
let levels = calculate_array_levels(&l2, &l2_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![
5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5,
]),
@@ -850,6 +883,7 @@ mod tests {
non_null_indices: (0..22).collect(),
max_def_level: 5,
max_rep_level: 2,
+ array: Arc::new(leaf),
};
assert_eq!(&levels[0], &expected_levels);
@@ -871,7 +905,7 @@ mod tests {
let list = ArrayData::builder(list_type.clone())
.len(4)
.add_buffer(Buffer::from_iter(0_i32..5))
- .add_child_data(leaf.into_data())
+ .add_child_data(leaf.to_data())
.build()
.unwrap();
let list = make_array(list);
@@ -880,12 +914,13 @@ mod tests {
let levels = calculate_array_levels(&list, &list_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![1; 4]),
rep_levels: Some(vec![0; 4]),
non_null_indices: (0..4).collect(),
max_def_level: 1,
max_rep_level: 1,
+ array: Arc::new(leaf),
};
assert_eq!(&levels[0], &expected_levels);
@@ -898,7 +933,7 @@ mod tests {
.len(4)
.add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7]))
.null_bit_buffer(Some(Buffer::from([0b00001110])))
- .add_child_data(leaf.into_data())
+ .add_child_data(leaf.to_data())
.build()
.unwrap();
let list = make_array(list);
@@ -911,12 +946,13 @@ mod tests {
let levels = calculate_array_levels(&array, &struct_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
non_null_indices: (0..7).collect(),
max_def_level: 3,
max_rep_level: 1,
+ array: Arc::new(leaf),
};
assert_eq!(&levels[0], &expected_levels);
@@ -933,7 +969,7 @@ mod tests {
let list_1 = ArrayData::builder(list_1_type.clone())
.len(7)
.add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15]))
- .add_child_data(leaf.into_data())
+ .add_child_data(leaf.to_data())
.build()
.unwrap();
@@ -958,12 +994,13 @@ mod tests {
let levels = calculate_array_levels(&array, &struct_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5,
5, 5, 5]),
rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2,
2, 2, 2]),
non_null_indices: (0..15).collect(),
max_def_level: 5,
max_rep_level: 2,
+ array: Arc::new(leaf),
};
assert_eq!(&levels[0], &expected_levels);
}
@@ -980,9 +1017,10 @@ mod tests {
// - {a: {b: {c: 6}}}
let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5),
Some(6)]);
+ let leaf = Arc::new(c) as ArrayRef;
let c_field = Arc::new(Field::new("c", DataType::Int32, true));
let b = StructArray::from((
- (vec![(c_field, Arc::new(c) as ArrayRef)]),
+ (vec![(c_field, leaf.clone())]),
Buffer::from([0b00110111]),
));
@@ -998,12 +1036,13 @@ mod tests {
let levels = calculate_array_levels(&a_array, &a_field).unwrap();
assert_eq!(levels.len(), 1);
- let expected_levels = LevelInfo {
+ let expected_levels = ArrayLevels {
def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
rep_levels: None,
non_null_indices: vec![0, 2, 5],
max_def_level: 3,
max_rep_level: 0,
+ array: leaf,
};
assert_eq!(&levels[0], &expected_levels);
}
@@ -1020,7 +1059,7 @@ mod tests {
.len(5)
.add_buffer(a_value_offsets)
.null_bit_buffer(Some(Buffer::from(vec![0b00011011])))
- .add_child_data(a_values.into_data())
+ .add_child_data(a_values.to_data())
.build()
.unwrap();
@@ -1029,21 +1068,21 @@ mod tests {
let a = ListArray::from(a_list_data);
let item_field = Field::new("item", a_list_type, true);
- let mut builder =
- LevelInfoBuilder::try_new(&item_field,
Default::default()).unwrap();
- builder.write(&a, 2..4);
+ let mut builder = levels(&item_field, a);
+ builder.write(2..4);
let levels = builder.finish();
assert_eq!(levels.len(), 1);
let list_level = levels.get(0).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![0, 3, 3, 3]),
rep_levels: Some(vec![0, 0, 1, 1]),
non_null_indices: vec![3, 4, 5],
max_def_level: 3,
max_rep_level: 1,
+ array: Arc::new(a_values),
};
assert_eq!(list_level, &expected_level);
}
@@ -1100,19 +1139,19 @@ mod tests {
let g = ListArray::from(g_list_data);
let e = StructArray::from(vec![
- (struct_field_f, Arc::new(f) as ArrayRef),
+ (struct_field_f, Arc::new(f.clone()) as ArrayRef),
(struct_field_g, Arc::new(g) as ArrayRef),
]);
let c = StructArray::from(vec![
- (struct_field_d, Arc::new(d) as ArrayRef),
+ (struct_field_d, Arc::new(d.clone()) as ArrayRef),
(struct_field_e, Arc::new(e) as ArrayRef),
]);
// build a record batch
let batch = RecordBatch::try_new(
Arc::new(schema),
- vec![Arc::new(a), Arc::new(b), Arc::new(c)],
+ vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)],
)
.unwrap();
@@ -1132,48 +1171,52 @@ mod tests {
// test "a" levels
let list_level = levels.get(0).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: None,
rep_levels: None,
non_null_indices: vec![0, 1, 2, 3, 4],
max_def_level: 0,
max_rep_level: 0,
+ array: Arc::new(a),
};
assert_eq!(list_level, &expected_level);
// test "b" levels
let list_level = levels.get(1).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![1, 0, 0, 1, 1]),
rep_levels: None,
non_null_indices: vec![0, 3, 4],
max_def_level: 1,
max_rep_level: 0,
+ array: Arc::new(b),
};
assert_eq!(list_level, &expected_level);
// test "d" levels
let list_level = levels.get(2).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![1, 1, 1, 2, 1]),
rep_levels: None,
non_null_indices: vec![3],
max_def_level: 2,
max_rep_level: 0,
+ array: Arc::new(d),
};
assert_eq!(list_level, &expected_level);
// test "f" levels
let list_level = levels.get(3).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![3, 2, 3, 2, 3]),
rep_levels: None,
non_null_indices: vec![0, 2, 4],
max_def_level: 3,
max_rep_level: 0,
+ array: Arc::new(f),
};
assert_eq!(list_level, &expected_level);
}
@@ -1270,27 +1313,31 @@ mod tests {
});
assert_eq!(levels.len(), 2);
+ let map = batch.column(0).as_map();
+
// test key levels
let list_level = levels.get(0).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![1; 7]),
rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
max_def_level: 1,
max_rep_level: 1,
+ array: map.keys().clone(),
};
assert_eq!(list_level, &expected_level);
// test values levels
let list_level = levels.get(1).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
non_null_indices: vec![0, 1, 2, 4, 6],
max_def_level: 2,
max_rep_level: 1,
+ array: map.values().clone(),
};
assert_eq!(list_level, &expected_level);
}
@@ -1358,7 +1405,8 @@ mod tests {
let array = Arc::new(list_builder.finish());
- let values_len = array.values().len();
+ let values = array.values().as_struct().column(0).clone();
+ let values_len = values.len();
assert_eq!(values_len, 5);
let schema = Arc::new(Schema::new(vec![list_field]));
@@ -1368,12 +1416,13 @@ mod tests {
let levels = calculate_array_levels(rb.column(0),
rb.schema().field(0)).unwrap();
let list_level = &levels[0];
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
non_null_indices: vec![0, 4],
max_def_level: 4,
max_rep_level: 1,
+ array: values,
};
assert_eq!(list_level, &expected_level);
@@ -1391,6 +1440,7 @@ mod tests {
None, // Masked by struct array
None,
]);
+ let values = inner.values().clone();
// This test assumes that nulls don't take up space
assert_eq!(inner.values().len(), 7);
@@ -1406,12 +1456,13 @@ mod tests {
assert_eq!(levels.len(), 1);
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
non_null_indices: vec![0, 1, 5, 6],
max_def_level: 4,
max_rep_level: 1,
+ array: values,
};
assert_eq!(&levels[0], &expected_level);
@@ -1422,14 +1473,16 @@ mod tests {
// Test the null mask of a struct array and the null mask of a list
array
// masking out non-null elements of their children
- let a1 = Arc::new(ListArray::from_iter_primitive::<Int32Type, _,
_>(vec![
+ let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![None]), // Masked by list array
Some(vec![]), // Masked by list array
Some(vec![Some(3), None]),
Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct
array
None,
None,
- ])) as ArrayRef;
+ ]);
+ let a1_values = a1.values().clone();
+ let a1 = Arc::new(a1) as ArrayRef;
let a2 = Arc::new(Int32Array::from_iter(vec![
Some(1), // Masked by list array
@@ -1439,6 +1492,7 @@ mod tests {
Some(5),
None,
])) as ArrayRef;
+ let a2_values = a2.clone();
let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(),
true));
let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(),
true));
@@ -1486,22 +1540,24 @@ mod tests {
assert_eq!(levels.len(), 2);
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
non_null_indices: vec![1],
max_def_level: 6,
max_rep_level: 2,
+ array: a1_values,
};
assert_eq!(&levels[0], &expected_level);
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
non_null_indices: vec![4],
max_def_level: 4,
max_rep_level: 1,
+ array: a2_values,
};
assert_eq!(&levels[1], &expected_level);
@@ -1522,23 +1578,24 @@ mod tests {
builder.values().append_slice(&[9, 10]);
builder.append(false);
let a = builder.finish();
+ let values = a.values().clone();
let item_field = Field::new("item", a.data_type().clone(), true);
- let mut builder =
- LevelInfoBuilder::try_new(&item_field,
Default::default()).unwrap();
- builder.write(&a, 1..4);
+ let mut builder = levels(&item_field, a);
+ builder.write(1..4);
let levels = builder.finish();
assert_eq!(levels.len(), 1);
let list_level = levels.get(0).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![0, 0, 3, 3]),
rep_levels: Some(vec![0, 0, 0, 1]),
non_null_indices: vec![6, 7],
max_def_level: 3,
max_rep_level: 1,
+ array: values,
};
assert_eq!(list_level, &expected_level);
}
@@ -1670,6 +1727,10 @@ mod tests {
assert_eq!(array.values().len(), 8);
assert_eq!(array.len(), 4);
+ let struct_values = array.values().as_struct();
+ let values_a = struct_values.column(0).clone();
+ let values_b = struct_values.column(1).clone();
+
let schema = Arc::new(Schema::new(vec![list_field]));
let rb = RecordBatch::try_new(schema, vec![array]).unwrap();
@@ -1678,20 +1739,22 @@ mod tests {
let b_levels = &levels[1];
// [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
- let expected_a = LevelInfo {
+ let expected_a = ArrayLevels {
def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
non_null_indices: vec![0, 7],
max_def_level: 4,
max_rep_level: 1,
+ array: values_a,
};
// [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
- let expected_b = LevelInfo {
+ let expected_b = ArrayLevels {
def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
non_null_indices: vec![0, 6, 7],
max_def_level: 3,
max_rep_level: 1,
+ array: values_b,
};
assert_eq!(a_levels, &expected_a);
@@ -1704,24 +1767,25 @@ mod tests {
builder.append(true);
builder.append(false);
builder.append(true);
- let a = builder.finish();
+ let array = builder.finish();
+ let values = array.values().clone();
- let item_field = Field::new("item", a.data_type().clone(), true);
- let mut builder =
- LevelInfoBuilder::try_new(&item_field,
Default::default()).unwrap();
- builder.write(&a, 0..3);
+ let item_field = Field::new("item", array.data_type().clone(), true);
+ let mut builder = levels(&item_field, array);
+ builder.write(0..3);
let levels = builder.finish();
assert_eq!(levels.len(), 1);
let list_level = levels.get(0).unwrap();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![1, 0, 1]),
rep_levels: Some(vec![0, 0, 0]),
non_null_indices: vec![],
max_def_level: 3,
max_rep_level: 1,
+ array: values,
};
assert_eq!(list_level, &expected_level);
}
@@ -1744,19 +1808,20 @@ mod tests {
builder.values().append_null();
builder.append(false);
let a = builder.finish();
+ let values = a.values().as_list::<i32>().values().clone();
let item_field = Field::new("item", a.data_type().clone(), true);
- let mut builder =
- LevelInfoBuilder::try_new(&item_field,
Default::default()).unwrap();
- builder.write(&a, 0..4);
+ let mut builder = levels(&item_field, a);
+ builder.write(0..4);
let levels = builder.finish();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
non_null_indices: vec![0, 2, 3, 4, 5],
max_def_level: 5,
max_rep_level: 2,
+ array: values,
};
assert_eq!(levels[0], expected_level);
@@ -1777,17 +1842,22 @@ mod tests {
let item_field = Field::new("item", dict.data_type().clone(), true);
- let mut builder =
- LevelInfoBuilder::try_new(&item_field,
Default::default()).unwrap();
- builder.write(&dict, 0..4);
+ let mut builder = levels(&item_field, dict.clone());
+ builder.write(0..4);
let levels = builder.finish();
- let expected_level = LevelInfo {
+ let expected_level = ArrayLevels {
def_levels: Some(vec![0, 0, 1, 1]),
rep_levels: None,
non_null_indices: vec![2, 3],
max_def_level: 1,
max_rep_level: 0,
+ array: Arc::new(dict),
};
assert_eq!(levels[0], expected_level);
}
+
+ fn levels<T: Array + 'static>(field: &Field, array: T) -> LevelInfoBuilder
{
+ let v = Arc::new(array) as ArrayRef;
+ LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
+ }
}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 2e170738f1..5dae81d471 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -18,7 +18,6 @@
//! Contains writer which writes arrow data into parquet data.
use bytes::Bytes;
-use std::fmt::Debug;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
@@ -28,8 +27,10 @@ use thrift::protocol::{TCompactOutputProtocol,
TSerializable};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
-use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
-use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit,
SchemaRef};
+use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
+use arrow_schema::{
+ ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef,
+};
use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
@@ -47,14 +48,14 @@ use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue,
RowGroupMetaDataPtr};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
-use crate::file::writer::SerializedFileWriter;
+use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
-use levels::{calculate_array_levels, LevelInfo};
+use levels::{calculate_array_levels, ArrayLevels};
mod byte_array;
mod levels;
-/// Arrow writer
+/// Encodes [`RecordBatch`] to parquet
///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]
will be encoded
/// to the same row group, up to `max_row_group_size` rows. Any remaining rows
will be
@@ -97,7 +98,7 @@ pub struct ArrowWriter<W: Write> {
max_row_group_size: usize,
}
-impl<W: Write + Send> Debug for ArrowWriter<W> {
+impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let buffered_memory = self.in_progress_size();
f.debug_struct("ArrowWriter")
@@ -150,7 +151,7 @@ impl<W: Write + Send> ArrowWriter<W> {
Some(in_progress) => in_progress
.writers
.iter()
- .map(|(_, x)| x.get_estimated_total_bytes() as usize)
+ .map(|x| x.get_estimated_total_bytes())
.sum(),
None => 0,
}
@@ -208,8 +209,8 @@ impl<W: Write + Send> ArrowWriter<W> {
};
let mut row_group_writer = self.writer.next_row_group()?;
- for (chunk, close) in in_progress.close()? {
- row_group_writer.append_column(&chunk, close)?;
+ for chunk in in_progress.close()? {
+ chunk.append_to_row_group(&mut row_group_writer)?;
}
row_group_writer.close()?;
Ok(())
@@ -246,20 +247,20 @@ impl<W: Write + Send> RecordBatchWriter for
ArrowWriter<W> {
}
}
-/// A list of [`Bytes`] comprising a single column chunk
+/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
-pub struct ArrowColumnChunk {
+struct ArrowColumnChunkData {
length: usize,
data: Vec<Bytes>,
}
-impl Length for ArrowColumnChunk {
+impl Length for ArrowColumnChunkData {
fn len(&self) -> u64 {
self.length as _
}
}
-impl ChunkReader for ArrowColumnChunk {
+impl ChunkReader for ArrowColumnChunkData {
type T = ArrowColumnChunkReader;
fn get_read(&self, start: u64) -> Result<Self::T> {
@@ -274,8 +275,8 @@ impl ChunkReader for ArrowColumnChunk {
}
}
-/// A [`Read`] for an iterator of [`Bytes`]
-pub struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
+/// A [`Read`] for [`ArrowColumnChunkData`]
+struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
impl Read for ArrowColumnChunkReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
@@ -297,11 +298,11 @@ impl Read for ArrowColumnChunkReader {
}
}
-/// A shared [`ArrowColumnChunk`]
+/// A shared [`ArrowColumnChunkData`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access
via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential
borrows
-type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
+type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
#[derive(Default)]
struct ArrowPageWriter {
@@ -347,40 +348,180 @@ impl PageWriter for ArrowPageWriter {
}
}
-/// Encodes a leaf column to [`ArrowPageWriter`]
-enum ArrowColumnWriter {
+/// A leaf column that can be encoded by [`ArrowColumnWriter`]
+#[derive(Debug)]
+pub struct ArrowLeafColumn(ArrayLevels);
+
+/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
+pub fn compute_leaves(field: &Field, array: &ArrayRef) ->
Result<Vec<ArrowLeafColumn>> {
+ let levels = calculate_array_levels(array, field)?;
+ Ok(levels.into_iter().map(ArrowLeafColumn).collect())
+}
+
+/// The data for a single column chunk, see [`ArrowColumnWriter`]
+pub struct ArrowColumnChunk {
+ data: ArrowColumnChunkData,
+ close: ColumnCloseResult,
+}
+
+impl std::fmt::Debug for ArrowColumnChunk {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ArrowColumnChunk")
+ .field("length", &self.data.length)
+ .finish_non_exhaustive()
+ }
+}
+
+impl ArrowColumnChunk {
+ /// Calls [`SerializedRowGroupWriter::append_column`] with this column's
data
+ pub fn append_to_row_group<W: Write + Send>(
+ self,
+ writer: &mut SerializedRowGroupWriter<'_, W>,
+ ) -> Result<()> {
+ writer.append_column(&self.data, self.close)
+ }
+}
+
+/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
+///
+/// Note: This is a low-level interface for applications that require
fine-grained control
+/// of encoding, see [`ArrowWriter`] for a higher-level interface
+///
+/// ```
+/// // The arrow schema
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_schema::*;
+/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves,
get_column_writers};
+/// # use parquet::file::properties::WriterProperties;
+/// # use parquet::file::writer::SerializedFileWriter;
+/// #
+/// let schema = Arc::new(Schema::new(vec![
+/// Field::new("i32", DataType::Int32, false),
+/// Field::new("f32", DataType::Float32, false),
+/// ]));
+///
+/// // Compute the parquet schema
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
+/// let props = Arc::new(WriterProperties::default());
+///
+/// // Create writers for each of the leaf columns
+/// let col_writers = get_column_writers(&parquet_schema, &props,
&schema).unwrap();
+///
+/// // Spawn a worker thread for each column
+/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio,
would be better
+/// let mut workers: Vec<_> = col_writers
+/// .into_iter()
+/// .map(|mut col_writer| {
+/// let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+/// let handle = std::thread::spawn(move || {
+/// for col in recv {
+/// col_writer.write(&col)?;
+/// }
+/// col_writer.close()
+/// });
+/// (handle, send)
+/// })
+/// .collect();
+///
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// let mut out = Vec::with_capacity(1024); // This could be a File
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema,
props.clone()).unwrap();
+///
+/// // Start row group
+/// let mut row_group = writer.next_row_group().unwrap();
+///
+/// // Columns to encode
+/// let to_write = vec![
+/// Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
+/// Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
+/// ];
+///
+/// // Spawn work to encode columns
+/// let mut worker_iter = workers.iter_mut();
+/// for (arr, field) in to_write.iter().zip(&schema.fields) {
+/// for leaves in compute_leaves(field, arr).unwrap() {
+/// worker_iter.next().unwrap().1.send(leaves).unwrap();
+/// }
+/// }
+///
+/// // Finish up parallel column encoding
+/// for (handle, send) in workers {
+/// drop(send); // Drop send side to signal termination
+/// let chunk = handle.join().unwrap().unwrap();
+/// chunk.append_to_row_group(&mut row_group).unwrap();
+/// }
+/// row_group.close().unwrap();
+///
+/// let metadata = writer.close().unwrap();
+/// assert_eq!(metadata.num_rows, 3);
+/// ```
+pub struct ArrowColumnWriter {
+ writer: ArrowColumnWriterImpl,
+ chunk: SharedColumnChunk,
+}
+
+impl std::fmt::Debug for ArrowColumnWriter {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
+ }
+}
+
+enum ArrowColumnWriterImpl {
ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
Column(ColumnWriter<'static>),
}
impl ArrowColumnWriter {
+ /// Write an [`ArrowLeafColumn`]
+ pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
+ match &mut self.writer {
+ ArrowColumnWriterImpl::Column(c) => {
+ write_leaf(c, &col.0)?;
+ }
+ ArrowColumnWriterImpl::ByteArray(c) => {
+ write_primitive(c, col.0.array().as_ref(), &col.0)?;
+ }
+ }
+ Ok(())
+ }
+
+ /// Close this column returning the written [`ArrowColumnChunk`]
+ pub fn close(self) -> Result<ArrowColumnChunk> {
+ let close = match self.writer {
+ ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
+ ArrowColumnWriterImpl::Column(c) => c.close()?,
+ };
+ let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
+ let data = chunk.into_inner().unwrap();
+ Ok(ArrowColumnChunk { data, close })
+ }
+
/// Returns the estimated total bytes for this column writer
- fn get_estimated_total_bytes(&self) -> u64 {
- match self {
- ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(),
- ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(),
+ pub fn get_estimated_total_bytes(&self) -> usize {
+ match &self.writer {
+ ArrowColumnWriterImpl::ByteArray(c) =>
c.get_estimated_total_bytes() as _,
+ ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes()
as _,
}
}
}
/// Encodes [`RecordBatch`] to a parquet row group
-pub struct ArrowRowGroupWriter {
- writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+struct ArrowRowGroupWriter {
+ writers: Vec<ArrowColumnWriter>,
schema: SchemaRef,
buffered_rows: usize,
}
impl ArrowRowGroupWriter {
- pub fn new(
+ fn new(
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
arrow: &SchemaRef,
) -> Result<Self> {
- let mut writers = Vec::with_capacity(arrow.fields.len());
- let mut leaves = parquet.columns().iter();
- for field in &arrow.fields {
- get_arrow_column_writer(field.data_type(), props, &mut leaves,
&mut writers)?;
- }
+ let writers = get_column_writers(parquet, props, arrow)?;
Ok(Self {
writers,
schema: arrow.clone(),
@@ -388,51 +529,64 @@ impl ArrowRowGroupWriter {
})
}
- pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.buffered_rows += batch.num_rows();
- let mut writers = self.writers.iter_mut().map(|(_, x)| x);
- for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
- let mut levels = calculate_array_levels(array, field)?.into_iter();
- write_leaves(&mut writers, &mut levels, array.as_ref())?;
+ let mut writers = self.writers.iter_mut();
+ for (field, column) in
self.schema.fields().iter().zip(batch.columns()) {
+ for leaf in compute_leaves(field.as_ref(), column)? {
+ writers.next().unwrap().write(&leaf)?
+ }
}
Ok(())
}
- pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
+ fn close(self) -> Result<Vec<ArrowColumnChunk>> {
self.writers
.into_iter()
- .map(|(chunk, writer)| {
- let close_result = match writer {
- ArrowColumnWriter::ByteArray(c) => c.close()?,
- ArrowColumnWriter::Column(c) => c.close()?,
- };
-
- let chunk =
Arc::try_unwrap(chunk).ok().unwrap().into_inner().unwrap();
- Ok((chunk, close_result))
- })
+ .map(|writer| writer.close())
.collect()
}
}
-/// Get an [`ArrowColumnWriter`] along with a reference to its
[`SharedColumnChunk`]
+/// Returns the [`ArrowColumnWriter`] for a given schema
+pub fn get_column_writers(
+ parquet: &SchemaDescriptor,
+ props: &WriterPropertiesPtr,
+ arrow: &SchemaRef,
+) -> Result<Vec<ArrowColumnWriter>> {
+ let mut writers = Vec::with_capacity(arrow.fields.len());
+ let mut leaves = parquet.columns().iter();
+ for field in &arrow.fields {
+ get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut
writers)?;
+ }
+ Ok(writers)
+}
+
+/// Gets the [`ArrowColumnWriter`] for the given `data_type`
fn get_arrow_column_writer(
data_type: &ArrowDataType,
props: &WriterPropertiesPtr,
leaves: &mut Iter<'_, ColumnDescPtr>,
- out: &mut Vec<(SharedColumnChunk, ArrowColumnWriter)>,
+ out: &mut Vec<ArrowColumnWriter>,
) -> Result<()> {
let col = |desc: &ColumnDescPtr| {
let page_writer = Box::<ArrowPageWriter>::default();
let chunk = page_writer.buffer.clone();
let writer = get_column_writer(desc.clone(), props.clone(),
page_writer);
- (chunk, ArrowColumnWriter::Column(writer))
+ ArrowColumnWriter {
+ chunk,
+ writer: ArrowColumnWriterImpl::Column(writer),
+ }
};
let bytes = |desc: &ColumnDescPtr| {
let page_writer = Box::<ArrowPageWriter>::default();
let chunk = page_writer.buffer.clone();
let writer = GenericColumnWriter::new(desc.clone(), props.clone(),
page_writer);
- (chunk, ArrowColumnWriter::ByteArray(writer))
+ ArrowColumnWriter {
+ chunk,
+ writer: ArrowColumnWriterImpl::ByteArray(writer),
+ }
};
match data_type {
@@ -478,52 +632,8 @@ fn get_arrow_column_writer(
Ok(())
}
-/// Write the leaves of `array` in depth-first order to `writers` with `levels`
-fn write_leaves<'a, W>(
- writers: &mut W,
- levels: &mut IntoIter<LevelInfo>,
- array: &(dyn Array + 'static),
-) -> Result<()>
-where
- W: Iterator<Item = &'a mut ArrowColumnWriter>,
-{
- match array.data_type() {
- ArrowDataType::List(_) => {
- write_leaves(writers, levels,
array.as_list::<i32>().values().as_ref())?
- }
- ArrowDataType::LargeList(_) => {
- write_leaves(writers, levels,
array.as_list::<i64>().values().as_ref())?
- }
- ArrowDataType::FixedSizeList(_, _) => {
- let array =
array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
- write_leaves(writers, levels, array.values().as_ref())?
- }
- ArrowDataType::Struct(_) => {
- for column in array.as_struct().columns() {
- write_leaves(writers, levels, column.as_ref())?
- }
- }
- ArrowDataType::Map(_, _) => {
- let map = array.as_map();
- write_leaves(writers, levels, map.keys().as_ref())?;
- write_leaves(writers, levels, map.values().as_ref())?
- }
- _ => {
- let levels = levels.next().unwrap();
- match writers.next().unwrap() {
- ArrowColumnWriter::Column(c) => write_leaf(c, array, levels)?,
- ArrowColumnWriter::ByteArray(c) => write_primitive(c, array,
levels)?,
- };
- }
- }
- Ok(())
-}
-
-fn write_leaf(
- writer: &mut ColumnWriter<'_>,
- column: &dyn Array,
- levels: LevelInfo,
-) -> Result<usize> {
+fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) ->
Result<usize> {
+ let column = levels.array().as_ref();
let indices = levels.non_null_indices();
match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
@@ -678,7 +788,7 @@ fn write_leaf(
fn write_primitive<E: ColumnValueEncoder>(
writer: &mut GenericColumnWriter<E>,
values: &E::Values,
- levels: LevelInfo,
+ levels: &ArrayLevels,
) -> Result<usize> {
writer.write_batch_internal(
values,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 859a0aa1f9..cafb176135 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -115,8 +115,7 @@ pub type OnCloseRowGroup<'a> = Box<
Vec<Option<ColumnIndex>>,
Vec<Option<OffsetIndex>>,
) -> Result<()>
- + 'a
- + Send,
+ + 'a,
>;
// ----------------------------------------------------------------------