This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new ae6f651  Support ARRAY type with ORC-style flattened columnar storage 
(#52)
ae6f651 is described below

commit ae6f651cc3748e43759288a3ae799139d776d5b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Jun 6 22:17:37 2026 +0800

    Support ARRAY type with ORC-style flattened columnar storage (#52)
    
    ARRAY<T> columns are stored as two independent streams:
    - Lengths column (INT32): number of elements per row, benefits from 
DICT/CONST encoding
    - Values sub-bucket (type T): all elements flattened across all rows, 
benefits from DICT/CONST encoding
    
    Nested arrays (ARRAY<ARRAY<INT>>) are supported via recursive decomposition.
    All leaf element values share a single dictionary across all arrays in the 
column.
---
 core/src/bucket_reader.rs                          | 203 ++++-
 core/src/bucket_writer.rs                          | 611 ++++++++++++---
 core/src/reader.rs                                 | 212 ++++-
 core/src/types.rs                                  |  35 +
 core/src/writer.rs                                 |  14 +-
 core/tests/array_type_test.rs                      | 856 +++++++++++++++++++++
 core/tests/gen_fixtures.rs                         | 218 ++++++
 core/tests/testdata/v1_no_array.mosaic             | Bin 0 -> 158 bytes
 core/tests/testdata/v1_with_array.mosaic           | Bin 0 -> 130 bytes
 cpp/test_mosaic.cpp                                | 186 ++++-
 docs/cpp-api.html                                  |  38 +
 docs/design.html                                   | 159 +++-
 docs/java-api.html                                 |  42 +
 docs/python-api.html                               |  24 +
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  |  91 +++
 python/tests/test_mosaic.py                        |  93 +++
 16 files changed, 2600 insertions(+), 182 deletions(-)

diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index 115bae4..f81165e 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -342,9 +342,87 @@ fn build_array(
     })
 }
 
