This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a4f269e94 Support Parquet `BYTE_STREAM_SPLIT` for INT32, INT64, and FIXED_LEN_BYTE_ARRAY primitive types (#6159)
2a4f269e94 is described below

commit 2a4f269e94e155131d6ae0bc8af48101012e3d45
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Aug 6 05:46:56 2024 -0700

    Support Parquet `BYTE_STREAM_SPLIT` for INT32, INT64, and FIXED_LEN_BYTE_ARRAY primitive types (#6159)
    
    * add todos to help trace flow
    
    * add support for byte_stream_split encoding for INT32 and INT64 data
    
    * byte_stream_split encoding for fixed_len_byte_array
    
    * revert changes to Decoder and add VariableWidthByteStreamSplitDecoder
    
    * remove set_type_width as it is now unused
    
    * begin implementing roundtrip test
    
    * move test
    
    * clean up some documentation
    
    * add test of byte_stream_split with flba
    
    * add check for and test of mismatched sizes
    
    * remove type_length from Encoder and add VariableWidthByteStreamSplitEncoder
    
    * fix clippy error
    
    * change type of argument to new()
    
    * formatting
    
    * add another test
    
    * add variable to split/join streams for FLBA
    
    * more informative error message
    
    * avoid buffer copies in decoder per suggestion from review
    
    * add roundtrip test
    
    * optimized version...but clippy complains
    
    * clippy was right...replace loop with copy_from_slice
    
    * fix test
    
    * optimize split_streams_variable for long type widths
---
 parquet/benches/encoding.rs                        |  24 ++--
 .../src/arrow/array_reader/fixed_len_byte_array.rs |  45 +++++++
 parquet/src/arrow/array_reader/test_util.rs        |   3 +-
 parquet/src/arrow/arrow_reader/mod.rs              |  82 ++++++++++++
 parquet/src/arrow/arrow_writer/mod.rs              |   6 +-
 parquet/src/column/writer/encoder.rs               |   1 +
 parquet/src/encodings/decoding.rs                  | 120 ++++++++++++++---
 .../decoding/byte_stream_split_decoder.rs          | 146 ++++++++++++++++++++-
 .../encoding/byte_stream_split_encoder.rs          | 145 +++++++++++++++++++-
 parquet/src/encodings/encoding/mod.rs              |  69 ++++++----
 parquet/src/file/serialized_reader.rs              |  38 ++++++
 parquet/src/file/writer.rs                         | 117 +++++++++++++++++
 parquet/src/util/test_common/page_util.rs          |  18 ++-
 13 files changed, 743 insertions(+), 71 deletions(-)

diff --git a/parquet/benches/encoding.rs b/parquet/benches/encoding.rs
index bdbca3567a..80befe8dad 100644
--- a/parquet/benches/encoding.rs
+++ b/parquet/benches/encoding.rs
@@ -30,30 +30,30 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values: 
&[T::T], encoding: Encodi
         std::any::type_name::<T::T>(),
         encoding
     );
+    let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
+        Arc::new(
+            Type::primitive_type_builder("", T::get_physical_type())
+                .build()
+                .unwrap(),
+        ),
+        0,
+        0,
+        ColumnPath::new(vec![]),
+    ));
     c.bench_function(&format!("encoding: {}", name), |b| {
         b.iter(|| {
-            let mut encoder = get_encoder::<T>(encoding).unwrap();
+            let mut encoder = get_encoder::<T>(encoding, 
&column_desc_ptr).unwrap();
             encoder.put(values).unwrap();
             encoder.flush_buffer().unwrap();
         });
     });
 
-    let mut encoder = get_encoder::<T>(encoding).unwrap();
+    let mut encoder = get_encoder::<T>(encoding, &column_desc_ptr).unwrap();
     encoder.put(values).unwrap();
     let encoded = encoder.flush_buffer().unwrap();
     println!("{} encoded as {} bytes", name, encoded.len(),);
 
     let mut buffer = vec![T::T::default(); values.len()];
-    let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
-        Arc::new(
-            Type::primitive_type_builder("", T::get_physical_type())
-                .build()
-                .unwrap(),
-        ),
-        0,
-        0,
-        ColumnPath::new(vec![]),
-    ));
     c.bench_function(&format!("decoding: {}", name), |b| {
         b.iter(|| {
             let mut decoder: Box<dyn Decoder<T>> =
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs 
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index a9159bb471..8098f3240a 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -341,6 +341,10 @@ impl ColumnValueDecoder for ValueDecoder {
             Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
                 decoder: DeltaByteArrayDecoder::new(data)?,
             },
+            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
+                buf: data,
+                offset: 0,
+            },
             _ => {
                 return Err(general_err!(
                     "unsupported encoding for fixed length byte array: {}",
@@ -400,6 +404,20 @@ impl ColumnValueDecoder for ValueDecoder {
                     Ok(())
                 })
             }
+            Decoder::ByteStreamSplit { buf, offset } => {
+                // we have n=`byte_length` streams of length 
`buf.len/byte_length`
+                // to read value i, we need the i'th byte from each of the 
streams
+                // so `offset` should be the value offset, not the byte offset
+                let total_values = buf.len() / self.byte_length;
+                let to_read = num_values.min(total_values - *offset);
+                out.buffer.reserve(to_read * self.byte_length);
+
+                // now read the n streams and reassemble values into the 
output buffer
+                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, 
self.byte_length);
+
+                *offset += to_read;
+                Ok(to_read)
+            }
         }
     }
 
@@ -412,6 +430,32 @@ impl ColumnValueDecoder for ValueDecoder {
             }
             Decoder::Dict { decoder } => decoder.skip(num_values),
             Decoder::Delta { decoder } => decoder.skip(num_values),
+            Decoder::ByteStreamSplit { offset, buf } => {
+                let total_values = buf.len() / self.byte_length;
+                let to_read = num_values.min(total_values - *offset);
+                *offset += to_read;
+                Ok(to_read)
+            }
+        }
+    }
+}
+
+// `src` is an array laid out like a NxM matrix where N == `data_width` and
+// M == total_values_in_src. Each "row" of the matrix is a stream of bytes, 
with stream `i`
+// containing the `ith` byte for each value. Each "column" is a single value.
+// This will reassemble `num_values` values by reading columns of the matrix 
starting at
+// `offset`. Values will be appended to `dst`.
+fn read_byte_stream_split(
+    dst: &mut Vec<u8>,
+    src: &mut Bytes,
+    offset: usize,
+    num_values: usize,
+    data_width: usize,
+) {
+    let stride = src.len() / data_width;
+    for i in 0..num_values {
+        for j in 0..data_width {
+            dst.push(src[offset + j * stride + i]);
         }
     }
 }
