This is an automated email from the ASF dual-hosted git repository.

jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fafedf  ARROW-10804: [Rust] Removed some unsafe code from the parquet crate
6fafedf is described below

commit 6fafedf784c531c22d12303f53fb1adfabb85355
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Wed Dec 16 05:44:16 2020 +0000

    ARROW-10804: [Rust] Removed some unsafe code from the parquet crate
    
    This PR replaces some unsafe code with its safe counterpart.
    
    Closes #8829 from jorgecarleitao/remove_unsafe
    
    Lead-authored-by: Jorge C. Leitao <[email protected]>
    Co-authored-by: Neville Dipale <[email protected]>
    Signed-off-by: Jorge C. Leitao <[email protected]>
---
 rust/parquet/src/arrow/record_reader.rs | 224 ++++++++++++--------------------
 1 file changed, 85 insertions(+), 139 deletions(-)

diff --git a/rust/parquet/src/arrow/record_reader.rs 
b/rust/parquet/src/arrow/record_reader.rs
index 16b0846..14908de 100644
--- a/rust/parquet/src/arrow/record_reader.rs
+++ b/rust/parquet/src/arrow/record_reader.rs
@@ -16,10 +16,7 @@
 // under the License.
 
 use std::cmp::{max, min};
-use std::mem::align_of;
-use std::mem::size_of;
-use std::mem::{replace, swap};
-use std::slice;
+use std::mem::{replace, size_of};
 
 use crate::column::{page::PageReader, reader::ColumnReaderImpl};
 use crate::data_type::DataType;
@@ -28,7 +25,6 @@ use crate::schema::types::ColumnDescPtr;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::{Buffer, MutableBuffer};
-use arrow::memory;
 
 const MIN_BATCH_SIZE: usize = 1024;
 
@@ -53,45 +49,6 @@ pub struct RecordReader<T: DataType> {
     in_middle_of_record: bool,
 }
 