+pub fn reassemble_list_columns_pub(
+    arrays: &mut [ArrayRef],
+    children: &[ChildColumnMeta],
+    logical_type_refs: &[&DataType],
+    num_primary: usize,
+    num_rows: usize,
+) {
+    let logical_types: Vec<DataType> = logical_type_refs.iter().map(|t| 
(*t).clone()).collect();
+    reassemble_list_columns(arrays, children, &logical_types, num_primary, 
num_rows);
+}
+
+fn reassemble_list_columns(
+    arrays: &mut [ArrayRef],
+    children: &[ChildColumnMeta],
+    logical_types: &[DataType],
+    _num_primary: usize,
+    num_rows: usize,
+) {
+    for child in children.iter().rev() {
+        let phys_idx = child.physical_index;
+        let parent = child.parent_logical_col;
+        let values = arrays[phys_idx].clone();
+
+        // Find the lengths column for this child:
+        // - If the previous physical column (phys_idx - 1) is a child with 
the same parent,
+        //   this is a nested child and lengths are at phys_idx - 1.
+        // - Otherwise, this is a first-level child and lengths are at the 
parent primary column.
+        let is_nested = children
+            .iter()
+            .any(|c| c.physical_index == phys_idx - 1 && c.parent_logical_col 
== parent);
+        let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
+
+        let lengths = arrays[lengths_idx].clone();
+        let lengths_rows = lengths.len();
+
+        let element_field = child.element_field.clone();
+        let reassembled = reassemble_list_array(lengths, values, 
element_field, lengths_rows);
+        arrays[lengths_idx] = reassembled;
+    }
+
+    // Handle ALL_NULL list columns (where no children were created because 
all null)
+    for (i, lt) in logical_types.iter().enumerate() {
+        if matches!(lt, DataType::List(_)) && !children.iter().any(|c| 
c.parent_logical_col == i) {
+            arrays[i] = arrow_array::new_null_array(lt, num_rows);
+        }
+    }
+}
+
+fn reassemble_list_array(
+    lengths: ArrayRef,
+    values: ArrayRef,
+    element_field: Arc<Field>,
+    num_rows: usize,
+) -> ArrayRef {
+    let lengths_arr = lengths
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .expect("list lengths must be Int32Array");
+
+    let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
+    offsets.push(0);
+    for i in 0..num_rows {
+        let len = if lengths_arr.is_null(i) {
+            0
+        } else {
+            lengths_arr.value(i)
+        };
+        offsets.push(offsets.last().unwrap() + len);
+    }
+
+    let null_buf = lengths_arr.nulls().cloned();
+    let offset_buf = OffsetBuffer::new(ScalarBuffer::from(offsets));
+    Arc::new(ListArray::new(element_field, offset_buf, values, null_buf))
+}
+
+use crate::bucket_writer::{expand_col_types, ChildColumnMeta};
+
 pub struct BucketReader {
     data: Vec<u8>,
-    num_columns: usize,
+    num_primary: usize,
+    total_columns: usize,
     num_rows: usize,
     col_types: Vec<DataType>,
 
@@ -355,23 +433,49 @@ pub struct BucketReader {
     dict_values: Vec<Vec<Value>>,
     dict_bit_widths: Vec<usize>,
     data_cursors: Vec<usize>,
+
+    logical_types: Vec<DataType>,
+    children: Vec<ChildColumnMeta>,
+    child_num_rows: Vec<usize>,
 }
 
 impl BucketReader {
+    fn col_num_rows(&self, col: usize) -> usize {
+        if col < self.num_primary {
+            self.num_rows
+        } else {
+            let child_idx = col - self.num_primary;
+            if child_idx < self.child_num_rows.len() {
+                self.child_num_rows[child_idx]
+            } else {
+                0
+            }
+        }
+    }
+
     pub fn new(col_types: Vec<DataType>, data: Vec<u8>, num_rows: usize) -> 
io::Result<Self> {
-        let num_columns = col_types.len();
+        let logical_types = col_types.clone();
+        let num_primary = col_types.len();
+        let col_refs: Vec<&DataType> = col_types.iter().collect();
+        let (physical_types, children) = expand_col_types(&col_refs);
+        let total_columns = physical_types.len();
+
         let mut reader = BucketReader {
             data,
-            num_columns,
+            num_primary,
+            total_columns,
             num_rows,
-            col_types,
-            encodings: vec![0; num_columns],
-            has_nulls: vec![false; num_columns],
+            col_types: physical_types,
+            encodings: vec![0; total_columns],
+            has_nulls: vec![false; total_columns],
             null_bitmaps: Vec::new(),
             const_values: Vec::new(),
             dict_values: Vec::new(),
-            dict_bit_widths: vec![0; num_columns],
-            data_cursors: vec![0; num_columns],
+            dict_bit_widths: vec![0; total_columns],
+            data_cursors: vec![0; total_columns],
+            logical_types,
+            children,
+            child_num_rows: Vec::new(),
         };
         reader.init()?;
         Ok(reader)
@@ -391,16 +495,32 @@ impl BucketReader {
     }
 
     fn init(&mut self) -> io::Result<()> {
-        self.null_bitmaps = vec![Vec::new(); self.num_columns];
-        self.const_values = vec![Value::Null; self.num_columns];
-        self.dict_values = vec![Vec::new(); self.num_columns];
+        self.null_bitmaps = vec![Vec::new(); self.total_columns];
+        self.const_values = vec![Value::Null; self.total_columns];
+        self.dict_values = vec![Vec::new(); self.total_columns];
+
+        if self.num_rows == 0 || self.data.is_empty() {
+            return Ok(());
+        }
 
         let mut pos = 0;
 
+        // Header: only present when ARRAY columns exist (backward compatible 
with v1)
+        let has_children = !self.children.is_empty();
+        if has_children {
+            let _num_primary = varint::decode(&self.data, &mut pos)? as usize;
+            let num_children = varint::decode(&self.data, &mut pos)? as usize;
+            self.child_num_rows = Vec::with_capacity(num_children);
+            for _ in 0..num_children {
+                self.child_num_rows
+                    .push(varint::decode(&self.data, &mut pos)? as usize);
+            }
+        }
+
         // 1. Encoding flags (2 bits per column)
-        let encoding_flags_bytes = (self.num_columns * 2).div_ceil(8);
+        let encoding_flags_bytes = (self.total_columns * 2).div_ceil(8);
         self.check_bounds(pos, encoding_flags_bytes)?;
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             let byte_idx = (i * 2) / 8;
             let bit_idx = (i * 2) % 8;
             self.encodings[i] = (self.data[pos + byte_idx] >> bit_idx) & 0x03;
@@ -408,15 +528,15 @@ impl BucketReader {
         pos += encoding_flags_bytes;
 
         // 2. Has-nulls flags (1 bit per column)
-        let has_nulls_bytes = self.num_columns.div_ceil(8);
+        let has_nulls_bytes = self.total_columns.div_ceil(8);
         self.check_bounds(pos, has_nulls_bytes)?;
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             self.has_nulls[i] = (self.data[pos + i / 8] & (1 << (i % 8))) != 0;
         }
         pos += has_nulls_bytes;
 
         // 3. CONST metadata
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             if self.encodings[i] == ENCODING_CONST {
                 let (value, size) = self.read_value_at(&self.col_types[i], 
pos)?;
                 self.const_values[i] = value;
@@ -425,7 +545,7 @@ impl BucketReader {
         }
 
         // 4. DICT metadata
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             if self.encodings[i] == ENCODING_DICT {
                 let num_entries = varint::decode(&self.data, &mut pos)? as 
usize;
                 self.dict_bit_widths[i] = bit_width(num_entries);
@@ -439,10 +559,10 @@ impl BucketReader {
             }
         }
 
-        // 5. Null bitmaps
-        let null_bitmap_bytes = self.num_rows.div_ceil(8);
-        for i in 0..self.num_columns {
+        // 5. Null bitmaps (per-column row count)
+        for i in 0..self.total_columns {
             if self.has_nulls[i] && self.encodings[i] != ENCODING_ALL_NULL {
+                let null_bitmap_bytes = self.col_num_rows(i).div_ceil(8);
                 self.check_bounds(pos, null_bitmap_bytes)?;
                 self.null_bitmaps[i] = self.data[pos..pos + 
null_bitmap_bytes].to_vec();
                 pos += null_bitmap_bytes;
@@ -450,7 +570,7 @@ impl BucketReader {
         }
 
         // 6. Record column data start offsets, skip past data
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             self.data_cursors[i] = pos;
             if self.encodings[i] == ENCODING_PLAIN {
                 let w = types::fixed_width(&self.col_types[i]);
@@ -487,35 +607,37 @@ impl BucketReader {
     }
 
     fn count_non_null(&self, col: usize) -> usize {
+        let col_rows = self.col_num_rows(col);
         if !self.has_nulls[col] {
-            return self.num_rows;
+            return col_rows;
         }
         if self.encodings[col] == ENCODING_ALL_NULL {
             return 0;
         }
         let bitmap = &self.null_bitmaps[col];
-        let full_bytes = self.num_rows / 8;
+        let full_bytes = col_rows / 8;
         let mut null_count = 0usize;
         for byte in bitmap.iter().take(full_bytes) {
             null_count += (*byte as u32).count_ones() as usize;
         }
-        let remaining = self.num_rows % 8;
+        let remaining = col_rows % 8;
         if remaining > 0 {
             let mask = (1u8 << remaining) - 1;
             null_count += (bitmap[full_bytes] & mask).count_ones() as usize;
         }
-        self.num_rows - null_count
+        col_rows - null_count
     }
 
     pub fn read_all_columns(&self) -> io::Result<Vec<ArrayRef>> {
-        let num_rows = self.num_rows;
-        let mut result = Vec::with_capacity(self.num_columns);
+        // Read all N+C physical columns
+        let mut all_arrays: Vec<ArrayRef> = 
Vec::with_capacity(self.total_columns);
 
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
+            let col_rows = self.col_num_rows(i);
             let variant = data_variant_for_type(&self.col_types[i]);
 
             if self.encodings[i] == ENCODING_ALL_NULL {
-                result.push(build_all_null_array(&self.col_types[i], 
num_rows));
+                all_arrays.push(build_all_null_array(&self.col_types[i], 
col_rows));
                 continue;
             }
 
@@ -529,7 +651,7 @@ impl BucketReader {
             let data = match self.encodings[i] {
                 ENCODING_CONST => read_all_const(
                     &self.const_values[i],
-                    num_rows,
+                    col_rows,
                     has_nulls,
                     &self.null_bitmaps[i],
                     variant,
@@ -539,7 +661,7 @@ impl BucketReader {
                     self.data_cursors[i],
                     &self.dict_values[i],
                     self.dict_bit_widths[i],
-                    num_rows,
+                    col_rows,
                     has_nulls,
                     &self.null_bitmaps[i],
                     variant,
@@ -548,7 +670,7 @@ impl BucketReader {
                     &self.data,
                     self.data_cursors[i],
                     &self.col_types[i],
-                    num_rows,
+                    col_rows,
                     has_nulls,
                     &self.null_bitmaps[i],
                     variant,
@@ -556,20 +678,29 @@ impl BucketReader {
                 _ => empty_raw_data_for_type(&self.col_types[i]),
             };
 
-            result.push(build_array(
+            all_arrays.push(build_array(
                 data,
                 &self.col_types[i],
                 null_bitmap,
-                num_rows,
+                col_rows,
             )?);
         }
 
-        Ok(result)
+        reassemble_list_columns(
+            &mut all_arrays,
+            &self.children,
+            &self.logical_types,
+            self.num_primary,
+            self.num_rows,
+        );
+
+        // Return only the primary (logical) columns
+        Ok(all_arrays.into_iter().take(self.num_primary).collect())
     }
 }
 
 pub struct ColumnPageReader {
-    col_type: DataType,
+    pub(crate) col_type: DataType,
     encoding: u8,
     has_nulls: bool,
     const_value: Value,
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index 501e6ba..4bfd606 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -17,9 +17,11 @@
 
 use std::collections::HashMap;
 use std::io;
+use std::sync::Arc;
 
 use arrow_array::*;
-use arrow_schema::DataType;
+use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, 
ScalarBuffer};
+use arrow_schema::{DataType, Field};
 
 use crate::spec::*;
 use crate::types;
@@ -31,21 +33,30 @@ pub struct PagedBucketOutput {
     pub has_nulls: Vec<bool>,
     pub const_data: Vec<Vec<u8>>,
     pub column_pages: Vec<Option<Vec<u8>>>,
+    pub num_primary: usize,
+    pub children: Vec<ChildColumnMeta>,
+}
+
+#[derive(Clone)]
+pub struct ChildColumnMeta {
+    pub parent_logical_col: usize,
+    pub physical_index: usize,
+    pub element_field: Arc<Field>,
+    pub num_elements: usize,
 }
 
 pub struct BucketWriter {
-    num_columns: usize,
+    num_primary: usize,
+    total_columns: usize,
     fixed_widths: Vec<i32>,
 
     null_bitmaps: Vec<Vec<u8>>,
     value_buffers: Vec<Vec<u8>>,
     non_null_counts: Vec<usize>,
 
-    // CONST tracking
     const_tracking: Vec<bool>,
     first_value_len: Vec<usize>,
 
-    // Dict tracking: fixed-width <=8 uses u64 keys, variable-width uses byte 
keys
     long_dict_maps: Vec<Option<HashMap<u64, usize>>>,
     byte_dict_maps: Vec<Option<HashMap<Vec<u8>, usize>>>,
     dict_total_bytes: Vec<usize>,
@@ -53,6 +64,7 @@ pub struct BucketWriter {
     max_dict_entries: usize,
 
     num_rows: usize,
+    children: Vec<ChildColumnMeta>,
 }
 
 impl BucketWriter {
@@ -61,13 +73,14 @@ impl BucketWriter {
         max_dict_total_bytes: usize,
         max_dict_entries: usize,
     ) -> Self {
-        let num_columns = col_types.len();
-        let fixed_widths: Vec<i32> = col_types.iter().map(|t| 
types::fixed_width(t)).collect();
-
-        let mut long_dict_maps = Vec::with_capacity(num_columns);
-        let mut byte_dict_maps = Vec::with_capacity(num_columns);
-
-        for fw in fixed_widths.iter().take(num_columns) {
+        let num_primary = col_types.len();
+        let (physical_types, children) = expand_col_types(col_types);
+        let total_columns = physical_types.len();
+        let fixed_widths: Vec<i32> = 
physical_types.iter().map(types::fixed_width).collect();
+
+        let mut long_dict_maps = Vec::with_capacity(total_columns);
+        let mut byte_dict_maps = Vec::with_capacity(total_columns);
+        for fw in &fixed_widths {
             if uses_long_dict(*fw) {
                 long_dict_maps.push(Some(HashMap::new()));
                 byte_dict_maps.push(None);
@@ -78,19 +91,40 @@ impl BucketWriter {
         }
 
         BucketWriter {
-            num_columns,
+            num_primary,
+            total_columns,
             fixed_widths,
-            null_bitmaps: vec![vec![0u8; 128]; num_columns],
-            value_buffers: vec![Vec::with_capacity(1024); num_columns],
-            non_null_counts: vec![0; num_columns],
-            const_tracking: vec![true; num_columns],
-            first_value_len: vec![0; num_columns],
+            null_bitmaps: vec![vec![0u8; 128]; total_columns],
+            value_buffers: vec![Vec::with_capacity(1024); total_columns],
+            non_null_counts: vec![0; total_columns],
+            const_tracking: vec![true; total_columns],
+            first_value_len: vec![0; total_columns],
             long_dict_maps,
             byte_dict_maps,
-            dict_total_bytes: vec![0; num_columns],
+            dict_total_bytes: vec![0; total_columns],
             max_dict_total_bytes,
             max_dict_entries,
             num_rows: 0,
+            children,
+        }
+    }
+
+    pub fn num_primary(&self) -> usize {
+        self.num_primary
+    }
+
+    pub fn children(&self) -> &[ChildColumnMeta] {
+        &self.children
+    }
+
+    fn col_num_rows(&self, col: usize) -> usize {
+        if col < self.num_primary {
+            self.num_rows
+        } else {
+            self.children
+                .iter()
+                .find(|c| c.physical_index == col)
+                .map_or(0, |c| c.num_elements)
         }
     }
 
@@ -111,7 +145,7 @@ impl BucketWriter {
         arrays: &[&dyn Array],
         data_types: &[&DataType],
     ) -> io::Result<usize> {
-        debug_assert_eq!(arrays.len(), self.num_columns);
+        debug_assert_eq!(arrays.len(), self.num_primary);
         let num_new_rows = arrays[0].len();
         if num_new_rows == 0 {
             return Ok(0);
@@ -119,12 +153,76 @@ impl BucketWriter {
         let start_row = self.num_rows;
         let mut total_size = 0;
 
-        for i in 0..self.num_columns {
-            total_size += self.append_array_column(i, arrays[i], 
data_types[i], start_row)?;
+        // Split List arrays into lengths + values, collect child writes
+        let mut list_splits: Vec<(usize, Int32Array, ArrayRef)> = Vec::new();
+        for child in &self.children {
+            let col = child.parent_logical_col;
+            if list_splits.iter().any(|(c, _, _)| *c == col) {
+                continue; // already split this column (nested lists share 
parent)
+            }
+            let list_array = arrays[col]
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, 
"expected ListArray"))?;
+            let lengths = extract_list_lengths(list_array);
+            let values = flatten_list_values(list_array);
+            list_splits.push((col, lengths, values));
+        }
+
+        // Write primary columns (lengths for ARRAY cols, regular data for 
others)
+        let int32_dt = DataType::Int32;
+        for i in 0..self.num_primary {
+            if let Some(split) = list_splits.iter().find(|(col, _, _)| *col == 
i) {
+                total_size += self.append_array_column(i, &split.1, &int32_dt, 
start_row)?;
+            } else {
+                total_size += self.append_array_column(i, arrays[i], 
data_types[i], start_row)?;
+            }
+        }
+
+        // Recursively flatten all list levels and write to child columns
+        let mut pending: Vec<(usize, ArrayRef)> = Vec::new(); // (child_index, 
values)
+        for split in &list_splits {
+            let parent = split.0;
+            // Find the first child for this parent
+            if let Some(child_idx) = self
+                .children
+                .iter()
+                .position(|c| c.parent_logical_col == parent)
+            {
+                pending.push((child_idx, split.2.clone()));
+            }
+        }
+
+        while let Some((child_idx, values)) = pending.pop() {
+            if child_idx >= self.children.len() || values.is_empty() {
+                continue;
+            }
+            let phys_idx = self.children[child_idx].physical_index;
+            let child_start = self.children[child_idx].num_elements;
+
+            if let DataType::List(_) = values.data_type() {
+                let inner_list = 
values.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidInput, "expected 
ListArray")
+                })?;
+                let inner_lengths = extract_list_lengths(inner_list);
+                let inner_values = flatten_list_values(inner_list);
+                total_size +=
+                    self.append_array_column(phys_idx, &inner_lengths, 
&int32_dt, child_start)?;
+                self.children[child_idx].num_elements += inner_lengths.len();
+                // Queue the inner values for the next child
+                if child_idx + 1 < self.children.len() {
+                    pending.push((child_idx + 1, inner_values));
+                }
+            } else {
+                let elem_dt = 
self.children[child_idx].element_field.data_type().clone();
+                total_size +=
+                    self.append_array_column(phys_idx, values.as_ref(), 
&elem_dt, child_start)?;
+                self.children[child_idx].num_elements += values.len();
+            }
         }
 
         self.num_rows += num_new_rows;
-        total_size += num_new_rows * self.num_columns.div_ceil(8);
+        total_size += num_new_rows * self.num_primary.div_ceil(8);
         Ok(total_size)
     }
 
@@ -374,14 +472,15 @@ impl BucketWriter {
     }
 
     fn compute_encodings(&self) -> (Vec<u8>, Vec<bool>) {
-        let mut encodings = vec![0u8; self.num_columns];
-        let mut has_nulls = vec![false; self.num_columns];
-        for i in 0..self.num_columns {
+        let mut encodings = vec![0u8; self.total_columns];
+        let mut has_nulls = vec![false; self.total_columns];
+        for i in 0..self.total_columns {
+            let col_rows = self.col_num_rows(i);
             if self.non_null_counts[i] == 0 {
                 encodings[i] = ENCODING_ALL_NULL;
             } else if self.const_tracking[i] {
                 encodings[i] = ENCODING_CONST;
-                has_nulls[i] = self.non_null_counts[i] < self.num_rows;
+                has_nulls[i] = self.non_null_counts[i] < col_rows;
             } else {
                 let dict_size = self.get_dict_size(i);
                 if dict_size >= 2
@@ -392,7 +491,7 @@ impl BucketWriter {
                 } else {
                     encodings[i] = ENCODING_PLAIN;
                 }
-                has_nulls[i] = self.non_null_counts[i] < self.num_rows;
+                has_nulls[i] = self.non_null_counts[i] < col_rows;
             }
         }
         (encodings, has_nulls)
@@ -406,88 +505,92 @@ impl BucketWriter {
 
         let (encodings, has_nulls) = self.compute_encodings();
 
-        let out_size = self.compute_out_size(&encodings, &has_nulls);
-        let mut out = vec![0u8; out_size];
-        let mut pos = 0;
+        let mut out = Vec::new();
+
+        // Header: only written when ARRAY columns exist (backward compatible 
with v1)
+        if !self.children.is_empty() {
+            varint::encode(&mut out, self.num_primary as u32);
+            varint::encode(&mut out, self.children.len() as u32);
+            for child in &self.children {
+                varint::encode(&mut out, child.num_elements as u32);
+            }
+        }
 
         // Encoding flags: 2 bits per column
-        let encoding_flags_bytes = (self.num_columns * 2).div_ceil(8);
-        for i in 0..self.num_columns {
+        let encoding_flags_bytes = (self.total_columns * 2).div_ceil(8);
+        let ef_start = out.len();
+        out.resize(ef_start + encoding_flags_bytes, 0);
+        for i in 0..self.total_columns {
             let byte_idx = (i * 2) / 8;
             let bit_idx = (i * 2) % 8;
-            out[pos + byte_idx] |= encodings[i] << bit_idx;
+            out[ef_start + byte_idx] |= encodings[i] << bit_idx;
         }
-        pos += encoding_flags_bytes;
 
         // Has-nulls flags: 1 bit per column
-        let has_nulls_bytes = self.num_columns.div_ceil(8);
-        for i in 0..self.num_columns {
+        let has_nulls_bytes = self.total_columns.div_ceil(8);
+        let hn_start = out.len();
+        out.resize(hn_start + has_nulls_bytes, 0);
+        for i in 0..self.total_columns {
             if has_nulls[i] {
-                out[pos + i / 8] |= 1 << (i % 8);
+                out[hn_start + i / 8] |= 1 << (i % 8);
             }
         }
-        pos += has_nulls_bytes;
 
         // CONST metadata
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             if encodings[i] == ENCODING_CONST {
                 let len = self.first_value_len[i];
-                out[pos..pos + 
len].copy_from_slice(&self.value_buffers[i][..len]);
-                pos += len;
+                out.extend_from_slice(&self.value_buffers[i][..len]);
             }
         }
 
         // Dict metadata
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             if encodings[i] == ENCODING_DICT {
                 if let Some(ref dict) = self.long_dict_maps[i] {
                     let num_entries = dict.len();
-                    pos = varint::encode_to_slice(&mut out, pos, num_entries 
as u32);
+                    varint::encode(&mut out, num_entries as u32);
                     let w = self.fixed_widths[i];
                     let mut keys = vec![0u64; num_entries];
                     for (&key, &idx) in dict {
                         keys[idx] = key;
                     }
                     for key in &keys {
-                        write_fixed_key_to_slice(&mut out, &mut pos, *key, w);
+                        write_fixed_key_to_vec(&mut out, *key, w);
                     }
                 } else if let Some(ref dict) = self.byte_dict_maps[i] {
                     let num_entries = dict.len();
-                    pos = varint::encode_to_slice(&mut out, pos, num_entries 
as u32);
+                    varint::encode(&mut out, num_entries as u32);
                     let mut keys: Vec<(&Vec<u8>, &usize)> = 
dict.iter().collect();
                     keys.sort_by_key(|&(_, idx)| *idx);
                     for (key, _) in keys {
-                        out[pos..pos + key.len()].copy_from_slice(key);
-                        pos += key.len();
+                        out.extend_from_slice(key);
                     }
                 }
             }
         }
 
-        // Null bitmaps
-        let null_bitmap_bytes = self.num_rows.div_ceil(8);
-        for i in 0..self.num_columns {
+        // Null bitmaps (per-column row count)
+        for i in 0..self.total_columns {
             if has_nulls[i] && encodings[i] != ENCODING_ALL_NULL {
-                out[pos..pos + null_bitmap_bytes]
-                    
.copy_from_slice(&self.null_bitmaps[i][..null_bitmap_bytes]);
-                pos += null_bitmap_bytes;
+                let nbytes = self.col_num_rows(i).div_ceil(8);
+                out.extend_from_slice(&self.null_bitmaps[i][..nbytes]);
             }
         }
 
         // Column data
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             if encodings[i] == ENCODING_PLAIN {
-                let len = self.value_buffers[i].len();
-                out[pos..pos + len].copy_from_slice(&self.value_buffers[i]);
-                pos += len;
+                out.extend_from_slice(&self.value_buffers[i]);
             } else if encodings[i] == ENCODING_DICT {
-                let data_start = pos;
-                let bit_offset = self.write_dict_bit_packed(i, &mut out, 
data_start);
-                pos += bit_offset.div_ceil(8);
+                let bw = bit_width(self.get_dict_size(i));
+                let packed_bytes = (self.non_null_counts[i] * bw).div_ceil(8);
+                let data_start = out.len();
+                out.resize(data_start + packed_bytes, 0);
+                self.write_dict_bit_packed(i, &mut out, data_start);
             }
         }
 
-        debug_assert_eq!(pos, out.len());
         out
     }
 
@@ -499,16 +602,20 @@ impl BucketWriter {
                 has_nulls: Vec::new(),
                 const_data: Vec::new(),
                 column_pages: Vec::new(),
+                num_primary: self.num_primary,
+                children: self.children.clone(),
             };
         }
 
         let (encodings, has_nulls) = self.compute_encodings();
-        let null_bitmap_bytes = self.num_rows.div_ceil(8);
 
-        let mut const_data = vec![Vec::new(); self.num_columns];
-        let mut column_pages: Vec<Option<Vec<u8>>> = vec![None; 
self.num_columns];
+        let mut const_data = vec![Vec::new(); self.total_columns];
+        let mut column_pages: Vec<Option<Vec<u8>>> = vec![None; 
self.total_columns];
+
+        for i in 0..self.total_columns {
+            let col_rows = self.col_num_rows(i);
+            let null_bitmap_bytes = col_rows.div_ceil(8);
 
-        for i in 0..self.num_columns {
             match encodings[i] {
                 ENCODING_ALL_NULL => {}
                 ENCODING_CONST => {
@@ -520,7 +627,6 @@ impl BucketWriter {
                 }
                 ENCODING_DICT => {
                     let mut page = Vec::new();
-                    // Dict table
                     if let Some(ref dict) = self.long_dict_maps[i] {
                         let num_entries = dict.len();
                         varint::encode(&mut page, num_entries as u32);
@@ -541,11 +647,9 @@ impl BucketWriter {
                             page.extend_from_slice(key);
                         }
                     }
-                    // Null bitmap
                     if has_nulls[i] {
                         
page.extend_from_slice(&self.null_bitmaps[i][..null_bitmap_bytes]);
                     }
-                    // Bit-packed indices
                     let dict_size = self.get_dict_size(i);
                     let bw = bit_width(dict_size);
                     let packed_bytes = (self.non_null_counts[i] * 
bw).div_ceil(8);
@@ -571,11 +675,13 @@ impl BucketWriter {
             has_nulls,
             const_data,
             column_pages,
+            num_primary: self.num_primary,
+            children: self.children.clone(),
         }
     }
 
     pub fn reset(&mut self) {
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             for b in &mut self.null_bitmaps[i] {
                 *b = 0;
             }
@@ -597,6 +703,9 @@ impl BucketWriter {
             }
         }
         self.num_rows = 0;
+        for child in &mut self.children {
+            child.num_elements = 0;
+        }
     }
 
     fn get_dict_size(&self, col: usize) -> usize {
@@ -616,7 +725,7 @@ impl BucketWriter {
         let mut bit_offset = 0usize;
         let mut val_pos = 0usize;
 
-        for r in 0..self.num_rows {
+        for r in 0..self.col_num_rows(col) {
             let is_null = (self.null_bitmaps[col][r / 8] & (1 << (r % 8))) != 
0;
             if !is_null {
                 let idx = if let Some(ref dict) = self.long_dict_maps[col] {
@@ -659,15 +768,24 @@ impl BucketWriter {
     }
 
     fn compute_out_size(&self, encodings: &[u8], has_nulls: &[bool]) -> usize {
-        let null_bitmap_bytes = self.num_rows.div_ceil(8);
-        let mut size = (self.num_columns * 2).div_ceil(8) + 
self.num_columns.div_ceil(8);
+        // Header: varint(num_primary) + varint(num_children) + varint per 
child
+        let mut size = 0;
+        if !self.children.is_empty() {
+            size += varint::encoded_size(self.num_primary as u32)
+                + varint::encoded_size(self.children.len() as u32);
+            for child in &self.children {
+                size += varint::encoded_size(child.num_elements as u32);
+            }
+        }
+
+        size += (self.total_columns * 2).div_ceil(8) + 
self.total_columns.div_ceil(8);
 
-        for i in 0..self.num_columns {
+        for i in 0..self.total_columns {
             if encodings[i] == ENCODING_ALL_NULL {
                 continue;
             }
             if has_nulls[i] {
-                size += null_bitmap_bytes;
+                size += self.col_num_rows(i).div_ceil(8);
             }
             match encodings[i] {
                 ENCODING_CONST => {
@@ -926,6 +1044,284 @@ fn i128_to_biginteger_bytes(val: i128) -> Vec<u8> {
     bytes[start..].to_vec()
 }
 
+fn extract_list_lengths(list_array: &ListArray) -> Int32Array {
+    let offsets = list_array.value_offsets();
+    let num_rows = list_array.len();
+    let mut lengths = Vec::with_capacity(num_rows);
+    for i in 0..num_rows {
+        if list_array.is_null(i) {
+            lengths.push(0);
+        } else {
+            lengths.push(offsets[i + 1] - offsets[i]);
+        }
+    }
+    let null_buf = list_array.nulls().cloned();
+    Int32Array::new(ScalarBuffer::from(lengths), null_buf)
+}
+
+fn flatten_list_values(list_array: &ListArray) -> ArrayRef {
+    let offsets = list_array.value_offsets();
+    let values = list_array.values();
+    let num_rows = list_array.len();
+
+    if list_array.null_count() == 0 {
+        let start = offsets[0] as usize;
+        let end = offsets[num_rows] as usize;
+        return values.slice(start, end - start);
+    }
+
+    // Skip child values for null rows — collect only non-null row ranges
+    let mut indices: Vec<u32> = Vec::new();
+    for i in 0..num_rows {
+        if !list_array.is_null(i) {
+            let start = offsets[i] as u32;
+            let end = offsets[i + 1] as u32;
+            for idx in start..end {
+                indices.push(idx);
+            }
+        }
+    }
+
+    if indices.is_empty() {
+        return values.slice(0, 0);
+    }
+
+    let idx_array = UInt32Array::from(indices);
+    take_array(values.as_ref(), &idx_array)
+}
+
+fn take_array(array: &dyn Array, indices: &UInt32Array) -> ArrayRef {
+    use arrow_array::builder::*;
+    macro_rules! take_prim {
+        ($arr_ty:ty, $bld_ty:ty) => {{
+            let src = array.as_any().downcast_ref::<$arr_ty>().unwrap();
+            let mut b = <$bld_ty>::with_capacity(indices.len());
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            Arc::new(b.finish()) as ArrayRef
+        }};
+    }
+    match array.data_type() {
+        DataType::Boolean => take_prim!(BooleanArray, BooleanBuilder),
+        DataType::Int8 => take_prim!(Int8Array, Int8Builder),
+        DataType::Int16 => take_prim!(Int16Array, Int16Builder),
+        DataType::Int32 => take_prim!(Int32Array, Int32Builder),
+        DataType::Int64 => take_prim!(Int64Array, Int64Builder),
+        DataType::Float32 => take_prim!(Float32Array, Float32Builder),
+        DataType::Float64 => take_prim!(Float64Array, Float64Builder),
+        DataType::Date32 => take_prim!(Date32Array, Date32Builder),
+        DataType::Time32(_) => take_prim!(Time32MillisecondArray, 
Time32MillisecondBuilder),
+        DataType::Decimal128(p, s) => {
+            let src = 
array.as_any().downcast_ref::<Decimal128Array>().unwrap();
+            let mut b = Decimal128Builder::new()
+                .with_precision_and_scale(*p, *s)
+                .unwrap();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            Arc::new(b.finish()) as ArrayRef
+        }
+        DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) => {
+            let src = array
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            let mut b = TimestampMillisecondBuilder::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            let arr = b.finish();
+            Arc::new(if let Some(tz) = tz {
+                arr.with_timezone(tz.clone())
+            } else {
+                arr
+            })
+        }
+        DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
+            let src = array
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            let mut b = TimestampMicrosecondBuilder::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            let arr = b.finish();
+            Arc::new(if let Some(tz) = tz {
+                arr.with_timezone(tz.clone())
+            } else {
+                arr
+            })
+        }
+        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+            let src = array
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            let mut b = TimestampNanosecondBuilder::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            let arr = b.finish();
+            Arc::new(if let Some(tz) = tz {
+                arr.with_timezone(tz.clone())
+            } else {
+                arr
+            })
+        }
+        DataType::Utf8 => {
+            let src = array.as_any().downcast_ref::<StringArray>().unwrap();
+            let mut b = StringBuilder::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            Arc::new(b.finish())
+        }
+        DataType::Binary => {
+            let src = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+            let mut b = BinaryBuilder::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                if src.is_null(idx) {
+                    b.append_null();
+                } else {
+                    b.append_value(src.value(idx));
+                }
+            }
+            Arc::new(b.finish())
+        }
+        DataType::List(_) => {
+            // For nested arrays, rebuild by collecting slices
+            let src = array.as_any().downcast_ref::<ListArray>().unwrap();
+            let mut offsets_builder = vec![0i32];
+            let mut child_indices: Vec<u32> = Vec::new();
+            for i in 0..indices.len() {
+                let idx = indices.value(i) as usize;
+                let start = src.value_offsets()[idx] as u32;
+                let end = src.value_offsets()[idx + 1] as u32;
+                for ci in start..end {
+                    child_indices.push(ci);
+                }
+                offsets_builder.push(child_indices.len() as i32);
+            }
+            let child_idx_arr = UInt32Array::from(child_indices);
+            let new_values = take_array(src.values().as_ref(), &child_idx_arr);
+            let field = match array.data_type() {
+                DataType::List(f) => f.clone(),
+                _ => unreachable!(),
+            };
+            let null_buf = if !indices.is_empty() {
+                let mut bm = vec![0u8; indices.len().div_ceil(8)];
+                for i in 0..indices.len() {
+                    let idx = indices.value(i) as usize;
+                    if !src.is_null(idx) {
+                        bm[i / 8] |= 1 << (i % 8);
+                    }
+                }
+                if bm.iter().all(|&b| b == 0xFF) || indices.is_empty() {
+                    None
+                } else {
+                    Some(NullBuffer::new(BooleanBuffer::new(
+                        Buffer::from_vec(bm),
+                        0,
+                        indices.len(),
+                    )))
+                }
+            } else {
+                None
+            };
+            Arc::new(ListArray::new(
+                field,
+                OffsetBuffer::new(ScalarBuffer::from(offsets_builder)),
+                new_values,
+                null_buf,
+            ))
+        }
+        other => panic!("take_array: unsupported DataType {:?}", other),
+    }
+}
+
+pub(crate) fn expand_col_types(col_types: &[&DataType]) -> (Vec<DataType>, 
Vec<ChildColumnMeta>) {
+    let mut physical_types: Vec<DataType> = col_types
+        .iter()
+        .map(|t| {
+            if matches!(t, DataType::List(_)) {
+                DataType::Int32
+            } else {
+                (*t).clone()
+            }
+        })
+        .collect();
+    let mut children = Vec::new();
+
+    for (i, t) in col_types.iter().enumerate() {
+        if let DataType::List(element_field) = t {
+            expand_child(i, element_field, &mut physical_types, &mut children);
+        }
+    }
+    (physical_types, children)
+}
+
+fn expand_child(
+    parent_logical: usize,
+    element_field: &Arc<Field>,
+    physical_types: &mut Vec<DataType>,
+    children: &mut Vec<ChildColumnMeta>,
+) {
+    let elem_dt = element_field.data_type();
+    let child_phys_idx = physical_types.len();
+
+    if let DataType::List(inner_field) = elem_dt {
+        physical_types.push(DataType::Int32);
+        children.push(ChildColumnMeta {
+            parent_logical_col: parent_logical,
+            physical_index: child_phys_idx,
+            element_field: element_field.clone(),
+            num_elements: 0,
+        });
+        expand_child(parent_logical, inner_field, physical_types, children);
+    } else {
+        physical_types.push(elem_dt.clone());
+        children.push(ChildColumnMeta {
+            parent_logical_col: parent_logical,
+            physical_index: child_phys_idx,
+            element_field: element_field.clone(),
+            num_elements: 0,
+        });
+    }
+}
+
 fn uses_long_dict(fixed_width: i32) -> bool {
     fixed_width > 0 && fixed_width <= 8
 }
@@ -966,35 +1362,15 @@ fn write_fixed_key_to_vec(buf: &mut Vec<u8>, key: u64, 
width: i32) {
     }
 }
 
-fn write_fixed_key_to_slice(buf: &mut [u8], pos: &mut usize, key: u64, width: 
i32) {
-    match width {
-        1 => {
-            buf[*pos] = key as u8;
-            *pos += 1;
-        }
-        2 => {
-            let bytes = (key as u16).to_be_bytes();
-            buf[*pos..*pos + 2].copy_from_slice(&bytes);
-            *pos += 2;
-        }
-        4 => {
-            let bytes = (key as u32).to_be_bytes();
-            buf[*pos..*pos + 4].copy_from_slice(&bytes);
-            *pos += 4;
-        }
-        8 => {
-            let bytes = key.to_be_bytes();
-            buf[*pos..*pos + 8].copy_from_slice(&bytes);
-            *pos += 8;
-        }
-        _ => {}
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
 
+    fn header_size(_data: &[u8]) -> usize {
+        // No header for non-ARRAY buckets (v1 compatible)
+        0
+    }
+
     #[test]
     fn test_all_null_encoding() {
         let types = [DataType::Int32];
@@ -1006,7 +1382,8 @@ mod tests {
 
         let data = writer.finish();
         assert!(!data.is_empty());
-        assert_eq!(data[0] & 0x03, ENCODING_ALL_NULL);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_ALL_NULL);
     }
 
     #[test]
@@ -1019,7 +1396,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_CONST);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_CONST);
     }
 
     #[test]
@@ -1033,7 +1411,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_DICT);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_DICT);
     }
 
     #[test]
@@ -1047,7 +1426,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_PLAIN);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_PLAIN);
     }
 
     #[test]
@@ -1060,7 +1440,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Utf8]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_CONST);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_CONST);
     }
 
     #[test]
@@ -1074,7 +1455,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Utf8]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_DICT);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_DICT);
     }
 
     #[test]
@@ -1090,7 +1472,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_CONST);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_CONST);
     }
 
     #[test]
@@ -1106,7 +1489,8 @@ mod tests {
         writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_DICT);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_DICT);
     }
 
     #[test]
@@ -1126,7 +1510,8 @@ mod tests {
         writer.write_columns(&[&second], &[&types[0]]).unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_DICT);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_DICT);
     }
 
     #[test]
@@ -1148,9 +1533,10 @@ mod tests {
             .unwrap();
 
         let data = writer.finish();
-        assert_eq!(data[0] & 0x03, ENCODING_ALL_NULL);
-        assert_eq!((data[0] >> 2) & 0x03, ENCODING_CONST);
-        assert_eq!((data[0] >> 4) & 0x03, ENCODING_DICT);
+        let h = header_size(&data);
+        assert_eq!(data[h] & 0x03, ENCODING_ALL_NULL);
+        assert_eq!((data[h] >> 2) & 0x03, ENCODING_CONST);
+        assert_eq!((data[h] >> 4) & 0x03, ENCODING_DICT);
     }
 
     #[test]
@@ -1171,6 +1557,7 @@ mod tests {
         let arr2 = Int32Array::from(vals);
         writer.write_columns(&[&arr2], &[&DataType::Int32]).unwrap();
         let data2 = writer.finish();
-        assert_eq!(data2[0] & 0x03, ENCODING_PLAIN);
+        let h2 = header_size(&data2);
+        assert_eq!(data2[h2] & 0x03, ENCODING_PLAIN);
     }
 }
