This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d39cf283e9a feat: initial support string_view and binary_view,  
supports layout and basic construction + tests (#5481)
d39cf283e9a is described below

commit d39cf283e9a68b221a24b4132c27f34100439086
Author: Yijun Zhao <[email protected]>
AuthorDate: Fri Mar 15 01:05:04 2024 +0800

    feat: initial support string_view and binary_view,  supports layout and 
basic construction + tests (#5481)
    
    * support string_view and binary_view
    
    * fix reviewer comments
---
 arrow-array/src/array/byte_array.rs                |   6 +-
 arrow-array/src/array/byte_view_array.rs           | 480 +++++++++++++++++++++
 arrow-array/src/array/mod.rs                       |   7 +
 .../src/builder/generic_bytes_view_builder.rs      | 215 +++++++++
 arrow-array/src/builder/mod.rs                     |   3 +
 arrow-array/src/record_batch.rs                    |  28 +-
 arrow-array/src/types.rs                           |  68 +++
 arrow-buffer/src/native.rs                         |   1 +
 arrow-data/src/byte_view.rs                        | 123 ++++++
 arrow-data/src/data.rs                             |  85 ++--
 arrow-data/src/equal/byte_view.rs                  |  74 ++++
 arrow-data/src/equal/mod.rs                        |   4 +-
 arrow-data/src/lib.rs                              |   3 +
 arrow-data/src/transform/mod.rs                    | 172 +++++---
 arrow/tests/array_equal.rs                         |  48 ++-
 arrow/tests/array_transform.rs                     |  39 ++
 16 files changed, 1244 insertions(+), 112 deletions(-)

diff --git a/arrow-array/src/array/byte_array.rs 
b/arrow-array/src/array/byte_array.rs
index db825bbea97..a57abc5b1e7 100644
--- a/arrow-array/src/array/byte_array.rs
+++ b/arrow-array/src/array/byte_array.rs
@@ -94,7 +94,7 @@ pub struct GenericByteArray<T: ByteArrayType> {
 impl<T: ByteArrayType> Clone for GenericByteArray<T> {
     fn clone(&self) -> Self {
         Self {
-            data_type: self.data_type.clone(),
+            data_type: T::DATA_TYPE,
             value_offsets: self.value_offsets.clone(),
             value_data: self.value_data.clone(),
             nulls: self.nulls.clone(),
@@ -323,7 +323,7 @@ impl<T: ByteArrayType> GenericByteArray<T> {
     /// Returns a zero-copy slice of this array with the indicated offset and 
length.
     pub fn slice(&self, offset: usize, length: usize) -> Self {
         Self {
-            data_type: self.data_type.clone(),
+            data_type: T::DATA_TYPE,
             value_offsets: self.value_offsets.slice(offset, length),
             value_data: self.value_data.clone(),
             nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)),
@@ -511,7 +511,7 @@ impl<T: ByteArrayType> From<ArrayData> for 
GenericByteArray<T> {
         Self {
             value_offsets,
             value_data,
-            data_type: data.data_type().clone(),
+            data_type: T::DATA_TYPE,
             nulls: data.nulls().cloned(),
         }
     }
diff --git a/arrow-array/src/array/byte_view_array.rs 
b/arrow-array/src/array/byte_view_array.rs
new file mode 100644
index 00000000000..e22e9b1688b
--- /dev/null
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -0,0 +1,480 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::array::print_long_array;
+use crate::builder::GenericByteViewBuilder;
+use crate::iterator::ArrayIter;
+use crate::types::bytes::ByteArrayNativeType;
+use crate::types::{BinaryViewType, ByteViewType, StringViewType};
+use crate::{Array, ArrayAccessor, ArrayRef};
+use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
+use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
+use arrow_schema::{ArrowError, DataType};
+use std::any::Any;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+/// [Variable-size Binary View Layout]: An array of variable length bytes view 
arrays.
+///
+/// Different than [`crate::GenericByteArray`] as it stores both an offset and 
length
+/// meaning that take / filter operations can be implemented without copying 
the underlying data.
+///
+/// [Variable-size Binary View Layout]: 
https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
+pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
+    data_type: DataType,
+    views: ScalarBuffer<u128>,
+    buffers: Vec<Buffer>,
+    phantom: PhantomData<T>,
+    nulls: Option<NullBuffer>,
+}
+
+impl<T: ByteViewType + ?Sized> Clone for GenericByteViewArray<T> {
+    fn clone(&self) -> Self {
+        Self {
+            data_type: T::DATA_TYPE,
+            views: self.views.clone(),
+            buffers: self.buffers.clone(),
+            nulls: self.nulls.clone(),
+            phantom: Default::default(),
+        }
+    }
+}
+
+impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
+    /// Create a new [`GenericByteViewArray`] from the provided parts, 
panicking on failure
+    ///
+    /// # Panics
+    ///
+    /// Panics if [`GenericByteViewArray::try_new`] returns an error
+    pub fn new(views: ScalarBuffer<u128>, buffers: Vec<Buffer>, nulls: 
Option<NullBuffer>) -> Self {
+        Self::try_new(views, buffers, nulls).unwrap()
+    }
+
+    /// Create a new [`GenericByteViewArray`] from the provided parts, 
returning an error on failure
+    ///
+    /// # Errors
+    ///
+    /// * `views.len() != nulls.len()`
+    /// * [ByteViewType::validate] fails
+    pub fn try_new(
+        views: ScalarBuffer<u128>,
+        buffers: Vec<Buffer>,
+        nulls: Option<NullBuffer>,
+    ) -> Result<Self, ArrowError> {
+        T::validate(&views, &buffers)?;
+
+        if let Some(n) = nulls.as_ref() {
+            if n.len() != views.len() {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Incorrect length of null buffer for {}ViewArray, expected 
{} got {}",
+                    T::PREFIX,
+                    views.len(),
+                    n.len(),
+                )));
+            }
+        }
+
+        Ok(Self {
+            data_type: T::DATA_TYPE,
+            views,
+            buffers,
+            nulls,
+            phantom: Default::default(),
+        })
+    }
+
+    /// Create a new [`GenericByteViewArray`] from the provided parts, without 
validation
+    ///
+    /// # Safety
+    ///
+    /// Safe if [`Self::try_new`] would not error
+    pub unsafe fn new_unchecked(
+        views: ScalarBuffer<u128>,
+        buffers: Vec<Buffer>,
+        nulls: Option<NullBuffer>,
+    ) -> Self {
+        Self {
+            data_type: T::DATA_TYPE,
+            phantom: Default::default(),
+            views,
+            buffers,
+            nulls,
+        }
+    }
+
+    /// Create a new [`GenericByteViewArray`] of length `len` where all values 
are null
+    pub fn new_null(len: usize) -> Self {
+        Self {
+            data_type: T::DATA_TYPE,
+            views: vec![0; len].into(),
+            buffers: vec![],
+            nulls: Some(NullBuffer::new_null(len)),
+            phantom: Default::default(),
+        }
+    }
+
+    /// Creates a [`GenericByteViewArray`] based on an iterator of values 
without nulls
+    pub fn from_iter_values<Ptr, I>(iter: I) -> Self
+    where
+        Ptr: AsRef<T::Native>,
+        I: IntoIterator<Item = Ptr>,
+    {
+        let iter = iter.into_iter();
+        let mut builder = 
GenericByteViewBuilder::<T>::with_capacity(iter.size_hint().0);
+        for v in iter {
+            builder.append_value(v);
+        }
+        builder.finish()
+    }
+
+    /// Deconstruct this array into its constituent parts
+    pub fn into_parts(self) -> (ScalarBuffer<u128>, Vec<Buffer>, 
Option<NullBuffer>) {
+        (self.views, self.buffers, self.nulls)
+    }
+
+    /// Returns the views buffer
+    #[inline]
+    pub fn views(&self) -> &ScalarBuffer<u128> {
+        &self.views
+    }
+
+    /// Returns the buffers storing string data
+    #[inline]
+    pub fn data_buffers(&self) -> &[Buffer] {
+        &self.buffers
+    }
+
+    /// Returns the element at index `i`
+    /// # Panics
+    /// Panics if index `i` is out of bounds.
+    pub fn value(&self, i: usize) -> &T::Native {
+        assert!(
+            i < self.len(),
+            "Trying to access an element at index {} from a {}ViewArray of 
length {}",
+            i,
+            T::PREFIX,
+            self.len()
+        );
+
+        unsafe { self.value_unchecked(i) }
+    }
+
+    /// Returns the element at index `i`
+    /// # Safety
+    /// Caller is responsible for ensuring that the index is within the bounds 
of the array
+    pub unsafe fn value_unchecked(&self, idx: usize) -> &T::Native {
+        let v = self.views.get_unchecked(idx);
+        let len = *v as u32;
+        let b = if len <= 12 {
+            let ptr = self.views.as_ptr() as *const u8;
+            std::slice::from_raw_parts(ptr.add(idx * 16 + 4), len as usize)
+        } else {
+            let view = ByteView::from(*v);
+            let data = self.buffers.get_unchecked(view.buffer_index as usize);
+            let offset = view.offset as usize;
+            data.get_unchecked(offset..offset + len as usize)
+        };
+        T::Native::from_bytes_unchecked(b)
+    }
+
+    /// constructs a new iterator
+    pub fn iter(&self) -> ArrayIter<&Self> {
+        ArrayIter::new(self)
+    }
+
+    /// Returns a zero-copy slice of this array with the indicated offset and 
length.
+    pub fn slice(&self, offset: usize, length: usize) -> Self {
+        Self {
+            data_type: T::DATA_TYPE,
+            views: self.views.slice(offset, length),
+            buffers: self.buffers.clone(),
+            nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)),
+            phantom: Default::default(),
+        }
+    }
+}
+
+impl<T: ByteViewType + ?Sized> Debug for GenericByteViewArray<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}ViewArray\n[\n", T::PREFIX)?;
+        print_long_array(self, f, |array, index, f| {
+            std::fmt::Debug::fmt(&array.value(index), f)
+        })?;
+        write!(f, "]")
+    }
+}
+
+impl<T: ByteViewType + ?Sized> Array for GenericByteViewArray<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn to_data(&self) -> ArrayData {
+        self.clone().into()
+    }
+
+    fn into_data(self) -> ArrayData {
+        self.into()
+    }
+
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
+
+    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
+        Arc::new(self.slice(offset, length))
+    }
+
+    fn len(&self) -> usize {
+        self.views.len()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.views.is_empty()
+    }
+
+    fn offset(&self) -> usize {
+        0
+    }
+
+    fn nulls(&self) -> Option<&NullBuffer> {
+        self.nulls.as_ref()
+    }
+
+    fn get_buffer_memory_size(&self) -> usize {
+        let mut sum = self.buffers.iter().map(|b| b.capacity()).sum::<usize>();
+        sum += self.views.inner().capacity();
+        if let Some(x) = &self.nulls {
+            sum += x.buffer().capacity()
+        }
+        sum
+    }
+
+    fn get_array_memory_size(&self) -> usize {
+        std::mem::size_of::<Self>() + self.get_buffer_memory_size()
+    }
+}
+
+impl<'a, T: ByteViewType + ?Sized> ArrayAccessor for &'a 
GenericByteViewArray<T> {
+    type Item = &'a T::Native;
+
+    fn value(&self, index: usize) -> Self::Item {
+        GenericByteViewArray::value(self, index)
+    }
+
+    unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
+        GenericByteViewArray::value_unchecked(self, index)
+    }
+}
+
+impl<'a, T: ByteViewType + ?Sized> IntoIterator for &'a 
GenericByteViewArray<T> {
+    type Item = Option<&'a T::Native>;
+    type IntoIter = ArrayIter<Self>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        ArrayIter::new(self)
+    }
+}
+
+impl<T: ByteViewType + ?Sized> From<ArrayData> for GenericByteViewArray<T> {
+    fn from(value: ArrayData) -> Self {
+        let views = value.buffers()[0].clone();
+        let views = ScalarBuffer::new(views, value.offset(), value.len());
+        let buffers = value.buffers()[1..].to_vec();
+        Self {
+            data_type: T::DATA_TYPE,
+            views,
+            buffers,
+            nulls: value.nulls().cloned(),
+            phantom: Default::default(),
+        }
+    }
+}
+
+impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
+    fn from(mut array: GenericByteViewArray<T>) -> Self {
+        let len = array.len();
+        array.buffers.insert(0, array.views.into_inner());
+        let builder = ArrayDataBuilder::new(T::DATA_TYPE)
+            .len(len)
+            .buffers(array.buffers)
+            .nulls(array.nulls);
+
+        unsafe { builder.build_unchecked() }
+    }
+}
+
+impl<Ptr, T: ByteViewType + ?Sized> FromIterator<Option<Ptr>> for 
GenericByteViewArray<T>
+where
+    Ptr: AsRef<T::Native>,
+{
+    fn from_iter<I: IntoIterator<Item = Option<Ptr>>>(iter: I) -> Self {
+        let iter = iter.into_iter();
+        let mut builder = 
GenericByteViewBuilder::<T>::with_capacity(iter.size_hint().0);
+        builder.extend(iter);
+        builder.finish()
+    }
+}
+
+/// A [`GenericByteViewArray`] of `[u8]`
+pub type BinaryViewArray = GenericByteViewArray<BinaryViewType>;
+
+/// A [`GenericByteViewArray`] of `str`
+///
+/// ```
+/// use arrow_array::StringViewArray;
+/// let array = StringViewArray::from_iter_values(vec!["hello", "world", 
"lulu", "large payload over 12 bytes"]);
+/// assert_eq!(array.value(0), "hello");
+/// assert_eq!(array.value(3), "large payload over 12 bytes");
+/// ```
+pub type StringViewArray = GenericByteViewArray<StringViewType>;
+
+impl From<Vec<&str>> for StringViewArray {
+    fn from(v: Vec<&str>) -> Self {
+        Self::from_iter_values(v)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::builder::StringViewBuilder;
+    use crate::{Array, BinaryViewArray, StringViewArray};
+    use arrow_buffer::{Buffer, ScalarBuffer};
+    use arrow_data::ByteView;
+
+    #[test]
+    fn try_new() {
+        let array = StringViewArray::from_iter_values(vec![
+            "hello",
+            "world",
+            "lulu",
+            "large payload over 12 bytes",
+        ]);
+        assert_eq!(array.value(0), "hello");
+        assert_eq!(array.value(3), "large payload over 12 bytes");
+
+        let array = BinaryViewArray::from_iter_values(vec![
+            b"hello".as_slice(),
+            b"world".as_slice(),
+            b"lulu".as_slice(),
+            b"large payload over 12 bytes".as_slice(),
+        ]);
+        assert_eq!(array.value(0), b"hello");
+        assert_eq!(array.value(3), b"large payload over 12 bytes");
+
+        // test empty array
+        let array = {
+            let mut builder = StringViewBuilder::new();
+            builder.finish()
+        };
+        assert!(array.is_empty());
+
+        // test builder append
+        let array = {
+            let mut builder = StringViewBuilder::new();
+            builder.append_value("hello");
+            builder.append_null();
+            builder.append_option(Some("large payload over 12 bytes"));
+            builder.finish()
+        };
+        assert_eq!(array.value(0), "hello");
+        assert!(array.is_null(1));
+        assert_eq!(array.value(2), "large payload over 12 bytes");
+
+        // test builder's in_progress re-created
+        let array = {
+            // make a builder with small block size.
+            let mut builder = StringViewBuilder::new().with_block_size(14);
+            builder.append_value("large payload over 12 bytes");
+            builder.append_option(Some("another large payload over 12 bytes 
that double than the first one, so that we can trigger the in_progress in 
builder re-created"));
+            builder.finish()
+        };
+        assert_eq!(array.value(0), "large payload over 12 bytes");
+        assert_eq!(array.value(1), "another large payload over 12 bytes that 
double than the first one, so that we can trigger the in_progress in builder 
re-created");
+        assert_eq!(2, array.buffers.len());
+    }
+
+    #[test]
+    #[should_panic(expected = "Invalid buffer index at 0: got index 3 but only 
has 1 buffers")]
+    fn new_with_invalid_view_data() {
+        let v = "large payload over 12 bytes";
+        let view = ByteView {
+            length: 13,
+            prefix: u32::from_le_bytes(v.as_bytes()[0..4].try_into().unwrap()),
+            buffer_index: 3,
+            offset: 1,
+        };
+        let views = ScalarBuffer::from(vec![view.into()]);
+        let buffers = vec![Buffer::from_slice_ref(v)];
+        StringViewArray::new(views, buffers, None);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "Encountered non-UTF-8 data at index 0: invalid utf-8 
sequence of 1 bytes from index 0"
+    )]
+    fn new_with_invalid_utf8_data() {
+        let v: Vec<u8> = vec![0xf0, 0x80, 0x80, 0x80];
+        let view = ByteView {
+            length: v.len() as u32,
+            prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
+            buffer_index: 0,
+            offset: 0,
+        };
+        let views = ScalarBuffer::from(vec![view.into()]);
+        let buffers = vec![Buffer::from_slice_ref(v)];
+        StringViewArray::new(views, buffers, None);
+    }
+
+    #[test]
+    #[should_panic(expected = "View at index 0 contained non-zero padding for 
string of length 1")]
+    fn new_with_invalid_zero_padding() {
+        let mut data = [0; 12];
+        data[0] = b'H';
+        data[11] = 1; // no zero padding
+
+        let mut view_buffer = [0; 16];
+        view_buffer[0..4].copy_from_slice(&1u32.to_le_bytes());
+        view_buffer[4..].copy_from_slice(&data);
+
+        let view = ByteView::from(u128::from_le_bytes(view_buffer));
+        let views = ScalarBuffer::from(vec![view.into()]);
+        let buffers = vec![];
+        StringViewArray::new(views, buffers, None);
+    }
+
+    #[test]
+    #[should_panic(expected = "Mismatch between embedded prefix and data")]
+    fn test_mismatch_between_embedded_prefix_and_data() {
+        let input_str_1 = "Hello, Rustaceans!";
+        let input_str_2 = "Hallo, Rustaceans!";
+        let length = input_str_1.len() as u32;
+        assert!(input_str_1.len() > 12);
+
+        let mut view_buffer = [0; 16];
+        view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
+        view_buffer[4..8].copy_from_slice(&input_str_1.as_bytes()[0..4]);
+        view_buffer[8..12].copy_from_slice(&0u32.to_le_bytes());
+        view_buffer[12..].copy_from_slice(&0u32.to_le_bytes());
+        let view = ByteView::from(u128::from_le_bytes(view_buffer));
+        let views = ScalarBuffer::from(vec![view.into()]);
+        let buffers = vec![Buffer::from_slice_ref(input_str_2.as_bytes())];
+
+        StringViewArray::new(views, buffers, None);
+    }
+}
diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index 7aa3f92bfbd..b115ff9c14c 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -65,8 +65,13 @@ mod union_array;
 pub use union_array::*;
 
 mod run_array;
+
 pub use run_array::*;
 
+mod byte_view_array;
+
+pub use byte_view_array::*;
+
 /// An array in the [arrow columnar 
format](https://arrow.apache.org/docs/format/Columnar.html)
 pub trait Array: std::fmt::Debug + Send + Sync {
     /// Returns the array as [`Any`] so that it can be
@@ -596,8 +601,10 @@ pub fn make_array(data: ArrayData) -> ArrayRef {
         DataType::Binary => Arc::new(BinaryArray::from(data)) as ArrayRef,
         DataType::LargeBinary => Arc::new(LargeBinaryArray::from(data)) as 
ArrayRef,
         DataType::FixedSizeBinary(_) => 
Arc::new(FixedSizeBinaryArray::from(data)) as ArrayRef,
+        DataType::BinaryView => Arc::new(BinaryViewArray::from(data)) as 
ArrayRef,
         DataType::Utf8 => Arc::new(StringArray::from(data)) as ArrayRef,
         DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data)) as 
ArrayRef,
+        DataType::Utf8View => Arc::new(StringViewArray::from(data)) as 
ArrayRef,
         DataType::List(_) => Arc::new(ListArray::from(data)) as ArrayRef,
         DataType::LargeList(_) => Arc::new(LargeListArray::from(data)) as 
ArrayRef,
         DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef,
diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs 
b/arrow-array/src/builder/generic_bytes_view_builder.rs
new file mode 100644
index 00000000000..29de7feb0ec
--- /dev/null
+++ b/arrow-array/src/builder/generic_bytes_view_builder.rs
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::builder::ArrayBuilder;
+use crate::types::{BinaryViewType, ByteViewType, StringViewType};
+use crate::{ArrayRef, GenericByteViewArray};
+use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
+use arrow_data::ByteView;
+use std::any::Any;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;
+
+/// A builder for [`GenericByteViewArray`]
+///
+/// See [`Self::append_value`] for the allocation strategy
+pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
+    views_builder: BufferBuilder<u128>,
+    null_buffer_builder: NullBufferBuilder,
+    completed: Vec<Buffer>,
+    in_progress: Vec<u8>,
+    block_size: u32,
+    phantom: PhantomData<T>,
+}
+
+impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
+    /// Creates a new [`GenericByteViewBuilder`].
+    pub fn new() -> Self {
+        Self::with_capacity(1024)
+    }
+
+    /// Creates a new [`GenericByteViewBuilder`] with space for `capacity` 
string values.
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            views_builder: BufferBuilder::new(capacity),
+            null_buffer_builder: NullBufferBuilder::new(capacity),
+            completed: vec![],
+            in_progress: vec![],
+            block_size: DEFAULT_BLOCK_SIZE,
+            phantom: Default::default(),
+        }
+    }
+
+    /// Override the size of buffers to allocate for holding string data
+    pub fn with_block_size(self, block_size: u32) -> Self {
+        Self { block_size, ..self }
+    }
+
+    /// Appends a value into the builder
+    ///
+    /// # Panics
+    ///
+    /// Panics if
+    /// - String buffer count exceeds `u32::MAX`
+    /// - String length exceeds `u32::MAX`
+    #[inline]
+    pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
+        let v: &[u8] = value.as_ref().as_ref();
+        let length: u32 = v.len().try_into().unwrap();
+        if length <= 12 {
+            let mut view_buffer = [0; 16];
+            view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
+            view_buffer[4..4 + v.len()].copy_from_slice(v);
+            self.views_builder.append(u128::from_le_bytes(view_buffer));
+            self.null_buffer_builder.append_non_null();
+            return;
+        }
+
+        let required_cap = self.in_progress.len() + v.len();
+        if self.in_progress.capacity() < required_cap {
+            let in_progress = Vec::with_capacity(v.len().max(self.block_size 
as usize));
+            let flushed = std::mem::replace(&mut self.in_progress, 
in_progress);
+            if !flushed.is_empty() {
+                assert!(self.completed.len() < u32::MAX as usize);
+                self.completed.push(flushed.into());
+            }
+        };
+        let offset = self.in_progress.len() as u32;
+        self.in_progress.extend_from_slice(v);
+
+        let view = ByteView {
+            length,
+            prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
+            buffer_index: self.completed.len() as u32,
+            offset,
+        };
+        self.views_builder.append(view.into());
+        self.null_buffer_builder.append_non_null();
+    }
+
+    /// Append an `Option` value into the builder
+    #[inline]
+    pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
+        match value {
+            None => self.append_null(),
+            Some(v) => self.append_value(v),
+        };
+    }
+
+    /// Append a null value into the builder
+    #[inline]
+    pub fn append_null(&mut self) {
+        self.null_buffer_builder.append_null();
+        self.views_builder.append(0);
+    }
+
+    /// Builds the [`GenericByteViewArray`] and reset this builder
+    pub fn finish(&mut self) -> GenericByteViewArray<T> {
+        let mut completed = std::mem::take(&mut self.completed);
+        if !self.in_progress.is_empty() {
+            completed.push(std::mem::take(&mut self.in_progress).into());
+        }
+        let len = self.views_builder.len();
+        let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
+        let nulls = self.null_buffer_builder.finish();
+        // SAFETY: valid by construction
+        unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
+    }
+
+    /// Builds the [`GenericByteViewArray`] without resetting the builder
+    pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
+        let mut completed = self.completed.clone();
+        if !self.in_progress.is_empty() {
+            completed.push(Buffer::from_slice_ref(&self.in_progress));
+        }
+        let len = self.views_builder.len();
+        let views = Buffer::from_slice_ref(self.views_builder.as_slice());
+        let views = ScalarBuffer::new(views, 0, len);
+        let nulls = self.null_buffer_builder.finish_cloned();
+        // SAFETY: valid by construction
+        unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
+    }
+}
+
+impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}ViewBuilder", T::PREFIX)?;
+        f.debug_struct("")
+            .field("views_builder", &self.views_builder)
+            .field("in_progress", &self.in_progress)
+            .field("completed", &self.completed)
+            .field("null_buffer_builder", &self.null_buffer_builder)
+            .finish()
+    }
+}
+
+impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
+    fn len(&self) -> usize {
+        self.null_buffer_builder.len()
+    }
+
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+
+    fn finish_cloned(&self) -> ArrayRef {
+        Arc::new(self.finish_cloned())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+}
+
+impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
+    for GenericByteViewBuilder<T>
+{
+    #[inline]
+    fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
+        for v in iter {
+            self.append_option(v)
+        }
+    }
+}
+
+/// Array builder for [`StringViewArray`][crate::StringViewArray]
+///
+/// Values can be appended using [`GenericByteViewBuilder::append_value`], and 
nulls with
+/// [`GenericByteViewBuilder::append_null`] as normal.
+pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
+
+///  Array builder for [`BinaryViewArray`][crate::BinaryViewArray]
+///
+/// Values can be appended using [`GenericByteViewBuilder::append_value`], and 
nulls with
+/// [`GenericByteViewBuilder::append_null`] as normal.
+pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs
index d33e565a868..e4ab7ae4ba2 100644
--- a/arrow-array/src/builder/mod.rs
+++ b/arrow-array/src/builder/mod.rs
@@ -178,7 +178,10 @@ mod generic_bytes_dictionary_builder;
 pub use generic_bytes_dictionary_builder::*;
 mod generic_byte_run_builder;
 pub use generic_byte_run_builder::*;