-#[derive(Debug)]
-struct FatPtr<'a, T> {
-    ptr: &'a mut [T],
-}
-
-impl<'a, T> FatPtr<'a, T> {
-    fn new(ptr: &'a mut [T]) -> Self {
-        Self { ptr }
-    }
-
-    fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self {
-        FatPtr::<T>::with_offset_and_size(buf, offset, size_of::<T>())
-    }
-
-    fn with_offset_and_size(
-        buf: &'a mut MutableBuffer,
-        offset: usize,
-        type_size: usize,
-    ) -> Self {
-        assert!(align_of::<T>() <= memory::ALIGNMENT);
-        // TODO Prevent this from being called with non primitive types (like 
`Box<A>`)
-        unsafe {
-            FatPtr::new(slice::from_raw_parts_mut(
-                &mut *(buf.raw_data() as *mut T).add(offset),
-                buf.capacity() / type_size - offset,
-            ))
-        }
-    }
-
-    fn to_slice(&self) -> &[T] {
-        self.ptr
-    }
-
-    #[allow(clippy::wrong_self_convention)]
-    fn to_slice_mut(&mut self) -> &mut [T] {
-        self.ptr
-    }
-}
-
 impl<T: DataType> RecordReader<T> {
     pub fn new(column_schema: ColumnDescPtr) -> Self {
         let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
@@ -199,21 +156,19 @@ impl<T: DataType> RecordReader<T> {
     pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
         let new_buffer = if let Some(ref mut def_levels_buf) = &mut 
self.def_levels {
             let num_left_values = self.values_written - self.num_values;
-            let mut new_buffer = MutableBuffer::new(
-                size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
-            );
-            new_buffer.resize(num_left_values * size_of::<i16>());
+            // create an empty buffer, as it will be resized below
+            let mut new_buffer = MutableBuffer::new(0);
+            let num_bytes = num_left_values * size_of::<i16>();
+            let new_len = self.num_values * size_of::<i16>();
+
+            new_buffer.resize(num_bytes);
 
-            let mut new_def_levels = FatPtr::<i16>::with_offset(&mut 
new_buffer, 0);
-            let new_def_levels = new_def_levels.to_slice_mut();
-            let left_def_levels =
-                FatPtr::<i16>::with_offset(def_levels_buf, self.num_values);
-            let left_def_levels = left_def_levels.to_slice();
+            let new_def_levels = new_buffer.data_mut();
+            let left_def_levels = &def_levels_buf.data_mut()[new_len..];
 
-            new_def_levels[0..num_left_values]
-                .copy_from_slice(&left_def_levels[0..num_left_values]);
+            
new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]);
 
-            def_levels_buf.resize(self.num_values * size_of::<i16>());
+            def_levels_buf.resize(new_len);
             Some(new_buffer)
         } else {
             None
@@ -228,21 +183,19 @@ impl<T: DataType> RecordReader<T> {
         // TODO: Optimize to reduce the copy
         let new_buffer = if let Some(ref mut rep_levels_buf) = &mut 
self.rep_levels {
             let num_left_values = self.values_written - self.num_values;
-            let mut new_buffer = MutableBuffer::new(
-                size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
-            );
-            new_buffer.resize(num_left_values * size_of::<i16>());
+            // create an empty buffer, as it will be resized below
+            let mut new_buffer = MutableBuffer::new(0);
+            let num_bytes = num_left_values * size_of::<i16>();
+            let new_len = self.num_values * size_of::<i16>();
 
-            let mut new_rep_levels = FatPtr::<i16>::with_offset(&mut 
new_buffer, 0);
-            let new_rep_levels = new_rep_levels.to_slice_mut();
-            let left_rep_levels =
-                FatPtr::<i16>::with_offset(rep_levels_buf, self.num_values);
-            let left_rep_levels = left_rep_levels.to_slice();
+            new_buffer.resize(num_bytes);
 
-            new_rep_levels[0..num_left_values]
-                .copy_from_slice(&left_rep_levels[0..num_left_values]);
+            let new_rep_levels = new_buffer.data_mut();
+            let left_rep_levels = &rep_levels_buf.data_mut()[new_len..];
 
-            rep_levels_buf.resize(self.num_values * size_of::<i16>());
+            
new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]);
+
+            rep_levels_buf.resize(new_len);
 
             Some(new_buffer)
         } else {
@@ -257,24 +210,19 @@ impl<T: DataType> RecordReader<T> {
     pub fn consume_record_data(&mut self) -> Result<Buffer> {
         // TODO: Optimize to reduce the copy
         let num_left_values = self.values_written - self.num_values;
-        let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, 
num_left_values));
-        new_buffer.resize(num_left_values * T::get_type_size());
-
-        let mut new_records =
-            FatPtr::<T::T>::with_offset_and_size(&mut new_buffer, 0, 
T::get_type_size());
-        let new_records = new_records.to_slice_mut();
-        let mut left_records = FatPtr::<T::T>::with_offset_and_size(
-            &mut self.records,
-            self.num_values,
-            T::get_type_size(),
-        );
-        let left_records = left_records.to_slice_mut();
+        // create an empty buffer, as it will be resized below
+        let mut new_buffer = MutableBuffer::new(0);
+        let num_bytes = num_left_values * T::get_type_size();
+        let new_len = self.num_values * T::get_type_size();
 
-        for idx in 0..num_left_values {
-            swap(&mut new_records[idx], &mut left_records[idx]);
-        }
+        new_buffer.resize(num_bytes);
+
+        let new_records = new_buffer.data_mut();
+        let left_records = &mut self.records.data_mut()[new_len..];
 
-        self.records.resize(self.num_values * T::get_type_size());
+        new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]);
+
+        self.records.resize(new_len);
 
         Ok(replace(&mut self.records, new_buffer).freeze())
     }