@@ -420,6 +464,7 @@ enum Decoder {
     Plain { buf: Bytes, offset: usize },
     Dict { decoder: DictIndexDecoder },
     Delta { decoder: DeltaByteArrayDecoder },
+    ByteStreamSplit { buf: Bytes, offset: usize },
 }
 
 #[cfg(test)]
diff --git a/parquet/src/arrow/array_reader/test_util.rs 
b/parquet/src/arrow/array_reader/test_util.rs
index 0503292013..a7ff8d6e41 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -46,7 +46,8 @@ pub fn utf8_column() -> ColumnDescPtr {
 
 /// Encode `data` with the provided `encoding`
 pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> Bytes {
-    let mut encoder = get_encoder::<ByteArrayType>(encoding).unwrap();
+    let desc = utf8_column();
+    let mut encoder = get_encoder::<ByteArrayType>(encoding, &desc).unwrap();
 
     encoder.put(data).unwrap();
     encoder.flush_buffer().unwrap()
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index c696763d63..a0302fa86b 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -1059,6 +1059,7 @@ mod tests {
                 Encoding::PLAIN,
                 Encoding::RLE_DICTIONARY,
                 Encoding::DELTA_BINARY_PACKED,
+                Encoding::BYTE_STREAM_SPLIT,
             ],
         );
         run_single_column_reader_tests::<Int64Type, _, Int64Type>(
@@ -1070,6 +1071,7 @@ mod tests {
                 Encoding::PLAIN,
                 Encoding::RLE_DICTIONARY,
                 Encoding::DELTA_BINARY_PACKED,
+                Encoding::BYTE_STREAM_SPLIT,
             ],
         );
         run_single_column_reader_tests::<FloatType, _, FloatType>(
@@ -1641,6 +1643,86 @@ mod tests {
         assert_eq!(row_count, 300);
     }
 
+    #[test]
+    fn test_read_extended_byte_stream_split() {
+        let path = format!(
+            "{}/byte_stream_split_extended.gzip.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+        let file = File::open(path).unwrap();
+        let record_reader = ParquetRecordBatchReader::try_new(file, 
128).unwrap();
+
+        let mut row_count = 0;
+        for batch in record_reader {
+            let batch = batch.unwrap();
+            row_count += batch.num_rows();
+
+            // 0,1 are f16
+            let f16_col = batch.column(0).as_primitive::<Float16Type>();
+            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
+            assert_eq!(f16_col.len(), f16_bss.len());
+            f16_col
+                .iter()
+                .zip(f16_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 2,3 are f32
+            let f32_col = batch.column(2).as_primitive::<Float32Type>();
+            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
+            assert_eq!(f32_col.len(), f32_bss.len());
+            f32_col
+                .iter()
+                .zip(f32_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 4,5 are f64
+            let f64_col = batch.column(4).as_primitive::<Float64Type>();
+            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
+            assert_eq!(f64_col.len(), f64_bss.len());
+            f64_col
+                .iter()
+                .zip(f64_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 6,7 are i32
+            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
+            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
+            assert_eq!(i32_col.len(), i32_bss.len());
+            i32_col
+                .iter()
+                .zip(i32_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 8,9 are i64
+            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
+            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
+            assert_eq!(i64_col.len(), i64_bss.len());
+            i64_col
+                .iter()
+                .zip(i64_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 10,11 are FLBA(5)
+            let flba_col = batch.column(10).as_fixed_size_binary();
+            let flba_bss = batch.column(11).as_fixed_size_binary();
+            assert_eq!(flba_col.len(), flba_bss.len());
+            flba_col
+                .iter()
+                .zip(flba_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+
+            // 12,13 are FLBA(4) (decimal(7,3))
+            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
+            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
+            assert_eq!(dec_col.len(), dec_bss.len());
+            dec_col
+                .iter()
+                .zip(dec_bss.iter())
+                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
+        }
+        assert_eq!(row_count, 200);
+    }
+
     #[test]
     fn test_read_incorrect_map_schema_file() {
         let testdata = arrow::util::test_util::parquet_test_data();
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index db5d4502cf..0c07f541bd 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1791,7 +1791,11 @@ mod tests {
             | DataType::UInt64
             | DataType::UInt32
             | DataType::UInt16
-            | DataType::UInt8 => vec![Encoding::PLAIN, 
Encoding::DELTA_BINARY_PACKED],
+            | DataType::UInt8 => vec![
+                Encoding::PLAIN,
+                Encoding::DELTA_BINARY_PACKED,
+                Encoding::BYTE_STREAM_SPLIT,
+            ],
             DataType::Float32 | DataType::Float64 => {
                 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
             }
diff --git a/parquet/src/column/writer/encoder.rs 
b/parquet/src/column/writer/encoder.rs
index 9d01c09040..7371c72a58 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -191,6 +191,7 @@ impl<T: DataType> ColumnValueEncoder for 
ColumnValueEncoderImpl<T> {
             props
                 .encoding(descr.path())
                 .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), 
props)),
+            descr,
         )?;
 
         let statistics_enabled = props.statistics_enabled(descr.path());
diff --git a/parquet/src/encodings/decoding.rs 
b/parquet/src/encodings/decoding.rs
index b3dd135237..16467b32dc 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -27,6 +27,9 @@ use super::rle::RleDecoder;
 use crate::basic::*;
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
+use crate::encodings::decoding::byte_stream_split_decoder::{
+    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
+};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::{self, BitReader};
@@ -87,6 +90,7 @@ pub(crate) mod private {
             encoding: Encoding,
         ) -> Result<Box<dyn Decoder<T>>> {
             match encoding {
+                Encoding::BYTE_STREAM_SPLIT => 
Ok(Box::new(ByteStreamSplitDecoder::new())),
                 Encoding::DELTA_BINARY_PACKED => 
Ok(Box::new(DeltaBitPackDecoder::new())),
                 _ => get_decoder_default(descr, encoding),
             }
@@ -99,6 +103,7 @@ pub(crate) mod private {
             encoding: Encoding,
         ) -> Result<Box<dyn Decoder<T>>> {
             match encoding {
+                Encoding::BYTE_STREAM_SPLIT => 
Ok(Box::new(ByteStreamSplitDecoder::new())),
                 Encoding::DELTA_BINARY_PACKED => 
Ok(Box::new(DeltaBitPackDecoder::new())),
                 _ => get_decoder_default(descr, encoding),
             }
@@ -111,9 +116,7 @@ pub(crate) mod private {
             encoding: Encoding,
         ) -> Result<Box<dyn Decoder<T>>> {
             match encoding {
-                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
-                    byte_stream_split_decoder::ByteStreamSplitDecoder::new(),
-                )),
+                Encoding::BYTE_STREAM_SPLIT => 
Ok(Box::new(ByteStreamSplitDecoder::new())),
                 _ => get_decoder_default(descr, encoding),
             }
         }
@@ -124,9 +127,7 @@ pub(crate) mod private {
             encoding: Encoding,
         ) -> Result<Box<dyn Decoder<T>>> {
             match encoding {
-                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
-                    byte_stream_split_decoder::ByteStreamSplitDecoder::new(),
-                )),
+                Encoding::BYTE_STREAM_SPLIT => 
Ok(Box::new(ByteStreamSplitDecoder::new())),
                 _ => get_decoder_default(descr, encoding),
             }
         }
@@ -153,6 +154,9 @@ pub(crate) mod private {
             encoding: Encoding,
         ) -> Result<Box<dyn Decoder<T>>> {
             match encoding {
+                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
+                    
VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
+                )),
                 Encoding::DELTA_BYTE_ARRAY => 
Ok(Box::new(DeltaByteArrayDecoder::new())),
                 _ => get_decoder_default(descr, encoding),
             }
@@ -1785,7 +1789,7 @@ mod tests {
             ],
             vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
         ];
-        test_byte_stream_split_decode::<FloatType>(data);
+        test_byte_stream_split_decode::<FloatType>(data, -1);
     }
 
     #[test]
@@ -1794,7 +1798,76 @@ mod tests {
             f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
             f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
         ]];
-        test_byte_stream_split_decode::<DoubleType>(data);
+        test_byte_stream_split_decode::<DoubleType>(data, -1);
+    }
+
+    #[test]
+    fn test_byte_stream_split_multiple_i32() {
+        let data = vec![
+            vec![
+                i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
+                i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
+            ],
+            vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
+        ];
+        test_byte_stream_split_decode::<Int32Type>(data, -1);
+    }
+
+    #[test]
+    fn test_byte_stream_split_i64() {
+        let data = vec![vec![
+            i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
+            i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
+        ]];
+        test_byte_stream_split_decode::<Int64Type>(data, -1);
+    }
+
+    fn test_byte_stream_split_flba(type_width: usize) {
+        let data = vec![
+            vec![
+                FixedLenByteArrayType::gen(type_width as i32),
+                FixedLenByteArrayType::gen(type_width as i32),
+            ],
+            vec![FixedLenByteArrayType::gen(type_width as i32)],
+        ];
+        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 
type_width as i32);
+    }
+
+    #[test]
+    fn test_byte_stream_split_flba5() {
+        test_byte_stream_split_flba(5);
+    }
+
+    #[test]
+    fn test_byte_stream_split_flba16() {
+        test_byte_stream_split_flba(16);
+    }
+
+    #[test]
+    fn test_byte_stream_split_flba19() {
+        test_byte_stream_split_flba(19);
+    }
+
+    #[test]
+    #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
+    fn test_byte_stream_split_flba_mismatch() {
+        let data = vec![
+            vec![
+                FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
+                FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
+            ],
+            vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
+        ];
+        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
+    }
+
+    #[test]
+    #[should_panic(expected = "Input data length is not a multiple of type 
width 4")]
+    fn test_byte_stream_split_flba_bad_input() {
+        let mut decoder = 
VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
+        decoder
+            .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
+            .unwrap();
     }
 
     #[test]