+mod generic_bytes_view_builder;
+pub use generic_bytes_view_builder::*;
 mod union_builder;
+
 pub use union_builder::*;
 
 use crate::ArrayRef;
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 314445bba61..c56b1fd308c 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -626,7 +626,9 @@ mod tests {
     use std::collections::HashMap;
 
     use super::*;
-    use crate::{BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, 
StringArray};
+    use crate::{
+        BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, 
StringArray, StringViewArray,
+    };
     use arrow_buffer::{Buffer, ToByteSlice};
     use arrow_data::{ArrayData, ArrayDataBuilder};
     use arrow_schema::Fields;
@@ -646,6 +648,30 @@ mod tests {
         check_batch(record_batch, 5)
     }
 
+    #[test]
+    fn create_string_view_record_batch() {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8View, false),
+        ]);
+
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = StringViewArray::from(vec!["a", "b", "c", "d", "e"]);
+
+        let record_batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), 
Arc::new(b)]).unwrap();
+
+        assert_eq!(5, record_batch.num_rows());
+        assert_eq!(2, record_batch.num_columns());
+        assert_eq!(&DataType::Int32, 
record_batch.schema().field(0).data_type());
+        assert_eq!(
+            &DataType::Utf8View,
+            record_batch.schema().field(1).data_type()
+        );
+        assert_eq!(5, record_batch.column(0).len());
+        assert_eq!(5, record_batch.column(1).len());
+    }
+
     #[test]
     fn byte_size_should_not_regress() {
         let schema = Schema::new(vec![
diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs
index 83a229c1da0..e33f7bde7cb 100644
--- a/arrow-array/src/types.rs
+++ b/arrow-array/src/types.rs
@@ -25,12 +25,14 @@ use crate::timezone::Tz;
 use crate::{ArrowNativeTypeOp, OffsetSizeTrait};
 use arrow_buffer::{i256, Buffer, OffsetBuffer};
 use arrow_data::decimal::{validate_decimal256_precision, 
validate_decimal_precision};
+use arrow_data::{validate_binary_view, validate_string_view};
 use arrow_schema::{
     ArrowError, DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION, 
DECIMAL128_MAX_SCALE,
     DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, DECIMAL_DEFAULT_SCALE,
 };
 use chrono::{Duration, NaiveDate, NaiveDateTime};
 use half::f16;
+use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::ops::{Add, Sub};
 
@@ -1544,6 +1546,72 @@ pub type BinaryType = GenericBinaryType<i32>;
 /// An arrow binary array with i64 offsets
 pub type LargeBinaryType = GenericBinaryType<i64>;
 
+mod byte_view {
+    use crate::types::{BinaryViewType, StringViewType};
+
+    pub trait Sealed: Send + Sync {}
+    impl Sealed for StringViewType {}
+    impl Sealed for BinaryViewType {}
+}
+
+/// A trait over the variable length bytes view array types
+pub trait ByteViewType: byte_view::Sealed + 'static + PartialEq + Send + Sync {
+    /// If element in array is utf8 encoded string.
+    const IS_UTF8: bool;
+
+    /// Datatype of array elements
+    const DATA_TYPE: DataType = if Self::IS_UTF8 {
+        DataType::Utf8View
+    } else {
+        DataType::BinaryView
+    };
+
+    /// "Binary" or "String", for use in displayed or error messages
+    const PREFIX: &'static str;
+
+    /// Type for representing its equivalent rust type i.e
+    /// Utf8Array will have native type has &str
+    /// BinaryArray will have type as [u8]
+    type Native: bytes::ByteArrayNativeType + AsRef<Self::Native> + 
AsRef<[u8]> + ?Sized;
+
+    /// Type for owned corresponding to `Native`
+    type Owned: Debug + Clone + Sync + Send + AsRef<Self::Native>;
+
+    /// Verifies that the provided buffers are valid for this array type
+    fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError>;
+}
+
+/// [`ByteViewType`] for string arrays
+#[derive(PartialEq)]
+pub struct StringViewType {}
+
+impl ByteViewType for StringViewType {
+    const IS_UTF8: bool = true;
+    const PREFIX: &'static str = "String";
+
+    type Native = str;
+    type Owned = String;
+
+    fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> {
+        validate_string_view(views, buffers)
+    }
+}
+
+/// [`BinaryViewType`] for string arrays
+#[derive(PartialEq)]
+pub struct BinaryViewType {}
+
+impl ByteViewType for BinaryViewType {
+    const IS_UTF8: bool = false;
+    const PREFIX: &'static str = "Binary";
+    type Native = [u8];
+    type Owned = Vec<u8>;
+
+    fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> {
+        validate_binary_view(views, buffers)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/arrow-buffer/src/native.rs b/arrow-buffer/src/native.rs
index 38074a8dc26..5184d60ac1f 100644
--- a/arrow-buffer/src/native.rs
+++ b/arrow-buffer/src/native.rs
@@ -149,6 +149,7 @@ native_integer!(u8);
 native_integer!(u16);
 native_integer!(u32);
 native_integer!(u64);
+native_integer!(u128);
 
 macro_rules! native_float {
     ($t:ty, $s:ident, $as_usize: expr, $i:ident, $usize_as: expr) => {
diff --git a/arrow-data/src/byte_view.rs b/arrow-data/src/byte_view.rs
new file mode 100644
index 00000000000..b8b1731ac60
--- /dev/null
+++ b/arrow-data/src/byte_view.rs
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_buffer::Buffer;
+use arrow_schema::ArrowError;
+
+#[derive(Debug, Copy, Clone, Default)]
+#[repr(C)]
+pub struct ByteView {
+    /// The length of the string/bytes.
+    pub length: u32,
+    /// First 4 bytes of string/bytes data.
+    pub prefix: u32,
+    /// The buffer index.
+    pub buffer_index: u32,
+    /// The offset into the buffer.
+    pub offset: u32,
+}
+
+impl ByteView {
+    #[inline(always)]
+    pub fn as_u128(self) -> u128 {
+        (self.length as u128)
+            | ((self.prefix as u128) << 32)
+            | ((self.buffer_index as u128) << 64)
+            | ((self.offset as u128) << 96)
+    }
+}
+
+impl From<u128> for ByteView {
+    #[inline]
+    fn from(value: u128) -> Self {
+        Self {
+            length: value as u32,
+            prefix: (value >> 32) as u32,
+            buffer_index: (value >> 64) as u32,
+            offset: (value >> 96) as u32,
+        }
+    }
+}
+
+impl From<ByteView> for u128 {
+    #[inline]
+    fn from(value: ByteView) -> Self {
+        value.as_u128()
+    }
+}
+
+/// Validates the combination of `views` and `buffers` is a valid BinaryView
+pub fn validate_binary_view(views: &[u128], buffers: &[Buffer]) -> Result<(), 
ArrowError> {
+    validate_view_impl(views, buffers, |_, _| Ok(()))
+}
+
+/// Validates the combination of `views` and `buffers` is a valid StringView
+pub fn validate_string_view(views: &[u128], buffers: &[Buffer]) -> Result<(), 
ArrowError> {
+    validate_view_impl(views, buffers, |idx, b| {
+        std::str::from_utf8(b).map_err(|e| {
+            ArrowError::InvalidArgumentError(format!(
+                "Encountered non-UTF-8 data at index {idx}: {e}"
+            ))
+        })?;
+        Ok(())
+    })
+}
+
+fn validate_view_impl<F>(views: &[u128], buffers: &[Buffer], f: F) -> 
Result<(), ArrowError>
+where
+    F: Fn(usize, &[u8]) -> Result<(), ArrowError>,
+{
+    for (idx, v) in views.iter().enumerate() {
+        let len = *v as u32;
+        if len <= 12 {
+            if len < 12 && (v >> (32 + len * 8)) != 0 {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "View at index {idx} contained non-zero padding for string 
of length {len}",
+                )));
+            }
+            f(idx, &v.to_le_bytes()[4..4 + len as usize])?;
+        } else {
+            let view = ByteView::from(*v);
+            let data = buffers.get(view.buffer_index as usize).ok_or_else(|| {
+                ArrowError::InvalidArgumentError(format!(
+                    "Invalid buffer index at {idx}: got index {} but only has 
{} buffers",
+                    view.buffer_index,
+                    buffers.len()
+                ))
+            })?;
+
+            let start = view.offset as usize;
+            let end = start + len as usize;
+            let b = data.get(start..end).ok_or_else(|| {
+                ArrowError::InvalidArgumentError(format!(
+                    "Invalid buffer slice at {idx}: got {start}..{end} but 
buffer {} has length {}",
+                    view.buffer_index,
+                    data.len()
+                ))
+            })?;
+
+            if !b.starts_with(&view.prefix.to_le_bytes()) {
+                return Err(ArrowError::InvalidArgumentError(
+                    "Mismatch between embedded prefix and data".to_string(),
+                ));
+            }
+
+            f(idx, b)?;
+        }
+    }
+    Ok(())
+}
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 16637570f52..e227b168eee 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -26,7 +26,7 @@ use std::mem;
 use std::ops::Range;
 use std::sync::Arc;
 
-use crate::equal;
+use crate::{equal, validate_binary_view, validate_string_view};
 
 /// A collection of [`Buffer`]
 #[doc(hidden)]
@@ -159,29 +159,6 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: 
usize) -> [MutableBuff
     }
 }
 
-/// Maps 2 [`MutableBuffer`]s into a vector of [Buffer]s whose size depends on 
`data_type`.
-#[inline]
-pub(crate) fn into_buffers(
-    data_type: &DataType,
-    buffer1: MutableBuffer,
-    buffer2: MutableBuffer,
-) -> Vec<Buffer> {
-    match data_type {
-        DataType::Null | DataType::Struct(_) | DataType::FixedSizeList(_, _) 
=> vec![],
-        DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | 
DataType::LargeBinary => {
-            vec![buffer1.into(), buffer2.into()]
-        }
-        DataType::Union(_, mode) => {
-            match mode {
-                // Based on Union's DataTypeLayout
-                UnionMode::Sparse => vec![buffer1.into()],
-                UnionMode::Dense => vec![buffer1.into(), buffer2.into()],
-            }
-        }
-        _ => vec![buffer1.into()],
-    }
-}
-
 /// A generic representation of Arrow array data which encapsulates common 
attributes and
 /// operations for Arrow array. Specific operations for different arrays types 
(e.g.,
 /// primitive, list, struct) are implemented in `Array`.
@@ -745,7 +722,10 @@ impl ArrayData {
             )));
         }
 
