alamb commented on code in PR #6159:
URL: https://github.com/apache/arrow-rs/pull/6159#discussion_r1702258773


##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -119,3 +136,126 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
         Ok(to_skip)
     }
 }
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    encoded_bytes: Bytes,
+    total_num_values: usize,
+    values_decoded: usize,
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: type_length as usize,
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!(
+                "Input data length is not a multiple of type width {}",
+                self.type_width
+            ));
+        }
+
+        match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => {
+                self.encoded_bytes = data;
+                self.total_num_values = num_values;
+                self.values_decoded = 0;
+                Ok(())
+            }
+            _ => Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
+            )),
+        }
+    }
+
+    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+        let total_remaining_values = self.values_left();
+        let num_values = buffer.len().min(total_remaining_values);
+        let buffer = &mut buffer[..num_values];
+        let type_size = self.type_width;
+
+        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use slice_as_bytes_mut. Instead we'll
+        // have to do some data copies.
+        let mut tmp_vec = vec![0_u8; num_values * type_size];
+        let raw_out_bytes = tmp_vec.as_mut_slice();
+
+        let stride = self.encoded_bytes.len() / type_size;
+        match type_size {
+            2 => join_streams_const::<2>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            4 => join_streams_const::<4>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            8 => join_streams_const::<8>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            16 => join_streams_const::<16>(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                self.values_decoded,
+            ),
+            _ => join_streams_variable(
+                &self.encoded_bytes,
+                raw_out_bytes,
+                stride,
+                type_size,
+                self.values_decoded,
+            ),
+        }
+        self.values_decoded += num_values;
+
+        // create a buffer from the vec so far (and leave a new Vec in its place)
+        let vec_with_data = std::mem::take(&mut tmp_vec);
+        // convert Vec to Bytes (which is a ref counted wrapper)
+        let bytes_with_data = Bytes::from(vec_with_data);
+        for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
+            // Get a view into the data, without also copying the bytes
+            let data = bytes_with_data.slice(i * type_size..(i + 1) * type_size);
+            // TODO: perhaps add a `set_from_bytes` method to `DataType` to avoid downcasting

Review Comment:
   Maybe @tustvold or @XiangpengHao has a suggestion for how to avoid this downcasting.



##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -119,3 +136,117 @@ impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
         Ok(to_skip)
     }
 }
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+    _phantom: PhantomData<T>,
+    encoded_bytes: Bytes,
+    total_num_values: usize,
+    values_decoded: usize,
+    type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+    pub(crate) fn new(type_length: i32) -> Self {
+        Self {
+            _phantom: PhantomData,
+            encoded_bytes: Bytes::new(),
+            total_num_values: 0,
+            values_decoded: 0,
+            type_width: type_length as usize,
+        }
+    }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+        // Rough check that all data elements are the same length
+        if data.len() % self.type_width != 0 {
+            return Err(general_err!("Input data is not of fixed length"));
+        }
+
+        match T::get_physical_type() {
+            Type::FIXED_LEN_BYTE_ARRAY => {
+                self.encoded_bytes = data;
+                self.total_num_values = num_values;
+                self.values_decoded = 0;
+                Ok(())
+            }
+            _ => Err(general_err!(
+                "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
+            )),
+        }
+    }
+
+    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+        let total_remaining_values = self.values_left();
+        let num_values = buffer.len().min(total_remaining_values);
+        let buffer = &mut buffer[..num_values];
+        let type_size = self.type_width;
+
+        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use slice_as_bytes_mut. Instead we'll
+        // have to do some data copies.
+        let mut tmp_vec = vec![0_u8; num_values * type_size];
+        let raw_out_bytes = tmp_vec.as_mut_slice();
+
+        let stride = self.encoded_bytes.len() / type_size;
+        match type_size {
+            2 => join_streams_const::<2>(

Review Comment:
   I think this is fine to keep



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to