@@ -1808,33 +1881,42 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_skip_byte_stream_split_ints() {
+        let block_data = vec![3, 4, 1, 5];
+        test_skip::<Int32Type>(block_data.clone(), 
Encoding::BYTE_STREAM_SPLIT, 2);
+        test_skip::<Int64Type>(
+            block_data.into_iter().map(|x| x as i64).collect(),
+            Encoding::BYTE_STREAM_SPLIT,
+            100,
+        );
+    }
+
     fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
-        test_encode_decode::<T>(data, Encoding::RLE);
+        test_encode_decode::<T>(data, Encoding::RLE, -1);
     }
 
     fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
-        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED);
+        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
     }
 
-    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>) {
-        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT);
+    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, 
type_width: i32) {
+        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
     }
 
     fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
-        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY);
+        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, 
-1);
     }
 
     // Input data represents vector of data slices to write (test multiple 
`put()` calls)
     // For example,
     //   vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
     //   vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
-    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: 
Encoding) {
-        // Type length should not really matter for encode/decode test,
-        // otherwise change it based on type
-        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
+    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: 
Encoding, type_width: i32) {
+        let col_descr = create_test_col_desc_ptr(type_width, 
T::get_physical_type());
 
         // Encode data
-        let mut encoder = get_encoder::<T>(encoding).expect("get encoder");
+        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get 
encoder");
 
         for v in &data[..] {
             encoder.put(&v[..]).expect("ok to encode");
@@ -1867,7 +1949,7 @@ mod tests {
         let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
 
         // Encode data
-        let mut encoder = get_encoder::<T>(encoding).expect("get encoder");
+        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get 
encoder");
 
         encoder.put(&data).expect("ok to encode");
 
diff --git a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs 
b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
index 98841d21ec..9b2f43ace8 100644
--- a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
+++ b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs
@@ -19,8 +19,9 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::basic::Encoding;
-use crate::data_type::{DataType, SliceAsBytes};
+use crate::basic::{Encoding, Type};
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::{DataType, FixedLenByteArray, SliceAsBytes};
 use crate::errors::{ParquetError, Result};
 
 use super::Decoder;
@@ -62,6 +63,22 @@ fn join_streams_const<const TYPE_SIZE: usize>(
     }
 }
 
+// Like the above, but type_size is not known at compile time.
+fn join_streams_variable(
+    src: &[u8],
+    dst: &mut [u8],
+    stride: usize,
+    type_size: usize,
+    values_decoded: usize,
+) {
+    let sub_src = &src[values_decoded..];
+    for i in 0..dst.len() / type_size {
+        for j in 0..type_size {
+            dst[i * type_size + j] = sub_src[i + j * stride];
+        }
+    }
+}
+
 impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
     fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
         self.encoded_bytes = data;
@@ -76,7 +93,7 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
         let num_values = buffer.len().min(total_remaining_values);
         let buffer = &mut buffer[..num_values];
 
-        // SAFETY: f32 and f64 has no constraints on their internal 
representation, so we can modify it as we want
+        // SAFETY: i/f32 and i/f64 has no constraints on their internal 
representation, so we can modify it as we want
         let raw_out_bytes = unsafe { <T as 
DataType>::T::slice_as_bytes_mut(buffer) };
         let type_size = T::get_type_size();
         let stride = self.encoded_bytes.len() / type_size;
@@ -119,3 +136,126 @@ impl<T: DataType> Decoder<T> for 
ByteStreamSplitDecoder<T> {
         Ok(to_skip)
     }
 }
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    encoded_bytes: Bytes,
+    total_num_values: usize,
+    values_decoded: usize,
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: type_length as usize,
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!(
+                "Input data length is not a multiple of type width {}",
+                self.type_width
+            ));
+        }
+
+        match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => {
+                self.encoded_bytes = data;
+                self.total_num_values = num_values;
+                self.values_decoded = 0;
+                Ok(())
+            }
+            _ => Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder only supports 
FixedLenByteArrayType"
+            )),
+        }
+    }
+
+    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+        let total_remaining_values = self.values_left();
+        let num_values = buffer.len().min(total_remaining_values);
+        let buffer = &mut buffer[..num_values];
+        let type_size = self.type_width;
+
+        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use 
slice_as_bytes_mut. Instead we'll
+        // have to do some data copies.
+        let mut tmp_vec = vec![0_u8; num_values * type_size];
+        let raw_out_bytes = tmp_vec.as_mut_slice();
+
+        let stride = self.encoded_bytes.len() / type_size;
+        match type_size {
+            2 => join_streams_const::<2>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            4 => join_streams_const::<4>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            8 => join_streams_const::<8>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            16 => join_streams_const::<16>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            _ => join_streams_variable(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                type_size,
+                self.values_decoded,
+            ),
+        }
+        self.values_decoded += num_values;
+
+        // create a buffer from the vec so far (and leave a new Vec in its 
place)
+        let vec_with_data = std::mem::take(&mut tmp_vec);
+        // convert Vec to Bytes (which is a ref counted wrapper)
+        let bytes_with_data = Bytes::from(vec_with_data);
+        for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
+            // Get a view into the data, without also copying the bytes
+            let data = bytes_with_data.slice(i * type_size..(i + 1) * 
type_size);
+            // TODO: perhaps add a `set_from_bytes` method to `DataType` to 
avoid downcasting
+            let bi = bi
+                .as_mut_any()
+                .downcast_mut::<FixedLenByteArray>()
+                .expect("Decoding fixed length byte array");
+            bi.set_data(data);
+        }
+
+        Ok(num_values)
+    }
+
+    fn values_left(&self) -> usize {
+        self.total_num_values - self.values_decoded
+    }
+
+    fn encoding(&self) -> Encoding {
+        Encoding::BYTE_STREAM_SPLIT
+    }
+
+    fn skip(&mut self, num_values: usize) -> Result<usize> {
+        let to_skip = usize::min(self.values_left(), num_values);
+        self.values_decoded += to_skip;
+        Ok(to_skip)
+    }
+}
diff --git a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs 
b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
index 3d5ba4cc2d..0726c6b1c9 100644
--- a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
+++ b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs
@@ -16,14 +16,14 @@
 // under the License.
 
 use crate::basic::{Encoding, Type};