-        if self.buffers.len() != layout.buffers.len() {
+        // Check data buffers length for view types and other types
+        if self.buffers.len() < layout.buffers.len()
+            || (!layout.variadic && self.buffers.len() != layout.buffers.len())
+        {
             return Err(ArrowError::InvalidArgumentError(format!(
                 "Expected {} buffers in array of type {:?}, got {}",
                 layout.buffers.len(),
@@ -1240,6 +1220,14 @@ impl ArrayData {
             DataType::LargeUtf8 => self.validate_utf8::<i64>(),
             DataType::Binary => 
self.validate_offsets_full::<i32>(self.buffers[1].len()),
             DataType::LargeBinary => 
self.validate_offsets_full::<i64>(self.buffers[1].len()),
+            DataType::BinaryView => {
+                let views = self.typed_buffer::<u128>(0, self.len)?;
+                validate_binary_view(views, &self.buffers[1..])
+            }
+            DataType::Utf8View => {
+                let views = self.typed_buffer::<u128>(0, self.len)?;
+                validate_string_view(views, &self.buffers[1..])
+            }
             DataType::List(_) | DataType::Map(_, _) => {
                 let child = &self.child_data[0];
                 self.validate_offsets_full::<i32>(child.len)
@@ -1511,10 +1499,12 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
         DataType::Null => DataTypeLayout {
             buffers: vec![],
             can_contain_null_mask: false,
+            variadic: false,
         },
         DataType::Boolean => DataTypeLayout {
             buffers: vec![BufferSpec::BitMap],
             can_contain_null_mask: true,
+            variadic: false,
         },
         DataType::Int8 => DataTypeLayout::new_fixed_width::<i8>(),
         DataType::Int16 => DataTypeLayout::new_fixed_width::<i16>(),
@@ -1546,15 +1536,14 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
             DataTypeLayout {
                 buffers: vec![spec],
                 can_contain_null_mask: true,
+                variadic: false,
             }
         }
         DataType::Binary => DataTypeLayout::new_binary::<i32>(),
         DataType::LargeBinary => DataTypeLayout::new_binary::<i64>(),
         DataType::Utf8 => DataTypeLayout::new_binary::<i32>(),
         DataType::LargeUtf8 => DataTypeLayout::new_binary::<i64>(),
-        DataType::BinaryView | DataType::Utf8View => {
-            unimplemented!("BinaryView/Utf8View not implemented")
-        }
+        DataType::BinaryView | DataType::Utf8View => 
DataTypeLayout::new_view(),
         DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all 
in child data
         DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
         DataType::ListView(_) | DataType::LargeListView(_) => {
@@ -1586,6 +1575,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
                     }
                 },
                 can_contain_null_mask: false,
+                variadic: false,
             }
         }
         DataType::Dictionary(key_type, _value_type) => layout(key_type),
@@ -1601,6 +1591,11 @@ pub struct DataTypeLayout {
 
     /// Can contain a null bitmask
     pub can_contain_null_mask: bool,
+
+    /// This field only applies to the view type [`DataType::BinaryView`] and 
[`DataType::Utf8View`]
+    /// If `variadic` is true, the number of buffers expected is only 
lower-bounded by
+    /// buffers.len(). Buffers that exceed the lower bound are legal.
+    pub variadic: bool,
 }
 
 impl DataTypeLayout {
@@ -1612,6 +1607,7 @@ impl DataTypeLayout {
                 alignment: mem::align_of::<T>(),
             }],
             can_contain_null_mask: true,
+            variadic: false,
         }
     }
 
@@ -1622,6 +1618,7 @@ impl DataTypeLayout {
         Self {
             buffers: vec![],
             can_contain_null_mask: true,
+            variadic: false,
         }
     }
 
@@ -1640,6 +1637,19 @@ impl DataTypeLayout {
                 BufferSpec::VariableWidth,
             ],
             can_contain_null_mask: true,
+            variadic: false,
+        }
+    }
+
+    /// Describes a view type
+    pub fn new_view() -> Self {
+        Self {
+            buffers: vec![BufferSpec::FixedWidth {
+                byte_width: mem::size_of::<u128>(),
+                alignment: mem::align_of::<u128>(),
+            }],
+            can_contain_null_mask: true,
+            variadic: true,
         }
     }
 }
