ilya-biryukov commented on a change in pull request #709:
URL: https://github.com/apache/arrow-rs/pull/709#discussion_r694924721
##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -1074,6 +1079,39 @@ impl ValueDecoder for VariableLenDictionaryDecoder {
}
}
+pub(crate) struct DeltaByteArrayValueDecoder {
+ decoder: DeltaByteArrayDecoder<ByteArrayType>,
+}
+
+impl DeltaByteArrayValueDecoder {
+ pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
+ let mut decoder = DeltaByteArrayDecoder::new();
+ decoder.set_data(data, num_values)?;
+ Ok(Self { decoder })
+ }
+}
+
+impl ValueDecoder for DeltaByteArrayValueDecoder {
+ fn read_value_bytes(
+ &mut self,
+ mut num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ num_values = std::cmp::min(num_values, self.decoder.values_left());
+ let mut values_read = 0;
+ while values_read < num_values {
+ let mut buf = [ByteArray::new()];
Review comment:
1. I initially thought it should not make a difference as the allocation
is static. But your comment got me thinking and I did some
[benchmarking](https://gist.github.com/ilya-biryukov/d053433923ec7ab9b94e897a649a92c5).
Putting the `let` statement outside the loop is actually more than 3x faster
in the benchmarks, so it's definitely a good habit to follow.
2. It's a restriction that `StringArrayConverter` imposes, see
https://github.com/apache/arrow-rs/blob/f438f72bb1b2e61c095c4ea580e6abe2d5048360/parquet/src/arrow/arrow_array_reader.rs#L1178
I was not eager to change it here, as this PR focuses on fixing the bug
rather than on performance optimization.
##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -1559,4 +1605,120 @@ mod tests {
array_reader.get_rep_levels()
);
}
+
+ /// Allows to write parquet into memory. Intended only for use in tests.
+ #[derive(Clone)]
+ struct VecWriter {
+ data: Arc<Mutex<Cursor<Vec<u8>>>>,
+ }
+
+ impl VecWriter {
+ pub fn new() -> VecWriter {
+ VecWriter {
+ data: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
+ }
+ }
+
+ pub fn consume(self) -> Vec<u8> {
+ Arc::try_unwrap(self.data)
+ .unwrap()
+ .into_inner()
+ .unwrap()
+ .into_inner()
+ }
+ }
+
+ impl TryClone for VecWriter {
+ fn try_clone(&self) -> std::io::Result<Self> {
+ Ok(self.clone())
+ }
+ }
+
+ impl Seek for VecWriter {
+ fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
+ self.data.lock().unwrap().seek(pos)
+ }
+
+ fn stream_position(&mut self) -> std::io::Result<u64> {
+ self.data.lock().unwrap().stream_position()
+ }
+ }
+
+ impl Write for VecWriter {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ self.data.lock().unwrap().write(buf)
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.data.lock().unwrap().flush()
+ }
+ }
+
+ #[test]
+ fn test_string_delta_byte_array() {
+ use crate::basic;
+ use crate::schema::types::Type;
+
+ let data = VecWriter::new();
+ let schema = Arc::new(
+ Type::group_type_builder("string_test")
+ .with_fields(&mut vec![Arc::new(
+ Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
+ .with_converted_type(ConvertedType::UTF8)
+ .build()
+ .unwrap(),
+ )])
+ .build()
+ .unwrap(),
+ );
+ // Disable dictionary and use the fallback encoding.
+ let p = Arc::new(
+ WriterProperties::builder()
+ .set_dictionary_enabled(false)
+ .set_encoding(Encoding::DELTA_BYTE_ARRAY)
+ .build(),
+ );
+ // Write a few strings.
+ let mut w = SerializedFileWriter::new(data.clone(), schema,
p).unwrap();
+ let mut rg = w.next_row_group().unwrap();
+ let mut c = rg.next_column().unwrap().unwrap();
+ match &mut c {
+ ColumnWriter::ByteArrayColumnWriter(c) => {
+ c.write_batch(
+ &[ByteArray::from("foo"), ByteArray::from("bar")],
Review comment:
Done.
##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -1074,6 +1079,39 @@ impl ValueDecoder for VariableLenDictionaryDecoder {
}
}
+pub(crate) struct DeltaByteArrayValueDecoder {
+ decoder: DeltaByteArrayDecoder<ByteArrayType>,
+}
+
+impl DeltaByteArrayValueDecoder {
+ pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
+ let mut decoder = DeltaByteArrayDecoder::new();
+ decoder.set_data(data, num_values)?;
+ Ok(Self { decoder })
+ }
+}
+
+impl ValueDecoder for DeltaByteArrayValueDecoder {
+ fn read_value_bytes(
+ &mut self,
+ mut num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ num_values = std::cmp::min(num_values, self.decoder.values_left());
+ let mut values_read = 0;
+ while values_read < num_values {
+ let mut buf = [ByteArray::new()];
+ let num_read = self.decoder.get(&mut buf)?;
+ assert_eq!(num_read, 1);
Review comment:
Thx, good point!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]