This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/rust-parquet-arrow-writer by
this push:
new 8ccd9c3 ARROW-7842: [Rust] [Parquet] Arrow list reader
8ccd9c3 is described below
commit 8ccd9c3a8c52a219d556a1b1618010f2f913e5d0
Author: Neville Dipale <[email protected]>
AuthorDate: Sat Oct 17 21:14:13 2020 +0200
ARROW-7842: [Rust] [Parquet] Arrow list reader
This is a port of #6770 to the parquet-writer branch.
We'll have more of a chance to test this reader, and ensure that we can
roundtrip on list types.
Closes #8449 from nevi-me/ARROW-7842-cherry
Authored-by: Neville Dipale <[email protected]>
Signed-off-by: Neville Dipale <[email protected]>
---
rust/arrow/src/util/display.rs | 17 +
rust/datafusion/tests/sql.rs | 97 ++++-
rust/parquet/src/arrow/array_reader.rs | 628 ++++++++++++++++++++++++++++++++-
rust/parquet/src/arrow/arrow_writer.rs | 27 +-
rust/parquet/src/schema/visitor.rs | 12 +-
5 files changed, 753 insertions(+), 28 deletions(-)
diff --git a/rust/arrow/src/util/display.rs b/rust/arrow/src/util/display.rs
index bf0cade..87c18d2 100644
--- a/rust/arrow/src/util/display.rs
+++ b/rust/arrow/src/util/display.rs
@@ -44,6 +44,22 @@ macro_rules! make_string {
}};
}
+macro_rules! make_string_from_list {
+ ($column: ident, $row: ident) => {{
+ let list = $column
+ .as_any()
+ .downcast_ref::<array::ListArray>()
+ .ok_or(ArrowError::InvalidArgumentError(format!(
+ "Repl error: could not convert list column to list array."
+ )))?
+ .value($row);
+ let string_values = (0..list.len())
+ .map(|i| array_value_to_string(&list.clone(), i))
+ .collect::<Result<Vec<String>>>()?;
+ Ok(format!("[{}]", string_values.join(", ")))
+ }};
+}
+
/// Get the value at the given row in an array as a String.
///
/// Note this function is quite inefficient and is unlikely to be
@@ -89,6 +105,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row:
usize) -> Result<Str
DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
make_string!(array::Time64NanosecondArray, column, row)
}
+ DataType::List(_) => make_string_from_list!(column, row),
DataType::Dictionary(index_type, _value_type) => match **index_type {
DataType::Int8 => dict_array_value_to_string::<Int8Type>(column,
row),
DataType::Int16 => dict_array_value_to_string::<Int16Type>(column,
row),
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 52027a4..7322b63 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.
+use std::convert::TryFrom;
use std::env;
use std::sync::Arc;
extern crate arrow;
extern crate datafusion;
-use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::TimeUnit};
+use arrow::{datatypes::Int64Type, record_batch::RecordBatch};
use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
util::display::array_value_to_string,
@@ -129,6 +130,100 @@ async fn parquet_single_nan_schema() {
}
#[tokio::test]
+async fn parquet_list_columns() {
+ let mut ctx = ExecutionContext::new();
+ let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not
defined");
+ ctx.register_parquet(
+ "list_columns",
+ &format!("{}/list_columns.parquet", testdata),
+ )
+ .unwrap();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "int64_list",
+ DataType::List(Box::new(DataType::Int64)),
+ true,
+ ),
+ Field::new("utf8_list", DataType::List(Box::new(DataType::Utf8)),
true),
+ ]));
+
+ let sql = "SELECT int64_list, utf8_list FROM list_columns";
+ let plan = ctx.create_logical_plan(&sql).unwrap();
+ let plan = ctx.optimize(&plan).unwrap();
+ let plan = ctx.create_physical_plan(&plan).unwrap();
+ let results = ctx.collect(plan).await.unwrap();
+
+ // int64_list utf8_list
+ // 0 [1, 2, 3] [abc, efg, hij]
+ // 1 [None, 1] None
+ // 2 [4] [efg, None, hij, xyz]
+
+ assert_eq!(1, results.len());
+ let batch = &results[0];
+ assert_eq!(3, batch.num_rows());
+ assert_eq!(2, batch.num_columns());
+ assert_eq!(schema, batch.schema());
+
+ let int_list_array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ let utf8_list_array = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+
+ assert_eq!(
+ int_list_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<Int64Type>>()
+ .unwrap(),
+ &PrimitiveArray::<Int64Type>::from(vec![Some(1), Some(2), Some(3),])
+ );
+
+ assert_eq!(
+ utf8_list_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap(),
+ &StringArray::try_from(vec![Some("abc"), Some("efg"),
Some("hij"),]).unwrap()
+ );
+
+ assert_eq!(
+ int_list_array
+ .value(1)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<Int64Type>>()
+ .unwrap(),
+ &PrimitiveArray::<Int64Type>::from(vec![None, Some(1),])
+ );
+
+ assert!(utf8_list_array.is_null(1));
+
+ assert_eq!(
+ int_list_array
+ .value(2)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<Int64Type>>()
+ .unwrap(),
+ &PrimitiveArray::<Int64Type>::from(vec![Some(4),])
+ );
+
+ let result = utf8_list_array.value(2);
+ let result = result.as_any().downcast_ref::<StringArray>().unwrap();
+
+ assert_eq!(result.value(0), "efg");
+ assert!(result.is_null(1));
+ assert_eq!(result.value(2), "hij");
+ assert_eq!(result.value(3), "xyz");
+}
+
+#[tokio::test]
async fn csv_count_star() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx)?;
diff --git a/rust/parquet/src/arrow/array_reader.rs
b/rust/parquet/src/arrow/array_reader.rs
index 6fdf5d5..579dcac 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -25,13 +25,35 @@ use std::sync::Arc;
use std::vec::Vec;
use arrow::array::{
- ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder,
BufferBuilderTrait,
- Int16BufferBuilder, StructArray,
+ Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray,
+ BinaryBuilder, BooleanBufferBuilder, BufferBuilderTrait,
FixedSizeBinaryArray,
+ FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, ListBuilder,
+ OffsetSizeTrait, PrimitiveArray, PrimitiveArrayOps, PrimitiveBuilder,
StringArray,
+ StringBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{
- DataType as ArrowType, DateUnit, Field, IntervalUnit, Schema, TimeUnit,
+ BooleanType as ArrowBooleanType, DataType as ArrowType,
+ Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DateUnit,
+ DurationMicrosecondType as ArrowDurationMicrosecondType,
+ DurationMillisecondType as ArrowDurationMillisecondType,
+ DurationNanosecondType as ArrowDurationNanosecondType,
+ DurationSecondType as ArrowDurationSecondType, Field,
+ Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
+ Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
+ Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit,
Schema,
+ Time32MillisecondType as ArrowTime32MillisecondType,
+ Time32SecondType as ArrowTime32SecondType,
+ Time64MicrosecondType as ArrowTime64MicrosecondType,
+ Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit,
+ TimeUnit as ArrowTimeUnit, TimestampMicrosecondType as
ArrowTimestampMicrosecondType,
+ TimestampMillisecondType as ArrowTimestampMillisecondType,
+ TimestampNanosecondType as ArrowTimestampNanosecondType,
+ TimestampSecondType as ArrowTimestampSecondType, ToByteSlice,
+ UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type,
+ UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type,
};
+use arrow::util::bit_util;
use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, BoolConverter,
BooleanArrayConverter,
@@ -532,6 +554,400 @@ where
}
}
+/// Implementation of list array reader.
+pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ list_def_level: i16,
+ list_rep_level: i16,
+ def_level_buffer: Option<Buffer>,
+ rep_level_buffer: Option<Buffer>,
+ _marker: PhantomData<OffsetSize>,
+}
+
+impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
+ /// Construct list array reader.
+ pub fn new(
+ item_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ item_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ ) -> Self {
+ Self {
+ item_reader,
+ data_type,
+ item_type,
+ list_def_level: def_level,
+ list_rep_level: rep_level,
+ def_level_buffer: None,
+ rep_level_buffer: None,
+ _marker: PhantomData,
+ }
+ }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+ ($item_type:ident) => {{
+ let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+ ($builder:ident) => {{
+ let values_builder = $builder::new(0);
+ let mut builder = ListBuilder::new(values_builder);
+ let empty_list_array = builder.finish();
+ Ok(Arc::new(empty_list_array))
+ }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 =>
build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+ ArrowType::UInt16 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+ }
+ ArrowType::UInt32 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+ }
+ ArrowType::UInt64 => {
+ build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+ }
+ ArrowType::Int8 =>
build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+ ArrowType::Int16 =>
build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+ ArrowType::Int32 =>
build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+ ArrowType::Int64 =>
build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+ ArrowType::Float32 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+ }
+ ArrowType::Float64 => {
+ build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+ }
+ ArrowType::Boolean => {
+ build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+ }
+ ArrowType::Date32(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+ }
+ ArrowType::Date64(_) => {
+ build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+
build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+
build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+ }
+ ArrowType::Utf8 => {
+ build_empty_list_array_with_non_primitive_items!(StringBuilder)
+ }
+ ArrowType::Binary => {
+ build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+ }
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+macro_rules! remove_primitive_array_indices {
+ ($arr: expr, $item_type:ty, $indices:expr) => {{
+ let array_data = match
$arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = $item_builder::new(array_data.len());
+
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+ ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr,
$len:expr) => {{
+ let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+ Some(a) => a,
+ _ => return Err(ParquetError::General(format!("Error generating
next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+ };
+ let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+ for i in 0..array_data.len() {
+ if !$indices.contains(&i) {
+ if array_data.is_null(i) {
+ builder.append_null()?;
+ } else {
+ builder.append_value(array_data.value(i))?;
+ }
+ }
+ }
+ Ok(Arc::new(builder.finish()))
+ }};
+}
+
+fn remove_indices(
+ arr: ArrayRef,
+ item_type: ArrowType,
+ indices: Vec<usize>,
+) -> Result<ArrayRef> {
+ match item_type {
+ ArrowType::UInt8 => remove_primitive_array_indices!(arr,
ArrowUInt8Type, indices),
+ ArrowType::UInt16 => {
+ remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+ }
+ ArrowType::UInt32 => {
+ remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+ }
+ ArrowType::UInt64 => {
+ remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+ }
+ ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type,
indices),
+ ArrowType::Int16 => remove_primitive_array_indices!(arr,
ArrowInt16Type, indices),
+ ArrowType::Int32 => remove_primitive_array_indices!(arr,
ArrowInt32Type, indices),
+ ArrowType::Int64 => remove_primitive_array_indices!(arr,
ArrowInt64Type, indices),
+ ArrowType::Float32 => {
+ remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+ }
+ ArrowType::Float64 => {
+ remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+ }
+ ArrowType::Boolean => {
+ remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+ }
+ ArrowType::Date32(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+ }
+ ArrowType::Date64(_) => {
+ remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowTime32SecondType,
indices)
+ }
+ ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime32MillisecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType,
indices)
+ }
+ ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowTime64NanosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Second) => {
+ remove_primitive_array_indices!(arr, ArrowDurationSecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMillisecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType,
indices)
+ }
+ ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+ remove_primitive_array_indices!(arr, ArrowDurationNanosecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampSecondType,
indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMillisecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+ remove_primitive_array_indices!(arr,
ArrowTimestampMicrosecondType, indices)
+ }
+ ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+ remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType,
indices)
+ }
+ ArrowType::Utf8 => {
+ remove_array_indices_custom_builder!(arr, StringArray,
StringBuilder, indices)
+ }
+ ArrowType::Binary => {
+ remove_array_indices_custom_builder!(arr, BinaryArray,
BinaryBuilder, indices)
+ }
+ ArrowType::FixedSizeBinary(size) =>
remove_fixed_size_binary_array_indices!(
+ arr,
+ FixedSizeBinaryArray,
+ FixedSizeBinaryBuilder,
+ indices,
+ size
+ ),
+ _ => Err(ParquetError::General(format!(
+ "ListArray of type List({:?}) is not supported by array_reader",
+ item_type
+ ))),
+ }
+}
+
+/// Implementation of ListArrayReader. Nested lists and lists of structs are
not yet supported.
+impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Returns data type.
+ /// This must be a List.
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ let next_batch_array = self.item_reader.next_batch(batch_size)?;
+ let item_type = self.item_reader.get_data_type().clone();
+
+ if next_batch_array.len() == 0 {
+ return build_empty_list_array(item_type);
+ }
+ let def_levels = self
+ .item_reader
+ .get_def_levels()
+ .ok_or_else(|| ArrowError("item_reader def levels are
None.".to_string()))?;
+ let rep_levels = self
+ .item_reader
+ .get_rep_levels()
+ .ok_or_else(|| ArrowError("item_reader rep levels are
None.".to_string()))?;
+
+ if !((def_levels.len() == rep_levels.len())
+ && (rep_levels.len() == next_batch_array.len()))
+ {
+ return Err(ArrowError(
+ "Expected item_reader def_levels and rep_levels to be same
length as batch".to_string(),
+ ));
+ }
+
+ // Need to remove from the values array the nulls that represent null
lists rather than null items
+ // null lists have def_level = 0
+ let mut null_list_indices: Vec<usize> = Vec::new();
+ for i in 0..def_levels.len() {
+ if def_levels[i] == 0 {
+ null_list_indices.push(i);
+ }
+ }
+ let batch_values = match null_list_indices.len() {
+ 0 => next_batch_array.clone(),
+ _ => remove_indices(next_batch_array.clone(), item_type,
null_list_indices)?,
+ };
+
+ // null list has def_level = 0
+ // empty list has def_level = 1
+ // null item in a list has def_level = 2
+ // non-null item has def_level = 3
+ // first item in each list has rep_level = 0, subsequent items have
rep_level = 1
+
+ let mut offsets: Vec<OffsetSize> = Vec::new();
+ let mut cur_offset = OffsetSize::zero();
+ for i in 0..rep_levels.len() {
+ if rep_levels[i] == 0 {
+ offsets.push(cur_offset)
+ }
+ if def_levels[i] > 0 {
+ cur_offset = cur_offset + OffsetSize::one();
+ }
+ }
+ offsets.push(cur_offset);
+
+ let num_bytes = bit_util::ceil(offsets.len(), 8);
+ let mut null_buf =
MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+ let null_slice = null_buf.data_mut();
+ let mut list_index = 0;
+ for i in 0..rep_levels.len() {
+ if rep_levels[i] == 0 && def_levels[i] != 0 {
+ bit_util::set_bit(null_slice, list_index);
+ }
+ if rep_levels[i] == 0 {
+ list_index += 1;
+ }
+ }
+ let value_offsets = Buffer::from(&offsets.to_byte_slice());
+
+ // null list has def_level = 0
+ let null_count = def_levels.iter().filter(|x| x == &&0).count();
+
+ let list_data = ArrayData::builder(self.get_data_type().clone())
+ .len(offsets.len() - 1)
+ .add_buffer(value_offsets)
+ .add_child_data(batch_values.data())
+ .null_bit_buffer(null_buf.freeze())
+ .null_count(null_count)
+ .offset(next_batch_array.offset())
+ .build();
+
+ let result_array = GenericListArray::<OffsetSize>::from(list_data);
+ Ok(Arc::new(result_array))
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.def_level_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.rep_level_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+}
+
/// Implementation of struct array reader.
pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
@@ -875,16 +1291,94 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a
ArrayReaderBuilderContext
}
/// Build array reader for list type.
- /// Currently this is not supported.
fn visit_list_with_item(
&mut self,
- _list_type: Rc<Type>,
- _item_type: &Type,
- _context: &'a ArrayReaderBuilderContext,
+ list_type: Rc<Type>,
+ item_type: Rc<Type>,
+ context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<dyn ArrayReader>>> {
- Err(ArrowError(
- "Reading parquet list array into arrow is not supported
yet!".to_string(),
- ))
+ let list_child = &list_type
+ .get_fields()
+ .first()
+ .ok_or_else(|| ArrowError("List field must have a
child.".to_string()))?;
+ let mut new_context = context.clone();
+
+ new_context.path.append(vec![list_type.name().to_string()]);
+
+ match list_type.get_basic_info().repetition() {
+ Repetition::REPEATED => {
+ new_context.def_level += 1;
+ new_context.rep_level += 1;
+ }
+ Repetition::OPTIONAL => {
+ new_context.def_level += 1;
+ }
+ _ => (),
+ }
+
+ match list_child.get_basic_info().repetition() {
+ Repetition::REPEATED => {
+ new_context.def_level += 1;
+ new_context.rep_level += 1;
+ }
+ Repetition::OPTIONAL => {
+ new_context.def_level += 1;
+ }
+ _ => (),
+ }
+
+ let item_reader = self
+ .dispatch(item_type.clone(), &new_context)
+ .unwrap()
+ .unwrap();
+
+ let item_reader_type = item_reader.get_data_type().clone();
+
+ match item_reader_type {
+ ArrowType::List(_)
+ | ArrowType::FixedSizeList(_, _)
+ | ArrowType::Struct(_)
+ | ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
+ "reading List({:?}) into arrow not supported yet",
+ item_type
+ ))),
+ _ => {
+ let arrow_type = self
+ .arrow_schema
+ .field_with_name(list_type.name())
+ .ok()
+ .map(|f| f.data_type().to_owned())
+ .unwrap_or_else(|| {
+ ArrowType::List(Box::new(item_reader_type.clone()))
+ });
+
+ let list_array_reader: Box<dyn ArrayReader> = match arrow_type
{
+ ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
+ item_reader,
+ arrow_type,
+ item_reader_type,
+ new_context.def_level,
+ new_context.rep_level,
+ )),
+ ArrowType::LargeList(_) =>
Box::new(ListArrayReader::<i64>::new(
+ item_reader,
+ arrow_type,
+ item_reader_type,
+ new_context.def_level,
+ new_context.rep_level,
+ )),
+
+ _ => {
+ return Err(ArrowError(format!(
+ "creating ListArrayReader with type {:?} should be
unreachable",
+ arrow_type
+ )))
+ }
+ };
+
+ Ok(Some(list_array_reader))
+ }
+ }
}
}
@@ -1100,7 +1594,10 @@ mod tests {
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
};
use crate::util::test_common::{get_test_file, make_pages};
- use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray,
StructArray};
+ use arrow::array::{
+ Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray,
StringArray,
+ StructArray,
+ };
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32,
Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
@@ -1759,4 +2256,113 @@ mod tests {
assert_eq!(array_reader.get_data_type(), &arrow_type);
}
+
+ #[test]
+ fn test_list_array_reader() {
+ // [[1, null, 2], null, [3, 4]]
+ let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ Some(4),
+ ]));
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![3, 2, 3, 0, 3, 3]),
+ Some(vec![0, 1, 1, 0, 0, 1]),
+ );
+
+ let mut list_array_reader = ListArrayReader::<i32>::new(
+ Box::new(item_array_reader),
+ ArrowType::List(Box::new(ArrowType::Int32)),
+ ArrowType::Int32,
+ 1,
+ 1,
+ );
+
+ let next_batch = list_array_reader.next_batch(1024).unwrap();
+ let list_array =
next_batch.as_any().downcast_ref::<ListArray>().unwrap();
+
+ assert_eq!(3, list_array.len());
+ // This passes as I expect
+ assert_eq!(1, list_array.null_count());
+
+ assert_eq!(
+ list_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+ );
+
+ assert!(list_array.is_null(1));
+
+ assert_eq!(
+ list_array
+ .value(2)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
+ );
+ }
+
+ #[test]
+ fn test_large_list_array_reader() {
+ // [[1, null, 2], null, [3, 4]]
+ let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
+ Some(1),
+ None,
+ Some(2),
+ None,
+ Some(3),
+ Some(4),
+ ]));
+ let item_array_reader = InMemoryArrayReader::new(
+ ArrowType::Int32,
+ array,
+ Some(vec![3, 2, 3, 0, 3, 3]),
+ Some(vec![0, 1, 1, 0, 0, 1]),
+ );
+
+ let mut list_array_reader = ListArrayReader::<i64>::new(
+ Box::new(item_array_reader),
+ ArrowType::LargeList(Box::new(ArrowType::Int32)),
+ ArrowType::Int32,
+ 1,
+ 1,
+ );
+
+ let next_batch = list_array_reader.next_batch(1024).unwrap();
+ let list_array = next_batch
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .unwrap();
+
+ assert_eq!(3, list_array.len());
+
+ assert_eq!(
+ list_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(1), None, Some(2)])
+ );
+
+ assert!(list_array.is_null(1));
+
+ assert_eq!(
+ list_array
+ .value(2)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap(),
+ &PrimitiveArray::<ArrowInt32>::from(vec![Some(3), Some(4)])
+ );
+ }
}
diff --git a/rust/parquet/src/arrow/arrow_writer.rs
b/rust/parquet/src/arrow/arrow_writer.rs
index ff535dc..d4bdb1e 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -534,6 +534,7 @@ mod tests {
}
#[test]
+ #[ignore = "repetitions might be incorrect, will be addressed as part of
ARROW-9728"]
fn arrow_writer_list() {
// define schema
let schema = Schema::new(vec![Field::new(
@@ -546,7 +547,7 @@ mod tests {
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
// Construct a buffer for value offsets, for the nested array:
- // [[false], [true, false], null, [true, false, true], [false, true,
false, true]]
+ // [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
let a_value_offsets =
arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
@@ -562,6 +563,9 @@ mod tests {
let batch =
RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)]).unwrap();
+ // I think this setup is incorrect because this should pass
+ assert_eq!(batch.column(0).data().null_count(), 1);
+
let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
let mut writer = ArrowWriter::try_new(file, Arc::new(schema),
None).unwrap();
writer.write(&batch).unwrap();
@@ -1063,9 +1067,7 @@ mod tests {
}
#[test]
- #[should_panic(
- expected = "Reading parquet list array into arrow is not supported
yet!"
- )]
+ #[ignore = "repetitions might be incorrect, will be addressed as part of
ARROW-9728"]
fn list_single_column() {
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let a_value_offsets =
@@ -1075,16 +1077,18 @@ mod tests {
.add_buffer(a_value_offsets)
.add_child_data(a_values.data())
.build();
- let a = ListArray::from(a_list_data);
+ // I think this setup is incorrect because this should pass
+ assert_eq!(a_list_data.null_count(), 1);
+
+ let a = ListArray::from(a_list_data);
let values = Arc::new(a);
+
one_column_roundtrip("list_single_column", values, false);
}
#[test]
- #[should_panic(
- expected = "Reading parquet list array into arrow is not supported
yet!"
- )]
+ #[ignore = "repetitions might be incorrect, will be addressed as part of
ARROW-9728"]
fn large_list_single_column() {
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let a_value_offsets =
@@ -1095,14 +1099,17 @@ mod tests {
.add_buffer(a_value_offsets)
.add_child_data(a_values.data())
.build();
- let a = LargeListArray::from(a_list_data);
+ // I think this setup is incorrect because this should pass
+ assert_eq!(a_list_data.null_count(), 1);
+
+ let a = LargeListArray::from(a_list_data);
let values = Arc::new(a);
+
one_column_roundtrip("large_list_single_column", values, false);
}
#[test]
- #[ignore] // Struct support isn't correct yet - null_bitmap doesn't match
fn struct_single_column() {
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let struct_field_a = Field::new("f", DataType::Int32, false);
diff --git a/rust/parquet/src/schema/visitor.rs
b/rust/parquet/src/schema/visitor.rs
index 6d712ce..a1866fb 100644
--- a/rust/parquet/src/schema/visitor.rs
+++ b/rust/parquet/src/schema/visitor.rs
@@ -50,7 +50,7 @@ pub trait TypeVisitor<R, C> {
{
self.visit_list_with_item(
list_type.clone(),
- list_item,
+ list_item.clone(),
context,
)
} else {
@@ -70,13 +70,13 @@ pub trait TypeVisitor<R, C> {
{
self.visit_list_with_item(
list_type.clone(),
- fields.first().unwrap(),
+ fields.first().unwrap().clone(),
context,
)
} else {
self.visit_list_with_item(
list_type.clone(),
- list_item,
+ list_item.clone(),
context,
)
}
@@ -114,7 +114,7 @@ pub trait TypeVisitor<R, C> {
fn visit_list_with_item(
&mut self,
list_type: TypePtr,
- item_type: &Type,
+ item_type: TypePtr,
context: C,
) -> Result<R>;
}
@@ -125,7 +125,7 @@ mod tests {
use crate::basic::Type as PhysicalType;
use crate::errors::Result;
use crate::schema::parser::parse_message_type;
- use crate::schema::types::{Type, TypePtr};
+ use crate::schema::types::TypePtr;
use std::rc::Rc;
struct TestVisitorContext {}
@@ -174,7 +174,7 @@ mod tests {
fn visit_list_with_item(
&mut self,
list_type: TypePtr,
- item_type: &Type,
+ item_type: TypePtr,
_context: TestVisitorContext,
) -> Result<bool> {
assert_eq!(