This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2a4f269e94 Support Parquet `BYTE_STREAM_SPLIT` for INT32, INT64, and
FIXED_LEN_BYTE_ARRAY primitive types (#6159)
2a4f269e94 is described below
commit 2a4f269e94e155131d6ae0bc8af48101012e3d45
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Aug 6 05:46:56 2024 -0700
Support Parquet `BYTE_STREAM_SPLIT` for INT32, INT64, and
FIXED_LEN_BYTE_ARRAY primitive types (#6159)
* add todos to help trace flow
* add support for byte_stream_split encoding for INT32 and INT64 data
* byte_stream_split encoding for fixed_len_byte_array
* revert changes to Decoder and add VariableWidthByteStreamSplitDecoder
* remove set_type_width as it is now unused
* begin implementing roundtrip test
* move test
* clean up some documentation
* add test of byte_stream_split with flba
* add check for and test of mismatched sizes
* remove type_length from Encoder and add VariableWidthByteStreamSplitEncoder
* fix clippy error
* change type of argument to new()
* formatting
* add another test
* add variable to split/join streams for FLBA
* more informative error message
* avoid buffer copies in decoder per suggestion from review
* add roundtrip test
* optimized version...but clippy complains
* clippy was right...replace loop with copy_from_slice
* fix test
* optimize split_streams_variable for long type widths
---
parquet/benches/encoding.rs | 24 ++--
.../src/arrow/array_reader/fixed_len_byte_array.rs | 45 +++++++
parquet/src/arrow/array_reader/test_util.rs | 3 +-
parquet/src/arrow/arrow_reader/mod.rs | 82 ++++++++++++
parquet/src/arrow/arrow_writer/mod.rs | 6 +-
parquet/src/column/writer/encoder.rs | 1 +
parquet/src/encodings/decoding.rs | 120 ++++++++++++++---
.../decoding/byte_stream_split_decoder.rs | 146 ++++++++++++++++++++-
.../encoding/byte_stream_split_encoder.rs | 145 +++++++++++++++++++-
parquet/src/encodings/encoding/mod.rs | 69 ++++++----
parquet/src/file/serialized_reader.rs | 38 ++++++
parquet/src/file/writer.rs | 117 +++++++++++++++++
parquet/src/util/test_common/page_util.rs | 18 ++-
13 files changed, 743 insertions(+), 71 deletions(-)
diff --git a/parquet/benches/encoding.rs b/parquet/benches/encoding.rs
index bdbca3567a..80befe8dad 100644
--- a/parquet/benches/encoding.rs
+++ b/parquet/benches/encoding.rs
@@ -30,30 +30,30 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values:
&[T::T], encoding: Encodi
std::any::type_name::<T::T>(),
encoding
);
+ let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
+ Arc::new(
+ Type::primitive_type_builder("", T::get_physical_type())
+ .build()
+ .unwrap(),
+ ),
+ 0,
+ 0,
+ ColumnPath::new(vec![]),
+ ));
c.bench_function(&format!("encoding: {}", name), |b| {
b.iter(|| {
- let mut encoder = get_encoder::<T>(encoding).unwrap();
+ let mut encoder = get_encoder::<T>(encoding,
&column_desc_ptr).unwrap();
encoder.put(values).unwrap();
encoder.flush_buffer().unwrap();
});
});
- let mut encoder = get_encoder::<T>(encoding).unwrap();
+ let mut encoder = get_encoder::<T>(encoding, &column_desc_ptr).unwrap();
encoder.put(values).unwrap();
let encoded = encoder.flush_buffer().unwrap();
println!("{} encoded as {} bytes", name, encoded.len(),);
let mut buffer = vec![T::T::default(); values.len()];
- let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
- Arc::new(
- Type::primitive_type_builder("", T::get_physical_type())
- .build()
- .unwrap(),
- ),
- 0,
- 0,
- ColumnPath::new(vec![]),
- ));
c.bench_function(&format!("decoding: {}", name), |b| {
b.iter(|| {
let mut decoder: Box<dyn Decoder<T>> =
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index a9159bb471..8098f3240a 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -341,6 +341,10 @@ impl ColumnValueDecoder for ValueDecoder {
Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
decoder: DeltaByteArrayDecoder::new(data)?,
},
+ Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
+ buf: data,
+ offset: 0,
+ },
_ => {
return Err(general_err!(
"unsupported encoding for fixed length byte array: {}",
@@ -400,6 +404,20 @@ impl ColumnValueDecoder for ValueDecoder {
Ok(())
})
}
+ Decoder::ByteStreamSplit { buf, offset } => {
+ // we have n=`byte_length` streams of length
`buf.len/byte_length`
+ // to read value i, we need the i'th byte from each of the
streams
+ // so `offset` should be the value offset, not the byte offset
+ let total_values = buf.len() / self.byte_length;
+ let to_read = num_values.min(total_values - *offset);
+ out.buffer.reserve(to_read * self.byte_length);
+
+ // now read the n streams and reassemble values into the
output buffer
+ read_byte_stream_split(&mut out.buffer, buf, *offset, to_read,
self.byte_length);
+
+ *offset += to_read;
+ Ok(to_read)
+ }
}
}
@@ -412,6 +430,32 @@ impl ColumnValueDecoder for ValueDecoder {
}
Decoder::Dict { decoder } => decoder.skip(num_values),
Decoder::Delta { decoder } => decoder.skip(num_values),
+ Decoder::ByteStreamSplit { offset, buf } => {
+ let total_values = buf.len() / self.byte_length;
+ let to_read = num_values.min(total_values - *offset);
+ *offset += to_read;
+ Ok(to_read)
+ }
+ }
+ }
+}
+
+/// Reassembles byte-stream-split values from `src` and appends them to `dst`.
+///
+/// `src` is laid out like an N x M matrix where N == `data_width` and M is the number of
+/// values it holds. Each "row" of the matrix is a stream of bytes, stream `j` containing
+/// byte `j` of every value; each "column" is a single value. This reassembles `num_values`
+/// values by reading columns of the matrix starting at column `offset`.
+///
+/// `src` is only read, so any byte slice (including a `Bytes` buffer) can be passed.
+fn read_byte_stream_split(
+    dst: &mut Vec<u8>,
+    src: &[u8],
+    offset: usize,
+    num_values: usize,
+    data_width: usize,
+) {
+    // Each stream ("row") is `stride` bytes long; stream `j` starts at byte `j * stride`.
+    let stride = src.len() / data_width;
+    dst.reserve(num_values * data_width);
+    for i in 0..num_values {
+        for j in 0..data_width {
+            dst.push(src[offset + j * stride + i]);
         }
     }
 }