@@ -1845,7 +1855,7 @@ impl From<ArrayData> for ArrayDataBuilder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_schema::{Field, UnionFields};
+    use arrow_schema::Field;
 
     // See arrow/tests/array_data_validation.rs for test of array validation
 
@@ -2093,23 +2103,6 @@ mod tests {
         assert!(!contains_nulls(Some(&buffer), 0, 0));
     }
 
-    #[test]
-    fn test_into_buffers() {
-        let data_types = vec![
-            DataType::Union(UnionFields::empty(), UnionMode::Dense),
-            DataType::Union(UnionFields::empty(), UnionMode::Sparse),
-        ];
-
-        for data_type in data_types {
-            let buffers = new_buffers(&data_type, 0);
-            let [buffer1, buffer2] = buffers;
-            let buffers = into_buffers(&data_type, buffer1, buffer2);
-
-            let layout = layout(&data_type);
-            assert_eq!(buffers.len(), layout.buffers.len());
-        }
-    }
-
     #[test]
     fn test_alignment() {
         let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
diff --git a/arrow-data/src/equal/byte_view.rs 
b/arrow-data/src/equal/byte_view.rs
new file mode 100644
index 00000000000..def39512536
--- /dev/null
+++ b/arrow-data/src/equal/byte_view.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{ArrayData, ByteView};
+
+pub(super) fn byte_view_equal(
+    lhs: &ArrayData,
+    rhs: &ArrayData,
+    lhs_start: usize,
+    rhs_start: usize,
+    len: usize,
+) -> bool {
+    let lhs_views = &lhs.buffer::<u128>(0)[lhs_start..lhs_start + len];
+    let lhs_buffers = &lhs.buffers()[1..];
+    let rhs_views = &rhs.buffer::<u128>(0)[rhs_start..rhs_start + len];
+    let rhs_buffers = &rhs.buffers()[1..];
+
+    for (idx, (l, r)) in lhs_views.iter().zip(rhs_views).enumerate() {
+        // Only checking one null mask here because by the time the control 
flow reaches
+        // this point, the equality of the two masks would have already been 
verified.
+        if lhs.is_null(idx) {
+            continue;
+        }
+
+        let l_len_prefix = *l as u64;
+        let r_len_prefix = *r as u64;
+        // short-circuit, check length and prefix
+        if l_len_prefix != r_len_prefix {
+            return false;
+        }
+
+        let len = l_len_prefix as u32;
+        // for inline storage, only need check view
+        if len <= 12 {
+            if l != r {
+                return false;
+            }
+            continue;
+        }
+
+        // check buffers
+        let l_view = ByteView::from(*l);
+        let r_view = ByteView::from(*r);
+
+        let l_buffer = &lhs_buffers[l_view.buffer_index as usize];
+        let r_buffer = &rhs_buffers[r_view.buffer_index as usize];
+
+        // prefixes are already known to be equal; skip checking them
+        let len = len as usize - 4;
+        let l_offset = l_view.offset as usize + 4;
+        let r_offset = r_view.offset as usize + 4;
+        if l_buffer[l_offset..l_offset + len] != r_buffer[r_offset..r_offset + 
len] {
+            return false;
+        }
+    }
+    true
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index 0987fd4c563..dba6a0186a5 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -25,6 +25,7 @@ use arrow_schema::{DataType, IntervalUnit};
 use half::f16;
 
 mod boolean;
+mod byte_view;
 mod dictionary;
 mod fixed_binary;
 mod fixed_list;
@@ -41,6 +42,7 @@ mod variable_size;
 // For this reason, they are not exposed and are instead used
 // to build the generic functions below (`equal_range` and `equal`).
 use boolean::boolean_equal;
+use byte_view::byte_view_equal;
 use dictionary::dictionary_equal;
 use fixed_binary::fixed_binary_equal;
 use fixed_list::fixed_list_equal;
@@ -97,7 +99,7 @@ fn equal_values(
         }
         DataType::FixedSizeBinary(_) => fixed_binary_equal(lhs, rhs, 
lhs_start, rhs_start, len),
         DataType::BinaryView | DataType::Utf8View => {
-            unimplemented!("BinaryView/Utf8View not yet implemented")
+            byte_view_equal(lhs, rhs, lhs_start, rhs_start, len)
         }
         DataType::List(_) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, 
len),
         DataType::ListView(_) | DataType::LargeListView(_) => {
diff --git a/arrow-data/src/lib.rs b/arrow-data/src/lib.rs
index cfa0dba66c3..59a049fe96c 100644
--- a/arrow-data/src/lib.rs
+++ b/arrow-data/src/lib.rs
@@ -30,3 +30,6 @@ pub mod decimal;
 
 #[cfg(feature = "ffi")]
 pub mod ffi;
+
+mod byte_view;
+pub use byte_view::*;
diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs
index b14f6e77103..b0d9475afcd 100644
--- a/arrow-data/src/transform/mod.rs
+++ b/arrow-data/src/transform/mod.rs
@@ -15,13 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::{
-    data::{into_buffers, new_buffers},
-    ArrayData, ArrayDataBuilder,
-};
+use super::{data::new_buffers, ArrayData, ArrayDataBuilder, ByteView};
 use crate::bit_mask::set_bits;
 use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
-use arrow_buffer::{bit_util, i256, ArrowNativeType, MutableBuffer};
+use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
 use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
 use half::f16;
 use num::Integer;
@@ -68,36 +65,6 @@ impl<'a> _MutableArrayData<'a> {
             .as_mut()
             .expect("MutableArrayData not nullable")
     }
-
-    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
-        let buffers = into_buffers(&self.data_type, self.buffer1, 
self.buffer2);
-
-        let child_data = match self.data_type {
-            DataType::Dictionary(_, _) => vec![dictionary.unwrap()],
-            _ => {
-                let mut child_data = Vec::with_capacity(self.child_data.len());
-                for child in self.child_data {
-                    child_data.push(child.freeze());
-                }
-                child_data
-            }
-        };
-
-        let nulls = self
-            .null_buffer
-            .map(|nulls| {
-                let bools = BooleanBuffer::new(nulls.into(), 0, self.len);
-                unsafe { NullBuffer::new_unchecked(bools, self.null_count) }
-            })
-            .filter(|n| n.null_count() > 0);
-
-        ArrayDataBuilder::new(self.data_type)
-            .offset(0)
-            .len(self.len)
-            .nulls(nulls)
-            .buffers(buffers)
-            .child_data(child_data)
-    }
 }
 
 fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> 
