This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 6ed1992 Remove arrow array reader (#1197) (#1234)
6ed1992 is described below
commit 6ed1992101ad3c99094818ab1a12b4a28bb043eb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Jan 24 15:07:26 2022 +0000
Remove arrow array reader (#1197) (#1234)
---
parquet/Cargo.toml | 2 +-
.../{arrow_array_reader.rs => arrow_reader.rs} | 277 +--
parquet/src/arrow/arrow_array_reader.rs | 1922 --------------------
parquet/src/arrow/mod.rs | 1 -
4 files changed, 40 insertions(+), 2162 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index f0ba360..0765da1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -81,6 +81,6 @@ name = "arrow_writer"
harness = false
[[bench]]
-name = "arrow_array_reader"
+name = "arrow_reader"
required-features = ["test_common", "experimental"]
harness = false
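
With the [[bench]] target renamed, the benchmark would presumably now be
invoked along these lines (illustrative command; the feature list comes from
the required-features entry above, and -p parquet assumes the usual workspace
layout):

    cargo bench -p parquet --bench arrow_reader --features "test_common experimental"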
diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_reader.rs
similarity index 74%
rename from parquet/benches/arrow_array_reader.rs
rename to parquet/benches/arrow_reader.rs
index 9db5b4c..1bd6fc6 100644
--- a/parquet/benches/arrow_array_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -288,17 +288,6 @@ fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
total_count
}
-fn create_int32_arrow_array_reader(
- page_iterator: impl PageIterator + 'static,
- column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
- use parquet::arrow::arrow_array_reader::{ArrowArrayReader, PrimitiveArrayConverter};
- let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
- let reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap();
- Box::new(reader)
-}
-
fn create_int32_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
@@ -314,17 +303,6 @@ fn create_int32_primitive_array_reader(
Box::new(reader)
}
-fn create_string_arrow_array_reader(
- page_iterator: impl PageIterator + 'static,
- column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
- use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter};
- let converter = StringArrayConverter::new();
- let reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap();
- Box::new(reader)
-}
-
fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
@@ -394,66 +372,32 @@ fn add_benches(c: &mut Criterion) {
mandatory_int32_column_desc.clone(),
0.0,
);
- group.bench_function(
- "read Int32Array, plain encoded, mandatory, no NULLs - old",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_primitive_array_reader(
- plain_int32_no_null_data.clone(),
- mandatory_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
- group.bench_function(
- "read Int32Array, plain encoded, mandatory, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_arrow_array_reader(
- plain_int32_no_null_data.clone(),
- mandatory_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
+ group.bench_function("read Int32Array, plain encoded, mandatory, no
NULLs", |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ plain_int32_no_null_data.clone(),
+ mandatory_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
schema.clone(),
optional_int32_column_desc.clone(),
0.0,
);
- group.bench_function(
- "read Int32Array, plain encoded, optional, no NULLs - old",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_primitive_array_reader(
- plain_int32_no_null_data.clone(),
- optional_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
- group.bench_function(
- "read Int32Array, plain encoded, optional, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_arrow_array_reader(
- plain_int32_no_null_data.clone(),
- optional_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
+ group.bench_function("read Int32Array, plain encoded, optional, no NULLs",
|b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ plain_int32_no_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
// int32, plain encoded, half NULLs
let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
@@ -462,7 +406,7 @@ fn add_benches(c: &mut Criterion) {
0.5,
);
group.bench_function(
- "read Int32Array, plain encoded, optional, half NULLs - old",
+ "read Int32Array, plain encoded, optional, half NULLs",
|b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
@@ -475,20 +419,6 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read Int32Array, plain encoded, optional, half NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_arrow_array_reader(
- plain_int32_half_null_data.clone(),
- optional_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
// int32, dictionary encoded, no NULLs
let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
schema.clone(),
@@ -496,7 +426,7 @@ fn add_benches(c: &mut Criterion) {
0.0,
);
group.bench_function(
- "read Int32Array, dictionary encoded, mandatory, no NULLs - old",
+ "read Int32Array, dictionary encoded, mandatory, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
@@ -509,27 +439,13 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read Int32Array, dictionary encoded, mandatory, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_arrow_array_reader(
- dictionary_int32_no_null_data.clone(),
- mandatory_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
schema.clone(),
optional_int32_column_desc.clone(),
0.0,
);
group.bench_function(
- "read Int32Array, dictionary encoded, optional, no NULLs - old",
+ "read Int32Array, dictionary encoded, optional, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
@@ -542,20 +458,6 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read Int32Array, dictionary encoded, optional, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_arrow_array_reader(
- dictionary_int32_no_null_data.clone(),
- optional_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
// int32, dictionary encoded, half NULLs
let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator(
schema.clone(),
@@ -563,7 +465,7 @@ fn add_benches(c: &mut Criterion) {
0.5,
);
group.bench_function(
- "read Int32Array, dictionary encoded, optional, half NULLs - old",
+ "read Int32Array, dictionary encoded, optional, half NULLs",
|b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
@@ -576,20 +478,6 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read Int32Array, dictionary encoded, optional, half NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_int32_arrow_array_reader(
- dictionary_int32_half_null_data.clone(),
- optional_int32_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
// string benchmarks
//==============================
@@ -600,7 +488,7 @@ fn add_benches(c: &mut Criterion) {
0.0,
);
group.bench_function(
- "read StringArray, plain encoded, mandatory, no NULLs - old",
+ "read StringArray, plain encoded, mandatory, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
@@ -613,52 +501,21 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read StringArray, plain encoded, mandatory, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_string_arrow_array_reader(
- plain_string_no_null_data.clone(),
- mandatory_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
schema.clone(),
optional_string_column_desc.clone(),
0.0,
);
- group.bench_function(
- "read StringArray, plain encoded, optional, no NULLs - old",
- |b| {
- b.iter(|| {
- let array_reader = create_string_byte_array_reader(
- plain_string_no_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
- group.bench_function(
- "read StringArray, plain encoded, optional, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_string_arrow_array_reader(
- plain_string_no_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
+ group.bench_function("read StringArray, plain encoded, optional, no
NULLs", |b| {
+ b.iter(|| {
+ let array_reader = create_string_byte_array_reader(
+ plain_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
// string, plain encoded, half NULLs
let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
@@ -667,7 +524,7 @@ fn add_benches(c: &mut Criterion) {
0.5,
);
group.bench_function(
- "read StringArray, plain encoded, optional, half NULLs - old",
+ "read StringArray, plain encoded, optional, half NULLs",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
@@ -680,20 +537,6 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read StringArray, plain encoded, optional, half NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_string_arrow_array_reader(
- plain_string_half_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
// string, dictionary encoded, no NULLs
let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
schema.clone(),
@@ -701,7 +544,7 @@ fn add_benches(c: &mut Criterion) {
0.0,
);
group.bench_function(
- "read StringArray, dictionary encoded, mandatory, no NULLs - old",
+ "read StringArray, dictionary encoded, mandatory, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
@@ -714,27 +557,13 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read StringArray, dictionary encoded, mandatory, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_string_arrow_array_reader(
- dictionary_string_no_null_data.clone(),
- mandatory_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
schema.clone(),
optional_string_column_desc.clone(),
0.0,
);
group.bench_function(
- "read StringArray, dictionary encoded, optional, no NULLs - old",
+ "read StringArray, dictionary encoded, optional, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
@@ -747,20 +576,6 @@ fn add_benches(c: &mut Criterion) {
},
);
- group.bench_function(
- "read StringArray, dictionary encoded, optional, no NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_string_arrow_array_reader(
- dictionary_string_no_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
schema.clone(),
@@ -768,7 +583,7 @@ fn add_benches(c: &mut Criterion) {
0.5,
);
group.bench_function(
- "read StringArray, dictionary encoded, optional, half NULLs - old",
+ "read StringArray, dictionary encoded, optional, half NULLs",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_reader(
@@ -782,20 +597,6 @@ fn add_benches(c: &mut Criterion) {
);
group.bench_function(
- "read StringArray, dictionary encoded, optional, half NULLs - new",
- |b| {
- b.iter(|| {
- let array_reader = create_string_arrow_array_reader(
- dictionary_string_half_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- },
- );
-
- group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
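
For context: after this change each benchmark above keeps a single reader path
instead of the old/new pair. Below is a minimal, self-contained sketch of the
criterion pattern these benchmarks follow; the group name and the trivial
workload are illustrative stand-ins for the parquet helpers in the diff
(bench_array_reader, create_int32_primitive_array_reader), not code from this
commit:

    use criterion::{criterion_group, criterion_main, Criterion};

    // Stand-in constant; the real benchmark derives this from the generated pages.
    const EXPECTED_VALUE_COUNT: usize = 1024;

    fn add_benches(c: &mut Criterion) {
        // The group name is an assumption; the email does not show the benchmark_group call.
        let mut group = c.benchmark_group("arrow_reader");
        let mut count = 0usize;
        group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs", |b| {
            b.iter(|| {
                // Stand-in workload; the real bench builds an array reader and drains it.
                count = (0..EXPECTED_VALUE_COUNT).count();
            });
            // Correctness check after the timed iterations, mirroring the pattern above.
            assert_eq!(count, EXPECTED_VALUE_COUNT);
        });
        group.finish();
    }

    criterion_group!(benches, add_benches);
    criterion_main!(benches);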
diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs
deleted file mode 100644
index d814f7f..0000000
--- a/parquet/src/arrow/arrow_array_reader.rs
+++ /dev/null
@@ -1,1922 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::array_reader::ArrayReader;
-use crate::arrow::schema::parquet_to_arrow_field;
-use crate::basic::Encoding;
-use crate::data_type::{ByteArray, ByteArrayType};
-use crate::encodings::decoding::{Decoder, DeltaByteArrayDecoder};
-use crate::errors::{ParquetError, Result};
-use crate::{
- column::page::{Page, PageIterator},
- schema::types::{ColumnDescPtr, ColumnDescriptor},
- util::memory::ByteBufferPtr,
-};
-use arrow::{
- array::{ArrayRef, Int16Array},
- buffer::MutableBuffer,
- datatypes::{DataType as ArrowType, ToByteSlice},
-};
-use std::{any::Any, collections::VecDeque, marker::PhantomData};
-use std::{cell::RefCell, rc::Rc};
-
-struct UnzipIter<Source, Target, State> {
- shared_state: Rc<RefCell<State>>,
- select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
- consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
-}
-
-impl<Source, Target, State> UnzipIter<Source, Target, State> {
- fn new(
- shared_state: Rc<RefCell<State>>,
- item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>,
- source_item_consumer: fn(source_item: Source, state: &mut State) -> Target,
- ) -> Self {
- Self {
- shared_state,
- select_item_buffer: item_buffer_selector,
- consume_source_item: source_item_consumer,
- }
- }
-}
-
-trait UnzipIterState<T> {
- type SourceIter: Iterator<Item = T>;
- fn source_iter(&mut self) -> &mut Self::SourceIter;
-}
-
-impl<Source, Target, State: UnzipIterState<Source>> Iterator
- for UnzipIter<Source, Target, State>
-{
- type Item = Target;
-
- fn next(&mut self) -> Option<Self::Item> {
- let mut inner = self.shared_state.borrow_mut();
- // try to get one from the stored data
- (self.select_item_buffer)(&mut *inner)
- .pop_front()
- .or_else(||
- // nothing stored, we need a new element.
- inner.source_iter().next().map(|s| {
- (self.consume_source_item)(s, &mut inner)
- }))
- }
-}
-
-struct PageBufferUnzipIterState<V, L, It> {
- iter: It,
- value_iter_buffer: VecDeque<V>,
- def_level_iter_buffer: VecDeque<L>,
- rep_level_iter_buffer: VecDeque<L>,
-}
-
-impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)>
- for PageBufferUnzipIterState<V, L, It>
-{
- type SourceIter = It;
-
- #[inline]
- fn source_iter(&mut self) -> &mut Self::SourceIter {
- &mut self.iter
- }
-}
-
-type ValueUnzipIter<V, L, It> =
- UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>;
-type LevelUnzipIter<V, L, It> =
- UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>;
-type PageUnzipResult<V, L, It> = (
- ValueUnzipIter<V, L, It>,
- LevelUnzipIter<V, L, It>,
- LevelUnzipIter<V, L, It>,
-);
-
-fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> PageUnzipResult<V, L, It> {
- let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState {
- iter: it,
- value_iter_buffer: VecDeque::new(),
- def_level_iter_buffer: VecDeque::new(),
- rep_level_iter_buffer: VecDeque::new(),
- }));
-
- let value_iter = UnzipIter::new(
- shared_data.clone(),
- |state| &mut state.value_iter_buffer,
- |(v, d, r), state| {
- state.def_level_iter_buffer.push_back(d);
- state.rep_level_iter_buffer.push_back(r);
- v
- },
- );
-
- let def_level_iter = UnzipIter::new(
- shared_data.clone(),
- |state| &mut state.def_level_iter_buffer,
- |(v, d, r), state| {
- state.value_iter_buffer.push_back(v);
- state.rep_level_iter_buffer.push_back(r);
- d
- },
- );
-
- let rep_level_iter = UnzipIter::new(
- shared_data,
- |state| &mut state.rep_level_iter_buffer,
- |(v, d, r), state| {
- state.value_iter_buffer.push_back(v);
- state.def_level_iter_buffer.push_back(d);
- r
- },
- );
-
- (value_iter, def_level_iter, rep_level_iter)
-}
-
-pub trait ArrayConverter {
- fn convert_value_bytes(
- &self,
- value_decoder: &mut impl ValueDecoder,
- num_values: usize,
- ) -> Result<arrow::array::ArrayData>;
-}
-
-pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
- column_desc: ColumnDescPtr,
- data_type: ArrowType,
- def_level_decoder: Box<dyn ValueDecoder + 'a>,
- rep_level_decoder: Box<dyn ValueDecoder + 'a>,
- value_decoder: Box<dyn ValueDecoder + 'a>,
- last_def_levels: Option<Int16Array>,
- last_rep_levels: Option<Int16Array>,
- array_converter: C,
-}
-
-pub(crate) struct ColumnChunkContext {
- dictionary_values: Option<Vec<ByteBufferPtr>>,
-}
-
-impl ColumnChunkContext {
- fn new() -> Self {
- Self {
- dictionary_values: None,
- }
- }
-
- fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
- self.dictionary_values = Some(dictionary_values);
- }
-}
-
-type PageDecoderTuple = (
- Box<dyn ValueDecoder>,
- Box<dyn ValueDecoder>,
- Box<dyn ValueDecoder>,
-);
-
-impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
- pub fn try_new<P: PageIterator + 'a>(
- column_chunk_iterator: P,
- column_desc: ColumnDescPtr,
- array_converter: C,
- arrow_type: Option<ArrowType>,
- ) -> Result<Self> {
- let data_type = match arrow_type {
- Some(t) => t,
- None => parquet_to_arrow_field(column_desc.as_ref())?
- .data_type()
- .clone(),
- };
- type PageIteratorItem = Result<(Page, Rc<RefCell<ColumnChunkContext>>)>;
- let page_iter = column_chunk_iterator
- // build iterator of pages across column chunks
- .flat_map(|x| -> Box<dyn Iterator<Item = PageIteratorItem>> {
- // attach column chunk context
- let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
- match x {
- Ok(page_reader) => Box::new(
- page_reader.map(move |pr| pr.map(|p| (p, context.clone()))),
- ),
- // errors from reading column chunks / row groups are propagated to page level
- Err(e) => Box::new(std::iter::once(Err(e))),
- }
- });
- // capture a clone of column_desc in closure so that it can outlive current function
- let map_page_fn_factory = |column_desc: ColumnDescPtr| {
- move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| {
- x.and_then(|(page, context)| {
- Self::map_page(page, context, column_desc.as_ref())
- })
- }
- };
- let map_page_fn = map_page_fn_factory(column_desc.clone());
- // map page iterator into tuple of buffer iterators for (values, def levels, rep levels)
- // errors from lower levels are surfaced through the value decoder iterator
- let decoder_iter = page_iter.map(map_page_fn).map(|x| match x {
- Ok(iter_tuple) => iter_tuple,
- // errors from reading pages are propagated to decoder iterator level
- Err(e) => Self::map_page_error(e),
- });
- // split tuple iterator into separate iterators for (values, def levels, rep levels)
- let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter);
-
- Ok(Self {
- column_desc,
- data_type,
- def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)),
- rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)),
- value_decoder: Box::new(CompositeValueDecoder::new(value_iter)),
- last_def_levels: None,
- last_rep_levels: None,
- array_converter,
- })
- }
-
- #[inline]
- fn def_levels_available(column_desc: &ColumnDescriptor) -> bool {
- column_desc.max_def_level() > 0
- }
-
- #[inline]
- fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool {
- column_desc.max_rep_level() > 0
- }
-
- fn map_page_error(err: ParquetError) -> PageDecoderTuple {
- (
- Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
- Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
- Box::new(<dyn ValueDecoder>::once(Err(err))),
- )
- }
-
- // Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, Iterator<RepLevels>)>
- // this method could fail, e.g. if the page encoding is not supported
- fn map_page(
- page: Page,
- column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
- column_desc: &ColumnDescriptor,
- ) -> Result<PageDecoderTuple> {
- use crate::encodings::levels::LevelDecoder;
- match page {
- Page::DictionaryPage {
- buf,
- num_values,
- encoding,
- ..
- } => {
- let mut column_chunk_context = column_chunk_context.borrow_mut();
- if column_chunk_context.dictionary_values.is_some() {
- return Err(general_err!(
- "Column chunk cannot have more than one dictionary"
- ));
- }
- // create plain decoder for dictionary values
- let mut dict_decoder = Self::get_dictionary_page_decoder(
- buf,
- num_values as usize,
- encoding,
- column_desc,
- )?;
- // decode and cache dictionary values
- let dictionary_values = dict_decoder.read_dictionary_values()?;
- column_chunk_context.set_dictionary(dictionary_values);
-
- // a dictionary page doesn't return any values
- Ok((
- Box::new(<dyn ValueDecoder>::empty()),
- Box::new(<dyn ValueDecoder>::empty()),
- Box::new(<dyn ValueDecoder>::empty()),
- ))
- }
- Page::DataPage {
- buf,
- num_values,
- encoding,
- def_level_encoding,
- rep_level_encoding,
- statistics: _,
- } => {
- let mut buffer_ptr = buf;
- // create rep level decoder iterator
- let rep_level_iter: Box<dyn ValueDecoder> =
- if Self::rep_levels_available(column_desc) {
- let mut rep_decoder = LevelDecoder::v1(
- rep_level_encoding,
- column_desc.max_rep_level(),
- );
- let rep_level_byte_len =
- rep_decoder.set_data(num_values as usize, buffer_ptr.all());
- // advance buffer pointer
- buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
- Box::new(LevelValueDecoder::new(rep_decoder))
- } else {
- Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
- "rep levels are not available".to_string(),
- ))))
- };
- // create def level decoder iterator
- let (def_level_iter, value_count): (Box<dyn ValueDecoder>, usize) =
- if Self::def_levels_available(column_desc) {
- // calculate actual value count
- let mut def_decoder = LevelDecoder::v1(
- def_level_encoding,
- column_desc.max_def_level(),
- );
- def_decoder.set_data(num_values as usize, buffer_ptr.all());
- let value_count = Self::count_def_level_values(
- column_desc,
- def_decoder,
- num_values as usize,
- )?;
- // create def level decoder
- def_decoder = LevelDecoder::v1(
- def_level_encoding,
- column_desc.max_def_level(),
- );
- let def_levels_byte_len =
- def_decoder.set_data(num_values as usize, buffer_ptr.all());
- // advance buffer pointer
- buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
- (Box::new(LevelValueDecoder::new(def_decoder)), value_count)
- } else {
- (
- Box::new(<dyn ValueDecoder>::once(Err(
- ParquetError::General(
- "def levels are not available".to_string(),
- ),
- ))),
- num_values as usize,
- )
- };
- // create value decoder iterator
- let value_iter = Self::get_value_decoder(
- buffer_ptr,
- value_count,
- encoding,
- column_desc,
- column_chunk_context,
- )?;
- Ok((value_iter, def_level_iter, rep_level_iter))
- }
- Page::DataPageV2 {
- buf,
- num_values,
- encoding,
- num_nulls: _,
- num_rows: _,
- def_levels_byte_len,
- rep_levels_byte_len,
- is_compressed: _,
- statistics: _,
- } => {
- let mut offset = 0;
- // create rep level decoder iterator
- let rep_level_iter: Box<dyn ValueDecoder> =
- if Self::rep_levels_available(column_desc) {
- let rep_levels_byte_len = rep_levels_byte_len as usize;
- let mut rep_decoder =
- LevelDecoder::v2(column_desc.max_rep_level());
- rep_decoder.set_data_range(
- num_values as usize,
- &buf,
- offset,
- rep_levels_byte_len,
- );
- offset += rep_levels_byte_len;
- Box::new(LevelValueDecoder::new(rep_decoder))
- } else {
- Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
- "rep levels are not available".to_string(),
- ))))
- };
- // create def level decoder iterator
- let (def_level_iter, value_count): (Box<dyn ValueDecoder>, usize) =
- if Self::def_levels_available(column_desc) {
- let def_levels_byte_len = def_levels_byte_len as usize;
- // calculate actual value count
- let mut def_decoder =
- LevelDecoder::v2(column_desc.max_def_level());
- def_decoder.set_data_range(
- num_values as usize,
- &buf,
- offset,
- def_levels_byte_len,
- );
- let value_count = Self::count_def_level_values(
- column_desc,
- def_decoder,
- num_values as usize,
- )?;
- // create def level decoder
- def_decoder = LevelDecoder::v2(column_desc.max_def_level());
- def_decoder.set_data_range(
- num_values as usize,
- &buf,
- offset,
- def_levels_byte_len,
- );
- offset += def_levels_byte_len;
- (Box::new(LevelValueDecoder::new(def_decoder)), value_count)
- } else {
- (
- Box::new(<dyn ValueDecoder>::once(Err(
- ParquetError::General(
- "def levels are not available".to_string(),
- ),
- ))),
- num_values as usize,
- )
- };
-
- // create value decoder iterator
- let values_buffer = buf.start_from(offset);
- let value_iter = Self::get_value_decoder(
- values_buffer,
- value_count,
- encoding,
- column_desc,
- column_chunk_context,
- )?;
- Ok((value_iter, def_level_iter, rep_level_iter))
- }
- }
- }
-
- fn count_def_level_values(
- column_desc: &ColumnDescriptor,
- level_decoder: crate::encodings::levels::LevelDecoder,
- num_values: usize,
- ) -> Result<usize> {
- let mut def_level_decoder = LevelValueDecoder::new(level_decoder);
- let def_level_array =
- Self::build_level_array(&mut def_level_decoder, num_values)?;
- let def_level_count = def_level_array.len();
- // use eq_scalar to efficiently build null bitmap array from def levels
- let null_bitmap_array =
- arrow::compute::eq_scalar(&def_level_array, column_desc.max_def_level())?;
- // efficiently calculate values to read
- Ok(null_bitmap_array
- .values()
- .count_set_bits_offset(0, def_level_count))
- }
-
- fn get_dictionary_page_decoder(
- values_buffer: ByteBufferPtr,
- num_values: usize,
- mut encoding: Encoding,
- column_desc: &ColumnDescriptor,
- ) -> Result<Box<dyn DictionaryValueDecoder>> {
- if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
- encoding = Encoding::RLE_DICTIONARY
- }
-
- if encoding == Encoding::RLE_DICTIONARY {
- Ok(
- Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
- .into_dictionary_decoder(),
- )
- } else {
- Err(nyi_err!(
- "Invalid/Unsupported encoding type for dictionary: {}",
- encoding
- ))
- }
- }
-
- fn get_value_decoder(
- values_buffer: ByteBufferPtr,
- num_values: usize,
- mut encoding: Encoding,
- column_desc: &ColumnDescriptor,
- column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
- ) -> Result<Box<dyn ValueDecoder>> {
- if encoding == Encoding::PLAIN_DICTIONARY {
- encoding = Encoding::RLE_DICTIONARY;
- }
-
- match encoding {
- Encoding::PLAIN => {
- Ok(
- Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
- .into_value_decoder(),
- )
- }
- Encoding::RLE_DICTIONARY => {
- if column_chunk_context.borrow().dictionary_values.is_some() {
- let value_bit_len = Self::get_column_physical_bit_len(column_desc);
- let dictionary_decoder: Box<dyn ValueDecoder> = if value_bit_len == 0
- {
- Box::new(VariableLenDictionaryDecoder::new(
- column_chunk_context,
- values_buffer,
- num_values,
- ))
- } else {
- Box::new(FixedLenDictionaryDecoder::new(
- column_chunk_context,
- values_buffer,
- num_values,
- value_bit_len,
- ))
- };
- Ok(dictionary_decoder)
- } else {
- Err(general_err!("Dictionary values have not been
initialized."))
- }
- }
- // Encoding::RLE => Box::new(RleValueDecoder::new()),
- // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
- // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
- Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayValueDecoder::new(
- values_buffer,
- num_values,
- )?)),
- e => return Err(nyi_err!("Encoding {} is not supported", e)),
- }
- }
-
- fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize {
- use crate::basic::Type as PhysicalType;
- // parquet only supports a limited number of physical types
- // later converters cast to a more specific arrow / logical type if necessary
- match column_desc.physical_type() {
- PhysicalType::BOOLEAN => 1,
- PhysicalType::INT32 | PhysicalType::FLOAT => 32,
- PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
- PhysicalType::INT96 => 96,
- PhysicalType::BYTE_ARRAY => 0,
- PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
- }
- }
-
- fn get_plain_value_decoder(
- values_buffer: ByteBufferPtr,
- num_values: usize,
- column_desc: &ColumnDescriptor,
- ) -> Box<dyn PlainValueDecoder> {
- let value_bit_len = Self::get_column_physical_bit_len(column_desc);
- if value_bit_len == 0 {
- Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))
- } else {
- Box::new(FixedLenPlainDecoder::new(
- values_buffer,
- num_values,
- value_bit_len,
- ))
- }
- }
-
- fn build_level_array(
- level_decoder: &mut impl ValueDecoder,
- batch_size: usize,
- ) -> Result<Int16Array> {
- use arrow::datatypes::Int16Type;
- let level_converter = PrimitiveArrayConverter::<Int16Type>::new();
- let array_data =
- level_converter.convert_value_bytes(level_decoder, batch_size)?;
- Ok(Int16Array::from(array_data))
- }
-}
-
-impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn get_data_type(&self) -> &ArrowType {
- &self.data_type
- }
-
- fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
- if Self::rep_levels_available(&self.column_desc) {
- // read rep levels if available
- let rep_level_array =
- Self::build_level_array(&mut self.rep_level_decoder, batch_size)?;
- self.last_rep_levels = Some(rep_level_array);
- }
-
- // check if def levels are available
- let (values_to_read, null_bitmap_array) =
- if !Self::def_levels_available(&self.column_desc) {
- // if no def levels - just read (up to) batch_size values
- (batch_size, None)
- } else {
- // if def levels are available - they determine how many values will be read
- // decode def levels, return first error if any
- let def_level_array =
- Self::build_level_array(&mut self.def_level_decoder, batch_size)?;
- let def_level_count = def_level_array.len();
- // use eq_scalar to efficiently build null bitmap array from def levels
- let null_bitmap_array = arrow::compute::eq_scalar(
- &def_level_array,
- self.column_desc.max_def_level(),
- )?;
- self.last_def_levels = Some(def_level_array);
- // efficiently calculate values to read
- let values_to_read = null_bitmap_array
- .values()
- .count_set_bits_offset(0, def_level_count);
- let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() {
- Some(null_bitmap_array)
- } else {
- // shortcut if no NULLs
- None
- };
- (values_to_read, maybe_null_bitmap)
- };
-
- // read a batch of values
- // converter only creates a no-null / all value array data
- let mut value_array_data = self
- .array_converter
- .convert_value_bytes(&mut self.value_decoder, values_to_read)?;
-
- if let Some(null_bitmap_array) = null_bitmap_array {
- // Only if def levels are available - insert null values efficiently using MutableArrayData.
- // This will require value bytes to be copied again, but converter requirements are reduced.
- // With a small number of NULLs, this will only be a few copies of large byte sequences.
- let actual_batch_size = null_bitmap_array.len();
- // use_nulls is false, because null_bitmap_array is already calculated and re-used
- let mut mutable = arrow::array::MutableArrayData::new(
- vec![&value_array_data],
- false,
- actual_batch_size,
- );
- // SlicesIterator slices only the true values, NULLs are inserted to fill any gaps
- arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each(
- |(start, end)| {
- // the gap needs to be filled with NULLs
- if start > mutable.len() {
- let nulls_to_add = start - mutable.len();
- mutable.extend_nulls(nulls_to_add);
- }
- // fill values, adjust start and end with NULL count so far
- let nulls_added = mutable.null_count();
- mutable.extend(0, start - nulls_added, end - nulls_added);
- },
- );
- // any remaining part is NULLs
- if mutable.len() < actual_batch_size {
- let nulls_to_add = actual_batch_size - mutable.len();
- mutable.extend_nulls(nulls_to_add);
- }
-
- value_array_data = unsafe {
- mutable
- .into_builder()
- .null_bit_buffer(null_bitmap_array.values().clone())
- .build_unchecked()
- };
- }
- let mut array = arrow::array::make_array(value_array_data);
- if array.data_type() != &self.data_type {
- // cast array to self.data_type if necessary
- array = arrow::compute::cast(&array, &self.data_type)?
- }
- Ok(array)
- }
-
- fn get_def_levels(&self) -> Option<&[i16]> {
- self.last_def_levels.as_ref().map(|x| x.values())
- }
-
- fn get_rep_levels(&self) -> Option<&[i16]> {
- self.last_rep_levels.as_ref().map(|x| x.values())
- }
-}
-
-use crate::encodings::rle::RleDecoder;
-
-pub trait ValueDecoder {
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize>;
-}
-
-trait DictionaryValueDecoder {
- fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>>;
-}
-
-trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder {
- fn into_value_decoder(self: Box<Self>) -> Box<dyn ValueDecoder>;
- fn into_dictionary_decoder(self: Box<Self>) -> Box<dyn DictionaryValueDecoder>;
-}
-
-impl<T> PlainValueDecoder for T
-where
- T: ValueDecoder + DictionaryValueDecoder + 'static,
-{
- fn into_value_decoder(self: Box<T>) -> Box<dyn ValueDecoder> {
- self
- }
-
- fn into_dictionary_decoder(self: Box<T>) -> Box<dyn DictionaryValueDecoder> {
- self
- }
-}
-
-impl dyn ValueDecoder {
- fn empty() -> impl ValueDecoder {
- SingleValueDecoder::new(Ok(0))
- }
-
- fn once(value: Result<usize>) -> impl ValueDecoder {
- SingleValueDecoder::new(value)
- }
-}
-
-impl ValueDecoder for Box<dyn ValueDecoder> {
- #[inline]
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- self.as_mut().read_value_bytes(num_values, read_bytes)
- }
-}
-
-struct SingleValueDecoder {
- value: Result<usize>,
-}
-
-impl SingleValueDecoder {
- fn new(value: Result<usize>) -> Self {
- Self { value }
- }
-}
-
-impl ValueDecoder for SingleValueDecoder {
- fn read_value_bytes(
- &mut self,
- _num_values: usize,
- _read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- self.value.clone()
- }
-}
-
-struct CompositeValueDecoder<I: Iterator<Item = Box<dyn ValueDecoder>>> {
- current_decoder: Option<Box<dyn ValueDecoder>>,
- decoder_iter: I,
-}
-
-impl<I: Iterator<Item = Box<dyn ValueDecoder>>> CompositeValueDecoder<I> {
- fn new(mut decoder_iter: I) -> Self {
- let current_decoder = decoder_iter.next();
- Self {
- current_decoder,
- decoder_iter,
- }
- }
-}
-
-impl<I: Iterator<Item = Box<dyn ValueDecoder>>> ValueDecoder
- for CompositeValueDecoder<I>
-{
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- let mut values_to_read = num_values;
- while values_to_read > 0 {
- let value_decoder = match self.current_decoder.as_mut() {
- Some(d) => d,
- // no more decoders
- None => break,
- };
- while values_to_read > 0 {
- let values_read =
- value_decoder.read_value_bytes(values_to_read, read_bytes)?;
- if values_read > 0 {
- values_to_read -= values_read;
- } else {
- // no more values in current decoder
- self.current_decoder = self.decoder_iter.next();
- break;
- }
- }
- }
-
- Ok(num_values - values_to_read)
- }
-}
-
-struct LevelValueDecoder {
- level_decoder: crate::encodings::levels::LevelDecoder,
- level_value_buffer: Vec<i16>,
-}
-
-impl LevelValueDecoder {
- fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self {
- Self {
- level_decoder,
- level_value_buffer: vec![0i16; 2048],
- }
- }
-}
-
-impl ValueDecoder for LevelValueDecoder {
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- let value_size = std::mem::size_of::<i16>();
- let mut total_values_read = 0;
- while total_values_read < num_values {
- let values_to_read = std::cmp::min(
- num_values - total_values_read,
- self.level_value_buffer.len(),
- );
- let values_read = match self
- .level_decoder
- .get(&mut self.level_value_buffer[..values_to_read])
- {
- Ok(values_read) => values_read,
- Err(e) => return Err(e),
- };
- if values_read > 0 {
- let level_value_bytes =
- &self.level_value_buffer.to_byte_slice()[..values_read * value_size];
- read_bytes(level_value_bytes, values_read);
- total_values_read += values_read;
- } else {
- break;
- }
- }
- Ok(total_values_read)
- }
-}
-
-pub(crate) struct FixedLenPlainDecoder {
- data: ByteBufferPtr,
- num_values: usize,
- value_bit_len: usize,
-}
-
-impl FixedLenPlainDecoder {
- pub(crate) fn new(
- data: ByteBufferPtr,
- num_values: usize,
- value_bit_len: usize,
- ) -> Self {
- Self {
- data,
- num_values,
- value_bit_len,
- }
- }
-}
-
-impl DictionaryValueDecoder for FixedLenPlainDecoder {
- fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
- let value_byte_len = self.value_bit_len / 8;
- let available_values = self.data.len() / value_byte_len;
- let values_to_read = std::cmp::min(available_values, self.num_values);
- let byte_len = values_to_read * value_byte_len;
- let values = vec![self.data.range(0, byte_len)];
- self.num_values = 0;
- self.data.set_range(self.data.start(), 0);
- Ok(values)
- }
-}
-
-impl ValueDecoder for FixedLenPlainDecoder {
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- let available_values = self.data.len() * 8 / self.value_bit_len;
- if available_values > 0 {
- let values_to_read = std::cmp::min(available_values, num_values);
- let byte_len = values_to_read * self.value_bit_len / 8;
- read_bytes(&self.data.data()[..byte_len], values_to_read);
- self.data
- .set_range(self.data.start() + byte_len, self.data.len() - byte_len);
- Ok(values_to_read)
- } else {
- Ok(0)
- }
- }
-}
-
-pub(crate) struct VariableLenPlainDecoder {
- data: ByteBufferPtr,
- num_values: usize,
- position: usize,
-}
-
-impl VariableLenPlainDecoder {
- pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self {
- Self {
- data,
- num_values,
- position: 0,
- }
- }
-}
-
-impl DictionaryValueDecoder for VariableLenPlainDecoder {
- fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
- const LEN_SIZE: usize = std::mem::size_of::<u32>();
- let data = self.data.data();
- let data_len = data.len();
- let values_to_read = self.num_values;
- let mut values = Vec::with_capacity(values_to_read);
- let mut values_read = 0;
- while self.position < data_len && values_read < values_to_read {
- let len: usize =
- read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
- self.position += LEN_SIZE;
- if data_len < self.position + len {
- return Err(eof_err!("Not enough bytes to decode"));
- }
- values.push(self.data.range(self.position, len));
- self.position += len;
- values_read += 1;
- }
- self.num_values -= values_read;
- Ok(values)
- }
-}
-
-impl ValueDecoder for VariableLenPlainDecoder {
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- const LEN_SIZE: usize = std::mem::size_of::<u32>();
- let data = self.data.data();
- let data_len = data.len();
- let values_to_read = std::cmp::min(self.num_values, num_values);
- let mut values_read = 0;
- while self.position < data_len && values_read < values_to_read {
- let len: usize =
- read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
- self.position += LEN_SIZE;
- if data_len < self.position + len {
- return Err(eof_err!("Not enough bytes to decode"));
- }
- read_bytes(&data[self.position..][..len], 1);
- self.position += len;
- values_read += 1;
- }
- self.num_values -= values_read;
- Ok(values_read)
- }
-}
-
-pub(crate) struct FixedLenDictionaryDecoder {
- context_ref: Rc<RefCell<ColumnChunkContext>>,
- key_data_bufer: ByteBufferPtr,
- num_values: usize,
- rle_decoder: RleDecoder,
- value_byte_len: usize,
- keys_buffer: Vec<i32>,
-}
-
-impl FixedLenDictionaryDecoder {
- pub(crate) fn new(
- column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
- key_data_bufer: ByteBufferPtr,
- num_values: usize,
- value_bit_len: usize,
- ) -> Self {
- assert!(
- value_bit_len % 8 == 0,
- "value_bit_size must be a multiple of 8"
- );
- // First byte in `data` is bit width
- let bit_width = key_data_bufer.data()[0];
- let mut rle_decoder = RleDecoder::new(bit_width);
- rle_decoder.set_data(key_data_bufer.start_from(1));
-
- Self {
- context_ref: column_chunk_context,
- key_data_bufer,
- num_values,
- rle_decoder,
- value_byte_len: value_bit_len / 8,
- keys_buffer: vec![0; 2048],
- }
- }
-}
-
-impl ValueDecoder for FixedLenDictionaryDecoder {
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- if self.num_values == 0 {
- return Ok(0);
- }
- let context = self.context_ref.borrow();
- let values = context.dictionary_values.as_ref().unwrap();
- let input_value_bytes = values[0].data();
- // read no more than available values or requested values
- let values_to_read = std::cmp::min(self.num_values, num_values);
- let mut values_read = 0;
- while values_read < values_to_read {
- // read values in batches of up to self.keys_buffer.len()
- let keys_to_read =
- std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
- let keys_read = match self
- .rle_decoder
- .get_batch(&mut self.keys_buffer[..keys_to_read])
- {
- Ok(keys_read) => keys_read,
- Err(e) => return Err(e),
- };
- if keys_read == 0 {
- self.num_values = 0;
- return Ok(values_read);
- }
- for i in 0..keys_read {
- let key = self.keys_buffer[i] as usize;
- read_bytes(
- &input_value_bytes[key * self.value_byte_len..]
- [..self.value_byte_len],
- 1,
- );
- }
- values_read += keys_read;
- }
- self.num_values -= values_read;
- Ok(values_read)
- }
-}
-
-pub(crate) struct VariableLenDictionaryDecoder {
- context_ref: Rc<RefCell<ColumnChunkContext>>,
- key_data_bufer: ByteBufferPtr,
- num_values: usize,
- rle_decoder: RleDecoder,
- keys_buffer: Vec<i32>,
-}
-
-impl VariableLenDictionaryDecoder {
- pub(crate) fn new(
- column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
- key_data_bufer: ByteBufferPtr,
- num_values: usize,
- ) -> Self {
- // First byte in `data` is bit width
- let bit_width = key_data_bufer.data()[0];
- let mut rle_decoder = RleDecoder::new(bit_width);
- rle_decoder.set_data(key_data_bufer.start_from(1));
-
- Self {
- context_ref: column_chunk_context,
- key_data_bufer,
- num_values,
- rle_decoder,
- keys_buffer: vec![0; 2048],
- }
- }
-}
-
-impl ValueDecoder for VariableLenDictionaryDecoder {
- fn read_value_bytes(
- &mut self,
- num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- if self.num_values == 0 {
- return Ok(0);
- }
- let context = self.context_ref.borrow();
- let values = context.dictionary_values.as_ref().unwrap();
- let values_to_read = std::cmp::min(self.num_values, num_values);
- let mut values_read = 0;
- while values_read < values_to_read {
- // read values in batches of up to self.keys_buffer.len()
- let keys_to_read =
- std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
- let keys_read = match self
- .rle_decoder
- .get_batch(&mut self.keys_buffer[..keys_to_read])
- {
- Ok(keys_read) => keys_read,
- Err(e) => return Err(e),
- };
- if keys_read == 0 {
- self.num_values = 0;
- return Ok(values_read);
- }
- for i in 0..keys_read {
- let key = self.keys_buffer[i] as usize;
- read_bytes(values[key].data(), 1);
- }
- values_read += keys_read;
- }
- self.num_values -= values_read;
- Ok(values_read)
- }
-}
-
-pub(crate) struct DeltaByteArrayValueDecoder {
- decoder: DeltaByteArrayDecoder<ByteArrayType>,
-}
-
-impl DeltaByteArrayValueDecoder {
- pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
- let mut decoder = DeltaByteArrayDecoder::new();
- decoder.set_data(data, num_values)?;
- Ok(Self { decoder })
- }
-}
-
-impl ValueDecoder for DeltaByteArrayValueDecoder {
- fn read_value_bytes(
- &mut self,
- mut num_values: usize,
- read_bytes: &mut dyn FnMut(&[u8], usize),
- ) -> Result<usize> {
- num_values = std::cmp::min(num_values, self.decoder.values_left());
- let mut values_read = 0;
- let mut buf = [ByteArray::new()];
- while values_read < num_values {
- let num_read = self.decoder.get(&mut buf)?;
- debug_assert_eq!(num_read, 1);
-
- read_bytes(buf[0].data(), 1);
-
- values_read += 1;
- }
- Ok(values_read)
- }
-}
-
-use arrow::datatypes::ArrowPrimitiveType;
-
-pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
- _phantom_data: PhantomData<T>,
-}
-
-impl<T: ArrowPrimitiveType> PrimitiveArrayConverter<T> {
- pub fn new() -> Self {
- Self {
- _phantom_data: PhantomData,
- }
- }
-}
-
-impl<T: ArrowPrimitiveType> ArrayConverter for PrimitiveArrayConverter<T> {
- fn convert_value_bytes(
- &self,
- value_decoder: &mut impl ValueDecoder,
- num_values: usize,
- ) -> Result<arrow::array::ArrayData> {
- let value_size = T::get_byte_width();
- let values_byte_capacity = num_values * value_size;
- let mut values_buffer = MutableBuffer::new(values_byte_capacity);
-
- value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| {
- values_buffer.extend_from_slice(value_bytes);
- })?;
-
- // calculate actual data_len, which may be different from the iterator's upper bound
- let value_count = values_buffer.len() / value_size;
- let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE)
- .len(value_count)
- .add_buffer(values_buffer.into());
- let array_data = unsafe { array_data.build_unchecked() };
- Ok(array_data)
- }
-}
-
-pub struct StringArrayConverter {}
-
-impl StringArrayConverter {
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl ArrayConverter for StringArrayConverter {
- fn convert_value_bytes(
- &self,
- value_decoder: &mut impl ValueDecoder,
- num_values: usize,
- ) -> Result<arrow::array::ArrayData> {
- use arrow::datatypes::ArrowNativeType;
- let offset_size = std::mem::size_of::<i32>();
- let mut offsets_buffer = MutableBuffer::new((num_values + 1) * offset_size);
- // allocate initial capacity of 1 byte for each item
- let values_byte_capacity = num_values;
- let mut values_buffer = MutableBuffer::new(values_byte_capacity);
-
- let mut length_so_far = i32::default();
- offsets_buffer.push(length_so_far);
-
- value_decoder.read_value_bytes(num_values, &mut |value_bytes, values_read| {
- debug_assert_eq!(
- values_read, 1,
- "offset length value buffers can only contain bytes for a
single value"
- );
- length_so_far +=
- <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
- // this should be safe because a ValueDecoder should not read more than num_values
- unsafe {
- offsets_buffer.push_unchecked(length_so_far);
- }
- values_buffer.extend_from_slice(value_bytes);
- })?;
- // calculate actual data_len, which may be different from the iterator's upper bound
- let data_len = (offsets_buffer.len() / offset_size) - 1;
- let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
- .len(data_len)
- .add_buffer(offsets_buffer.into())
- .add_buffer(values_buffer.into());
- let array_data = unsafe { array_data.build_unchecked() };
- Ok(array_data)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::arrow::{ArrowReader, ParquetFileArrowReader};
- use crate::basic::ConvertedType;
- use crate::column::page::Page;
- use crate::column::writer::ColumnWriter;
- use crate::data_type::ByteArray;
- use crate::data_type::ByteArrayType;
- use crate::encodings::encoding::{DictEncoder, Encoder};
- use crate::file::properties::WriterProperties;
- use crate::file::reader::SerializedFileReader;
- use crate::file::serialized_reader::SliceableCursor;
- use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
- use crate::util::memory::MemTracker;
- use crate::schema::parser::parse_message_type;
- use crate::schema::types::SchemaDescriptor;
- use crate::util::test_common::page_util::{
- DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
- };
- use crate::{
- basic::Encoding, column::page::PageReader, schema::types::SchemaDescPtr,
- };
- use arrow::array::{PrimitiveArray, StringArray};
- use arrow::datatypes::Int32Type as ArrowInt32;
- use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
- use std::io::{Cursor, Seek, SeekFrom, Write};
- use std::sync::{Arc, Mutex};
-
- /// Iterator for testing reading empty columns
- struct EmptyPageIterator {
- schema: SchemaDescPtr,
- }
-
- impl EmptyPageIterator {
- fn new(schema: SchemaDescPtr) -> Self {
- EmptyPageIterator { schema }
- }
- }
-
- impl Iterator for EmptyPageIterator {
- type Item = Result<Box<dyn PageReader>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- None
- }
- }
-
- impl PageIterator for EmptyPageIterator {
- fn schema(&mut self) -> Result<SchemaDescPtr> {
- Ok(self.schema.clone())
- }
-
- fn column_schema(&mut self) -> Result<ColumnDescPtr> {
- Ok(self.schema.column(0))
- }
- }
-
- #[test]
- fn test_array_reader_empty_pages() {
- // Construct column schema
- let message_type = "
- message test_schema {
- REQUIRED INT32 leaf;
- }
- ";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
-
- let column_desc = schema.column(0);
- let page_iterator = EmptyPageIterator::new(schema);
-
- let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
- let mut array_reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
- .unwrap();
-
- // expect no values to be read
- let array = array_reader.next_batch(50).unwrap();
- assert!(array.is_empty());
- }
-
- fn make_column_chunks<T: crate::data_type::DataType>(
- column_desc: ColumnDescPtr,
- encoding: Encoding,
- num_levels: usize,
- min_value: T::T,
- max_value: T::T,
- def_levels: &mut Vec<i16>,
- rep_levels: &mut Vec<i16>,
- values: &mut Vec<T::T>,
- page_lists: &mut Vec<Vec<Page>>,
- use_v2: bool,
- num_chunks: usize,
- ) where
- T::T: PartialOrd + SampleUniform + Copy,
- {
- for _i in 0..num_chunks {
- let mut pages = VecDeque::new();
- let mut data = Vec::new();
- let mut page_def_levels = Vec::new();
- let mut page_rep_levels = Vec::new();
-
- crate::util::test_common::make_pages::<T>(
- column_desc.clone(),
- encoding,
- 1,
- num_levels,
- min_value,
- max_value,
- &mut page_def_levels,
- &mut page_rep_levels,
- &mut data,
- &mut pages,
- use_v2,
- );
-
- def_levels.append(&mut page_def_levels);
- rep_levels.append(&mut page_rep_levels);
- values.append(&mut data);
- page_lists.push(Vec::from(pages));
- }
- }
-
- #[test]
- fn test_primitive_array_reader_data() {
- // Construct column schema
- let message_type = "
- message test_schema {
- REQUIRED INT32 leaf;
- }
- ";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
-
- let column_desc = schema.column(0);
-
- // Construct page iterator
- {
- let mut data = Vec::new();
- let mut page_lists = Vec::new();
- make_column_chunks::<crate::data_type::Int32Type>(
- column_desc.clone(),
- Encoding::PLAIN,
- 100,
- 1,
- 200,
- &mut Vec::new(),
- &mut Vec::new(),
- &mut data,
- &mut page_lists,
- true,
- 2,
- );
- let page_iterator =
- InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
-
- let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
- let mut array_reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
- .unwrap();
-
- // Read first 50 values, which are all from the first column chunk
- let array = array_reader.next_batch(50).unwrap();
- let array = array
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap();
-
- assert_eq!(
- &PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
- array
- );
-
- // Read next 100 values, the first 50 ones are from the first column chunk,
- // and the last 50 ones are from the second column chunk
- let array = array_reader.next_batch(100).unwrap();
- let array = array
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap();
-
- assert_eq!(
- &PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
- array
- );
-
- // Try to read 100 values, however there are only 50 values
- let array = array_reader.next_batch(100).unwrap();
- let array = array
- .as_any()
- .downcast_ref::<PrimitiveArray<ArrowInt32>>()
- .unwrap();
-
- assert_eq!(
- &PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
- array
- );
- }
- }
-
- #[test]
- fn test_primitive_array_reader_def_and_rep_levels() {
- // Construct column schema
- let message_type = "
- message test_schema {
- REPEATED Group test_mid {
- OPTIONAL INT32 leaf;
- }
- }
- ";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
-
- let column_desc = schema.column(0);
-
- // Construct page iterator
- {
- let mut def_levels = Vec::new();
- let mut rep_levels = Vec::new();
- let mut page_lists = Vec::new();
- make_column_chunks::<crate::data_type::Int32Type>(
- column_desc.clone(),
- Encoding::PLAIN,
- 100,
- 1,
- 200,
- &mut def_levels,
- &mut rep_levels,
- &mut Vec::new(),
- &mut page_lists,
- true,
- 2,
- );
-
- let page_iterator =
- InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
-
- let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
- let mut array_reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
- .unwrap();
-
- let mut accu_len: usize = 0;
-
- // Read first 50 values, which are all from the first column chunk
- let array = array_reader.next_batch(50).unwrap();
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- accu_len += array.len();
-
- // Read next 100 values, the first 50 ones are from the first column chunk,
- // and the last 50 ones are from the second column chunk
- let array = array_reader.next_batch(100).unwrap();
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- accu_len += array.len();
-
- // Try to read 100 values, however there are only 50 values
- let array = array_reader.next_batch(100).unwrap();
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
-
- assert_eq!(accu_len + array.len(), 200);
- }
- }
-
- #[test]
- fn test_arrow_array_reader_string() {
- // Construct column schema
- let message_type = "
- message test_schema {
- REPEATED Group test_mid {
- OPTIONAL BYTE_ARRAY leaf (UTF8);
- }
- }
- ";
- let num_pages = 2;
- let values_per_page = 100;
- let str_base = "Hello World";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
- let column_desc = schema.column(0);
- let max_def_level = column_desc.max_def_level();
- let max_rep_level = column_desc.max_rep_level();
-
- assert_eq!(max_def_level, 2);
- assert_eq!(max_rep_level, 1);
-
- let mut rng = thread_rng();
- let mut pages: Vec<Vec<Page>> = Vec::new();
-
- let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
- for i in 0..num_pages {
- let mut values = Vec::with_capacity(values_per_page);
-
- for _ in 0..values_per_page {
- let def_level = rng.gen_range(0..max_def_level + 1);
- let rep_level = rng.gen_range(0..max_rep_level + 1);
- if def_level == max_def_level {
- let len = rng.gen_range(1..str_base.len());
- let slice = &str_base[..len];
- values.push(ByteArray::from(slice));
- all_values.push(Some(slice.to_string()));
- } else {
- all_values.push(None)
- }
- rep_levels.push(rep_level);
- def_levels.push(def_level)
- }
-
- let range = i * values_per_page..(i + 1) * values_per_page;
- let mut pb =
- DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
-
- pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
- pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
- pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
-
- let data_page = pb.consume();
- pages.push(vec![data_page]);
- }
-
- let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
- let converter = StringArrayConverter::new();
- let mut array_reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
- .unwrap();
-
- let mut accu_len: usize = 0;
-
- let array = array_reader.next_batch(values_per_page / 2).unwrap();
- assert_eq!(array.len(), values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- accu_len += array.len();
-
- // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
- // and the last values_per_page/2 ones are from the second column chunk
- let array = array_reader.next_batch(values_per_page).unwrap();
- assert_eq!(array.len(), values_per_page);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
- for i in 0..array.len() {
- if array.is_valid(i) {
- assert_eq!(
- all_values[i + accu_len].as_ref().unwrap().as_str(),
- strings.value(i)
- )
- } else {
- assert_eq!(all_values[i + accu_len], None)
- }
- }
- accu_len += array.len();
-
- // Try to read values_per_page values, however there are only values_per_page/2 values
- let array = array_reader.next_batch(values_per_page).unwrap();
- assert_eq!(array.len(), values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- }
-
- #[test]
- fn test_arrow_array_reader_dict_enc_string() {
- // Construct column schema
- let message_type = "
- message test_schema {
- REPEATED Group test_mid {
- OPTIONAL BYTE_ARRAY leaf (UTF8);
- }
- }
- ";
- let num_pages = 2;
- let values_per_page = 100;
- let str_base = "Hello World";
-
- let schema = parse_message_type(message_type)
- .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
- .unwrap();
- let column_desc = schema.column(0);
- let max_def_level = column_desc.max_def_level();
- let max_rep_level = column_desc.max_rep_level();
-
- assert_eq!(max_def_level, 2);
- assert_eq!(max_rep_level, 1);
-
- let mut rng = thread_rng();
- let mut pages: Vec<Vec<Page>> = Vec::new();
-
- let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
- let mut all_values = Vec::with_capacity(num_pages * values_per_page);
-
- for i in 0..num_pages {
- let mem_tracker = Arc::new(MemTracker::new());
- let mut dict_encoder =
- DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
- // add data page
- let mut values = Vec::with_capacity(values_per_page);
-
- for _ in 0..values_per_page {
- let def_level = rng.gen_range(0..max_def_level + 1);
- let rep_level = rng.gen_range(0..max_rep_level + 1);
- if def_level == max_def_level {
- let len = rng.gen_range(1..str_base.len());
- let slice = &str_base[..len];
- values.push(ByteArray::from(slice));
- all_values.push(Some(slice.to_string()));
- } else {
- all_values.push(None)
- }
- rep_levels.push(rep_level);
- def_levels.push(def_level)
- }
-
- let range = i * values_per_page..(i + 1) * values_per_page;
- let mut pb =
- DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
- pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
- pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
- let _ = dict_encoder.put(&values);
- let indices = dict_encoder
- .write_indices()
- .expect("write_indices() should be OK");
- pb.add_indices(indices);
- let data_page = pb.consume();
- // for each page log num_values vs actual values in page
- // println!("page num_values: {}, values.len(): {}",
data_page.num_values(), values.len());
- // add dictionary page
- let dict = dict_encoder
- .write_dict()
- .expect("write_dict() should be OK");
- let dict_page = Page::DictionaryPage {
- buf: dict,
- num_values: dict_encoder.num_entries() as u32,
- encoding: Encoding::RLE_DICTIONARY,
- is_sorted: false,
- };
- pages.push(vec![dict_page, data_page]);
- }
-
- let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
- let converter = StringArrayConverter::new();
- let mut array_reader =
- ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
- .unwrap();
-
- let mut accu_len: usize = 0;
-
- // println!("---------- reading a batch of {} values ----------",
values_per_page / 2);
- let array = array_reader.next_batch(values_per_page / 2).unwrap();
- assert_eq!(array.len(), values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- accu_len += array.len();
-
- // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
- // and the last values_per_page/2 ones are from the second column chunk
- // println!("---------- reading a batch of {} values ----------", values_per_page);
- let array = array_reader.next_batch(values_per_page).unwrap();
- assert_eq!(array.len(), values_per_page);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
- for i in 0..array.len() {
- if array.is_valid(i) {
- assert_eq!(
- all_values[i + accu_len].as_ref().unwrap().as_str(),
- strings.value(i)
- )
- } else {
- assert_eq!(all_values[i + accu_len], None)
- }
- }
- accu_len += array.len();
-
- // Try to read values_per_page values, however there are only values_per_page/2 values
- // println!("---------- reading a batch of {} values ----------", values_per_page);
- let array = array_reader.next_batch(values_per_page).unwrap();
- assert_eq!(array.len(), values_per_page / 2);
- assert_eq!(
- Some(&def_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_def_levels()
- );
- assert_eq!(
- Some(&rep_levels[accu_len..(accu_len + array.len())]),
- array_reader.get_rep_levels()
- );
- }
-
- /// Allows writing parquet into memory. Intended only for use in tests.
- #[derive(Clone)]
- struct VecWriter {
- data: Arc<Mutex<Cursor<Vec<u8>>>>,
- }
-
- impl VecWriter {
- pub fn new() -> VecWriter {
- VecWriter {
- data: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
- }
- }
-
- pub fn consume(self) -> Vec<u8> {
- Arc::try_unwrap(self.data)
- .unwrap()
- .into_inner()
- .unwrap()
- .into_inner()
- }
- }
-
- impl TryClone for VecWriter {
- fn try_clone(&self) -> std::io::Result<Self> {
- Ok(self.clone())
- }
- }
-
- impl Seek for VecWriter {
- fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
- self.data.lock().unwrap().seek(pos)
- }
-
- fn stream_position(&mut self) -> std::io::Result<u64> {
- self.data.lock().unwrap().stream_position()
- }
- }
-
- impl Write for VecWriter {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.data.lock().unwrap().write(buf)
- }
-
- fn flush(&mut self) -> std::io::Result<()> {
- self.data.lock().unwrap().flush()
- }
- }
-
- #[test]
- fn test_string_delta_byte_array() {
- use crate::basic;
- use crate::schema::types::Type;
-
- let data = VecWriter::new();
- let schema = Arc::new(
- Type::group_type_builder("string_test")
- .with_fields(&mut vec![Arc::new(
- Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
- .with_converted_type(ConvertedType::UTF8)
- .build()
- .unwrap(),
- )])
- .build()
- .unwrap(),
- );
- // Disable dictionary and use the fallback encoding.
- let p = Arc::new(
- WriterProperties::builder()
- .set_dictionary_enabled(false)
- .set_encoding(Encoding::DELTA_BYTE_ARRAY)
- .build(),
- );
- // Write a few strings.
- let mut w = SerializedFileWriter::new(data.clone(), schema, p).unwrap();
- let mut rg = w.next_row_group().unwrap();
- let mut c = rg.next_column().unwrap().unwrap();
- match &mut c {
- ColumnWriter::ByteArrayColumnWriter(c) => {
- c.write_batch(
- &[ByteArray::from("foo"), ByteArray::from("bar")],
- Some(&[0, 1, 0, 0, 1, 0]),
- Some(&[0, 0, 0, 0, 0, 0]),
- )
- .unwrap();
- }
- _ => panic!("unexpected column"),
- };
- rg.close_column(c).unwrap();
- w.close_row_group(rg).unwrap();
- w.close().unwrap();
- std::mem::drop(w);
-
- // Check we can read them back.
- let r = SerializedFileReader::new(SliceableCursor::new(Arc::new(data.consume())))
- .unwrap();
- let mut r = ParquetFileArrowReader::new(Arc::new(r));
-
- let batch = r
- .get_record_reader_by_columns([0], 1024)
- .unwrap()
- .next()
- .unwrap()
- .unwrap();
- assert_eq!(batch.columns().len(), 1);
-
- let strings = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
- assert_eq!(
- strings.into_iter().collect::<Vec<_>>(),
- vec![None, Some("foo"), None, None, Some("bar"), None]
- );
- }
-}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index fbbf655..a2f885a 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -119,7 +119,6 @@
//! ```
experimental_mod!(array_reader);
-experimental_mod!(arrow_array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod bit_util;