@@ -331,70 +279,71 @@ impl<T: DataType> RecordReader<T> {
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
         // Reserve spaces
         self.records
-            .reserve(self.records.len() + batch_size * T::get_type_size());
+            .resize(self.records.len() + batch_size * T::get_type_size());
         if let Some(ref mut buf) = self.rep_levels {
-            buf.reserve(buf.len() + batch_size * size_of::<i16>());
+            buf.resize(buf.len() + batch_size * size_of::<i16>());
         }
         if let Some(ref mut buf) = self.def_levels {
-            buf.reserve(buf.len() + batch_size * size_of::<i16>());
+            buf.resize(buf.len() + batch_size * size_of::<i16>());
         }
 
-        // Convert mutable buffer spaces to mutable slices
-        let mut values_buf = FatPtr::<T::T>::with_offset_and_size(
-            &mut self.records,
-            self.values_written,
-            T::get_type_size(),
-        );
-
         let values_written = self.values_written;
-        let mut def_levels_buf = self
-            .def_levels
-            .as_mut()
-            .map(|buf| FatPtr::<i16>::with_offset(buf, values_written));
 
-        let mut rep_levels_buf = self
-            .rep_levels
+        // Convert mutable buffer spaces to mutable slices
+        let (prefix, values, suffix) =
+            unsafe { self.records.data_mut().align_to_mut::<T::T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        let values = &mut values[values_written..];
+
+        let def_levels = self.def_levels.as_mut().map(|buf| {
+            let (prefix, def_levels, suffix) =
+                unsafe { buf.data_mut().align_to_mut::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+            &mut def_levels[values_written..]
+        });
+
+        let rep_levels = self.rep_levels.as_mut().map(|buf| {
+            let (prefix, rep_levels, suffix) =
+                unsafe { buf.data_mut().align_to_mut::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+            &mut rep_levels[values_written..]
+        });
+
+        let (values_read, levels_read) = self
+            .column_reader
             .as_mut()
-            .map(|buf| FatPtr::<i16>::with_offset(buf, values_written));
+            .unwrap()
+            .read_batch(batch_size, def_levels, rep_levels, values)?;
 
-        let (values_read, levels_read) =
-            self.column_reader.as_mut().unwrap().read_batch(
-                batch_size,
-                def_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
-                rep_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()),
-                values_buf.to_slice_mut(),
-            )?;
+        // get new references for the def levels.
+        let def_levels = self.def_levels.as_ref().map(|buf| {
+            let (prefix, def_levels, suffix) = unsafe { 
buf.data().align_to::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+            &def_levels[values_written..]
+        });
 
         let max_def_level = self.column_desc.max_def_level();
 
         if values_read < levels_read {
-            // This means that there are null values in column data
-            // TODO: Move this into ColumnReader
-
-            let values_buf = values_buf.to_slice_mut();
-
-            let def_levels_buf = def_levels_buf
-                .as_mut()
-                .map(|ptr| ptr.to_slice_mut())
-                .ok_or_else(|| {
-                    general_err!(
-                        "Definition levels should exist when data is less than 
levels!"
-                    )
-                })?;
+            let def_levels = def_levels.ok_or_else(|| {
+                general_err!(
+                    "Definition levels should exist when data is less than 
levels!"
+                )
+            })?;
 
             // Fill spaces in column data with default values
             let mut values_pos = values_read;
             let mut level_pos = levels_read;
 
             while level_pos > values_pos {
-                if def_levels_buf[level_pos - 1] == max_def_level {
+                if def_levels[level_pos - 1] == max_def_level {
                     // This values is not empty
                     // We use swap rather than assign here because T::T doesn't
                     // implement Copy
-                    values_buf.swap(level_pos - 1, values_pos - 1);
+                    values.swap(level_pos - 1, values_pos - 1);
                     values_pos -= 1;
                 } else {
-                    values_buf[level_pos - 1] = T::T::default();
+                    values[level_pos - 1] = T::T::default();
                 }
 
                 level_pos -= 1;
@@ -403,16 +352,13 @@ impl<T: DataType> RecordReader<T> {
 
         // Fill in bitmap data
         if let Some(null_buffer) = self.null_bitmap.as_mut() {
-            let def_levels_buf = def_levels_buf
-                .as_mut()
-                .map(|ptr| ptr.to_slice_mut())
-                .ok_or_else(|| {
-                    general_err!(
-                        "Definition levels should exist when data is less than 
levels!"
-                    )
-                })?;
+            let def_levels = def_levels.ok_or_else(|| {
+                general_err!(
+                    "Definition levels should exist when data is less than 
levels!"
+                )
+            })?;
             (0..levels_read).try_for_each(|idx| {
-                null_buffer.append(def_levels_buf[idx] == max_def_level)
+                null_buffer.append(def_levels[idx] == max_def_level)
             })?;
         }
 
@@ -424,13 +370,13 @@ impl<T: DataType> RecordReader<T> {
     /// Split values into records according repetition definition and returns 
number of
     /// records read.
     fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
-        let rep_levels_buf = self
-            .rep_levels
-            .as_mut()
-            .map(|buf| FatPtr::<i16>::with_offset(buf, 0));
-        let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice());
+        let rep_levels = self.rep_levels.as_ref().map(|buf| {
+            let (prefix, rep_levels, suffix) = unsafe { 
buf.data().align_to::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+            rep_levels
+        });
 
-        match rep_levels_buf {
+        match rep_levels {
             Some(buf) => {
                 let mut records_read = 0;
 

Reply via email to