-use crate::data_type::DataType;
-use crate::data_type::SliceAsBytes;
+use crate::data_type::{AsBytes, DataType, SliceAsBytes};
 
 use crate::errors::{ParquetError, Result};
 
 use super::Encoder;
 
-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
+use std::cmp;
 use std::marker::PhantomData;
 
 pub struct ByteStreamSplitEncoder<T> {
@@ -40,7 +40,7 @@ impl<T: DataType> ByteStreamSplitEncoder<T> {
     }
 }
 
-// Here we assume src contains the full data (which it must, since we're
+// Here we assume src contains the full data (which it must, since we
 // can only know where to split the streams once all data is collected).
 // We iterate over the input bytes and write them to their strided output
 // byte locations.
@@ -53,13 +53,28 @@ fn split_streams_const<const TYPE_SIZE: usize>(src: &[u8], 
dst: &mut [u8]) {
     }
 }
 
+// Like above, but type_size is not known at compile time.
+fn split_streams_variable(src: &[u8], dst: &mut [u8], type_size: usize) {
+    const BLOCK_SIZE: usize = 4;
+    let stride = src.len() / type_size;
+    for j in (0..type_size).step_by(BLOCK_SIZE) {
+        let jrange = cmp::min(BLOCK_SIZE, type_size - j);
+        for i in 0..stride {
+            for jj in 0..jrange {
+                dst[i + (j + jj) * stride] = src[i * type_size + j + jj];
+            }
+        }
+    }
+}
+
 impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
     fn put(&mut self, values: &[T::T]) -> Result<()> {
         self.buffer
             .extend(<T as DataType>::T::slice_as_bytes(values));
+
         ensure_phys_ty!(
-            Type::FLOAT | Type::DOUBLE,
-            "ByteStreamSplitEncoder only supports FloatType or DoubleType"
+            Type::FLOAT | Type::DOUBLE | Type::INT32 | Type::INT64,
+            "ByteStreamSplitEncoder does not support Int96, Boolean, or 
ByteArray types"
         );
 
         Ok(())
@@ -96,3 +111,121 @@ impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> 
{
         self.buffer.capacity() * std::mem::size_of::<u8>()
     }
 }
