This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 05cb6b0066 Add ListView support to `arrow-row` and `arrow-ord` (#9176)
05cb6b0066 is described below

commit 05cb6b00667b123162b252906e3f3ffc6bc99f9e
Author: Frederic Branczyk <[email protected]>
AuthorDate: Fri Jan 30 16:08:30 2026 +0100

    Add ListView support to `arrow-row` and `arrow-ord` (#9176)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    Closes https://github.com/apache/arrow-rs/issues/9174
    
    # What changes are included in this PR?
    
    Implementation and tests. It's mostly copied from `List`.
    
    # Are these changes tested?
    
    Yes, see unit tests.
    
    # Are there any user-facing changes?
    
    No, purely additive.
    
    @alamb @Jefffrey
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-ord/src/ord.rs  | 267 ++++++++++++++++++----
 arrow-row/src/lib.rs  | 615 +++++++++++++++++++++++++++++++++++++++++++++++++-
 arrow-row/src/list.rs | 229 +++++++++++++++++--
 3 files changed, 1039 insertions(+), 72 deletions(-)

diff --git a/arrow-ord/src/ord.rs b/arrow-ord/src/ord.rs
index c09fff807a..f120ae6e8d 100644
--- a/arrow-ord/src/ord.rs
+++ b/arrow-ord/src/ord.rs
@@ -233,6 +233,42 @@ fn compare_fixed_list(
     Ok(f)
 }
 
+fn compare_list_view<O: OffsetSizeTrait>(
+    left: &dyn Array,
+    right: &dyn Array,
+    opts: SortOptions,
+) -> Result<DynComparator, ArrowError> {
+    let left = left.as_list_view::<O>();
+    let right = right.as_list_view::<O>();
+
+    let c_opts = child_opts(opts);
+    let cmp = make_comparator(left.values().as_ref(), right.values().as_ref(), 
c_opts)?;
+
+    let l_offsets = left.offsets().clone();
+    let l_sizes = left.sizes().clone();
+    let r_offsets = right.offsets().clone();
+    let r_sizes = right.sizes().clone();
+
+    let f = compare(left, right, opts, move |i, j| {
+        let l_start = l_offsets[i].as_usize();
+        let l_len = l_sizes[i].as_usize();
+        let l_end = l_start + l_len;
+
+        let r_start = r_offsets[j].as_usize();
+        let r_len = r_sizes[j].as_usize();
+        let r_end = r_start + r_len;
+
+        for (i, j) in (l_start..l_end).zip(r_start..r_end) {
+            match cmp(i, j) {
+                Ordering::Equal => continue,
+                r => return r,
+            }
+        }
+        l_len.cmp(&r_len)
+    });
+    Ok(f)
+}
+
 fn compare_map(
     left: &dyn Array,
     right: &dyn Array,
@@ -470,6 +506,8 @@ pub fn make_comparator(
         },
         (List(_), List(_)) => compare_list::<i32>(left, right, opts),
         (LargeList(_), LargeList(_)) => compare_list::<i64>(left, right, opts),
+        (ListView(_), ListView(_)) => compare_list_view::<i32>(left, right, 
opts),
+        (LargeListView(_), LargeListView(_)) => compare_list_view::<i64>(left, 
right, opts),
         (FixedSizeList(_, _), FixedSizeList(_, _)) => compare_fixed_list(left, 
right, opts),
         (Struct(_), Struct(_)) => compare_struct(left, right, opts),
         (Dictionary(l_key, _), Dictionary(r_key, _)) => {
@@ -497,7 +535,7 @@ pub fn make_comparator(
 mod tests {
     use super::*;
     use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, 
StringBuilder};
-    use arrow_buffer::{IntervalDayTime, OffsetBuffer, ScalarBuffer, i256};
+    use arrow_buffer::{IntervalDayTime, NullBuffer, OffsetBuffer, 
ScalarBuffer, i256};
     use arrow_schema::{DataType, Field, Fields, UnionFields};
     use half::f16;
     use std::sync::Arc;
@@ -895,6 +933,18 @@ mod tests {
         test_bytes_impl::<LargeBinaryType>();
     }
 
+    fn assert_cmp_cases<A: Array>(
+        array1: &A,
+        array2: &A,
+        opts: SortOptions,
+        cases: &[(usize, usize, Ordering)],
+    ) {
+        let cmp = make_comparator(array1, array2, opts).unwrap();
+        for (left, right, expected) in cases {
+            assert_eq!(cmp(*left, *right), *expected);
+        }
+    }
+
     #[test]
     fn test_lists() {
         let mut a = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
@@ -922,53 +972,180 @@ mod tests {
         ]);
         let b = b.finish();
 
-        let opts = SortOptions {
-            descending: false,
-            nulls_first: true,
-        };
-        let cmp = make_comparator(&a, &b, opts).unwrap();
-        assert_eq!(cmp(0, 0), Ordering::Equal);
-        assert_eq!(cmp(0, 1), Ordering::Less);
-        assert_eq!(cmp(0, 2), Ordering::Less);
-        assert_eq!(cmp(1, 2), Ordering::Less);
-        assert_eq!(cmp(1, 3), Ordering::Greater);
-        assert_eq!(cmp(2, 0), Ordering::Less);
+        // Ascending with nulls first.
+        assert_cmp_cases(
+            &a,
+            &b,
+            SortOptions {
+                descending: false,
+                nulls_first: true,
+            },
+            &[
+                (0, 0, Ordering::Equal),
+                (0, 1, Ordering::Less),
+                (0, 2, Ordering::Less),
+                (1, 2, Ordering::Less),
+                (1, 3, Ordering::Greater),
+                (2, 0, Ordering::Less),
+            ],
+        );
 
-        let opts = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
-        let cmp = make_comparator(&a, &b, opts).unwrap();
-        assert_eq!(cmp(0, 0), Ordering::Equal);
-        assert_eq!(cmp(0, 1), Ordering::Less);
-        assert_eq!(cmp(0, 2), Ordering::Less);
-        assert_eq!(cmp(1, 2), Ordering::Greater);
-        assert_eq!(cmp(1, 3), Ordering::Greater);
-        assert_eq!(cmp(2, 0), Ordering::Greater);
+        // Descending with nulls first.
+        assert_cmp_cases(
+            &a,
+            &b,
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+            &[
+                (0, 0, Ordering::Equal),
+                (0, 1, Ordering::Less),
+                (0, 2, Ordering::Less),
+                (1, 2, Ordering::Greater),
+                (1, 3, Ordering::Greater),
+                (2, 0, Ordering::Greater),
+            ],
+        );
 
-        let opts = SortOptions {
-            descending: true,
-            nulls_first: false,
-        };
-        let cmp = make_comparator(&a, &b, opts).unwrap();
-        assert_eq!(cmp(0, 0), Ordering::Equal);
-        assert_eq!(cmp(0, 1), Ordering::Greater);
-        assert_eq!(cmp(0, 2), Ordering::Greater);
-        assert_eq!(cmp(1, 2), Ordering::Greater);
-        assert_eq!(cmp(1, 3), Ordering::Less);
-        assert_eq!(cmp(2, 0), Ordering::Greater);
+        // Descending with nulls last.
+        assert_cmp_cases(
+            &a,
+            &b,
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+            &[
+                (0, 0, Ordering::Equal),
+                (0, 1, Ordering::Greater),
+                (0, 2, Ordering::Greater),
+                (1, 2, Ordering::Greater),
+                (1, 3, Ordering::Less),
+                (2, 0, Ordering::Greater),
+            ],
+        );
 
-        let opts = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-        let cmp = make_comparator(&a, &b, opts).unwrap();
-        assert_eq!(cmp(0, 0), Ordering::Equal);
-        assert_eq!(cmp(0, 1), Ordering::Greater);
-        assert_eq!(cmp(0, 2), Ordering::Greater);
-        assert_eq!(cmp(1, 2), Ordering::Less);
-        assert_eq!(cmp(1, 3), Ordering::Less);
-        assert_eq!(cmp(2, 0), Ordering::Less);
+        // Ascending with nulls last.
+        assert_cmp_cases(
+            &a,
+            &b,
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+            &[
+                (0, 0, Ordering::Equal),
+                (0, 1, Ordering::Greater),
+                (0, 2, Ordering::Greater),
+                (1, 2, Ordering::Less),
+                (1, 3, Ordering::Less),
+                (2, 0, Ordering::Less),
+            ],
+        );
+    }
+
+    fn list_view_array<O: OffsetSizeTrait>(
+        values: Vec<i32>,
+        offsets: &[usize],
+        sizes: &[usize],
+        valid: Option<&[bool]>,
+    ) -> GenericListViewArray<O> {
+        let offsets = offsets
+            .iter()
+            .map(|v| O::from_usize(*v).unwrap())
+            .collect::<ScalarBuffer<O>>();
+        let sizes = sizes
+            .iter()
+            .map(|v| O::from_usize(*v).unwrap())
+            .collect::<ScalarBuffer<O>>();
+        let field = Arc::new(Field::new_list_field(DataType::Int32, true));
+        let values = Int32Array::from(values);
+        let nulls = valid.map(NullBuffer::from);
+        GenericListViewArray::new(field, offsets, sizes, Arc::new(values), 
nulls)
+    }
+
+    fn test_list_view_comparisons<O: OffsetSizeTrait>() {
+        let array = list_view_array::<O>(
+            vec![1, 2, 3, 4, 5],
+            &[0, 2, 1, 0, 3],
+            &[2, 2, 2, 0, 2],
+            Some(&[true, true, true, true, false]),
+        );
+
+        // Ascending with nulls first (non-monotonic offsets and empty list).
+        assert_cmp_cases(
+            &array,
+            &array,
+            SortOptions {
+                descending: false,
+                nulls_first: true,
+            },
+            &[
+                (0, 2, Ordering::Less),    // [1,2] < [2,3]
+                (1, 2, Ordering::Greater), // [3,4] > [2,3]
+                (3, 0, Ordering::Less),    // [] < [1,2]
+                (4, 0, Ordering::Less),    // null < [1,2]
+            ],
+        );
+
+        // Ascending with nulls last.
+        assert_cmp_cases(
+            &array,
+            &array,
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+            &[
+                (0, 2, Ordering::Less),
+                (1, 2, Ordering::Greater),
+                (3, 0, Ordering::Less),
+                (4, 0, Ordering::Greater), // null last
+            ],
+        );
+
+        // Descending with nulls first.
+        assert_cmp_cases(
+            &array,
+            &array,
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+            &[
+                (0, 2, Ordering::Greater),
+                (1, 2, Ordering::Less),
+                (3, 0, Ordering::Greater),
+                (4, 0, Ordering::Less),
+            ],
+        );
+
+        // Descending with nulls last.
+        assert_cmp_cases(
+            &array,
+            &array,
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+            &[
+                (0, 2, Ordering::Greater),
+                (1, 2, Ordering::Less),
+                (3, 0, Ordering::Greater),
+                (4, 0, Ordering::Greater),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_list_view() {
+        test_list_view_comparisons::<i32>();
+    }
+
+    #[test]
+    fn test_large_list_view() {
+        test_list_view_comparisons::<i64>();
     }
 
     #[test]
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 3c82730608..f56b42ba54 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -417,6 +417,54 @@ mod variable;
 ///
 ///```
 ///
+/// ## ListView Encoding
+///
+/// ListView arrays differ from List arrays in their representation: instead 
of using
+/// consecutive offset pairs to define each list, ListView uses explicit 
offset and size
+/// pairs for each element. This allows ListView elements to reference 
arbitrary (potentially
+/// overlapping) regions of the child array.
+///
+/// Despite this structural difference, ListView uses the **same row encoding 
as List**.
+/// Each list value is encoded as the concatenation of its child elements 
(each separately
+/// variable-length encoded), followed by a variable-length encoded empty byte 
array terminator.
+///
+/// **Important**: When a ListView is decoded back from row format, it is 
still a
+/// ListView, but any child element sharing that may have existed in the 
original
+/// (where multiple list entries could reference overlapping regions of the 
child
+/// array) is **not preserved** - each list's children are decoded 
independently
+/// with sequential offsets.
+///
+/// For example, given a ListView with offset/size pairs:
+///
+/// ```text
+/// offsets: [0, 1, 0]
+/// sizes:   [2, 2, 0]
+/// values:  [1_u8, 2_u8, 3_u8]
+///
+/// Resulting lists:
+/// [1_u8, 2_u8]  (offset=0, size=2 -> values[0..2])
+/// [2_u8, 3_u8]  (offset=1, size=2 -> values[1..3])
+/// []            (offset=0, size=0 -> empty)
+/// ```
+///
+/// The elements would be encoded identically to List encoding:
+///
+/// ```text
+///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+///  [1_u8, 2_u8]           │02│01│01│00│00│02│02│01│02│00│00│02│01│
+///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘
+///
+///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+///  [2_u8, 3_u8]           │02│01│02│00│00│02│02│01│03│00│00│02│01│
+///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+///                          └──── 2_u8 ────┘   └──── 3_u8 ────┘
+///
+///```
+///
+/// Note that element `2_u8` appears in both encoded rows, even though it was 
shared
+/// in the original ListView, and `[]` is represented by an empty byte array.
+///
 /// ## Union Encoding
 ///
 /// A union value is encoded as a single type-id byte followed by the row 
encoding of the selected child value.
@@ -506,6 +554,46 @@ enum Codec {
     Union(Vec<RowConverter>, Vec<i8>, Vec<OwnedRow>),
 }
 
+/// Computes the minimum offset and maximum end (offset + size) for a ListView 
array.
+/// Returns (min_offset, max_end) which can be used to slice the values array.
+fn compute_list_view_bounds<O: OffsetSizeTrait>(array: 
&GenericListViewArray<O>) -> (usize, usize) {
+    if array.is_empty() {
+        return (0, 0);
+    }
+
+    let offsets = array.value_offsets();
+    let sizes = array.value_sizes();
+    let values_len = array.values().len();
+
+    let mut min_offset = usize::MAX;
+    let mut max_end = 0usize;
+
+    for i in 0..array.len() {
+        let offset = offsets[i].as_usize();
+        let size = sizes[i].as_usize();
+        let end = offset + size;
+
+        if size > 0 {
+            min_offset = min_offset.min(offset);
+            max_end = max_end.max(end);
+        }
+
+        // Early exit if we've found the full range of the values array. This 
is possible with
+        // ListViews since offsets and sizes are arbitrary and the full range 
can be covered early
+        // in the iteration contrary to regular Lists.
+        if min_offset == 0 && max_end == values_len {
+            break;
+        }
+    }
+
+    if min_offset == usize::MAX {
+        // All lists are empty
+        (0, 0)
+    } else {
+        (min_offset, max_end)
+    }
+}
+
 impl Codec {
     fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
         match &sort_field.data_type {
@@ -535,7 +623,10 @@ impl Codec {
                 Ok(Self::RunEndEncoded(converter))
             }
             d if !d.is_nested() => Ok(Self::Stateless),
-            DataType::List(f) | DataType::LargeList(f) => {
+            DataType::List(f)
+            | DataType::LargeList(f)
+            | DataType::ListView(f)
+            | DataType::LargeListView(f) => {
                 // The encoded contents will be inverted if descending is set 
to true
                 // As such we set `descending` to false and negate nulls first 
if it
                 // it set to true
@@ -648,6 +739,20 @@ impl Codec {
                             .values()
                             .slice(first_offset, last_offset - first_offset)
                     }
+                    DataType::ListView(_) => {
+                        let list_view_array = array.as_list_view::<i32>();
+                        let (min_offset, max_end) = 
compute_list_view_bounds(list_view_array);
+                        list_view_array
+                            .values()
+                            .slice(min_offset, max_end - min_offset)
+                    }
+                    DataType::LargeListView(_) => {
+                        let list_view_array = array.as_list_view::<i64>();
+                        let (min_offset, max_end) = 
compute_list_view_bounds(list_view_array);
+                        list_view_array
+                            .values()
+                            .slice(min_offset, max_end - min_offset)
+                    }
                     DataType::FixedSizeList(_, _) => {
                         as_fixed_size_list_array(array).values().clone()
                     }
@@ -788,9 +893,11 @@ impl RowConverter {
     fn supports_datatype(d: &DataType) -> bool {
         match d {
             _ if !d.is_nested() => true,
-            DataType::List(f) | DataType::LargeList(f) | 
DataType::FixedSizeList(f, _) => {
-                Self::supports_datatype(f.data_type())
-            }
+            DataType::List(f)
+            | DataType::LargeList(f)
+            | DataType::ListView(f)
+            | DataType::LargeListView(f)
+            | DataType::FixedSizeList(f, _) => 
Self::supports_datatype(f.data_type()),
             DataType::Struct(f) => f.iter().all(|x| 
Self::supports_datatype(x.data_type())),
             DataType::RunEndEncoded(_, values) => 
Self::supports_datatype(values.data_type()),
             DataType::Union(fs, _mode) => fs
@@ -1617,6 +1724,26 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) 
-> LengthTracker {
                 DataType::LargeList(_) => {
                     list::compute_lengths(tracker.materialized(), rows, 
as_large_list_array(array))
                 }
+                DataType::ListView(_) => {
+                    let list_view = array.as_list_view::<i32>();
+                    let (min_offset, _) = compute_list_view_bounds(list_view);
+                    list::compute_lengths_list_view(
+                        tracker.materialized(),
+                        rows,
+                        list_view,
+                        min_offset,
+                    )
+                }
+                DataType::LargeListView(_) => {
+                    let list_view = array.as_list_view::<i64>();
+                    let (min_offset, _) = compute_list_view_bounds(list_view);
+                    list::compute_lengths_list_view(
+                        tracker.materialized(),
+                        rows,
+                        list_view,
+                        min_offset,
+                    )
+                }
                 DataType::FixedSizeList(_, _) => 
compute_lengths_fixed_size_list(
                     &mut tracker,
                     rows,
@@ -1845,6 +1972,16 @@ fn encode_column(
             DataType::LargeList(_) => {
                 list::encode(data, offsets, rows, opts, 
as_large_list_array(column))
             }
+            DataType::ListView(_) => {
+                let list_view = column.as_list_view::<i32>();
+                let (min_offset, _) = compute_list_view_bounds(list_view);
+                list::encode_list_view(data, offsets, rows, opts, list_view, 
min_offset)
+            }
+            DataType::LargeListView(_) => {
+                let list_view = column.as_list_view::<i64>();
+                let (min_offset, _) = compute_list_view_bounds(list_view);
+                list::encode_list_view(data, offsets, rows, opts, list_view, 
min_offset)
+            }
             DataType::FixedSizeList(_, _) => {
                 encode_fixed_size_list(data, offsets, rows, opts, 
as_fixed_size_list_array(column))
             }
@@ -2001,6 +2138,12 @@ unsafe fn decode_column(
             DataType::LargeList(_) => {
                 Arc::new(unsafe { list::decode::<i64>(converter, rows, field, 
validate_utf8) }?)
             }
+            DataType::ListView(_) => Arc::new(unsafe {
+                list::decode_list_view::<i32>(converter, rows, field, 
validate_utf8)
+            }?),
+            DataType::LargeListView(_) => Arc::new(unsafe {
+                list::decode_list_view::<i64>(converter, rows, field, 
validate_utf8)
+            }?),
             DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                 list::decode_fixed_size_list(
                     converter,
@@ -3193,6 +3336,403 @@ mod tests {
         test_nested_list::<i64>();
     }
 
+    fn test_single_list_view<O: OffsetSizeTrait>() {
+        let mut builder = GenericListViewBuilder::<O, 
_>::new(Int32Builder::new());
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.values().append_value(32);
+        builder.append(true);
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.values().append_value(12);
+        builder.append(true);
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.append(true);
+        builder.values().append_value(32); // MASKED
+        builder.values().append_value(52); // MASKED
+        builder.append(false);
+        builder.values().append_value(32);
+        builder.values().append_null();
+        builder.append(true);
+        builder.append(true);
+        builder.values().append_value(17); // MASKED
+        builder.values().append_null(); // MASKED
+        builder.append(false);
+
+        let list = Arc::new(builder.finish()) as ArrayRef;
+        let d = list.data_type().clone();
+
+        let converter = 
RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
+
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
+        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
+        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
+        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
+        assert!(rows.row(3) < rows.row(5)); // null < []
+        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked 
values)
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        // Verify the content matches (ListView may have different physical 
layout but same logical content)
+        let back_list_view = back[0]
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+        let orig_list_view = list
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+
+        assert_eq!(back_list_view.len(), orig_list_view.len());
+        for i in 0..back_list_view.len() {
+            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
+            if back_list_view.is_valid(i) {
+                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
+            }
+        }
+
+        let options = SortOptions::default().asc().with_nulls_first(false);
+        let field = SortField::new_with_options(d.clone(), options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
+        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
+        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
+        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
+        assert!(rows.row(3) > rows.row(5)); // null > []
+        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked 
values)
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        let options = SortOptions::default().desc().with_nulls_first(false);
+        let field = SortField::new_with_options(d.clone(), options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
+        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
+        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
+        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
+        assert!(rows.row(3) > rows.row(5)); // null > []
+        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked 
values)
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        let options = SortOptions::default().desc().with_nulls_first(true);
+        let field = SortField::new_with_options(d, options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
+        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
+        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
+        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
+        assert!(rows.row(3) < rows.row(5)); // null < []
+        assert_eq!(rows.row(3), rows.row(6)); // null = null (different masked 
values)
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        let sliced_list = list.slice(1, 5);
+        let rows_on_sliced_list = converter
+            .convert_columns(&[Arc::clone(&sliced_list)])
+            .unwrap();
+
+        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); // 
[32, 52] > [32, 52, 12]
+        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); // 
null < [32, 52]
+        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); // 
[32, null] < [32, 52]
+        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); // 
[] > [32, 52]
+        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); // 
null < []
+
+        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+    }
+
+    fn test_nested_list_view<O: OffsetSizeTrait>() {
+        let mut builder = GenericListViewBuilder::<O, 
_>::new(GenericListViewBuilder::<O, _>::new(
+            Int32Builder::new(),
+        ));
+
+        // Row 0: [[1, 2], [1, null]]
+        builder.values().values().append_value(1);
+        builder.values().values().append_value(2);
+        builder.values().append(true);
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.append(true);
+
+        // Row 1: [[1, null], [1, null]]
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.append(true);
+
+        // Row 2: [[1, null], null]
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.values().append(false);
+        builder.append(true);
+
+        // Row 3: null
+        builder.append(false);
+
+        // Row 4: [[1, 2]]
+        builder.values().values().append_value(1);
+        builder.values().values().append_value(2);
+        builder.values().append(true);
+        builder.append(true);
+
+        let list = Arc::new(builder.finish()) as ArrayRef;
+        let d = list.data_type().clone();
+
+        // [
+        //   [[1, 2], [1, null]],
+        //   [[1, null], [1, null]],
+        //   [[1, null], null]
+        //   null
+        //   [[1, 2]]
+        // ]
+        let options = SortOptions::default().asc().with_nulls_first(true);
+        let field = SortField::new_with_options(d.clone(), options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1));
+        assert!(rows.row(1) > rows.row(2));
+        assert!(rows.row(2) > rows.row(3));
+        assert!(rows.row(4) < rows.row(0));
+        assert!(rows.row(4) > rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        // Verify the content matches (ListView may have different physical 
layout but same logical content)
+        let back_list_view = back[0]
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+        let orig_list_view = list
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+
+        assert_eq!(back_list_view.len(), orig_list_view.len());
+        for i in 0..back_list_view.len() {
+            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
+            if back_list_view.is_valid(i) {
+                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
+            }
+        }
+
+        let options = SortOptions::default().desc().with_nulls_first(true);
+        let field = SortField::new_with_options(d.clone(), options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1));
+        assert!(rows.row(1) > rows.row(2));
+        assert!(rows.row(2) > rows.row(3));
+        assert!(rows.row(4) > rows.row(0));
+        assert!(rows.row(4) > rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        // Verify the content matches
+        let back_list_view = back[0]
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+
+        assert_eq!(back_list_view.len(), orig_list_view.len());
+        for i in 0..back_list_view.len() {
+            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
+            if back_list_view.is_valid(i) {
+                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
+            }
+        }
+
+        let options = SortOptions::default().desc().with_nulls_first(false);
+        let field = SortField::new_with_options(d.clone(), options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1));
+        assert!(rows.row(1) < rows.row(2));
+        assert!(rows.row(2) < rows.row(3));
+        assert!(rows.row(4) > rows.row(0));
+        assert!(rows.row(4) < rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        // Verify the content matches
+        let back_list_view = back[0]
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+
+        assert_eq!(back_list_view.len(), orig_list_view.len());
+        for i in 0..back_list_view.len() {
+            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
+            if back_list_view.is_valid(i) {
+                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
+            }
+        }
+
+        let sliced_list = list.slice(1, 3);
+        let rows = converter
+            .convert_columns(&[Arc::clone(&sliced_list)])
+            .unwrap();
+
+        assert!(rows.row(0) < rows.row(1));
+        assert!(rows.row(1) < rows.row(2));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+    }
+
+    #[test]
+    fn test_list_view() {
+        test_single_list_view::<i32>();
+        test_nested_list_view::<i32>();
+    }
+
+    #[test]
+    fn test_large_list_view() {
+        test_single_list_view::<i64>();
+        test_nested_list_view::<i64>();
+    }
+
+    fn test_list_view_with_shared_values<O: OffsetSizeTrait>() {
+        // Create a values array: [1, 2, 3, 4, 5, 6, 7, 8]
+        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
+        let field = Arc::new(Field::new_list_field(DataType::Int32, true));
+
+        // Create a ListView where:
+        // - Row 0: offset=0, size=3 -> [1, 2, 3]
+        // - Row 1: offset=0, size=3 -> [1, 2, 3] (same offset+size as row 0)
+        // - Row 2: offset=5, size=2 -> [6, 7] (non-monotonic offset)
+        // - Row 3: offset=2, size=2 -> [3, 4] (offset goes back)
+        // - Row 4: offset=1, size=4 -> [2, 3, 4, 5] (subset of values that 
contains row 3's range)
+        // - Row 5: offset=2, size=1 -> [3] (subset of row 3 and row 4)
+        let offsets = ScalarBuffer::<O>::from(vec![
+            O::from_usize(0).unwrap(),
+            O::from_usize(0).unwrap(),
+            O::from_usize(5).unwrap(),
+            O::from_usize(2).unwrap(),
+            O::from_usize(1).unwrap(),
+            O::from_usize(2).unwrap(),
+        ]);
+        let sizes = ScalarBuffer::<O>::from(vec![
+            O::from_usize(3).unwrap(),
+            O::from_usize(3).unwrap(),
+            O::from_usize(2).unwrap(),
+            O::from_usize(2).unwrap(),
+            O::from_usize(4).unwrap(),
+            O::from_usize(1).unwrap(),
+        ]);
+
+        let list_view: GenericListViewArray<O> =
+            GenericListViewArray::try_new(field, offsets, sizes, 
Arc::new(values), None).unwrap();
+
+        let d = list_view.data_type().clone();
+        let list = Arc::new(list_view) as ArrayRef;
+
+        let converter = 
RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        // Row 0 and Row 1 have the same content [1, 2, 3], so they should be 
equal
+        assert_eq!(rows.row(0), rows.row(1));
+
+        // [1, 2, 3] < [6, 7] (comparing first elements: 1 < 6)
+        assert!(rows.row(0) < rows.row(2));
+
+        // [3, 4] > [1, 2, 3] (comparing first elements: 3 > 1)
+        assert!(rows.row(3) > rows.row(0));
+
+        // [2, 3, 4, 5] > [1, 2, 3] (comparing first elements: 2 > 1)
+        assert!(rows.row(4) > rows.row(0));
+
+        // [3] < [3, 4] (same prefix but shorter)
+        assert!(rows.row(5) < rows.row(3));
+
+        // [3] < [2, 3, 4, 5] (comparing first elements: 3 > 2)
+        assert!(rows.row(5) > rows.row(4));
+
+        // Round-trip conversion
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+
+        // Verify logical content matches
+        let back_list_view = back[0]
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+        let orig_list_view = list
+            .as_any()
+            .downcast_ref::<GenericListViewArray<O>>()
+            .unwrap();
+
+        assert_eq!(back_list_view.len(), orig_list_view.len());
+        for i in 0..back_list_view.len() {
+            assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
+            if back_list_view.is_valid(i) {
+                assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
+            }
+        }
+
+        // Test with descending order
+        let options = SortOptions::default().desc();
+        let field = SortField::new_with_options(d, options);
+        let converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        // In descending order, comparisons are reversed
+        assert_eq!(rows.row(0), rows.row(1)); // Equal rows stay equal
+        assert!(rows.row(0) > rows.row(2)); // [1, 2, 3] > [6, 7] in desc
+        assert!(rows.row(3) < rows.row(0)); // [3, 4] < [1, 2, 3] in desc
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].to_data().validate_full().unwrap();
+    }
+
+    #[test]
+    fn test_list_view_shared_values() {
+        test_list_view_with_shared_values::<i32>();
+    }
+
+    #[test]
+    fn test_large_list_view_shared_values() {
+        test_list_view_with_shared_values::<i64>();
+    }
+
     #[test]
     fn test_fixed_size_list() {
         let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
@@ -3676,6 +4216,43 @@ mod tests {
         ListArray::new(field, offsets, values, Some(nulls))
     }
 
+    fn generate_list_view<F>(
+        rng: &mut impl RngCore,
+        len: usize,
+        valid_percent: f64,
+        values: F,
+    ) -> ListViewArray
+    where
+        F: FnOnce(usize) -> ArrayRef,
+    {
+        // Generate sizes first, then create a values array large enough
+        let sizes: Vec<i32> = (0..len).map(|_| 
rng.random_range(0..10)).collect();
+        let values_len: usize = sizes.iter().map(|s| *s as 
usize).sum::<usize>().max(1);
+        let values = values(values_len);
+
+        // Generate offsets that can overlap, be non-monotonic, or share ranges
+        let offsets: Vec<i32> = sizes
+            .iter()
+            .map(|&size| {
+                if size == 0 {
+                    0
+                } else {
+                    rng.random_range(0..=(values_len as i32 - size))
+                }
+            })
+            .collect();
+
+        let nulls = NullBuffer::from_iter((0..len).map(|_| 
rng.random_bool(valid_percent)));
+        let field = Arc::new(Field::new_list_field(values.data_type().clone(), 
true));
+        ListViewArray::new(
+            field,
+            ScalarBuffer::from(offsets),
+            ScalarBuffer::from(sizes),
+            values,
+            Some(nulls),
+        )
+    }
+
     fn generate_nulls(rng: &mut impl RngCore, len: usize) -> 
Option<NullBuffer> {
         Some(NullBuffer::from_iter(
             (0..len).map(|_| rng.random_bool(0.8)),
@@ -3810,8 +4387,8 @@ mod tests {
         )
     }
 
-    fn generate_column(rng: &mut impl RngCore, len: usize) -> ArrayRef {
-        match rng.random_range(0..19) {
+    fn generate_column(rng: &mut (impl RngCore + Clone), len: usize) -> 
ArrayRef {
+        match rng.random_range(0..23) {
             0 => Arc::new(generate_primitive_array::<Int32Type>(rng, len, 
0.8)),
             1 => Arc::new(generate_primitive_array::<UInt32Type>(rng, len, 
0.8)),
             2 => Arc::new(generate_primitive_array::<Int64Type>(rng, len, 
0.8)),
@@ -3850,12 +4427,36 @@ mod tests {
             15 => Arc::new(generate_byte_view(rng, len, 0.8)),
             16 => Arc::new(generate_fixed_stringview_column(len)),
             17 => Arc::new(
-                generate_list(rng, len + 1000, 0.8, |rng, values_len| {
+                generate_list(&mut rng.clone(), len + 1000, 0.8, |rng, 
values_len| {
                     Arc::new(generate_primitive_array::<Int64Type>(rng, 
values_len, 0.8))
                 })
                 .slice(500, len),
             ),
             18 => Arc::new(generate_boolean_array(rng, len, 0.8)),
+            19 => Arc::new(generate_list_view(
+                &mut rng.clone(),
+                len,
+                0.8,
+                |values_len| 
Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8)),
+            )),
+            20 => Arc::new(generate_list_view(
+                &mut rng.clone(),
+                len,
+                0.8,
+                |values_len| Arc::new(generate_strings::<i32>(rng, values_len, 
0.8)),
+            )),
+            21 => Arc::new(generate_list_view(
+                &mut rng.clone(),
+                len,
+                0.8,
+                |values_len| Arc::new(generate_struct(rng, values_len, 0.8)),
+            )),
+            22 => Arc::new(
+                generate_list_view(&mut rng.clone(), len + 1000, 0.8, 
|values_len| {
+                    Arc::new(generate_primitive_array::<Int64Type>(rng, 
values_len, 0.8))
+                })
+                .slice(500, len),
+            ),
             _ => unreachable!(),
         }
     }
diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs
index 6e552b0a93..843101673f 100644
--- a/arrow-row/src/list.rs
+++ b/arrow-row/src/list.rs
@@ -16,8 +16,13 @@
 // under the License.
 
 use crate::{LengthTracker, RowConverter, Rows, SortField, fixed, 
null_sentinel};
-use arrow_array::{Array, FixedSizeListArray, GenericListArray, 
OffsetSizeTrait, new_null_array};
-use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
+use arrow_array::{
+    Array, FixedSizeListArray, GenericListArray, GenericListViewArray, 
OffsetSizeTrait,
+    new_null_array,
+};
+use arrow_buffer::{
+    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, 
ScalarBuffer,
+};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{ArrowError, DataType, SortOptions};
 use std::{ops::Range, sync::Arc};
@@ -27,29 +32,17 @@ pub fn compute_lengths<O: OffsetSizeTrait>(
     rows: &Rows,
     array: &GenericListArray<O>,
 ) {
-    let offsets = array.value_offsets().windows(2);
-    let mut rows_length_iter = rows.lengths();
+    let shift = array.value_offsets()[0].as_usize();
 
     lengths
         .iter_mut()
-        .zip(offsets)
+        .zip(array.value_offsets().windows(2))
         .enumerate()
         .for_each(|(idx, (length, offsets))| {
-            let len = offsets[1].as_usize() - offsets[0].as_usize();
-            if array.is_valid(idx) {
-                *length += 1 + rows_length_iter
-                    .by_ref()
-                    .take(len)
-                    .map(Some)
-                    .map(super::variable::padded_length)
-                    .sum::<usize>()
-            } else {
-                // Advance rows iterator by len
-                if len > 0 {
-                    rows_length_iter.nth(len - 1);
-                }
-                *length += 1;
-            }
+            let start = offsets[0].as_usize() - shift;
+            let end = offsets[1].as_usize() - shift;
+            let range = array.is_valid(idx).then_some(start..end);
+            *length += list_element_encoded_len(rows, range);
         });
 }
 
@@ -323,3 +316,199 @@ pub unsafe fn decode_fixed_size_list(
         builder.build_unchecked()
     }))
 }
+
+/// Computes the encoded length for a single list element given its child rows.
+///
+/// This is used by list types (List, LargeList, ListView, LargeListView) to 
determine
+/// the encoded length of a list element. For null elements, returns 1 (null 
sentinel only).
+/// For valid elements, returns 1 + the sum of padded lengths for each child 
row.
+#[inline]
+fn list_element_encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize 
{
+    match range {
+        None => 1,
+        Some(range) => {
+            1 + range
+                .map(|i| 
super::variable::padded_length(Some(rows.row(i).as_ref().len())))
+                .sum::<usize>()
+        }
+    }
+}
+
+/// Computes the encoded lengths for a `GenericListViewArray`
+///
+/// `rows` should contain the encoded child elements
+pub fn compute_lengths_list_view<O: OffsetSizeTrait>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &GenericListViewArray<O>,
+    shift: usize,
+) {
+    let offsets = array.value_offsets();
+    let sizes = array.value_sizes();
+
+    lengths.iter_mut().enumerate().for_each(|(idx, length)| {
+        let size = sizes[idx].as_usize();
+        let range = array.is_valid(idx).then(|| {
+            // For empty lists (size=0), offset may be arbitrary and could 
underflow when shifted.
+            // Use 0 as start since the range is empty anyway.
+            let start = if size > 0 {
+                offsets[idx].as_usize() - shift
+            } else {
+                0
+            };
+            start..start + size
+        });
+        *length += list_element_encoded_len(rows, range);
+    });
+}
+
+/// Encodes the provided `GenericListViewArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded child elements
+pub fn encode_list_view<O: OffsetSizeTrait>(
+    data: &mut [u8],
+    out_offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &GenericListViewArray<O>,
+    shift: usize,
+) {
+    let offsets = array.value_offsets();
+    let sizes = array.value_sizes();
+
+    out_offsets
+        .iter_mut()
+        .skip(1)
+        .enumerate()
+        .for_each(|(idx, offset)| {
+            let size = sizes[idx].as_usize();
+            let range = array.is_valid(idx).then(|| {
+                // For empty lists (size=0), offset may be arbitrary and could 
underflow when shifted.
+                // Use 0 as start since the range is empty anyway.
+                let start = if size > 0 {
+                    offsets[idx].as_usize() - shift
+                } else {
+                    0
+                };
+                start..start + size
+            });
+            let out = &mut data[*offset..];
+            *offset += encode_one(out, rows, range, opts)
+        });
+}
+
+/// Decodes a `GenericListViewArray` from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode_list_view<O: OffsetSizeTrait>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<GenericListViewArray<O>, ArrowError> {
+    let opts = field.options;
+
+    let mut values_bytes = 0;
+
+    let mut child_count = 0usize;
+    let mut list_sizes: Vec<O> = Vec::with_capacity(rows.len());
+
+    // First pass: count children and compute sizes
+    for row in rows.iter_mut() {
+        let mut row_offset = 0;
+        let mut list_size = 0usize;
+        loop {
+            let decoded = super::variable::decode_blocks(&row[row_offset..], 
opts, |x| {
+                values_bytes += x.len();
+            });
+            if decoded <= 1 {
+                list_sizes.push(O::usize_as(list_size));
+                break;
+            }
+            row_offset += decoded;
+            child_count += 1;
+            list_size += 1;
+        }
+    }
+    O::from_usize(child_count).expect("overflow");
+
+    let mut null_count = 0;
+    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
+        let valid = rows[x][0] != null_sentinel(opts);
+        null_count += !valid as usize;
+        valid
+    });
+
+    let mut values_offsets_vec = Vec::with_capacity(child_count);
+    let mut values_bytes = Vec::with_capacity(values_bytes);
+    for row in rows.iter_mut() {
+        let mut row_offset = 0;
+        loop {
+            let decoded = super::variable::decode_blocks(&row[row_offset..], 
opts, |x| {
+                values_bytes.extend_from_slice(x)
+            });
+            row_offset += decoded;
+            if decoded <= 1 {
+                break;
+            }
+            values_offsets_vec.push(values_bytes.len());
+        }
+        *row = &row[row_offset..];
+    }
+
+    if opts.descending {
+        values_bytes.iter_mut().for_each(|o| *o = !*o);
+    }
+
+    let mut last_value_offset = 0;
+    let mut child_rows: Vec<_> = values_offsets_vec
+        .into_iter()
+        .map(|offset| {
+            let v = &values_bytes[last_value_offset..offset];
+            last_value_offset = offset;
+            v
+        })
+        .collect();
+
+    let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) 
}?;
+    assert_eq!(child.len(), 1);
+
+    let child_data = child[0].to_data();
+
+    // Technically ListViews don't have to have offsets follow each other 
precisely, but can be
+    // reused. However, because we cannot preserve that sharing within the row 
format, this is the
+    // best we can do.
+    let mut list_offsets: Vec<O> = Vec::with_capacity(rows.len());
+    let mut current_offset = O::usize_as(0);
+    for size in &list_sizes {
+        list_offsets.push(current_offset);
+        current_offset += *size;
+    }
+
+    // Since RowConverter flattens certain data types (i.e. Dictionary),
+    // we need to use updated data type instead of original field
+    let corrected_inner_field = match &field.data_type {
+        DataType::ListView(inner_field) | DataType::LargeListView(inner_field) 
=> Arc::new(
+            inner_field
+                .as_ref()
+                .clone()
+                .with_data_type(child_data.data_type().clone()),
+        ),
+        _ => unreachable!(),
+    };
+
+    // SAFETY: null_count was computed correctly when building the nulls 
buffer above
+    let null_buffer = unsafe {
+        NullBuffer::new_unchecked(BooleanBuffer::new(nulls.into(), 0, 
rows.len()), null_count)
+    };
+
+    GenericListViewArray::try_new(
+        corrected_inner_field,
+        ScalarBuffer::from(list_offsets),
+        ScalarBuffer::from(list_sizes),
+        child[0].clone(),
+        Some(null_buffer).filter(|n| n.null_count() > 0),
+    )
+}

Reply via email to