alamb commented on code in PR #6159:
URL: https://github.com/apache/arrow-rs/pull/6159#discussion_r1702114084
##########
parquet/src/encodings/decoding/byte_stream_split_decoder.rs:
##########
@@ -119,3 +136,117 @@ impl<T: DataType> Decoder<T> for
ByteStreamSplitDecoder<T> {
Ok(to_skip)
}
}
+
+pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
+ _phantom: PhantomData<T>,
+ encoded_bytes: Bytes,
+ total_num_values: usize,
+ values_decoded: usize,
+ type_width: usize,
+}
+
+impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
+ pub(crate) fn new(type_length: i32) -> Self {
+ Self {
+ _phantom: PhantomData,
+ encoded_bytes: Bytes::new(),
+ total_num_values: 0,
+ values_decoded: 0,
+ type_width: type_length as usize,
+ }
+ }
+}
+
+impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
+ fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
+ // Rough check that all data elements are the same length
+ if data.len() % self.type_width != 0 {
+ return Err(general_err!("Input data is not of fixed length"));
+ }
+
+ match T::get_physical_type() {
+ Type::FIXED_LEN_BYTE_ARRAY => {
+ self.encoded_bytes = data;
+ self.total_num_values = num_values;
+ self.values_decoded = 0;
+ Ok(())
+ }
+ _ => Err(general_err!(
+ "VariableWidthByteStreamSplitDecoder only supports
FixedLenByteArrayType"
+ )),
+ }
+ }
+
+ fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
+ let total_remaining_values = self.values_left();
+ let num_values = buffer.len().min(total_remaining_values);
+ let buffer = &mut buffer[..num_values];
+ let type_size = self.type_width;
+
+ // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use
slice_as_bytes_mut. Instead we'll
+ // have to do some data copies.
+ let mut tmp_vec = vec![0_u8; num_values * type_size];
+ let raw_out_bytes = tmp_vec.as_mut_slice();
+
+ let stride = self.encoded_bytes.len() / type_size;
+ match type_size {
+ 2 => join_streams_const::<2>(
+ &self.encoded_bytes,
+ raw_out_bytes,
+ stride,
+ self.values_decoded,
+ ),
+ 4 => join_streams_const::<4>(
+ &self.encoded_bytes,
+ raw_out_bytes,
+ stride,
+ self.values_decoded,
+ ),
+ 8 => join_streams_const::<8>(
+ &self.encoded_bytes,
+ raw_out_bytes,
+ stride,
+ self.values_decoded,
+ ),
+ 16 => join_streams_const::<16>(
+ &self.encoded_bytes,
+ raw_out_bytes,
+ stride,
+ self.values_decoded,
+ ),
+ _ => join_streams(
+ &self.encoded_bytes,
+ raw_out_bytes,
+ stride,
+ type_size,
+ self.values_decoded,
+ ),
+ }
+ self.values_decoded += num_values;
+
+ // FIXME(ets): there's got to be a better way to do this
+ for i in 0..num_values {
+ if let Some(bi) =
buffer[i].as_mut_any().downcast_mut::<FixedLenByteArray>() {
Review Comment:
So this is what I came up with:
```rust
// create a buffer from the vec so far (and leave a new Vec in its place)
let vec_with_data = std::mem::take(&mut tmp_vec);
// convert Vec to Bytes (which is a ref counted wrapper)
let bytes_with_data = Bytes::from(vec_with_data);
for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
// Get a view into the data, without also copying the bytes
let data = bytes_with_data.slice(i * type_size..(i + 1) *
type_size);
let bi = bi.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.expect("Decoding fixed length byte array");
bi.set_data(data);
}
```
I think it avoids a bunch of allocations (only does one allocation for each
batch) but it is still pretty bad in terms of the `downcast_mut` stuff 🤮. I
suspect we would need to add some other trait method to `DataType` (like
`set_from_bytes` or something) to make it work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]