+
+pub struct VariableWidthByteStreamSplitEncoder<T> {
+    buffer: Vec<u8>,
+    type_width: usize,
+    _p: PhantomData<T>,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitEncoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            buffer: Vec::new(),
+            type_width: type_length as usize,
+            _p: PhantomData,
+        }
+    }
+}
+
+fn put_fixed<T: DataType, const TYPE_SIZE: usize>(dst: &mut [u8], values: 
&[T::T]) {
+    let mut idx = 0;
+    values.iter().for_each(|x| {
+        let bytes = x.as_bytes();
+        if bytes.len() != TYPE_SIZE {
+            panic!(
+                "Mismatched FixedLenByteArray sizes: {} != {}",
+                bytes.len(),
+                TYPE_SIZE
+            );
+        }
+        dst[idx..(TYPE_SIZE + idx)].copy_from_slice(&bytes[..TYPE_SIZE]);
+        idx += TYPE_SIZE;
+    });
+}
+
+fn put_variable<T: DataType>(dst: &mut [u8], values: &[T::T], type_width: 
usize) {
+    let mut idx = 0;
+    values.iter().for_each(|x| {
+        let bytes = x.as_bytes();
+        if bytes.len() != type_width {
+            panic!(
+                "Mismatched FixedLenByteArray sizes: {} != {}",
+                bytes.len(),
+                type_width
+            );
+        }
+        dst[idx..idx + type_width].copy_from_slice(bytes);
+        idx += type_width;
+    });
+}
+
+impl<T: DataType> Encoder<T> for VariableWidthByteStreamSplitEncoder<T> {
+    fn put(&mut self, values: &[T::T]) -> Result<()> {
+        ensure_phys_ty!(
+            Type::FIXED_LEN_BYTE_ARRAY,
+            "VariableWidthByteStreamSplitEncoder only supports 
FixedLenByteArray types"
+        );
+
+        // FixedLenByteArray is implemented as ByteArray, so there may be gaps 
making
+        // slice_as_bytes untenable
+        let idx = self.buffer.len();
+        let data_len = values.len() * self.type_width;
+        // Ensure enough capacity for the new data
+        self.buffer.reserve(values.len() * self.type_width);
+        // ...and extend the size of buffer to allow direct access
+        self.buffer.put_bytes(0_u8, data_len);
+        // Get a slice of the buffer corresponding to the location of the new 
data
+        let out_buf = &mut self.buffer[idx..idx + data_len];
+
+        // Now copy `values` into the buffer. Widths 2 through 8 use a
+        // const-generic copy, which is significantly faster.
+        match self.type_width {
+            2 => put_fixed::<T, 2>(out_buf, values),
+            3 => put_fixed::<T, 3>(out_buf, values),
+            4 => put_fixed::<T, 4>(out_buf, values),
+            5 => put_fixed::<T, 5>(out_buf, values),
+            6 => put_fixed::<T, 6>(out_buf, values),
+            7 => put_fixed::<T, 7>(out_buf, values),
+            8 => put_fixed::<T, 8>(out_buf, values),
+            _ => put_variable::<T>(out_buf, values, self.type_width),
+        }
+
+        Ok(())
+    }
+
+    fn encoding(&self) -> Encoding {
+        Encoding::BYTE_STREAM_SPLIT
+    }
+
+    fn estimated_data_encoded_size(&self) -> usize {
+        self.buffer.len()
+    }
+
+    fn flush_buffer(&mut self) -> Result<Bytes> {
+        let mut encoded = vec![0; self.buffer.len()];
+        let type_size = match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => self.type_width,
+            _ => T::get_type_size(),
+        };
+        // split_streams_const() is faster for type widths 2 through 8
+        match type_size {
+            2 => split_streams_const::<2>(&self.buffer, &mut encoded),
+            3 => split_streams_const::<3>(&self.buffer, &mut encoded),
+            4 => split_streams_const::<4>(&self.buffer, &mut encoded),
+            5 => split_streams_const::<5>(&self.buffer, &mut encoded),
+            6 => split_streams_const::<6>(&self.buffer, &mut encoded),
+            7 => split_streams_const::<7>(&self.buffer, &mut encoded),
+            8 => split_streams_const::<8>(&self.buffer, &mut encoded),
+            _ => split_streams_variable(&self.buffer, &mut encoded, type_size),
+        }
+
+        self.buffer.clear();
+        Ok(encoded.into())
+    }
+
+    /// return the estimated memory size of this encoder.
+    fn estimated_memory_size(&self) -> usize {
+        self.buffer.capacity() * std::mem::size_of::<u8>()
+    }
+}
diff --git a/parquet/src/encodings/encoding/mod.rs 
b/parquet/src/encodings/encoding/mod.rs
index ecc3767124..f6d94e0331 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -24,8 +24,10 @@ use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::{num_required_bits, BitWriter};
 