ExtendNullBits {
@@ -138,26 +105,32 @@ fn build_extend_null_bits(array: &ArrayData, use_nulls: 
bool) -> ExtendNullBits
 pub struct MutableArrayData<'a> {
     #[allow(dead_code)]
     arrays: Vec<&'a ArrayData>,
-    // The attributes in [_MutableArrayData] cannot be in [MutableArrayData] 
due to
-    // mutability invariants (interior mutability):
-    // [MutableArrayData] contains a function that can only mutate 
[_MutableArrayData], not
-    // [MutableArrayData] itself
+    /// The attributes in [_MutableArrayData] cannot be in [MutableArrayData] 
due to
+    /// mutability invariants (interior mutability):
+    /// [MutableArrayData] contains a function that can only mutate 
[_MutableArrayData], not
+    /// [MutableArrayData] itself
     data: _MutableArrayData<'a>,
 
-    // the child data of the `Array` in Dictionary arrays.
-    // This is not stored in `MutableArrayData` because these values constant 
and only needed
-    // at the end, when freezing [_MutableArrayData].
+    /// the child data of the `Array` in Dictionary arrays.
+    /// This is not stored in `MutableArrayData` because these values constant 
and only needed
+    /// at the end, when freezing [_MutableArrayData].
     dictionary: Option<ArrayData>,
 
-    // function used to extend values from arrays. This function's lifetime is 
bound to the array
-    // because it reads values from it.
+    /// Variadic data buffers referenced by views
+    /// This is not stored in `MutableArrayData` because these values constant 
 and only needed
+    /// at the end, when freezing [_MutableArrayData]
+    variadic_data_buffers: Vec<Buffer>,
+
+    /// function used to extend values from arrays. This function's lifetime 
is bound to the array
+    /// because it reads values from it.
     extend_values: Vec<Extend<'a>>,
-    // function used to extend nulls from arrays. This function's lifetime is 
bound to the array
-    // because it reads nulls from it.
+
+    /// function used to extend nulls from arrays. This function's lifetime is 
bound to the array
+    /// because it reads nulls from it.
     extend_null_bits: Vec<ExtendNullBits<'a>>,
 
-    // function used to extend nulls.
-    // this is independent of the arrays and therefore has no lifetime.
+    /// function used to extend nulls.
+    /// this is independent of the arrays and therefore has no lifetime.
     extend_nulls: ExtendNulls,
 }
 