diff --git a/core/src/reader.rs b/core/src/reader.rs
index e111b9d..8ed6b39 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -458,6 +458,14 @@ impl<I: InputFile> MosaicReader<I> {
         let page_content = zstd::bulk::decompress(compressed_data, 
uncompressed_size)
             .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
 
+        Self::parse_simple_column_slot(page_content, col_type, num_rows)
+    }
+
+    fn parse_simple_column_slot(
+        page_content: Vec<u8>,
+        col_type: &DataType,
+        num_rows: usize,
+    ) -> io::Result<ColumnPageReader> {
         if page_content.len() < 2 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
@@ -615,7 +623,18 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                             "paged bucket requires ZSTD compression",
                         ));
                     }
-                    let dir_size = self.schema.bucket_to_global[b].len() * 4;
+                    let bucket_col_refs: Vec<&DataType> = 
self.schema.bucket_to_global[b]
+                        .iter()
+                        .map(|&gi| &self.schema.columns[gi].data_type)
+                        .collect();
+                    let (bucket_phys, bucket_children) =
+                        
crate::bucket_writer::expand_col_types(&bucket_col_refs);
+                    let child_header_len = if bucket_children.is_empty() {
+                        0
+                    } else {
+                        2 + bucket_children.len() * 4
+                    };
+                    let dir_size = child_header_len + bucket_phys.len() * 4;
                     if dir_size > total_size {
                         return Err(io::Error::new(
                             io::ErrorKind::InvalidData,
@@ -659,7 +678,9 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
         let mut r2_group_infos: Vec<Vec<PagedSlotInfo>> = Vec::new();
 
         // Per-bucket directory parse results (slot_sizes, slot_file_offsets) 
for paged buckets
-        let mut paged_dir_info: Vec<Option<(Vec<usize>, Vec<u64>)>> = 
vec![None; self.num_buckets];
+        // (slot_sizes, slot_file_offsets, child_element_counts)
+        type PagedDirInfo = (Vec<usize>, Vec<u64>, Vec<usize>);
+        let mut paged_dir_info: Vec<Option<PagedDirInfo>> = vec![None; 
self.num_buckets];
         let mut partial_paged_buckets: Vec<usize> = Vec::new();
 
         for (ri, &b) in r1_bucket_ids.iter().enumerate() {
@@ -690,19 +711,58 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                 }
                 BucketLayout::Paged { total_size } => {
                     let global_indices = &self.schema.bucket_to_global[b];
-                    let num_columns = global_indices.len();
+                    let col_type_refs: Vec<&DataType> = global_indices
+                        .iter()
+                        .map(|&gi| &self.schema.columns[gi].data_type)
+                        .collect();
+                    let (phys_types, bucket_children) =
+                        crate::bucket_writer::expand_col_types(&col_type_refs);
+                    let num_columns = phys_types.len();
 
-                    // Parse directory
+                    // Parse fixed-size child header (only when ARRAY columns 
exist)
+                    let (hdr_len, child_element_counts) = if 
bucket_children.is_empty() {
+                        (0, Vec::new())
+                    } else {
+                        if buf.len() < 2 {
+                            return Err(io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                "paged bucket: too short for child header",
+                            ));
+                        }
+                        let nc = u16::from_le_bytes([buf[0], buf[1]]) as usize;
+                        let hl = 2 + nc * 4;
+                        if buf.len() < hl {
+                            return Err(io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                "paged bucket: truncated child header",
+                            ));
+                        }
+                        let mut counts = Vec::with_capacity(nc);
+                        for ci in 0..nc {
+                            let off = 2 + ci * 4;
+                            counts
+                                .push(u32::from_le_bytes(buf[off..off + 
4].try_into().unwrap())
+                                    as usize);
+                        }
+                        (hl, counts)
+                    };
+
+                    // Parse directory (after header)
+                    if buf.len() < hdr_len + num_columns * 4 {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "paged bucket: truncated directory",
+                        ));
+                    }
                     let mut slot_sizes = Vec::with_capacity(num_columns);
                     for i in 0..num_columns {
-                        let off = i * 4;
+                        let off = hdr_len + i * 4;
                         let size =
                             u32::from_le_bytes(buf[off..off + 
4].try_into().unwrap()) as usize;
                         slot_sizes.push(size);
                     }
 