@@ -420,6 +464,7 @@ enum Decoder {
Plain { buf: Bytes, offset: usize },
Dict { decoder: DictIndexDecoder },
Delta { decoder: DeltaByteArrayDecoder },
+ ByteStreamSplit { buf: Bytes, offset: usize },
}
#[cfg(test)]
diff --git a/parquet/src/arrow/array_reader/test_util.rs
b/parquet/src/arrow/array_reader/test_util.rs
index 0503292013..a7ff8d6e41 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -46,7 +46,8 @@ pub fn utf8_column() -> ColumnDescPtr {
/// Encode `data` with the provided `encoding`
pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> Bytes {
- let mut encoder = get_encoder::<ByteArrayType>(encoding).unwrap();
+ let desc = utf8_column();
+ let mut encoder = get_encoder::<ByteArrayType>(encoding, &desc).unwrap();
encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index c696763d63..a0302fa86b 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -1059,6 +1059,7 @@ mod tests {
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
+ Encoding::BYTE_STREAM_SPLIT,
],
);
run_single_column_reader_tests::<Int64Type, _, Int64Type>(
@@ -1070,6 +1071,7 @@ mod tests {
Encoding::PLAIN,
Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
+ Encoding::BYTE_STREAM_SPLIT,
],
);
run_single_column_reader_tests::<FloatType, _, FloatType>(
@@ -1641,6 +1643,86 @@ mod tests {
assert_eq!(row_count, 300);
}
+    #[test]
+    fn test_read_extended_byte_stream_split() {
+        let path = format!(
+            "{}/byte_stream_split_extended.gzip.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+        let file = File::open(path).unwrap();
+        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
+
+        let mut row_count = 0;
+        for batch in record_reader {
+            let batch = batch.unwrap();
+            row_count += batch.num_rows();
+
+            // Columns are checked in pairs (2k, 2k + 1): the second column of each pair must
+            // decode to exactly the same values as the first. Comparing whole typed arrays
+            // also checks lengths and validity, so nulls are compared rather than unwrapped.
+
+            // 0,1 are f16
+            assert_eq!(
+                batch.column(0).as_primitive::<Float16Type>(),
+                batch.column(1).as_primitive::<Float16Type>()
+            );
+
+            // 2,3 are f32
+            assert_eq!(
+                batch.column(2).as_primitive::<Float32Type>(),
+                batch.column(3).as_primitive::<Float32Type>()
+            );
+
+            // 4,5 are f64
+            assert_eq!(
+                batch.column(4).as_primitive::<Float64Type>(),
+                batch.column(5).as_primitive::<Float64Type>()
+            );
+
+            // 6,7 are i32
+            assert_eq!(
+                batch.column(6).as_primitive::<types::Int32Type>(),
+                batch.column(7).as_primitive::<types::Int32Type>()
+            );
+
+            // 8,9 are i64
+            assert_eq!(
+                batch.column(8).as_primitive::<types::Int64Type>(),
+                batch.column(9).as_primitive::<types::Int64Type>()
+            );
+
+            // 10,11 are FLBA(5)
+            assert_eq!(
+                batch.column(10).as_fixed_size_binary(),
+                batch.column(11).as_fixed_size_binary()
+            );
+
+            // 12,13 are FLBA(4) (decimal(7,3))
+            assert_eq!(
+                batch.column(12).as_primitive::<Decimal128Type>(),
+                batch.column(13).as_primitive::<Decimal128Type>()
+            );
+        }
+        assert_eq!(row_count, 200);
+    }
+
#[test]
fn test_read_incorrect_map_schema_file() {
let testdata = arrow::util::test_util::parquet_test_data();
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index db5d4502cf..0c07f541bd 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1791,7 +1791,11 @@ mod tests {
| DataType::UInt64
| DataType::UInt32
| DataType::UInt16
- | DataType::UInt8 => vec![Encoding::PLAIN,
Encoding::DELTA_BINARY_PACKED],
+ | DataType::UInt8 => vec![
+ Encoding::PLAIN,
+ Encoding::DELTA_BINARY_PACKED,
+ Encoding::BYTE_STREAM_SPLIT,
+ ],
DataType::Float32 | DataType::Float64 => {
vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
}
diff --git a/parquet/src/column/writer/encoder.rs
b/parquet/src/column/writer/encoder.rs
index 9d01c09040..7371c72a58 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -191,6 +191,7 @@ impl<T: DataType> ColumnValueEncoder for
ColumnValueEncoderImpl<T> {
props
.encoding(descr.path())
.unwrap_or_else(|| fallback_encoding(T::get_physical_type(),
props)),
+ descr,
)?;
let statistics_enabled = props.statistics_enabled(descr.path());
diff --git a/parquet/src/encodings/decoding.rs
b/parquet/src/encodings/decoding.rs
index b3dd135237..16467b32dc 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -27,6 +27,9 @@ use super::rle::RleDecoder;
use crate::basic::*;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
+use crate::encodings::decoding::byte_stream_split_decoder::{
+ ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
+};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::{self, BitReader};
@@ -87,6 +90,7 @@ pub(crate) mod private {
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
+ Encoding::BYTE_STREAM_SPLIT =>
Ok(Box::new(ByteStreamSplitDecoder::new())),
Encoding::DELTA_BINARY_PACKED =>
Ok(Box::new(DeltaBitPackDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
@@ -99,6 +103,7 @@ pub(crate) mod private {
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
+ Encoding::BYTE_STREAM_SPLIT =>
Ok(Box::new(ByteStreamSplitDecoder::new())),
Encoding::DELTA_BINARY_PACKED =>
Ok(Box::new(DeltaBitPackDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
@@ -111,9 +116,7 @@ pub(crate) mod private {
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
- Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
- byte_stream_split_decoder::ByteStreamSplitDecoder::new(),
- )),
+ Encoding::BYTE_STREAM_SPLIT =>
Ok(Box::new(ByteStreamSplitDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
@@ -124,9 +127,7 @@ pub(crate) mod private {
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
- Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
- byte_stream_split_decoder::ByteStreamSplitDecoder::new(),
- )),
+ Encoding::BYTE_STREAM_SPLIT =>
Ok(Box::new(ByteStreamSplitDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
}
@@ -153,6 +154,9 @@ pub(crate) mod private {
encoding: Encoding,
) -> Result<Box<dyn Decoder<T>>> {
match encoding {
+ Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
+
VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
+ )),
Encoding::DELTA_BYTE_ARRAY =>
Ok(Box::new(DeltaByteArrayDecoder::new())),
_ => get_decoder_default(descr, encoding),
}
@@ -1785,7 +1789,7 @@ mod tests {
],
vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
];
- test_byte_stream_split_decode::<FloatType>(data);
+ test_byte_stream_split_decode::<FloatType>(data, -1);
}
#[test]
@@ -1794,7 +1798,76 @@ mod tests {
f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
]];
- test_byte_stream_split_decode::<DoubleType>(data);
+ test_byte_stream_split_decode::<DoubleType>(data, -1);
+ }
+
+    /// Round-trips INT32 values written with two separate `put()` calls.
+    #[test]
+    fn test_byte_stream_split_multiple_i32() {
+        let data = vec![
+            vec![
+                i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
+                i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
+            ],
+            vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
+        ];
+        test_byte_stream_split_decode::<Int32Type>(data, -1);
+    }
+
+    /// Round-trips INT64 values written with a single `put()` call.
+    #[test]
+    fn test_byte_stream_split_i64() {
+        let data = vec![vec![
+            i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
+            i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
+        ]];
+        test_byte_stream_split_decode::<Int64Type>(data, -1);
+    }
+
+    /// Round-trips randomly generated FIXED_LEN_BYTE_ARRAY values of `type_width` bytes,
+    /// written with two `put()` calls.
+    fn test_byte_stream_split_flba(type_width: usize) {
+        let data = vec![
+            vec![
+                FixedLenByteArrayType::gen(type_width as i32),
+                FixedLenByteArrayType::gen(type_width as i32),
+            ],
+            vec![FixedLenByteArrayType::gen(type_width as i32)],
+        ];
+        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
+    }
+
+    #[test]
+    fn test_byte_stream_split_flba5() {
+        test_byte_stream_split_flba(5);
+    }
+
+    #[test]
+    fn test_byte_stream_split_flba16() {
+        test_byte_stream_split_flba(16);
+    }
+
+    #[test]
+    fn test_byte_stream_split_flba19() {
+        test_byte_stream_split_flba(19);
+    }
+
+    /// Encoding a value whose length differs from the column's type width must fail.
+    #[test]
+    #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
+    fn test_byte_stream_split_flba_mismatch() {
+        let data = vec![
+            vec![
+                FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
+                FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
+            ],
+            vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
+        ];
+        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
+    }
+
+    /// `set_data` must reject input whose length is not a multiple of the type width. The
+    /// error is asserted directly rather than through a panic, so an unrelated panic cannot
+    /// make this test pass.
+    #[test]
+    fn test_byte_stream_split_flba_bad_input() {
+        let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
+        let err = decoder
+            .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
+            .unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Input data length is not a multiple of type width 4"),
+            "unexpected error: {}",
+            err
+        );
     }
#[test]
@@ -1808,33 +1881,42 @@ mod tests {
);
}
+    /// Skipping over BYTE_STREAM_SPLIT data works for both INT32 and INT64, using the same
+    /// small sequence (widened for the INT64 case).
+    #[test]
+    fn test_skip_byte_stream_split_ints() {
+        let int32_values: Vec<i32> = vec![3, 4, 1, 5];
+        let int64_values: Vec<i64> = int32_values.iter().copied().map(i64::from).collect();
+        test_skip::<Int32Type>(int32_values, Encoding::BYTE_STREAM_SPLIT, 2);
+        test_skip::<Int64Type>(int64_values, Encoding::BYTE_STREAM_SPLIT, 100);
+    }
+
fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
- test_encode_decode::<T>(data, Encoding::RLE);
+ test_encode_decode::<T>(data, Encoding::RLE, -1);
}
fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
- test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED);
+ test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
}
- fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>) {
- test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT);
+ fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>,
type_width: i32) {
+ test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
}
fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
- test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY);
+ test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY,
-1);
}
// Input data represents vector of data slices to write (test multiple
`put()` calls)
// For example,
// vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
// vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
- fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding:
Encoding) {
- // Type length should not really matter for encode/decode test,
- // otherwise change it based on type
- let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
+ fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding:
Encoding, type_width: i32) {
+ let col_descr = create_test_col_desc_ptr(type_width,
T::get_physical_type());
// Encode data
- let mut encoder = get_encoder::<T>(encoding).expect("get encoder");
+ let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get
encoder");
for v in &data[..] {
encoder.put(&v[..]).expect("ok to encode");
@@ -1867,7 +1949,7 @@ mod tests {
let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
// Encode data
- let mut encoder = get_encoder::<T>(encoding).expect("get encoder");
+ let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get
encoder");
encoder.put(&data).expect("ok to encode");
diff --git a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
index 98841d21ec..9b2f43ace8 100644
--- a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
+++ b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
@@ -19,8 +19,9 @@ use std::marker::PhantomData;
use bytes::Bytes;
-use crate::basic::Encoding;
-use crate::data_type::{DataType, SliceAsBytes};
+use crate::basic::{Encoding, Type};
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::{DataType, FixedLenByteArray, SliceAsBytes};
use crate::errors::{ParquetError, Result};
use super::Decoder;
@@ -62,6 +63,22 @@ fn join_streams_const<const TYPE_SIZE: usize>(
}
}
+// Like the above, but type_size is not known at compile time.
+//
+// `src` holds `type_size` streams, each `stride` bytes long, stored back to back. Values are
+// reassembled starting at value index `values_decoded` until `dst` (viewed as whole
+// `type_size`-byte chunks) is full.
+fn join_streams_variable(
+    src: &[u8],
+    dst: &mut [u8],
+    stride: usize,
+    type_size: usize,
+    values_decoded: usize,
+) {
+    let remaining = &src[values_decoded..];
+    for (value_idx, value_out) in dst.chunks_exact_mut(type_size).enumerate() {
+        for (stream_idx, byte_out) in value_out.iter_mut().enumerate() {
+            *byte_out = remaining[value_idx + stream_idx * stride];
+        }
+    }
+}
+
impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
self.encoded_bytes = data;
@@ -76,7 +93,7 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
let num_values = buffer.len().min(total_remaining_values);
let buffer = &mut buffer[..num_values];
- // SAFETY: f32 and f64 has no constraints on their internal
representation, so we can modify it as we want
+        // SAFETY: i32/f32 and i64/f64 have no constraints on their internal representation, so we can modify it as we want
let raw_out_bytes = unsafe { <T as
DataType>::T::slice_as_bytes_mut(buffer) };
let type_size = T::get_type_size();
let stride = self.encoded_bytes.len() / type_size;
@@ -119,3 +136,126 @@ impl<T: DataType> Decoder<T> for
ByteStreamSplitDecoder<T> {
Ok(to_skip)
}
}
+
+/// Decoder for `BYTE_STREAM_SPLIT` data whose value width is only known at runtime, i.e.
+/// `FIXED_LEN_BYTE_ARRAY` columns.
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    /// The encoded page: `type_width` equally long streams stored back to back, where
+    /// stream `j` holds byte `j` of every value.
+    encoded_bytes: Bytes,
+    /// Number of values announced by the last `set_data` call.
+    total_num_values: usize,
+    /// Number of values already returned by `get` or passed over by `skip`.
+    values_decoded: usize,
+    /// Width in bytes of a single value (and therefore the number of streams).
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    /// Creates a decoder for values that are `type_length` bytes wide.
+    ///
+    /// A negative `type_length` is stored as a width of zero instead of wrapping around to a
+    /// huge `usize`; `set_data` rejects a zero width with an error.
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: usize::try_from(type_length).unwrap_or(0),
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    /// Installs a new page of encoded data holding `num_values` values.
+    ///
+    /// # Errors
+    /// Fails if the type width is zero, if `data.len()` is not a multiple of the type width,
+    /// or if `T` is not `FIXED_LEN_BYTE_ARRAY`.
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // A zero width would otherwise divide by zero in the length check below.
+        if self.type_width == 0 {
+            return Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder requires a positive type width"
+            ));
+        }
+
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!(
+                "Input data length is not a multiple of type width {}",
+                self.type_width
+            ));
+        }
+
+        match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => {
+                self.encoded_bytes = data;
+                self.total_num_values = num_values;
+                self.values_decoded = 0;
+                Ok(())
+            }
+            _ => Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
+            )),
+        }
+    }
+
+    /// Decodes up to `buffer.len()` values into the front of `buffer` and returns how many
+    /// values were decoded.
+    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+        let num_values = buffer.len().min(self.values_left());
+        if num_values == 0 {
+            // Nothing to decode; this also avoids dividing by a zero type width below when
+            // no data has been set.
+            return Ok(0);
+        }
+        let buffer = &mut buffer[..num_values];
+        let type_size = self.type_width;
+
+        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use slice_as_bytes_mut. Instead
+        // the values are first reassembled into a single scratch buffer.
+        let mut raw_out_bytes = vec![0_u8; num_values * type_size];
+        let stride = self.encoded_bytes.len() / type_size;
+        match type_size {
+            2 => join_streams_const::<2>(
+                &self.encoded_bytes,
+                &mut raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            4 => join_streams_const::<4>(
+                &self.encoded_bytes,
+                &mut raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            8 => join_streams_const::<8>(
+                &self.encoded_bytes,
+                &mut raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            16 => join_streams_const::<16>(
+                &self.encoded_bytes,
+                &mut raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            _ => join_streams_variable(
+                &self.encoded_bytes,
+                &mut raw_out_bytes,
+                stride,
+                type_size,
+                self.values_decoded,
+            ),
+        }
+        self.values_decoded += num_values;
+
+        // `Bytes` is reference counted, so each value below is a view into this one
+        // allocation rather than a separate copy.
+        let bytes_with_data = Bytes::from(raw_out_bytes);
+        for (i, bi) in buffer.iter_mut().enumerate() {
+            let data = bytes_with_data.slice(i * type_size..(i + 1) * type_size);
+            // TODO: perhaps add a `set_from_bytes` method to `DataType` to avoid downcasting
+            let bi = bi
+                .as_mut_any()
+                .downcast_mut::<FixedLenByteArray>()
+                .expect("Decoding fixed length byte array");
+            bi.set_data(data);
+        }
+
+        Ok(num_values)
+    }
+
+    fn values_left(&self) -> usize {
+        self.total_num_values - self.values_decoded
+    }
+
+    fn encoding(&self) -> Encoding {
+        Encoding::BYTE_STREAM_SPLIT
+    }
+
+    /// Advances past up to `num_values` values without decoding them.
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let to_skip = usize::min(self.values_left(), num_values);
+        self.values_decoded += to_skip;
+        Ok(to_skip)
+    }
+}
diff --git a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
index 3d5ba4cc2d..0726c6b1c9 100644
--- a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
+++ b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
@@ -16,14 +16,14 @@
// under the License.
use crate::basic::{Encoding, Type};
-use crate::data_type::DataType;
-use crate::data_type::SliceAsBytes;
+use crate::data_type::{AsBytes, DataType, SliceAsBytes};
use crate::errors::{ParquetError, Result};
use super::Encoder;
-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
+use std::cmp;
use std::marker::PhantomData;
pub struct ByteStreamSplitEncoder<T> {
@@ -40,7 +40,7 @@ impl<T: DataType> ByteStreamSplitEncoder<T> {
}
}
-// Here we assume src contains the full data (which it must, since we're
+// Here we assume src contains the full data (which it must, since we
// can only know where to split the streams once all data is collected).
// We iterate over the input bytes and write them to their strided output
// byte locations.
@@ -53,13 +53,28 @@ fn split_streams_const<const TYPE_SIZE: usize>(src: &[u8],
dst: &mut [u8]) {
}
}
+// Like above, but type_size is not known at compile time.
+//
+// Byte `b` of value `v` is written to `dst[b * stride + v]`. The bytes of each value are
+// processed in groups of BLOCK_SIZE so that only a few output streams are written per pass.
+fn split_streams_variable(src: &[u8], dst: &mut [u8], type_size: usize) {
+    const BLOCK_SIZE: usize = 4;
+    let stride = src.len() / type_size;
+    let mut block_start = 0;
+    while block_start < type_size {
+        let block_end = cmp::min(block_start + BLOCK_SIZE, type_size);
+        for value_idx in 0..stride {
+            let value = &src[value_idx * type_size..];
+            for byte_idx in block_start..block_end {
+                dst[byte_idx * stride + value_idx] = value[byte_idx];
+            }
+        }
+        block_start = block_end;
+    }
+}
+
impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
fn put(&mut self, values: &[T::T]) -> Result<()> {
self.buffer
.extend(<T as DataType>::T::slice_as_bytes(values));
+
ensure_phys_ty!(
- Type::FLOAT | Type::DOUBLE,
- "ByteStreamSplitEncoder only supports FloatType or DoubleType"
+ Type::FLOAT | Type::DOUBLE | Type::INT32 | Type::INT64,
+ "ByteStreamSplitEncoder does not support Int96, Boolean, or
ByteArray types"
);
Ok(())
@@ -96,3 +111,121 @@ impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T>
{
self.buffer.capacity() * std::mem::size_of::<u8>()
}
}
+
+/// Encoder for `BYTE_STREAM_SPLIT` data whose value width is only known at runtime, i.e.
+/// `FIXED_LEN_BYTE_ARRAY` columns.
+pub struct VariableWidthByteStreamSplitEncoder<T> {
+    /// Values buffered so far, concatenated back to back (`type_width` bytes each).
+    buffer: Vec<u8>,
+    /// Width in bytes of every value, and therefore the number of output streams.
+    type_width: usize,
+    _p: PhantomData<T>,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitEncoder<T> {
+    /// Creates an encoder for values that are `type_length` bytes wide.
+    ///
+    /// A negative `type_length` is stored as zero instead of wrapping around to a huge
+    /// `usize`; `put` then rejects every value that is not zero bytes long.
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            buffer: Vec::new(),
+            type_width: usize::try_from(type_length).unwrap_or(0),
+            _p: PhantomData,
+        }
+    }
+}
+
+/// Error returned when a value's length does not match the column's type width.
+fn mismatched_size_err(actual: usize, expected: usize) -> ParquetError {
+    general_err!(
+        "Mismatched FixedLenByteArray sizes: {} != {}",
+        actual,
+        expected
+    )
+}
+
+/// Copies `values` into `dst`, `TYPE_SIZE` bytes per value; a compile-time width makes the
+/// copy significantly faster for small widths.
+///
+/// `dst` is expected to be `values.len() * TYPE_SIZE` bytes long. Returns an error (with
+/// `dst` possibly partially written) if any value is not exactly `TYPE_SIZE` bytes long.
+fn put_fixed<T: DataType, const TYPE_SIZE: usize>(
+    dst: &mut [u8],
+    values: &[T::T],
+) -> Result<()> {
+    for (out, value) in dst.chunks_exact_mut(TYPE_SIZE).zip(values) {
+        let bytes = value.as_bytes();
+        if bytes.len() != TYPE_SIZE {
+            return Err(mismatched_size_err(bytes.len(), TYPE_SIZE));
+        }
+        out.copy_from_slice(bytes);
+    }
+    Ok(())
+}
+
+/// Like [`put_fixed`], but with the width supplied at runtime.
+fn put_variable<T: DataType>(dst: &mut [u8], values: &[T::T], type_width: usize) -> Result<()> {
+    let mut idx = 0;
+    for value in values {
+        let bytes = value.as_bytes();
+        if bytes.len() != type_width {
+            return Err(mismatched_size_err(bytes.len(), type_width));
+        }
+        dst[idx..idx + type_width].copy_from_slice(bytes);
+        idx += type_width;
+    }
+    Ok(())
+}
+
+impl<T: DataType> Encoder<T> for VariableWidthByteStreamSplitEncoder<T> {
+    /// Buffers `values` for encoding.
+    ///
+    /// # Errors
+    /// Fails if `T` is not `FIXED_LEN_BYTE_ARRAY` or if any value's length differs from the
+    /// type width; in the latter case none of this call's values are kept.
+    fn put(&mut self, values: &[T::T]) -> Result<()> {
+        ensure_phys_ty!(
+            Type::FIXED_LEN_BYTE_ARRAY,
+            "VariableWidthByteStreamSplitEncoder only supports FixedLenByteArray types"
+        );
+
+        // FixedLenByteArray is implemented as ByteArray, so there may be gaps making
+        // slice_as_bytes untenable
+        let idx = self.buffer.len();
+        let data_len = values.len() * self.type_width;
+        // Grow the buffer (zero-filled) so the new values can be written in place.
+        self.buffer.put_bytes(0_u8, data_len);
+        let out_buf = &mut self.buffer[idx..idx + data_len];
+
+        // Now copy `values` into the buffer. For `type_width` <= 8 use a fixed size when
+        // performing the copy as it is significantly faster.
+        let copied = match self.type_width {
+            2 => put_fixed::<T, 2>(out_buf, values),
+            3 => put_fixed::<T, 3>(out_buf, values),
+            4 => put_fixed::<T, 4>(out_buf, values),
+            5 => put_fixed::<T, 5>(out_buf, values),
+            6 => put_fixed::<T, 6>(out_buf, values),
+            7 => put_fixed::<T, 7>(out_buf, values),
+            8 => put_fixed::<T, 8>(out_buf, values),
+            _ => put_variable::<T>(out_buf, values, self.type_width),
+        };
+
+        // On a size mismatch, drop this call's partially written (zero-filled) region so the
+        // buffer only holds complete values from earlier successful calls.
+        if copied.is_err() {
+            self.buffer.truncate(idx);
+        }
+        copied
+    }
+
+    fn encoding(&self) -> Encoding {
+        Encoding::BYTE_STREAM_SPLIT
+    }
+
+    fn estimated_data_encoded_size(&self) -> usize {
+        self.buffer.len()
+    }
+
+    /// Splits the buffered values into `type_width` byte streams and returns them,
+    /// clearing the buffer.
+    fn flush_buffer(&mut self) -> Result<Bytes> {
+        // Nothing buffered: return early, which also avoids computing a stride from a zero
+        // type width.
+        if self.buffer.is_empty() {
+            return Ok(Bytes::new());
+        }
+
+        let mut encoded = vec![0; self.buffer.len()];
+        let type_size = match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => self.type_width,
+            _ => T::get_type_size(),
+        };
+        // split_streams_const() is faster up to type_width == 8
+        match type_size {
+            2 => split_streams_const::<2>(&self.buffer, &mut encoded),
+            3 => split_streams_const::<3>(&self.buffer, &mut encoded),
+            4 => split_streams_const::<4>(&self.buffer, &mut encoded),
+            5 => split_streams_const::<5>(&self.buffer, &mut encoded),
+            6 => split_streams_const::<6>(&self.buffer, &mut encoded),
+            7 => split_streams_const::<7>(&self.buffer, &mut encoded),
+            8 => split_streams_const::<8>(&self.buffer, &mut encoded),
+            _ => split_streams_variable(&self.buffer, &mut encoded, type_size),
+        }
+
+        self.buffer.clear();
+        Ok(encoded.into())
+    }
+
+    /// return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.buffer.capacity() * std::mem::size_of::<u8>()
+    }
+}
diff --git a/parquet/src/encodings/encoding/mod.rs
b/parquet/src/encodings/encoding/mod.rs
index ecc3767124..f6d94e0331 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -24,8 +24,10 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::{num_required_bits, BitWriter};
+use byte_stream_split_encoder::{ByteStreamSplitEncoder,
VariableWidthByteStreamSplitEncoder};
use bytes::Bytes;
pub use dict_encoder::DictEncoder;
@@ -78,7 +80,10 @@ pub trait Encoder<T: DataType>: Send {
/// Gets an encoder for the particular data type `T` and encoding `encoding`.
/// `descr` describes the column being encoded; its type length sizes the
/// `BYTE_STREAM_SPLIT` encoder used for `FIXED_LEN_BYTE_ARRAY` columns.
-pub fn get_encoder<T: DataType>(encoding: Encoding) -> Result<Box<dyn
Encoder<T>>> {
+pub fn get_encoder<T: DataType>(
+ encoding: Encoding,
+ descr: &ColumnDescPtr,
+) -> Result<Box<dyn Encoder<T>>> {
let encoder: Box<dyn Encoder<T>> = match encoding {
Encoding::PLAIN => Box::new(PlainEncoder::new()),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
@@ -90,9 +95,12 @@ pub fn get_encoder<T: DataType>(encoding: Encoding) ->
Result<Box<dyn Encoder<T>
Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
Encoding::DELTA_LENGTH_BYTE_ARRAY =>
Box::new(DeltaLengthByteArrayEncoder::new()),
Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
- Encoding::BYTE_STREAM_SPLIT => {
- Box::new(byte_stream_split_encoder::ByteStreamSplitEncoder::new())
- }
+ Encoding::BYTE_STREAM_SPLIT => match T::get_physical_type() {
+ Type::FIXED_LEN_BYTE_ARRAY =>
Box::new(VariableWidthByteStreamSplitEncoder::new(
+ descr.type_length(),
+ )),
+ _ => Box::new(ByteStreamSplitEncoder::new()),
+ },
e => return Err(nyi_err!("Encoding {} is not supported", e)),
};
Ok(encoder)
@@ -498,8 +506,7 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
self.page_header_writer.estimated_memory_size()
+ self.bit_writer.estimated_memory_size()
+ self.deltas.capacity() * std::mem::size_of::<i64>()
- + std::mem::size_of::<Self>()
-
+ + std::mem::size_of::<Self>()
}
}
@@ -641,10 +648,7 @@ impl<T: DataType> Encoder<T> for
DeltaLengthByteArrayEncoder<T> {
/// return the estimated memory size of this encoder.
fn estimated_memory_size(&self) -> usize {
- self.len_encoder.estimated_memory_size()
- + self.data.len()
- + std::mem::size_of::<Self>()
-
+ self.len_encoder.estimated_memory_size() + self.data.len() +
std::mem::size_of::<Self>()
}
}
@@ -754,8 +758,7 @@ impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
fn estimated_memory_size(&self) -> usize {
self.prefix_len_encoder.estimated_memory_size()
+ self.suffix_writer.estimated_memory_size()
- + (self.previous.capacity() * std::mem::size_of::<u8>())
-
+ + (self.previous.capacity() * std::mem::size_of::<u8>())
}
}
@@ -767,28 +770,30 @@ mod tests {
use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder,
PlainDecoder};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath,
Type as SchemaType};
- use crate::util::test_common::rand_gen::{random_bytes, RandGen};
use crate::util::bit_util;
+ use crate::util::test_common::rand_gen::{random_bytes, RandGen};
const TEST_SET_SIZE: usize = 1024;
#[test]
fn test_get_encoders() {
// supported encodings
- create_and_check_encoder::<Int32Type>(Encoding::PLAIN, None);
- create_and_check_encoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED,
None);
-
create_and_check_encoder::<Int32Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
- create_and_check_encoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY,
None);
- create_and_check_encoder::<BoolType>(Encoding::RLE, None);
+ create_and_check_encoder::<Int32Type>(0, Encoding::PLAIN, None);
+ create_and_check_encoder::<Int32Type>(0,
Encoding::DELTA_BINARY_PACKED, None);
+ create_and_check_encoder::<Int32Type>(0,
Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
+ create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BYTE_ARRAY,
None);
+ create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);
// error when initializing
create_and_check_encoder::<Int32Type>(
+ 0,
Encoding::RLE_DICTIONARY,
Some(general_err!(
"Cannot initialize this encoding through this function"
)),
);
create_and_check_encoder::<Int32Type>(
+ 0,
Encoding::PLAIN_DICTIONARY,
Some(general_err!(
"Cannot initialize this encoding through this function"
@@ -798,6 +803,7 @@ mod tests {
// unsupported
#[allow(deprecated)]
create_and_check_encoder::<Int32Type>(
+ 0,
Encoding::BIT_PACKED,
Some(nyi_err!("Encoding BIT_PACKED is not supported")),
);
@@ -815,6 +821,7 @@ mod tests {
Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
+ Int32Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
}
#[test]
@@ -822,6 +829,7 @@ mod tests {
Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
+ Int64Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
}
#[test]
@@ -853,10 +861,11 @@ mod tests {
}
#[test]
- fn test_fixed_lenbyte_array() {
+ fn test_fixed_len_byte_array() {
FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100);
FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE,
100);
FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE,
100);
+ FixedLenByteArrayType::test(Encoding::BYTE_STREAM_SPLIT,
TEST_SET_SIZE, 100);
}
#[test]
@@ -905,7 +914,7 @@ mod tests {
Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
Box::new(create_test_dict_encoder::<T>(type_length))
}
- _ => create_test_encoder::<T>(encoding),
+ _ => create_test_encoder::<T>(type_length, encoding),
};
assert_eq!(encoder.estimated_data_encoded_size(), initial_size);
@@ -960,7 +969,7 @@ mod tests {
#[test]
fn test_byte_stream_split_example_f32() {
// Test data from
https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/Encodings.md#byte-stream-split-byte_stream_split--9
- let mut encoder =
create_test_encoder::<FloatType>(Encoding::BYTE_STREAM_SPLIT);
+ let mut encoder = create_test_encoder::<FloatType>(0,
Encoding::BYTE_STREAM_SPLIT);
let mut decoder = create_test_decoder::<FloatType>(0,
Encoding::BYTE_STREAM_SPLIT);
let input = vec![
@@ -989,7 +998,7 @@ mod tests {
// See: https://github.com/sunchao/parquet-rs/issues/47
#[test]
fn test_issue_47() {
- let mut encoder =
create_test_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+ let mut encoder = create_test_encoder::<ByteArrayType>(0,
Encoding::DELTA_BYTE_ARRAY);
let mut decoder = create_test_decoder::<ByteArrayType>(0,
Encoding::DELTA_BYTE_ARRAY);
let input = vec![
@@ -1039,7 +1048,7 @@ mod tests {
impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
fn test_internal(enc: Encoding, total: usize, type_length: i32) ->
Result<()> {
- let mut encoder = create_test_encoder::<T>(enc);
+ let mut encoder = create_test_encoder::<T>(type_length, enc);
let mut decoder = create_test_decoder::<T>(type_length, enc);
let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
let mut result_data = vec![T::T::default(); total];
@@ -1137,8 +1146,13 @@ mod tests {
decoder.get(output)
}
- fn create_and_check_encoder<T: DataType>(encoding: Encoding, err:
Option<ParquetError>) {
- let encoder = get_encoder::<T>(encoding);
+ fn create_and_check_encoder<T: DataType>(
+ type_length: i32,
+ encoding: Encoding,
+ err: Option<ParquetError>,
+ ) {
+ let desc = create_test_col_desc_ptr(type_length,
T::get_physical_type());
+ let encoder = get_encoder::<T>(encoding, &desc);
match err {
Some(parquet_error) => {
assert_eq!(
@@ -1164,8 +1178,9 @@ mod tests {
))
}
- fn create_test_encoder<T: DataType>(enc: Encoding) -> Box<dyn Encoder<T>> {
- get_encoder(enc).unwrap()
+ fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) ->
Box<dyn Encoder<T>> {
+ let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
+ get_encoder(enc, &desc).unwrap()
}
fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) ->
Box<dyn Decoder<T>> {
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index 70aea6fd5a..07e80f7f69 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1765,4 +1765,42 @@ mod tests {
_ => unreachable!(),
}
}
+
+ #[test]
+ fn test_byte_stream_split_extended() {
+ let path = format!(
+ "{}/byte_stream_split_extended.gzip.parquet",
+ arrow::util::test_util::parquet_test_data(),
+ );
+ let file = File::open(path).unwrap();
+ let reader = Box::new(SerializedFileReader::new(file).expect("Failed
to create reader"));
+
+ // Use full schema as projected schema
+ let mut iter = reader
+ .get_row_iter(None)
+ .expect("Failed to create row iterator");
+
+ let mut start = 0;
+ let end = reader.metadata().file_metadata().num_rows();
+
+ let check_row = |row: Result<Row, ParquetError>| {
+ assert!(row.is_ok());
+ let r = row.unwrap();
+ assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
+ assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
+ assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
+ assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
+ assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
+ assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
+ assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
+ };
+
+ while start < end {
+ match iter.next() {
+ Some(row) => check_row(row),
+ None => break,
+ };
+ start += 1;
+ }
+ }
}
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index bcf8449b44..cf383103df 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -814,9 +814,15 @@ impl<'a, W: Write + Send> PageWriter for
SerializedPageWriter<'a, W> {
mod tests {
use super::*;
+ #[cfg(feature = "arrow")]
+ use arrow_array::RecordBatchReader;
use bytes::Bytes;
use std::fs::File;
+ #[cfg(feature = "arrow")]
+ use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+ #[cfg(feature = "arrow")]
+ use crate::arrow::ArrowWriter;
use crate::basic::{
ColumnOrder, Compression, ConvertedType, Encoding, LogicalType,
Repetition, SortOrder, Type,
};
@@ -2063,4 +2069,115 @@ mod tests {
assert_eq!(offset_index[0].len(), 1);
assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
}
+
+ #[test]
+ #[cfg(feature = "arrow")]
+ fn test_byte_stream_split_extended_roundtrip() {
+ let path = format!(
+ "{}/byte_stream_split_extended.gzip.parquet",
+ arrow::util::test_util::parquet_test_data(),
+ );
+ let file = File::open(path).unwrap();
+
+ // Read in test file and rewrite to tmp
+ let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
+ .expect("parquet open")
+ .build()
+ .expect("parquet open");
+
+ let file = tempfile::tempfile().unwrap();
+ let props = WriterProperties::builder()
+ .set_dictionary_enabled(false)
+ .set_column_encoding(
+ ColumnPath::from("float16_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .set_column_encoding(
+ ColumnPath::from("float_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .set_column_encoding(
+ ColumnPath::from("double_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .set_column_encoding(
+ ColumnPath::from("int32_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .set_column_encoding(
+ ColumnPath::from("int64_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .set_column_encoding(
+ ColumnPath::from("flba5_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .set_column_encoding(
+ ColumnPath::from("decimal_byte_stream_split"),
+ Encoding::BYTE_STREAM_SPLIT,
+ )
+ .build();
+
+ let mut parquet_writer = ArrowWriter::try_new(
+ file.try_clone().expect("cannot open file"),
+ parquet_reader.schema(),
+ Some(props),
+ )
+ .expect("create arrow writer");
+
+ for maybe_batch in parquet_reader {
+ let batch = maybe_batch.expect("reading batch");
+ parquet_writer.write(&batch).expect("writing data");
+ }
+
+ parquet_writer.close().expect("finalizing file");
+
+ let reader = SerializedFileReader::new(file).expect("Failed to create
reader");
+ let filemeta = reader.metadata();
+
+ // Make sure byte_stream_split encoding was used
+ let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
+ assert!(filemeta
+ .row_group(0)
+ .column(x)
+ .encodings()
+ .contains(&Encoding::BYTE_STREAM_SPLIT));
+ };
+
+ check_encoding(1, filemeta);
+ check_encoding(3, filemeta);
+ check_encoding(5, filemeta);
+ check_encoding(7, filemeta);
+ check_encoding(9, filemeta);
+ check_encoding(11, filemeta);
+ check_encoding(13, filemeta);
+
+ // Read back tmpfile and make sure all values are correct
+ let mut iter = reader
+ .get_row_iter(None)
+ .expect("Failed to create row iterator");
+
+ let mut start = 0;
+ let end = reader.metadata().file_metadata().num_rows();
+
+ let check_row = |row: Result<Row, ParquetError>| {
+ assert!(row.is_ok());
+ let r = row.unwrap();
+ assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
+ assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
+ assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
+ assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
+ assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
+ assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
+ assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
+ };
+
+ while start < end {
+ match iter.next() {
+ Some(row) => check_row(row),
+ None => break,
+ };
+ start += 1;
+ }
+ }
}
diff --git a/parquet/src/util/test_common/page_util.rs
b/parquet/src/util/test_common/page_util.rs
index 858fd8c234..3db43aef0e 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -24,9 +24,10 @@ use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, Encoder};
use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
-use crate::schema::types::ColumnDescPtr;
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type
as SchemaType};
use std::iter::Peekable;
use std::mem;
+use std::sync::Arc;
pub trait DataPageBuilder {
fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -107,9 +108,22 @@ impl DataPageBuilder for DataPageBuilderImpl {
self.num_values,
values.len()
);
+ // Create test column descriptor.
+ let desc = {
+ let ty = SchemaType::primitive_type_builder("t",
T::get_physical_type())
+ .with_length(0)
+ .build()
+ .unwrap();
+ Arc::new(ColumnDescriptor::new(
+ Arc::new(ty),
+ 0,
+ 0,
+ ColumnPath::new(vec![]),
+ ))
+ };
self.encoding = Some(encoding);
let mut encoder: Box<dyn Encoder<T>> =
- get_encoder::<T>(encoding).expect("get_encoder() should be OK");
+ get_encoder::<T>(encoding, &desc).expect("get_encoder() should be
OK");
encoder.put(values).expect("put() should be OK");
let encoded_values = encoder
.flush_buffer()