+use byte_stream_split_encoder::{ByteStreamSplitEncoder, 
VariableWidthByteStreamSplitEncoder};
 use bytes::Bytes;
 pub use dict_encoder::DictEncoder;
 
@@ -78,7 +80,10 @@ pub trait Encoder<T: DataType>: Send {
 
 /// Gets a encoder for the particular data type `T` and encoding `encoding`. 
Memory usage
 /// for the encoder instance is tracked by `mem_tracker`.
-pub fn get_encoder<T: DataType>(encoding: Encoding) -> Result<Box<dyn 
Encoder<T>>> {
+pub fn get_encoder<T: DataType>(
+    encoding: Encoding,
+    descr: &ColumnDescPtr,
+) -> Result<Box<dyn Encoder<T>>> {
     let encoder: Box<dyn Encoder<T>> = match encoding {
         Encoding::PLAIN => Box::new(PlainEncoder::new()),
         Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
@@ -90,9 +95,12 @@ pub fn get_encoder<T: DataType>(encoding: Encoding) -> 
Result<Box<dyn Encoder<T>
         Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
         Encoding::DELTA_LENGTH_BYTE_ARRAY => 
Box::new(DeltaLengthByteArrayEncoder::new()),
         Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
-        Encoding::BYTE_STREAM_SPLIT => {
-            Box::new(byte_stream_split_encoder::ByteStreamSplitEncoder::new())
-        }
+        Encoding::BYTE_STREAM_SPLIT => match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => 
Box::new(VariableWidthByteStreamSplitEncoder::new(
+                descr.type_length(),
+            )),
+            _ => Box::new(ByteStreamSplitEncoder::new()),
+        },
         e => return Err(nyi_err!("Encoding {} is not supported", e)),
     };
     Ok(encoder)
@@ -498,8 +506,7 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
         self.page_header_writer.estimated_memory_size()
             + self.bit_writer.estimated_memory_size()
             + self.deltas.capacity() * std::mem::size_of::<i64>()
-        + std::mem::size_of::<Self>()
-
+            + std::mem::size_of::<Self>()
     }
 }
 
@@ -641,10 +648,7 @@ impl<T: DataType> Encoder<T> for 
DeltaLengthByteArrayEncoder<T> {
 
     /// return the estimated memory size of this encoder.
     fn estimated_memory_size(&self) -> usize {
-        self.len_encoder.estimated_memory_size()
-            + self.data.len()
-        + std::mem::size_of::<Self>()
-
+        self.len_encoder.estimated_memory_size() + self.data.len() + 
std::mem::size_of::<Self>()
     }
 }
 
@@ -754,8 +758,7 @@ impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
     fn estimated_memory_size(&self) -> usize {
         self.prefix_len_encoder.estimated_memory_size()
             + self.suffix_writer.estimated_memory_size()
-        + (self.previous.capacity() * std::mem::size_of::<u8>())
-
+            + (self.previous.capacity() * std::mem::size_of::<u8>())
     }
 }
 
@@ -767,28 +770,30 @@ mod tests {
 
     use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, 
PlainDecoder};
     use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, 
Type as SchemaType};
-    use crate::util::test_common::rand_gen::{random_bytes, RandGen};
     use crate::util::bit_util;