@@ -197,6 +170,26 @@ fn build_extend_dictionary(array: &ArrayData, offset: 
usize, max: usize) -> Opti
     }
 }
 
+/// Builds an extend that adds `buffer_offset` to any buffer indices 
encountered
+fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend {
+    let views = array.buffer::<u128>(0);
+    Box::new(
+        move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
+            mutable
+                .buffer1
+                .extend(views[start..start + len].iter().map(|v| {
+                    let len = *v as u32;
+                    if len <= 12 {
+                        return *v; // Stored inline
+                    }
+                    let mut view = ByteView::from(*v);
+                    view.buffer_index += buffer_offset;
+                    view.into()
+                }))
+        },
+    )
+}
+
 fn build_extend(array: &ArrayData) -> Extend {
     match array.data_type() {
         DataType::Null => null::build_extend(array),
@@ -224,9 +217,7 @@ fn build_extend(array: &ArrayData) -> Extend {
         DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
         DataType::Utf8 | DataType::Binary => 
variable_size::build_extend::<i32>(array),
         DataType::LargeUtf8 | DataType::LargeBinary => 
variable_size::build_extend::<i64>(array),
-        DataType::BinaryView | DataType::Utf8View => {
-            unimplemented!("BinaryView/Utf8View not implemented")
-        }
+        DataType::BinaryView | DataType::Utf8View => unreachable!("should use 
build_extend_view"),
         DataType::Map(_, _) | DataType::List(_) => 
list::build_extend::<i32>(array),
         DataType::ListView(_) | DataType::LargeListView(_) => {
             unimplemented!("ListView/LargeListView not implemented")
@@ -272,9 +263,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
         DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
         DataType::Utf8 | DataType::Binary => 
variable_size::extend_nulls::<i32>,
         DataType::LargeUtf8 | DataType::LargeBinary => 
variable_size::extend_nulls::<i64>,
-        DataType::BinaryView | DataType::Utf8View => {
-            unimplemented!("BinaryView/Utf8View not implemented")
-        }
+        DataType::BinaryView | DataType::Utf8View => 
primitive::extend_nulls::<u128>,
         DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
         DataType::ListView(_) | DataType::LargeListView(_) => {
             unimplemented!("ListView/LargeListView not implemented")
@@ -429,11 +418,10 @@ impl<'a> MutableArrayData<'a> {
             | DataType::Binary
             | DataType::LargeUtf8
             | DataType::LargeBinary
+            | DataType::BinaryView
+            | DataType::Utf8View
             | DataType::Interval(_)
             | DataType::FixedSizeBinary(_) => vec![],
-            DataType::BinaryView | DataType::Utf8View => {
-                unimplemented!("BinaryView/Utf8View not implemented")
-            }
             DataType::ListView(_) | DataType::LargeListView(_) => {
                 unimplemented!("ListView/LargeListView not implemented")
             }
@@ -566,6 +554,15 @@ impl<'a> MutableArrayData<'a> {
             _ => (None, false),
         };
 
+        let variadic_data_buffers = match &data_type {
+            DataType::BinaryView | DataType::Utf8View => arrays
+                .iter()
+                .flat_map(|x| x.buffers().iter().skip(1))
+                .map(Buffer::clone)
+                .collect(),
+            _ => vec![],
+        };
+
         let extend_nulls = build_extend_nulls(data_type);
 
         let extend_null_bits = arrays
@@ -598,6 +595,20 @@ impl<'a> MutableArrayData<'a> {
 
                 extend_values.expect("MutableArrayData::new is infallible")
             }
+            DataType::BinaryView | DataType::Utf8View => {
+                let mut next_offset = 0u32;
+                arrays
+                    .iter()
+                    .map(|arr| {
+                        let num_data_buffers = (arr.buffers().len() - 1) as 
u32;
+                        let offset = next_offset;
+                        next_offset = next_offset
+                            .checked_add(num_data_buffers)
+                            .expect("view buffer index overflow");
+                        build_extend_view(arr, offset)
+                    })
+                    .collect()
+            }
             _ => arrays.iter().map(|array| build_extend(array)).collect(),
         };
 
@@ -614,6 +625,7 @@ impl<'a> MutableArrayData<'a> {
             arrays,
             data,
             dictionary,
+            variadic_data_buffers,
             extend_values,
             extend_null_bits,
             extend_nulls,
@@ -673,13 +685,55 @@ impl<'a> MutableArrayData<'a> {
 
     /// Creates a [ArrayData] from the pushed regions up to this point, 
consuming `self`.
     pub fn freeze(self) -> ArrayData {
-        unsafe { self.data.freeze(self.dictionary).build_unchecked() }
+        unsafe { self.into_builder().build_unchecked() }
     }
 
     /// Creates a [ArrayDataBuilder] from the pushed regions up to this point, 
consuming `self`.
     /// This is useful for extending the default behavior of MutableArrayData.
     pub fn into_builder(self) -> ArrayDataBuilder {
-        self.data.freeze(self.dictionary)
+        let data = self.data;
+
+        let buffers = match data.data_type {
+            DataType::Null | DataType::Struct(_) | DataType::FixedSizeList(_, 
_) => {
+                vec![]
+            }
+            DataType::BinaryView | DataType::Utf8View => {
+                let mut b = self.variadic_data_buffers;
+                b.insert(0, data.buffer1.into());
+                b
+            }
+            DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | 
DataType::LargeBinary => {
+                vec![data.buffer1.into(), data.buffer2.into()]
+            }
+            DataType::Union(_, mode) => {
+                match mode {
+                    // Based on Union's DataTypeLayout
+                    UnionMode::Sparse => vec![data.buffer1.into()],
+                    UnionMode::Dense => vec![data.buffer1.into(), 
data.buffer2.into()],
+                }
+            }
+            _ => vec![data.buffer1.into()],
+        };
+
+        let child_data = match data.data_type {
+            DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()],
+            _ => data.child_data.into_iter().map(|x| x.freeze()).collect(),
+        };
+
+        let nulls = data
+            .null_buffer
+            .map(|nulls| {
+                let bools = BooleanBuffer::new(nulls.into(), 0, data.len);
+                unsafe { NullBuffer::new_unchecked(bools, data.null_count) }
+            })
+            .filter(|n| n.null_count() > 0);
+
+        ArrayDataBuilder::new(data.data_type)
+            .offset(0)
+            .len(data.len)
+            .nulls(nulls)
+            .buffers(buffers)
+            .child_data(child_data)
     }
 }
 
diff --git a/arrow/tests/array_equal.rs b/arrow/tests/array_equal.rs
index 9bd27642888..15011c54728 100644
--- a/arrow/tests/array_equal.rs
+++ b/arrow/tests/array_equal.rs
@@ -22,8 +22,8 @@ use arrow::array::{
     StringArray, StringDictionaryBuilder, StructArray, UnionBuilder,
 };
 use arrow::datatypes::{Int16Type, Int32Type};
-use arrow_array::builder::{StringBuilder, StructBuilder};
-use arrow_array::{DictionaryArray, FixedSizeListArray};
+use arrow_array::builder::{StringBuilder, StringViewBuilder, StructBuilder};
+use arrow_array::{DictionaryArray, FixedSizeListArray, StringViewArray};
 use arrow_buffer::{Buffer, ToByteSlice};
 use arrow_data::{ArrayData, ArrayDataBuilder};
 use arrow_schema::{DataType, Field, Fields};
@@ -307,6 +307,50 @@ fn test_fixed_size_binary_array() {
     test_equal(&a, &b, true);
 }
 
+#[test]
+fn test_string_view_equal() {
+    let a1 = StringViewArray::from(vec!["foo", "very long string over 12 
bytes", "bar"]);
+    let a2 = StringViewArray::from(vec![
+        "a very long string over 12 bytes",
+        "foo",
+        "very long string over 12 bytes",
+        "bar",
+    ]);
+    test_equal(&a1, &a2.slice(1, 3), true);
+
+    let a1 = StringViewArray::from(vec!["foo", "very long string over 12 
bytes", "bar"]);
+    let a2 = StringViewArray::from(vec!["foo", "very long string over 12 
bytes", "bar"]);
+    test_equal(&a1, &a2, true);
+
+    let a1_s = a1.slice(1, 1);
+    let a2_s = a2.slice(1, 1);
+    test_equal(&a1_s, &a2_s, true);
+
+    let a1_s = a1.slice(2, 1);
+    let a2_s = a2.slice(0, 1);
+    test_equal(&a1_s, &a2_s, false);
+
+    // test will null value.
+    let a1 = StringViewArray::from(vec!["foo", "very long string over 12 
bytes", "bar"]);
+    let a2 = {
+        let mut builder = StringViewBuilder::new();
+        builder.append_value("foo");
+        builder.append_null();
+        builder.append_option(Some("very long string over 12 bytes"));
+        builder.append_value("bar");
+        builder.finish()
+    };
+    test_equal(&a1, &a2, false);
+
+    let a1_s = a1.slice(1, 2);
+    let a2_s = a2.slice(1, 3);
+    test_equal(&a1_s, &a2_s, false);
+
+    let a1_s = a1.slice(1, 2);
+    let a2_s = a2.slice(2, 2);
+    test_equal(&a1_s, &a2_s, true);
+}
+
 #[test]
 fn test_string_offset() {
     let a = StringArray::from(vec![Some("a"), None, Some("b")]);
diff --git a/arrow/tests/array_transform.rs b/arrow/tests/array_transform.rs
index 5a267c876d6..83d3003a058 100644
--- a/arrow/tests/array_transform.rs
+++ b/arrow/tests/array_transform.rs
@@ -22,6 +22,7 @@ use arrow::array::{
     UnionArray,
 };
 use arrow::datatypes::Int16Type;
+use arrow_array::StringViewArray;
 use arrow_buffer::Buffer;
 use arrow_data::transform::MutableArrayData;
 use arrow_data::ArrayData;
@@ -1027,6 +1028,44 @@ fn test_extend_nulls_panic() {
     mutable.extend_nulls(2);
 }
 
+#[test]
+fn test_string_view() {
+    let a1 =
+        StringViewArray::from(vec!["foo", "very long string over 12 bytes", 
"bar"]).into_data();
+    let a2 = StringViewArray::from_iter(vec![
+        Some("bar"),
+        None,
+        Some("long string also over 12 bytes"),
+    ])
+    .into_data();
+
+    a1.validate_full().unwrap();
+    a2.validate_full().unwrap();
+
+    let mut mutable = MutableArrayData::new(vec![&a1, &a2], false, 4);
+    mutable.extend(1, 0, 1);
+    mutable.extend(0, 1, 2);
+    mutable.extend(0, 0, 1);
+    mutable.extend(1, 2, 3);
+
+    let array = StringViewArray::from(mutable.freeze());
+    assert_eq!(array.data_buffers().len(), 2);
+    // Should have reused data buffers
+    assert_eq!(array.data_buffers()[0].as_ptr(), a1.buffers()[1].as_ptr());
+    assert_eq!(array.data_buffers()[1].as_ptr(), a2.buffers()[1].as_ptr());
+
+    let v = array.iter().collect::<Vec<_>>();
+    assert_eq!(
+        v,
+        vec![
+            Some("bar"),
+            Some("very long string over 12 bytes"),
+            Some("foo"),
+            Some("long string also over 12 bytes")
+        ]
+    )
+}
+
 #[test]
 #[should_panic(expected = "Arrays with inconsistent types passed to 
MutableArrayData")]
 fn test_mixed_types() {

Reply via email to