This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1222996  ARROW-3398: [Rust] Update existing Builder to use 
MutableBuffer internally
1222996 is described below

commit 1222996c6d4353067a3f9178fd27834e438dcf6e
Author: Paddy Horan <[email protected]>
AuthorDate: Wed Oct 10 19:32:16 2018 +0200

    ARROW-3398: [Rust] Update existing Builder to use MutableBuffer internally
    
    I also fixed a lint issue in buffer.rs that was caught by nightly rustfmt.
    It's the same issue that was addressed in #2658, so I don't know how it got
    reverted.
    
    There may be other lint issues (hopefully not), which I will address as CI
    flags them; locally I am running into [this
    issue](https://github.com/rust-lang-nursery/rustfmt/issues/2916) after running
    `rustup update`.
    
    cc @kszucs @sunchao @andygrove
    
    Author: Paddy Horan <[email protected]>
    
    Closes #2700 from paddyhoran/ARROW-3398 and squashes the following commits:
    
    4e05ff03 <Paddy Horan> Re-phrased old doc-comment
    c1bff9e3 <Paddy Horan> Addressed review
    c670d07f <Paddy Horan> Added tests for `write_bytes`
    8dc5ed37 <Paddy Horan> Fixed lint issues
    4cf5e376 <Paddy Horan> Updated `Builder` to use `MutableBuffer` internally
---
 rust/src/builder.rs | 275 +++++++++++++++++++++-------------------------------
 1 file changed, 112 insertions(+), 163 deletions(-)

diff --git a/rust/src/builder.rs b/rust/src/builder.rs
index 8b5438e..2584ae9 100644
--- a/rust/src/builder.rs
+++ b/rust/src/builder.rs
@@ -15,158 +15,99 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use libc;
-use std::cmp;
+//! Defines a `BufferBuilder` capable of creating a `Buffer` which can be used 
as an internal
+//! buffer in an `ArrayData` object.
+
+use std::io::Write;
+use std::marker::PhantomData;
 use std::mem;
-use std::ptr;
-use std::slice;
 
 use super::buffer::*;
 use super::datatypes::*;
-use super::memory::*;
+use error::{ArrowError, Result};
 
 /// Buffer builder with zero-copy build method
-pub struct Builder<T>
+pub struct BufferBuilder<T>
 where
     T: ArrowPrimitiveType,
 {
-    data: *mut T,
-    len: usize,
-    capacity: usize,
+    buffer: MutableBuffer,
+    len: i64,
+    _marker: PhantomData<T>,
 }
 
-impl<T> Builder<T>
+impl<T> BufferBuilder<T>
 where
     T: ArrowPrimitiveType,
 {
-    /// Creates a builder with a default capacity
-    pub fn new() -> Self {
-        Builder::with_capacity(64)
-    }
-
-    /// Creates a builder with a fixed capacity
-    pub fn with_capacity(capacity: usize) -> Self {
-        let sz = mem::size_of::<T>();
-        let buffer = allocate_aligned((capacity * sz) as i64).unwrap();
-        Builder {
+    /// Creates a builder with a fixed initial capacity
+    pub fn new(capacity: i64) -> Self {
+        let buffer = MutableBuffer::new(capacity as usize * 
mem::size_of::<T>());
+        Self {
+            buffer,
             len: 0,
-            capacity,
-            data: buffer as *mut T,
+            _marker: PhantomData,
         }
     }
 
-    /// Get the number of elements in the builder
-    pub fn len(&self) -> usize {
+    /// Returns the number of array elements (slots) in the builder
+    pub fn len(&self) -> i64 {
         self.len
     }
 
-    /// Get the capacity of the builder (number of elements)
-    pub fn capacity(&self) -> usize {
-        self.capacity
-    }
-
-    /// Get the internal byte-aligned memory buffer as a mutable slice
-    pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] {
-        assert!(
-            end <= self.capacity as usize,
-            "the end of the slice must be within the capacity"
-        );
-        assert!(
-            start <= end,
-            "the start of the slice cannot exceed the end of the slice"
-        );
-        unsafe {
-            slice::from_raw_parts_mut(self.data.offset(start as isize), (end - 
start) as usize)
-        }
-    }
-
-    /// Override the length
-    pub fn set_len(&mut self, len: usize) {
-        self.len = len;
+    /// Returns the current capacity of the builder (number of elements)
+    pub fn capacity(&self) -> i64 {
+        let byte_capacity = self.buffer.capacity();
+        (byte_capacity / mem::size_of::<T>()) as i64
     }
 
     /// Push a value into the builder, growing the internal buffer as needed
-    pub fn push(&mut self, v: T) {
-        assert!(!self.data.is_null(), "cannot push onto uninitialized data");
-        if self.len == self.capacity {
-            // grow capacity by 64 bytes or double the current capacity, 
whichever is greater
-            let new_capacity = cmp::max(64, self.capacity * 2);
-            self.grow(new_capacity);
-        }
-        assert!(self.len < self.capacity, "new length exceeds capacity");
-        unsafe {
-            *self.data.offset(self.len as isize) = v;
-        }
-        self.len += 1;
-    }
-
-    /// Set a value at a slot in the allocated memory without adjusting the 
length
-    pub fn set(&mut self, i: usize, v: T) {
-        assert!(
-            !self.data.is_null(),
-            "cannot set value if data is uninitialized"
-        );
-        assert!(i < self.capacity, "index exceeds capacity");
-        unsafe {
-            *self.data.offset(i as isize) = v;
-        }
+    pub fn push(&mut self, v: T) -> Result<()> {
+        self.reserve(1)?;
+        self.write_bytes(v.to_byte_slice(), 1)
     }
 
-    /// push a slice of type T, growing the internal buffer as needed
-    pub fn push_slice(&mut self, slice: &[T]) {
-        self.reserve(slice.len());
-        let sz = mem::size_of::<T>();
-        unsafe {
-            libc::memcpy(
-                self.data.offset(self.len() as isize) as *mut libc::c_void,
-                slice.as_ptr() as *const libc::c_void,
-                slice.len() * sz,
-            );
-        }
-        self.len += slice.len();
+    /// Push a slice of type T, growing the internal buffer as needed
+    pub fn push_slice(&mut self, slice: &[T]) -> Result<()> {
+        let array_slots = slice.len() as i64;
+        self.reserve(array_slots)?;
+        self.write_bytes(slice.to_byte_slice(), array_slots)
     }
 
     /// Reserve memory for n elements of type T
-    pub fn reserve(&mut self, n: usize) {
-        if self.len + n > self.capacity {
-            let new_capacity = cmp::max(self.capacity * 2, n);
-            self.grow(new_capacity);
-        }
-    }
-
-    /// Grow the buffer to the new size n (number of elements of type T)
-    fn grow(&mut self, new_capacity: usize) {
-        let sz = mem::size_of::<T>();
-        let old_buffer = self.data;
-        let new_buffer = allocate_aligned((new_capacity * sz) as i64).unwrap();
-        unsafe {
-            libc::memcpy(
-                new_buffer as *mut libc::c_void,
-                old_buffer as *const libc::c_void,
-                self.len * sz,
-            );
+    pub fn reserve(&mut self, n: i64) -> Result<()> {
+        let new_capacity = self.len + n;
+        if new_capacity > self.capacity() {
+            return self.grow(new_capacity);
         }
-        self.capacity = new_capacity;
-        self.data = new_buffer as *mut T;
-        free_aligned(old_buffer as *const u8);
-    }
-
-    /// Build a Buffer from the existing memory
-    pub fn finish(&mut self) -> Buffer {
-        assert!(!self.data.is_null(), "data has not been initialized");
-        let p = self.data;
-        self.data = ptr::null_mut(); // ensure builder cannot be re-used
-        Buffer::from_raw_parts(p as *mut u8, self.len)
-    }
-}
-
-impl<T> Drop for Builder<T>
-where
-    T: ArrowPrimitiveType,
-{
-    fn drop(&mut self) {
-        if !self.data.is_null() {
-            free_aligned(self.data as *const u8);
+        Ok(())
+    }
+
+    /// Grow the internal buffer to `new_capacity`, where `new_capacity` is 
the capacity in
+    /// elements of type T
+    fn grow(&mut self, new_capacity: i64) -> Result<()> {
+        let byte_capacity = mem::size_of::<T>() * new_capacity as usize;
+        self.buffer.resize(byte_capacity)
+    }
+
+    /// Build an immutable `Buffer` from the existing internal 
`MutableBuffer`'s memory
+    pub fn finish(self) -> Buffer {
+        self.buffer.freeze()
+    }
+
+    /// Writes a byte slice to the underlying buffer and updates the `len`, 
i.e. the number array
+    /// elements in the builder.  Also, converts the `io::Result` required by 
the `Write` trait
+    /// to the Arrow `Result` type.
+    fn write_bytes(&mut self, bytes: &[u8], len_added: i64) -> Result<()> {
+        let write_result = self.buffer.write(bytes);
+        // `io::Result` has many options one of which we use, so pattern 
matching is overkill here
+        if write_result.is_err() {
+            Err(ArrowError::MemoryError(
+                "Could not write to Buffer, not big enough".to_string(),
+            ))
+        } else {
+            self.len += len_added;
+            Ok(())
         }
     }
 }
@@ -177,83 +118,91 @@ mod tests {
 
     #[test]
     fn test_builder_i32_empty() {
-        let mut b: Builder<i32> = Builder::with_capacity(5);
+        let b = BufferBuilder::<i32>::new(5);
+        assert_eq!(0, b.len());
+        assert_eq!(16, b.capacity());
         let a = b.finish();
         assert_eq!(0, a.len());
     }
 
     #[test]
     fn test_builder_i32_alloc_zero_bytes() {
-        let mut b: Builder<i32> = Builder::with_capacity(0);
-        b.push(123);
+        let mut b = BufferBuilder::<i32>::new(0);
+        b.push(123).unwrap();
         let a = b.finish();
-        assert_eq!(1, a.len());
+        assert_eq!(4, a.len());
     }
 
     #[test]
     fn test_builder_i32() {
-        let mut b: Builder<i32> = Builder::with_capacity(5);
+        let mut b = BufferBuilder::<i32>::new(5);
         for i in 0..5 {
-            b.push(i);
+            b.push(i).unwrap();
         }
+        assert_eq!(16, b.capacity());
         let a = b.finish();
-        assert_eq!(5, a.len());
+        assert_eq!(20, a.len());
     }
 
     #[test]
     fn test_builder_i32_grow_buffer() {
-        let mut b: Builder<i32> = Builder::with_capacity(2);
-        for i in 0..5 {
-            b.push(i);
+        let mut b = BufferBuilder::<i32>::new(2);
+        assert_eq!(16, b.capacity());
+        for i in 0..20 {
+            b.push(i).unwrap();
         }
+        assert_eq!(32, b.capacity());
         let a = b.finish();
-        assert_eq!(5, a.len());
+        assert_eq!(80, a.len());
     }
 
     #[test]
     fn test_reserve() {
-        let mut b: Builder<u8> = Builder::with_capacity(2);
-        assert_eq!(2, b.capacity());
-        b.reserve(2);
-        assert_eq!(2, b.capacity());
-        b.reserve(3);
-        assert_eq!(4, b.capacity());
+        let mut b = BufferBuilder::<u8>::new(2);
+        assert_eq!(64, b.capacity());
+        b.reserve(64).unwrap();
+        assert_eq!(64, b.capacity());
+        b.reserve(65).unwrap();
+        assert_eq!(128, b.capacity());
+
+        let mut b = BufferBuilder::<i32>::new(2);
+        assert_eq!(16, b.capacity());
+        b.reserve(16).unwrap();
+        assert_eq!(16, b.capacity());
+        b.reserve(17).unwrap();
+        assert_eq!(32, b.capacity());
     }
 
     #[test]
     fn test_push_slice() {
-        let mut b: Builder<u8> = Builder::new();
-        b.push_slice("Hello, ".as_bytes());
-        b.push_slice("World!".as_bytes());
+        let mut b = BufferBuilder::<u8>::new(0);
+        b.push_slice("Hello, ".as_bytes()).unwrap();
+        b.push_slice("World!".as_bytes()).unwrap();
         let buffer = b.finish();
         assert_eq!(13, buffer.len());
-    }
-
-    #[test]
-    fn test_slice_empty_at_end() {
-        let mut b: Builder<u8> = Builder::with_capacity(2);
-        let s = b.slice_mut(2, 2);
-        assert_eq!(0, s.len());
-    }
 
-    #[test]
-    #[should_panic(expected = "the end of the slice must be within the 
capacity")]
-    fn test_slice_start_out_of_bounds() {
-        let mut b: Builder<u8> = Builder::with_capacity(2);
-        b.slice_mut(3, 3); // should panic
+        let mut b = BufferBuilder::<i32>::new(0);
+        b.push_slice(&[32, 54]).unwrap();
+        let buffer = b.finish();
+        assert_eq!(8, buffer.len());
     }
 
     #[test]
-    #[should_panic(expected = "the end of the slice must be within the 
capacity")]
-    fn test_slice_end_out_of_bounds() {
-        let mut b: Builder<u8> = Builder::with_capacity(2);
-        b.slice_mut(0, 3); // should panic
+    fn test_write_bytes() {
+        let mut b = BufferBuilder::<bool>::new(4);
+        let bytes = [false, true, false, true].to_byte_slice();
+        b.write_bytes(bytes, 4).unwrap();
+        assert_eq!(4, b.len());
+        assert_eq!(64, b.capacity());
+        let buffer = b.finish();
+        assert_eq!(4, buffer.len());
     }
 
     #[test]
-    #[should_panic(expected = "the start of the slice cannot exceed the end of 
the slice")]
-    fn test_slice_end_before_start() {
-        let mut b: Builder<u8> = Builder::with_capacity(2);
-        b.slice_mut(1, 0); // should panic
+    #[should_panic(expected = "Could not write to Buffer, not big enough")]
+    fn test_write_too_many_bytes() {
+        let mut b = BufferBuilder::<bool>::new(0);
+        let bytes = [false, true, false, true].to_byte_slice();
+        b.write_bytes(bytes, 4).unwrap();
     }
 }

Reply via email to