-                    // Validate: directory + slots must exactly equal 
total_size
-                    let dir_size = num_columns * 4;
+                    let dir_size = hdr_len + num_columns * 4;
                     let slot_total: usize = slot_sizes.iter().sum();
                     if dir_size + slot_total != total_size {
                         return Err(io::Error::new(
@@ -715,14 +775,18 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                     }
 
                     if all_projected_in_bucket[b] {
-                        // All columns projected — we already read the full 
bucket in round 1,
-                        // parse all slots directly without a second 
read_ranges call.
+                        let num_primary_in_bucket = global_indices.len();
                         let mut column_readers: Vec<Option<ColumnPageReader>> =
                             Vec::with_capacity(num_columns);
                         let mut data_offset = dir_size;
                         for i in 0..num_columns {
-                            let gi = global_indices[i];
-                            let col_type = 
self.schema.columns[gi].data_type.clone();
+                            let col_type = phys_types[i].clone();
+                            let col_rows = if i < num_primary_in_bucket {
+                                meta.num_rows
+                            } else {
+                                let child_idx = i - num_primary_in_bucket;
+                                
child_element_counts.get(child_idx).copied().unwrap_or(0)
+                            };
 
                             if slot_sizes[i] == 0 {
                                 column_readers.push(Some(ColumnPageReader::new(
@@ -731,12 +795,12 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                                     false,
                                     Value::Null,
                                     Vec::new(),
-                                    meta.num_rows,
+                                    col_rows,
                                 )?));
                             } else {
                                 let slot_data = &buf[data_offset..data_offset 
+ slot_sizes[i]];
                                 let column_reader =
-                                    Self::parse_column_slot(slot_data, 
&col_type, meta.num_rows)?;
+                                    Self::parse_column_slot(slot_data, 
&col_type, col_rows)?;
                                 column_readers.push(Some(column_reader));
                             }
                             data_offset += slot_sizes[i];
@@ -753,11 +817,26 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                             foff += size as u64;
                         }
 
+                        let num_primary_in_bucket = global_indices.len();
                         let mut projected_cols: Vec<usize> = Vec::new();
                         for i in 0..num_columns {
-                            let gi = global_indices[i];
-                            if projected[gi] && slot_sizes[i] > 0 {
-                                projected_cols.push(i);
+                            if i < num_primary_in_bucket {
+                                let gi = global_indices[i];
+                                if projected[gi] && slot_sizes[i] > 0 {
+                                    projected_cols.push(i);
+                                }
+                            } else {
+                                // Child column: project if parent is projected
+                                let child_idx = i - num_primary_in_bucket;
+                                if child_idx < bucket_children.len() {
+                                    let parent = 
bucket_children[child_idx].parent_logical_col;
+                                    if parent < num_primary_in_bucket {
+                                        let gi = global_indices[parent];
+                                        if projected[gi] && slot_sizes[i] > 0 {
+                                            projected_cols.push(i);
+                                        }
+                                    }
+                                }
                             }
                         }
 
@@ -783,7 +862,8 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                             }]);
                         }
 
-                        paged_dir_info[b] = Some((slot_sizes, 
slot_file_offsets));
+                        paged_dir_info[b] =
+                            Some((slot_sizes, slot_file_offsets, 
child_element_counts.clone()));
                         partial_paged_buckets.push(b);
                     }
                 }
@@ -808,15 +888,19 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
             let mut slot_locations: Vec<Vec<Option<SlotLocation>>> =
                 Vec::with_capacity(self.num_buckets);
             for b in 0..self.num_buckets {
-                let n = self.schema.bucket_to_global[b].len();
-                slot_locations.push((0..n).map(|_| None).collect());
+                let col_refs: Vec<&DataType> = self.schema.bucket_to_global[b]
+                    .iter()
+                    .map(|&gi| &self.schema.columns[gi].data_type)
+                    .collect();
+                let (phys, _) = 
crate::bucket_writer::expand_col_types(&col_refs);
+                slot_locations.push((0..phys.len()).map(|_| None).collect());
             }
 
             for (group_idx, group) in r2_group_infos.iter().enumerate() {
                 let buf = r2_buffers[group_idx].as_slice();
                 let group_base = r2_ranges[group_idx].0;
                 for info in group {
-                    let (slot_sizes, slot_file_offsets) =
+                    let (slot_sizes, slot_file_offsets, _) =
                         paged_dir_info[info.bucket_id].as_ref().unwrap();
                     let rel_start = (slot_file_offsets[info.col_idx] - 
group_base) as usize;
                     let slot_len = slot_sizes[info.col_idx];
@@ -844,19 +928,46 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
             // need round 2 IO, but still need readers when projected.
             for &b in &partial_paged_buckets {
                 let global_indices = &self.schema.bucket_to_global[b];
-                let num_columns = global_indices.len();
-                let (slot_sizes, _) = paged_dir_info[b].as_ref().unwrap();
+                let col_refs: Vec<&DataType> = global_indices
+                    .iter()
+                    .map(|&gi| &self.schema.columns[gi].data_type)
+                    .collect();
+                let (phys_types_b, children_b) = 
crate::bucket_writer::expand_col_types(&col_refs);
+                let num_columns = phys_types_b.len();
+                let num_primary_b = global_indices.len();
+                let (slot_sizes, _, child_elem_counts) = 
paged_dir_info[b].as_ref().unwrap();
+                // DEBUG removed
 
                 let mut column_readers: Vec<Option<ColumnPageReader>> =
                     Vec::with_capacity(num_columns);
                 for i in 0..num_columns {
-                    let gi = global_indices[i];
-                    if !projected[gi] {
+                    let is_projected = if i < num_primary_b {
+                        let gi = global_indices[i];
+                        projected[gi]
+                    } else {
+                        let child_idx = i - num_primary_b;
+                        if child_idx < children_b.len() {
+                            let parent = 
children_b[child_idx].parent_logical_col;
+                            parent < num_primary_b && 
projected[global_indices[parent]]
+                        } else {
+                            false
+                        }
+                    };
+
+                    if !is_projected {
                         column_readers.push(None);
                         continue;
                     }
 
-                    let col_type = self.schema.columns[gi].data_type.clone();
+                    let col_type = phys_types_b[i].clone();
+                    let col_rows = if i < num_primary_b {
+                        meta.num_rows
+                    } else {
+                        child_elem_counts
+                            .get(i - num_primary_b)
+                            .copied()
+                            .unwrap_or(0)
+                    };
 
                     if slot_sizes[i] == 0 {
                         column_readers.push(Some(ColumnPageReader::new(
@@ -865,7 +976,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                             false,
                             Value::Null,
                             Vec::new(),
-                            meta.num_rows,
+                            col_rows,
                         )?));
                         continue;
                     }
@@ -878,8 +989,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
                     })?;
                     let group_buffer = 
r2_buffers[location.group_idx].as_slice();
                     let slot_data = 
&group_buffer[location.start..location.start + location.len];
-                    let column_reader =
-                        Self::parse_column_slot(slot_data, &col_type, 
meta.num_rows)?;
+                    let column_reader = Self::parse_column_slot(slot_data, 
&col_type, col_rows)?;
                     column_readers.push(Some(column_reader));
                 }
                 bucket_states[b] = Some(BucketState::Paged { column_readers });
@@ -979,12 +1089,54 @@ impl RowGroupReader {
 
             match state {
                 BucketState::Paged { column_readers } => {
+                    let col_type_refs: Vec<&DataType> = global_indices
+                        .iter()
+                        .map(|&gi| &self.schema.columns[gi].data_type)
+                        .collect();
+                    let (phys_types, bucket_children) =
+                        crate::bucket_writer::expand_col_types(&col_type_refs);
+
+                    // Read all physical columns (N+C)
+                    let mut phys_arrays: Vec<ArrayRef> = Vec::new();
+                    for (idx, cr_opt) in column_readers.iter().enumerate() {
+                        if let Some(ref cr) = cr_opt {
+                            phys_arrays.push(cr.read_all()?);
+                        } else {
+                            let dt = 
phys_types.get(idx).unwrap_or(&DataType::Int32);
+                            let rows = if idx < global_indices.len() {
+                                self.num_rows
+                            } else {
+                                0
+                            };
+                            phys_arrays.push(arrow_array::new_null_array(dt, 
rows));
+                        }
+                    }
+
+                    // Only reassemble projected ARRAY parents
+                    let projected_children: Vec<_> = bucket_children
+                        .iter()
+                        .filter(|c| {
+                            c.parent_logical_col < global_indices.len()
+                                && 
self.projected_columns[global_indices[c.parent_logical_col]]
+                        })
+                        .cloned()
+                        .collect();
+
+                    crate::bucket_reader::reassemble_list_columns_pub(
+                        &mut phys_arrays,
+                        &projected_children,
+                        &col_type_refs,
+                        global_indices.len(),
+                        self.num_rows,
+                    );
+
+                    // Map logical columns to global array positions
                     for (local_idx, &global_idx) in 
global_indices.iter().enumerate() {
                         if !self.projected_columns[global_idx] {
                             continue;
                         }
-                        if let Some(ref cr) = column_readers[local_idx] {
-                            arrays[global_idx] = Some(cr.read_all()?);
+                        if local_idx < phys_arrays.len() {
+                            arrays[global_idx] = 
Some(phys_arrays[local_idx].clone());
                         }
                     }
                 }
diff --git a/core/src/types.rs b/core/src/types.rs
index 736c4c5..9326c57 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -103,6 +103,14 @@ pub fn validate_data_type(dt: &DataType) -> Result<(), 
String> {
             _ => Err(format!("unsupported Timestamp unit: {:?}", unit)),
         },
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 
Ok(()),
+        DataType::List(field) => {
+            if let DataType::Struct(fields) = field.data_type() {
+                if is_timestamp_nanos_struct(fields) {
+                    return Err("ARRAY<legacy timestamp nanos struct> is not 
supported".to_string());
+                }
+            }
+            validate_data_type(field.data_type())
+        }
         _ => Err(format!("unsupported DataType: {:?}", dt)),
     }
 }
@@ -124,6 +132,7 @@ pub fn data_type_to_type_byte(dt: &DataType) -> u8 {
         DataType::Timestamp(_, None) => 16,
         DataType::Timestamp(_, Some(_)) => 17,
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 16,
+        DataType::List(_) => 18,
         _ => panic!("unsupported DataType for serialization: {:?}", dt),
     }
 }
@@ -177,6 +186,12 @@ pub fn serialize_field(field: &Field, buf: &mut Vec<u8>) {
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => {
             varint::encode(buf, 9u32);
         }
+        DataType::List(element_field) => {
+            let name_bytes = element_field.name().as_bytes();
+            varint::encode(buf, name_bytes.len() as u32);
+            buf.extend_from_slice(name_bytes);
+            serialize_field(element_field, buf);
+        }
         _ => {}
     }
 }
@@ -258,6 +273,26 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut 
usize) -> Result<Fiel
                 DataType::Timestamp(TimeUnit::Nanosecond, Some(tz))
             }
         }
