This is an automated email from the ASF dual-hosted git repository.
jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6fafedf ARROW-10804: [Rust] Removed some unsafe code from the parquet
crate
6fafedf is described below
commit 6fafedf784c531c22d12303f53fb1adfabb85355
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Wed Dec 16 05:44:16 2020 +0000
ARROW-10804: [Rust] Removed some unsafe code from the parquet crate
This PR replaces some unsafe code with its safe counterpart.
Closes #8829 from jorgecarleitao/remove_unsafe
Lead-authored-by: Jorge C. Leitao <[email protected]>
Co-authored-by: Neville Dipale <[email protected]>
Signed-off-by: Jorge C. Leitao <[email protected]>
---
rust/parquet/src/arrow/record_reader.rs | 224 ++++++++++++--------------------
1 file changed, 85 insertions(+), 139 deletions(-)
diff --git a/rust/parquet/src/arrow/record_reader.rs
b/rust/parquet/src/arrow/record_reader.rs
index 16b0846..14908de 100644
--- a/rust/parquet/src/arrow/record_reader.rs
+++ b/rust/parquet/src/arrow/record_reader.rs
@@ -16,10 +16,7 @@
// under the License.
use std::cmp::{max, min};
-use std::mem::align_of;
-use std::mem::size_of;
-use std::mem::{replace, swap};
-use std::slice;
+use std::mem::{replace, size_of};
use crate::column::{page::PageReader, reader::ColumnReaderImpl};
use crate::data_type::DataType;
@@ -28,7 +25,6 @@ use crate::schema::types::ColumnDescPtr;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::{Buffer, MutableBuffer};
-use arrow::memory;
const MIN_BATCH_SIZE: usize = 1024;
@@ -53,45 +49,6 @@ pub struct RecordReader<T: DataType> {
in_middle_of_record: bool,
}
-#[derive(Debug)]
-struct FatPtr<'a, T> {
- ptr: &'a mut [T],
-}
-
-impl<'a, T> FatPtr<'a, T> {
- fn new(ptr: &'a mut [T]) -> Self {
- Self { ptr }
- }
-
- fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self {
- FatPtr::<T>::with_offset_and_size(buf, offset, size_of::<T>())
- }
-
- fn with_offset_and_size(
- buf: &'a mut MutableBuffer,
- offset: usize,
- type_size: usize,
- ) -> Self {
- assert!(align_of::<T>() <= memory::ALIGNMENT);
- // TODO Prevent this from being called with non primitive types (like
`Box<A>`)
- unsafe {
- FatPtr::new(slice::from_raw_parts_mut(
- &mut *(buf.raw_data() as *mut T).add(offset),
- buf.capacity() / type_size - offset,
- ))
- }
- }
-
- fn to_slice(&self) -> &[T] {
- self.ptr
- }
-
- #[allow(clippy::wrong_self_convention)]
- fn to_slice_mut(&mut self) -> &mut [T] {
- self.ptr
- }
-}
-
impl<T: DataType> RecordReader<T> {
pub fn new(column_schema: ColumnDescPtr) -> Self {
let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
@@ -199,21 +156,19 @@ impl<T: DataType> RecordReader<T> {
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
let new_buffer = if let Some(ref mut def_levels_buf) = &mut
self.def_levels {
let num_left_values = self.values_written - self.num_values;
- let mut new_buffer = MutableBuffer::new(
- size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
- );
- new_buffer.resize(num_left_values * size_of::<i16>());
+ // create an empty buffer, as it will be resized below
+ let mut new_buffer = MutableBuffer::new(0);
+ let num_bytes = num_left_values * size_of::<i16>();
+ let new_len = self.num_values * size_of::<i16>();
+
+ new_buffer.resize(num_bytes);
- let mut new_def_levels = FatPtr::<i16>::with_offset(&mut
new_buffer, 0);
- let new_def_levels = new_def_levels.to_slice_mut();
- let left_def_levels =
- FatPtr::<i16>::with_offset(def_levels_buf, self.num_values);
- let left_def_levels = left_def_levels.to_slice();
+ let new_def_levels = new_buffer.data_mut();
+ let left_def_levels = &def_levels_buf.data_mut()[new_len..];
- new_def_levels[0..num_left_values]
- .copy_from_slice(&left_def_levels[0..num_left_values]);
+
new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]);
- def_levels_buf.resize(self.num_values * size_of::<i16>());
+ def_levels_buf.resize(new_len);
Some(new_buffer)
} else {
None
@@ -228,21 +183,19 @@ impl<T: DataType> RecordReader<T> {
// TODO: Optimize to reduce the copy
let new_buffer = if let Some(ref mut rep_levels_buf) = &mut
self.rep_levels {
let num_left_values = self.values_written - self.num_values;
- let mut new_buffer = MutableBuffer::new(
- size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
- );
- new_buffer.resize(num_left_values * size_of::<i16>());
+ // create an empty buffer, as it will be resized below
+ let mut new_buffer = MutableBuffer::new(0);
+ let num_bytes = num_left_values * size_of::<i16>();
+ let new_len = self.num_values * size_of::<i16>();
- let mut new_rep_levels = FatPtr::<i16>::with_offset(&mut
new_buffer, 0);
- let new_rep_levels = new_rep_levels.to_slice_mut();
- let left_rep_levels =
- FatPtr::<i16>::with_offset(rep_levels_buf, self.num_values);
- let left_rep_levels = left_rep_levels.to_slice();
+ new_buffer.resize(num_bytes);
- new_rep_levels[0..num_left_values]
- .copy_from_slice(&left_rep_levels[0..num_left_values]);
+ let new_rep_levels = new_buffer.data_mut();
+ let left_rep_levels = &rep_levels_buf.data_mut()[new_len..];
- rep_levels_buf.resize(self.num_values * size_of::<i16>());
+
new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]);
+
+ rep_levels_buf.resize(new_len);
Some(new_buffer)
} else {
@@ -257,24 +210,19 @@ impl<T: DataType> RecordReader<T> {
pub fn consume_record_data(&mut self) -> Result<Buffer> {
// TODO: Optimize to reduce the copy
let num_left_values = self.values_written - self.num_values;
- let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE,
num_left_values));
- new_buffer.resize(num_left_values * T::get_type_size());
-
- let mut new_records =
- FatPtr::<T::T>::with_offset_and_size(&mut new_buffer, 0,
T::get_type_size());
- let new_records = new_records.to_slice_mut();
- let mut left_records = FatPtr::<T::T>::with_offset_and_size(
- &mut self.records,
- self.num_values,
- T::get_type_size(),
- );
- let left_records = left_records.to_slice_mut();
+ // create an empty buffer, as it will be resized below
+ let mut new_buffer = MutableBuffer::new(0);
+ let num_bytes = num_left_values * T::get_type_size();
+ let new_len = self.num_values * T::get_type_size();
- for idx in 0..num_left_values {
- swap(&mut new_records[idx], &mut left_records[idx]);
- }
+ new_buffer.resize(num_bytes);
+
+ let new_records = new_buffer.data_mut();
+ let left_records = &mut self.records.data_mut()[new_len..];
- self.records.resize(self.num_values * T::get_type_size());
+ new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]);
+
+ self.records.resize(new_len);
Ok(replace(&mut self.records, new_buffer).freeze())
}
@@ -331,70 +279,71 @@ impl<T: DataType> RecordReader<T> {
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
// Reserve spaces
self.records
- .reserve(self.records.len() + batch_size * T::get_type_size());
+ .resize(self.records.len() + batch_size * T::get_type_size());
if let Some(ref mut buf) = self.rep_levels {
- buf.reserve(buf.len() + batch_size * size_of::<i16>());
+ buf.resize(buf.len() + batch_size * size_of::<i16>());
}
if let Some(ref mut buf) = self.def_levels {
- buf.reserve(buf.len() + batch_size * size_of::<i16>());
+ buf.resize(buf.len() + batch_size * size_of::<i16>());
}
- // Convert mutable buffer spaces to mutable slices
- let mut values_buf = FatPtr::<T::T>::with_offset_and_size(
- &mut self.records,
- self.values_written,
- T::get_type_size(),
- );
-
let values_written = self.values_written;
- let mut def_levels_buf = self
- .def_levels
- .as_mut()
- .map(|buf| FatPtr::<i16>::with_offset(buf, values_written));
- let mut rep_levels_buf = self
- .rep_levels
+ // Convert mutable buffer spaces to mutable slices
+ let (prefix, values, suffix) =
+ unsafe { self.records.data_mut().align_to_mut::<T::T>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+ let values = &mut values[values_written..];
+
+ let def_levels = self.def_levels.as_mut().map(|buf| {
+ let (prefix, def_levels, suffix) =
+ unsafe { buf.data_mut().align_to_mut::<i16>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+ &mut def_levels[values_written..]
+ });
+
+ let rep_levels = self.rep_levels.as_mut().map(|buf| {
+ let (prefix, rep_levels, suffix) =
+ unsafe { buf.data_mut().align_to_mut::<i16>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+ &mut rep_levels[values_written..]
+ });
+
+ let (values_read, levels_read) = self
+ .column_reader
.as_mut()
- .map(|buf| FatPtr::<i16>::with_offset(buf, values_written));
+ .unwrap()
+ .read_batch(batch_size, def_levels, rep_levels, values)?;
- let (values_read, levels_read) =
- self.column_reader.as_mut().unwrap().read_batch(
- batch_size,
- def_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
- rep_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
- values_buf.to_slice_mut(),
- )?;
+ // get new references for the def levels.
+ let def_levels = self.def_levels.as_ref().map(|buf| {
+ let (prefix, def_levels, suffix) = unsafe {
buf.data().align_to::<i16>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+ &def_levels[values_written..]
+ });
let max_def_level = self.column_desc.max_def_level();
if values_read < levels_read {
- // This means that there are null values in column data
- // TODO: Move this into ColumnReader
-
- let values_buf = values_buf.to_slice_mut();
-
- let def_levels_buf = def_levels_buf
- .as_mut()
- .map(|ptr| ptr.to_slice_mut())
- .ok_or_else(|| {
- general_err!(
- "Definition levels should exist when data is less than
levels!"
- )
- })?;
+ let def_levels = def_levels.ok_or_else(|| {
+ general_err!(
+ "Definition levels should exist when data is less than
levels!"
+ )
+ })?;
// Fill spaces in column data with default values
let mut values_pos = values_read;
let mut level_pos = levels_read;
while level_pos > values_pos {
- if def_levels_buf[level_pos - 1] == max_def_level {
+ if def_levels[level_pos - 1] == max_def_level {
// This values is not empty
// We use swap rather than assign here because T::T doesn't
// implement Copy
- values_buf.swap(level_pos - 1, values_pos - 1);
+ values.swap(level_pos - 1, values_pos - 1);
values_pos -= 1;
} else {
- values_buf[level_pos - 1] = T::T::default();
+ values[level_pos - 1] = T::T::default();
}
level_pos -= 1;
@@ -403,16 +352,13 @@ impl<T: DataType> RecordReader<T> {
// Fill in bitmap data
if let Some(null_buffer) = self.null_bitmap.as_mut() {
- let def_levels_buf = def_levels_buf
- .as_mut()
- .map(|ptr| ptr.to_slice_mut())
- .ok_or_else(|| {
- general_err!(
- "Definition levels should exist when data is less than
levels!"
- )
- })?;
+ let def_levels = def_levels.ok_or_else(|| {
+ general_err!(
+ "Definition levels should exist when data is less than
levels!"
+ )
+ })?;
(0..levels_read).try_for_each(|idx| {
- null_buffer.append(def_levels_buf[idx] == max_def_level)
+ null_buffer.append(def_levels[idx] == max_def_level)
})?;
}
@@ -424,13 +370,13 @@ impl<T: DataType> RecordReader<T> {
/// Split values into records according repetition definition and returns
number of
/// records read.
fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
- let rep_levels_buf = self
- .rep_levels
- .as_mut()
- .map(|buf| FatPtr::<i16>::with_offset(buf, 0));
- let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice());
+ let rep_levels = self.rep_levels.as_ref().map(|buf| {
+ let (prefix, rep_levels, suffix) = unsafe {
buf.data().align_to::<i16>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+ rep_levels
+ });
- match rep_levels_buf {
+ match rep_levels {
Some(buf) => {
let mut records_read = 0;