This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new ae6f651 Support ARRAY type with ORC-style flattened columnar storage (#52)
ae6f651 is described below
commit ae6f651cc3748e43759288a3ae799139d776d5b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Jun 6 22:17:37 2026 +0800
Support ARRAY type with ORC-style flattened columnar storage (#52)
ARRAY<T> columns are stored as two independent streams:
- Lengths column (INT32): the number of elements per row; benefits from DICT/CONST encoding.
- Values sub-bucket (type T): all elements flattened across all rows; benefits from DICT/CONST encoding.
Nested arrays (e.g. ARRAY<ARRAY<INT>>) are supported via recursive decomposition.
All leaf element values share a single dictionary across all arrays in the column.
---
core/src/bucket_reader.rs | 203 ++++-
core/src/bucket_writer.rs | 611 ++++++++++++---
core/src/reader.rs | 212 ++++-
core/src/types.rs | 35 +
core/src/writer.rs | 14 +-
core/tests/array_type_test.rs | 856 +++++++++++++++++++++
core/tests/gen_fixtures.rs | 218 ++++++
core/tests/testdata/v1_no_array.mosaic | Bin 0 -> 158 bytes
core/tests/testdata/v1_with_array.mosaic | Bin 0 -> 130 bytes
cpp/test_mosaic.cpp | 186 ++++-
docs/cpp-api.html | 38 +
docs/design.html | 159 +++-
docs/java-api.html | 42 +
docs/python-api.html | 24 +
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 91 +++
python/tests/test_mosaic.py | 93 +++
16 files changed, 2600 insertions(+), 182 deletions(-)
diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index 115bae4..f81165e 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -342,9 +342,87 @@ fn build_array(
})
}
+pub fn reassemble_list_columns_pub(
+ arrays: &mut [ArrayRef],
+ children: &[ChildColumnMeta],
+ logical_type_refs: &[&DataType],
+ num_primary: usize,
+ num_rows: usize,
+) {
+ let logical_types: Vec<DataType> = logical_type_refs.iter().map(|t|
(*t).clone()).collect();
+ reassemble_list_columns(arrays, children, &logical_types, num_primary,
num_rows);
+}
+
+fn reassemble_list_columns(
+ arrays: &mut [ArrayRef],
+ children: &[ChildColumnMeta],
+ logical_types: &[DataType],
+ _num_primary: usize,
+ num_rows: usize,
+) {
+ for child in children.iter().rev() {
+ let phys_idx = child.physical_index;
+ let parent = child.parent_logical_col;
+ let values = arrays[phys_idx].clone();
+
+ // Find the lengths column for this child:
+ // - If the previous physical column (phys_idx - 1) is a child with
the same parent,
+ // this is a nested child and lengths are at phys_idx - 1.
+ // - Otherwise, this is a first-level child and lengths are at the
parent primary column.
+ let is_nested = children
+ .iter()
+ .any(|c| c.physical_index == phys_idx - 1 && c.parent_logical_col
== parent);
+ let lengths_idx = if is_nested { phys_idx - 1 } else { parent };
+
+ let lengths = arrays[lengths_idx].clone();
+ let lengths_rows = lengths.len();
+
+ let element_field = child.element_field.clone();
+ let reassembled = reassemble_list_array(lengths, values,
element_field, lengths_rows);
+ arrays[lengths_idx] = reassembled;
+ }
+
+ // Handle ALL_NULL list columns (where no children were created because
all null)
+ for (i, lt) in logical_types.iter().enumerate() {
+ if matches!(lt, DataType::List(_)) && !children.iter().any(|c|
c.parent_logical_col == i) {
+ arrays[i] = arrow_array::new_null_array(lt, num_rows);
+ }
+ }
+}
+
+fn reassemble_list_array(
+ lengths: ArrayRef,
+ values: ArrayRef,
+ element_field: Arc<Field>,
+ num_rows: usize,
+) -> ArrayRef {
+ let lengths_arr = lengths
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("list lengths must be Int32Array");
+
+ let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
+ offsets.push(0);
+ for i in 0..num_rows {
+ let len = if lengths_arr.is_null(i) {
+ 0
+ } else {
+ lengths_arr.value(i)
+ };
+ offsets.push(offsets.last().unwrap() + len);
+ }
+
+ let null_buf = lengths_arr.nulls().cloned();
+ let offset_buf = OffsetBuffer::new(ScalarBuffer::from(offsets));
+ Arc::new(ListArray::new(element_field, offset_buf, values, null_buf))
+}
+
+use crate::bucket_writer::{expand_col_types, ChildColumnMeta};
+
pub struct BucketReader {
data: Vec<u8>,
- num_columns: usize,
+ num_primary: usize,
+ total_columns: usize,
num_rows: usize,
col_types: Vec<DataType>,
@@ -355,23 +433,49 @@ pub struct BucketReader {
dict_values: Vec<Vec<Value>>,
dict_bit_widths: Vec<usize>,
data_cursors: Vec<usize>,
+
+ logical_types: Vec<DataType>,
+ children: Vec<ChildColumnMeta>,
+ child_num_rows: Vec<usize>,
}
impl BucketReader {
+ fn col_num_rows(&self, col: usize) -> usize {
+ if col < self.num_primary {
+ self.num_rows
+ } else {
+ let child_idx = col - self.num_primary;
+ if child_idx < self.child_num_rows.len() {
+ self.child_num_rows[child_idx]
+ } else {
+ 0
+ }
+ }
+ }
+
pub fn new(col_types: Vec<DataType>, data: Vec<u8>, num_rows: usize) ->
io::Result<Self> {
- let num_columns = col_types.len();
+ let logical_types = col_types.clone();
+ let num_primary = col_types.len();
+ let col_refs: Vec<&DataType> = col_types.iter().collect();
+ let (physical_types, children) = expand_col_types(&col_refs);
+ let total_columns = physical_types.len();
+
let mut reader = BucketReader {
data,
- num_columns,
+ num_primary,
+ total_columns,
num_rows,
- col_types,
- encodings: vec![0; num_columns],
- has_nulls: vec![false; num_columns],
+ col_types: physical_types,
+ encodings: vec![0; total_columns],
+ has_nulls: vec![false; total_columns],
null_bitmaps: Vec::new(),
const_values: Vec::new(),
dict_values: Vec::new(),
- dict_bit_widths: vec![0; num_columns],
- data_cursors: vec![0; num_columns],
+ dict_bit_widths: vec![0; total_columns],
+ data_cursors: vec![0; total_columns],
+ logical_types,
+ children,
+ child_num_rows: Vec::new(),
};
reader.init()?;
Ok(reader)
@@ -391,16 +495,32 @@ impl BucketReader {
}
fn init(&mut self) -> io::Result<()> {
- self.null_bitmaps = vec![Vec::new(); self.num_columns];
- self.const_values = vec![Value::Null; self.num_columns];
- self.dict_values = vec![Vec::new(); self.num_columns];
+ self.null_bitmaps = vec![Vec::new(); self.total_columns];
+ self.const_values = vec![Value::Null; self.total_columns];
+ self.dict_values = vec![Vec::new(); self.total_columns];
+
+ if self.num_rows == 0 || self.data.is_empty() {
+ return Ok(());
+ }
let mut pos = 0;
+ // Header: only present when ARRAY columns exist (backward compatible
with v1)
+ let has_children = !self.children.is_empty();
+ if has_children {
+ let _num_primary = varint::decode(&self.data, &mut pos)? as usize;
+ let num_children = varint::decode(&self.data, &mut pos)? as usize;
+ self.child_num_rows = Vec::with_capacity(num_children);
+ for _ in 0..num_children {
+ self.child_num_rows
+ .push(varint::decode(&self.data, &mut pos)? as usize);
+ }
+ }
+
// 1. Encoding flags (2 bits per column)
- let encoding_flags_bytes = (self.num_columns * 2).div_ceil(8);
+ let encoding_flags_bytes = (self.total_columns * 2).div_ceil(8);
self.check_bounds(pos, encoding_flags_bytes)?;
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
let byte_idx = (i * 2) / 8;
let bit_idx = (i * 2) % 8;
self.encodings[i] = (self.data[pos + byte_idx] >> bit_idx) & 0x03;
@@ -408,15 +528,15 @@ impl BucketReader {
pos += encoding_flags_bytes;
// 2. Has-nulls flags (1 bit per column)
- let has_nulls_bytes = self.num_columns.div_ceil(8);
+ let has_nulls_bytes = self.total_columns.div_ceil(8);
self.check_bounds(pos, has_nulls_bytes)?;
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
self.has_nulls[i] = (self.data[pos + i / 8] & (1 << (i % 8))) != 0;
}
pos += has_nulls_bytes;
// 3. CONST metadata
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
if self.encodings[i] == ENCODING_CONST {
let (value, size) = self.read_value_at(&self.col_types[i],
pos)?;
self.const_values[i] = value;
@@ -425,7 +545,7 @@ impl BucketReader {
}
// 4. DICT metadata
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
if self.encodings[i] == ENCODING_DICT {
let num_entries = varint::decode(&self.data, &mut pos)? as
usize;
self.dict_bit_widths[i] = bit_width(num_entries);
@@ -439,10 +559,10 @@ impl BucketReader {
}
}
- // 5. Null bitmaps
- let null_bitmap_bytes = self.num_rows.div_ceil(8);
- for i in 0..self.num_columns {
+ // 5. Null bitmaps (per-column row count)
+ for i in 0..self.total_columns {
if self.has_nulls[i] && self.encodings[i] != ENCODING_ALL_NULL {
+ let null_bitmap_bytes = self.col_num_rows(i).div_ceil(8);
self.check_bounds(pos, null_bitmap_bytes)?;
self.null_bitmaps[i] = self.data[pos..pos +
null_bitmap_bytes].to_vec();
pos += null_bitmap_bytes;
@@ -450,7 +570,7 @@ impl BucketReader {
}
// 6. Record column data start offsets, skip past data
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
self.data_cursors[i] = pos;
if self.encodings[i] == ENCODING_PLAIN {
let w = types::fixed_width(&self.col_types[i]);
@@ -487,35 +607,37 @@ impl BucketReader {
}
fn count_non_null(&self, col: usize) -> usize {
+ let col_rows = self.col_num_rows(col);
if !self.has_nulls[col] {
- return self.num_rows;
+ return col_rows;
}
if self.encodings[col] == ENCODING_ALL_NULL {
return 0;
}
let bitmap = &self.null_bitmaps[col];
- let full_bytes = self.num_rows / 8;
+ let full_bytes = col_rows / 8;
let mut null_count = 0usize;
for byte in bitmap.iter().take(full_bytes) {
null_count += (*byte as u32).count_ones() as usize;
}
- let remaining = self.num_rows % 8;
+ let remaining = col_rows % 8;
if remaining > 0 {
let mask = (1u8 << remaining) - 1;
null_count += (bitmap[full_bytes] & mask).count_ones() as usize;
}
- self.num_rows - null_count
+ col_rows - null_count
}
pub fn read_all_columns(&self) -> io::Result<Vec<ArrayRef>> {
- let num_rows = self.num_rows;
- let mut result = Vec::with_capacity(self.num_columns);
+ // Read all N+C physical columns
+ let mut all_arrays: Vec<ArrayRef> =
Vec::with_capacity(self.total_columns);
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
+ let col_rows = self.col_num_rows(i);
let variant = data_variant_for_type(&self.col_types[i]);
if self.encodings[i] == ENCODING_ALL_NULL {
- result.push(build_all_null_array(&self.col_types[i],
num_rows));
+ all_arrays.push(build_all_null_array(&self.col_types[i],
col_rows));
continue;
}
@@ -529,7 +651,7 @@ impl BucketReader {
let data = match self.encodings[i] {
ENCODING_CONST => read_all_const(
&self.const_values[i],
- num_rows,
+ col_rows,
has_nulls,
&self.null_bitmaps[i],
variant,
@@ -539,7 +661,7 @@ impl BucketReader {
self.data_cursors[i],
&self.dict_values[i],
self.dict_bit_widths[i],
- num_rows,
+ col_rows,
has_nulls,
&self.null_bitmaps[i],
variant,
@@ -548,7 +670,7 @@ impl BucketReader {
&self.data,
self.data_cursors[i],
&self.col_types[i],
- num_rows,
+ col_rows,
has_nulls,
&self.null_bitmaps[i],
variant,
@@ -556,20 +678,29 @@ impl BucketReader {
_ => empty_raw_data_for_type(&self.col_types[i]),
};
- result.push(build_array(
+ all_arrays.push(build_array(
data,
&self.col_types[i],
null_bitmap,
- num_rows,
+ col_rows,
)?);
}
- Ok(result)
+ reassemble_list_columns(
+ &mut all_arrays,
+ &self.children,
+ &self.logical_types,
+ self.num_primary,
+ self.num_rows,
+ );
+
+ // Return only the primary (logical) columns
+ Ok(all_arrays.into_iter().take(self.num_primary).collect())
}
}
pub struct ColumnPageReader {
- col_type: DataType,
+ pub(crate) col_type: DataType,
encoding: u8,
has_nulls: bool,
const_value: Value,
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index 501e6ba..4bfd606 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -17,9 +17,11 @@
use std::collections::HashMap;
use std::io;
+use std::sync::Arc;
use arrow_array::*;
-use arrow_schema::DataType;
+use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer,
ScalarBuffer};
+use arrow_schema::{DataType, Field};
use crate::spec::*;
use crate::types;
@@ -31,21 +33,30 @@ pub struct PagedBucketOutput {
pub has_nulls: Vec<bool>,
pub const_data: Vec<Vec<u8>>,
pub column_pages: Vec<Option<Vec<u8>>>,
+ pub num_primary: usize,
+ pub children: Vec<ChildColumnMeta>,
+}
+
+#[derive(Clone)]
+pub struct ChildColumnMeta {
+ pub parent_logical_col: usize,
+ pub physical_index: usize,
+ pub element_field: Arc<Field>,
+ pub num_elements: usize,
}
pub struct BucketWriter {
- num_columns: usize,
+ num_primary: usize,
+ total_columns: usize,
fixed_widths: Vec<i32>,
null_bitmaps: Vec<Vec<u8>>,
value_buffers: Vec<Vec<u8>>,
non_null_counts: Vec<usize>,
- // CONST tracking
const_tracking: Vec<bool>,
first_value_len: Vec<usize>,
- // Dict tracking: fixed-width <=8 uses u64 keys, variable-width uses byte
keys
long_dict_maps: Vec<Option<HashMap<u64, usize>>>,
byte_dict_maps: Vec<Option<HashMap<Vec<u8>, usize>>>,
dict_total_bytes: Vec<usize>,
@@ -53,6 +64,7 @@ pub struct BucketWriter {
max_dict_entries: usize,
num_rows: usize,
+ children: Vec<ChildColumnMeta>,
}
impl BucketWriter {
@@ -61,13 +73,14 @@ impl BucketWriter {
max_dict_total_bytes: usize,
max_dict_entries: usize,
) -> Self {
- let num_columns = col_types.len();
- let fixed_widths: Vec<i32> = col_types.iter().map(|t|
types::fixed_width(t)).collect();
-
- let mut long_dict_maps = Vec::with_capacity(num_columns);
- let mut byte_dict_maps = Vec::with_capacity(num_columns);
-
- for fw in fixed_widths.iter().take(num_columns) {
+ let num_primary = col_types.len();
+ let (physical_types, children) = expand_col_types(col_types);
+ let total_columns = physical_types.len();
+ let fixed_widths: Vec<i32> =
physical_types.iter().map(types::fixed_width).collect();
+
+ let mut long_dict_maps = Vec::with_capacity(total_columns);
+ let mut byte_dict_maps = Vec::with_capacity(total_columns);
+ for fw in &fixed_widths {
if uses_long_dict(*fw) {
long_dict_maps.push(Some(HashMap::new()));
byte_dict_maps.push(None);
@@ -78,19 +91,40 @@ impl BucketWriter {
}
BucketWriter {
- num_columns,
+ num_primary,
+ total_columns,
fixed_widths,
- null_bitmaps: vec![vec![0u8; 128]; num_columns],
- value_buffers: vec![Vec::with_capacity(1024); num_columns],
- non_null_counts: vec![0; num_columns],
- const_tracking: vec![true; num_columns],
- first_value_len: vec![0; num_columns],
+ null_bitmaps: vec![vec![0u8; 128]; total_columns],
+ value_buffers: vec![Vec::with_capacity(1024); total_columns],
+ non_null_counts: vec![0; total_columns],
+ const_tracking: vec![true; total_columns],
+ first_value_len: vec![0; total_columns],
long_dict_maps,
byte_dict_maps,
- dict_total_bytes: vec![0; num_columns],
+ dict_total_bytes: vec![0; total_columns],
max_dict_total_bytes,
max_dict_entries,
num_rows: 0,
+ children,
+ }
+ }
+
+ pub fn num_primary(&self) -> usize {
+ self.num_primary
+ }
+
+ pub fn children(&self) -> &[ChildColumnMeta] {
+ &self.children
+ }
+
+ fn col_num_rows(&self, col: usize) -> usize {
+ if col < self.num_primary {
+ self.num_rows
+ } else {
+ self.children
+ .iter()
+ .find(|c| c.physical_index == col)
+ .map_or(0, |c| c.num_elements)
}
}
@@ -111,7 +145,7 @@ impl BucketWriter {
arrays: &[&dyn Array],
data_types: &[&DataType],
) -> io::Result<usize> {
- debug_assert_eq!(arrays.len(), self.num_columns);
+ debug_assert_eq!(arrays.len(), self.num_primary);
let num_new_rows = arrays[0].len();
if num_new_rows == 0 {
return Ok(0);
@@ -119,12 +153,76 @@ impl BucketWriter {
let start_row = self.num_rows;
let mut total_size = 0;
- for i in 0..self.num_columns {
- total_size += self.append_array_column(i, arrays[i],
data_types[i], start_row)?;
+ // Split List arrays into lengths + values, collect child writes
+ let mut list_splits: Vec<(usize, Int32Array, ArrayRef)> = Vec::new();
+ for child in &self.children {
+ let col = child.parent_logical_col;
+ if list_splits.iter().any(|(c, _, _)| *c == col) {
+ continue; // already split this column (nested lists share
parent)
+ }
+ let list_array = arrays[col]
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput,
"expected ListArray"))?;
+ let lengths = extract_list_lengths(list_array);
+ let values = flatten_list_values(list_array);
+ list_splits.push((col, lengths, values));
+ }
+
+ // Write primary columns (lengths for ARRAY cols, regular data for
others)
+ let int32_dt = DataType::Int32;
+ for i in 0..self.num_primary {
+ if let Some(split) = list_splits.iter().find(|(col, _, _)| *col ==
i) {
+ total_size += self.append_array_column(i, &split.1, &int32_dt,
start_row)?;
+ } else {
+ total_size += self.append_array_column(i, arrays[i],
data_types[i], start_row)?;
+ }
+ }
+
+ // Recursively flatten all list levels and write to child columns
+ let mut pending: Vec<(usize, ArrayRef)> = Vec::new(); // (child_index,
values)
+ for split in &list_splits {
+ let parent = split.0;
+ // Find the first child for this parent
+ if let Some(child_idx) = self
+ .children
+ .iter()
+ .position(|c| c.parent_logical_col == parent)
+ {
+ pending.push((child_idx, split.2.clone()));
+ }
+ }
+
+ while let Some((child_idx, values)) = pending.pop() {
+ if child_idx >= self.children.len() || values.is_empty() {
+ continue;
+ }
+ let phys_idx = self.children[child_idx].physical_index;
+ let child_start = self.children[child_idx].num_elements;
+
+ if let DataType::List(_) = values.data_type() {
+ let inner_list =
values.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+ io::Error::new(io::ErrorKind::InvalidInput, "expected
ListArray")
+ })?;
+ let inner_lengths = extract_list_lengths(inner_list);
+ let inner_values = flatten_list_values(inner_list);
+ total_size +=
+ self.append_array_column(phys_idx, &inner_lengths,
&int32_dt, child_start)?;
+ self.children[child_idx].num_elements += inner_lengths.len();
+ // Queue the inner values for the next child
+ if child_idx + 1 < self.children.len() {
+ pending.push((child_idx + 1, inner_values));
+ }
+ } else {
+ let elem_dt =
self.children[child_idx].element_field.data_type().clone();
+ total_size +=
+ self.append_array_column(phys_idx, values.as_ref(),
&elem_dt, child_start)?;
+ self.children[child_idx].num_elements += values.len();
+ }
}
self.num_rows += num_new_rows;
- total_size += num_new_rows * self.num_columns.div_ceil(8);
+ total_size += num_new_rows * self.num_primary.div_ceil(8);
Ok(total_size)
}
@@ -374,14 +472,15 @@ impl BucketWriter {
}
fn compute_encodings(&self) -> (Vec<u8>, Vec<bool>) {
- let mut encodings = vec![0u8; self.num_columns];
- let mut has_nulls = vec![false; self.num_columns];
- for i in 0..self.num_columns {
+ let mut encodings = vec![0u8; self.total_columns];
+ let mut has_nulls = vec![false; self.total_columns];
+ for i in 0..self.total_columns {
+ let col_rows = self.col_num_rows(i);
if self.non_null_counts[i] == 0 {
encodings[i] = ENCODING_ALL_NULL;
} else if self.const_tracking[i] {
encodings[i] = ENCODING_CONST;
- has_nulls[i] = self.non_null_counts[i] < self.num_rows;
+ has_nulls[i] = self.non_null_counts[i] < col_rows;
} else {
let dict_size = self.get_dict_size(i);
if dict_size >= 2
@@ -392,7 +491,7 @@ impl BucketWriter {
} else {
encodings[i] = ENCODING_PLAIN;
}
- has_nulls[i] = self.non_null_counts[i] < self.num_rows;
+ has_nulls[i] = self.non_null_counts[i] < col_rows;
}
}
(encodings, has_nulls)
@@ -406,88 +505,92 @@ impl BucketWriter {
let (encodings, has_nulls) = self.compute_encodings();
- let out_size = self.compute_out_size(&encodings, &has_nulls);
- let mut out = vec![0u8; out_size];
- let mut pos = 0;
+ let mut out = Vec::new();
+
+ // Header: only written when ARRAY columns exist (backward compatible
with v1)
+ if !self.children.is_empty() {
+ varint::encode(&mut out, self.num_primary as u32);
+ varint::encode(&mut out, self.children.len() as u32);
+ for child in &self.children {
+ varint::encode(&mut out, child.num_elements as u32);
+ }
+ }
// Encoding flags: 2 bits per column
- let encoding_flags_bytes = (self.num_columns * 2).div_ceil(8);
- for i in 0..self.num_columns {
+ let encoding_flags_bytes = (self.total_columns * 2).div_ceil(8);
+ let ef_start = out.len();
+ out.resize(ef_start + encoding_flags_bytes, 0);
+ for i in 0..self.total_columns {
let byte_idx = (i * 2) / 8;
let bit_idx = (i * 2) % 8;
- out[pos + byte_idx] |= encodings[i] << bit_idx;
+ out[ef_start + byte_idx] |= encodings[i] << bit_idx;
}
- pos += encoding_flags_bytes;
// Has-nulls flags: 1 bit per column
- let has_nulls_bytes = self.num_columns.div_ceil(8);
- for i in 0..self.num_columns {
+ let has_nulls_bytes = self.total_columns.div_ceil(8);
+ let hn_start = out.len();
+ out.resize(hn_start + has_nulls_bytes, 0);
+ for i in 0..self.total_columns {
if has_nulls[i] {
- out[pos + i / 8] |= 1 << (i % 8);
+ out[hn_start + i / 8] |= 1 << (i % 8);
}
}
- pos += has_nulls_bytes;
// CONST metadata
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
if encodings[i] == ENCODING_CONST {
let len = self.first_value_len[i];
- out[pos..pos +
len].copy_from_slice(&self.value_buffers[i][..len]);
- pos += len;
+ out.extend_from_slice(&self.value_buffers[i][..len]);
}
}
// Dict metadata
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
if encodings[i] == ENCODING_DICT {
if let Some(ref dict) = self.long_dict_maps[i] {
let num_entries = dict.len();
- pos = varint::encode_to_slice(&mut out, pos, num_entries
as u32);
+ varint::encode(&mut out, num_entries as u32);
let w = self.fixed_widths[i];
let mut keys = vec![0u64; num_entries];
for (&key, &idx) in dict {
keys[idx] = key;
}
for key in &keys {
- write_fixed_key_to_slice(&mut out, &mut pos, *key, w);
+ write_fixed_key_to_vec(&mut out, *key, w);
}
} else if let Some(ref dict) = self.byte_dict_maps[i] {
let num_entries = dict.len();
- pos = varint::encode_to_slice(&mut out, pos, num_entries
as u32);
+ varint::encode(&mut out, num_entries as u32);
let mut keys: Vec<(&Vec<u8>, &usize)> =
dict.iter().collect();
keys.sort_by_key(|&(_, idx)| *idx);
for (key, _) in keys {
- out[pos..pos + key.len()].copy_from_slice(key);
- pos += key.len();
+ out.extend_from_slice(key);
}
}
}
}
- // Null bitmaps
- let null_bitmap_bytes = self.num_rows.div_ceil(8);
- for i in 0..self.num_columns {
+ // Null bitmaps (per-column row count)
+ for i in 0..self.total_columns {
if has_nulls[i] && encodings[i] != ENCODING_ALL_NULL {
- out[pos..pos + null_bitmap_bytes]
-
.copy_from_slice(&self.null_bitmaps[i][..null_bitmap_bytes]);
- pos += null_bitmap_bytes;
+ let nbytes = self.col_num_rows(i).div_ceil(8);
+ out.extend_from_slice(&self.null_bitmaps[i][..nbytes]);
}
}
// Column data
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
if encodings[i] == ENCODING_PLAIN {
- let len = self.value_buffers[i].len();
- out[pos..pos + len].copy_from_slice(&self.value_buffers[i]);
- pos += len;
+ out.extend_from_slice(&self.value_buffers[i]);
} else if encodings[i] == ENCODING_DICT {
- let data_start = pos;
- let bit_offset = self.write_dict_bit_packed(i, &mut out,
data_start);
- pos += bit_offset.div_ceil(8);
+ let bw = bit_width(self.get_dict_size(i));
+ let packed_bytes = (self.non_null_counts[i] * bw).div_ceil(8);
+ let data_start = out.len();
+ out.resize(data_start + packed_bytes, 0);
+ self.write_dict_bit_packed(i, &mut out, data_start);
}
}
- debug_assert_eq!(pos, out.len());
out
}
@@ -499,16 +602,20 @@ impl BucketWriter {
has_nulls: Vec::new(),
const_data: Vec::new(),
column_pages: Vec::new(),
+ num_primary: self.num_primary,
+ children: self.children.clone(),
};
}
let (encodings, has_nulls) = self.compute_encodings();
- let null_bitmap_bytes = self.num_rows.div_ceil(8);
- let mut const_data = vec![Vec::new(); self.num_columns];
- let mut column_pages: Vec<Option<Vec<u8>>> = vec![None;
self.num_columns];
+ let mut const_data = vec![Vec::new(); self.total_columns];
+ let mut column_pages: Vec<Option<Vec<u8>>> = vec![None;
self.total_columns];
+
+ for i in 0..self.total_columns {
+ let col_rows = self.col_num_rows(i);
+ let null_bitmap_bytes = col_rows.div_ceil(8);
- for i in 0..self.num_columns {
match encodings[i] {
ENCODING_ALL_NULL => {}
ENCODING_CONST => {
@@ -520,7 +627,6 @@ impl BucketWriter {
}
ENCODING_DICT => {
let mut page = Vec::new();
- // Dict table
if let Some(ref dict) = self.long_dict_maps[i] {
let num_entries = dict.len();
varint::encode(&mut page, num_entries as u32);
@@ -541,11 +647,9 @@ impl BucketWriter {
page.extend_from_slice(key);
}
}
- // Null bitmap
if has_nulls[i] {
page.extend_from_slice(&self.null_bitmaps[i][..null_bitmap_bytes]);
}
- // Bit-packed indices
let dict_size = self.get_dict_size(i);
let bw = bit_width(dict_size);
let packed_bytes = (self.non_null_counts[i] *
bw).div_ceil(8);
@@ -571,11 +675,13 @@ impl BucketWriter {
has_nulls,
const_data,
column_pages,
+ num_primary: self.num_primary,
+ children: self.children.clone(),
}
}
pub fn reset(&mut self) {
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
for b in &mut self.null_bitmaps[i] {
*b = 0;
}
@@ -597,6 +703,9 @@ impl BucketWriter {
}
}
self.num_rows = 0;
+ for child in &mut self.children {
+ child.num_elements = 0;
+ }
}
fn get_dict_size(&self, col: usize) -> usize {
@@ -616,7 +725,7 @@ impl BucketWriter {
let mut bit_offset = 0usize;
let mut val_pos = 0usize;
- for r in 0..self.num_rows {
+ for r in 0..self.col_num_rows(col) {
let is_null = (self.null_bitmaps[col][r / 8] & (1 << (r % 8))) !=
0;
if !is_null {
let idx = if let Some(ref dict) = self.long_dict_maps[col] {
@@ -659,15 +768,24 @@ impl BucketWriter {
}
fn compute_out_size(&self, encodings: &[u8], has_nulls: &[bool]) -> usize {
- let null_bitmap_bytes = self.num_rows.div_ceil(8);
- let mut size = (self.num_columns * 2).div_ceil(8) +
self.num_columns.div_ceil(8);
+ // Header: varint(num_primary) + varint(num_children) + varint per
child
+ let mut size = 0;
+ if !self.children.is_empty() {
+ size += varint::encoded_size(self.num_primary as u32)
+ + varint::encoded_size(self.children.len() as u32);
+ for child in &self.children {
+ size += varint::encoded_size(child.num_elements as u32);
+ }
+ }
+
+ size += (self.total_columns * 2).div_ceil(8) +
self.total_columns.div_ceil(8);
- for i in 0..self.num_columns {
+ for i in 0..self.total_columns {
if encodings[i] == ENCODING_ALL_NULL {
continue;
}
if has_nulls[i] {
- size += null_bitmap_bytes;
+ size += self.col_num_rows(i).div_ceil(8);
}
match encodings[i] {
ENCODING_CONST => {
@@ -926,6 +1044,284 @@ fn i128_to_biginteger_bytes(val: i128) -> Vec<u8> {
bytes[start..].to_vec()
}
+fn extract_list_lengths(list_array: &ListArray) -> Int32Array {
+ let offsets = list_array.value_offsets();
+ let num_rows = list_array.len();
+ let mut lengths = Vec::with_capacity(num_rows);
+ for i in 0..num_rows {
+ if list_array.is_null(i) {
+ lengths.push(0);
+ } else {
+ lengths.push(offsets[i + 1] - offsets[i]);
+ }
+ }
+ let null_buf = list_array.nulls().cloned();
+ Int32Array::new(ScalarBuffer::from(lengths), null_buf)
+}
+
+fn flatten_list_values(list_array: &ListArray) -> ArrayRef {
+ let offsets = list_array.value_offsets();
+ let values = list_array.values();
+ let num_rows = list_array.len();
+
+ if list_array.null_count() == 0 {
+ let start = offsets[0] as usize;
+ let end = offsets[num_rows] as usize;
+ return values.slice(start, end - start);
+ }
+
+ // Skip child values for null rows — collect only non-null row ranges
+ let mut indices: Vec<u32> = Vec::new();
+ for i in 0..num_rows {
+ if !list_array.is_null(i) {
+ let start = offsets[i] as u32;
+ let end = offsets[i + 1] as u32;
+ for idx in start..end {
+ indices.push(idx);
+ }
+ }
+ }
+
+ if indices.is_empty() {
+ return values.slice(0, 0);
+ }
+
+ let idx_array = UInt32Array::from(indices);
+ take_array(values.as_ref(), &idx_array)
+}
+
+fn take_array(array: &dyn Array, indices: &UInt32Array) -> ArrayRef {
+ use arrow_array::builder::*;
+ macro_rules! take_prim {
+ ($arr_ty:ty, $bld_ty:ty) => {{
+ let src = array.as_any().downcast_ref::<$arr_ty>().unwrap();
+ let mut b = <$bld_ty>::with_capacity(indices.len());
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ Arc::new(b.finish()) as ArrayRef
+ }};
+ }
+ match array.data_type() {
+ DataType::Boolean => take_prim!(BooleanArray, BooleanBuilder),
+ DataType::Int8 => take_prim!(Int8Array, Int8Builder),
+ DataType::Int16 => take_prim!(Int16Array, Int16Builder),
+ DataType::Int32 => take_prim!(Int32Array, Int32Builder),
+ DataType::Int64 => take_prim!(Int64Array, Int64Builder),
+ DataType::Float32 => take_prim!(Float32Array, Float32Builder),
+ DataType::Float64 => take_prim!(Float64Array, Float64Builder),
+ DataType::Date32 => take_prim!(Date32Array, Date32Builder),
+ DataType::Time32(_) => take_prim!(Time32MillisecondArray,
Time32MillisecondBuilder),
+ DataType::Decimal128(p, s) => {
+ let src =
array.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ let mut b = Decimal128Builder::new()
+ .with_precision_and_scale(*p, *s)
+ .unwrap();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ Arc::new(b.finish()) as ArrayRef
+ }
+ DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) => {
+ let src = array
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap();
+ let mut b = TimestampMillisecondBuilder::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ let arr = b.finish();
+ Arc::new(if let Some(tz) = tz {
+ arr.with_timezone(tz.clone())
+ } else {
+ arr
+ })
+ }
+ DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => {
+ let src = array
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ let mut b = TimestampMicrosecondBuilder::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ let arr = b.finish();
+ Arc::new(if let Some(tz) = tz {
+ arr.with_timezone(tz.clone())
+ } else {
+ arr
+ })
+ }
+ DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => {
+ let src = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ let mut b = TimestampNanosecondBuilder::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ let arr = b.finish();
+ Arc::new(if let Some(tz) = tz {
+ arr.with_timezone(tz.clone())
+ } else {
+ arr
+ })
+ }
+ DataType::Utf8 => {
+ let src = array.as_any().downcast_ref::<StringArray>().unwrap();
+ let mut b = StringBuilder::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ Arc::new(b.finish())
+ }
+ DataType::Binary => {
+ let src = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+ let mut b = BinaryBuilder::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if src.is_null(idx) {
+ b.append_null();
+ } else {
+ b.append_value(src.value(idx));
+ }
+ }
+ Arc::new(b.finish())
+ }
+ DataType::List(_) => {
+ // For nested arrays, rebuild by collecting slices
+ let src = array.as_any().downcast_ref::<ListArray>().unwrap();
+ let mut offsets_builder = vec![0i32];
+ let mut child_indices: Vec<u32> = Vec::new();
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ let start = src.value_offsets()[idx] as u32;
+ let end = src.value_offsets()[idx + 1] as u32;
+ for ci in start..end {
+ child_indices.push(ci);
+ }
+ offsets_builder.push(child_indices.len() as i32);
+ }
+ let child_idx_arr = UInt32Array::from(child_indices);
+ let new_values = take_array(src.values().as_ref(), &child_idx_arr);
+ let field = match array.data_type() {
+ DataType::List(f) => f.clone(),
+ _ => unreachable!(),
+ };
+ let null_buf = if !indices.is_empty() {
+ let mut bm = vec![0u8; indices.len().div_ceil(8)];
+ for i in 0..indices.len() {
+ let idx = indices.value(i) as usize;
+ if !src.is_null(idx) {
+ bm[i / 8] |= 1 << (i % 8);
+ }
+ }
+ if bm.iter().all(|&b| b == 0xFF) || indices.is_empty() {
+ None
+ } else {
+ Some(NullBuffer::new(BooleanBuffer::new(
+ Buffer::from_vec(bm),
+ 0,
+ indices.len(),
+ )))
+ }
+ } else {
+ None
+ };
+ Arc::new(ListArray::new(
+ field,
+ OffsetBuffer::new(ScalarBuffer::from(offsets_builder)),
+ new_values,
+ null_buf,
+ ))
+ }
+ other => panic!("take_array: unsupported DataType {:?}", other),
+ }
+}
+
+pub(crate) fn expand_col_types(col_types: &[&DataType]) -> (Vec<DataType>,
Vec<ChildColumnMeta>) {
+ let mut physical_types: Vec<DataType> = col_types
+ .iter()
+ .map(|t| {
+ if matches!(t, DataType::List(_)) {
+ DataType::Int32
+ } else {
+ (*t).clone()
+ }
+ })
+ .collect();
+ let mut children = Vec::new();
+
+ for (i, t) in col_types.iter().enumerate() {
+ if let DataType::List(element_field) = t {
+ expand_child(i, element_field, &mut physical_types, &mut children);
+ }
+ }
+ (physical_types, children)
+}
+
+fn expand_child(
+ parent_logical: usize,
+ element_field: &Arc<Field>,
+ physical_types: &mut Vec<DataType>,
+ children: &mut Vec<ChildColumnMeta>,
+) {
+ let elem_dt = element_field.data_type();
+ let child_phys_idx = physical_types.len();
+
+ if let DataType::List(inner_field) = elem_dt {
+ physical_types.push(DataType::Int32);
+ children.push(ChildColumnMeta {
+ parent_logical_col: parent_logical,
+ physical_index: child_phys_idx,
+ element_field: element_field.clone(),
+ num_elements: 0,
+ });
+ expand_child(parent_logical, inner_field, physical_types, children);
+ } else {
+ physical_types.push(elem_dt.clone());
+ children.push(ChildColumnMeta {
+ parent_logical_col: parent_logical,
+ physical_index: child_phys_idx,
+ element_field: element_field.clone(),
+ num_elements: 0,
+ });
+ }
+}
+
/// Returns `true` when `fixed_width` lies between 1 and 8 inclusive.
fn uses_long_dict(fixed_width: i32) -> bool {
    (1..=8).contains(&fixed_width)
}
@@ -966,35 +1362,15 @@ fn write_fixed_key_to_vec(buf: &mut Vec<u8>, key: u64,
width: i32) {
}
}
-fn write_fixed_key_to_slice(buf: &mut [u8], pos: &mut usize, key: u64, width:
i32) {
- match width {
- 1 => {
- buf[*pos] = key as u8;
- *pos += 1;
- }
- 2 => {
- let bytes = (key as u16).to_be_bytes();
- buf[*pos..*pos + 2].copy_from_slice(&bytes);
- *pos += 2;
- }
- 4 => {
- let bytes = (key as u32).to_be_bytes();
- buf[*pos..*pos + 4].copy_from_slice(&bytes);
- *pos += 4;
- }
- 8 => {
- let bytes = key.to_be_bytes();
- buf[*pos..*pos + 8].copy_from_slice(&bytes);
- *pos += 8;
- }
- _ => {}
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
+    /// Byte offset of the encoding flags inside a finished bucket buffer.
+    ///
+    /// Non-ARRAY buckets are written without a header (v1 compatible), so the
+    /// offset is 0 regardless of `_data`.
+    fn header_size(_data: &[u8]) -> usize {
+        const NO_HEADER: usize = 0;
+        NO_HEADER
+    }
+
#[test]
fn test_all_null_encoding() {
let types = [DataType::Int32];
@@ -1006,7 +1382,8 @@ mod tests {
let data = writer.finish();
assert!(!data.is_empty());
- assert_eq!(data[0] & 0x03, ENCODING_ALL_NULL);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_ALL_NULL);
}
#[test]
@@ -1019,7 +1396,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_CONST);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_CONST);
}
#[test]
@@ -1033,7 +1411,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_DICT);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_DICT);
}
#[test]
@@ -1047,7 +1426,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_PLAIN);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_PLAIN);
}
#[test]
@@ -1060,7 +1440,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Utf8]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_CONST);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_CONST);
}
#[test]
@@ -1074,7 +1455,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Utf8]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_DICT);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_DICT);
}
#[test]
@@ -1090,7 +1472,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_CONST);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_CONST);
}
#[test]
@@ -1106,7 +1489,8 @@ mod tests {
writer.write_columns(&[&arr], &[&DataType::Int32]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_DICT);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_DICT);
}
#[test]
@@ -1126,7 +1510,8 @@ mod tests {
writer.write_columns(&[&second], &[&types[0]]).unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_DICT);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_DICT);
}
#[test]
@@ -1148,9 +1533,10 @@ mod tests {
.unwrap();
let data = writer.finish();
- assert_eq!(data[0] & 0x03, ENCODING_ALL_NULL);
- assert_eq!((data[0] >> 2) & 0x03, ENCODING_CONST);
- assert_eq!((data[0] >> 4) & 0x03, ENCODING_DICT);
+ let h = header_size(&data);
+ assert_eq!(data[h] & 0x03, ENCODING_ALL_NULL);
+ assert_eq!((data[h] >> 2) & 0x03, ENCODING_CONST);
+ assert_eq!((data[h] >> 4) & 0x03, ENCODING_DICT);
}
#[test]
@@ -1171,6 +1557,7 @@ mod tests {
let arr2 = Int32Array::from(vals);
writer.write_columns(&[&arr2], &[&DataType::Int32]).unwrap();
let data2 = writer.finish();
- assert_eq!(data2[0] & 0x03, ENCODING_PLAIN);
+ let h2 = header_size(&data2);
+ assert_eq!(data2[h2] & 0x03, ENCODING_PLAIN);
}
}
diff --git a/core/src/reader.rs b/core/src/reader.rs
index e111b9d..8ed6b39 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -458,6 +458,14 @@ impl<I: InputFile> MosaicReader<I> {
let page_content = zstd::bulk::decompress(compressed_data,
uncompressed_size)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
+ Self::parse_simple_column_slot(page_content, col_type, num_rows)
+ }
+
+ fn parse_simple_column_slot(
+ page_content: Vec<u8>,
+ col_type: &DataType,
+ num_rows: usize,
+ ) -> io::Result<ColumnPageReader> {
if page_content.len() < 2 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
@@ -615,7 +623,18 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
"paged bucket requires ZSTD compression",
));
}
- let dir_size = self.schema.bucket_to_global[b].len() * 4;
+ let bucket_col_refs: Vec<&DataType> =
self.schema.bucket_to_global[b]
+ .iter()
+ .map(|&gi| &self.schema.columns[gi].data_type)
+ .collect();
+ let (bucket_phys, bucket_children) =
+
crate::bucket_writer::expand_col_types(&bucket_col_refs);
+ let child_header_len = if bucket_children.is_empty() {
+ 0
+ } else {
+ 2 + bucket_children.len() * 4
+ };
+ let dir_size = child_header_len + bucket_phys.len() * 4;
if dir_size > total_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
@@ -659,7 +678,9 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
let mut r2_group_infos: Vec<Vec<PagedSlotInfo>> = Vec::new();
// Per-bucket directory parse results (slot_sizes, slot_file_offsets)
for paged buckets
- let mut paged_dir_info: Vec<Option<(Vec<usize>, Vec<u64>)>> =
vec![None; self.num_buckets];
+ // (slot_sizes, slot_file_offsets, child_element_counts)
+ type PagedDirInfo = (Vec<usize>, Vec<u64>, Vec<usize>);
+ let mut paged_dir_info: Vec<Option<PagedDirInfo>> = vec![None;
self.num_buckets];
let mut partial_paged_buckets: Vec<usize> = Vec::new();
for (ri, &b) in r1_bucket_ids.iter().enumerate() {
@@ -690,19 +711,58 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
}
BucketLayout::Paged { total_size } => {
let global_indices = &self.schema.bucket_to_global[b];
- let num_columns = global_indices.len();
+ let col_type_refs: Vec<&DataType> = global_indices
+ .iter()
+ .map(|&gi| &self.schema.columns[gi].data_type)
+ .collect();
+ let (phys_types, bucket_children) =
+ crate::bucket_writer::expand_col_types(&col_type_refs);
+ let num_columns = phys_types.len();
- // Parse directory
+ // Parse fixed-size child header (only when ARRAY columns
exist)
+ let (hdr_len, child_element_counts) = if
bucket_children.is_empty() {
+ (0, Vec::new())
+ } else {
+ if buf.len() < 2 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "paged bucket: too short for child header",
+ ));
+ }
+ let nc = u16::from_le_bytes([buf[0], buf[1]]) as usize;
+ let hl = 2 + nc * 4;
+ if buf.len() < hl {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "paged bucket: truncated child header",
+ ));
+ }
+ let mut counts = Vec::with_capacity(nc);
+ for ci in 0..nc {
+ let off = 2 + ci * 4;
+ counts
+ .push(u32::from_le_bytes(buf[off..off +
4].try_into().unwrap())
+ as usize);
+ }
+ (hl, counts)
+ };
+
+ // Parse directory (after header)
+ if buf.len() < hdr_len + num_columns * 4 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "paged bucket: truncated directory",
+ ));
+ }
let mut slot_sizes = Vec::with_capacity(num_columns);
for i in 0..num_columns {
- let off = i * 4;
+ let off = hdr_len + i * 4;
let size =
u32::from_le_bytes(buf[off..off +
4].try_into().unwrap()) as usize;
slot_sizes.push(size);
}
- // Validate: directory + slots must exactly equal
total_size
- let dir_size = num_columns * 4;
+ let dir_size = hdr_len + num_columns * 4;
let slot_total: usize = slot_sizes.iter().sum();
if dir_size + slot_total != total_size {
return Err(io::Error::new(
@@ -715,14 +775,18 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
}
if all_projected_in_bucket[b] {
- // All columns projected — we already read the full
bucket in round 1,
- // parse all slots directly without a second
read_ranges call.
+ let num_primary_in_bucket = global_indices.len();
let mut column_readers: Vec<Option<ColumnPageReader>> =
Vec::with_capacity(num_columns);
let mut data_offset = dir_size;
for i in 0..num_columns {
- let gi = global_indices[i];
- let col_type =
self.schema.columns[gi].data_type.clone();
+ let col_type = phys_types[i].clone();
+ let col_rows = if i < num_primary_in_bucket {
+ meta.num_rows
+ } else {
+ let child_idx = i - num_primary_in_bucket;
+
child_element_counts.get(child_idx).copied().unwrap_or(0)
+ };
if slot_sizes[i] == 0 {
column_readers.push(Some(ColumnPageReader::new(
@@ -731,12 +795,12 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
false,
Value::Null,
Vec::new(),
- meta.num_rows,
+ col_rows,
)?));
} else {
let slot_data = &buf[data_offset..data_offset
+ slot_sizes[i]];
let column_reader =
- Self::parse_column_slot(slot_data,
&col_type, meta.num_rows)?;
+ Self::parse_column_slot(slot_data,
&col_type, col_rows)?;
column_readers.push(Some(column_reader));
}
data_offset += slot_sizes[i];
@@ -753,11 +817,26 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
foff += size as u64;
}
+ let num_primary_in_bucket = global_indices.len();
let mut projected_cols: Vec<usize> = Vec::new();
for i in 0..num_columns {
- let gi = global_indices[i];
- if projected[gi] && slot_sizes[i] > 0 {
- projected_cols.push(i);
+ if i < num_primary_in_bucket {
+ let gi = global_indices[i];
+ if projected[gi] && slot_sizes[i] > 0 {
+ projected_cols.push(i);
+ }
+ } else {
+ // Child column: project if parent is projected
+ let child_idx = i - num_primary_in_bucket;
+ if child_idx < bucket_children.len() {
+ let parent =
bucket_children[child_idx].parent_logical_col;
+ if parent < num_primary_in_bucket {
+ let gi = global_indices[parent];
+ if projected[gi] && slot_sizes[i] > 0 {
+ projected_cols.push(i);
+ }
+ }
+ }
}
}
@@ -783,7 +862,8 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
}]);
}
- paged_dir_info[b] = Some((slot_sizes,
slot_file_offsets));
+ paged_dir_info[b] =
+ Some((slot_sizes, slot_file_offsets,
child_element_counts.clone()));
partial_paged_buckets.push(b);
}
}
@@ -808,15 +888,19 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
let mut slot_locations: Vec<Vec<Option<SlotLocation>>> =
Vec::with_capacity(self.num_buckets);
for b in 0..self.num_buckets {
- let n = self.schema.bucket_to_global[b].len();
- slot_locations.push((0..n).map(|_| None).collect());
+ let col_refs: Vec<&DataType> = self.schema.bucket_to_global[b]
+ .iter()
+ .map(|&gi| &self.schema.columns[gi].data_type)
+ .collect();
+ let (phys, _) =
crate::bucket_writer::expand_col_types(&col_refs);
+ slot_locations.push((0..phys.len()).map(|_| None).collect());
}
for (group_idx, group) in r2_group_infos.iter().enumerate() {
let buf = r2_buffers[group_idx].as_slice();
let group_base = r2_ranges[group_idx].0;
for info in group {
- let (slot_sizes, slot_file_offsets) =
+ let (slot_sizes, slot_file_offsets, _) =
paged_dir_info[info.bucket_id].as_ref().unwrap();
let rel_start = (slot_file_offsets[info.col_idx] -
group_base) as usize;
let slot_len = slot_sizes[info.col_idx];
@@ -844,19 +928,46 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
// need round 2 IO, but still need readers when projected.
for &b in &partial_paged_buckets {
let global_indices = &self.schema.bucket_to_global[b];
- let num_columns = global_indices.len();
- let (slot_sizes, _) = paged_dir_info[b].as_ref().unwrap();
+ let col_refs: Vec<&DataType> = global_indices
+ .iter()
+ .map(|&gi| &self.schema.columns[gi].data_type)
+ .collect();
+ let (phys_types_b, children_b) =
crate::bucket_writer::expand_col_types(&col_refs);
+ let num_columns = phys_types_b.len();
+ let num_primary_b = global_indices.len();
+ let (slot_sizes, _, child_elem_counts) =
paged_dir_info[b].as_ref().unwrap();
+ // DEBUG removed
let mut column_readers: Vec<Option<ColumnPageReader>> =
Vec::with_capacity(num_columns);
for i in 0..num_columns {
- let gi = global_indices[i];
- if !projected[gi] {
+ let is_projected = if i < num_primary_b {
+ let gi = global_indices[i];
+ projected[gi]
+ } else {
+ let child_idx = i - num_primary_b;
+ if child_idx < children_b.len() {
+ let parent =
children_b[child_idx].parent_logical_col;
+ parent < num_primary_b &&
projected[global_indices[parent]]
+ } else {
+ false
+ }
+ };
+
+ if !is_projected {
column_readers.push(None);
continue;
}
- let col_type = self.schema.columns[gi].data_type.clone();
+ let col_type = phys_types_b[i].clone();
+ let col_rows = if i < num_primary_b {
+ meta.num_rows
+ } else {
+ child_elem_counts
+ .get(i - num_primary_b)
+ .copied()
+ .unwrap_or(0)
+ };
if slot_sizes[i] == 0 {
column_readers.push(Some(ColumnPageReader::new(
@@ -865,7 +976,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
false,
Value::Null,
Vec::new(),
- meta.num_rows,
+ col_rows,
)?));
continue;
}
@@ -878,8 +989,7 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
})?;
let group_buffer =
r2_buffers[location.group_idx].as_slice();
let slot_data =
&group_buffer[location.start..location.start + location.len];
- let column_reader =
- Self::parse_column_slot(slot_data, &col_type,
meta.num_rows)?;
+ let column_reader = Self::parse_column_slot(slot_data,
&col_type, col_rows)?;
column_readers.push(Some(column_reader));
}
bucket_states[b] = Some(BucketState::Paged { column_readers });
@@ -979,12 +1089,54 @@ impl RowGroupReader {
match state {
BucketState::Paged { column_readers } => {
+ let col_type_refs: Vec<&DataType> = global_indices
+ .iter()
+ .map(|&gi| &self.schema.columns[gi].data_type)
+ .collect();
+ let (phys_types, bucket_children) =
+ crate::bucket_writer::expand_col_types(&col_type_refs);
+
+ // Read all physical columns (N+C)
+ let mut phys_arrays: Vec<ArrayRef> = Vec::new();
+ for (idx, cr_opt) in column_readers.iter().enumerate() {
+ if let Some(ref cr) = cr_opt {
+ phys_arrays.push(cr.read_all()?);
+ } else {
+ let dt =
phys_types.get(idx).unwrap_or(&DataType::Int32);
+ let rows = if idx < global_indices.len() {
+ self.num_rows
+ } else {
+ 0
+ };
+ phys_arrays.push(arrow_array::new_null_array(dt,
rows));
+ }
+ }
+
+ // Only reassemble projected ARRAY parents
+ let projected_children: Vec<_> = bucket_children
+ .iter()
+ .filter(|c| {
+ c.parent_logical_col < global_indices.len()
+ &&
self.projected_columns[global_indices[c.parent_logical_col]]
+ })
+ .cloned()
+ .collect();
+
+ crate::bucket_reader::reassemble_list_columns_pub(
+ &mut phys_arrays,
+ &projected_children,
+ &col_type_refs,
+ global_indices.len(),
+ self.num_rows,
+ );
+
+ // Map logical columns to global array positions
for (local_idx, &global_idx) in
global_indices.iter().enumerate() {
if !self.projected_columns[global_idx] {
continue;
}
- if let Some(ref cr) = column_readers[local_idx] {
- arrays[global_idx] = Some(cr.read_all()?);
+ if local_idx < phys_arrays.len() {
+ arrays[global_idx] =
Some(phys_arrays[local_idx].clone());
}
}
}
diff --git a/core/src/types.rs b/core/src/types.rs
index 736c4c5..9326c57 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -103,6 +103,14 @@ pub fn validate_data_type(dt: &DataType) -> Result<(),
String> {
_ => Err(format!("unsupported Timestamp unit: {:?}", unit)),
},
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) =>
Ok(()),
+ DataType::List(field) => {
+ if let DataType::Struct(fields) = field.data_type() {
+ if is_timestamp_nanos_struct(fields) {
+ return Err("ARRAY<legacy timestamp nanos struct> is not
supported".to_string());
+ }
+ }
+ validate_data_type(field.data_type())
+ }
_ => Err(format!("unsupported DataType: {:?}", dt)),
}
}
@@ -124,6 +132,7 @@ pub fn data_type_to_type_byte(dt: &DataType) -> u8 {
DataType::Timestamp(_, None) => 16,
DataType::Timestamp(_, Some(_)) => 17,
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 16,
+ DataType::List(_) => 18,
_ => panic!("unsupported DataType for serialization: {:?}", dt),
}
}
@@ -177,6 +186,12 @@ pub fn serialize_field(field: &Field, buf: &mut Vec<u8>) {
DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => {
varint::encode(buf, 9u32);
}
+ DataType::List(element_field) => {
+ let name_bytes = element_field.name().as_bytes();
+ varint::encode(buf, name_bytes.len() as u32);
+ buf.extend_from_slice(name_bytes);
+ serialize_field(element_field, buf);
+ }
_ => {}
}
}
@@ -258,6 +273,26 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut
usize) -> Result<Fiel
DataType::Timestamp(TimeUnit::Nanosecond, Some(tz))
}
}
+ 18 => {
+ let name_len = varint::decode(buf, pos)? as usize;
+ if *pos + name_len > buf.len() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "type: not enough bytes for ARRAY element field name",
+ ));
+ }
+ let element_name = std::str::from_utf8(&buf[*pos..*pos + name_len])
+ .map_err(|_| {
+ std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "type: invalid UTF-8 in ARRAY element field name",
+ )
+ })?
+ .to_string();
+ *pos += name_len;
+ let element_field = deserialize_field(&element_name, buf, pos)?;
+ DataType::List(std::sync::Arc::new(element_field))
+ }
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
diff --git a/core/src/writer.rs b/core/src/writer.rs
index cea1582..59bd2a9 100644
--- a/core/src/writer.rs
+++ b/core/src/writer.rs
@@ -462,8 +462,20 @@ impl<S: OutputFile> MosaicWriter<S> {
column_slots.push(slot);
}
+ // Write child element counts header only when ARRAY columns exist
+ let child_header_len = if paged.children.is_empty() {
+ 0
+ } else {
+ let num_children = paged.children.len() as u16;
+ self.out.write(&num_children.to_le_bytes())?;
+ for child in &paged.children {
+ self.out.write(&(child.num_elements as u32).to_le_bytes())?;
+ }
+ 2 + paged.children.len() * 4
+ };
+
// Write fixed-length directory: num_columns * 4 bytes (u32 LE per
column = slot size)
- let dir_size = num_columns * 4;
+ let dir_size = child_header_len + num_columns * 4;
let mut total_size = dir_size;
for slot in &column_slots {
let slot_size = to_u32(slot.len(), "paged slot size")?;
diff --git a/core/tests/array_type_test.rs b/core/tests/array_type_test.rs
new file mode 100644
index 0000000..ad899e2
--- /dev/null
+++ b/core/tests/array_type_test.rs
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(
+ clippy::cloned_ref_to_slice_refs,
+ clippy::unnecessary_cast,
+ clippy::field_reassign_with_default
+)]
+
+use std::io;
+use std::sync::Arc;
+
+use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer,
ScalarBuffer};
+
+use arrow_array::builder::*;
+use arrow_array::*;
+use arrow_schema::{DataType, Field, Schema};
+use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
+use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+/// In-memory `OutputFile` that accumulates every written byte in `buf`.
+struct MemOutputFile {
+    pub buf: Vec<u8>,
+}
+
+impl MemOutputFile {
+    /// Creates a sink with an empty buffer.
+    fn new() -> Self {
+        MemOutputFile { buf: Vec::new() }
+    }
+}
+
+impl OutputFile for MemOutputFile {
+    /// Appends `data` to the buffer; always succeeds.
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        self.buf.extend(data.iter().copied());
+        Ok(())
+    }
+
+    /// No-op: the buffer holds everything already.
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+
+    /// Number of bytes written so far.
+    fn pos(&self) -> u64 {
+        let written = self.buf.len();
+        written as u64
+    }
+}
+
+/// In-memory `InputFile` backed by a byte vector.
+struct ByteArrayInputFile {
+    data: Vec<u8>,
+}
+
+impl InputFile for ByteArrayInputFile {
+    /// Copies `buf.len()` bytes starting at `offset` into `buf`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `UnexpectedEof` when the requested range is not fully inside
+    /// the data. This includes an `offset` that does not fit in `usize` and an
+    /// end position that would overflow, both of which previously wrapped or
+    /// truncated silently.
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        let src = usize::try_from(offset)
+            .ok()
+            .and_then(|start| Some(start..start.checked_add(buf.len())?))
+            .and_then(|range| self.data.get(range))
+            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "read past end"))?;
+        buf.copy_from_slice(src);
+        Ok(())
+    }
+}
+
+/// Writes `batches` with default writer options and reads every row group back.
+fn roundtrip(schema: &Schema, batches: &[RecordBatch]) -> Vec<RecordBatch> {
+    let default_options = WriterOptions::default();
+    roundtrip_with_options(schema, batches, default_options)
+}
+
+/// Writes `batches` into an in-memory Mosaic file using `options`, then reads it back.
+///
+/// Yields one `RecordBatch` per row group, in row-group order. Any writer or
+/// reader error panics.
+fn roundtrip_with_options(
+    schema: &Schema,
+    batches: &[RecordBatch],
+    options: WriterOptions,
+) -> Vec<RecordBatch> {
+    let mut writer = MosaicWriter::new(MemOutputFile::new(), schema, options).unwrap();
+    for batch in batches.iter() {
+        writer.write_batch(batch).unwrap();
+    }
+    writer.close().unwrap();
+
+    // Copy the bytes out of the writer's sink so they can back the reader.
+    let bytes = writer.output().buf.clone();
+    let file_len = bytes.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile { data: bytes }, file_len).unwrap();
+
+    (0..reader.num_row_groups())
+        .map(|rg| {
+            let mut rg_reader = reader.row_group_reader(rg).unwrap();
+            rg_reader.read_columns().unwrap()
+        })
+        .collect()
+}
+
+/// Round-trips an `ARRAY<INT32>` column with populated, empty and null rows.
+#[test]
+fn test_array_int32_basic() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let list_type = DataType::List(element_field.clone());
+    let schema = Schema::new(vec![Field::new("arr", list_type, true)]);
+
+    // Non-null rows in order (the last one is an empty array); a null row follows.
+    let rows: [&[i32]; 3] = [&[1, 2, 3], &[4, 5], &[]];
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    for row in rows.iter() {
+        for &v in row.iter() {
+            builder.values().append_value(v);
+        }
+        builder.append(true);
+    }
+    builder.append(false); // null
+
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch.clone()]);
+    assert_eq!(result.len(), 1);
+    let result_batch = &result[0];
+    assert_eq!(result_batch.num_rows(), 4);
+
+    let result_col = result_batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    for row in 0..rows.len() {
+        assert!(!result_col.is_null(row));
+    }
+    assert!(result_col.is_null(3));
+
+    for (row, expected) in rows.iter().enumerate() {
+        let values = result_col.value(row);
+        let ints = values.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(ints.len(), expected.len());
+        for (i, &v) in expected.iter().enumerate() {
+            assert_eq!(ints.value(i), v);
+        }
+    }
+}
+
+/// Round-trips an `ARRAY<INT64>` column whose arrays contain null elements.
+#[test]
+fn test_array_with_null_elements() {
+    let element_field = Arc::new(Field::new("item", DataType::Int64, true));
+    let list_type = DataType::List(element_field.clone());
+    let schema = Schema::new(vec![Field::new("arr", list_type, true)]);
+
+    // `None` marks a null element inside an array.
+    let rows: [&[Option<i64>]; 3] = [&[Some(100), None, Some(300)], &[None, None], &[Some(999)]];
+
+    let mut builder = ListBuilder::new(Int64Builder::new());
+    for row in rows.iter() {
+        for element in row.iter() {
+            match *element {
+                Some(v) => builder.values().append_value(v),
+                None => builder.values().append_null(),
+            }
+        }
+        builder.append(true);
+    }
+
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    for (row, expected) in rows.iter().enumerate() {
+        let values = result_col.value(row);
+        let longs = values.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(longs.len(), expected.len());
+        for (i, element) in expected.iter().enumerate() {
+            match *element {
+                Some(v) => assert_eq!(longs.value(i), v),
+                None => assert!(longs.is_null(i)),
+            }
+        }
+    }
+}
+
+/// Round-trips an `ARRAY<STRING>` column including a null element and an empty array.
+#[test]
+fn test_array_string_elements() {
+    let element_field = Arc::new(Field::new("item", DataType::Utf8, true));
+    let list_type = DataType::List(element_field.clone());
+    let schema = Schema::new(vec![Field::new("arr", list_type, true)]);
+
+    // `None` marks a null element; the last row is an empty array.
+    let rows: [&[Option<&str>]; 3] = [&[Some("hello"), Some("world")], &[None, Some("foo")], &[]];
+
+    let mut builder = ListBuilder::new(StringBuilder::new());
+    for row in rows.iter() {
+        for element in row.iter() {
+            match *element {
+                Some(s) => builder.values().append_value(s),
+                None => builder.values().append_null(),
+            }
+        }
+        builder.append(true);
+    }
+
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    for (row, expected) in rows.iter().enumerate() {
+        let values = result_col.value(row);
+        let strings = values.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), expected.len());
+        for (i, element) in expected.iter().enumerate() {
+            match *element {
+                Some(s) => assert_eq!(strings.value(i), s),
+                None => assert!(strings.is_null(i)),
+            }
+        }
+    }
+}
+
+/// Round-trips an `ARRAY<ARRAY<INT32>>` column with a null outer row.
+#[test]
+fn test_array_nested_array() {
+    let inner_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let outer_field = Arc::new(Field::new("item", DataType::List(inner_field.clone()), true));
+    let schema = Schema::new(vec![Field::new(
+        "nested",
+        DataType::List(outer_field.clone()),
+        true,
+    )]);
+
+    let mut outer_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+    // Row 0: [[1, 2], [3]]
+    outer_builder.values().values().append_value(1);
+    outer_builder.values().values().append_value(2);
+    outer_builder.values().append(true);
+    outer_builder.values().values().append_value(3);
+    outer_builder.values().append(true);
+    outer_builder.append(true);
+
+    // Row 1: [[4]]
+    outer_builder.values().values().append_value(4);
+    outer_builder.values().append(true);
+    outer_builder.append(true);
+
+    // Row 2: null
+    outer_builder.append(false);
+
+    let array = outer_builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert!(!result_col.is_null(0));
+    assert!(!result_col.is_null(1));
+    assert!(result_col.is_null(2));
+
+    // Collects the inner lists of one outer row as plain vectors.
+    let inner_lists = |row: usize| -> Vec<Vec<i32>> {
+        let outer = result_col.value(row);
+        let outer = outer.as_any().downcast_ref::<ListArray>().unwrap();
+        (0..outer.len())
+            .map(|i| {
+                let inner = outer.value(i);
+                let inner = inner.as_any().downcast_ref::<Int32Array>().unwrap();
+                (0..inner.len()).map(|j| inner.value(j)).collect()
+            })
+            .collect()
+    };
+
+    assert_eq!(inner_lists(0), vec![vec![1, 2], vec![3]]);
+
+    let row1 = inner_lists(1);
+    assert_eq!(row1.len(), 1);
+    assert_eq!(row1[0][0], 4);
+}
+
+/// Round-trips an `ARRAY<INT32>` column in which every row is null.
+#[test]
+fn test_array_all_null() {
+    const NUM_ROWS: usize = 3;
+
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let list_type = DataType::List(element_field.clone());
+    let schema = Schema::new(vec![Field::new("arr", list_type, true)]);
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    for _ in 0..NUM_ROWS {
+        builder.append(false);
+    }
+    let array = builder.finish();
+    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert_eq!(result_col.len(), NUM_ROWS);
+    for row in 0..NUM_ROWS {
+        assert!(result_col.is_null(row));
+    }
+}
+
+/// Round-trips an ARRAY column that sits between an INT64 and a STRING column.
+#[test]
+fn test_array_with_other_columns() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("tags", DataType::List(element_field.clone()), true),
+        Field::new("name", DataType::Utf8, true),
+    ]);
+
+    let expected_ids = [1i64, 2, 3];
+    let expected_names = [Some("alice"), None, Some("charlie")];
+    // `None` is a null list row.
+    let tag_rows: [Option<&[i32]>; 3] = [Some(&[10, 20]), None, Some(&[30])];
+
+    let ids = Int64Array::from(expected_ids.to_vec());
+
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    for row in tag_rows.iter() {
+        match row {
+            Some(values) => {
+                for &v in values.iter() {
+                    list_builder.values().append_value(v);
+                }
+                list_builder.append(true);
+            }
+            None => list_builder.append(false),
+        }
+    }
+    let tags = list_builder.finish();
+
+    let names = StringArray::from(expected_names.to_vec());
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(ids), Arc::new(tags), Arc::new(names)],
+    )
+    .unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let rb = &result[0];
+    assert_eq!(rb.num_rows(), 3);
+
+    let result_ids = rb.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+    for (i, &id) in expected_ids.iter().enumerate() {
+        assert_eq!(result_ids.value(i), id);
+    }
+
+    let result_tags = rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    for (i, row) in tag_rows.iter().enumerate() {
+        assert_eq!(result_tags.is_null(i), row.is_none());
+    }
+
+    let row0 = result_tags.value(0);
+    let row0_arr = row0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(row0_arr.len(), 2);
+    assert_eq!(row0_arr.value(0), 10);
+    assert_eq!(row0_arr.value(1), 20);
+
+    let result_names = rb.column(2).as_any().downcast_ref::<StringArray>().unwrap();
+    for (i, name) in expected_names.iter().enumerate() {
+        match *name {
+            Some(s) => assert_eq!(result_names.value(i), s),
+            None => assert!(result_names.is_null(i)),
+        }
+    }
+}
+
+/// Round-trips 1000 rows of variable-length arrays with periodic null rows and null elements.
+#[test]
+fn test_array_large_batch() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let list_type = DataType::List(element_field.clone());
+    let schema = Schema::new(vec![Field::new("arr", list_type, true)]);
+
+    let mut builder = ListBuilder::new(Int32Builder::new());
+    for i in 0..1000 {
+        // null every 10th row
+        if i % 10 == 0 {
+            builder.append(false);
+            continue;
+        }
+        let num_elements = (i % 5) + 1;
+        for j in 0..num_elements {
+            let null_element = j == 2 && i % 3 == 0;
+            if null_element {
+                builder.values().append_null();
+            } else {
+                builder.values().append_value((i * 10 + j) as i32);
+            }
+        }
+        builder.append(true);
+    }
+    let array = builder.finish();
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array.clone())]).unwrap();
+
+    let result = roundtrip(&schema, &[batch]);
+    let result_col = result[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    assert_eq!(result_col.len(), 1000);
+
+    for i in 0..1000 {
+        if i % 10 == 0 {
+            assert!(result_col.is_null(i), "row {} should be null", i);
+            continue;
+        }
+        assert!(!result_col.is_null(i), "row {} should not be null", i);
+        let expected = array.value(i);
+        let actual = result_col.value(i);
+        assert_eq!(&expected, &actual, "mismatch at row {}", i);
+    }
+}
+
+/// Round-trips an ARRAY column through the paged bucket layout.
+///
+/// `page_size_threshold` is set to 1 to push the writer towards the paged
+/// layout. Besides ids and nullness, the list contents of every non-null row
+/// are compared with the input, so a misaligned child column or a wrong child
+/// element count is caught here as well.
+#[test]
+fn test_array_paged_layout() {
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("arr", DataType::List(element_field.clone()), true),
+    ]);
+
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    let mut ids = Vec::new();
+    for i in 0..200 {
+        ids.push(i as i64);
+        if i % 5 == 0 {
+            list_builder.append(false);
+        } else {
+            let n = (i % 4) + 1;
+            for j in 0..n {
+                if j == 1 && i % 3 == 0 {
+                    list_builder.values().append_null();
+                } else {
+                    list_builder.values().append_value((i * 10 + j) as i32);
+                }
+            }
+            list_builder.append(true);
+        }
+    }
+    // Keep the input list column so the read-back values can be compared row by row.
+    let expected_arr = list_builder.finish();
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![
+            Arc::new(Int64Array::from(ids.clone())),
+            Arc::new(expected_arr.clone()),
+        ],
+    )
+    .unwrap();
+
+    let mut opts = WriterOptions::default();
+    opts.page_size_threshold = 1;
+
+    let result = roundtrip_with_options(&schema, &[batch.clone()], opts);
+    let rb = &result[0];
+    assert_eq!(rb.num_rows(), 200);
+
+    let result_ids = rb.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+    let result_arr = rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+
+    for i in 0..200 {
+        assert_eq!(result_ids.value(i), i as i64);
+        if i % 5 == 0 {
+            assert!(result_arr.is_null(i), "row {} should be null", i);
+        } else {
+            assert!(!result_arr.is_null(i), "row {} should not be null", i);
+            let expected = expected_arr.value(i);
+            let actual = result_arr.value(i);
+            assert_eq!(&expected, &actual, "mismatch at row {}", i);
+        }
+    }
+}
+
+#[test]
+fn test_array_null_row_preserves_child_offsets() {
+ let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+ let schema = Schema::new(vec![Field::new(
+ "arr",
+ DataType::List(element_field.clone()),
+ true,
+ )]);
+
+ // Manually construct: row 0 = [1, 2], row 1 = null (but owns child slots
99, 100), row 2 = [5]
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 4, 5]));
+ let values = Arc::new(Int32Array::from(vec![1, 2, 99, 100, 5])) as
ArrayRef;
+ let nulls = Some(NullBuffer::new(BooleanBuffer::new(
+ Buffer::from(vec![0b0000_0101]),
+ 0,
+ 3,
+ )));
+ let array = ListArray::new(element_field, offsets, values, nulls);
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(array.clone())]).unwrap();
+
+ let result = roundtrip(&schema, &[batch]);
+ let result_col = result[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+
+ assert_eq!(result_col.len(), 3);
+ assert!(!result_col.is_null(0));
+ assert!(result_col.is_null(1));
+ assert!(!result_col.is_null(2));
+
+ let row0 = result_col.value(0);
+ let row0_arr = row0.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(row0_arr.len(), 2);
+ assert_eq!(row0_arr.value(0), 1);
+ assert_eq!(row0_arr.value(1), 2);
+
+ let row2 = result_col.value(2);
+ let row2_arr = row2.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(row2_arr.len(), 1);
+ assert_eq!(row2_arr.value(0), 5);
+}
+
+#[test]
+fn test_project_array_from_paged_bucket() {
+    // Writes (id, arr) into a single bucket with `page_size_threshold = 1`,
+    // then reads back only the "arr" column through a projected row-group
+    // reader and verifies every row of the projected list column.
+    let element_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("arr", DataType::List(element_field.clone()), true),
+    ]);
+
+    // Rows: [10, 20], [30], [40]
+    let ids = Int64Array::from(vec![1, 2, 3]);
+    let mut list_builder = ListBuilder::new(Int32Builder::new());
+    list_builder.values().append_value(10);
+    list_builder.values().append_value(20);
+    list_builder.append(true);
+    list_builder.values().append_value(30);
+    list_builder.append(true);
+    list_builder.values().append_value(40);
+    list_builder.append(true);
+    let arr = list_builder.finish();
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ids), Arc::new(arr)]).unwrap();
+
+    let out = MemOutputFile::new();
+    let mut options = WriterOptions::default();
+    options.num_buckets = 1;
+    options.page_size_threshold = 1;
+    let mut writer = MosaicWriter::new(out, &schema, options).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+
+    let data = writer.output().buf.clone();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+
+    // Project only the "arr" column; look its index up in the reader's own
+    // schema rather than assuming it matches the write-side position.
+    let sorted_arr_idx = reader
+        .schema()
+        .columns
+        .iter()
+        .position(|c| c.name == "arr")
+        .unwrap();
+    let mut rg_reader = reader
+        .row_group_reader_projected(0, &[sorted_arr_idx])
+        .unwrap();
+    let projected = rg_reader.read_columns().unwrap();
+    assert_eq!(projected.num_columns(), 1);
+
+    let result_arr = projected
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(result_arr.len(), 3);
+
+    let r0 = result_arr.value(0);
+    let r0a = r0.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(r0a.len(), 2);
+    assert_eq!(r0a.value(0), 10);
+    assert_eq!(r0a.value(1), 20);
+
+    // Rows after the first depend on non-zero offsets into the flattened
+    // values, so check them too.
+    let r1 = result_arr.value(1);
+    let r1a = r1.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(r1a.len(), 1);
+    assert_eq!(r1a.value(0), 30);
+
+    let r2 = result_arr.value(2);
+    let r2a = r2.as_any().downcast_ref::<Int32Array>().unwrap();
+    assert_eq!(r2a.len(), 1);
+    assert_eq!(r2a.value(0), 40);
+}
+
+#[test]
+fn test_array_child_dict_encoding() {
+    // Ten rows of twenty elements cycling through 0, 1, 2: a low-cardinality
+    // child column (presumably a candidate for DICT encoding — the test only
+    // asserts that the values survive the round trip).
+    let item_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(item_field.clone()),
+        true,
+    )]);
+
+    let mut lists = ListBuilder::new(Int32Builder::new());
+    for _row in 0..10 {
+        for value in (0..20).map(|slot| (slot % 3) as i32) {
+            lists.values().append_value(value);
+        }
+        lists.append(true);
+    }
+    let input = lists.finish();
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input.clone())]).unwrap();
+
+    let batches = roundtrip(&schema, &[batch]);
+    let output = batches[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(output.len(), 10);
+    for row in 0..10 {
+        let want = input.value(row);
+        let got = output.value(row);
+        assert_eq!(&want, &got, "mismatch at row {}", row);
+    }
+}
+
+#[test]
+fn test_multiple_array_columns_in_bucket() {
+    // Two list columns with different element types written into a single
+    // bucket (`num_buckets = 1`).
+    let elem_i32 = Arc::new(Field::new("item", DataType::Int32, true));
+    let elem_i64 = Arc::new(Field::new("item", DataType::Int64, true));
+    let schema = Schema::new(vec![
+        Field::new("arr_a", DataType::List(elem_i32.clone()), true),
+        Field::new("arr_b", DataType::List(elem_i64.clone()), true),
+    ]);
+
+    // arr_a: [1, 2], null, [3]
+    let mut lists_a = ListBuilder::new(Int32Builder::new());
+    lists_a.values().append_value(1);
+    lists_a.values().append_value(2);
+    lists_a.append(true);
+    lists_a.append(false); // null
+    lists_a.values().append_value(3);
+    lists_a.append(true);
+
+    // arr_b: [100], [200, 300], []
+    let mut lists_b = ListBuilder::new(Int64Builder::new());
+    lists_b.values().append_value(100);
+    lists_b.append(true);
+    lists_b.values().append_value(200);
+    lists_b.values().append_value(300);
+    lists_b.append(true);
+    lists_b.append(true); // empty
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(lists_a.finish()), Arc::new(lists_b.finish())],
+    )
+    .unwrap();
+
+    let mut opts = WriterOptions::default();
+    opts.num_buckets = 1;
+    let batches = roundtrip_with_options(&schema, &[batch], opts);
+    let rb = &batches[0];
+
+    // Pull one list row out as an owned primitive array.
+    let i32_row = |list: &ListArray, row: usize| -> Int32Array {
+        list.value(row)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .clone()
+    };
+    let i64_row = |list: &ListArray, row: usize| -> Int64Array {
+        list.value(row)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap()
+            .clone()
+    };
+
+    let col_a = rb.column(0).as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(col_a.len(), 3);
+    assert!(!col_a.is_null(0));
+    assert!(col_a.is_null(1));
+    assert!(!col_a.is_null(2));
+    let a0 = i32_row(col_a, 0);
+    assert_eq!(a0.len(), 2);
+    assert_eq!(a0.value(0), 1);
+    assert_eq!(a0.value(1), 2);
+    let a2 = i32_row(col_a, 2);
+    assert_eq!(a2.value(0), 3);
+
+    let col_b = rb.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    assert_eq!(col_b.len(), 3);
+    let b0 = i64_row(col_b, 0);
+    assert_eq!(b0.value(0), 100);
+    let b1 = i64_row(col_b, 1);
+    assert_eq!(b1.len(), 2);
+    assert_eq!(b1.value(0), 200);
+    assert_eq!(b1.value(1), 300);
+    let b2 = i64_row(col_b, 2);
+    assert_eq!(b2.len(), 0);
+}
+
+#[test]
+fn test_array_date32_elements() {
+    // List column whose elements are Date32 values:
+    // [18000, 19000], null, [20000].
+    let item_field = Arc::new(Field::new("item", DataType::Date32, true));
+    let schema = Schema::new(vec![Field::new(
+        "arr",
+        DataType::List(item_field.clone()),
+        true,
+    )]);
+
+    let mut lists = ListBuilder::new(Date32Builder::new());
+    lists.values().append_value(18000);
+    lists.values().append_value(19000);
+    lists.append(true);
+    lists.append(false); // null row
+    lists.values().append_value(20000);
+    lists.append(true);
+
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(lists.finish())]).unwrap();
+    let batches = roundtrip(&schema, &[batch]);
+    let output = batches[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+
+    // Pull one list row out as an owned Date32 array.
+    let date_row = |row: usize| -> Date32Array {
+        output
+            .value(row)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap()
+            .clone()
+    };
+
+    assert_eq!(output.len(), 3);
+    assert!(output.is_null(1));
+    let first = date_row(0);
+    assert_eq!(first.value(0), 18000);
+    assert_eq!(first.value(1), 19000);
+    let last = date_row(2);
+    assert_eq!(last.value(0), 20000);
+}
+
+#[test]
+fn test_project_one_array_from_multi_array_paged() {
+    // Two list columns share one bucket written with `page_size_threshold = 1`;
+    // only `arr_a` is projected on read, and every projected row is checked.
+    let elem_i32 = Arc::new(Field::new("item", DataType::Int32, true));
+    let elem_i64 = Arc::new(Field::new("item", DataType::Int64, true));
+    let schema = Schema::new(vec![
+        Field::new("arr_a", DataType::List(elem_i32.clone()), true),
+        Field::new("arr_b", DataType::List(elem_i64.clone()), true),
+    ]);
+
+    // arr_a: [1, 2], [3]
+    let mut builder_a = ListBuilder::new(Int32Builder::new());
+    builder_a.values().append_value(1);
+    builder_a.values().append_value(2);
+    builder_a.append(true);
+    builder_a.values().append_value(3);
+    builder_a.append(true);
+
+    // arr_b: [100], [200, 300]
+    let mut builder_b = ListBuilder::new(Int64Builder::new());
+    builder_b.values().append_value(100);
+    builder_b.append(true);
+    builder_b.values().append_value(200);
+    builder_b.values().append_value(300);
+    builder_b.append(true);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(builder_a.finish()), Arc::new(builder_b.finish())],
+    )
+    .unwrap();
+
+    let out = MemOutputFile::new();
+    let mut options = WriterOptions::default();
+    options.num_buckets = 1;
+    options.page_size_threshold = 1;
+    let mut writer = MosaicWriter::new(out, &schema, options).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+
+    let data = writer.output().buf.clone();
+    let input = ByteArrayInputFile { data: data.clone() };
+    let reader = MosaicReader::new(input, data.len() as u64).unwrap();
+
+    // Project only arr_a
+    let arr_a_idx = reader
+        .schema()
+        .columns
+        .iter()
+        .position(|c| c.name == "arr_a")
+        .unwrap();
+    let mut rg = reader.row_group_reader_projected(0, &[arr_a_idx]).unwrap();
+    let projected = rg.read_columns().unwrap();
+    assert_eq!(projected.num_columns(), 1);
+    let col = projected
+        .column(0)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .unwrap();
+    assert_eq!(col.len(), 2);
+    let r0 = col
+        .value(0)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r0.len(), 2);
+    assert_eq!(r0.value(0), 1);
+    assert_eq!(r0.value(1), 2);
+
+    // The second row must come from arr_a's own values, not from the
+    // unprojected arr_b column sharing the bucket.
+    let r1 = col
+        .value(1)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .unwrap()
+        .clone();
+    assert_eq!(r1.len(), 1);
+    assert_eq!(r1.value(0), 3);
+}
diff --git a/core/tests/gen_fixtures.rs b/core/tests/gen_fixtures.rs
new file mode 100644
index 0000000..2cd2df3
--- /dev/null
+++ b/core/tests/gen_fixtures.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary compatibility tests. Verifies that the current code produces
+//! byte-identical output to committed golden files, catching unintended
+//! format changes.
+
+use std::io;
+use std::sync::Arc;
+
+use arrow_array::builder::*;
+use arrow_array::*;
+use arrow_schema::{DataType, Field, Schema};
+use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
+use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+/// Growable in-memory sink that the writer under test writes into.
+struct MemOutputFile {
+    pub buf: Vec<u8>,
+}
+
+impl MemOutputFile {
+    /// Creates an empty sink.
+    fn new() -> Self {
+        MemOutputFile { buf: Vec::new() }
+    }
+}
+
+impl OutputFile for MemOutputFile {
+    /// Appends `data` to the end of the buffer; always succeeds.
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        self.buf.extend(data);
+        Ok(())
+    }
+
+    /// Nothing is buffered outside `buf`, so flushing is a no-op.
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+
+    /// Current write position: the number of bytes written so far.
+    fn pos(&self) -> u64 {
+        let written = self.buf.len();
+        written as u64
+    }
+}
+
+/// Read-only input backed by an in-memory byte vector.
+struct ByteArrayInputFile {
+    data: Vec<u8>,
+}
+impl InputFile for ByteArrayInputFile {
+    /// Fills `buf` with the `buf.len()` bytes starting at `offset`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `io::ErrorKind::UnexpectedEof` when the requested range does not
+    /// lie entirely inside the backing data (including when `offset + len`
+    /// overflows), instead of panicking on an out-of-range slice.
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        let end = offset
+            .checked_add(buf.len() as u64)
+            .filter(|&end| end <= self.data.len() as u64)
+            .ok_or_else(|| {
+                io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "read_at range is outside the in-memory file",
+                )
+            })?;
+        // Both bounds are <= data.len(), so the casts to usize cannot truncate.
+        buf.copy_from_slice(&self.data[offset as usize..end as usize]);
+        Ok(())
+    }
+}
+
+/// Resolves `name` inside this crate's `tests/testdata` directory.
+fn golden_path(name: &str) -> std::path::PathBuf {
+    let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    path.push("tests/testdata");
+    path.push(name);
+    path
+}
+
+/// Builds the deterministic non-ARRAY fixture and returns the written bytes.
+///
+/// Schema: id(INT32 NOT NULL), name(UTF8), score(FLOAT64)
+/// Data: 5 rows with nulls
+/// Options: num_buckets=1, compression=none
+fn gen_no_array() -> Vec<u8> {
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("score", DataType::Float64, true),
+    ]);
+
+    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+    let names: ArrayRef = Arc::new(StringArray::from(vec![
+        Some("alice"),
+        None,
+        Some("charlie"),
+        Some("dave"),
+        Some("eve"),
+    ]));
+    let scores: ArrayRef = Arc::new(Float64Array::from(vec![
+        Some(95.5),
+        Some(87.0),
+        None,
+        Some(72.5),
+        Some(100.0),
+    ]));
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![ids, names, scores]).unwrap();
+
+    let opts = WriterOptions {
+        num_buckets: 1,
+        compression: 0,
+        ..WriterOptions::default()
+    };
+    let mut writer = MosaicWriter::new(MemOutputFile::new(), &schema, opts).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+
+    // Copy the bytes out of the in-memory sink still owned by the writer.
+    writer.output().buf.clone()
+}
+
+/// Builds the deterministic ARRAY fixture and returns the written bytes.
+///
+/// Schema: id(INT32 NOT NULL), tags(ARRAY<INT32>)
+/// Data: 4 rows — [10,20,30], null, [40,50], []
+/// Options: num_buckets=1, compression=none
+fn gen_with_array() -> Vec<u8> {
+    let item_field = Arc::new(Field::new("item", DataType::Int32, true));
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("tags", DataType::List(item_field.clone()), true),
+    ]);
+
+    let mut tags = ListBuilder::new(Int32Builder::new());
+    // Row 0: [10, 20, 30]
+    for value in [10, 20, 30] {
+        tags.values().append_value(value);
+    }
+    tags.append(true);
+    // Row 1: null
+    tags.append(false);
+    // Row 2: [40, 50]
+    for value in [40, 50] {
+        tags.values().append_value(value);
+    }
+    tags.append(true);
+    // Row 3: [] (empty, non-null)
+    tags.append(true);
+
+    let id_column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+    let tag_column: ArrayRef = Arc::new(tags.finish());
+    let batch =
+        RecordBatch::try_new(Arc::new(schema.clone()), vec![id_column, tag_column]).unwrap();
+
+    let opts = WriterOptions {
+        num_buckets: 1,
+        compression: 0,
+        ..WriterOptions::default()
+    };
+    let mut writer = MosaicWriter::new(MemOutputFile::new(), &schema, opts).unwrap();
+    writer.write_batch(&batch).unwrap();
+    writer.close().unwrap();
+
+    // Copy the bytes out of the in-memory sink still owned by the writer.
+    writer.output().buf.clone()
+}
+
+#[test]
+fn test_v1_no_array_binary_compatible() {
+    // The writer's output for the fixed non-ARRAY input must stay
+    // byte-identical to the committed golden file.
+    let generated = gen_no_array();
+    let path = golden_path("v1_no_array.mosaic");
+
+    // Implement the regeneration switch that the failure message below
+    // advertises: with MOSAIC_REGEN_FIXTURES=1 the golden file is rewritten
+    // from the current writer output instead of being compared.
+    if std::env::var("MOSAIC_REGEN_FIXTURES").map_or(false, |v| v == "1") {
+        std::fs::write(&path, &generated).expect("failed to write golden file");
+        return;
+    }
+
+    let golden = std::fs::read(&path)
+        .expect("golden file missing — run with MOSAIC_REGEN_FIXTURES=1 to regenerate");
+    assert_eq!(
+        generated, golden,
+        "non-ARRAY file differs from golden — format may have changed unintentionally"
+    );
+}
+
+#[test]
+fn test_v1_with_array_binary_stable() {
+    // The writer's output for the fixed ARRAY input must stay byte-identical
+    // to the committed golden file.
+    let generated = gen_with_array();
+    let path = golden_path("v1_with_array.mosaic");
+
+    // Implement the regeneration switch that the failure message below
+    // advertises: with MOSAIC_REGEN_FIXTURES=1 the golden file is rewritten
+    // from the current writer output instead of being compared.
+    if std::env::var("MOSAIC_REGEN_FIXTURES").map_or(false, |v| v == "1") {
+        std::fs::write(&path, &generated).expect("failed to write golden file");
+        return;
+    }
+
+    let golden = std::fs::read(&path)
+        .expect("golden file missing — run with MOSAIC_REGEN_FIXTURES=1 to regenerate");
+    assert_eq!(
+        generated, golden,
+        "ARRAY file differs from golden — format may have changed unintentionally"
+    );
+}
+
+#[test]
+fn test_v1_no_array_golden_readable() {
+    // The committed non-ARRAY golden file must still open and decode to a
+    // batch of the expected shape.
+    let bytes = std::fs::read(golden_path("v1_no_array.mosaic")).unwrap();
+    let file_len = bytes.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile { data: bytes }, file_len).unwrap();
+
+    let mut row_group = reader.row_group_reader(0).unwrap();
+    let batch = row_group.read_columns().unwrap();
+    assert_eq!(batch.num_rows(), 5);
+    assert_eq!(batch.num_columns(), 3);
+}
+
+#[test]
+fn test_v1_with_array_golden_readable() {
+    // The committed ARRAY golden file must still open and decode: 4 rows, with
+    // `tags` = [10, ...] (3 elements), null, non-null, [] (empty).
+    let bytes = std::fs::read(golden_path("v1_with_array.mosaic")).unwrap();
+    let file_len = bytes.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile { data: bytes }, file_len).unwrap();
+
+    let mut row_group = reader.row_group_reader(0).unwrap();
+    let batch = row_group.read_columns().unwrap();
+    assert_eq!(batch.num_rows(), 4);
+
+    let tags = batch.column(1).as_any().downcast_ref::<ListArray>().unwrap();
+    assert!(!tags.is_null(0));
+    assert!(tags.is_null(1));
+    assert!(!tags.is_null(2));
+    assert!(!tags.is_null(3));
+
+    // Pull one list row out as an owned Int32 array.
+    let int_row = |row: usize| -> Int32Array {
+        tags.value(row)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .clone()
+    };
+
+    let first = int_row(0);
+    assert_eq!(first.len(), 3);
+    assert_eq!(first.value(0), 10);
+
+    let last = int_row(3);
+    assert_eq!(last.len(), 0);
+}
diff --git a/core/tests/testdata/v1_no_array.mosaic
b/core/tests/testdata/v1_no_array.mosaic
new file mode 100644
index 0000000..3cf94c2
Binary files /dev/null and b/core/tests/testdata/v1_no_array.mosaic differ
diff --git a/core/tests/testdata/v1_with_array.mosaic
b/core/tests/testdata/v1_with_array.mosaic
new file mode 100644
index 0000000..19dbcfb
Binary files /dev/null and b/core/tests/testdata/v1_with_array.mosaic differ
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 1d08012..4f85291 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -824,6 +824,187 @@ static void test_stats_empty_string_min() {
printf(" PASS test_stats_empty_string_min\n");
}
+// Round-trips (id INT32 NOT NULL, tags LIST<INT32>) covering a populated
+// list, an empty list and a null list.
+static void test_array_type() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32(), false),
+        arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))),
+    });
+
+    // Builder calls are checked with ASSERT_TRUE rather than assert():
+    // assert() is compiled out under NDEBUG, which would silently drop the
+    // appends themselves.
+    // NOTE(review): assumes this file's ASSERT_TRUE macro stays active under
+    // NDEBUG — confirm against its definition.
+    arrow::Int32Builder id_b;
+    ASSERT_TRUE(id_b.Append(1).ok());
+    ASSERT_TRUE(id_b.Append(2).ok());
+    ASSERT_TRUE(id_b.Append(3).ok());
+    ASSERT_TRUE(id_b.Append(4).ok());
+
+    auto val_builder = std::make_shared<arrow::Int32Builder>();
+    arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+    // Row 0: [10, 20, 30]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->Append(10).ok());
+    ASSERT_TRUE(val_builder->Append(20).ok());
+    ASSERT_TRUE(val_builder->Append(30).ok());
+
+    // Row 1: [40, 50]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->Append(40).ok());
+    ASSERT_TRUE(val_builder->Append(50).ok());
+
+    // Row 2: []
+    ASSERT_TRUE(list_b.Append().ok());
+
+    // Row 3: null
+    ASSERT_TRUE(list_b.AppendNull().ok());
+
+    // ValueOrDie() aborts on a failed Finish() instead of handing back an
+    // unchecked value.
+    auto batch = arrow::RecordBatch::Make(schema, 4, {
+        id_b.Finish().ValueOrDie(),
+        list_b.Finish().ValueOrDie(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+    ASSERT_EQ(rb->num_rows(), 4);
+
+    auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->GetColumnByName("id"));
+    ASSERT_EQ(ids->Value(0), 1);
+    ASSERT_EQ(ids->Value(1), 2);
+    ASSERT_EQ(ids->Value(2), 3);
+    ASSERT_EQ(ids->Value(3), 4);
+
+    auto tags = std::static_pointer_cast<arrow::ListArray>(rb->GetColumnByName("tags"));
+    ASSERT_TRUE(!tags->IsNull(0));
+    ASSERT_TRUE(!tags->IsNull(1));
+    ASSERT_TRUE(!tags->IsNull(2));
+    ASSERT_TRUE(tags->IsNull(3));
+
+    // Row 0: [10, 20, 30]
+    auto row0 = std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(0));
+    ASSERT_EQ(row0->length(), 3);
+    ASSERT_EQ(row0->Value(0), 10);
+    ASSERT_EQ(row0->Value(1), 20);
+    ASSERT_EQ(row0->Value(2), 30);
+
+    // Row 1: [40, 50]
+    auto row1 = std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(1));
+    ASSERT_EQ(row1->length(), 2);
+    ASSERT_EQ(row1->Value(0), 40);
+    ASSERT_EQ(row1->Value(1), 50);
+
+    // Row 2: []
+    auto row2 = std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(2));
+    ASSERT_EQ(row2->length(), 0);
+
+    printf(" PASS test_array_type\n");
+}
+
+// Round-trips a LIST<INT64> column whose lists contain null elements.
+static void test_array_with_null_elements() {
+    auto schema = arrow::schema({
+        arrow::field("arr", arrow::list(arrow::field("item", arrow::int64()))),
+    });
+
+    auto val_builder = std::make_shared<arrow::Int64Builder>();
+    arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+    // Builder calls are checked with ASSERT_TRUE rather than assert():
+    // assert() is compiled out under NDEBUG, which would silently drop the
+    // appends themselves.
+    // NOTE(review): assumes this file's ASSERT_TRUE macro stays active under
+    // NDEBUG — confirm against its definition.
+
+    // [100, null, 300]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->Append(100).ok());
+    ASSERT_TRUE(val_builder->AppendNull().ok());
+    ASSERT_TRUE(val_builder->Append(300).ok());
+
+    // [null, null]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->AppendNull().ok());
+    ASSERT_TRUE(val_builder->AppendNull().ok());
+
+    // [999]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->Append(999).ok());
+
+    // ValueOrDie() aborts on a failed Finish() instead of handing back an
+    // unchecked value.
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        list_b.Finish().ValueOrDie(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+
+    auto arr = std::static_pointer_cast<arrow::ListArray>(rb->column(0));
+
+    auto row0 = std::static_pointer_cast<arrow::Int64Array>(arr->value_slice(0));
+    ASSERT_EQ(row0->length(), 3);
+    ASSERT_EQ(row0->Value(0), 100);
+    ASSERT_TRUE(row0->IsNull(1));
+    ASSERT_EQ(row0->Value(2), 300);
+
+    auto row1 = std::static_pointer_cast<arrow::Int64Array>(arr->value_slice(1));
+    ASSERT_EQ(row1->length(), 2);
+    ASSERT_TRUE(row1->IsNull(0));
+    ASSERT_TRUE(row1->IsNull(1));
+
+    auto row2 = std::static_pointer_cast<arrow::Int64Array>(arr->value_slice(2));
+    ASSERT_EQ(row2->length(), 1);
+    ASSERT_EQ(row2->Value(0), 999);
+
+    printf(" PASS test_array_with_null_elements\n");
+}
+
+// Round-trips a LIST<UTF8> column including a null element and an empty list.
+static void test_array_string_elements() {
+    auto schema = arrow::schema({
+        arrow::field("arr", arrow::list(arrow::field("item", arrow::utf8()))),
+    });
+
+    auto val_builder = std::make_shared<arrow::StringBuilder>();
+    arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+    // Builder calls are checked with ASSERT_TRUE rather than assert():
+    // assert() is compiled out under NDEBUG, which would silently drop the
+    // appends themselves.
+    // NOTE(review): assumes this file's ASSERT_TRUE macro stays active under
+    // NDEBUG — confirm against its definition.
+
+    // ["hello", "world"]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->Append("hello").ok());
+    ASSERT_TRUE(val_builder->Append("world").ok());
+
+    // [null, "foo"]
+    ASSERT_TRUE(list_b.Append().ok());
+    ASSERT_TRUE(val_builder->AppendNull().ok());
+    ASSERT_TRUE(val_builder->Append("foo").ok());
+
+    // []
+    ASSERT_TRUE(list_b.Append().ok());
+
+    // ValueOrDie() aborts on a failed Finish() instead of handing back an
+    // unchecked value.
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        list_b.Finish().ValueOrDie(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+
+    auto arr = std::static_pointer_cast<arrow::ListArray>(rb->column(0));
+
+    auto row0 = std::static_pointer_cast<arrow::StringArray>(arr->value_slice(0));
+    ASSERT_EQ(row0->length(), 2);
+    ASSERT_EQ(row0->GetString(0), "hello");
+    ASSERT_EQ(row0->GetString(1), "world");
+
+    auto row1 = std::static_pointer_cast<arrow::StringArray>(arr->value_slice(1));
+    ASSERT_EQ(row1->length(), 2);
+    ASSERT_TRUE(row1->IsNull(0));
+    ASSERT_EQ(row1->GetString(1), "foo");
+
+    auto row2 = std::static_pointer_cast<arrow::StringArray>(arr->value_slice(2));
+    ASSERT_EQ(row2->length(), 0);
+
+    printf(" PASS test_array_string_elements\n");
+}
+
int main() {
printf("Running Mosaic C++ tests...\n");
test_basic_roundtrip();
@@ -841,6 +1022,9 @@ int main() {
test_writer_stats_all_null();
test_writer_stats_matches_reader();
test_stats_empty_string_min();
- printf("All %d tests passed.\n", 15);
+ test_array_type();
+ test_array_with_null_elements();
+ test_array_string_elements();
+ printf("All %d tests passed.\n", 18);
return 0;
}
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index 9b3fa4c..0d08ebb 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -146,6 +146,44 @@ g++ -std=c++17 -I include/ example.cpp \
<span class="kw">return</span> <span class="num">0</span>;
}</code></pre>
+ <h3>Writing ARRAY Columns</h3>
+ <p>
+ ARRAY types use Arrow’s <code>ListBuilder</code>.
Element values are
+ flattened across all rows and benefit from dictionary encoding:
+ </p>
+<pre><code><span class="kw">auto</span> schema = arrow::schema({
+ arrow::field(<span class="str">"id"</span>, arrow::int32()),
+ arrow::field(<span class="str">"tags"</span>,
arrow::list(arrow::field(<span class="str">"item"</span>, arrow::int32()))),
+});
+
+arrow::Int32Builder id_b;
+<span class="kw">auto</span> val_builder =
std::make_shared<arrow::Int32Builder>();
+arrow::ListBuilder list_b(arrow::default_memory_pool(), val_builder);
+
+<span class="cmt">// Row 0: [10, 20]</span>
+id_b.Append(<span class="num">1</span>);
+list_b.Append();
+val_builder->Append(<span class="num">10</span>);
+val_builder->Append(<span class="num">20</span>);
+
+<span class="cmt">// Row 1: null</span>
+id_b.Append(<span class="num">2</span>);
+list_b.AppendNull();
+
+<span class="cmt">// Row 2: [] (empty)</span>
+id_b.Append(<span class="num">3</span>);
+list_b.Append();
+
+<span class="kw">auto</span> batch = arrow::RecordBatch::Make(schema, <span
class="num">3</span>, {
+ id_b.Finish().ValueOrDie(),
+ list_b.Finish().ValueOrDie(),
+});
+
+<span class="cmt">// Reading back</span>
+<span class="kw">auto</span> tags =
std::static_pointer_cast<arrow::ListArray>(rb->GetColumnByName(<span
class="str">"tags"</span>));
+<span class="kw">auto</span> row0 =
std::static_pointer_cast<arrow::Int32Array>(tags->value_slice(<span
class="num">0</span>));
+<span class="cmt">// row0: [10, 20]</span></code></pre>
+
<h2>C++ API Reference</h2>
<h3>Writer Options</h3>
diff --git a/docs/design.html b/docs/design.html
index 84c5e4f..19ffd81 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -739,9 +739,10 @@ repeated numColumns times:
<tr><td>15</td><td>TIME</td><td>varint precision</td></tr>
<tr><td>16</td><td>TIMESTAMP</td><td>varint
precision</td></tr>
<tr><td>17</td><td>TIMESTAMP_LTZ</td><td>varint precision,
varint timezoneLength, bytes timezone</td></tr>
+ <tr><td>18</td><td>ARRAY</td><td>varint nameLength, bytes
name (element field name), TypeDescriptor (recursive element type)</td></tr>
</tbody>
</table>
- <p>Complex types (ARRAY, MAP, ROW, etc.), VARIANT, and BLOB are
not supported.</p>
+ <p>MAP, MULTISET, ROW, VARIANT, and BLOB are not yet supported.</p>
<!-- ============================================================
-->
<h2>Value Serialization</h2>
@@ -765,6 +766,7 @@ repeated numColumns times:
<tr><td>TIMESTAMP (precision > 6)</td><td>8 bytes
(epoch millis) + 4 bytes (nanos of millis)</td></tr>
<tr><td>CHAR / VARCHAR / STRING</td><td>varint length +
UTF-8 bytes</td></tr>
<tr><td>BINARY / VARBINARY / BYTES</td><td>varint length +
raw bytes</td></tr>
+ <tr><td>ARRAY</td><td>Flattened columnar: lengths (INT32)
+ values column (see ARRAY Type Storage below)</td></tr>
</tbody>
</table>
<p>
@@ -777,6 +779,159 @@ repeated numColumns times:
those values, and legacy Struct timestamp writes reject them
before writing.
</p>
+ <!-- ============================================================
-->
+ <h2>ARRAY Type Storage</h2>
+ <p>
+ ARRAY columns use a <strong>flattened columnar</strong>
storage layout.
+ Each ARRAY column is decomposed into physical columns within
the same bucket —
+ a lengths column and a values column — both first-class
columns that benefit from
+ standard column encoding (DICT, CONST, PLAIN).
+ </p>
+
+ <h3>Decomposition</h3>
+ <p>An <code>ARRAY&lt;T&gt;</code> column with N rows is stored as:</p>
+ <ol>
+ <li><strong>Lengths column</strong> (INT32, N entries): the
number of elements in each array.
+ Null arrays are represented by a null in the lengths
column (contributing 0 elements).
+ Empty arrays (<code>[]</code>) have length 0 and are
non-null.</li>
+ <li><strong>Values column</strong> (type T, M entries): all
elements from all rows flattened
+ into a single contiguous column, where M = sum of all
non-null lengths.
+ Element-level nulls are tracked by this column’s own
null bitmap.</li>
+ </ol>
+ <p>
+ Both columns independently go through the standard encoding
selection (DICT, CONST, PLAIN, ALL_NULL),
+ enabling dictionary compression of element values across all
arrays in the column.
+ </p>
+
+ <h4>Example</h4>
+<pre><code>Input rows: [1, 2, 3], null, [1, 2], []
+
+Lengths column (INT32): 3, null, 2, 0 ← standard INT32 encoding
(DICT/CONST/PLAIN)
+Values column (INT32): 1, 2, 3, 1, 2 ← standard INT32 encoding
(DICT: {1→0, 2→1, 3→2})</code></pre>
+
+ <h4>Nested Arrays</h4>
+ <p>
+ <code>ARRAY&lt;ARRAY&lt;INT&gt;&gt;</code> is stored recursively. The outer values column
+ is itself an <code>ARRAY&lt;INT&gt;</code>, which decomposes into its own lengths + values pair.
+ All leaf INT values, regardless of which inner array they belong to, end up in a single INT32 column,
+ sharing one dictionary across the entire column.
+ </p>
+<pre><code>ARRAY<ARRAY<INT>> with rows: [[1,2], [3]], [[1,2]]
+
+Outer lengths (INT32): 2, 1 ← 2 inner arrays, 1
inner array
+Inner lengths (INT32): 2, 1, 2 ← element counts of
inner arrays
+Leaf values (INT32): 1, 2, 3, 1, 2 ← all INTs in one
column, shared DICT</code></pre>
+
+ <h3>Bucket Internal Format</h3>
+ <p>
+ ARRAY columns are expanded into <strong>physical
columns</strong> within the same bucket.
+ An <code>ARRAY<T></code> becomes two physical columns: a
lengths column (INT32)
+ and a values column (type T). Both are first-class columns in
the bucket —
+ they share the same encoding flags, null bitmaps, and column
data sections
+ as all other columns. There is no sub-bucket or nested
container.
+ </p>
+
+ <h4>Monolithic Bucket with ARRAY Columns</h4>
+<pre><code>varint numPrimary (number of logical/primary columns
= N)
+varint numChildren (number of child value columns = C; 0 if no
ARRAY)
+repeated C times:
+ varint childElementCount (total element count for each child column)
+
+[encoding flags: 2 bits × (N + C) columns]
+[has-nulls flags: 1 bit × (N + C) columns]
+[CONST metadata for all N + C columns]
+[DICT metadata for all N + C columns]
+[null bitmaps: primary columns use ceil(numRows/8), child columns use
ceil(childElementCount/8)]
+[column data for all N + C columns]</code></pre>
+
+ <h4>Paged Bucket with ARRAY Columns</h4>
+ <p>
+ The page directory includes entries for all N + C physical columns.
+ Each column (including child value columns) has its own independently compressed slot.
+ A child header made of fixed-width fields (a u16 child count followed by one u32 element count per child) precedes the directory:
+ </p>
+<pre><code>u16 numChildren (LE)
+repeated numChildren times:
+ u32 childElementCount (LE)
+[directory: (N + C) × u32 LE slot sizes]
+[column slots: each independently compressed, including child
columns]</code></pre>
+
+ <h3>Statistics</h3>
+ <p>ARRAY columns do not support min/max statistics (no meaningful
ordering).</p>
+
+ <h3>Design Rationale: Comparison with Parquet and ORC</h3>
+ <p>
+ There are two mainstream approaches to storing nested ARRAY
types in columnar formats.
+ Mosaic follows the <strong>ORC-style</strong> approach
(lengths + child columns)
+ rather than the <strong>Parquet/Dremel-style</strong> approach
(repetition/definition levels).
+ </p>
+
+ <h4>Mosaic vs ORC</h4>
+ <p>
+ Both Mosaic and ORC decompose <code>ARRAY<T></code> into
a lengths stream and a
+ child column. The key differences:
+ </p>
+ <table>
+ <thead>
+ <tr><th>Aspect</th><th>Mosaic</th><th>ORC</th></tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>Lengths encoding</td>
+ <td>DICT / CONST / PLAIN — all-same-length
arrays use CONST (near zero bytes)</td>
+ <td>Run-Length Encoding (RLE v1/v2)</td>
+ </tr>
+ <tr>
+ <td>Child column encoding</td>
+ <td>DICT / CONST / PLAIN — shared dictionary
across all arrays</td>
+ <td>DICT / DIRECT / RLE — same cross-array
sharing</td>
+ </tr>
+ <tr>
+ <td>Column placement</td>
+ <td>Lengths and values are physical columns within the
same bucket</td>
+ <td>Lengths and values are independent streams within
the same stripe</td>
+ </tr>
+ <tr>
+ <td>Null representation</td>
+ <td>Lengths column null bitmap (shared
infrastructure)</td>
+ <td>Separate PRESENT stream (Boolean RLE)</td>
+ </tr>
+ </tbody>
+ </table>
+ <p>
+ The designs are structurally equivalent. Mosaic’s CONST
encoding for lengths
+ can be more compact than ORC’s RLE when all arrays have
the same length
+ (a common case in feature vectors and fixed-size embeddings).
+ </p>
+
+ <h4>Why Not Parquet’s Repetition/Definition Levels?</h4>
+ <p>
+ Parquet uses the Dremel encoding: regardless of nesting depth,
each ARRAY column
+ produces a single physical column with repetition and
definition level arrays attached
+ to every leaf value. This has the advantage of constant
physical column count
+ (always 1 per ARRAY column), but significant disadvantages:
+ </p>
+ <ul>
+ <li><strong>Implementation complexity</strong>: The shredding
(decomposition) and assembly
+ (reconstruction) algorithms for rep/def levels are
substantially more complex than
+ the lengths + child approach, especially for multi-level
nesting. Correct handling
+ of nulls at different nesting levels requires careful
tracking of definition level
+ thresholds.</li>
+ <li><strong>Overhead for common cases</strong>: Most
real-world schemas use at most
+ 1–2 levels of ARRAY nesting. The rep/def approach
adds per-value overhead
+ (two extra integers per leaf value) that only pays off at
deep nesting levels
+ (≥ 3), which are rare in practice.</li>
+ <li><strong>No encoding benefit</strong>: Rep/def levels
themselves need encoding
+ (typically RLE or bit-packing). The lengths + child
approach achieves equivalent
+ compression by applying standard column encodings to both
the lengths column
+ (which captures the same structural information) and the
values column.</li>
+ </ul>
+ <p>
+ The ORC-style approach was chosen for its simplicity,
debuggability, and
+ natural fit with Mosaic’s bucket architecture where each
column is
+ independently encoded and compressed.
+ </p>
+
<!-- ============================================================
-->
<h2>Varint Encoding</h2>
<p>
@@ -793,7 +948,7 @@ repeated numColumns times:
<!-- ============================================================
-->
<h2>Limitations</h2>
<ol>
- <li>Complex types (ARRAY, MAP, MULTISET, ROW) are not
supported.</li>
+ <li>Complex types MAP, MULTISET, and ROW are not yet
supported. ARRAY is supported with flattened columnar storage.</li>
<li>Mosaic format is designed for wide tables and may not be
efficient for narrow tables with few columns.</li>
</ol>
</div>
diff --git a/docs/java-api.html b/docs/java-api.html
index f654b99..f8f207a 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -117,6 +117,48 @@
Nanosecond timestamps are exposed as standard Arrow
<code>TimeUnit.NANOSECOND</code>
vectors while Mosaic keeps its 12-byte physical encoding
internally.
</p>
+ <p>
+ ARRAY types are defined using Arrow’s
<code>ArrowType.List</code> with a child field
+ for the element type. Nested arrays
(<code>ARRAY&lt;ARRAY&lt;INT&gt;&gt;</code>) are supported:
+ </p>
+<pre><code><span class="ty">Field</span> elementField = <span
class="kw">new</span> <span class="ty">Field</span>(<span
class="str">"item"</span>,
+ <span class="ty">FieldType</span>.nullable(<span class="kw">new</span>
<span class="ty">ArrowType.Int</span>(<span class="num">32</span>, <span
class="kw">true</span>)), <span class="kw">null</span>);
+<span class="ty">Field</span> tagsField = <span class="kw">new</span> <span
class="ty">Field</span>(<span class="str">"tags"</span>,
+ <span class="ty">FieldType</span>.nullable(<span
class="ty">ArrowType.List</span>.INSTANCE), <span
class="ty">Arrays</span>.asList(elementField));
+
+<span class="ty">Schema</span> schema = <span class="kw">new</span> <span
class="ty">Schema</span>(<span class="ty">Arrays</span>.asList(
+ <span class="ty">Field</span>.notNullable(<span class="str">"id"</span>,
<span class="kw">new</span> <span class="ty">ArrowType.Int</span>(<span
class="num">32</span>, <span class="kw">true</span>)),
+ tagsField
+));</code></pre>
+ <p>
+ Writing ARRAY values uses Arrow’s
<code>ListVector</code> and <code>UnionListWriter</code>:
+ </p>
+<pre><code><span class="ty">ListVector</span> tags = (<span
class="ty">ListVector</span>) root.getVector(<span class="str">"tags"</span>);
+tags.allocateNew();
+<span class="ty">UnionListWriter</span> listWriter = tags.getWriter();
+
+<span class="cmt">// Row 0: [10, 20, 30]</span>
+listWriter.setPosition(<span class="num">0</span>);
+listWriter.startList();
+listWriter.writeInt(<span class="num">10</span>);
+listWriter.writeInt(<span class="num">20</span>);
+listWriter.writeInt(<span class="num">30</span>);
+listWriter.endList();
+
+<span class="cmt">// Row 1: null</span>
+tags.setNull(<span class="num">1</span>);
+
+<span class="cmt">// Row 2: [] (empty array)</span>
+listWriter.setPosition(<span class="num">2</span>);
+listWriter.startList();
+listWriter.endList();</code></pre>
+ <p>
+ Reading uses <code>ListVector.getObject()</code>:
+ </p>
+<pre><code><span class="ty">ListVector</span> readTags = (<span
class="ty">ListVector</span>) batch.getVector(<span class="str">"tags"</span>);
+<span class="ty">List</span>&lt;?&gt; row0 = readTags.getObject(<span
class="num">0</span>); <span class="cmt">// [10, 20, 30]</span>
+<span class="kw">boolean</span> isNull = readTags.isNull(<span
class="num">1</span>); <span class="cmt">// true</span>
+<span class="ty">List</span>&lt;?&gt; row2 = readTags.getObject(<span
class="num">2</span>); <span class="cmt">// [] (empty)</span></code></pre>
<h3>2. Create Writer and Write Batches</h3>
<p>
diff --git a/docs/python-api.html b/docs/python-api.html
index 7d50afe..fd849dd 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -96,6 +96,14 @@ pa_schema = pa.schema([
Nanosecond timestamps are exposed as standard Arrow
<code>timestamp("ns")</code>
arrays while Mosaic keeps its 12-byte physical encoding
internally.
</p>
+ <p>
+ ARRAY types use PyArrow’s <code>pa.list_()</code>.
Nested arrays are supported:
+ </p>
+<pre><code>pa_schema = pa.schema([
+ pa.field(<span class="str">"id"</span>, pa.int32()),
+ pa.field(<span class="str">"tags"</span>, pa.list_(pa.int32())),
+ pa.field(<span class="str">"nested"</span>, pa.list_(pa.list_(pa.utf8()))),
+])</code></pre>
<h3>2. Create Writer and Write Batches</h3>
<p>
@@ -131,6 +139,22 @@ buf = io.BytesIO()
data = buf.getvalue()</code></pre>
+ <h4>Writing ARRAY Columns</h4>
+ <p>Pass native Python lists to <code>pa.array()</code> with
<code>pa.list_()</code> type:</p>
+<pre><code>schema = pa.schema([
+ pa.field(<span class="str">"id"</span>, pa.int32()),
+ pa.field(<span class="str">"tags"</span>, pa.list_(pa.int32())),
+])
+
+batch = pa.record_batch([
+ pa.array([<span class="num">1</span>, <span class="num">2</span>, <span
class="num">3</span>], type=pa.int32()),
+ pa.array([[<span class="num">10</span>, <span class="num">20</span>],
<span class="kw">None</span>, []], type=pa.list_(pa.int32())),
+], names=[<span class="str">"id"</span>, <span class="str">"tags"</span>])
+
+<span class="cmt"># Reading back</span>
+rb = reader.read_row_group(<span class="num">0</span>)
+tags = rb.column(<span class="str">"tags"</span>).to_pylist() <span
class="cmt"># [[10, 20], None, []]</span></code></pre>
+
<h3>WriterOptions</h3>
<table>
<thead>
diff --git
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 5344a2d..4b909db 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -39,10 +39,13 @@ import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.After;
@@ -1141,4 +1144,92 @@ public class MosaicRoundtripTest {
assertEquals(10, reader.rowGroupNumRows(0));
}
}
+
+ @Test
+ public void testArrayType() {
+ Field elementField = new Field("item", FieldType.nullable(new
ArrowType.Int(32, true)), null);
+ Field listField = new Field("tags",
FieldType.nullable(ArrowType.List.INSTANCE), Arrays.asList(elementField));
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ listField
+ ));
+
+ byte[] data;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector ids = (IntVector) root.getVector("id");
+ ListVector tags = (ListVector) root.getVector("tags");
+
+ ids.allocateNew(4);
+ tags.allocateNew();
+
+ UnionListWriter listWriter = tags.getWriter();
+
+ // Row 0: [10, 20, 30]
+ ids.set(0, 1);
+ listWriter.setPosition(0);
+ listWriter.startList();
+ listWriter.writeInt(10);
+ listWriter.writeInt(20);
+ listWriter.writeInt(30);
+ listWriter.endList();
+
+ // Row 1: [40, 50]
+ ids.set(1, 2);
+ listWriter.setPosition(1);
+ listWriter.startList();
+ listWriter.writeInt(40);
+ listWriter.writeInt(50);
+ listWriter.endList();
+
+ // Row 2: [] (empty array)
+ ids.set(2, 3);
+ listWriter.setPosition(2);
+ listWriter.startList();
+ listWriter.endList();
+
+ // Row 3: null
+ ids.set(3, 4);
+ tags.setNull(3);
+
+ root.setRowCount(4);
+ data = writeToBytes(arrowSchema, writer -> writer.write(root));
+ }
+
+ try (MosaicReader reader = readerFromBytes(data)) {
+ try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+ assertEquals(4, batch.getRowCount());
+
+ IntVector readIds = (IntVector) batch.getVector("id");
+ ListVector readTags = (ListVector) batch.getVector("tags");
+
+ assertEquals(1, readIds.get(0));
+ assertEquals(2, readIds.get(1));
+ assertEquals(3, readIds.get(2));
+ assertEquals(4, readIds.get(3));
+
+ // Row 0: [10, 20, 30]
+ assertFalse(readTags.isNull(0));
+ java.util.List<?> row0 = readTags.getObject(0);
+ assertEquals(3, row0.size());
+ assertEquals(10, row0.get(0));
+ assertEquals(20, row0.get(1));
+ assertEquals(30, row0.get(2));
+
+ // Row 1: [40, 50]
+ assertFalse(readTags.isNull(1));
+ java.util.List<?> row1 = readTags.getObject(1);
+ assertEquals(2, row1.size());
+ assertEquals(40, row1.get(0));
+ assertEquals(50, row1.get(1));
+
+ // Row 2: []
+ assertFalse(readTags.isNull(2));
+ java.util.List<?> row2 = readTags.getObject(2);
+ assertEquals(0, row2.size());
+
+ // Row 3: null
+ assertTrue(readTags.isNull(3));
+ }
+ }
+ }
}
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 00e68b7..de387c6 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -965,3 +965,96 @@ class TestWriter:
with _reader_from_bytes(data) as reader:
assert reader.num_row_groups == 1
assert reader.row_group_num_rows(0) == 10
+
+ def test_array_type(self):
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32(), nullable=False),
+ pa.field("tags", pa.list_(pa.int32())),
+ ]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array([1, 2, 3, 4], type=pa.int32()),
+ pa.array([[10, 20, 30], [40, 50], [], None],
type=pa.list_(pa.int32())),
+ ],
+ names=["id", "tags"],
+ )
+
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ rb = reader.read_row_group(0)
+ assert rb.num_rows == 4
+
+ ids = rb.column("id").to_pylist()
+ assert ids == [1, 2, 3, 4]
+
+ tags = rb.column("tags").to_pylist()
+ assert tags[0] == [10, 20, 30]
+ assert tags[1] == [40, 50]
+ assert tags[2] == []
+ assert tags[3] is None
+
+ def test_array_with_null_elements(self):
+ pa_schema = pa.schema(
+ [pa.field("arr", pa.list_(pa.int64()))]
+ )
+
+ batch = pa.record_batch(
+ [pa.array([[100, None, 300], [None, None], [999]],
type=pa.list_(pa.int64()))],
+ names=["arr"],
+ )
+
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ rb = reader.read_row_group(0)
+ arr = rb.column("arr").to_pylist()
+ assert arr[0] == [100, None, 300]
+ assert arr[1] == [None, None]
+ assert arr[2] == [999]
+
+ def test_array_string_elements(self):
+ pa_schema = pa.schema(
+ [pa.field("arr", pa.list_(pa.utf8()))]
+ )
+
+ batch = pa.record_batch(
+ [pa.array([["hello", "world"], [None, "foo"], []],
type=pa.list_(pa.utf8()))],
+ names=["arr"],
+ )
+
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ rb = reader.read_row_group(0)
+ arr = rb.column("arr").to_pylist()
+ assert arr[0] == ["hello", "world"]
+ assert arr[1] == [None, "foo"]
+ assert arr[2] == []
+
+ def test_array_nested(self):
+ pa_schema = pa.schema(
+ [pa.field("nested", pa.list_(pa.list_(pa.int32())))]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array(
+ [[[1, 2], [3]], [[4]], None],
+ type=pa.list_(pa.list_(pa.int32())),
+ )
+ ],
+ names=["nested"],
+ )
+
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ rb = reader.read_row_group(0)
+ nested = rb.column("nested").to_pylist()
+ assert nested[0] == [[1, 2], [3]]
+ assert nested[1] == [[4]]
+ assert nested[2] is None