+        18 => {
+            let name_len = varint::decode(buf, pos)? as usize;
+            if *pos + name_len > buf.len() {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::UnexpectedEof,
+                    "type: not enough bytes for ARRAY element field name",
+                ));
+            }
+            let element_name = std::str::from_utf8(&buf[*pos..*pos + name_len])
+                .map_err(|_| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        "type: invalid UTF-8 in ARRAY element field name",
+                    )
+                })?
+                .to_string();
+            *pos += name_len;
+            let element_field = deserialize_field(&element_name, buf, pos)?;
+            DataType::List(std::sync::Arc::new(element_field))
+        }
         _ => {
             return Err(std::io::Error::new(
                 std::io::ErrorKind::InvalidData,
diff --git a/core/src/writer.rs b/core/src/writer.rs
index cea1582..59bd2a9 100644
--- a/core/src/writer.rs
+++ b/core/src/writer.rs
@@ -462,8 +462,20 @@ impl<S: OutputFile> MosaicWriter<S> {
             column_slots.push(slot);
         }
 
+        // Write child element counts header only when ARRAY columns exist
+        let child_header_len = if paged.children.is_empty() {
+            0
+        } else {
+            let num_children = paged.children.len() as u16;
+            self.out.write(&num_children.to_le_bytes())?;
+            for child in &paged.children {
+                self.out.write(&(child.num_elements as u32).to_le_bytes())?;
+            }
+            2 + paged.children.len() * 4
+        };
+
         // Write fixed-length directory: num_columns * 4 bytes (u32 LE per 
column = slot size)
-        let dir_size = num_columns * 4;
+        let dir_size = child_header_len + num_columns * 4;
         let mut total_size = dir_size;
         for slot in &column_slots {
             let slot_size = to_u32(slot.len(), "paged slot size")?;
diff --git a/core/tests/array_type_test.rs b/core/tests/array_type_test.rs
new file mode 100644
index 0000000..ad899e2
--- /dev/null
+++ b/core/tests/array_type_test.rs
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(
+    clippy::cloned_ref_to_slice_refs,
+    clippy::unnecessary_cast,
+    clippy::field_reassign_with_default
+)]
+
+use std::io;
+use std::sync::Arc;
+
+use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, 
ScalarBuffer};
+
+use arrow_array::builder::*;
+use arrow_array::*;
+use arrow_schema::{DataType, Field, Schema};
+use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
+use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+struct MemOutputFile {
+    pub buf: Vec<u8>,
+}
+
+impl MemOutputFile {
+    fn new() -> Self {
+        Self { buf: Vec::new() }
+    }
+}
+
+impl OutputFile for MemOutputFile {
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        self.buf.extend_from_slice(data);
+        Ok(())
+    }
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+    fn pos(&self) -> u64 {
+        self.buf.len() as u64
+    }
+}
+
+struct ByteArrayInputFile {
+    data: Vec<u8>,
+}
+
+impl InputFile for ByteArrayInputFile {
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        let start = offset as usize;
+        let end = start + buf.len();
+        if end > self.data.len() {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "read past end",
+            ));
+        }
+        buf.copy_from_slice(&self.data[start..end]);
+        Ok(())
+    }
+}
+
+fn roundtrip(schema: &Schema, batches: &[RecordBatch]) -> Vec<RecordBatch> {
+    roundtrip_with_options(schema, batches, WriterOptions::default())
+}
+
+fn roundtrip_with_options(
+    schema: &Schema,
+    batches: &[RecordBatch],
+    options: WriterOptions,
+) -> Vec<RecordBatch> {
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(out, schema, options).unwrap();
+    for batch in batches {
+        writer.write_batch(batch).unwrap();
+    }
+    writer.close().unwrap();
+
+    let data = writer.output().buf.clone();
+    let file_len = data.len() as u64;
+    let input = ByteArrayInputFile { data };
+    let reader = MosaicReader::new(input, file_len).unwrap();
+
+    let mut result = Vec::new();
+    for rg in 0..reader.num_row_groups() {
+        let mut rg_reader = reader.row_group_reader(rg).unwrap();
+        result.push(rg_reader.read_columns().unwrap());
+    }
+    result
+}
+
+#[test]
+fn test_array_int32_basic() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    builder.values().append_value(1);
+    builder.values().append_value(2);
+    builder.values().append_value(3);
+    builder.append(true);
+
+    builder.values().append_value(4);
+    builder.values().append_value(5);
+    builder.append(true);
+
+    builder.append(true); // empty array
+
+    builder.append(false); // null
+
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch.clone()]);
+    assert_eq!(result.len(), 1);
+    let result_batch = &result[0];
+    assert_eq!(result_batch.num_rows(), 4);
+
+    let result_col = result_batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert!(!result_col.is_null(0));
+    assert!(!result_col.is_null(1));
+    assert!(!result_col.is_null(2));
+    assert!(result_col.is_null(3));
+
+    let row0 = result_col.value(0);
+    let row0_ints = row0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row0_ints.len(), 3);
+    assert_eq!(row0_ints.value(0), 1);
+    assert_eq!(row0_ints.value(1), 2);
+    assert_eq!(row0_ints.value(2), 3);
+
+    let row1 = result_col.value(1);
+    let row1_ints = row1.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row1_ints.len(), 2);
+    assert_eq!(row1_ints.value(0), 4);
+    assert_eq!(row1_ints.value(1), 5);
+
+    let row2 = result_col.value(2);
+    let row2_ints = row2.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row2_ints.len(), 0);
+}
+
+#[test]
+fn test_array_with_null_elements() {
+    let element_field = Arc::new(Field::new("item", DataType::Int64, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(Int64Builder::new());
+    builder.values().append_value(100);
+    builder.values().append_null();
+    builder.values().append_value(300);
+    builder.append(true);
+
+    builder.values().append_null();
+    builder.values().append_null();
+    builder.append(true);
+
+    builder.values().append_value(999);
+    builder.append(true);
+
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    let row0 = result_col.value(0);
+    let row0_arr = row0.as_any().downcast_ref::<Int64Array>().unwrap();
+    assert_eq!(row0_arr.len(), 3);
+    assert_eq!(row0_arr.value(0), 100);
+    assert!(row0_arr.is_null(1));
+    assert_eq!(row0_arr.value(2), 300);
+
+    let row1 = result_col.value(1);
+    let row1_arr = row1.as_any().downcast_ref::<Int64Array>().unwrap();
+    assert_eq!(row1_arr.len(), 2);
+    assert!(row1_arr.is_null(0));
+    assert!(row1_arr.is_null(1));
+
+    let row2 = result_col.value(2);
+    let row2_arr = row2.as_any().downcast_ref::<Int64Array>().unwrap();
+    assert_eq!(row2_arr.len(), 1);
+    assert_eq!(row2_arr.value(0), 999);
+}
+
+#[test]
+fn test_array_string_elements() {
+    let element_field = Arc::new(Field::new("item", DataType::Utf8, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(StringBuilder::new());
+    builder.values().append_value("hello");
+    builder.values().append_value("world");
+    builder.append(true);
+
+    builder.values().append_null();
+    builder.values().append_value("foo");
+    builder.append(true);
+
+    builder.append(true); // empty
+
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    let row0 = result_col.value(0);
+    let row0_arr = row0.as_any().downcast_ref::<StringArray>().unwrap();
+    assert_eq!(row0_arr.len(), 2);
+    assert_eq!(row0_arr.value(0), "hello");
+    assert_eq!(row0_arr.value(1), "world");
+
+    let row1 = result_col.value(1);
+    let row1_arr = row1.as_any().downcast_ref::<StringArray>().unwrap();
+    assert_eq!(row1_arr.len(), 2);
+    assert!(row1_arr.is_null(0));
+    assert_eq!(row1_arr.value(1), "foo");
+
+    let row2 = result_col.value(2);
+    let row2_arr = row2.as_any().downcast_ref::<StringArray>().unwrap();
+    assert_eq!(row2_arr.len(), 0);
+}
+
+#[test]
+fn test_array_nested_array() {
+    let inner_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let outer_field = Arc::new(Field::new(
+        "item",
+        DataType::List(inner_field.clone()),
+        true,
+    ));
+    let schema = Schema::new(vec![Field::new(
+        "nested",
+        DataType::List(outer_field.clone()),
+        true,
+    )]);
+
+    let inner_builder = ListBuilder::new(Int32Builder::new());
+    let mut outer_builder = ListBuilder::new(inner_builder);
+
+    // Row 0: [[1, 2], [3]]
+    outer_builder.values().values().append_value(1);
+    outer_builder.values().values().append_value(2);
+    outer_builder.values().append(true);
+    outer_builder.values().values().append_value(3);
+    outer_builder.values().append(true);
+    outer_builder.append(true);
+
+    // Row 1: [[4]]
+    outer_builder.values().values().append_value(4);
+    outer_builder.values().append(true);
+    outer_builder.append(true);
+
+    // Row 2: null
+    outer_builder.append(false);
+
+    let array = outer_builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert!(!result_col.is_null(0));
+    assert!(!result_col.is_null(1));
+    assert!(result_col.is_null(2));
+
+    let row0 = result_col.value(0);
+    let row0_outer = row0.as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(row0_outer.len(), 2);
+
+    let inner0 = row0_outer.value(0);
+    let inner0_arr = inner0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(inner0_arr.len(), 2);
+    assert_eq!(inner0_arr.value(0), 1);
+    assert_eq!(inner0_arr.value(1), 2);
+
+    let inner1 = row0_outer.value(1);
+    let inner1_arr = inner1.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(inner1_arr.len(), 1);
+    assert_eq!(inner1_arr.value(0), 3);
+
+    let row1 = result_col.value(1);
+    let row1_outer = row1.as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(row1_outer.len(), 1);
+    let inner2 = row1_outer.value(0);
+    let inner2_arr = inner2.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(inner2_arr.value(0), 4);
+}
+
+#[test]
+fn test_array_all_null() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    builder.append(false);
+    builder.append(false);
+    builder.append(false);
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(result_col.len(), 3);
+    assert!(result_col.is_null(0));
+    assert!(result_col.is_null(1));
+    assert!(result_col.is_null(2));
+}
+
+#[test]
+fn test_array_with_other_columns() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("tags", DataType::List(element_field.clone()), true),
+        Field::new("name", DataType::Utf8, true),
+    ]);
+
+    let ids = Int64Array::from(vec![1, 2, 3]);
+
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    list_builder.values().append_value(10);
+    list_builder.values().append_value(20);
+    list_builder.append(true);
+    list_builder.append(false); // null
+    list_builder.values().append_value(30);
+    list_builder.append(true);
+    let tags = list_builder.finish();
+
+    let names = StringArray::from(vec![Some("alice"), None, Some("charlie")]);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(ids), Arc::new(tags), Arc::new(names)],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let rb = &result[0];
+    assert_eq!(rb.num_rows(), 3);
+
+    let result_ids = 
rb.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+    assert_eq!(result_ids.value(0), 1);
+    assert_eq!(result_ids.value(1), 2);
+    assert_eq!(result_ids.value(2), 3);
+
+    let result_tags = 
rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    assert!(!result_tags.is_null(0));
+    assert!(result_tags.is_null(1));
+    assert!(!result_tags.is_null(2));
+
+    let row0 = result_tags.value(0);
+    let row0_arr = row0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row0_arr.len(), 2);
+    assert_eq!(row0_arr.value(0), 10);
+    assert_eq!(row0_arr.value(1), 20);
+
+    let result_names = 
rb.column(2).as_any().downcast_ref::<StringArray>().unwrap();
+    assert_eq!(result_names.value(0), "alice");
+    assert!(result_names.is_null(1));
+    assert_eq!(result_names.value(2), "charlie");
+}
+
+#[test]
+fn test_array_large_batch() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    for i in 0..1000 {
+        if i % 10 == 0 {
+            builder.append(false); // null every 10th row
+        } else {
+            let num_elements = (i % 5) + 1;
+            for j in 0..num_elements {
+                if j == 2 && i % 3 == 0 {
+                    builder.values().append_null();
+                } else {
+                    builder.values().append_value((i * 10 + j) as i32);
+                }
+            }
+            builder.append(true);
+        }
+    }
+    let array = builder.finish();
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array.clone())]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert_eq!(result_col.len(), 1000);
+
+    for i in 0..1000 {
+        if i % 10 == 0 {
+            assert!(result_col.is_null(i), "row {} should be null", i);
+        } else {
+            assert!(!result_col.is_null(i), "row {} should not be null", i);
+            let expected = array.value(i);
+            let actual = result_col.value(i);
+            assert_eq!(&expected, &actual, "mismatch at row {}", i);
+        }
+    }
+}
+
+#[test]
+fn test_array_paged_layout() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("arr", DataType::List(element_field.clone()), true),
+    ]);
+
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    let mut ids = Vec::new();
+    for i in 0..200 {
+        ids.push(i as i64);
+        if i % 5 == 0 {
+            list_builder.append(false);
+        } else {
+            let n = (i % 4) + 1;
+            for j in 0..n {
+                if j == 1 && i % 3 == 0 {
+                    list_builder.values().append_null();
+                } else {
+                    list_builder.values().append_value((i * 10 + j) as i32);
+                }
+            }
+            list_builder.append(true);
+        }
+    }
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![
+            Arc::new(Int64Array::from(ids.clone())),
+            Arc::new(list_builder.finish()),
+        ],
+    )
+    .unwrap();
+
+    let mut opts = WriterOptions::default();
+    opts.page_size_threshold = 1;
+
+    let result = roundtrip_with_options(&schema, &[batch.clone()], opts);
+    let rb = &result[0];
+    assert_eq!(rb.num_rows(), 200);
+
+    let result_ids = 
rb.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+    let result_arr = 
rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+
+    for i in 0..200 {
+        assert_eq!(result_ids.value(i), i as i64);
+        if i % 5 == 0 {
+            assert!(result_arr.is_null(i), "row {} should be null", i);
+        } else {
+            assert!(!result_arr.is_null(i), "row {} should not be null", i);
+        }
+    }
+}
+
+#[test]
+fn test_array_null_row_preserves_child_offsets() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    // Manually construct: row 0 = [1, 2], row 1 = null (but owns child slots 
99, 100), row 2 = [5]
+    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 4, 5]));
+    let values = Arc::new(Int32Array::from(vec![1, 2, 99, 100, 5])) as 
ArrayRef;
+    let nulls = Some(NullBuffer::new(BooleanBuffer::new(
+        Buffer::from(vec![0b0000_0101]),
+        0,
+        3,
+    )));
+    let array = ListArray::new(element_field, offsets, values, nulls);
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array.clone())]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert_eq!(result_col.len(), 3);
+    assert!(!result_col.is_null(0));
+    assert!(result_col.is_null(1));
+    assert!(!result_col.is_null(2));
+
+    let row0 = result_col.value(0);
+    let row0_arr = row0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row0_arr.len(), 2);
+    assert_eq!(row0_arr.value(0), 1);
+    assert_eq!(row0_arr.value(1), 2);
+
+    let row2 = result_col.value(2);
+    let row2_arr = row2.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row2_arr.len(), 1);
+    assert_eq!(row2_arr.value(0), 5);
+}
+
+#[test]
+fn test_project_array_from_paged_bucket() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("arr", DataType::List(element_field.clone()), true),
+    ]);
+
+    let ids = Int64Array::from(vec![1, 2, 3]);
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    list_builder.values().append_value(10);
+    list_builder.values().append_value(20);
+    list_builder.append(true);
+    list_builder.values().append_value(30);
+    list_builder.append(true);
+    list_builder.values().append_value(40);
+    list_builder.append(true);
+    let arr = list_builder.finish();
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ids), 
Arc::new(arr)]).unwrap();
+
+    let out = MemOutputFile::new();
+    let mut options = WriterOptions::default();
+    options.num_buckets = 1;
+    options.page_size_threshold = 1;
+    let mut writer = MosaicWriter::new(out, &schema, options).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+
+    let data = writer.output().buf.clone();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+
+    // Project only the "arr" column
+    let sorted_arr_idx = reader
+        .schema()
+        .columns
+        .iter()
+        .position(|c| c.name == "arr")
+        .unwrap();
+    let mut rg_reader = reader
+        .row_group_reader_projected(0, &[sorted_arr_idx])
+        .unwrap();
+    let projected = rg_reader.read_columns().unwrap();
+    assert_eq!(projected.num_columns(), 1);
+
+    let result_arr = projected
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(result_arr.len(), 3);
+
+    let r0 = result_arr.value(0);
+    let r0a = r0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(r0a.len(), 2);
+    assert_eq!(r0a.value(0), 10);
+    assert_eq!(r0a.value(1), 20);
+}
+
+#[test]
+fn test_array_child_dict_encoding() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    for _ in 0..10 {
+        for j in 0..20 {
+            builder.values().append_value((j % 3) as i32);
+        }
+        builder.append(true);
+    }
+    let array = builder.finish();
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(array.clone())]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(result_col.len(), 10);
+    for i in 0..10 {
+        let expected = array.value(i);
+        let actual = result_col.value(i);
+        assert_eq!(&expected, &actual, "mismatch at row {}", i);
+    }
+}
+
+#[test]
+fn test_multiple_array_columns_in_bucket() {
+    let elem_i32 = Arc::new(Field::new("item", DataType::Int32, true));
+    let elem_i64 = Arc::new(Field::new("item", DataType::Int64, true));
+    let schema = Schema::new(vec![
+        Field::new("arr_a", DataType::List(elem_i32.clone()), true),
+        Field::new("arr_b", DataType::List(elem_i64.clone()), true),
+    ]);
+
+    let mut builder_a = ListBuilder::new(Int32Builder::new());
+    builder_a.values().append_value(1);
+    builder_a.values().append_value(2);
+    builder_a.append(true);
+    builder_a.append(false); // null
+    builder_a.values().append_value(3);
+    builder_a.append(true);
+
+    let mut builder_b = ListBuilder::new(Int64Builder::new());
+    builder_b.values().append_value(100);
+    builder_b.append(true);
+    builder_b.values().append_value(200);
+    builder_b.values().append_value(300);
+    builder_b.append(true);
+    builder_b.append(true); // empty
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(builder_a.finish()), Arc::new(builder_b.finish())],
+    )
+    .unwrap();
+
+    let mut opts = WriterOptions::default();
+    opts.num_buckets = 1;
+    let result = roundtrip_with_options(&schema, &[batch], opts);
+    let rb = &result[0];
+
+    let col_a = rb.column(0).as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(col_a.len(), 3);
+    assert!(!col_a.is_null(0));
+    assert!(col_a.is_null(1));
+    assert!(!col_a.is_null(2));
+    let a0 = col_a
+        .value(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(a0.len(), 2);
+    assert_eq!(a0.value(0), 1);
+    assert_eq!(a0.value(1), 2);
+    let a2 = col_a
+        .value(2)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(a2.value(0), 3);
+
+    let col_b = rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(col_b.len(), 3);
+    let b0 = col_b
+        .value(0)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(b0.value(0), 100);
+    let b1 = col_b
+        .value(1)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(b1.len(), 2);
+    assert_eq!(b1.value(0), 200);
+    assert_eq!(b1.value(1), 300);
+    let b2 = col_b
+        .value(2)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(b2.len(), 0);
+}
+
+#[test]
+fn test_array_date32_elements() {
+    let element_field = Arc::new(Field::new("item", DataType::Date32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(element_field.clone()),
+        true,
+    )]);
+
+    let mut builder = ListBuilder::new(Date32Builder::new());
+    builder.values().append_value(18000);
+    builder.values().append_value(19000);
+    builder.append(true);
+    builder.append(false); // null row with potential child slots
+    builder.values().append_value(20000);
+    builder.append(true);
+
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(builder.finish())]).unwrap();
+    let result = roundtrip(&schema, &[batch]);
+    let col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(col.len(), 3);
+    assert!(col.is_null(1));
+    let r0 = col
+        .value(0)
+        .as_any()
+        .downcast_ref::<Date32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r0.value(0), 18000);
+    assert_eq!(r0.value(1), 19000);
+    let r2 = col
+        .value(2)
+        .as_any()
+        .downcast_ref::<Date32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r2.value(0), 20000);
+}
+
+#[test]
+fn test_project_one_array_from_multi_array_paged() {
+    let elem_i32 = Arc::new(Field::new("item", DataType::Int32, true));
+    let elem_i64 = Arc::new(Field::new("item", DataType::Int64, true));
+    let schema = Schema::new(vec![
+        Field::new("arr_a", DataType::List(elem_i32.clone()), true),
+        Field::new("arr_b", DataType::List(elem_i64.clone()), true),
+    ]);
+
+    let mut builder_a = ListBuilder::new(Int32Builder::new());
+    builder_a.values().append_value(1);
+    builder_a.values().append_value(2);
+    builder_a.append(true);
+    builder_a.values().append_value(3);
+    builder_a.append(true);
+
+    let mut builder_b = ListBuilder::new(Int64Builder::new());
+    builder_b.values().append_value(100);
+    builder_b.append(true);
+    builder_b.values().append_value(200);
+    builder_b.values().append_value(300);
+    builder_b.append(true);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(builder_a.finish()), Arc::new(builder_b.finish())],
+    )
+    .unwrap();
+
+    let out = MemOutputFile::new();
+    let mut options = WriterOptions::default();
+    options.num_buckets = 1;
+    options.page_size_threshold = 1;
+    let mut writer = MosaicWriter::new(out, &schema, options).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+
+    let data = writer.output().buf.clone();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+
+    // Project only arr_a
+    let arr_a_idx = reader
+        .schema()
+        .columns
+        .iter()
+        .position(|c| c.name == "arr_a")
+        .unwrap();
+    let mut rg = reader.row_group_reader_projected(0, &[arr_a_idx]).unwrap();
+    let projected = rg.read_columns().unwrap();
+    assert_eq!(projected.num_columns(), 1);
+    let col = projected
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(col.len(), 2);
+    let r0 = col
+        .value(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r0.len(), 2);
+    assert_eq!(r0.value(0), 1);
+    assert_eq!(r0.value(1), 2);
+}
diff --git a/core/tests/gen_fixtures.rs b/core/tests/gen_fixtures.rs
new file mode 100644
index 0000000..2cd2df3
--- /dev/null
+++ b/core/tests/gen_fixtures.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary compatibility tests. Verifies that the current code produces
+//! byte-identical output to committed golden files, catching unintended
+//! format changes.
+
+use std::io;
+use std::sync::Arc;
+
+use arrow_array::builder::*;
+use arrow_array::*;
+use arrow_schema::{DataType, Field, Schema};
+use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
+use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+struct MemOutputFile {
+    pub buf: Vec<u8>,
+}
+impl MemOutputFile {
+    fn new() -> Self {
+        Self { buf: Vec::new() }
+    }
+}
+impl OutputFile for MemOutputFile {
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        self.buf.extend_from_slice(data);
+        Ok(())
+    }
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+    fn pos(&self) -> u64 {
+        self.buf.len() as u64
+    }
+}
+
+struct ByteArrayInputFile {
+    data: Vec<u8>,
+}
+impl InputFile for ByteArrayInputFile {
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        let s = offset as usize;
+        buf.copy_from_slice(&self.data[s..s + buf.len()]);
+        Ok(())
+    }
+}
+
+fn golden_path(name: &str) -> std::path::PathBuf {
+    std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .join("tests/testdata")
+        .join(name)
+}
+
+/// Generate the deterministic non-ARRAY file.
+/// Schema: id(INT32 NOT NULL), name(UTF8), score(FLOAT64)
+/// Data: 5 rows with nulls
+/// Options: num_buckets=1, compression=none
+fn gen_no_array() -> Vec<u8> {
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("score", DataType::Float64, true),
+    ]);
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+            Arc::new(StringArray::from(vec![
+                Some("alice"),
+                None,
+                Some("charlie"),
+                Some("dave"),
+                Some("eve"),
+            ])),
+            Arc::new(Float64Array::from(vec![
+                Some(95.5),
+                Some(87.0),
+                None,
+                Some(72.5),
+                Some(100.0),
+            ])),
+        ],
+    )
+    .unwrap();
+
+    let out = MemOutputFile::new();
+    let opts = WriterOptions {
+        num_buckets: 1,
+        compression: 0,
+        ..WriterOptions::default()
+    };
+    let mut writer = MosaicWriter::new(out, &schema, opts).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+    writer.output().buf.clone()
+}
+
+/// Generate the deterministic ARRAY file.
+/// Schema: id(INT32 NOT NULL), tags(ARRAY<INT32>)
+/// Data: 4 rows — [10,20,30], null, [40,50], []
+/// Options: num_buckets=1, compression=none
+fn gen_with_array() -> Vec<u8> {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("tags", DataType::List(element_field.clone()), true),
+    ]);
+
+    let ids = Int32Array::from(vec![1, 2, 3, 4]);
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    list_builder.values().append_value(10);
+    list_builder.values().append_value(20);
+    list_builder.values().append_value(30);
+    list_builder.append(true);
+    list_builder.append(false);
+    list_builder.values().append_value(40);
+    list_builder.values().append_value(50);
+    list_builder.append(true);
+    list_builder.append(true);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(ids), Arc::new(list_builder.finish())],
+    )
+    .unwrap();
+
+    let out = MemOutputFile::new();
+    let opts = WriterOptions {
+        num_buckets: 1,
+        compression: 0,
+        ..WriterOptions::default()
+    };
+    let mut writer = MosaicWriter::new(out, &schema, opts).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+    writer.output().buf.clone()
+}
+
+#[test]
+fn test_v1_no_array_binary_compatible() {
+    let generated = gen_no_array();
+    let golden = std::fs::read(golden_path("v1_no_array.mosaic"))
+        .expect("golden file missing — run with MOSAIC_REGEN_FIXTURES=1 to 
regenerate");
+    assert_eq!(
+        generated, golden,
+        "non-ARRAY file differs from golden — format may have changed 
unintentionally"
+    );
+}
+
+#[test]
+fn test_v1_with_array_binary_stable() {
+    let generated = gen_with_array();
+    let golden = std::fs::read(golden_path("v1_with_array.mosaic"))
+        .expect("golden file missing — run with MOSAIC_REGEN_FIXTURES=1 to 
regenerate");
+    assert_eq!(
+        generated, golden,
+        "ARRAY file differs from golden — format may have changed 
unintentionally"
+    );
+}
+
+#[test]
+fn test_v1_no_array_golden_readable() {
+    let data = std::fs::read(golden_path("v1_no_array.mosaic")).unwrap();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let rb = rg.read_columns().unwrap();
+    assert_eq!(rb.num_rows(), 5);
+    assert_eq!(rb.num_columns(), 3);
+}
+
+#[test]
+fn test_v1_with_array_golden_readable() {
+    let data = std::fs::read(golden_path("v1_with_array.mosaic")).unwrap();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let rb = rg.read_columns().unwrap();
+    assert_eq!(rb.num_rows(), 4);
+
+    let tags = rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    assert!(!tags.is_null(0));
+    assert!(tags.is_null(1));
+    assert!(!tags.is_null(2));
+    assert!(!tags.is_null(3));
+
+    let r0 = tags
+        .value(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r0.len(), 3);
+    assert_eq!(r0.value(0), 10);
+
+    let r3 = tags
+        .value(3)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r3.len(), 0);
+}
diff --git a/core/tests/testdata/v1_no_array.mosaic 
b/core/tests/testdata/v1_no_array.mosaic
new file mode 100644
index 0000000..3cf94c2
Binary files /dev/null and b/core/tests/testdata/v1_no_array.mosaic differ
diff --git a/core/tests/testdata/v1_with_array.mosaic 
b/core/tests/testdata/v1_with_array.mosaic
new file mode 100644
index 0000000..19dbcfb
Binary files /dev/null and b/core/tests/testdata/v1_with_array.mosaic differ
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 1d08012..4f85291 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -824,6 +824,187 @@ static void test_stats_empty_string_min() {
     printf("  PASS test_stats_empty_string_min\n");
 }
 
+static void test_array_type() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("tags", arrow::list(arrow::field("item", 
arrow::int32()))),
+    });
+
+    arrow::Int32Builder id_b;
+    assert(id_b.Append(1).ok());
+    assert(id_b.Append(2).ok());
+    assert(id_b.Append(3).ok());
+    assert(id_b.Append(4).ok());
+
+    auto val_builder = std::make_shared<arrow::Int32Builder>();
+    arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+    // Row 0: [10, 20, 30]
+    assert(list_b.Append().ok());
+    assert(val_builder->Append(10).ok());
+    assert(val_builder->Append(20).ok());
+    assert(val_builder->Append(30).ok());
+
+    // Row 1: [40, 50]
+    assert(list_b.Append().ok());
+    assert(val_builder->Append(40).ok());
+    assert(val_builder->Append(50).ok());
+
+    // Row 2: []
+    assert(list_b.Append().ok());
+
+    // Row 3: null
+    assert(list_b.AppendNull().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 4, {
+        id_b.Finish().ValueUnsafe(),
+        list_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 4);
+
+    auto ids = 
std::static_pointer_cast<arrow::Int32Array>(rb->GetColumnByName("id"));
+    ASSERT_EQ(ids->Value(0), 1);
+    ASSERT_EQ(ids->Value(1), 2);
+    ASSERT_EQ(ids->Value(2), 3);
+    ASSERT_EQ(ids->Value(3), 4);
+
+    auto tags = 
std::static_pointer_cast<arrow::ListArray>(rb->GetColumnByName("tags"));
+    ASSERT_TRUE(!tags->IsNull(0));
+    ASSERT_TRUE(!tags->IsNull(1));
+    ASSERT_TRUE(!tags->IsNull(2));
+    ASSERT_TRUE(tags->IsNull(3));
+
+    // Row 0: [10, 20, 30]
+    auto row0 = 
std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(0));
+    ASSERT_EQ(row0->length(), 3);
+    ASSERT_EQ(row0->Value(0), 10);
+    ASSERT_EQ(row0->Value(1), 20);
+    ASSERT_EQ(row0->Value(2), 30);
+
+    // Row 1: [40, 50]
+    auto row1 = 
std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(1));
+    ASSERT_EQ(row1->length(), 2);
+    ASSERT_EQ(row1->Value(0), 40);
+    ASSERT_EQ(row1->Value(1), 50);
+
+    // Row 2: []
+    auto row2 = 
std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(2));
+    ASSERT_EQ(row2->length(), 0);
+
+    printf("  PASS test_array_type\n");
+}
+
+static void test_array_with_null_elements() {
+    auto schema = arrow::schema({
+        arrow::field("arr", arrow::list(arrow::field("item", arrow::int64()))),
+    });
+
+    auto val_builder = std::make_shared<arrow::Int64Builder>();
+    arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+    // [100, null, 300]
+    assert(list_b.Append().ok());
+    assert(val_builder->Append(100).ok());
+    assert(val_builder->AppendNull().ok());
+    assert(val_builder->Append(300).ok());
+
+    // [null, null]
+    assert(list_b.Append().ok());
+    assert(val_builder->AppendNull().ok());
+    assert(val_builder->AppendNull().ok());
+
+    // [999]
+    assert(list_b.Append().ok());
+    assert(val_builder->Append(999).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        list_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+
+    auto arr = std::static_pointer_cast<arrow::ListArray>(rb->column(0));
+
+    auto row0 = 
std::static_pointer_cast<arrow::Int64Array>(arr->value_slice(0));
+    ASSERT_EQ(row0->length(), 3);
+    ASSERT_EQ(row0->Value(0), 100);
+    ASSERT_TRUE(row0->IsNull(1));
+    ASSERT_EQ(row0->Value(2), 300);
+
+    auto row1 = 
std::static_pointer_cast<arrow::Int64Array>(arr->value_slice(1));
+    ASSERT_EQ(row1->length(), 2);
+    ASSERT_TRUE(row1->IsNull(0));
+    ASSERT_TRUE(row1->IsNull(1));
+
+    auto row2 = 
std::static_pointer_cast<arrow::Int64Array>(arr->value_slice(2));
+    ASSERT_EQ(row2->length(), 1);
+    ASSERT_EQ(row2->Value(0), 999);
+
+    printf("  PASS test_array_with_null_elements\n");
+}
+
+static void test_array_string_elements() {
+    auto schema = arrow::schema({
+        arrow::field("arr", arrow::list(arrow::field("item", arrow::utf8()))),
+    });
+
+    auto val_builder = std::make_shared<arrow::StringBuilder>();
+    arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+    // ["hello", "world"]
+    assert(list_b.Append().ok());
+    assert(val_builder->Append("hello").ok());
+    assert(val_builder->Append("world").ok());
+
+    // [null, "foo"]
+    assert(list_b.Append().ok());
+    assert(val_builder->AppendNull().ok());
+    assert(val_builder->Append("foo").ok());
+
+    // []
+    assert(list_b.Append().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        list_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+
+    auto arr = std::static_pointer_cast<arrow::ListArray>(rb->column(0));
+
+    auto row0 = 
std::static_pointer_cast<arrow::StringArray>(arr->value_slice(0));
+    ASSERT_EQ(row0->length(), 2);
+    ASSERT_EQ(row0->GetString(0), "hello");
+    ASSERT_EQ(row0->GetString(1), "world");
+
+    auto row1 = 
std::static_pointer_cast<arrow::StringArray>(arr->value_slice(1));
+    ASSERT_EQ(row1->length(), 2);
+    ASSERT_TRUE(row1->IsNull(0));
+    ASSERT_EQ(row1->GetString(1), "foo");
+
+    auto row2 = 
std::static_pointer_cast<arrow::StringArray>(arr->value_slice(2));
+    ASSERT_EQ(row2->length(), 0);
+
+    printf("  PASS test_array_string_elements\n");
+}
+
 int main() {
     printf("Running Mosaic C++ tests...\n");
     test_basic_roundtrip();
@@ -841,6 +1022,9 @@ int main() {
     test_writer_stats_all_null();
     test_writer_stats_matches_reader();
     test_stats_empty_string_min();
-    printf("All %d tests passed.\n", 15);
+    test_array_type();
+    test_array_with_null_elements();
+    test_array_string_elements();
+    printf("All %d tests passed.\n", 18);
     return 0;
 }
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index 9b3fa4c..0d08ebb 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -146,6 +146,44 @@ g++ -std=c++17 -I include/ example.cpp \
     <span class="kw">return</span> <span class="num">0</span>;
 }</code></pre>
 
+            <h3>Writing ARRAY Columns</h3>
+            <p>
+                ARRAY types use Arrow&rsquo;s <code>ListBuilder</code>. 
Element values are
+                flattened across all rows and benefit from dictionary encoding:
+            </p>
+<pre><code><span class="kw">auto</span> schema = arrow::schema({
+    arrow::field(<span class="str">"id"</span>, arrow::int32()),
+    arrow::field(<span class="str">"tags"</span>, 
arrow::list(arrow::field(<span class="str">"item"</span>, arrow::int32()))),
+});
+
+arrow::Int32Builder id_b;
+<span class="kw">auto</span> val_builder = 
std::make_shared&lt;arrow::Int32Builder&gt;();
+arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+<span class="cmt">// Row 0: [10, 20]</span>
+id_b.Append(<span class="num">1</span>);
+list_b.Append();
+val_builder-&gt;Append(<span class="num">10</span>);
+val_builder-&gt;Append(<span class="num">20</span>);
+
+<span class="cmt">// Row 1: null</span>
+id_b.Append(<span class="num">2</span>);
+list_b.AppendNull();
+
+<span class="cmt">// Row 2: [] (empty)</span>
+id_b.Append(<span class="num">3</span>);
+list_b.Append();
+
+<span class="kw">auto</span> batch = arrow::RecordBatch::Make(schema, <span 
class="num">3</span>, {
+    id_b.Finish().ValueOrDie(),
+    list_b.Finish().ValueOrDie(),
+});
+
+<span class="cmt">// Reading back</span>
+<span class="kw">auto</span> tags = 
std::static_pointer_cast&lt;arrow::ListArray&gt;(rb-&gt;GetColumnByName(<span 
class="str">"tags"</span>));
+<span class="kw">auto</span> row0 = 
std::static_pointer_cast&lt;arrow::Int32Array&gt;(tags-&gt;value_slice(<span 
class="num">0</span>));
+<span class="cmt">// row0: [10, 20]</span></code></pre>
+
             <h2>C++ API Reference</h2>
 
             <h3>Writer Options</h3>
diff --git a/docs/design.html b/docs/design.html
index 84c5e4f..19ffd81 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -739,9 +739,10 @@ repeated numColumns times:
                     <tr><td>15</td><td>TIME</td><td>varint precision</td></tr>
                     <tr><td>16</td><td>TIMESTAMP</td><td>varint 
precision</td></tr>
                     <tr><td>17</td><td>TIMESTAMP_LTZ</td><td>varint precision, 
varint timezoneLength, bytes timezone</td></tr>
+                    <tr><td>18</td><td>ARRAY</td><td>varint nameLength, bytes 
name (element field name), TypeDescriptor (recursive element type)</td></tr>
                 </tbody>
             </table>
-            <p>Complex types (ARRAY, MAP, ROW, etc.), VARIANT, and BLOB are 
not supported.</p>
+            <p>MAP, ROW, VARIANT, and BLOB are not yet supported.</p>
 
             <!-- ============================================================ 
-->
             <h2>Value Serialization</h2>
@@ -765,6 +766,7 @@ repeated numColumns times:
                     <tr><td>TIMESTAMP (precision &gt; 6)</td><td>8 bytes 
(epoch millis) + 4 bytes (nanos of millis)</td></tr>
                     <tr><td>CHAR / VARCHAR / STRING</td><td>varint length + 
UTF-8 bytes</td></tr>
                     <tr><td>BINARY / VARBINARY / BYTES</td><td>varint length + 
raw bytes</td></tr>
+                    <tr><td>ARRAY</td><td>Flattened columnar: lengths (INT32) 
+ values column (see ARRAY Type Storage below)</td></tr>
                 </tbody>
             </table>
             <p>
@@ -777,6 +779,159 @@ repeated numColumns times:
                 those values, and legacy Struct timestamp writes reject them 
before writing.
             </p>
 
+            <!-- ============================================================ 
-->
+            <h2>ARRAY Type Storage</h2>
+            <p>
+                ARRAY columns use a <strong>flattened columnar</strong> 
storage layout.
+                Each ARRAY column is decomposed into physical columns within 
the same bucket &mdash;
+                a lengths column and a values column &mdash; both first-class 
columns that benefit from
+                standard column encoding (DICT, CONST, PLAIN).
+            </p>
+
+            <h3>Decomposition</h3>
+            <p>An <code>ARRAY&lt;T&gt;</code> column with N rows is stored 
as:</p>
+            <ol>
+                <li><strong>Lengths column</strong> (INT32, N entries): the 
number of elements in each array.
+                    Null arrays are represented by a null in the lengths 
column (contributing 0 elements).
+                    Empty arrays (<code>[]</code>) have length 0 and are 
non-null.</li>
+                <li><strong>Values column</strong> (type T, M entries): all 
elements from all rows flattened
+                    into a single contiguous column, where M = sum of all 
non-null lengths.
+                    Element-level nulls are tracked by this column&rsquo;s own 
null bitmap.</li>
+            </ol>
+            <p>
+                Both columns independently go through the standard encoding 
selection (DICT, CONST, PLAIN, ALL_NULL),
+                enabling dictionary compression of element values across all 
arrays in the column.
+            </p>
+
+            <h4>Example</h4>
+<pre><code>Input rows:  [1, 2, 3],  null,  [1, 2],  []
+
+Lengths column (INT32):    3, null, 2, 0    &larr; standard INT32 encoding 
(DICT/CONST/PLAIN)
+Values column  (INT32):    1, 2, 3, 1, 2    &larr; standard INT32 encoding 
(DICT: {1&rarr;0, 2&rarr;1, 3&rarr;2})</code></pre>
+
+            <h4>Nested Arrays</h4>
+            <p>
+                <code>ARRAY&lt;ARRAY&lt;INT&gt;&gt;</code> is stored 
recursively. The outer values column
+                is itself an <code>ARRAY&lt;INT&gt;</code>, which decomposes 
into its own lengths + values pair.
+                All leaf INT values across all nesting levels end up in a 
single INT32 column,
+                sharing one dictionary across the entire column.
+            </p>
+<pre><code>ARRAY&lt;ARRAY&lt;INT&gt;&gt; with rows:  [[1,2], [3]],  [[1,2]]
+
+Outer lengths (INT32):   2, 1                         &larr; 2 inner arrays, 1 
inner array
+Inner lengths (INT32):   2, 1, 2                      &larr; element counts of 
inner arrays
+Leaf values   (INT32):   1, 2, 3, 1, 2                &larr; all INTs in one 
column, shared DICT</code></pre>
+
+            <h3>Bucket Internal Format</h3>
+            <p>
+                ARRAY columns are expanded into <strong>physical 
columns</strong> within the same bucket.
+                An <code>ARRAY&lt;T&gt;</code> becomes two physical columns: a 
lengths column (INT32)
+                and a values column (type T). Both are first-class columns in 
the bucket &mdash;
+                they share the same encoding flags, null bitmaps, and column 
data sections
+                as all other columns. There is no sub-bucket or nested 
container.
+            </p>
+
+            <h4>Monolithic Bucket with ARRAY Columns</h4>
+<pre><code>varint   numPrimary             (number of logical/primary columns 
= N)
+varint   numChildren            (number of child value columns = C; 0 if no 
ARRAY)
+repeated C times:
+    varint  childElementCount   (total element count for each child column)
+
+[encoding flags: 2 bits &times; (N + C) columns]
+[has-nulls flags: 1 bit &times; (N + C) columns]
+[CONST metadata for all N + C columns]
+[DICT metadata for all N + C columns]
+[null bitmaps: primary columns use ceil(numRows/8), child columns use 
ceil(childElementCount/8)]
+[column data for all N + C columns]</code></pre>
+
+            <h4>Paged Bucket with ARRAY Columns</h4>
+            <p>
+                The page directory includes entries for all N + C physical 
columns.
+                Each column (including child value columns) has its own 
independently compressed slot.
+                A fixed-size child header precedes the directory:
+            </p>
+<pre><code>u16     numChildren (LE)
+repeated numChildren times:
+    u32  childElementCount (LE)
+[directory: (N + C) &times; u32 LE slot sizes]
+[column slots: each independently compressed, including child 
columns]</code></pre>
+
+            <h3>Statistics</h3>
+            <p>ARRAY columns do not support min/max statistics (no meaningful 
ordering).</p>
+
+            <h3>Design Rationale: Comparison with Parquet and ORC</h3>
+            <p>
+                There are two mainstream approaches to storing nested ARRAY 
types in columnar formats.
+                Mosaic follows the <strong>ORC-style</strong> approach 
(lengths + child columns)
+                rather than the <strong>Parquet/Dremel-style</strong> approach 
(repetition/definition levels).
+            </p>
+
+            <h4>Mosaic vs ORC</h4>
+            <p>
+                Both Mosaic and ORC decompose <code>ARRAY&lt;T&gt;</code> into 
a lengths stream and a
+                child column. The key differences:
+            </p>
+            <table>
+                <thead>
+                    <tr><th>Aspect</th><th>Mosaic</th><th>ORC</th></tr>
+                </thead>
+                <tbody>
+                    <tr>
+                        <td>Lengths encoding</td>
+                        <td>DICT / CONST / PLAIN &mdash; all-same-length 
arrays use CONST (near zero bytes)</td>
+                        <td>Run-Length Encoding (RLE v1/v2)</td>
+                    </tr>
+                    <tr>
+                        <td>Child column encoding</td>
+                        <td>DICT / CONST / PLAIN &mdash; shared dictionary 
across all arrays</td>
+                        <td>DICT / DIRECT / RLE &mdash; same cross-array 
sharing</td>
+                    </tr>
+                    <tr>
+                        <td>Column placement</td>
+                        <td>Lengths and values are physical columns within the 
same bucket</td>
+                        <td>Lengths and values are independent streams within 
the same stripe</td>
+                    </tr>
+                    <tr>
+                        <td>Null representation</td>
+                        <td>Lengths column null bitmap (shared 
infrastructure)</td>
+                        <td>Separate PRESENT stream (Boolean RLE)</td>
+                    </tr>
+                </tbody>
+            </table>
+            <p>
+                The designs are structurally equivalent. Mosaic&rsquo;s CONST 
encoding for lengths
+                can be more compact than ORC&rsquo;s RLE when all arrays have 
the same length
+                (a common case in feature vectors and fixed-size embeddings).
+            </p>
+
+            <h4>Why Not Parquet&rsquo;s Repetition/Definition Levels?</h4>
+            <p>
+                Parquet uses the Dremel encoding: regardless of nesting depth, 
each ARRAY column
+                produces a single physical column with repetition and 
definition level arrays attached
+                to every leaf value. This has the advantage of constant 
physical column count
+                (always 1 per ARRAY column), but significant disadvantages:
+            </p>
+            <ul>
+                <li><strong>Implementation complexity</strong>: The shredding 
(decomposition) and assembly
+                    (reconstruction) algorithms for rep/def levels are 
substantially more complex than
+                    the lengths + child approach, especially for multi-level 
nesting. Correct handling
+                    of nulls at different nesting levels requires careful 
tracking of definition level
+                    thresholds.</li>
+                <li><strong>Overhead for common cases</strong>: Most 
real-world schemas use at most
+                    1&ndash;2 levels of ARRAY nesting. The rep/def approach 
adds per-value overhead
+                    (two extra integers per leaf value) that only pays off at 
deep nesting levels
+                    (&ge; 3), which are rare in practice.</li>
+                <li><strong>No encoding benefit</strong>: Rep/def levels 
themselves need encoding
+                    (typically RLE or bit-packing). The lengths + child 
approach achieves equivalent
+                    compression by applying standard column encodings to both 
the lengths column
+                    (which captures the same structural information) and the 
values column.</li>
+            </ul>
+            <p>
+                The ORC-style approach was chosen for its simplicity, 
debuggability, and
+                natural fit with Mosaic&rsquo;s bucket architecture where each 
column is
+                independently encoded and compressed.
+            </p>
+
             <!-- ============================================================ 
-->
             <h2>Varint Encoding</h2>
             <p>
@@ -793,7 +948,7 @@ repeated numColumns times:
             <!-- ============================================================ 
-->
             <h2>Limitations</h2>
             <ol>
-                <li>Complex types (ARRAY, MAP, MULTISET, ROW) are not 
supported.</li>
+                <li>Complex types MAP, MULTISET, and ROW are not yet 
supported. ARRAY is supported with flattened columnar storage.</li>
                 <li>Mosaic format is designed for wide tables and may not be 
efficient for narrow tables with few columns.</li>
             </ol>
         </div>
diff --git a/docs/java-api.html b/docs/java-api.html
index f654b99..f8f207a 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -117,6 +117,48 @@
                 Nanosecond timestamps are exposed as standard Arrow 
<code>TimeUnit.NANOSECOND</code>
                 vectors while Mosaic keeps its 12-byte physical encoding 
internally.
             </p>
+            <p>
+                ARRAY types are defined using Arrow&rsquo;s 
<code>ArrowType.List</code> with a child field
+                for the element type. Nested arrays 
(<code>ARRAY&lt;ARRAY&lt;INT&gt;&gt;</code>) are supported:
+            </p>
+<pre><code><span class="ty">Field</span> elementField = <span 
class="kw">new</span> <span class="ty">Field</span>(<span 
class="str">"item"</span>,
+    <span class="ty">FieldType</span>.nullable(<span class="kw">new</span> 
<span class="ty">ArrowType.Int</span>(<span class="num">32</span>, <span 
class="kw">true</span>)), <span class="kw">null</span>);
+<span class="ty">Field</span> tagsField = <span class="kw">new</span> <span 
class="ty">Field</span>(<span class="str">"tags"</span>,
+    <span class="ty">FieldType</span>.nullable(<span 
class="ty">ArrowType.List</span>.INSTANCE), <span 
class="ty">Arrays</span>.asList(elementField));
+
+<span class="ty">Schema</span> schema = <span class="kw">new</span> <span 
class="ty">Schema</span>(<span class="ty">Arrays</span>.asList(
+    <span class="ty">Field</span>.notNullable(<span class="str">"id"</span>, 
<span class="kw">new</span> <span class="ty">ArrowType.Int</span>(<span 
class="num">32</span>, <span class="kw">true</span>)),
+    tagsField
+));</code></pre>
+            <p>
+                Writing ARRAY values uses Arrow&rsquo;s 
<code>ListVector</code> and <code>UnionListWriter</code>:
+            </p>
+<pre><code><span class="ty">ListVector</span> tags = (<span 
class="ty">ListVector</span>) root.getVector(<span class="str">"tags"</span>);
+tags.allocateNew();
+<span class="ty">UnionListWriter</span> listWriter = tags.getWriter();
+
+<span class="cmt">// Row 0: [10, 20, 30]</span>
+listWriter.setPosition(<span class="num">0</span>);
+listWriter.startList();
+listWriter.writeInt(<span class="num">10</span>);
+listWriter.writeInt(<span class="num">20</span>);
+listWriter.writeInt(<span class="num">30</span>);
+listWriter.endList();
+
+<span class="cmt">// Row 1: null</span>
+tags.setNull(<span class="num">1</span>);
+
+<span class="cmt">// Row 2: [] (empty array)</span>
+listWriter.setPosition(<span class="num">2</span>);
+listWriter.startList();
+listWriter.endList();</code></pre>
+            <p>
+                Reading uses <code>ListVector.getObject()</code>:
+            </p>
+<pre><code><span class="ty">ListVector</span> readTags = (<span 
class="ty">ListVector</span>) batch.getVector(<span class="str">"tags"</span>);
+<span class="ty">List</span>&lt;?&gt; row0 = readTags.getObject(<span 
class="num">0</span>);  <span class="cmt">// [10, 20, 30]</span>
+<span class="kw">boolean</span> isNull = readTags.isNull(<span 
class="num">1</span>);    <span class="cmt">// true</span>
+<span class="ty">List</span>&lt;?&gt; row2 = readTags.getObject(<span 
class="num">2</span>);  <span class="cmt">// [] (empty)</span></code></pre>
 
             <h3>2. Create Writer and Write Batches</h3>
             <p>
diff --git a/docs/python-api.html b/docs/python-api.html
index 7d50afe..fd849dd 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -96,6 +96,14 @@ pa_schema = pa.schema([
                 Nanosecond timestamps are exposed as standard Arrow 
<code>timestamp("ns")</code>
                 arrays while Mosaic keeps its 12-byte physical encoding 
internally.
             </p>
+            <p>
+                ARRAY types use PyArrow&rsquo;s <code>pa.list_()</code>. 
Nested arrays are supported:
+            </p>
+<pre><code>pa_schema = pa.schema([
+    pa.field(<span class="str">"id"</span>, pa.int32()),
+    pa.field(<span class="str">"tags"</span>, pa.list_(pa.int32())),
+    pa.field(<span class="str">"nested"</span>, pa.list_(pa.list_(pa.utf8()))),
+])</code></pre>
 
             <h3>2. Create Writer and Write Batches</h3>
             <p>
@@ -131,6 +139,22 @@ buf = io.BytesIO()
 
 data = buf.getvalue()</code></pre>
 
+            <h4>Writing ARRAY Columns</h4>
+            <p>Pass native Python lists to <code>pa.array()</code> with 
<code>pa.list_()</code> type:</p>
+<pre><code>schema = pa.schema([
+    pa.field(<span class="str">"id"</span>, pa.int32()),
+    pa.field(<span class="str">"tags"</span>, pa.list_(pa.int32())),
+])
+
+batch = pa.record_batch([
+    pa.array([<span class="num">1</span>, <span class="num">2</span>, <span 
class="num">3</span>], type=pa.int32()),
+    pa.array([[<span class="num">10</span>, <span class="num">20</span>], 
<span class="kw">None</span>, []], type=pa.list_(pa.int32())),
+], names=[<span class="str">"id"</span>, <span class="str">"tags"</span>])
+
+<span class="cmt"># Reading back</span>
+rb = reader.read_row_group(<span class="num">0</span>)
+tags = rb.column(<span class="str">"tags"</span>).to_pylist()  <span 
class="cmt"># [[10, 20], None, []]</span></code></pre>
+
             <h3>WriterOptions</h3>
             <table>
                 <thead>
diff --git 
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java 
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 5344a2d..4b909db 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -39,10 +39,13 @@ import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import org.junit.After;
@@ -1141,4 +1144,92 @@ public class MosaicRoundtripTest {
             assertEquals(10, reader.rowGroupNumRows(0));
         }
     }
+
+    @Test
+    public void testArrayType() {
+        Field elementField = new Field("item", FieldType.nullable(new 
ArrowType.Int(32, true)), null);
+        Field listField = new Field("tags", 
FieldType.nullable(ArrowType.List.INSTANCE), Arrays.asList(elementField));
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                listField
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, 
allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            ListVector tags = (ListVector) root.getVector("tags");
+
+            ids.allocateNew(4);
+            tags.allocateNew();
+
+            UnionListWriter listWriter = tags.getWriter();
+
+            // Row 0: [10, 20, 30]
+            ids.set(0, 1);
+            listWriter.setPosition(0);
+            listWriter.startList();
+            listWriter.writeInt(10);
+            listWriter.writeInt(20);
+            listWriter.writeInt(30);
+            listWriter.endList();
+
+            // Row 1: [40, 50]
+            ids.set(1, 2);
+            listWriter.setPosition(1);
+            listWriter.startList();
+            listWriter.writeInt(40);
+            listWriter.writeInt(50);
+            listWriter.endList();
+
+            // Row 2: [] (empty array)
+            ids.set(2, 3);
+            listWriter.setPosition(2);
+            listWriter.startList();
+            listWriter.endList();
+
+            // Row 3: null
+            ids.set(3, 4);
+            tags.setNull(3);
+
+            root.setRowCount(4);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(4, batch.getRowCount());
+
+                IntVector readIds = (IntVector) batch.getVector("id");
+                ListVector readTags = (ListVector) batch.getVector("tags");
+
+                assertEquals(1, readIds.get(0));
+                assertEquals(2, readIds.get(1));
+                assertEquals(3, readIds.get(2));
+                assertEquals(4, readIds.get(3));
+
+                // Row 0: [10, 20, 30]
+                assertFalse(readTags.isNull(0));
+                java.util.List<?> row0 = readTags.getObject(0);
+                assertEquals(3, row0.size());
+                assertEquals(10, row0.get(0));
+                assertEquals(20, row0.get(1));
+                assertEquals(30, row0.get(2));
+
+                // Row 1: [40, 50]
+                assertFalse(readTags.isNull(1));
+                java.util.List<?> row1 = readTags.getObject(1);
+                assertEquals(2, row1.size());
+                assertEquals(40, row1.get(0));
+                assertEquals(50, row1.get(1));
+
+                // Row 2: []
+                assertFalse(readTags.isNull(2));
+                java.util.List<?> row2 = readTags.getObject(2);
+                assertEquals(0, row2.size());
+
+                // Row 3: null
+                assertTrue(readTags.isNull(3));
+            }
+        }
+    }
 }
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 00e68b7..de387c6 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -965,3 +965,96 @@ class TestWriter:
         with _reader_from_bytes(data) as reader:
             assert reader.num_row_groups == 1
             assert reader.row_group_num_rows(0) == 10
