This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new eff058fc7 Use NullBuffer in ArrayData (#3775) (#3778)
eff058fc7 is described below

commit eff058fc7a156d1b22569bd60a747d98960d97e7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 2 12:57:30 2023 +0000

    Use NullBuffer in ArrayData (#3775) (#3778)
    
    * Use NullBuffer in ArrayData (#3775)
    
    * Clippy
    
    * Format
    
    * Doc
    
    * Tweaks
    
    * Review feedback
---
 arrow-arith/src/aggregate.rs                       |  28 +--
 arrow-arith/src/arithmetic.rs                      |  11 +-
 arrow-arith/src/arity.rs                           |  14 +-
 arrow-arith/src/boolean.rs                         | 125 +++++-----
 arrow-array/src/array/binary_array.rs              |   2 +-
 arrow-array/src/array/boolean_array.rs             |  14 +-
 arrow-array/src/array/byte_array.rs                |   5 +-
 arrow-array/src/array/dictionary_array.rs          |  33 +--
 arrow-array/src/array/fixed_size_binary_array.rs   |   2 +-
 arrow-array/src/array/null_array.rs                |   2 +-
 arrow-array/src/array/primitive_array.rs           |  18 +-
 arrow-array/src/array/struct_array.rs              |  17 +-
 arrow-array/src/builder/boolean_builder.rs         |   4 +-
 arrow-array/src/builder/generic_bytes_builder.rs   |   4 +-
 arrow-array/src/builder/struct_builder.rs          |   6 +-
 arrow-array/src/lib.rs                             |   6 +-
 arrow-array/src/record_batch.rs                    |   2 +-
 arrow-buffer/src/buffer/boolean.rs                 |  60 ++++-
 arrow-buffer/src/buffer/null.rs                    |  38 +++-
 arrow-cast/src/cast.rs                             |  62 ++---
 arrow-data/src/bit_mask.rs                         |   5 +-
 arrow-data/src/bitmap.rs                           | 189 ---------------
 arrow-data/src/data/mod.rs                         | 253 ++++++++++++---------
 arrow-data/src/equal/boolean.rs                    |  18 +-
 arrow-data/src/equal/dictionary.rs                 |  12 +-
 arrow-data/src/equal/fixed_binary.rs               |  21 +-
 arrow-data/src/equal/fixed_list.rs                 |  11 +-
 arrow-data/src/equal/list.rs                       |  13 +-
 arrow-data/src/equal/primitive.rs                  |  21 +-
 arrow-data/src/equal/structure.rs                  |  11 +-
 arrow-data/src/equal/utils.rs                      |  19 +-
 arrow-data/src/equal/variable_size.rs              |  16 +-
 arrow-data/src/ffi.rs                              |  29 ++-
 arrow-data/src/lib.rs                              |   2 -
 arrow-data/src/transform/mod.rs                    |  15 +-
 arrow-ipc/src/reader.rs                            |   2 -
 arrow-ipc/src/writer.rs                            |   5 +-
 arrow-json/src/raw/list_array.rs                   |   9 +-
 arrow-json/src/raw/map_array.rs                    |  10 +-
 arrow-json/src/raw/mod.rs                          |  10 +-
 arrow-json/src/raw/struct_array.rs                 |  10 +-
 arrow-json/src/reader.rs                           |   6 +-
 arrow-ord/src/comparison.rs                        |  15 +-
 arrow-ord/src/sort.rs                              |   1 -
 arrow-row/src/list.rs                              |   3 +-
 arrow-select/src/filter.rs                         |   8 +-
 arrow-select/src/nullif.rs                         |  22 +-
 arrow-select/src/take.rs                           |  26 +--
 arrow-string/src/length.rs                         |   5 +-
 arrow-string/src/regexp.rs                         |   2 +-
 arrow-string/src/substring.rs                      |  20 +-
 arrow/src/lib.rs                                   |   4 -
 arrow/tests/array_validation.rs                    |  12 +-
 parquet/src/arrow/arrow_writer/levels.rs           |  16 +-
 .../src/arrow/record_reader/definition_levels.rs   |   7 +-
 parquet/src/arrow/record_reader/mod.rs             |  15 +-
 56 files changed, 577 insertions(+), 719 deletions(-)

diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs
index b578dbd4a..7777bb0ed 100644
--- a/arrow-arith/src/aggregate.rs
+++ b/arrow-arith/src/aggregate.rs
@@ -117,8 +117,8 @@ where
             .map(|i| unsafe { array.value_unchecked(i) })
             .reduce(|acc, item| if cmp(&acc, &item) { item } else { acc })
     } else {
-        let null_buffer = array.data_ref().null_buffer().unwrap();
-        let iter = BitIndexIterator::new(null_buffer, array.offset(), 
array.len());
+        let nulls = array.data().nulls().unwrap();
+        let iter = BitIndexIterator::new(nulls.validity(), nulls.offset(), 
nulls.len());
         unsafe {
             let idx = iter.reduce(|acc_idx, idx| {
                 let acc = array.value_unchecked(acc_idx);
@@ -288,7 +288,7 @@ where
 
     let data: &[T::Native] = array.values();
 
-    match array.data().null_buffer() {
+    match array.data().nulls() {
         None => {
             let sum = data.iter().fold(T::default_value(), |accumulator, 
value| {
                 accumulator.add_wrapping(*value)
@@ -296,12 +296,12 @@ where
 
             Some(sum)
         }
-        Some(buffer) => {
+        Some(nulls) => {
             let mut sum = T::default_value();
             let data_chunks = data.chunks_exact(64);
             let remainder = data_chunks.remainder();
 
-            let bit_chunks = buffer.bit_chunks(array.offset(), array.len());
+            let bit_chunks = nulls.inner().bit_chunks();
             data_chunks
                 .zip(bit_chunks.iter())
                 .for_each(|(chunk, mask)| {
@@ -347,7 +347,7 @@ where
 
     let data: &[T::Native] = array.values();
 
-    match array.data().null_buffer() {
+    match array.data().nulls() {
         None => {
             let sum = data
                 .iter()
@@ -357,14 +357,14 @@ where
 
             Ok(Some(sum))
         }
-        Some(buffer) => {
+        Some(nulls) => {
             let mut sum = T::default_value();
 
             try_for_each_valid_idx(
-                array.len(),
-                array.offset(),
-                null_count,
-                Some(buffer.as_slice()),
+                nulls.len(),
+                nulls.offset(),
+                nulls.null_count(),
+                Some(nulls.validity()),
                 |idx| {
                     unsafe { sum = 
sum.add_checked(array.value_unchecked(idx))? };
                     Ok::<_, ArrowError>(())
@@ -665,7 +665,7 @@ mod simd {
         let mut chunk_acc = A::init_accumulator_chunk();
         let mut rem_acc = A::init_accumulator_scalar();
 
-        match array.data().null_buffer() {
+        match array.data().nulls() {
             None => {
                 let data_chunks = data.chunks_exact(64);
                 let remainder = data_chunks.remainder();
@@ -681,12 +681,12 @@ mod simd {
                     A::accumulate_scalar(&mut rem_acc, *value);
                 });
             }
-            Some(buffer) => {
+            Some(nulls) => {
                 // process data in chunks of 64 elements since we also get 64 
bits of validity information at a time
                 let data_chunks = data.chunks_exact(64);
                 let remainder = data_chunks.remainder();
 
-                let bit_chunks = buffer.bit_chunks(array.offset(), 
array.len());
+                let bit_chunks = nulls.inner().bit_chunks();
                 let remainder_bits = bit_chunks.remainder_bits();
 
                 data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
diff --git a/arrow-arith/src/arithmetic.rs b/arrow-arith/src/arithmetic.rs
index 40e7d6780..0fb559f06 100644
--- a/arrow-arith/src/arithmetic.rs
+++ b/arrow-arith/src/arithmetic.rs
@@ -1572,6 +1572,7 @@ mod tests {
     use arrow_array::builder::{
         BooleanBufferBuilder, BufferBuilder, PrimitiveDictionaryBuilder,
     };
+    use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
     use arrow_buffer::i256;
     use arrow_data::ArrayDataBuilder;
     use chrono::NaiveDate;
@@ -3057,15 +3058,19 @@ mod tests {
         // `count_set_bits_offset` takes len in bits as parameter.
         assert_eq!(null_buffer.count_set_bits_offset(0, 13), 0);
 
+        let nulls = BooleanBuffer::new(null_buffer, 0, 13);
+        assert_eq!(nulls.count_set_bits(), 0);
+        let nulls = NullBuffer::new(nulls);
+        assert_eq!(nulls.null_count(), 13);
+
         let mut data_buffer_builder = BufferBuilder::<i32>::new(13);
         data_buffer_builder.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
11, 12]);
         let data_buffer = data_buffer_builder.finish();
 
         let arg1: Int32Array = ArrayDataBuilder::new(DataType::Int32)
             .len(13)
-            .null_count(13)
+            .nulls(Some(nulls))
             .buffers(vec![data_buffer])
-            .null_bit_buffer(Some(null_buffer))
             .build()
             .unwrap()
             .into();
@@ -3078,9 +3083,7 @@ mod tests {
 
         let arg2: Int32Array = ArrayDataBuilder::new(DataType::Int32)
             .len(13)
-            .null_count(0)
             .buffers(vec![data_buffer])
-            .null_bit_buffer(None)
             .build()
             .unwrap()
             .into();
diff --git a/arrow-arith/src/arity.rs b/arrow-arith/src/arity.rs
index 3e7a81862..ea078765d 100644
--- a/arrow-arith/src/arity.rs
+++ b/arrow-arith/src/arity.rs
@@ -20,6 +20,7 @@
 use arrow_array::builder::BufferBuilder;
 use arrow_array::iterator::ArrayIter;
 use arrow_array::*;
+use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_data::bit_iterator::try_for_each_valid_idx;
 use arrow_data::bit_mask::combine_option_bitmap;
@@ -276,10 +277,7 @@ where
     let len = a.len();
 
     let null_buffer = combine_option_bitmap(&[a.data(), b.data()], len);
-    let null_count = null_buffer
-        .as_ref()
-        .map(|x| len - x.count_set_bits_offset(0, len))
-        .unwrap_or_default();
+    let nulls = null_buffer.map(|b| NullBuffer::new(BooleanBuffer::new(b, 0, 
len)));
 
     let mut builder = a.into_builder()?;
 
@@ -289,13 +287,7 @@ where
         .zip(b.values())
         .for_each(|(l, r)| *l = op(*l, *r));
 
-    let array_builder = builder
-        .finish()
-        .data()
-        .clone()
-        .into_builder()
-        .null_bit_buffer(null_buffer)
-        .null_count(null_count);
+    let array_builder = 
builder.finish().into_data().into_builder().nulls(nulls);
 
     let array_data = unsafe { array_builder.build_unchecked() };
     Ok(Ok(PrimitiveArray::<T>::from(array_data)))
diff --git a/arrow-arith/src/boolean.rs b/arrow-arith/src/boolean.rs
index 4c1a02ad7..5bd39a673 100644
--- a/arrow-arith/src/boolean.rs
+++ b/arrow-arith/src/boolean.rs
@@ -39,16 +39,13 @@ use arrow_schema::{ArrowError, DataType};
 /// of one side if other side is a false value.
 pub(crate) fn build_null_buffer_for_and_kleene(
     left_data: &ArrayData,
-    left_offset: usize,
     right_data: &ArrayData,
-    right_offset: usize,
-    len_in_bits: usize,
 ) -> Option<Buffer> {
     let left_buffer = &left_data.buffers()[0];
     let right_buffer = &right_data.buffers()[0];
 
-    let left_null_buffer = left_data.null_buffer();
-    let right_null_buffer = right_data.null_buffer();
+    let left_null_buffer = left_data.nulls();
+    let right_null_buffer = right_data.nulls();
 
     match (left_null_buffer, right_null_buffer) {
         (None, None) => None,
@@ -58,22 +55,22 @@ pub(crate) fn build_null_buffer_for_and_kleene(
             // 1. left null bit is set, or
             // 2. right data bit is false (because null AND false = false).
             Some(bitwise_bin_op_helper(
-                left_null_buffer,
-                left_offset,
+                left_null_buffer.buffer(),
+                left_null_buffer.offset(),
                 right_buffer,
-                right_offset,
-                len_in_bits,
+                right_data.offset(),
+                left_data.len(),
                 |a, b| a | !b,
             ))
         }
         (None, Some(right_null_buffer)) => {
             // Same as above
             Some(bitwise_bin_op_helper(
-                right_null_buffer,
-                right_offset,
+                right_null_buffer.buffer(),
+                right_null_buffer.offset(),
                 left_buffer,
-                left_offset,
-                len_in_bits,
+                left_data.offset(),
+                left_data.len(),
                 |a, b| a | !b,
             ))
         }
@@ -85,13 +82,18 @@ pub(crate) fn build_null_buffer_for_and_kleene(
             // (a | (c & !d)) & (c | (a & !b))
             Some(bitwise_quaternary_op_helper(
                 [
-                    left_null_buffer,
+                    left_null_buffer.buffer(),
                     left_buffer,
-                    right_null_buffer,
+                    right_null_buffer.buffer(),
                     right_buffer,
                 ],
-                [left_offset, left_offset, right_offset, right_offset],
-                len_in_bits,
+                [
+                    left_null_buffer.offset(),
+                    left_data.offset(),
+                    right_null_buffer.offset(),
+                    right_data.offset(),
+                ],
+                left_data.len(),
                 |a, b, c, d| (a | (c & !d)) & (c | (a & !b)),
             ))
         }
@@ -101,13 +103,10 @@ pub(crate) fn build_null_buffer_for_and_kleene(
 /// For AND/OR kernels, the result of null buffer is simply a bitwise `and` 
operation.
 pub(crate) fn build_null_buffer_for_and_or(
     left_data: &ArrayData,
-    _left_offset: usize,
     right_data: &ArrayData,
-    _right_offset: usize,
-    len_in_bits: usize,
 ) -> Option<Buffer> {
     // `arrays` are not empty, so safely do `unwrap` directly.
-    combine_option_bitmap(&[left_data, right_data], len_in_bits)
+    combine_option_bitmap(&[left_data, right_data], left_data.len())
 }
 
 /// Updates null buffer based on data buffer and null buffer of the operand at 
other side
@@ -116,45 +115,39 @@ pub(crate) fn build_null_buffer_for_and_or(
 /// buffer of one side if other side is a true value.
 pub(crate) fn build_null_buffer_for_or_kleene(
     left_data: &ArrayData,
-    left_offset: usize,
     right_data: &ArrayData,
-    right_offset: usize,
-    len_in_bits: usize,
 ) -> Option<Buffer> {
     let left_buffer = &left_data.buffers()[0];
     let right_buffer = &right_data.buffers()[0];
 
-    let left_null_buffer = left_data.null_buffer();
-    let right_null_buffer = right_data.null_buffer();
-
-    match (left_null_buffer, right_null_buffer) {
+    match (left_data.nulls(), right_data.nulls()) {
         (None, None) => None,
-        (Some(left_null_buffer), None) => {
+        (Some(left_nulls), None) => {
             // The right side has no null values.
             // The final null bit is set only if:
             // 1. left null bit is set, or
             // 2. right data bit is true (because null OR true = true).
             Some(bitwise_bin_op_helper(
-                left_null_buffer,
-                left_offset,
+                left_nulls.buffer(),
+                left_nulls.offset(),
                 right_buffer,
-                right_offset,
-                len_in_bits,
+                right_data.offset(),
+                right_data.len(),
                 |a, b| a | b,
             ))
         }
-        (None, Some(right_null_buffer)) => {
+        (None, Some(right_nulls)) => {
             // Same as above
             Some(bitwise_bin_op_helper(
-                right_null_buffer,
-                right_offset,
+                right_nulls.buffer(),
+                right_nulls.offset(),
                 left_buffer,
-                left_offset,
-                len_in_bits,
+                left_data.offset(),
+                left_data.len(),
                 |a, b| a | b,
             ))
         }
-        (Some(left_null_buffer), Some(right_null_buffer)) => {
+        (Some(left_nulls), Some(right_nulls)) => {
             // Follow the same logic above. Both sides have null values.
             // Assume a is left null bits, b is left data bits, c is right 
null bits,
             // d is right data bits.
@@ -162,13 +155,18 @@ pub(crate) fn build_null_buffer_for_or_kleene(
             // (a | (c & d)) & (c | (a & b))
             Some(bitwise_quaternary_op_helper(
                 [
-                    left_null_buffer,
+                    left_nulls.buffer(),
                     left_buffer,
-                    right_null_buffer,
+                    right_nulls.buffer(),
                     right_buffer,
                 ],
-                [left_offset, left_offset, right_offset, right_offset],
-                len_in_bits,
+                [
+                    left_nulls.offset(),
+                    left_data.offset(),
+                    right_nulls.offset(),
+                    right_data.offset(),
+                ],
+                left_data.len(),
                 |a, b, c, d| (a | (c & d)) & (c | (a & b)),
             ))
         }
@@ -184,7 +182,7 @@ pub(crate) fn binary_boolean_kernel<F, U>(
 ) -> Result<BooleanArray, ArrowError>
 where
     F: Fn(&Buffer, usize, &Buffer, usize, usize) -> Buffer,
-    U: Fn(&ArrayData, usize, &ArrayData, usize, usize) -> Option<Buffer>,
+    U: Fn(&ArrayData, &ArrayData) -> Option<Buffer>,
 {
     if left.len() != right.len() {
         return Err(ArrowError::ComputeError(
@@ -202,7 +200,7 @@ where
     let left_offset = left.offset();
     let right_offset = right.offset();
 
-    let null_bit_buffer = null_op(left_data, left_offset, right_data, 
right_offset, len);
+    let null_bit_buffer = null_op(left_data, right_data);
 
     let values = op(left_buffer, left_offset, right_buffer, right_offset, len);
 
@@ -353,10 +351,7 @@ pub fn not(left: &BooleanArray) -> Result<BooleanArray, 
ArrowError> {
     let len = left.len();
 
     let data = left.data_ref();
-    let null_bit_buffer = data
-        .null_bitmap()
-        .as_ref()
-        .map(|b| b.buffer().bit_slice(left_offset, len));
+    let null_bit_buffer = data.nulls().map(|b| b.inner().sliced());
 
     let values = buffer_unary_not(&data.buffers()[0], left_offset, len);
 
@@ -388,12 +383,12 @@ pub fn not(left: &BooleanArray) -> Result<BooleanArray, 
ArrowError> {
 pub fn is_null(input: &dyn Array) -> Result<BooleanArray, ArrowError> {
     let len = input.len();
 
-    let output = match input.data_ref().null_buffer() {
+    let output = match input.data_ref().nulls() {
         None => {
             let len_bytes = ceil(len, 8);
             MutableBuffer::from_len_zeroed(len_bytes).into()
         }
-        Some(buffer) => buffer_unary_not(buffer, input.offset(), len),
+        Some(nulls) => buffer_unary_not(nulls.buffer(), nulls.offset(), 
nulls.len()),
     };
 
     let data = unsafe {
@@ -425,14 +420,14 @@ pub fn is_null(input: &dyn Array) -> Result<BooleanArray, 
ArrowError> {
 pub fn is_not_null(input: &dyn Array) -> Result<BooleanArray, ArrowError> {
     let len = input.len();
 
-    let output = match input.data_ref().null_buffer() {
+    let output = match input.data_ref().nulls() {
         None => {
             let len_bytes = ceil(len, 8);
             MutableBuffer::new(len_bytes)
                 .with_bitset(len_bytes, true)
                 .into()
         }
-        Some(buffer) => buffer.bit_slice(input.offset(), len),
+        Some(nulls) => nulls.inner().sliced(),
     };
 
     let data = unsafe {
@@ -615,7 +610,7 @@ mod tests {
         let a = BooleanArray::from(vec![false, false, false, true, true, 
true]);
 
         // ensure null bitmap of a is absent
-        assert!(a.data_ref().null_bitmap().is_none());
+        assert!(a.data().nulls().is_none());
 
         let b = BooleanArray::from(vec![
             Some(true),
@@ -627,7 +622,7 @@ mod tests {
         ]);
 
         // ensure null bitmap of b is present
-        assert!(b.data_ref().null_bitmap().is_some());
+        assert!(b.data().nulls().is_some());
 
         let c = or_kleene(&a, &b).unwrap();
 
@@ -655,12 +650,12 @@ mod tests {
         ]);
 
         // ensure null bitmap of b is absent
-        assert!(a.data_ref().null_bitmap().is_some());
+        assert!(a.data().nulls().is_some());
 
         let b = BooleanArray::from(vec![false, false, false, true, true, 
true]);
 
         // ensure null bitmap of a is present
-        assert!(b.data_ref().null_bitmap().is_none());
+        assert!(b.data().nulls().is_none());
 
         let c = or_kleene(&a, &b).unwrap();
 
@@ -857,7 +852,7 @@ mod tests {
         let expected = BooleanArray::from(vec![false, false, false, false]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -870,7 +865,7 @@ mod tests {
         let expected = BooleanArray::from(vec![false, false, false, false]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -882,7 +877,7 @@ mod tests {
         let expected = BooleanArray::from(vec![true, true, true, true]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -895,7 +890,7 @@ mod tests {
         let expected = BooleanArray::from(vec![true, true, true, true]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -907,7 +902,7 @@ mod tests {
         let expected = BooleanArray::from(vec![false, true, false, true]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -938,7 +933,7 @@ mod tests {
         let expected = BooleanArray::from(vec![false, true, false, true]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -950,7 +945,7 @@ mod tests {
         let expected = BooleanArray::from(vec![true, false, true, false]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 
     #[test]
@@ -981,6 +976,6 @@ mod tests {
         let expected = BooleanArray::from(vec![true, false, true, false]);
 
         assert_eq!(expected, res);
-        assert_eq!(None, res.data_ref().null_bitmap());
+        assert!(res.data().nulls().is_none());
     }
 }
diff --git a/arrow-array/src/array/binary_array.rs 
b/arrow-array/src/array/binary_array.rs
index 50757dcbe..1a3270a70 100644
--- a/arrow-array/src/array/binary_array.rs
+++ b/arrow-array/src/array/binary_array.rs
@@ -77,7 +77,7 @@ impl<OffsetSize: OffsetSizeTrait> 
GenericBinaryArray<OffsetSize> {
             .offset(v.offset())
             .add_buffer(v.data_ref().buffers()[0].clone())
             .add_buffer(child_data.buffers()[0].slice(child_data.offset()))
-            .null_bit_buffer(v.data_ref().null_buffer().cloned());
+            .nulls(v.data().nulls().cloned());
 
         let data = unsafe { builder.build_unchecked() };
         Self::from(data)
diff --git a/arrow-array/src/array/boolean_array.rs 
b/arrow-array/src/array/boolean_array.rs
index 8d1296c66..e924824e7 100644
--- a/arrow-array/src/array/boolean_array.rs
+++ b/arrow-array/src/array/boolean_array.rs
@@ -105,9 +105,9 @@ impl BooleanArray {
 
     /// Returns the number of non null, true values within this array
     pub fn true_count(&self) -> usize {
-        match self.data.null_buffer() {
+        match self.data.nulls() {
             Some(nulls) => {
-                let null_chunks = nulls.bit_chunks(self.offset(), self.len());
+                let null_chunks = nulls.inner().bit_chunks();
                 let value_chunks = self.values().bit_chunks(self.offset(), 
self.len());
                 null_chunks
                     .iter()
@@ -187,11 +187,7 @@ impl BooleanArray {
     where
         F: FnMut(T::Item) -> bool,
     {
-        let null_bit_buffer = left
-            .data()
-            .null_buffer()
-            .map(|b| b.bit_slice(left.offset(), left.len()));
-
+        let null_bit_buffer = left.data().nulls().map(|x| x.inner().sliced());
         let buffer = MutableBuffer::collect_bool(left.len(), |i| unsafe {
             // SAFETY: i in range 0..len
             op(left.value_unchecked(i))
@@ -459,7 +455,7 @@ mod tests {
         assert_eq!(4, arr.len());
         assert_eq!(0, arr.offset());
         assert_eq!(0, arr.null_count());
-        assert!(arr.data().null_buffer().is_none());
+        assert!(arr.data().nulls().is_none());
         for i in 0..3 {
             assert!(!arr.is_null(i));
             assert!(arr.is_valid(i));
@@ -474,7 +470,7 @@ mod tests {
         assert_eq!(4, arr.len());
         assert_eq!(0, arr.offset());
         assert_eq!(2, arr.null_count());
-        assert!(arr.data().null_buffer().is_some());
+        assert!(arr.data().nulls().is_some());
 
         assert!(arr.is_valid(0));
         assert!(arr.is_null(1));
diff --git a/arrow-array/src/array/byte_array.rs 
b/arrow-array/src/array/byte_array.rs
index f6946228c..442e795ce 100644
--- a/arrow-array/src/array/byte_array.rs
+++ b/arrow-array/src/array/byte_array.rs
@@ -137,10 +137,7 @@ impl<T: ByteArrayType> GenericByteArray<T> {
     /// offset and data buffers are not shared by others.
     pub fn into_builder(self) -> Result<GenericByteBuilder<T>, Self> {
         let len = self.len();
-        let null_bit_buffer = self
-            .data
-            .null_buffer()
-            .map(|b| b.bit_slice(self.data.offset(), len));
+        let null_bit_buffer = self.data.nulls().map(|b| b.inner().sliced());
 
         let element_len = std::mem::size_of::<T::Offset>();
         let offset_buffer = self.data.buffers()[0]
diff --git a/arrow-array/src/array/dictionary_array.rs 
b/arrow-array/src/array/dictionary_array.rs
index eb2f1b606..60426e5b3 100644
--- a/arrow-array/src/array/dictionary_array.rs
+++ b/arrow-array/src/array/dictionary_array.rs
@@ -249,23 +249,15 @@ impl<K: ArrowPrimitiveType> DictionaryArray<K> {
 
         // Note: This use the ArrayDataBuilder::build_unchecked and afterwards
         // call the new function which only validates that the keys are in 
bounds.
-        let mut data = ArrayData::builder(dict_data_type)
-            .len(keys.len())
-            .add_buffer(keys.data().buffers()[0].clone())
+        let data = keys.data().clone();
+        let builder = data
+            .into_builder()
+            .data_type(dict_data_type)
             .add_child_data(values.data().clone());
 
-        match keys.data().null_buffer() {
-            Some(buffer) if keys.data().null_count() > 0 => {
-                data = data
-                    .null_bit_buffer(Some(buffer.clone()))
-                    .null_count(keys.data().null_count());
-            }
-            _ => data = data.null_count(0),
-        }
-
         // Safety: `validate` ensures key type is correct, and
         //  `validate_values` ensures all offsets are within range
-        let array = unsafe { data.build_unchecked() };
+        let array = unsafe { builder.build_unchecked() };
 
         array.validate()?;
         array.validate_values()?;
@@ -430,16 +422,13 @@ impl<T: ArrowPrimitiveType> From<ArrayData> for 
DictionaryArray<T> {
             // create a zero-copy of the keys' data
             // SAFETY:
             // ArrayData is valid and verified type above
+
             let keys = PrimitiveArray::<T>::from(unsafe {
-                ArrayData::new_unchecked(
-                    T::DATA_TYPE,
-                    data.len(),
-                    Some(data.null_count()),
-                    data.null_buffer().cloned(),
-                    data.offset(),
-                    data.buffers().to_vec(),
-                    vec![],
-                )
+                data.clone()
+                    .into_builder()
+                    .data_type(T::DATA_TYPE)
+                    .child_data(vec![])
+                    .build_unchecked()
             });
             let values = make_array(data.child_data()[0].clone());
             Self {
diff --git a/arrow-array/src/array/fixed_size_binary_array.rs 
b/arrow-array/src/array/fixed_size_binary_array.rs
index 89ace430d..e927c8d8a 100644
--- a/arrow-array/src/array/fixed_size_binary_array.rs
+++ b/arrow-array/src/array/fixed_size_binary_array.rs
@@ -408,7 +408,7 @@ impl From<FixedSizeListArray> for FixedSizeBinaryArray {
             .len(v.len())
             .offset(v.offset())
             .add_buffer(child_data.buffers()[0].slice(child_data.offset()))
-            .null_bit_buffer(v.data_ref().null_buffer().cloned());
+            .nulls(v.data_ref().nulls().cloned());
 
         let data = unsafe { builder.build_unchecked() };
         Self::from(data)
diff --git a/arrow-array/src/array/null_array.rs 
b/arrow-array/src/array/null_array.rs
index 6b68aace7..8eb8e64b0 100644
--- a/arrow-array/src/array/null_array.rs
+++ b/arrow-array/src/array/null_array.rs
@@ -99,7 +99,7 @@ impl From<ArrayData> for NullArray {
             "NullArray data should contain 0 buffers"
         );
         assert!(
-            data.null_buffer().is_none(),
+            data.nulls().is_none(),
             "NullArray data should not contain a null buffer, as no buffers 
are required"
         );
         Self { data }
diff --git a/arrow-array/src/array/primitive_array.rs 
b/arrow-array/src/array/primitive_array.rs
index 53217a06f..0e28060b2 100644
--- a/arrow-array/src/array/primitive_array.rs
+++ b/arrow-array/src/array/primitive_array.rs
@@ -443,7 +443,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
         let len = self.len();
         let null_count = self.null_count();
 
-        let null_buffer = data.null_buffer().map(|b| 
b.bit_slice(data.offset(), len));
+        let null_buffer = data.nulls().map(|b| b.inner().sliced());
         let values = self.values().iter().map(|v| op(*v));
         // JUSTIFICATION
         //  Benefit
@@ -500,7 +500,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
         let len = self.len();
         let null_count = self.null_count();
 
-        let null_buffer = data.null_buffer().map(|b| 
b.bit_slice(data.offset(), len));
+        let null_buffer = data.nulls().map(|b| b.inner().sliced());
         let mut buffer = BufferBuilder::<O::Native>::new(len);
         buffer.append_n_zeroed(len);
         let slice = buffer.as_slice_mut();
@@ -567,9 +567,10 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
     {
         let data = self.data();
         let len = data.len();
-        let offset = data.offset();
-        let null_count = data.null_count();
-        let nulls = data.null_buffer().map(|x| x.as_slice());
+        let (nulls, null_count, offset) = match data.nulls() {
+            Some(n) => (Some(n.validity()), n.null_count(), n.offset()),
+            None => (None, 0, 0),
+        };
 
         let mut null_builder = BooleanBufferBuilder::new(len);
         match nulls {
@@ -608,10 +609,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
     /// data buffer is not shared by others.
     pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
         let len = self.len();
-        let null_bit_buffer = self
-            .data
-            .null_buffer()
-            .map(|b| b.bit_slice(self.data.offset(), len));
+        let null_bit_buffer = self.data.nulls().map(|b| b.inner().sliced());
 
         let element_len = std::mem::size_of::<T::Native>();
         let buffer = self.data.buffers()[0]
@@ -1791,7 +1789,7 @@ mod tests {
         let primitive_array = PrimitiveArray::<Int32Type>::from_iter(iter);
         assert_eq!(primitive_array.len(), 10);
         assert_eq!(primitive_array.null_count(), 0);
-        assert_eq!(primitive_array.data().null_buffer(), None);
+        assert!(primitive_array.data().nulls().is_none());
         assert_eq!(primitive_array.values(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
     }
 
diff --git a/arrow-array/src/array/struct_array.rs 
b/arrow-array/src/array/struct_array.rs
index 9149895f6..35d4444e0 100644
--- a/arrow-array/src/array/struct_array.rs
+++ b/arrow-array/src/array/struct_array.rs
@@ -154,22 +154,20 @@ impl TryFrom<Vec<(&str, ArrayRef)>> for StructArray {
             fields.push(Field::new(
                 field_name,
                 array.data_type().clone(),
-                child_datum.null_buffer().is_some(),
+                child_datum.nulls().is_some(),
             ));
 
-            if let Some(child_null_buffer) = child_datum.null_buffer() {
-                let child_datum_offset = child_datum.offset();
-
+            if let Some(child_nulls) = child_datum.nulls() {
                 null = Some(if let Some(null_buffer) = &null {
                     buffer_bin_or(
                         null_buffer,
                         0,
-                        child_null_buffer,
-                        child_datum_offset,
+                        child_nulls.buffer(),
+                        child_nulls.offset(),
                         child_datum_len,
                     )
                 } else {
-                    child_null_buffer.bit_slice(child_datum_offset, 
child_datum_len)
+                    child_nulls.inner().sliced()
                 });
             } else if null.is_some() {
                 // when one of the fields has no nulls, then there is no null 
in the array
@@ -321,7 +319,6 @@ mod tests {
         BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, 
StringArray,
     };
     use arrow_buffer::ToByteSlice;
-    use arrow_data::Bitmap;
     use std::sync::Arc;
 
     #[test]
@@ -410,8 +407,8 @@ mod tests {
         assert_eq!(1, struct_data.null_count());
         assert_eq!(
             // 00001011
-            Some(&Bitmap::from(Buffer::from(&[11_u8]))),
-            struct_data.null_bitmap()
+            &[11_u8],
+            struct_data.nulls().unwrap().validity()
         );
 
         let expected_string_data = ArrayData::builder(DataType::Utf8)
diff --git a/arrow-array/src/builder/boolean_builder.rs 
b/arrow-array/src/builder/boolean_builder.rs
index eeb39b802..0862b35b0 100644
--- a/arrow-array/src/builder/boolean_builder.rs
+++ b/arrow-array/src/builder/boolean_builder.rs
@@ -289,7 +289,7 @@ mod tests {
 
         let array = builder.finish();
         assert_eq!(0, array.null_count());
-        assert!(array.data().null_buffer().is_none());
+        assert!(array.data().nulls().is_none());
     }
 
     #[test]
@@ -311,7 +311,7 @@ mod tests {
         assert_eq!(4, array.false_count());
 
         assert_eq!(0, array.null_count());
-        assert!(array.data().null_buffer().is_none());
+        assert!(array.data().nulls().is_none());
     }
 
     #[test]
diff --git a/arrow-array/src/builder/generic_bytes_builder.rs 
b/arrow-array/src/builder/generic_bytes_builder.rs
index 406e79c31..c723b3349 100644
--- a/arrow-array/src/builder/generic_bytes_builder.rs
+++ b/arrow-array/src/builder/generic_bytes_builder.rs
@@ -425,7 +425,7 @@ mod tests {
         builder.append_value("parquet");
         let arr = builder.finish();
         // array should not have null buffer because there is not `null` value.
-        assert_eq!(None, arr.data().null_buffer());
+        assert!(arr.data().nulls().is_none());
         assert_eq!(GenericStringArray::<O>::from(vec!["arrow", "parquet"]), 
arr,)
     }
 
@@ -454,7 +454,7 @@ mod tests {
         builder.append_value("parquet");
         arr = builder.finish();
 
-        assert!(arr.data().null_buffer().is_some());
+        assert!(arr.data().nulls().is_some());
         assert_eq!(&[O::zero()], builder.offsets_slice());
         assert_eq!(5, arr.len());
     }
diff --git a/arrow-array/src/builder/struct_builder.rs 
b/arrow-array/src/builder/struct_builder.rs
index 72aa53e18..51b4c7cfc 100644
--- a/arrow-array/src/builder/struct_builder.rs
+++ b/arrow-array/src/builder/struct_builder.rs
@@ -284,7 +284,6 @@ impl StructBuilder {
 mod tests {
     use super::*;
     use arrow_buffer::Buffer;
-    use arrow_data::Bitmap;
 
     use crate::array::Array;
 
@@ -329,10 +328,7 @@ mod tests {
         let struct_data = arr.data();
         assert_eq!(4, struct_data.len());
         assert_eq!(1, struct_data.null_count());
-        assert_eq!(
-            Some(&Bitmap::from(Buffer::from(&[11_u8]))),
-            struct_data.null_bitmap()
-        );
+        assert_eq!(&[11_u8], struct_data.nulls().unwrap().validity());
 
         let expected_string_data = ArrayData::builder(DataType::Utf8)
             .len(4)
diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs
index 400b6e262..bfdc35c6c 100644
--- a/arrow-array/src/lib.rs
+++ b/arrow-array/src/lib.rs
@@ -141,18 +141,18 @@
 //!
 //! For example, the type [`Int16Array`] represents an array of 16-bit 
integers and consists of:
 //!
-//! * An optional [`Bitmap`] identifying any null values
+//! * An optional [`NullBuffer`] identifying any null values
 //! * A contiguous [`Buffer`] of 16-bit integers
 //!
 //! Similarly, the type [`StringArray`] represents an array of UTF-8 strings 
and consists of:
 //!
-//! * An optional [`Bitmap`] identifying any null values
+//! * An optional [`NullBuffer`] identifying any null values
 //! * An offsets [`Buffer`] of 32-bit integers identifying valid UTF-8 
sequences within the values buffer
 //! * A values [`Buffer`] of UTF-8 encoded string data
 //!
 //! [Arrow specification]: https://arrow.apache.org/docs/format/Columnar.html
 //! [`&dyn Array`]: Array
-//! [`Bitmap`]: arrow_data::Bitmap
+//! [`NullBuffer`]: arrow_buffer::buffer::NullBuffer
 //! [`Buffer`]: arrow_buffer::Buffer
 //! [`compute`]: https://docs.rs/arrow/latest/arrow/compute/index.html
 //! [`json`]: https://docs.rs/arrow/latest/arrow/json/index.html
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 04a559f21..20e4e19ba 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -603,7 +603,7 @@ mod tests {
         let record_batch =
             RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), 
Arc::new(b)])
                 .unwrap();
-        assert_eq!(record_batch.get_array_memory_size(), 640);
+        assert_eq!(record_batch.get_array_memory_size(), 672);
     }
 
     fn check_batch(record_batch: RecordBatch, num_rows: usize) {
diff --git a/arrow-buffer/src/buffer/boolean.rs 
b/arrow-buffer/src/buffer/boolean.rs
index 82755a2b0..0239111cb 100644
--- a/arrow-buffer/src/buffer/boolean.rs
+++ b/arrow-buffer/src/buffer/boolean.rs
@@ -15,16 +15,33 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::bit_chunk_iterator::BitChunks;
 use crate::{bit_util, Buffer};
 
 /// A slice-able [`Buffer`] containing bit-packed booleans
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq)]
 pub struct BooleanBuffer {
     buffer: Buffer,
     offset: usize,
     len: usize,
 }
 
+impl PartialEq for BooleanBuffer {
+    fn eq(&self, other: &Self) -> bool {
+        if self.len != other.len {
+            return false;
+        }
+
+        let lhs = self.bit_chunks();
+        let rhs = other.bit_chunks();
+
+        if lhs.iter().zip(rhs.iter()).any(|(a, b)| a != b) {
+            return false;
+        }
+        lhs.remainder_bits() == rhs.remainder_bits()
+    }
+}
+
 impl BooleanBuffer {
     /// Create a new [`BooleanBuffer`] from a [`Buffer`], an `offset` and 
`length` in bits
     ///
@@ -47,6 +64,12 @@ impl BooleanBuffer {
         self.buffer.count_set_bits_offset(self.offset, self.len)
     }
 
+    /// Returns a `BitChunks` instance which can be used to iterate over
+    /// this buffer's bits in `u64` chunks
+    pub fn bit_chunks(&self) -> BitChunks {
+        BitChunks::new(self.values(), self.offset, self.len)
+    }
+
     /// Returns `true` if the bit at index `i` is set
     ///
     /// # Panics
@@ -81,4 +104,39 @@ impl BooleanBuffer {
     pub fn values(&self) -> &[u8] {
         &self.buffer
     }
+
+    /// Slices this [`BooleanBuffer`] by the provided `offset` and `length`
+    pub fn slice(&self, offset: usize, len: usize) -> Self {
+        assert!(
+            offset.saturating_add(len) <= self.len,
+            "the length + offset of the sliced BooleanBuffer cannot exceed the 
existing length"
+        );
+        Self {
+            buffer: self.buffer.clone(),
+            offset: self.offset + offset,
+            len,
+        }
+    }
+
+    /// Returns a [`Buffer`] containing the sliced contents of this 
[`BooleanBuffer`]
+    ///
+    /// Equivalent to `self.buffer.bit_slice(self.offset, self.len)`
+    pub fn sliced(&self) -> Buffer {
+        self.buffer.bit_slice(self.offset, self.len)
+    }
+
+    /// Returns true if this [`BooleanBuffer`] is equal to `other`, using 
pointer comparisons
+    /// to determine buffer equality. This is cheaper than `PartialEq::eq` but 
may
+    /// return false when the arrays are logically equal
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        self.buffer.as_ptr() == other.buffer.as_ptr()
+            && self.offset == other.offset
+            && self.len == other.len
+    }
+
+    /// Returns the inner [`Buffer`]
+    #[inline]
+    pub fn inner(&self) -> &Buffer {
+        &self.buffer
+    }
 }
diff --git a/arrow-buffer/src/buffer/null.rs b/arrow-buffer/src/buffer/null.rs
index 2d52c9096..a4854f1ad 100644
--- a/arrow-buffer/src/buffer/null.rs
+++ b/arrow-buffer/src/buffer/null.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use crate::buffer::BooleanBuffer;
+use crate::{Buffer, MutableBuffer};
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub struct NullBuffer {
     buffer: BooleanBuffer,
     null_count: usize,
@@ -30,6 +31,16 @@ impl NullBuffer {
         Self { buffer, null_count }
     }
 
+    /// Create a new [`NullBuffer`] of length `len` where all values are null
+    pub fn new_null(len: usize) -> Self {
+        let buffer = MutableBuffer::new_null(len).into_buffer();
+        let buffer = BooleanBuffer::new(buffer, 0, len);
+        Self {
+            buffer,
+            null_count: len,
+        }
+    }
+
     /// Create a new [`NullBuffer`] with the provided `buffer` and `null_count`
     ///
     /// # Safety
@@ -45,6 +56,12 @@ impl NullBuffer {
         self.buffer.len()
     }
 
+    /// Returns the offset of this [`NullBuffer`] in bits
+    #[inline]
+    pub fn offset(&self) -> usize {
+        self.buffer.offset()
+    }
+
     /// Returns true if this [`NullBuffer`] is empty
     #[inline]
     pub fn is_empty(&self) -> bool {
@@ -69,11 +86,28 @@ impl NullBuffer {
         !self.is_valid(idx)
     }
 
-    /// Returns the inner buffer
+    /// Returns the packed validity of this [`NullBuffer`] not including any 
offset
+    #[inline]
+    pub fn validity(&self) -> &[u8] {
+        self.buffer.values()
+    }
+
+    /// Slices this [`NullBuffer`] by the provided `offset` and `length`
+    pub fn slice(&self, offset: usize, len: usize) -> Self {
+        Self::new(self.buffer.slice(offset, len))
+    }
+
+    /// Returns the inner [`BooleanBuffer`]
     #[inline]
     pub fn inner(&self) -> &BooleanBuffer {
         &self.buffer
     }
+
+    /// Returns the underlying [`Buffer`]
+    #[inline]
+    pub fn buffer(&self) -> &Buffer {
+        self.buffer.inner()
+    }
 }
 
 #[cfg(test)]
diff --git a/arrow-cast/src/cast.rs b/arrow-cast/src/cast.rs
index d49775c98..8e3bde990 100644
--- a/arrow-cast/src/cast.rs
+++ b/arrow-cast/src/cast.rs
@@ -2942,22 +2942,15 @@ fn dictionary_cast<K: ArrowDictionaryKeyType>(
                 )));
             }
 
-            // keys are data, child_data is values (dictionary)
-            let data = unsafe {
-                ArrayData::new_unchecked(
-                    to_type.clone(),
-                    cast_keys.len(),
-                    Some(cast_keys.null_count()),
-                    cast_keys
-                        .data()
-                        .null_bitmap()
-                        .cloned()
-                        .map(|bitmap| bitmap.into_buffer()),
-                    cast_keys.data().offset(),
-                    cast_keys.data().buffers().to_vec(),
-                    vec![cast_values.into_data()],
-                )
-            };
+            let data = cast_keys.into_data();
+            let builder = data
+                .into_builder()
+                .data_type(to_type.clone())
+                .child_data(vec![cast_values.into_data()]);
+
+            // Safety
+            // Cast keys are still valid
+            let data = unsafe { builder.build_unchecked() };
 
             // create the appropriate array type
             let new_array: ArrayRef = match **to_index_type {
@@ -3184,11 +3177,7 @@ fn cast_primitive_to_list<OffsetSize: OffsetSizeTrait + 
NumCast>(
             to_type.clone(),
             array.len(),
             Some(cast_array.null_count()),
-            cast_array
-                .data()
-                .null_bitmap()
-                .cloned()
-                .map(|bitmap| bitmap.into_buffer()),
+            cast_array.data().nulls().map(|b| b.inner().sliced()),
             0,
             vec![offsets.into()],
             vec![cast_array.into_data()],
@@ -3207,23 +3196,18 @@ fn cast_list_inner<OffsetSize: OffsetSizeTrait>(
     to_type: &DataType,
     cast_options: &CastOptions,
 ) -> Result<ArrayRef, ArrowError> {
-    let data = array.data_ref();
+    let data = array.data().clone();
     let underlying_array = make_array(data.child_data()[0].clone());
-    let cast_array = cast_with_options(&underlying_array, to.data_type(), 
cast_options)?;
-    let array_data = unsafe {
-        ArrayData::new_unchecked(
-            to_type.clone(),
-            array.len(),
-            Some(data.null_count()),
-            data.null_bitmap()
-                .cloned()
-                .map(|bitmap| bitmap.into_buffer()),
-            array.offset(),
-            // reuse offset buffer
-            data.buffers().to_vec(),
-            vec![cast_array.into_data()],
-        )
-    };
+    let cast_array =
+        cast_with_options(underlying_array.as_ref(), to.data_type(), 
cast_options)?;
+    let builder = data
+        .into_builder()
+        .data_type(to_type.clone())
+        .child_data(vec![cast_array.into_data()]);
+
+    // Safety
+    // Data was valid before
+    let array_data = unsafe { builder.build_unchecked() };
     let list = GenericListArray::<OffsetSize>::from(array_data);
     Ok(Arc::new(list) as ArrayRef)
 }
@@ -3302,7 +3286,7 @@ where
         .len(array.len())
         .add_buffer(offset_buffer)
         .add_buffer(str_values_buf)
-        .null_bit_buffer(data.null_buffer().cloned());
+        .nulls(data.nulls().cloned());
 
     let array_data = unsafe { builder.build_unchecked() };
 
@@ -3377,7 +3361,7 @@ where
         .len(array.len())
         .add_buffer(offset_buffer)
         .add_child_data(value_data)
-        .null_bit_buffer(data.null_buffer().cloned());
+        .nulls(data.nulls().cloned());
 
     let array_data = unsafe { builder.build_unchecked() };
     Ok(make_array(array_data))
diff --git a/arrow-data/src/bit_mask.rs b/arrow-data/src/bit_mask.rs
index ed8e65257..94ea57259 100644
--- a/arrow-data/src/bit_mask.rs
+++ b/arrow-data/src/bit_mask.rs
@@ -74,7 +74,10 @@ pub fn combine_option_bitmap(
 ) -> Option<Buffer> {
     let (buffer, offset) = arrays
         .iter()
-        .map(|array| (array.null_buffer().cloned(), array.offset()))
+        .map(|array| match array.nulls() {
+            Some(n) => (Some(n.buffer().clone()), n.offset()),
+            None => (None, 0),
+        })
         .reduce(|acc, buffer_and_offset| match (acc, buffer_and_offset) {
             ((None, _), (None, _)) => (None, 0),
             ((Some(buffer), offset), (None, _)) | ((None, _), (Some(buffer), 
offset)) => {
diff --git a/arrow-data/src/bitmap.rs b/arrow-data/src/bitmap.rs
deleted file mode 100644
index a356b9ff7..000000000
--- a/arrow-data/src/bitmap.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines [Bitmap] for tracking validity bitmaps
-
-use arrow_buffer::bit_util;
-use arrow_schema::ArrowError;
-use std::mem;
-
-use arrow_buffer::buffer::{buffer_bin_and, buffer_bin_or, Buffer};
-use std::ops::{BitAnd, BitOr};
-
-#[derive(Debug, Clone)]
-/// Defines a bitmap, which is used to track which values in an Arrow
-/// array are null.
-///
-/// This is called a "validity bitmap" in the Arrow documentation.
-pub struct Bitmap {
-    pub(crate) bits: Buffer,
-}
-
-impl Bitmap {
-    pub fn new(num_bits: usize) -> Self {
-        let num_bytes = bit_util::ceil(num_bits, 8);
-        let len = bit_util::round_upto_multiple_of_64(num_bytes);
-        Bitmap {
-            bits: Buffer::from(&vec![0xFF; len]),
-        }
-    }
-
-    /// Return the length of this Bitmap in bits (not bytes)
-    pub fn bit_len(&self) -> usize {
-        self.bits.len() * 8
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.bits.is_empty()
-    }
-
-    pub fn is_set(&self, i: usize) -> bool {
-        assert!(i < (self.bits.len() << 3));
-        unsafe { bit_util::get_bit_raw(self.bits.as_ptr(), i) }
-    }
-
-    pub fn buffer(&self) -> &Buffer {
-        &self.bits
-    }
-
-    pub fn buffer_ref(&self) -> &Buffer {
-        &self.bits
-    }
-
-    pub fn into_buffer(self) -> Buffer {
-        self.bits
-    }
-
-    /// Returns the total number of bytes of memory occupied by the
-    /// buffers owned by this [Bitmap].
-    ///
-    /// If multiple [`Bitmap`]s refer to the same underlying
-    /// [`Buffer`] they will both report the same size.
-    pub fn get_buffer_memory_size(&self) -> usize {
-        self.bits.capacity()
-    }
-
-    /// Returns the total number of bytes of memory occupied
-    /// physically by this [Bitmap] and its [`Buffer`]s.
-    ///
-    /// Equivalent to: `size_of_val(self)` + [`Self::get_buffer_memory_size`]
-    pub fn get_array_memory_size(&self) -> usize {
-        self.bits.capacity() + mem::size_of_val(self)
-    }
-}
-
-impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap {
-    type Output = Result<Bitmap, ArrowError>;
-
-    fn bitand(self, rhs: &'b Bitmap) -> Result<Bitmap, ArrowError> {
-        if self.bits.len() != rhs.bits.len() {
-            return Err(ArrowError::ComputeError(
-                "Buffers must be the same size to apply Bitwise 
AND.".to_string(),
-            ));
-        }
-        Ok(Bitmap::from(buffer_bin_and(
-            &self.bits,
-            0,
-            &rhs.bits,
-            0,
-            self.bit_len(),
-        )))
-    }
-}
-
-impl<'a, 'b> BitOr<&'b Bitmap> for &'a Bitmap {
-    type Output = Result<Bitmap, ArrowError>;
-
-    fn bitor(self, rhs: &'b Bitmap) -> Result<Bitmap, ArrowError> {
-        if self.bits.len() != rhs.bits.len() {
-            return Err(ArrowError::ComputeError(
-                "Buffers must be the same size to apply Bitwise 
OR.".to_string(),
-            ));
-        }
-        Ok(Bitmap::from(buffer_bin_or(
-            &self.bits,
-            0,
-            &rhs.bits,
-            0,
-            self.bit_len(),
-        )))
-    }
-}
-
-impl From<Buffer> for Bitmap {
-    fn from(buf: Buffer) -> Self {
-        Self { bits: buf }
-    }
-}
-
-impl PartialEq for Bitmap {
-    fn eq(&self, other: &Self) -> bool {
-        // buffer equality considers capacity, but here we want to only compare
-        // actual data contents
-        let self_len = self.bits.len();
-        let other_len = other.bits.len();
-        if self_len != other_len {
-            return false;
-        }
-        self.bits.as_slice()[..self_len] == other.bits.as_slice()[..self_len]
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_bitmap_length() {
-        assert_eq!(512, Bitmap::new(63 * 8).bit_len());
-        assert_eq!(512, Bitmap::new(64 * 8).bit_len());
-        assert_eq!(1024, Bitmap::new(65 * 8).bit_len());
-    }
-
-    #[test]
-    fn test_bitwise_and() {
-        let bitmap1 = Bitmap::from(Buffer::from([0b01101010]));
-        let bitmap2 = Bitmap::from(Buffer::from([0b01001110]));
-        assert_eq!(
-            Bitmap::from(Buffer::from([0b01001010])),
-            (&bitmap1 & &bitmap2).unwrap()
-        );
-    }
-
-    #[test]
-    fn test_bitwise_or() {
-        let bitmap1 = Bitmap::from(Buffer::from([0b01101010]));
-        let bitmap2 = Bitmap::from(Buffer::from([0b01001110]));
-        assert_eq!(
-            Bitmap::from(Buffer::from([0b01101110])),
-            (&bitmap1 | &bitmap2).unwrap()
-        );
-    }
-
-    #[test]
-    fn test_bitmap_is_set() {
-        let bitmap = Bitmap::from(Buffer::from([0b01001010]));
-        assert!(!bitmap.is_set(0));
-        assert!(bitmap.is_set(1));
-        assert!(!bitmap.is_set(2));
-        assert!(bitmap.is_set(3));
-        assert!(!bitmap.is_set(4));
-        assert!(!bitmap.is_set(5));
-        assert!(bitmap.is_set(6));
-        assert!(!bitmap.is_set(7));
-    }
-}
diff --git a/arrow-data/src/data/mod.rs b/arrow-data/src/data/mod.rs
index 2f9e142b1..d76cb9eb1 100644
--- a/arrow-data/src/data/mod.rs
+++ b/arrow-data/src/data/mod.rs
@@ -18,8 +18,9 @@
 //! Contains [`ArrayData`], a generic representation of Arrow array data which 
encapsulates
 //! common attributes and operations for Arrow array.
 
-use crate::{bit_iterator::BitSliceIterator, bitmap::Bitmap};
+use crate::bit_iterator::BitSliceIterator;
 use arrow_buffer::bit_chunk_iterator::BitChunks;
+use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer};
 use arrow_schema::{ArrowError, DataType, UnionMode};
 use std::convert::TryInto;
@@ -48,27 +49,32 @@ mod union;
 
 #[inline]
 pub(crate) fn contains_nulls(
-    null_bit_buffer: Option<&Buffer>,
+    null_bit_buffer: Option<&NullBuffer>,
     offset: usize,
     len: usize,
 ) -> bool {
     match null_bit_buffer {
-        Some(buffer) => match BitSliceIterator::new(buffer, offset, 
len).next() {
-            Some((start, end)) => start != 0 || end != len,
-            None => len != 0, // No non-null values
-        },
+        Some(buffer) => {
+            match BitSliceIterator::new(buffer.validity(), buffer.offset() + 
offset, len)
+                .next()
+            {
+                Some((start, end)) => start != 0 || end != len,
+                None => len != 0, // No non-null values
+            }
+        }
         None => false, // No null buffer
     }
 }
 
 #[inline]
 pub(crate) fn count_nulls(
-    null_bit_buffer: Option<&Buffer>,
+    null_bit_buffer: Option<&NullBuffer>,
     offset: usize,
     len: usize,
 ) -> usize {
     if let Some(buf) = null_bit_buffer {
-        len - buf.count_set_bits_offset(offset, len)
+        let buffer = buf.buffer();
+        len - buffer.count_set_bits_offset(offset + buf.offset(), len)
     } else {
         0
     }
@@ -232,9 +238,6 @@ pub struct ArrayData {
     /// The number of elements in this array data
     len: usize,
 
-    /// The number of null elements in this array data
-    null_count: usize,
-
     /// The offset into this array data, in number of items
     offset: usize,
 
@@ -249,7 +252,7 @@ pub struct ArrayData {
 
     /// The null bitmap. A `None` value for this indicates all values are 
non-null in
     /// this array.
-    null_bitmap: Option<Bitmap>,
+    nulls: Option<NullBuffer>,
 }
 
 pub type ArrayDataRef = Arc<ArrayData>;
@@ -281,19 +284,21 @@ impl ArrayData {
         buffers: Vec<Buffer>,
         child_data: Vec<ArrayData>,
     ) -> Self {
-        let null_count = match null_count {
-            None => count_nulls(null_bit_buffer.as_ref(), offset, len),
-            Some(null_count) => null_count,
-        };
-        let null_bitmap = null_bit_buffer.filter(|_| null_count > 
0).map(Bitmap::from);
+        let nulls = null_bit_buffer
+            .map(|b| BooleanBuffer::new(b, offset, len))
+            .map(|b| match null_count {
+                None => NullBuffer::new(b),
+                Some(null_count) => NullBuffer::new_unchecked(b, null_count),
+            })
+            .filter(|b| b.null_count() > 0);
+
         let new_self = Self {
             data_type,
             len,
-            null_count,
             offset,
             buffers,
             child_data,
-            null_bitmap,
+            nulls,
         };
 
         // Provide a force_validate mode
@@ -378,30 +383,26 @@ impl ArrayData {
     }
 
     /// Returns whether the element at index `i` is null
+    #[inline]
     pub fn is_null(&self, i: usize) -> bool {
-        if let Some(ref b) = self.null_bitmap {
-            return !b.is_set(self.offset + i);
+        match &self.nulls {
+            Some(v) => v.is_null(i),
+            None => false,
         }
-        false
     }
 
-    /// Returns a reference to the null bitmap of this [`ArrayData`]
+    /// Returns a reference to the null buffer of this [`ArrayData`] if any
+    ///
+    /// Note: [`ArrayData::offset`] does NOT apply to the returned 
[`NullBuffer`]
     #[inline]
-    pub const fn null_bitmap(&self) -> Option<&Bitmap> {
-        self.null_bitmap.as_ref()
-    }
-
-    /// Returns a reference to the null buffer of this [`ArrayData`].
-    pub fn null_buffer(&self) -> Option<&Buffer> {
-        self.null_bitmap().as_ref().map(|b| b.buffer_ref())
+    pub fn nulls(&self) -> Option<&NullBuffer> {
+        self.nulls.as_ref()
     }
 
     /// Returns whether the element at index `i` is not null
+    #[inline]
     pub fn is_valid(&self, i: usize) -> bool {
-        if let Some(ref b) = self.null_bitmap {
-            return b.is_set(self.offset + i);
-        }
-        true
+        !self.is_null(i)
     }
 
     /// Returns the length (i.e., number of elements) of this [`ArrayData`].
@@ -424,8 +425,11 @@ impl ArrayData {
 
     /// Returns the total number of nulls in this array
     #[inline]
-    pub const fn null_count(&self) -> usize {
-        self.null_count
+    pub fn null_count(&self) -> usize {
+        self.nulls
+            .as_ref()
+            .map(|x| x.null_count())
+            .unwrap_or_default()
     }
 
     /// Returns the total number of bytes of memory occupied by the
@@ -444,8 +448,8 @@ impl ArrayData {
         for buffer in &self.buffers {
             size += buffer.capacity();
         }
-        if let Some(bitmap) = &self.null_bitmap {
-            size += bitmap.get_buffer_memory_size()
+        if let Some(bitmap) = &self.nulls {
+            size += bitmap.buffer().capacity()
         }
         for child in &self.child_data {
             size += child.get_buffer_memory_size();
@@ -510,7 +514,7 @@ impl ArrayData {
             }
         }
 
-        if self.null_bitmap().is_some() {
+        if self.nulls().is_some() {
             result += bit_util::ceil(self.len, 8);
         }
 
@@ -536,11 +540,8 @@ impl ArrayData {
             size += mem::size_of::<Buffer>();
             size += buffer.capacity();
         }
-        if let Some(bitmap) = &self.null_bitmap {
-            // this includes the size of the bitmap struct itself, since it is 
stored directly in
-            // this struct we already counted those bytes in the 
size_of_val(self) above
-            size += bitmap.get_array_memory_size();
-            size -= mem::size_of::<Bitmap>();
+        if let Some(nulls) = &self.nulls {
+            size += nulls.buffer().capacity();
         }
         for child in &self.child_data {
             size += child.get_array_memory_size();
@@ -565,7 +566,6 @@ impl ArrayData {
             let new_data = ArrayData {
                 data_type: self.data_type().clone(),
                 len: length,
-                null_count: count_nulls(self.null_buffer(), new_offset, 
length),
                 offset: new_offset,
                 buffers: self.buffers.clone(),
                 // Slice child data, to propagate offsets down to them
@@ -574,7 +574,7 @@ impl ArrayData {
                     .iter()
                     .map(|data| data.slice(offset, length))
                     .collect(),
-                null_bitmap: self.null_bitmap().cloned(),
+                nulls: self.nulls.as_ref().map(|x| x.slice(offset, length)),
             };
 
             new_data
@@ -583,9 +583,7 @@ impl ArrayData {
 
             new_data.len = length;
             new_data.offset = offset + self.offset;
-
-            new_data.null_count =
-                count_nulls(new_data.null_buffer(), new_data.offset, 
new_data.len);
+            new_data.nulls = self.nulls.as_ref().map(|x| x.slice(offset, 
length));
 
             new_data
         }
@@ -714,7 +712,7 @@ impl ArrayData {
             .child_data(child_data);
 
         if has_nulls {
-            builder = 
builder.null_count(len).null_bit_buffer(Some(zeroed(len)))
+            builder = builder.nulls(Some(NullBuffer::new_null(len)))
         }
 
         // SAFETY:
@@ -744,7 +742,7 @@ impl ArrayData {
         // Check that the data layout conforms to the spec
         let layout = layout(&self.data_type);
 
-        if !layout.can_contain_null_mask && self.null_bitmap.is_some() {
+        if !layout.can_contain_null_mask && self.nulls.is_some() {
             return Err(ArrowError::InvalidArgumentError(format!(
                 "Arrays of type {:?} cannot contain a null bitmask",
                 self.data_type,
@@ -796,29 +794,31 @@ impl ArrayData {
             }
         }
 
-        if self.null_count > self.len {
-            return Err(ArrowError::InvalidArgumentError(format!(
-                "null_count {} for an array exceeds length of {} elements",
-                self.null_count, self.len
-            )));
-        }
-
         // check null bit buffer size
-        if let Some(null_bit_map) = self.null_bitmap.as_ref() {
-            let null_bit_buffer = null_bit_map.buffer_ref();
+        if let Some(nulls) = self.nulls() {
+            if nulls.null_count() > self.len {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "null_count {} for an array exceeds length of {} elements",
+                    nulls.null_count(),
+                    self.len
+                )));
+            }
+
+            let actual_len = nulls.validity().len();
             let needed_len = bit_util::ceil(len_plus_offset, 8);
-            if null_bit_buffer.len() < needed_len {
+            if actual_len < needed_len {
                 return Err(ArrowError::InvalidArgumentError(format!(
-                    "null_bit_buffer size too small. got {} needed {}",
-                    null_bit_buffer.len(),
-                    needed_len
+                    "null_bit_buffer size too small. got {actual_len} needed 
{needed_len}",
+                )));
+            }
+
+            if nulls.len() != self.len {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "null buffer incorrect size. got {} expected {}",
+                    nulls.len(),
+                    self.len
                 )));
             }
-        } else if self.null_count > 0 {
-            return Err(ArrowError::InvalidArgumentError(format!(
-                "Array of type {} has {} nulls but no null bitmap",
-                self.data_type, self.null_count
-            )));
         }
 
         self.validate_child_data()?;
@@ -1155,14 +1155,14 @@ impl ArrayData {
     /// Validates the the null count is correct and that any
     /// nullability requirements of its children are correct
     pub fn validate_nulls(&self) -> Result<(), ArrowError> {
-        let nulls = self.null_buffer();
-
-        let actual_null_count = count_nulls(nulls, self.offset, self.len);
-        if actual_null_count != self.null_count {
-            return Err(ArrowError::InvalidArgumentError(format!(
-                "null_count value ({}) doesn't match actual number of nulls in 
array ({})",
-                self.null_count, actual_null_count
-            )));
+        if let Some(nulls) = &self.nulls {
+            let actual = nulls.len() - nulls.inner().count_set_bits();
+            if actual != nulls.null_count() {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "null_count value ({}) doesn't match actual number of 
nulls in array ({})",
+                    nulls.null_count(), actual
+                )));
+            }
         }
 
         // In general non-nullable children should not contain nulls, however, 
for certain
@@ -1178,7 +1178,7 @@ impl ArrayData {
             DataType::FixedSizeList(field, len) => {
                 let child = &self.child_data[0];
                 if !field.is_nullable() {
-                    match nulls {
+                    match &self.nulls {
                         Some(nulls) => {
                             let element_len = *len as usize;
                             let mut buffer =
@@ -1187,7 +1187,7 @@ impl ArrayData {
                             // Expand each bit within `null_mask` into 
`element_len`
                             // bits, constructing the implicit mask of the 
child elements
                             for i in 0..self.len {
-                                if !bit_util::get_bit(nulls.as_ref(), 
self.offset + i) {
+                                if nulls.is_null(i) {
                                     continue;
                                 }
                                 for j in 0..element_len {
@@ -1207,7 +1207,14 @@ impl ArrayData {
             DataType::Struct(fields) => {
                 for (field, child) in fields.iter().zip(&self.child_data) {
                     if !field.is_nullable() {
-                        self.validate_non_nullable(nulls, self.offset, child)?
+                        match &self.nulls {
+                            Some(n) => self.validate_non_nullable(
+                                Some(n.buffer()),
+                                n.offset(),
+                                child,
+                            )?,
+                            None => self.validate_non_nullable(None, 0, 
child)?,
+                        }
                     }
                 }
             }
@@ -1226,7 +1233,7 @@ impl ArrayData {
     ) -> Result<(), ArrowError> {
         let mask = match mask {
             Some(mask) => mask.as_ref(),
-            None => return match data.null_count {
+            None => return match data.null_count() {
                 0 => Ok(()),
                 _ => Err(ArrowError::InvalidArgumentError(format!(
                     "non-nullable child of type {} contains nulls not present 
in parent {}",
@@ -1236,10 +1243,10 @@ impl ArrayData {
             },
         };
 
-        match data.null_buffer() {
+        match data.nulls() {
             Some(nulls) => {
                 let mask = BitChunks::new(mask, offset, data.len);
-                let nulls = BitChunks::new(nulls.as_ref(), data.offset, 
data.len);
+                let nulls = BitChunks::new(nulls.validity(), nulls.offset(), 
data.len);
                 mask
                     .iter()
                     .zip(nulls.iter())
@@ -1510,7 +1517,6 @@ impl ArrayData {
     pub fn ptr_eq(&self, other: &Self) -> bool {
         if self.offset != other.offset
             || self.len != other.len
-            || self.null_count != other.null_count
             || self.data_type != other.data_type
             || self.buffers.len() != other.buffers.len()
             || self.child_data.len() != other.child_data.len()
@@ -1518,8 +1524,8 @@ impl ArrayData {
             return false;
         }
 
-        match (&self.null_bitmap, &other.null_bitmap) {
-            (Some(a), Some(b)) if a.bits.as_ptr() != b.bits.as_ptr() => return 
false,
+        match (&self.nulls, &other.nulls) {
+            (Some(a), Some(b)) if !a.inner().ptr_eq(b.inner()) => return false,
             (Some(_), None) | (None, Some(_)) => return false,
             _ => {}
         };
@@ -1714,6 +1720,7 @@ pub struct ArrayDataBuilder {
     len: usize,
     null_count: Option<usize>,
     null_bit_buffer: Option<Buffer>,
+    nulls: Option<NullBuffer>,
     offset: usize,
     buffers: Vec<Buffer>,
     child_data: Vec<ArrayData>,
@@ -1727,6 +1734,7 @@ impl ArrayDataBuilder {
             len: 0,
             null_count: None,
             null_bit_buffer: None,
+            nulls: None,
             offset: 0,
             buffers: vec![],
             child_data: vec![],
@@ -1744,12 +1752,20 @@ impl ArrayDataBuilder {
         self
     }
 
+    pub fn nulls(mut self, nulls: Option<NullBuffer>) -> Self {
+        self.nulls = nulls;
+        self.null_count = None;
+        self.null_bit_buffer = None;
+        self
+    }
+
     pub fn null_count(mut self, null_count: usize) -> Self {
         self.null_count = Some(null_count);
         self
     }
 
     pub fn null_bit_buffer(mut self, buf: Option<Buffer>) -> Self {
+        self.nulls = None;
         self.null_bit_buffer = buf;
         self
     }
@@ -1786,43 +1802,53 @@ impl ArrayDataBuilder {
     ///
     /// The same caveats as [`ArrayData::new_unchecked`]
     /// apply.
+    #[allow(clippy::let_and_return)]
     pub unsafe fn build_unchecked(self) -> ArrayData {
-        ArrayData::new_unchecked(
-            self.data_type,
-            self.len,
-            self.null_count,
-            self.null_bit_buffer,
-            self.offset,
-            self.buffers,
-            self.child_data,
-        )
+        let nulls = self.nulls.or_else(|| {
+            let buffer = self.null_bit_buffer?;
+            let buffer = BooleanBuffer::new(buffer, self.offset, self.len);
+            Some(match self.null_count {
+                Some(n) => NullBuffer::new_unchecked(buffer, n),
+                None => NullBuffer::new(buffer),
+            })
+        });
+
+        let data = ArrayData {
+            data_type: self.data_type,
+            len: self.len,
+            offset: self.offset,
+            buffers: self.buffers,
+            child_data: self.child_data,
+            nulls,
+        };
+
+        // Provide a force_validate mode
+        #[cfg(feature = "force_validate")]
+        data.validate_data().unwrap();
+        data
     }
 
     /// Creates an array data, validating all inputs
+    #[allow(clippy::let_and_return)]
     pub fn build(self) -> Result<ArrayData, ArrowError> {
-        ArrayData::try_new(
-            self.data_type,
-            self.len,
-            self.null_bit_buffer,
-            self.offset,
-            self.buffers,
-            self.child_data,
-        )
+        let data = unsafe { self.build_unchecked() };
+        #[cfg(not(feature = "force_validate"))]
+        data.validate_data()?;
+        Ok(data)
     }
 }
 
 impl From<ArrayData> for ArrayDataBuilder {
     fn from(d: ArrayData) -> Self {
-        // TODO: Store Bitmap on ArrayData (#1799)
-        let null_bit_buffer = d.null_buffer().cloned();
         Self {
-            null_bit_buffer,
             data_type: d.data_type,
             len: d.len,
-            null_count: Some(d.null_count),
             offset: d.offset,
             buffers: d.buffers,
             child_data: d.child_data,
+            nulls: d.nulls,
+            null_bit_buffer: None,
+            null_count: None,
         }
     }
 }
@@ -1936,8 +1962,8 @@ mod tests {
             .null_bit_buffer(Some(Buffer::from(bit_v)))
             .build()
             .unwrap();
-        assert!(arr_data.null_buffer().is_some());
-        assert_eq!(&bit_v, arr_data.null_buffer().unwrap().as_slice());
+        assert!(arr_data.nulls().is_some());
+        assert_eq!(&bit_v, arr_data.nulls().unwrap().validity());
     }
 
     #[test]
@@ -2055,11 +2081,12 @@ mod tests {
 
     #[test]
     fn test_count_nulls() {
-        let null_buffer = Some(Buffer::from(vec![0b00010110, 0b10011111]));
-        let count = count_nulls(null_buffer.as_ref(), 0, 16);
+        let buffer = Buffer::from(vec![0b00010110, 0b10011111]);
+        let buffer = NullBuffer::new(BooleanBuffer::new(buffer, 0, 16));
+        let count = count_nulls(Some(&buffer), 0, 16);
         assert_eq!(count, 7);
 
-        let count = count_nulls(null_buffer.as_ref(), 4, 8);
+        let count = count_nulls(Some(&buffer), 4, 8);
         assert_eq!(count, 3);
     }
 
@@ -2067,7 +2094,7 @@ mod tests {
     fn test_contains_nulls() {
         let buffer: Buffer =
             MutableBuffer::from_iter([false, false, false, true, true, 
false]).into();
-
+        let buffer = NullBuffer::new(BooleanBuffer::new(buffer, 0, 6));
         assert!(contains_nulls(Some(&buffer), 0, 6));
         assert!(contains_nulls(Some(&buffer), 0, 3));
         assert!(!contains_nulls(Some(&buffer), 3, 2));
diff --git a/arrow-data/src/equal/boolean.rs b/arrow-data/src/equal/boolean.rs
index 52e822f03..a20ca5ac0 100644
--- a/arrow-data/src/equal/boolean.rs
+++ b/arrow-data/src/equal/boolean.rs
@@ -33,7 +33,7 @@ pub(super) fn boolean_equal(
     let lhs_values = lhs.buffers()[0].as_slice();
     let rhs_values = rhs.buffers()[0].as_slice();
 
-    let contains_nulls = contains_nulls(lhs.null_buffer(), lhs_start + 
lhs.offset(), len);
+    let contains_nulls = contains_nulls(lhs.nulls(), lhs_start, len);
 
     if !contains_nulls {
         // Optimize performance for starting offset at u8 boundary.
@@ -76,15 +76,13 @@ pub(super) fn boolean_equal(
         )
     } else {
         // get a ref of the null buffer bytes, to use in testing for nullness
-        let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
+        let lhs_nulls = lhs.nulls().unwrap();
 
-        let lhs_start = lhs.offset() + lhs_start;
-        let rhs_start = rhs.offset() + rhs_start;
-
-        BitIndexIterator::new(lhs_null_bytes, lhs_start, len).all(|i| {
-            let lhs_pos = lhs_start + i;
-            let rhs_pos = rhs_start + i;
-            get_bit(lhs_values, lhs_pos) == get_bit(rhs_values, rhs_pos)
-        })
+        BitIndexIterator::new(lhs_nulls.validity(), lhs_start + 
lhs_nulls.offset(), len)
+            .all(|i| {
+                let lhs_pos = lhs_start + lhs.offset() + i;
+                let rhs_pos = rhs_start + rhs.offset() + i;
+                get_bit(lhs_values, lhs_pos) == get_bit(rhs_values, rhs_pos)
+            })
     }
 }
diff --git a/arrow-data/src/equal/dictionary.rs 
b/arrow-data/src/equal/dictionary.rs
index 5638c5c91..1d9c4b8d9 100644
--- a/arrow-data/src/equal/dictionary.rs
+++ b/arrow-data/src/equal/dictionary.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::data::{contains_nulls, ArrayData};
-use arrow_buffer::{bit_util::get_bit, ArrowNativeType};
+use arrow_buffer::ArrowNativeType;
 
 use super::equal_range;
 
@@ -35,7 +35,7 @@ pub(super) fn dictionary_equal<T: ArrowNativeType>(
 
     // Only checking one null mask here because by the time the control flow 
reaches
     // this point, the equality of the two masks would have already been 
verified.
-    if !contains_nulls(lhs.null_buffer(), lhs_start + lhs.offset(), len) {
+    if !contains_nulls(lhs.nulls(), lhs_start, len) {
         (0..len).all(|i| {
             let lhs_pos = lhs_start + i;
             let rhs_pos = rhs_start + i;
@@ -50,14 +50,14 @@ pub(super) fn dictionary_equal<T: ArrowNativeType>(
         })
     } else {
         // get a ref of the null buffer bytes, to use in testing for nullness
-        let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
-        let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
+        let lhs_nulls = lhs.nulls().unwrap();
+        let rhs_nulls = rhs.nulls().unwrap();
         (0..len).all(|i| {
             let lhs_pos = lhs_start + i;
             let rhs_pos = rhs_start + i;
 
-            let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
-            let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());
+            let lhs_is_null = lhs_nulls.is_null(lhs_pos);
+            let rhs_is_null = rhs_nulls.is_null(rhs_pos);
 
             lhs_is_null
                 || (lhs_is_null == rhs_is_null)
diff --git a/arrow-data/src/equal/fixed_binary.rs 
b/arrow-data/src/equal/fixed_binary.rs
index 17e470b5c..9e0e77ff7 100644
--- a/arrow-data/src/equal/fixed_binary.rs
+++ b/arrow-data/src/equal/fixed_binary.rs
@@ -19,7 +19,6 @@ use crate::bit_iterator::BitSliceIterator;
 use crate::contains_nulls;
 use crate::data::ArrayData;
 use crate::equal::primitive::NULL_SLICES_SELECTIVITY_THRESHOLD;
-use arrow_buffer::bit_util::get_bit;
 use arrow_schema::DataType;
 
 use super::utils::equal_len;
@@ -41,7 +40,7 @@ pub(super) fn fixed_binary_equal(
 
     // Only checking one null mask here because by the time the control flow 
reaches
     // this point, the equality of the two masks would have already been 
verified.
-    if !contains_nulls(lhs.null_buffer(), lhs_start + lhs.offset(), len) {
+    if !contains_nulls(lhs.nulls(), lhs_start, len) {
         equal_len(
             lhs_values,
             rhs_values,
@@ -54,15 +53,15 @@ pub(super) fn fixed_binary_equal(
 
         if selectivity_frac >= NULL_SLICES_SELECTIVITY_THRESHOLD {
             // get a ref of the null buffer bytes, to use in testing for 
nullness
-            let lhs_null_bytes = 
lhs.null_buffer().as_ref().unwrap().as_slice();
-            let rhs_null_bytes = 
rhs.null_buffer().as_ref().unwrap().as_slice();
+            let lhs_nulls = lhs.nulls().unwrap();
+            let rhs_nulls = rhs.nulls().unwrap();
             // with nulls, we need to compare item by item whenever it is not 
null
             (0..len).all(|i| {
                 let lhs_pos = lhs_start + i;
                 let rhs_pos = rhs_start + i;
 
-                let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + 
lhs.offset());
-                let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + 
rhs.offset());
+                let lhs_is_null = lhs_nulls.is_null(lhs_pos);
+                let rhs_is_null = rhs_nulls.is_null(rhs_pos);
 
                 lhs_is_null
                     || (lhs_is_null == rhs_is_null)
@@ -75,14 +74,16 @@ pub(super) fn fixed_binary_equal(
                         )
             })
         } else {
+            let lhs_nulls = lhs.nulls().unwrap();
             let lhs_slices_iter = BitSliceIterator::new(
-                lhs.null_buffer().as_ref().unwrap(),
-                lhs_start + lhs.offset(),
+                lhs_nulls.validity(),
+                lhs_start + lhs_nulls.offset(),
                 len,
             );
+            let rhs_nulls = lhs.nulls().unwrap();
             let rhs_slices_iter = BitSliceIterator::new(
-                rhs.null_buffer().as_ref().unwrap(),
-                rhs_start + rhs.offset(),
+                rhs_nulls.validity(),
+                rhs_start + rhs_nulls.offset(),
                 len,
             );
 
diff --git a/arrow-data/src/equal/fixed_list.rs 
b/arrow-data/src/equal/fixed_list.rs
index 204a8658e..4b79e5c33 100644
--- a/arrow-data/src/equal/fixed_list.rs
+++ b/arrow-data/src/equal/fixed_list.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::data::{contains_nulls, ArrayData};
-use arrow_buffer::bit_util::get_bit;
 use arrow_schema::DataType;
 
 use super::equal_range;
@@ -38,7 +37,7 @@ pub(super) fn fixed_list_equal(
 
     // Only checking one null mask here because by the time the control flow 
reaches
     // this point, the equality of the two masks would have already been 
verified.
-    if !contains_nulls(lhs.null_buffer(), lhs_start + lhs.offset(), len) {
+    if !contains_nulls(lhs.nulls(), lhs_start, len) {
         equal_range(
             lhs_values,
             rhs_values,
@@ -48,15 +47,15 @@ pub(super) fn fixed_list_equal(
         )
     } else {
         // get a ref of the null buffer bytes, to use in testing for nullness
-        let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
-        let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
+        let lhs_nulls = lhs.nulls().unwrap();
+        let rhs_nulls = rhs.nulls().unwrap();
         // with nulls, we need to compare item by item whenever it is not null
         (0..len).all(|i| {
             let lhs_pos = lhs_start + i;
             let rhs_pos = rhs_start + i;
 
-            let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
-            let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());
+            let lhs_is_null = lhs_nulls.is_null(lhs_pos);
+            let rhs_is_null = rhs_nulls.is_null(rhs_pos);
 
             lhs_is_null
                 || (lhs_is_null == rhs_is_null)
diff --git a/arrow-data/src/equal/list.rs b/arrow-data/src/equal/list.rs
index 25273f8ba..cc4ba3cac 100644
--- a/arrow-data/src/equal/list.rs
+++ b/arrow-data/src/equal/list.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::data::{count_nulls, ArrayData};
-use arrow_buffer::bit_util::get_bit;
 use arrow_buffer::ArrowNativeType;
 use num::Integer;
 
@@ -90,8 +89,8 @@ pub(super) fn list_equal<T: ArrowNativeType + Integer>(
     let lhs_values = &lhs.child_data()[0];
     let rhs_values = &rhs.child_data()[0];
 
-    let lhs_null_count = count_nulls(lhs.null_buffer(), lhs_start + 
lhs.offset(), len);
-    let rhs_null_count = count_nulls(rhs.null_buffer(), rhs_start + 
rhs.offset(), len);
+    let lhs_null_count = count_nulls(lhs.nulls(), lhs_start, len);
+    let rhs_null_count = count_nulls(rhs.nulls(), rhs_start, len);
 
     if lhs_null_count != rhs_null_count {
         return false;
@@ -112,8 +111,8 @@ pub(super) fn list_equal<T: ArrowNativeType + Integer>(
             )
     } else {
         // get a ref of the parent null buffer bytes, to use in testing for 
nullness
-        let lhs_null_bytes = lhs.null_buffer().unwrap().as_slice();
-        let rhs_null_bytes = rhs.null_buffer().unwrap().as_slice();
+        let lhs_nulls = lhs.nulls().unwrap();
+        let rhs_nulls = rhs.nulls().unwrap();
 
         // with nulls, we need to compare item by item whenever it is not null
         // TODO: Could potentially compare runs of not NULL values
@@ -121,8 +120,8 @@ pub(super) fn list_equal<T: ArrowNativeType + Integer>(
             let lhs_pos = lhs_start + i;
             let rhs_pos = rhs_start + i;
 
-            let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
-            let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());
+            let lhs_is_null = lhs_nulls.is_null(lhs_pos);
+            let rhs_is_null = rhs_nulls.is_null(rhs_pos);
 
             if lhs_is_null != rhs_is_null {
                 return false;
diff --git a/arrow-data/src/equal/primitive.rs 
b/arrow-data/src/equal/primitive.rs
index f52541e28..7b3cbc9eb 100644
--- a/arrow-data/src/equal/primitive.rs
+++ b/arrow-data/src/equal/primitive.rs
@@ -17,7 +17,6 @@
 
 use crate::bit_iterator::BitSliceIterator;
 use crate::contains_nulls;
-use arrow_buffer::bit_util::get_bit;
 use std::mem::size_of;
 
 use crate::data::ArrayData;
@@ -39,7 +38,7 @@ pub(super) fn primitive_equal<T>(
 
     // Only checking one null mask here because by the time the control flow 
reaches
     // this point, the equality of the two masks would have already been 
verified.
-    if !contains_nulls(lhs.null_buffer(), lhs_start + lhs.offset(), len) {
+    if !contains_nulls(lhs.nulls(), lhs_start, len) {
         // without nulls, we just need to compare slices
         equal_len(
             lhs_values,
@@ -53,14 +52,14 @@ pub(super) fn primitive_equal<T>(
 
         if selectivity_frac >= NULL_SLICES_SELECTIVITY_THRESHOLD {
             // get a ref of the null buffer bytes, to use in testing for 
nullness
-            let lhs_null_bytes = 
lhs.null_buffer().as_ref().unwrap().as_slice();
-            let rhs_null_bytes = 
rhs.null_buffer().as_ref().unwrap().as_slice();
+            let lhs_nulls = lhs.nulls().unwrap();
+            let rhs_nulls = rhs.nulls().unwrap();
             // with nulls, we need to compare item by item whenever it is not 
null
             (0..len).all(|i| {
                 let lhs_pos = lhs_start + i;
                 let rhs_pos = rhs_start + i;
-                let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + 
lhs.offset());
-                let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + 
rhs.offset());
+                let lhs_is_null = lhs_nulls.is_null(lhs_pos);
+                let rhs_is_null = rhs_nulls.is_null(rhs_pos);
 
                 lhs_is_null
                     || (lhs_is_null == rhs_is_null)
@@ -73,14 +72,16 @@ pub(super) fn primitive_equal<T>(
                         )
             })
         } else {
+            let lhs_nulls = lhs.nulls().unwrap();
             let lhs_slices_iter = BitSliceIterator::new(
-                lhs.null_buffer().as_ref().unwrap(),
-                lhs_start + lhs.offset(),
+                lhs_nulls.validity(),
+                lhs_start + lhs_nulls.offset(),
                 len,
             );
+            let rhs_nulls = rhs.nulls().unwrap();
             let rhs_slices_iter = BitSliceIterator::new(
-                rhs.null_buffer().as_ref().unwrap(),
-                rhs_start + rhs.offset(),
+                rhs_nulls.validity(),
+                rhs_start + rhs_nulls.offset(),
                 len,
             );
 
diff --git a/arrow-data/src/equal/structure.rs 
b/arrow-data/src/equal/structure.rs
index 25ab340cd..e4751c26f 100644
--- a/arrow-data/src/equal/structure.rs
+++ b/arrow-data/src/equal/structure.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::data::{contains_nulls, ArrayData};
-use arrow_buffer::bit_util::get_bit;
 
 use super::equal_range;
 
@@ -46,19 +45,19 @@ pub(super) fn struct_equal(
 ) -> bool {
     // Only checking one null mask here because by the time the control flow 
reaches
     // this point, the equality of the two masks would have already been 
verified.
-    if !contains_nulls(lhs.null_buffer(), lhs_start + lhs.offset(), len) {
+    if !contains_nulls(lhs.nulls(), lhs_start, len) {
         equal_child_values(lhs, rhs, lhs_start, rhs_start, len)
     } else {
         // get a ref of the null buffer bytes, to use in testing for nullness
-        let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
-        let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
+        let lhs_nulls = lhs.nulls().unwrap();
+        let rhs_nulls = rhs.nulls().unwrap();
         // with nulls, we need to compare item by item whenever it is not null
         (0..len).all(|i| {
             let lhs_pos = lhs_start + i;
             let rhs_pos = rhs_start + i;
             // if both struct and child had no null buffers,
-            let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
-            let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());
+            let lhs_is_null = lhs_nulls.is_null(lhs_pos);
+            let rhs_is_null = rhs_nulls.is_null(rhs_pos);
 
             if lhs_is_null != rhs_is_null {
                 return false;
diff --git a/arrow-data/src/equal/utils.rs b/arrow-data/src/equal/utils.rs
index b3f7fc0b0..d1f0f392a 100644
--- a/arrow-data/src/equal/utils.rs
+++ b/arrow-data/src/equal/utils.rs
@@ -49,15 +49,16 @@ pub(super) fn equal_nulls(
     rhs_start: usize,
     len: usize,
 ) -> bool {
-    let lhs_offset = lhs_start + lhs.offset();
-    let rhs_offset = rhs_start + rhs.offset();
-
-    match (lhs.null_buffer(), rhs.null_buffer()) {
-        (Some(lhs), Some(rhs)) => {
-            equal_bits(lhs.as_slice(), rhs.as_slice(), lhs_offset, rhs_offset, 
len)
-        }
-        (Some(lhs), None) => !contains_nulls(Some(lhs), lhs_offset, len),
-        (None, Some(rhs)) => !contains_nulls(Some(rhs), rhs_offset, len),
+    match (lhs.nulls(), rhs.nulls()) {
+        (Some(lhs), Some(rhs)) => equal_bits(
+            lhs.validity(),
+            rhs.validity(),
+            lhs.offset() + lhs_start,
+            rhs.offset() + rhs_start,
+            len,
+        ),
+        (Some(lhs), None) => !contains_nulls(Some(lhs), lhs_start, len),
+        (None, Some(rhs)) => !contains_nulls(Some(rhs), rhs_start, len),
         (None, None) => true,
     }
 }
diff --git a/arrow-data/src/equal/variable_size.rs 
b/arrow-data/src/equal/variable_size.rs
index f661c614d..ae8804374 100644
--- a/arrow-data/src/equal/variable_size.rs
+++ b/arrow-data/src/equal/variable_size.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::data::{count_nulls, ArrayData};
-use arrow_buffer::bit_util::get_bit;
 use arrow_buffer::ArrowNativeType;
 use num::Integer;
 
@@ -60,8 +59,8 @@ pub(super) fn variable_sized_equal<T: ArrowNativeType + 
Integer>(
     let lhs_values = lhs.buffers()[1].as_slice();
     let rhs_values = rhs.buffers()[1].as_slice();
 
-    let lhs_null_count = count_nulls(lhs.null_buffer(), lhs_start + 
lhs.offset(), len);
-    let rhs_null_count = count_nulls(rhs.null_buffer(), rhs_start + 
rhs.offset(), len);
+    let lhs_null_count = count_nulls(lhs.nulls(), lhs_start, len);
+    let rhs_null_count = count_nulls(rhs.nulls(), rhs_start, len);
 
     if lhs_null_count == 0
         && rhs_null_count == 0
@@ -83,15 +82,8 @@ pub(super) fn variable_sized_equal<T: ArrowNativeType + 
Integer>(
             let rhs_pos = rhs_start + i;
 
             // the null bits can still be `None`, indicating that the value is 
valid.
-            let lhs_is_null = !lhs
-                .null_buffer()
-                .map(|v| get_bit(v.as_slice(), lhs.offset() + lhs_pos))
-                .unwrap_or(true);
-
-            let rhs_is_null = !rhs
-                .null_buffer()
-                .map(|v| get_bit(v.as_slice(), rhs.offset() + rhs_pos))
-                .unwrap_or(true);
+            let lhs_is_null = lhs.nulls().map(|v| 
v.is_null(lhs_pos)).unwrap_or_default();
+            let rhs_is_null = rhs.nulls().map(|v| 
v.is_null(rhs_pos)).unwrap_or_default();
 
             lhs_is_null
                 || (lhs_is_null == rhs_is_null)
diff --git a/arrow-data/src/ffi.rs b/arrow-data/src/ffi.rs
index e506653bb..b7d690fb9 100644
--- a/arrow-data/src/ffi.rs
+++ b/arrow-data/src/ffi.rs
@@ -17,8 +17,10 @@
 
 //! Contains declarations to bind to the [C Data 
Interface](https://arrow.apache.org/docs/format/CDataInterface.html).
 
+use crate::bit_mask::set_bits;
 use crate::{layout, ArrayData};
-use arrow_buffer::Buffer;
+use arrow_buffer::buffer::NullBuffer;
+use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_schema::DataType;
 use std::ffi::c_void;
 
@@ -83,6 +85,29 @@ unsafe extern "C" fn release_array(array: *mut 
FFI_ArrowArray) {
     array.release = None;
 }
 
+/// Aligns the provided `nulls` to the provided `data_offset`
+///
+/// This is a temporary measure until offset is removed from ArrayData (#1799)
+fn align_nulls(data_offset: usize, nulls: Option<&NullBuffer>) -> 
Option<Buffer> {
+    let nulls = nulls?;
+    if data_offset == nulls.offset() {
+        // Underlying buffer is already aligned
+        return Some(nulls.buffer().clone());
+    }
+    if data_offset == 0 {
+        return Some(nulls.inner().sliced());
+    }
+    let mut builder = MutableBuffer::new_null(data_offset + nulls.len());
+    set_bits(
+        builder.as_slice_mut(),
+        nulls.validity(),
+        data_offset,
+        nulls.offset(),
+        nulls.len(),
+    );
+    Some(builder.into())
+}
+
 struct ArrayPrivateData {
     #[allow(dead_code)]
     buffers: Vec<Option<Buffer>>,
@@ -102,7 +127,7 @@ impl FFI_ArrowArray {
         let buffers = if data_layout.can_contain_null_mask {
             // * insert the null buffer at the start
             // * make all others `Option<Buffer>`.
-            std::iter::once(data.null_buffer().cloned())
+            std::iter::once(align_nulls(data.offset(), data.nulls()))
                 .chain(data.buffers().iter().map(|b| Some(b.clone())))
                 .collect::<Vec<_>>()
         } else {
diff --git a/arrow-data/src/lib.rs b/arrow-data/src/lib.rs
index b37a8c5da..2b105f5bb 100644
--- a/arrow-data/src/lib.rs
+++ b/arrow-data/src/lib.rs
@@ -17,8 +17,6 @@
 
 //! Array data abstractions for [Apache Arrow](https://docs.rs/arrow)
 
-mod bitmap;
-pub use bitmap::Bitmap;
 mod data;
 pub use data::*;
 
diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs
index fef6d4be4..2719b96b6 100644
--- a/arrow-data/src/transform/mod.rs
+++ b/arrow-data/src/transform/mod.rs
@@ -20,6 +20,7 @@ use super::{
     ArrayData, ArrayDataBuilder,
 };
 use crate::bit_mask::set_bits;
+use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_buffer::{bit_util, i256, ArrowNativeType, MutableBuffer};
 use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
 use half::f16;
@@ -76,26 +77,30 @@ impl<'a> _MutableArrayData<'a> {
             }
         };
 
+        let nulls = (self.null_count > 0).then(|| {
+            let bools = BooleanBuffer::new(self.null_buffer.into(), 0, 
self.len);
+            unsafe { NullBuffer::new_unchecked(bools, self.null_count) }
+        });
+
         ArrayDataBuilder::new(self.data_type)
             .offset(0)
             .len(self.len)
-            .null_count(self.null_count)
+            .nulls(nulls)
             .buffers(buffers)
             .child_data(child_data)
-            .null_bit_buffer((self.null_count > 0).then(|| 
self.null_buffer.into()))
     }
 }
 
 fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> 
ExtendNullBits {
-    if let Some(bitmap) = array.null_bitmap() {
-        let bytes = bitmap.buffer().as_slice();
+    if let Some(nulls) = array.nulls() {
+        let bytes = nulls.validity();
         Box::new(move |mutable, start, len| {
             utils::resize_for_bits(&mut mutable.null_buffer, mutable.len + 
len);
             mutable.null_count += set_bits(
                 mutable.null_buffer.as_slice_mut(),
                 bytes,
                 mutable.len,
-                array.offset() + start,
+                nulls.offset() + start,
                 len,
             );
         })
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6842474fb..bb367f944 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -227,10 +227,8 @@ fn create_array(
             buffer_index = values_triple.2;
 
             let run_array_length = run_node.length() as usize;
-            let run_array_null_count = run_node.null_count() as usize;
             let data = ArrayData::builder(data_type.clone())
                 .len(run_array_length)
-                .null_count(run_array_null_count)
                 .offset(0)
                 .add_child_data(run_ends_triple.0.into_data())
                 .add_child_data(values_triple.0.into_data())
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index f01934015..75c48bebc 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -617,7 +617,6 @@ fn into_zero_offset_run_array<R: RunEndIndexType>(
         // The function builds a valid run_ends array and hence need not be 
validated.
         ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
             .len(physical_length)
-            .null_count(0)
             .add_buffer(builder.finish())
             .build_unchecked()
     };
@@ -1220,7 +1219,7 @@ fn write_array_data(
     }
     if has_validity_bitmap(array_data.data_type(), write_options) {
         // write null buffer if exists
-        let null_buffer = match array_data.null_buffer() {
+        let null_buffer = match array_data.nulls() {
             None => {
                 // create a buffer and fill it with valid bits
                 let num_bytes = bit_util::ceil(num_rows, 8);
@@ -1228,7 +1227,7 @@ fn write_array_data(
                 let buffer = buffer.with_bitset(num_bytes, true);
                 buffer.into()
             }
-            Some(buffer) => buffer.bit_slice(array_data.offset(), 
array_data.len()),
+            Some(buffer) => buffer.inner().sliced(),
         };
 
         offset = write_buffer(
diff --git a/arrow-json/src/raw/list_array.rs b/arrow-json/src/raw/list_array.rs
index 91ca4b727..a57f42733 100644
--- a/arrow-json/src/raw/list_array.rs
+++ b/arrow-json/src/raw/list_array.rs
@@ -19,6 +19,7 @@ use crate::raw::tape::{Tape, TapeElement};
 use crate::raw::{make_decoder, tape_error, ArrayDecoder};
 use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
 use arrow_array::OffsetSizeTrait;
+use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_data::{ArrayData, ArrayDataBuilder};
 use arrow_schema::{ArrowError, DataType};
 use std::marker::PhantomData;
@@ -62,7 +63,6 @@ impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> 
{
         let mut offsets = BufferBuilder::<O>::new(pos.len() + 1);
         offsets.append(O::from_usize(0).unwrap());
 
-        let mut null_count = 0;
         let mut nulls = self
             .is_nullable
             .then(|| BooleanBufferBuilder::new(pos.len()));
@@ -76,7 +76,6 @@ impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> 
{
                 }
                 (TapeElement::Null, Some(nulls)) => {
                     nulls.append(false);
-                    null_count += 1;
                     *p + 1
                 }
                 (d, _) => return Err(tape_error(d, "[")),
@@ -102,11 +101,13 @@ impl<O: OffsetSizeTrait> ArrayDecoder for 
ListArrayDecoder<O> {
         }
 
         let child_data = self.decoder.decode(tape, &child_pos)?;
+        let nulls = nulls
+            .as_mut()
+            .map(|x| NullBuffer::new(BooleanBuffer::new(x.finish(), 0, 
pos.len())));
 
         let data = ArrayDataBuilder::new(self.data_type.clone())
             .len(pos.len())
-            .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
-            .null_count(null_count)
+            .nulls(nulls)
             .add_buffer(offsets.finish())
             .child_data(vec![child_data]);
 
diff --git a/arrow-json/src/raw/map_array.rs b/arrow-json/src/raw/map_array.rs
index ac48d8bce..dee142bef 100644
--- a/arrow-json/src/raw/map_array.rs
+++ b/arrow-json/src/raw/map_array.rs
@@ -18,6 +18,7 @@
 use crate::raw::tape::{Tape, TapeElement};
 use crate::raw::{make_decoder, tape_error, ArrayDecoder};
 use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
+use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_buffer::ArrowNativeType;
 use arrow_data::{ArrayData, ArrayDataBuilder};
 use arrow_schema::{ArrowError, DataType};
@@ -88,7 +89,6 @@ impl ArrayDecoder for MapArrayDecoder {
         let mut key_pos = Vec::with_capacity(pos.len());
         let mut value_pos = Vec::with_capacity(pos.len());
 
-        let mut null_count = 0;
         let mut nulls = self
             .is_nullable
             .then(|| BooleanBufferBuilder::new(pos.len()));
@@ -102,7 +102,6 @@ impl ArrayDecoder for MapArrayDecoder {
                 }
                 (TapeElement::Null, Some(nulls)) => {
                     nulls.append(false);
-                    null_count += 1;
                     p + 1
                 }
                 (d, _) => return Err(tape_error(d, "{")),
@@ -140,11 +139,14 @@ impl ArrayDecoder for MapArrayDecoder {
         // Valid by construction
         let struct_data = unsafe { struct_data.build_unchecked() };
 
+        let nulls = nulls
+            .as_mut()
+            .map(|x| NullBuffer::new(BooleanBuffer::new(x.finish(), 0, 
pos.len())));
+
         let builder = ArrayDataBuilder::new(self.data_type.clone())
             .len(pos.len())
             .buffers(vec![offsets.finish()])
-            .null_count(null_count)
-            .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
+            .nulls(nulls)
             .child_data(vec![struct_data]);
 
         // Safety:
diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs
index 595a54c10..a0dbcbd53 100644
--- a/arrow-json/src/raw/mod.rs
+++ b/arrow-json/src/raw/mod.rs
@@ -481,9 +481,10 @@ mod tests {
         assert_eq!(batches.len(), 1);
 
         let list = as_list_array(batches[0].column(0).as_ref());
+        assert_eq!(list.len(), 3);
         assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
         assert_eq!(list.null_count(), 1);
-        assert!(list.is_null(4));
+        assert!(list.is_null(2));
         let list_values = 
as_primitive_array::<Int32Type>(list.values().as_ref());
         assert_eq!(list_values.values(), &[5, 6]);
 
@@ -501,10 +502,15 @@ mod tests {
         assert!(b.is_null(2));
 
         let nested_list = as_struct_array(batches[0].column(2).as_ref());
+        assert_eq!(nested_list.len(), 3);
+        assert_eq!(nested_list.null_count(), 1);
+        assert!(nested_list.is_null(2));
+
         let list2 = as_list_array(nested_list.column(0).as_ref());
+        assert_eq!(list2.len(), 3);
         assert_eq!(list2.null_count(), 1);
         assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
-        assert!(list2.is_null(3));
+        assert!(list2.is_null(2));
 
         let list2_values = as_struct_array(list2.values().as_ref());
 
diff --git a/arrow-json/src/raw/struct_array.rs 
b/arrow-json/src/raw/struct_array.rs
index 64ceff224..1d0019993 100644
--- a/arrow-json/src/raw/struct_array.rs
+++ b/arrow-json/src/raw/struct_array.rs
@@ -18,6 +18,7 @@
 use crate::raw::tape::{Tape, TapeElement};
 use crate::raw::{make_decoder, tape_error, ArrayDecoder};
 use arrow_array::builder::BooleanBufferBuilder;
+use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
 use arrow_data::{ArrayData, ArrayDataBuilder};
 use arrow_schema::{ArrowError, DataType, Field};
 
@@ -54,7 +55,6 @@ impl ArrayDecoder for StructArrayDecoder {
         let mut child_pos: Vec<_> =
             (0..fields.len()).map(|_| vec![0; pos.len()]).collect();
 
-        let mut null_count = 0;
         let mut nulls = self
             .is_nullable
             .then(|| BooleanBufferBuilder::new(pos.len()));
@@ -68,7 +68,6 @@ impl ArrayDecoder for StructArrayDecoder {
                 }
                 (TapeElement::Null, Some(nulls)) => {
                     nulls.append(false);
-                    null_count += 1;
                     continue;
                 }
                 (d, _) => return Err(tape_error(d, "{")),
@@ -108,10 +107,13 @@ impl ArrayDecoder for StructArrayDecoder {
             .iter()
             .for_each(|x| assert_eq!(x.len(), pos.len()));
 
+        let nulls = nulls
+            .as_mut()
+            .map(|x| NullBuffer::new(BooleanBuffer::new(x.finish(), 0, 
pos.len())));
+
         let data = ArrayDataBuilder::new(self.data_type.clone())
             .len(pos.len())
-            .null_count(null_count)
-            .null_bit_buffer(nulls.as_mut().map(|x| x.finish()))
+            .nulls(nulls)
             .child_data(child_data);
 
         // Safety
diff --git a/arrow-json/src/reader.rs b/arrow-json/src/reader.rs
index 7df63bf8d..0ef438e36 100644
--- a/arrow-json/src/reader.rs
+++ b/arrow-json/src/reader.rs
@@ -2378,7 +2378,7 @@ mod tests {
             Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7])
         );
         // compare list null buffers
-        assert_eq!(read.data().null_buffer(), expected.data().null_buffer());
+        assert_eq!(read.data().nulls(), expected.data().nulls());
         // build struct from list
         let struct_array = as_struct_array(read.values());
         let expected_struct_array = as_struct_array(expected.values());
@@ -2389,8 +2389,8 @@ mod tests {
         assert_eq!(1, expected_struct_array.null_count());
         // test struct's nulls
         assert_eq!(
-            struct_array.data().null_buffer(),
-            expected_struct_array.data().null_buffer()
+            struct_array.data().nulls(),
+            expected_struct_array.data().nulls()
         );
         // test struct's fields
         let read_b = struct_array.column(0);
diff --git a/arrow-ord/src/comparison.rs b/arrow-ord/src/comparison.rs
index a4f1fdb88..2702514ed 100644
--- a/arrow-ord/src/comparison.rs
+++ b/arrow-ord/src/comparison.rs
@@ -104,10 +104,7 @@ pub fn eq_utf8<OffsetSize: OffsetSizeTrait>(
 fn utf8_empty<OffsetSize: OffsetSizeTrait, const EQ: bool>(
     left: &GenericStringArray<OffsetSize>,
 ) -> Result<BooleanArray, ArrowError> {
-    let null_bit_buffer = left
-        .data()
-        .null_buffer()
-        .map(|b| b.bit_slice(left.offset(), left.len()));
+    let null_bit_buffer = left.data().nulls().map(|b| b.inner().sliced());
 
     let buffer = unsafe {
         
MutableBuffer::from_trusted_len_iter_bool(left.value_offsets().windows(2).map(
@@ -213,10 +210,7 @@ pub fn eq_bool_scalar(
             DataType::Boolean,
             len,
             None,
-            left.data_ref()
-                .null_bitmap()
-                .as_ref()
-                .map(|b| b.buffer().bit_slice(left_offset, len)),
+            left.data().nulls().map(|b| b.inner().sliced()),
             0,
             vec![values],
             vec![],
@@ -1439,10 +1433,7 @@ where
         result_remainder.copy_from_slice(remainder_mask_as_bytes);
     }
 
-    let null_bit_buffer = left
-        .data_ref()
-        .null_buffer()
-        .map(|b| b.bit_slice(left.offset(), left.len()));
+    let null_bit_buffer = left.data().nulls().map(|b| b.inner().sliced());
 
     // null count is the same as in the input since the right side of the 
scalar comparison cannot be null
     let null_count = left.null_count();
diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs
index c4baa2283..230eb9390 100644
--- a/arrow-ord/src/sort.rs
+++ b/arrow-ord/src/sort.rs
@@ -675,7 +675,6 @@ fn sort_run_downcasted<R: RunEndIndexType>(
         // The function builds a valid run_ends array and hence need not be 
validated.
         ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
             .len(new_physical_len)
-            .null_count(0)
             .add_buffer(new_run_ends_builder.finish())
             .build_unchecked()
     };
diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs
index dcd247be1..833baac7b 100644
--- a/arrow-row/src/list.rs
+++ b/arrow-row/src/list.rs
@@ -168,8 +168,7 @@ pub unsafe fn decode<O: OffsetSizeTrait>(
 
     let builder = ArrayDataBuilder::new(field.data_type.clone())
         .len(rows.len())
-        .null_count(canonical.null_count())
-        .null_bit_buffer(canonical.data().null_buffer().cloned())
+        .nulls(canonical.data().nulls().cloned())
         .add_buffer(offsets.finish())
         .add_child_data(child_data);
 
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index fde4b41b0..d8ea9fceb 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -153,11 +153,12 @@ pub fn build_filter(filter: &BooleanArray) -> 
Result<Filter, ArrowError> {
 /// Remove null values by do a bitmask AND operation with null bits and the 
boolean bits.
 pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
     let array_data = filter.data_ref();
-    let null_bitmap = array_data.null_buffer().unwrap();
+    let nulls = array_data.nulls().unwrap();
     let mask = filter.values();
     let offset = filter.offset();
 
-    let new_mask = buffer_bin_and(mask, offset, null_bitmap, offset, 
filter.len());
+    let new_mask =
+        buffer_bin_and(mask, offset, nulls.buffer(), nulls.offset(), 
filter.len());
 
     let array_data = ArrayData::builder(DataType::Boolean)
         .len(filter.len())
@@ -410,7 +411,8 @@ fn filter_null_mask(
         return None;
     }
 
-    let nulls = filter_bits(data.null_buffer()?, data.offset(), predicate);
+    let nulls = data.nulls()?;
+    let nulls = filter_bits(nulls.buffer(), nulls.offset(), predicate);
     // The filtered `nulls` has a length of `predicate.count` bits and
     // therefore the null count is this minus the number of valid bits
     let null_count = predicate.count - nulls.count_set_bits_offset(0, 
predicate.count);
diff --git a/arrow-select/src/nullif.rs b/arrow-select/src/nullif.rs
index 34876e948..4b052ce00 100644
--- a/arrow-select/src/nullif.rs
+++ b/arrow-select/src/nullif.rs
@@ -53,13 +53,13 @@ pub fn nullif(left: &dyn Array, right: &BooleanArray) -> 
Result<ArrayRef, ArrowE
     //              OR left null bitmap & !(right_values & right_bitmap)
 
     // Compute right_values & right_bitmap
-    let (right, r_offset) = match right_data.null_buffer() {
-        Some(buffer) => (
+    let (right, r_offset) = match right_data.nulls() {
+        Some(nulls) => (
             buffer_bin_and(
                 &right_data.buffers()[0],
                 right_data.offset(),
-                buffer,
-                right_data.offset(),
+                nulls.buffer(),
+                nulls.offset(),
                 len,
             ),
             0,
@@ -69,15 +69,21 @@ pub fn nullif(left: &dyn Array, right: &BooleanArray) -> 
Result<ArrayRef, ArrowE
 
     // Compute left null bitmap & !right
 
-    let (combined, null_count) = match left_data.null_buffer() {
+    let (combined, null_count) = match left_data.nulls() {
         Some(left) => {
             let mut valid_count = 0;
-            let b =
-                bitwise_bin_op_helper(left, l_offset, &right, r_offset, len, 
|l, r| {
+            let b = bitwise_bin_op_helper(
+                left.buffer(),
+                left.offset(),
+                &right,
+                r_offset,
+                len,
+                |l, r| {
                     let t = l & !r;
                     valid_count += t.count_ones() as usize;
                     t
-                });
+                },
+            );
             (b, len - valid_count)
         }
         None => {
diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index 6436dc0d5..771a7eeb5 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -340,12 +340,7 @@ where
     // Soundness: `slice.map` is `TrustedLen`.
     let buffer = unsafe { Buffer::try_from_trusted_len_iter(values)? };
 
-    Ok((
-        buffer,
-        indices_data
-            .null_buffer()
-            .map(|b| b.bit_slice(indices_data.offset(), indices.len())),
-    ))
+    Ok((buffer, indices_data.nulls().map(|b| b.inner().sliced())))
 }
 
 // take implementation when both values and indices contain nulls
@@ -530,14 +525,11 @@ where
     IndexType::Native: ToPrimitive,
 {
     let val_buf = take_bits(values.values(), values.offset(), indices)?;
-    let null_buf = match values.data().null_buffer() {
-        Some(buf) if values.null_count() > 0 => {
-            Some(take_bits(buf, values.offset(), indices)?)
+    let null_buf = match values.data().nulls() {
+        Some(nulls) if nulls.null_count() > 0 => {
+            Some(take_bits(nulls.buffer(), nulls.offset(), indices)?)
         }
-        _ => indices
-            .data()
-            .null_buffer()
-            .map(|b| b.bit_slice(indices.offset(), indices.len())),
+        _ => indices.data().nulls().map(|b| b.inner().sliced()),
     };
 
     let data = unsafe {
@@ -626,7 +618,7 @@ where
             }
             *offset = length_so_far;
         }
-        nulls = indices.data_ref().null_buffer().cloned();
+        nulls = indices.data().nulls().map(|b| b.inner().sliced());
     } else {
         let num_bytes = bit_util::ceil(data_len, 8);
 
@@ -791,7 +783,7 @@ where
             values.data_type().clone(),
             new_keys.len(),
             Some(new_keys_data.null_count()),
-            new_keys_data.null_buffer().cloned(),
+            new_keys_data.nulls().map(|b| b.inner().sliced()),
             0,
             new_keys_data.buffers().to_vec(),
             values.data().child_data().to_vec(),
@@ -1639,7 +1631,7 @@ mod tests {
             let expected_list_data = ArrayData::builder(list_data_type)
                 .len(5)
                 // null buffer remains the same as only the indices have nulls
-                .null_bit_buffer(index.data().null_buffer().cloned())
+                .nulls(index.data().nulls().cloned())
                 .add_buffer(expected_offsets)
                 .add_child_data(expected_data)
                 .build()
@@ -1713,7 +1705,7 @@ mod tests {
             let expected_list_data = ArrayData::builder(list_data_type)
                 .len(5)
                 // null buffer remains the same as only the indices have nulls
-                .null_bit_buffer(index.data().null_buffer().cloned())
+                .nulls(index.data().nulls().cloned())
                 .add_buffer(expected_offsets)
                 .add_child_data(expected_data)
                 .build()
diff --git a/arrow-string/src/length.rs b/arrow-string/src/length.rs
index 9651bef27..cd588fe01 100644
--- a/arrow-string/src/length.rs
+++ b/arrow-string/src/length.rs
@@ -37,10 +37,7 @@ macro_rules! unary_offsets {
         //      `values` come from a slice iterator with a known size.
         let buffer = unsafe { Buffer::from_trusted_len_iter(lengths) };
 
-        let null_bit_buffer = $array
-            .data_ref()
-            .null_buffer()
-            .map(|b| b.bit_slice($array.offset(), $array.len()));
+        let null_bit_buffer = $array.data().nulls().map(|b| 
b.inner().sliced());
 
         let data = unsafe {
             ArrayData::new_unchecked(
diff --git a/arrow-string/src/regexp.rs b/arrow-string/src/regexp.rs
index 4072d8ba0..bf6e60cfe 100644
--- a/arrow-string/src/regexp.rs
+++ b/arrow-string/src/regexp.rs
@@ -122,7 +122,7 @@ pub fn regexp_is_match_utf8_scalar<OffsetSize: 
OffsetSizeTrait>(
     regex: &str,
     flag: Option<&str>,
 ) -> Result<BooleanArray, ArrowError> {
-    let null_bit_buffer = array.data().null_buffer().cloned();
+    let null_bit_buffer = array.data().nulls().map(|x| x.inner().sliced());
     let mut result = BooleanBufferBuilder::new(array.len());
 
     let pattern = match flag {
diff --git a/arrow-string/src/substring.rs b/arrow-string/src/substring.rs
index 7d0430477..a59a54d7e 100644
--- a/arrow-string/src/substring.rs
+++ b/arrow-string/src/substring.rs
@@ -210,10 +210,7 @@ pub fn substring_by_char<OffsetSize: OffsetSizeTrait>(
             GenericStringArray::<OffsetSize>::DATA_TYPE,
             array.len(),
             None,
-            array
-                .data_ref()
-                .null_buffer()
-                .map(|b| b.bit_slice(array.offset(), array.len())),
+            array.data().nulls().map(|b| b.inner().sliced()),
             0,
             vec![new_offsets.finish(), vals.finish()],
             vec![],
@@ -297,10 +294,7 @@ fn binary_substring<OffsetSize: OffsetSizeTrait>(
             GenericBinaryArray::<OffsetSize>::DATA_TYPE,
             array.len(),
             None,
-            array
-                .data_ref()
-                .null_buffer()
-                .map(|b| b.bit_slice(array.offset(), array.len())),
+            array.data().nulls().map(|b| b.inner().sliced()),
             0,
             vec![Buffer::from_slice_ref(&new_offsets), new_values.into()],
             vec![],
@@ -345,10 +339,7 @@ fn fixed_size_binary_substring(
             DataType::FixedSizeBinary(new_len),
             num_of_elements,
             None,
-            array
-                .data_ref()
-                .null_buffer()
-                .map(|b| b.bit_slice(array.offset(), num_of_elements)),
+            array.data().nulls().map(|b| b.inner().sliced()),
             0,
             vec![new_values.into()],
             vec![],
@@ -427,10 +418,7 @@ fn utf8_substring<OffsetSize: OffsetSizeTrait>(
             GenericStringArray::<OffsetSize>::DATA_TYPE,
             array.len(),
             None,
-            array
-                .data_ref()
-                .null_buffer()
-                .map(|b| b.bit_slice(array.offset(), array.len())),
+            array.data().nulls().map(|b| b.inner().sliced()),
             0,
             vec![Buffer::from_slice_ref(&new_offsets), new_values.into()],
             vec![],
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index f7ce24a97..3d1bced29 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -306,10 +306,6 @@ pub use arrow_array::{downcast_dictionary_array, 
downcast_primitive_array};
 
 pub use arrow_buffer::{alloc, buffer};
 
-pub mod bitmap {
-    pub use arrow_data::Bitmap;
-}
-
 pub mod array;
 pub mod compute;
 #[cfg(feature = "csv")]
diff --git a/arrow/tests/array_validation.rs b/arrow/tests/array_validation.rs
index 3cdec46b5..7e45ee7af 100644
--- a/arrow/tests/array_validation.rs
+++ b/arrow/tests/array_validation.rs
@@ -943,17 +943,7 @@ fn test_try_new_sliced_struct() {
     let struct_array_slice = struct_array.slice(1, 3);
     let struct_array_data = struct_array_slice.data();
 
-    let cloned_data = ArrayData::try_new(
-        struct_array_slice.data_type().clone(),
-        struct_array_slice.len(),
-        struct_array_data.null_buffer().cloned(),
-        struct_array_slice.offset(),
-        struct_array_data.buffers().to_vec(),
-        struct_array_data.child_data().to_vec(),
-    )
-    .unwrap();
-    let cloned = make_array(cloned_data);
-
+    let cloned = make_array(struct_array_data.clone());
     assert_eq!(&struct_array_slice, &cloned);
 }
 
diff --git a/parquet/src/arrow/arrow_writer/levels.rs 
b/parquet/src/arrow/arrow_writer/levels.rs
index 11ed35263..de4cba4ad 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -270,12 +270,12 @@ impl LevelInfoBuilder {
             })
         };
 
-        match list_data.null_bitmap() {
+        match list_data.nulls() {
             Some(nulls) => {
-                let null_offset = list_data.offset() + range.start;
+                let null_offset = range.start;
                 // TODO: Faster bitmask iteration (#1757)
                 for (idx, w) in offsets.windows(2).enumerate() {
-                    let is_valid = nulls.is_set(idx + null_offset);
+                    let is_valid = nulls.is_valid(idx + null_offset);
                     let start_idx = w[0].as_usize();
                     let end_idx = w[1].as_usize();
                     if !is_valid {
@@ -329,15 +329,14 @@ impl LevelInfoBuilder {
             }
         };
 
-        match array.data().null_bitmap() {
+        match array.data().nulls() {
             Some(validity) => {
-                let null_offset = array.data().offset();
                 let mut last_non_null_idx = None;
                 let mut last_null_idx = None;
 
                 // TODO: Faster bitmask iteration (#1757)
                 for i in range.clone() {
-                    match validity.is_set(i + null_offset) {
+                    match validity.is_valid(i) {
                         true => {
                             if let Some(last_idx) = last_null_idx.take() {
                                 write_null(children, last_idx..i)
@@ -379,12 +378,11 @@ impl LevelInfoBuilder {
                 def_levels.reserve(len);
                 info.non_null_indices.reserve(len);
 
-                match array.data().null_bitmap() {
+                match array.data().nulls() {
                     Some(nulls) => {
-                        let nulls_offset = array.data().offset();
                         // TODO: Faster bitmask iteration (#1757)
                         for i in range {
-                            match nulls.is_set(i + nulls_offset) {
+                            match nulls.is_valid(i) {
                                 true => {
                                     def_levels.push(info.max_def_level);
                                     info.non_null_indices.push(i)
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index 84b7ab94c..7c27a365f 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,7 +20,6 @@ use std::ops::Range;
 use arrow_array::builder::BooleanBufferBuilder;
 use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
 use arrow_buffer::Buffer;
-use arrow_data::Bitmap;
 
 use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::arrow::record_reader::buffer::BufferQueue;
@@ -105,7 +104,7 @@ impl DefinitionLevelBuffer {
     }
 
     /// Split `len` levels out of `self`
-    pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
+    pub fn split_bitmask(&mut self, len: usize) -> Buffer {
         let old_builder = match &mut self.inner {
             BufferInner::Full { nulls, .. } => nulls,
             BufferInner::Mask { nulls } => nulls,
@@ -124,7 +123,7 @@ impl DefinitionLevelBuffer {
 
         // Swap into self
         self.len = new_builder.len();
-        Bitmap::from(std::mem::replace(old_builder, new_builder).finish())
+        std::mem::replace(old_builder, new_builder).finish()
     }
 
     pub fn nulls(&self) -> &BooleanBufferBuilder {
@@ -516,7 +515,7 @@ mod tests {
         let bitmap = buffer.split_bitmask(19);
 
         // Should have split off 19 records leaving, 81 behind
-        assert_eq!(bitmap.bit_len(), 3 * 8); // Note: bitmask only tracks 
bytes not bits
+        assert_eq!(bitmap.len(), 3); // Note: bitmask only tracks bytes not 
bits
         assert_eq!(buffer.nulls().len(), 81);
     }
 }
diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index ef17b8d0e..e47bdee1c 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,6 @@
 use std::cmp::{max, min};
 
 use arrow_buffer::Buffer;
-use arrow_data::Bitmap;
 
 use crate::arrow::record_reader::{
     buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
@@ -271,7 +270,7 @@ where
     /// Returns currently stored null bitmap data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
-        self.consume_bitmap().map(|b| b.into_buffer())
+        self.consume_bitmap()
     }
 
     /// Reset state of record reader.
@@ -284,7 +283,7 @@ where
     }
 
     /// Returns bitmap data.
-    pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
+    pub fn consume_bitmap(&mut self) -> Option<Buffer> {
         self.def_levels
             .as_mut()
             .map(|levels| levels.split_bitmask(self.num_values))
@@ -409,7 +408,6 @@ fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
 mod tests {
     use std::sync::Arc;
 
-    use arrow::bitmap::Bitmap;
     use arrow::buffer::Buffer;
     use arrow_array::builder::{Int16BufferBuilder, Int32BufferBuilder};
 
@@ -584,8 +582,7 @@ mod tests {
         // Verify bitmap
         let expected_valid = &[false, true, false, true, true, false, true];
         let expected_buffer = 
Buffer::from_iter(expected_valid.iter().cloned());
-        let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
@@ -695,8 +692,7 @@ mod tests {
         // Verify bitmap
         let expected_valid = &[true, false, false, true, true, true, true, 
true, true];
         let expected_buffer = 
Buffer::from_iter(expected_valid.iter().cloned());
-        let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
@@ -966,8 +962,7 @@ mod tests {
         // Verify bitmap
         let expected_valid = &[false, true, true];
         let expected_buffer = 
Buffer::from_iter(expected_valid.iter().cloned());
-        let expected_bitmap = Bitmap::from(expected_buffer);
-        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());
 
         // Verify result record data
         let actual = record_reader.consume_record_data();

Reply via email to