+    use crate::util::test_common::rand_gen::{random_bytes, RandGen};
 
     const TEST_SET_SIZE: usize = 1024;
 
     #[test]
     fn test_get_encoders() {
         // supported encodings
-        create_and_check_encoder::<Int32Type>(Encoding::PLAIN, None);
-        create_and_check_encoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, 
None);
-        
create_and_check_encoder::<Int32Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
-        create_and_check_encoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY, 
None);
-        create_and_check_encoder::<BoolType>(Encoding::RLE, None);
+        create_and_check_encoder::<Int32Type>(0, Encoding::PLAIN, None);
+        create_and_check_encoder::<Int32Type>(0, 
Encoding::DELTA_BINARY_PACKED, None);
+        create_and_check_encoder::<Int32Type>(0, 
Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
+        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BYTE_ARRAY, 
None);
+        create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);
 
         // error when initializing
         create_and_check_encoder::<Int32Type>(
+            0,
             Encoding::RLE_DICTIONARY,
             Some(general_err!(
                 "Cannot initialize this encoding through this function"
             )),
         );
         create_and_check_encoder::<Int32Type>(
+            0,
             Encoding::PLAIN_DICTIONARY,
             Some(general_err!(
                 "Cannot initialize this encoding through this function"
@@ -798,6 +803,7 @@ mod tests {
         // unsupported
         #[allow(deprecated)]
         create_and_check_encoder::<Int32Type>(
+            0,
             Encoding::BIT_PACKED,
             Some(nyi_err!("Encoding BIT_PACKED is not supported")),
         );
@@ -815,6 +821,7 @@ mod tests {
         Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
         Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
         Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
+        Int32Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
     }
 
     #[test]
@@ -822,6 +829,7 @@ mod tests {
         Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
         Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
         Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
+        Int64Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
     }
 
     #[test]
@@ -853,10 +861,11 @@ mod tests {
     }
 
     #[test]
-    fn test_fixed_lenbyte_array() {
+    fn test_fixed_len_byte_array() {
         FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100);
         FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, 
100);
         FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, 
100);
+        FixedLenByteArrayType::test(Encoding::BYTE_STREAM_SPLIT, 
TEST_SET_SIZE, 100);
     }
 
     #[test]
@@ -905,7 +914,7 @@ mod tests {
                 Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                     Box::new(create_test_dict_encoder::<T>(type_length))
                 }
-                _ => create_test_encoder::<T>(encoding),
+                _ => create_test_encoder::<T>(type_length, encoding),
             };
             assert_eq!(encoder.estimated_data_encoded_size(), initial_size);
 
@@ -960,7 +969,7 @@ mod tests {
     #[test]
     fn test_byte_stream_split_example_f32() {
         // Test data from 
https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/Encodings.md#byte-stream-split-byte_stream_split--9
-        let mut encoder = 
create_test_encoder::<FloatType>(Encoding::BYTE_STREAM_SPLIT);
+        let mut encoder = create_test_encoder::<FloatType>(0, 
Encoding::BYTE_STREAM_SPLIT);
         let mut decoder = create_test_decoder::<FloatType>(0, 
Encoding::BYTE_STREAM_SPLIT);
 
         let input = vec![
@@ -989,7 +998,7 @@ mod tests {
     // See: https://github.com/sunchao/parquet-rs/issues/47
     #[test]
     fn test_issue_47() {
-        let mut encoder = 
create_test_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+        let mut encoder = create_test_encoder::<ByteArrayType>(0, 
Encoding::DELTA_BYTE_ARRAY);
         let mut decoder = create_test_decoder::<ByteArrayType>(0, 
Encoding::DELTA_BYTE_ARRAY);
 
         let input = vec![
@@ -1039,7 +1048,7 @@ mod tests {
 
     impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
         fn test_internal(enc: Encoding, total: usize, type_length: i32) -> 
Result<()> {
-            let mut encoder = create_test_encoder::<T>(enc);
+            let mut encoder = create_test_encoder::<T>(type_length, enc);
             let mut decoder = create_test_decoder::<T>(type_length, enc);
             let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
             let mut result_data = vec![T::T::default(); total];
@@ -1137,8 +1146,13 @@ mod tests {
         decoder.get(output)
     }
 
-    fn create_and_check_encoder<T: DataType>(encoding: Encoding, err: 
Option<ParquetError>) {
-        let encoder = get_encoder::<T>(encoding);
+    fn create_and_check_encoder<T: DataType>(
+        type_length: i32,
+        encoding: Encoding,
+        err: Option<ParquetError>,
+    ) {
+        let desc = create_test_col_desc_ptr(type_length, 
T::get_physical_type());
+        let encoder = get_encoder::<T>(encoding, &desc);
         match err {
             Some(parquet_error) => {
                 assert_eq!(
@@ -1164,8 +1178,9 @@ mod tests {
         ))
     }
 
-    fn create_test_encoder<T: DataType>(enc: Encoding) -> Box<dyn Encoder<T>> {
-        get_encoder(enc).unwrap()
+    fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> 
Box<dyn Encoder<T>> {
+        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
+        get_encoder(enc, &desc).unwrap()
     }
 
     fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> 
Box<dyn Decoder<T>> {
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index 70aea6fd5a..07e80f7f69 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1765,4 +1765,42 @@ mod tests {
             _ => unreachable!(),
         }
     }
+
+    #[test]
+    fn test_byte_stream_split_extended() {
+        let path = format!(
+            "{}/byte_stream_split_extended.gzip.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+        let file = File::open(path).unwrap();
+        let reader = Box::new(SerializedFileReader::new(file).expect("Failed 
to create reader"));
+
+        // Use full schema as projected schema
+        let mut iter = reader
+            .get_row_iter(None)
+            .expect("Failed to create row iterator");
+
+        let mut start = 0;
+        let end = reader.metadata().file_metadata().num_rows();
+
+        let check_row = |row: Result<Row, ParquetError>| {
+            assert!(row.is_ok());
+            let r = row.unwrap();
+            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
+            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
+            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
+            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
+            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
+            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
+            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
+        };
+
+        while start < end {
+            match iter.next() {
+                Some(row) => check_row(row),
+                None => break,
+            };
+            start += 1;
+        }
+    }
 }
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index bcf8449b44..cf383103df 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -814,9 +814,15 @@ impl<'a, W: Write + Send> PageWriter for 
SerializedPageWriter<'a, W> {
 mod tests {
     use super::*;
 
+    #[cfg(feature = "arrow")]
+    use arrow_array::RecordBatchReader;
     use bytes::Bytes;
     use std::fs::File;
 
+    #[cfg(feature = "arrow")]
+    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    #[cfg(feature = "arrow")]
+    use crate::arrow::ArrowWriter;
     use crate::basic::{
         ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, 
Repetition, SortOrder, Type,
     };
@@ -2063,4 +2069,115 @@ mod tests {
         assert_eq!(offset_index[0].len(), 1);
         assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
     }
+
+    #[test]
+    #[cfg(feature = "arrow")]
+    fn test_byte_stream_split_extended_roundtrip() {
+        let path = format!(
+            "{}/byte_stream_split_extended.gzip.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+        let file = File::open(path).unwrap();
+
+        // Read in test file and rewrite to tmp
+        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
+            .expect("parquet open")
+            .build()
+            .expect("parquet open");
+
+        let file = tempfile::tempfile().unwrap();
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)
+            .set_column_encoding(
+                ColumnPath::from("float16_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .set_column_encoding(
+                ColumnPath::from("float_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .set_column_encoding(
+                ColumnPath::from("double_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .set_column_encoding(
+                ColumnPath::from("int32_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .set_column_encoding(
+                ColumnPath::from("int64_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .set_column_encoding(
+                ColumnPath::from("flba5_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .set_column_encoding(
+                ColumnPath::from("decimal_byte_stream_split"),
+                Encoding::BYTE_STREAM_SPLIT,
+            )
+            .build();
+
+        let mut parquet_writer = ArrowWriter::try_new(
+            file.try_clone().expect("cannot open file"),
+            parquet_reader.schema(),
+            Some(props),
+        )
+        .expect("create arrow writer");
+
+        for maybe_batch in parquet_reader {
+            let batch = maybe_batch.expect("reading batch");
+            parquet_writer.write(&batch).expect("writing data");
+        }
+
+        parquet_writer.close().expect("finalizing file");
+
+        let reader = SerializedFileReader::new(file).expect("Failed to create 
reader");
+        let filemeta = reader.metadata();
+
+        // Make sure byte_stream_split encoding was used
+        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
+            assert!(filemeta
+                .row_group(0)
+                .column(x)
+                .encodings()
+                .contains(&Encoding::BYTE_STREAM_SPLIT));
+        };
+
+        check_encoding(1, filemeta);
+        check_encoding(3, filemeta);
+        check_encoding(5, filemeta);
+        check_encoding(7, filemeta);
+        check_encoding(9, filemeta);
+        check_encoding(11, filemeta);
+        check_encoding(13, filemeta);
+
+        // Read back tmpfile and make sure all values are correct
+        let mut iter = reader
+            .get_row_iter(None)
+            .expect("Failed to create row iterator");
+
+        let mut start = 0;
+        let end = reader.metadata().file_metadata().num_rows();
+
+        let check_row = |row: Result<Row, ParquetError>| {
+            assert!(row.is_ok());
+            let r = row.unwrap();
+            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
+            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
+            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
+            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
+            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
+            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
+            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
+        };
+
+        while start < end {
+            match iter.next() {
+                Some(row) => check_row(row),
+                None => break,
+            };
+            start += 1;
+        }
+    }
 }
diff --git a/parquet/src/util/test_common/page_util.rs 
b/parquet/src/util/test_common/page_util.rs
index 858fd8c234..3db43aef0e 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -24,9 +24,10 @@ use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, Encoder};
 use crate::encodings::levels::LevelEncoder;
 use crate::errors::Result;
-use crate::schema::types::ColumnDescPtr;
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type 
as SchemaType};
 use std::iter::Peekable;
 use std::mem;
+use std::sync::Arc;
 
 pub trait DataPageBuilder {
     fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -107,9 +108,22 @@ impl DataPageBuilder for DataPageBuilderImpl {
             self.num_values,
             values.len()
         );
+        // Create test column descriptor.
+        let desc = {
+            let ty = SchemaType::primitive_type_builder("t", 
T::get_physical_type())
+                .with_length(0)
+                .build()
+                .unwrap();
+            Arc::new(ColumnDescriptor::new(
+                Arc::new(ty),
+                0,
+                0,
+                ColumnPath::new(vec![]),
+            ))
+        };
         self.encoding = Some(encoding);
         let mut encoder: Box<dyn Encoder<T>> =
-            get_encoder::<T>(encoding).expect("get_encoder() should be OK");
+            get_encoder::<T>(encoding, &desc).expect("get_encoder() should be 
OK");
         encoder.put(values).expect("put() should be OK");
         let encoded_values = encoder
             .flush_buffer()

Reply via email to