+
+    def test_array_type(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32(), nullable=False),
+                pa.field("tags", pa.list_(pa.int32())),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([1, 2, 3, 4], type=pa.int32()),
+                pa.array([[10, 20, 30], [40, 50], [], None], 
type=pa.list_(pa.int32())),
+            ],
+            names=["id", "tags"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 4
+
+            ids = rb.column("id").to_pylist()
+            assert ids == [1, 2, 3, 4]
+
+            tags = rb.column("tags").to_pylist()
+            assert tags[0] == [10, 20, 30]
+            assert tags[1] == [40, 50]
+            assert tags[2] == []
+            assert tags[3] is None
+
+    def test_array_with_null_elements(self):
+        pa_schema = pa.schema(
+            [pa.field("arr", pa.list_(pa.int64()))]
+        )
+
+        batch = pa.record_batch(
+            [pa.array([[100, None, 300], [None, None], [999]], 
type=pa.list_(pa.int64()))],
+            names=["arr"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            arr = rb.column("arr").to_pylist()
+            assert arr[0] == [100, None, 300]
+            assert arr[1] == [None, None]
+            assert arr[2] == [999]
+
+    def test_array_string_elements(self):
+        pa_schema = pa.schema(
+            [pa.field("arr", pa.list_(pa.utf8()))]
+        )
+
+        batch = pa.record_batch(
+            [pa.array([["hello", "world"], [None, "foo"], []], 
type=pa.list_(pa.utf8()))],
+            names=["arr"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            arr = rb.column("arr").to_pylist()
+            assert arr[0] == ["hello", "world"]
+            assert arr[1] == [None, "foo"]
+            assert arr[2] == []
+
+    def test_array_nested(self):
+        pa_schema = pa.schema(
+            [pa.field("nested", pa.list_(pa.list_(pa.int32())))]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(
+                    [[[1, 2], [3]], [[4]], None],
+                    type=pa.list_(pa.list_(pa.int32())),
+                )
+            ],
+            names=["nested"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            nested = rb.column("nested").to_pylist()
+            assert nested[0] == [[1, 2], [3]]
+            assert nested[1] == [[4]]
+            assert nested[2] is None


Reply via email to