alamb commented on code in PR #6159:
URL: https://github.com/apache/arrow-rs/pull/6159#discussion_r1702087919


##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -119,3 +136,117 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
         Ok(to_skip)
     }
 }
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    encoded_bytes: Bytes,
+    total_num_values: usize,
+    values_decoded: usize,
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: type_length as usize,
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!("Input data is not of fixed length"));

Review Comment:
   Could we please make this error message slightly more informative? Something like
   `data length {} is not a multiple of type width {}`.
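   
   A minimal standalone sketch of such a message. The helper name
   `check_data_len`, the `String` error type, and the zero-width guard are
   assumptions for illustration only; the PR itself builds its error with
   `general_err!`.
   
   ```rust
   /// Hypothetical helper (not part of the PR): validates that the encoded
   /// buffer length is a whole number of `type_width`-byte values.
   fn check_data_len(data_len: usize, type_width: usize) -> Result<(), String> {
       // Assumption: reject a zero width up front, since `%` by zero panics.
       if type_width == 0 {
           return Err("type width must be non-zero".to_string());
       }
       if data_len % type_width != 0 {
           // Report both numbers so the failure can be diagnosed from the log.
           return Err(format!(
               "data length {} is not a multiple of type width {}",
               data_len, type_width
           ));
       }
       Ok(())
   }
   ```
   
   Including both values in the message means a reader can see at a glance
   whether the buffer was truncated or the wrong type width was configured.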



##########
parquet/src/encodings/decoding.rs:
##########
@@ -27,6 +27,9 @@ use super::rle::RleDecoder;
 use crate::basic::*;
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
+use crate::encodings::decoding::byte_stream_split_decoder::{
+    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,

Review Comment:
   I think this is a good model: a `ByteStreamSplitDecoder` and a
   `VariableWidthByteStreamSplitDecoder`. (I realize I also partly suggested it, but I
   like how it looks.)



##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -62,6 +63,22 @@ fn join_streams_const<const TYPE_SIZE: usize>(
     }
 }
 
+// Like the above, but type_size is not known at compile time.

Review Comment:
   Maybe it could be called `join_streams_variable` to match the name of the
   decoder.



##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -119,3 +136,117 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
         Ok(to_skip)
     }
 }
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    encoded_bytes: Bytes,
+    total_num_values: usize,
+    values_decoded: usize,
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: type_length as usize,
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!("Input data is not of fixed length"));
+        }
+
+        match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => {
+                self.encoded_bytes = data;
+                self.total_num_values = num_values;
+                self.values_decoded = 0;
+                Ok(())
+            }
+            _ => Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
+            )),
+        }
+    }
+
+    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+        let total_remaining_values = self.values_left();
+        let num_values = buffer.len().min(total_remaining_values);
+        let buffer = &mut buffer[..num_values];
+        let type_size = self.type_width;
+
+        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use slice_as_bytes_mut. Instead we'll
+        // have to do some data copies.
+        let mut tmp_vec = vec![0_u8; num_values * type_size];
+        let raw_out_bytes = tmp_vec.as_mut_slice();
+
+        let stride = self.encoded_bytes.len() / type_size;
+        match type_size {
+            2 => join_streams_const::<2>(

Review Comment:
   Since this is the variable-length decoder, is there any reason to generate
   code for these special-case lengths (2, 4, ...)? It could simply call
   `join_streams` directly 🤔
   
   I don't think the specialization would be all that bad, but it may also be unnecessary.
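   
   To make the trade-off concrete, here is a hedged sketch of the two
   variants. The function bodies are assumptions reconstructed from the
   call sites in the diff (argument order included), not copied from the
   PR, and `join_streams_variable` is the name proposed in this review
   rather than an existing function.
   
   ```rust
   // Const-generic variant: TYPE_SIZE is known at compile time, so the
   // inner loop has a fixed trip count that the compiler may unroll.
   fn join_streams_const<const TYPE_SIZE: usize>(
       src: &[u8],
       dst: &mut [u8],
       stride: usize,
       values_decoded: usize,
   ) {
       // Skip the values already handed out in earlier calls.
       let sub_src = &src[values_decoded..];
       for i in 0..dst.len() / TYPE_SIZE {
           for j in 0..TYPE_SIZE {
               // Byte j of value i lives in stream j, at offset i.
               dst[i * TYPE_SIZE + j] = sub_src[i + j * stride];
           }
       }
   }
   
   // Runtime variant: identical logic, but type_size is an ordinary argument.
   fn join_streams_variable(
       src: &[u8],
       dst: &mut [u8],
       stride: usize,
       type_size: usize,
       values_decoded: usize,
   ) {
       let sub_src = &src[values_decoded..];
       for i in 0..dst.len() / type_size {
           for j in 0..type_size {
               dst[i * type_size + j] = sub_src[i + j * stride];
           }
       }
   }
   ```
   
   Under these assumptions both variants produce the same bytes for any
   width, so the `match` on 2, 4, 8 and 16 would only be a performance
   specialization; whether it pays off would need a benchmark.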



##########
parquet/src/encodings/encoding/byte_stream_split_encoder.rs:
##########
@@ -53,13 +52,24 @@ fn split_streams_const<const TYPE_SIZE: usize>(src: &[u8], dst: &mut [u8]) {
     }
 }
 
+// Like above, but type_size is not known at compile time.
+fn split_streams(src: &[u8], dst: &mut [u8], type_size: usize) {

Review Comment:
   ditto here -- maybe 
   
   ```suggestion
   fn split_streams_variable(src: &[u8], dst: &mut [u8], type_size: usize) {
   ```
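   
   For reference, a minimal sketch of what the renamed function might look
   like. Only the signature comes from the diff; the body is an assumption
   based on how byte stream split lays out data (byte `j` of every value
   goes into stream `j`).
   
   ```rust
   // Assumed body: scatter each value's bytes across `type_size` streams,
   // each `stride` bytes long, stored back to back in `dst`.
   fn split_streams_variable(src: &[u8], dst: &mut [u8], type_size: usize) {
       // Number of values in `src`, which is also the length of one stream.
       let stride = src.len() / type_size;
       for i in 0..stride {
           for j in 0..type_size {
               dst[i + j * stride] = src[i * type_size + j];
           }
       }
   }
   ```
   
   With three 2-byte values `[1, 2]`, `[3, 4]`, `[5, 6]` the first stream
   holds the first bytes `1, 3, 5` and the second stream holds `2, 4, 6`.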



##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -119,3 +136,117 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
         Ok(to_skip)
     }
 }
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    encoded_bytes: Bytes,
+    total_num_values: usize,
+    values_decoded: usize,
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: type_length as usize,
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!("Input data is not of fixed length"));
+        }
+
+        match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => {
+                self.encoded_bytes = data;
+                self.total_num_values = num_values;
+                self.values_decoded = 0;
+                Ok(())
+            }
+            _ => Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
+            )),
+        }
+    }
+
+    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+        let total_remaining_values = self.values_left();
+        let num_values = buffer.len().min(total_remaining_values);
+        let buffer = &mut buffer[..num_values];
+        let type_size = self.type_width;
+
+        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use slice_as_bytes_mut. Instead we'll
+        // have to do some data copies.
+        let mut tmp_vec = vec![0_u8; num_values * type_size];
+        let raw_out_bytes = tmp_vec.as_mut_slice();
+
+        let stride = self.encoded_bytes.len() / type_size;
+        match type_size {
+            2 => join_streams_const::<2>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            4 => join_streams_const::<4>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            8 => join_streams_const::<8>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            16 => join_streams_const::<16>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            _ => join_streams(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                type_size,
+                self.values_decoded,
+            ),
+        }
+        self.values_decoded += num_values;
+
+        // FIXME(ets): there's got to be a better way to do this
+        for i in 0..num_values {
+            if let Some(bi) = buffer[i].as_mut_any().downcast_mut::<FixedLenByteArray>() {

Review Comment:
   Shouldn't this error/panic if the value isn't actually a
   `FixedLenByteArray`?
   
   Something like 
   ```rust
   let bi = buffer[i].as_mut_any().downcast_mut::<FixedLenByteArray>()
     .expect("Decoding fixed length byte array");
   ```
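   
   If an error is preferred over a panic, the same downcast can be turned
   into a `Result`. The sketch below is standalone: the `FixedLenByteArray`
   struct is a hypothetical stand-in for the crate's type, and
   `set_fixed_len_value` and the `String` error are illustration-only names,
   not part of the PR.
   
   ```rust
   use std::any::Any;
   
   // Hypothetical stand-in for the crate's `FixedLenByteArray`.
   #[derive(Debug, Default, PartialEq)]
   struct FixedLenByteArray(Vec<u8>);
   
   // Writes `bytes` into `slot`, failing loudly if the slot has another type
   // instead of silently skipping it.
   fn set_fixed_len_value(slot: &mut dyn Any, bytes: &[u8]) -> Result<(), String> {
       let value = slot
           .downcast_mut::<FixedLenByteArray>()
           .ok_or_else(|| "expected a FixedLenByteArray output slot".to_string())?;
       value.0 = bytes.to_vec();
       Ok(())
   }
   ```
   
   Compared with `if let Some(..)`, a wrongly typed slot now surfaces as an
   `Err` at the call site rather than leaving the output value untouched.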



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1641,6 +1643,86 @@ mod tests {
         assert_eq!(row_count, 300);
     }
 
+    #[test]
+    fn test_read_extended_byte_stream_split() {
+        let path = format!(

Review Comment:
   The fact that the existing non-BSS columns (not changed by this PR) come
   back the same gives me confidence that the code is doing the right thing. I
   just found it strange that Python seemed to give me a different result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
