This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 63ed438  feat: expose timestamp nanos as Arrow timestamp(ns) (#21)
63ed438 is described below

commit 63ed4383b2da4e6eaf4f933a7732e70f9810f979
Author: QuakeWang <[email protected]>
AuthorDate: Thu May 21 09:15:13 2026 +0800

    feat: expose timestamp nanos as Arrow timestamp(ns) (#21)
---
 core/src/bucket_reader.rs                          |  85 ++++--
 core/src/bucket_writer.rs                          | 146 ++++++++--
 core/src/reader_tests.rs                           | 313 +++++++++++++++++++--
 core/src/stats.rs                                  |  27 ++
 core/src/types.rs                                  |  60 ++--
 core/tests/robustness_test.rs                      |  47 ++--
 core/tests/stress_test.rs                          |  39 +--
 cpp/test_mosaic.cpp                                |  49 +++-
 docs/design.html                                   |   9 +
 docs/java-api.html                                 |   8 +-
 docs/python-api.html                               |   6 +
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  |  49 ++++
 python/mosaic/_ffi.py                              |   8 +-
 python/tests/test_mosaic.py                        |  39 +++
 14 files changed, 732 insertions(+), 153 deletions(-)

diff --git a/core/src/bucket_reader.rs b/core/src/bucket_reader.rs
index 495cae1..115bae4 100644
--- a/core/src/bucket_reader.rs
+++ b/core/src/bucket_reader.rs
@@ -56,10 +56,8 @@ fn data_variant_for_type(dt: &DataType) -> DataVariant {
                 DataVariant::Binary
             }
         }
+        dt if types::is_timestamp_nanos(dt) => DataVariant::TimestampNanos,
         DataType::Timestamp(_, _) => DataVariant::Int64,
-        DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields) 
=> {
-            DataVariant::TimestampNanos
-        }
         _ => DataVariant::Binary,
     }
 }
@@ -104,13 +102,11 @@ fn empty_raw_data_for_type(dt: &DataType) -> 
RawColumnData {
                 }
             }
         }
+        dt if types::is_timestamp_nanos(dt) => RawColumnData::TimestampNanos {
+            millis: Vec::new(),
+            nanos_of_milli: Vec::new(),
+        },
         DataType::Timestamp(_, _) => RawColumnData::Int64(Vec::new()),
-        DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields) 
=> {
-            RawColumnData::TimestampNanos {
-                millis: Vec::new(),
-                nanos_of_milli: Vec::new(),
-            }
-        }
         _ => RawColumnData::Binary {
             offsets: vec![0],
             data: Vec::new(),
@@ -191,10 +187,10 @@ fn build_array(
     dt: &DataType,
     null_bitmap: Option<Vec<u8>>,
     num_rows: usize,
-) -> ArrayRef {
+) -> io::Result<ArrayRef> {
     let null_buf = make_null_buffer(null_bitmap.clone(), num_rows);
 
-    match data {
+    Ok(match data {
         RawColumnData::Boolean(values) => {
             let bool_buf = BooleanBuffer::new(Buffer::from_vec(values), 0, 
num_rows);
             Arc::new(BooleanArray::new(bool_buf, null_buf))
@@ -307,19 +303,43 @@ fn build_array(
         } => {
             let millis_scattered = scatter_fixed(millis, &null_bitmap, 
num_rows);
             let nanos_scattered = scatter_fixed(nanos_of_milli, &null_bitmap, 
num_rows);
-            let millis_array = Arc::new(Int64Array::from(millis_scattered)) as 
ArrayRef;
-            let nanos_array = Arc::new(Int32Array::from(nanos_scattered)) as 
ArrayRef;
-            let fields = vec![
-                Field::new("millis", DataType::Int64, false),
-                Field::new("nanos_of_milli", DataType::Int32, false),
-            ];
-            Arc::new(StructArray::new(
-                fields.into(),
-                vec![millis_array, nanos_array],
-                null_buf,
-            ))
+
+            match dt {
+                DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+                    let values = millis_scattered
+                        .into_iter()
+                        .zip(nanos_scattered)
+                        .map(|(millis, nanos)| 
types::millis_nanos_to_ns(millis, nanos))
+                        .collect::<io::Result<Vec<_>>>()?;
+                    let arr = 
TimestampNanosecondArray::new(ScalarBuffer::from(values), null_buf);
+                    Arc::new(if let Some(tz) = tz {
+                        arr.with_timezone(tz.clone())
+                    } else {
+                        arr
+                    })
+                }
+                DataType::Struct(fields) if 
types::is_timestamp_nanos_struct(fields) => {
+                    let millis_array = 
Arc::new(Int64Array::from(millis_scattered)) as ArrayRef;
+                    let nanos_array = 
Arc::new(Int32Array::from(nanos_scattered)) as ArrayRef;
+                    let fields = vec![
+                        Field::new("millis", DataType::Int64, false),
+                        Field::new("nanos_of_milli", DataType::Int32, false),
+                    ];
+                    Arc::new(StructArray::new(
+                        fields.into(),
+                        vec![millis_array, nanos_array],
+                        null_buf,
+                    ))
+                }
+                _ => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!("timestamp nanos data for non-nanos type: 
{:?}", dt),
+                    ));
+                }
+            }
         }
-    }
+    })
 }
 
 pub struct BucketReader {
@@ -536,7 +556,12 @@ impl BucketReader {
                 _ => empty_raw_data_for_type(&self.col_types[i]),
             };
 
-            result.push(build_array(data, &self.col_types[i], null_bitmap, 
num_rows));
+            result.push(build_array(
+                data,
+                &self.col_types[i],
+                null_bitmap,
+                num_rows,
+            )?);
         }
 
         Ok(result)
@@ -788,7 +813,7 @@ impl ColumnPageReader {
             _ => empty_raw_data_for_type(&self.col_type),
         };
 
-        Ok(build_array(data, &self.col_type, null_bitmap, num_rows))
+        build_array(data, &self.col_type, null_bitmap, num_rows)
     }
 }
 
@@ -824,6 +849,16 @@ pub(crate) fn read_typed_value(dt: &DataType, buf: &[u8], 
pos: usize, width: i32
         DataType::Decimal128(_, _) => Value::DecimalCompact(read_i64(buf, 
pos)),
         DataType::Timestamp(TimeUnit::Millisecond, _) => 
Value::TimestampMillis(read_i64(buf, pos)),
         DataType::Timestamp(TimeUnit::Microsecond, _) => 
Value::TimestampMicros(read_i64(buf, pos)),
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            debug_assert_eq!(width, 12);
+            let millis = read_i64(buf, pos);
+            let nanos =
+                i32::from_be_bytes([buf[pos + 8], buf[pos + 9], buf[pos + 10], 
buf[pos + 11]]);
+            Value::TimestampNanos {
+                millis,
+                nanos_of_milli: nanos,
+            }
+        }
         DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields) 
=> {
             debug_assert_eq!(width, 12);
             let millis = read_i64(buf, pos);
diff --git a/core/src/bucket_writer.rs b/core/src/bucket_writer.rs
index 699095c..501e6ba 100644
--- a/core/src/bucket_writer.rs
+++ b/core/src/bucket_writer.rs
@@ -144,7 +144,8 @@ impl BucketWriter {
         let typed = downcast_array(array, dt)?;
 
         if array.null_count() == 0 {
-            if let Some(col_size) = self.append_no_null_batch(col, &typed, 
start_row, num_new_rows)
+            if let Some(col_size) =
+                self.append_no_null_batch(col, &typed, start_row, 
num_new_rows)?
             {
                 return Ok(col_size);
             }
@@ -158,7 +159,7 @@ impl BucketWriter {
             } else {
                 self.non_null_counts[col] += 1;
                 let before = self.value_buffers[col].len();
-                write_typed_value(&mut self.value_buffers[col], &typed, row);
+                write_typed_value(&mut self.value_buffers[col], &typed, row)?;
                 let written = self.value_buffers[col].len() - before;
                 col_size += written;
 
@@ -207,7 +208,7 @@ impl BucketWriter {
         typed: &TypedArrayRef,
         start_row: usize,
         num_rows: usize,
-    ) -> Option<usize> {
+    ) -> io::Result<Option<usize>> {
         let buf = &mut self.value_buffers[col];
         let before_all = buf.len();
 
@@ -295,16 +296,22 @@ impl BucketWriter {
                     buf.extend_from_slice(&v.to_be_bytes());
                 }
             }
-            TypedArrayRef::TimestampNanos { millis, nanos } => {
+            TypedArrayRef::TimestampNanos(a) => {
+                let vals = a.values();
+                buf.reserve(num_rows * 12);
+                for &v in vals.iter() {
+                    write_timestamp_nanos_value(buf, v);
+                }
+            }
+            TypedArrayRef::LegacyTimestampNanos { millis, nanos } => {
                 let m_vals = millis.values();
                 let n_vals = nanos.values();
                 buf.reserve(num_rows * 12);
                 for i in 0..num_rows {
-                    buf.extend_from_slice(&m_vals[i].to_be_bytes());
-                    buf.extend_from_slice(&n_vals[i].to_be_bytes());
+                    write_legacy_timestamp_nanos_value(buf, m_vals[i], 
n_vals[i])?;
                 }
             }
-            _ => return None,
+            _ => return Ok(None),
         }
 
         let col_size = buf.len() - before_all;
@@ -342,12 +349,28 @@ impl BucketWriter {
                     break;
                 }
             }
+        } else if let Some(ref mut dict) = self.byte_dict_maps[col] {
+            for i in 0..num_rows {
+                let start = before_all + i * fw;
+                let slice = &buf[start..start + fw];
+                if !dict.contains_key(slice) {
+                    let len = dict.len();
+                    dict.insert(slice.to_vec(), len);
+                    self.dict_total_bytes[col] += fw;
+                }
+                if dict.len() > self.max_dict_entries
+                    || self.dict_total_bytes[col] > self.max_dict_total_bytes
+                {
+                    self.byte_dict_maps[col] = None;
+                    break;
+                }
+            }
         }
 
         // null_bitmap stays all-zero (no nulls), start_row offsets are fine
         let _ = start_row;
 
-        Some(col_size)
+        Ok(Some(col_size))
     }
 
     fn compute_encodings(&self) -> (Vec<u8>, Vec<bool>) {
@@ -688,7 +711,8 @@ enum TypedArrayRef<'a> {
     Decimal128Large(&'a Decimal128Array),
     TimestampMillis(&'a TimestampMillisecondArray),
     TimestampMicros(&'a TimestampMicrosecondArray),
-    TimestampNanos {
+    TimestampNanos(&'a TimestampNanosecondArray),
+    LegacyTimestampNanos {
         millis: &'a Int64Array,
         nanos: &'a Int32Array,
     },
@@ -768,24 +792,30 @@ fn downcast_array<'a>(array: &'a dyn Array, dt: 
&DataType) -> io::Result<TypedAr
                     .ok_or_else(|| cast_err(dt))?,
             ))
         }
+        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+            Ok(TypedArrayRef::TimestampNanos(
+                any.downcast_ref::<TimestampNanosecondArray>()
+                    .ok_or_else(|| cast_err(dt))?,
+            ))
+        }
         DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields) 
=> {
             let s = any
                 .downcast_ref::<StructArray>()
                 .ok_or_else(|| cast_err(dt))?;
             let ts_dt = DataType::Int64;
             let ns_dt = DataType::Int32;
-            Ok(TypedArrayRef::TimestampNanos {
-                millis: s
-                    .column(0)
-                    .as_any()
-                    .downcast_ref::<Int64Array>()
-                    .ok_or_else(|| cast_err(&ts_dt))?,
-                nanos: s
-                    .column(1)
-                    .as_any()
-                    .downcast_ref::<Int32Array>()
-                    .ok_or_else(|| cast_err(&ns_dt))?,
-            })
+            let millis = s
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .ok_or_else(|| cast_err(&ts_dt))?;
+            let nanos = s
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .ok_or_else(|| cast_err(&ns_dt))?;
+            validate_legacy_timestamp_nanos(s, millis, nanos)?;
+            Ok(TypedArrayRef::LegacyTimestampNanos { millis, nanos })
         }
         _ => Err(io::Error::new(
             io::ErrorKind::InvalidInput,
@@ -795,7 +825,7 @@ fn downcast_array<'a>(array: &'a dyn Array, dt: &DataType) 
-> io::Result<TypedAr
 }
 
 #[inline]
-fn write_typed_value(buf: &mut Vec<u8>, typed: &TypedArrayRef, row: usize) {
+fn write_typed_value(buf: &mut Vec<u8>, typed: &TypedArrayRef, row: usize) -> 
io::Result<()> {
     match typed {
         TypedArrayRef::Boolean(a) => buf.push(if a.value(row) { 1 } else { 0 
}),
         TypedArrayRef::Int8(a) => buf.push(a.value(row) as u8),
@@ -826,11 +856,57 @@ fn write_typed_value(buf: &mut Vec<u8>, typed: 
&TypedArrayRef, row: usize) {
         }
         TypedArrayRef::TimestampMillis(a) => 
buf.extend_from_slice(&a.value(row).to_be_bytes()),
         TypedArrayRef::TimestampMicros(a) => 
buf.extend_from_slice(&a.value(row).to_be_bytes()),
-        TypedArrayRef::TimestampNanos { millis, nanos } => {
-            buf.extend_from_slice(&millis.value(row).to_be_bytes());
-            buf.extend_from_slice(&nanos.value(row).to_be_bytes());
+        TypedArrayRef::TimestampNanos(a) => write_timestamp_nanos_value(buf, 
a.value(row)),
+        TypedArrayRef::LegacyTimestampNanos { millis, nanos } => {
+            write_legacy_timestamp_nanos_value(buf, millis.value(row), 
nanos.value(row))?;
+        }
+    }
+    Ok(())
+}
+
+fn validate_legacy_timestamp_nanos(
+    parent: &StructArray,
+    millis: &Int64Array,
+    nanos: &Int32Array,
+) -> io::Result<()> {
+    for row in 0..parent.len() {
+        if parent.is_null(row) {
+            continue;
         }
+        if millis.is_null(row) || nanos.is_null(row) {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "legacy timestamp nanos has null child for a non-null parent 
row",
+            ));
+        }
+        validate_timestamp_nanos_pair(millis.value(row), nanos.value(row))?;
     }
+    Ok(())
+}
+
+#[inline]
+fn write_timestamp_nanos_value(buf: &mut Vec<u8>, ns: i64) {
+    let (millis, nanos) = types::ns_to_millis_nanos(ns);
+    buf.extend_from_slice(&millis.to_be_bytes());
+    buf.extend_from_slice(&nanos.to_be_bytes());
+}
+
+#[inline]
+fn write_legacy_timestamp_nanos_value(
+    buf: &mut Vec<u8>,
+    millis: i64,
+    nanos: i32,
+) -> io::Result<()> {
+    validate_timestamp_nanos_pair(millis, nanos)?;
+    buf.extend_from_slice(&millis.to_be_bytes());
+    buf.extend_from_slice(&nanos.to_be_bytes());
+    Ok(())
+}
+
+fn validate_timestamp_nanos_pair(millis: i64, nanos: i32) -> io::Result<()> {
+    types::millis_nanos_to_ns(millis, nanos)
+        .map(|_| ())
+        .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, 
err.to_string()))
 }
 
 fn i128_to_biginteger_bytes(val: i128) -> Vec<u8> {
@@ -1033,6 +1109,26 @@ mod tests {
         assert_eq!(data[0] & 0x03, ENCODING_DICT);
     }
 
+    #[test]
+    fn test_timestamp_nanos_byte_dict_after_no_null_batch() {
+        let types = [DataType::Timestamp(
+            arrow_schema::TimeUnit::Nanosecond,
+            None,
+        )];
+        let type_refs: Vec<&DataType> = types.iter().collect();
+        let mut writer = BucketWriter::new(&type_refs, 32768, 255);
+
+        let first = TimestampNanosecondArray::from(vec![Some(1), None, 
Some(2)]);
+        writer.write_columns(&[&first], &[&types[0]]).unwrap();
+
+        let second_values: Vec<i64> = (0..120).map(|i| 3 + (i % 3) as 
i64).collect();
+        let second = TimestampNanosecondArray::from(second_values);
+        writer.write_columns(&[&second], &[&types[0]]).unwrap();
+
+        let data = writer.finish();
+        assert_eq!(data[0] & 0x03, ENCODING_DICT);
+    }
+
     #[test]
     fn test_multi_column_mixed_encodings() {
         let types = [DataType::Int32, DataType::Utf8, DataType::Int64];
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index 3bdacda..55eecee 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -30,6 +30,18 @@ fn columns_to_arrow_schema(columns: &[(String, DataType, 
bool)]) -> Schema {
     )
 }
 
+fn legacy_timestamp_nanos_fields() -> arrow_schema::Fields {
+    vec![
+        Field::new("millis", DataType::Int64, false),
+        Field::new("nanos_of_milli", DataType::Int32, false),
+    ]
+    .into()
+}
+
+fn legacy_timestamp_nanos_type() -> DataType {
+    DataType::Struct(legacy_timestamp_nanos_fields())
+}
+
 struct ByteArrayInputFile {
     data: Vec<u8>,
 }
@@ -263,6 +275,25 @@ fn build_array_from_values(rows: &[Vec<Value>], col: 
usize, dt: &DataType) -> Ar
                 arr
             })
         }
+        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+            let vals: Vec<Option<i64>> = rows
+                .iter()
+                .map(|row| match &row[col] {
+                    Value::TimestampNanos {
+                        millis,
+                        nanos_of_milli,
+                    } => Some(crate::types::millis_nanos_to_ns(*millis, 
*nanos_of_milli).unwrap()),
+                    Value::Null => None,
+                    _ => panic!("type mismatch"),
+                })
+                .collect();
+            let arr = TimestampNanosecondArray::from(vals);
+            Arc::new(if let Some(tz) = tz {
+                arr.with_timezone(tz.clone())
+            } else {
+                arr
+            })
+        }
         DataType::Struct(struct_fields)
             if crate::types::is_timestamp_nanos_struct(struct_fields) =>
         {
@@ -698,20 +729,12 @@ fn test_roundtrip_all_types() {
     let c_ts_high = batch
         .column(col("f_timestamp_high"))
         .as_any()
-        .downcast_ref::<StructArray>()
-        .unwrap();
-    let millis_arr = c_ts_high
-        .column(0)
-        .as_any()
-        .downcast_ref::<Int64Array>()
-        .unwrap();
-    let nanos_arr = c_ts_high
-        .column(1)
-        .as_any()
-        .downcast_ref::<Int32Array>()
+        .downcast_ref::<TimestampNanosecondArray>()
         .unwrap();
-    assert_eq!(millis_arr.value(0), 1700000000000i64);
-    assert_eq!(nanos_arr.value(0), 123456);
+    assert_eq!(
+        c_ts_high.value(0),
+        crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+    );
 }
 
 fn write_and_read(
@@ -1890,6 +1913,251 @@ fn test_timestamp_ltz_roundtrip() {
     assert_eq!(ts1.timezone().unwrap(), "America/New_York");
 }
 
+#[test]
+fn test_timestamp_nanos_roundtrip() {
+    let columns = vec![(
+        "ts_nanos".to_string(),
+        DataType::Timestamp(TimeUnit::Nanosecond, None),
+        true,
+    )];
+    let rows = vec![
+        vec![Value::TimestampNanos {
+            millis: 1700000000000,
+            nanos_of_milli: 123456,
+        }],
+        vec![Value::Null],
+        vec![Value::TimestampNanos {
+            millis: -1,
+            nanos_of_milli: 999999,
+        }],
+        vec![Value::TimestampNanos {
+            millis: 0,
+            nanos_of_milli: 0,
+        }],
+    ];
+    let (reader, _) = write_and_read(columns, &rows);
+
+    assert_eq!(
+        reader.schema().columns[0].data_type,
+        DataType::Timestamp(TimeUnit::Nanosecond, None)
+    );
+
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let batch = rg.read_columns().unwrap();
+    let ts = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .unwrap();
+    assert_eq!(
+        ts.value(0),
+        crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+    );
+    assert!(ts.is_null(1));
+    assert_eq!(ts.value(2), -1);
+    assert_eq!(ts.value(3), 0);
+}
+
+#[test]
+fn test_timestamp_ltz_nanos_roundtrip() {
+    let columns = vec![(
+        "ts_ltz_nanos".to_string(),
+        DataType::Timestamp(TimeUnit::Nanosecond, 
Some("Asia/Shanghai".into())),
+        true,
+    )];
+    let rows = vec![
+        vec![Value::TimestampNanos {
+            millis: 1700000000000,
+            nanos_of_milli: 42,
+        }],
+        vec![Value::Null],
+    ];
+    let (reader, _) = write_and_read(columns, &rows);
+
+    assert_eq!(
+        reader.schema().columns[0].data_type,
+        DataType::Timestamp(TimeUnit::Nanosecond, Some("Asia/Shanghai".into()))
+    );
+
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let batch = rg.read_columns().unwrap();
+    let ts = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .unwrap();
+    assert_eq!(ts.timezone().unwrap(), "Asia/Shanghai");
+    assert_eq!(
+        ts.value(0),
+        crate::types::millis_nanos_to_ns(1700000000000, 42).unwrap()
+    );
+    assert!(ts.is_null(1));
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_writes_as_arrow_nanos() {
+    let columns = vec![("ts".to_string(), legacy_timestamp_nanos_type(), 
true)];
+    let rows = vec![vec![Value::TimestampNanos {
+        millis: 1700000000000,
+        nanos_of_milli: 123456,
+    }]];
+    let (reader, _) = write_and_read(columns, &rows);
+
+    assert_eq!(
+        reader.schema().columns[0].data_type,
+        DataType::Timestamp(TimeUnit::Nanosecond, None)
+    );
+
+    let mut rg = reader.row_group_reader(0).unwrap();
+    let batch = rg.read_columns().unwrap();
+    let ts = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<TimestampNanosecondArray>()
+        .unwrap();
+    assert_eq!(
+        ts.value(0),
+        crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+    );
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_rejects_child_null() {
+    let legacy_fields: arrow_schema::Fields = vec![
+        Field::new("millis", DataType::Int64, true),
+        Field::new("nanos_of_milli", DataType::Int32, true),
+    ]
+    .into();
+    let legacy_dt = DataType::Struct(legacy_fields.clone());
+    let schema = Schema::new(vec![Field::new("ts", legacy_dt.clone(), true)]);
+    let struct_array = StructArray::new(
+        legacy_fields,
+        vec![
+            Arc::new(Int64Array::from(vec![Some(0)])) as ArrayRef,
+            Arc::new(Int32Array::from(vec![None])) as ArrayRef,
+        ],
+        Some(arrow_buffer::NullBuffer::from(vec![true])),
+    );
+    let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(struct_array)]).unwrap();
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &Schema::new(vec![Field::new("ts", legacy_dt, true)]),
+        WriterOptions::default(),
+    )
+    .unwrap();
+
+    let err = writer.write_batch(&batch).unwrap_err();
+    assert!(err.to_string().contains("null child"));
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_rejects_invalid_nanos() {
+    let legacy_dt = legacy_timestamp_nanos_type();
+    let schema = Schema::new(vec![Field::new("ts", legacy_dt.clone(), true)]);
+    let fields = legacy_timestamp_nanos_fields();
+    let struct_array = StructArray::new(
+        fields,
+        vec![
+            Arc::new(Int64Array::from(vec![Some(0)])) as ArrayRef,
+            Arc::new(Int32Array::from(vec![Some(1_000_000)])) as ArrayRef,
+        ],
+        Some(arrow_buffer::NullBuffer::from(vec![true])),
+    );
+    let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(struct_array)]).unwrap();
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &Schema::new(vec![Field::new("ts", legacy_dt, true)]),
+        WriterOptions::default(),
+    )
+    .unwrap();
+
+    let err = writer.write_batch(&batch).unwrap_err();
+    assert!(err.to_string().contains("invalid nanos_of_milli"));
+}
+
+#[test]
+fn test_legacy_timestamp_nanos_struct_rejects_overflow() {
+    let legacy_dt = legacy_timestamp_nanos_type();
+    let schema = Schema::new(vec![Field::new("ts", legacy_dt.clone(), true)]);
+    let fields = legacy_timestamp_nanos_fields();
+    let struct_array = StructArray::new(
+        fields,
+        vec![
+            Arc::new(Int64Array::from(vec![Some(i64::MAX)])) as ArrayRef,
+            Arc::new(Int32Array::from(vec![Some(0)])) as ArrayRef,
+        ],
+        Some(arrow_buffer::NullBuffer::from(vec![true])),
+    );
+    let batch = RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(struct_array)]).unwrap();
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &Schema::new(vec![Field::new("ts", legacy_dt, true)]),
+        WriterOptions::default(),
+    )
+    .unwrap();
+
+    let err = writer.write_batch(&batch).unwrap_err();
+    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+    assert!(err.to_string().contains("overflow"));
+}
+
+#[test]
+fn test_timestamp_nanos_stats_min_max() {
+    let columns = vec![(
+        "ts".to_string(),
+        DataType::Timestamp(TimeUnit::Nanosecond, None),
+        true,
+    )];
+    let rows = vec![
+        vec![Value::TimestampNanos {
+            millis: -1,
+            nanos_of_milli: 999999,
+        }],
+        vec![Value::Null],
+        vec![Value::TimestampNanos {
+            millis: 0,
+            nanos_of_milli: 42,
+        }],
+        vec![Value::TimestampNanos {
+            millis: -1,
+            nanos_of_milli: 999998,
+        }],
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            stats_columns: vec!["ts".to_string()],
+            ..Default::default()
+        },
+    )
+    .unwrap();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 1);
+    assert_eq!(stats[0].null_count, 1);
+    assert!(matches!(
+        stats[0].min,
+        Some(Value::TimestampNanos {
+            millis: -1,
+            nanos_of_milli: 999998
+        })
+    ));
+    assert!(matches!(
+        stats[0].max,
+        Some(Value::TimestampNanos {
+            millis: 0,
+            nanos_of_milli: 42
+        })
+    ));
+}
+
 #[test]
 fn test_timestamp_micros_roundtrip() {
     let columns = vec![
@@ -1956,23 +2224,15 @@ fn test_timestamp_micros_roundtrip() {
     let ts_nanos = batch
         .column(nanos_pos)
         .as_any()
-        .downcast_ref::<StructArray>()
-        .unwrap();
-    let millis = ts_nanos
-        .column(0)
-        .as_any()
-        .downcast_ref::<Int64Array>()
-        .unwrap();
-    let nanos = ts_nanos
-        .column(1)
-        .as_any()
-        .downcast_ref::<Int32Array>()
+        .downcast_ref::<TimestampNanosecondArray>()
         .unwrap();
 
     assert_eq!(ts_millis.value(0), 1700000000000i64);
     assert_eq!(ts_micros.value(0), 1_700_000_000_000_000i64);
-    assert_eq!(millis.value(0), 1700000000000i64);
-    assert_eq!(nanos.value(0), 123456);
+    assert_eq!(
+        ts_nanos.value(0),
+        crate::types::millis_nanos_to_ns(1700000000000, 123456).unwrap()
+    );
 
     assert!(ts_millis.is_null(1));
     assert!(ts_micros.is_null(1));
@@ -1980,6 +2240,7 @@ fn test_timestamp_micros_roundtrip() {
 
     assert_eq!(ts_millis.value(2), 0);
     assert_eq!(ts_micros.value(2), 0);
+    assert_eq!(ts_nanos.value(2), 0);
 }
 
 #[test]
diff --git a/core/src/stats.rs b/core/src/stats.rs
index e96181d..860a5c2 100644
--- a/core/src/stats.rs
+++ b/core/src/stats.rs
@@ -245,6 +245,14 @@ fn extract_value_for_stats(array: &dyn Array, row: usize, 
dt: &DataType) -> Opti
             let a = 
array.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
             Some(Value::TimestampMicros(a.value(row)))
         }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let a = array.as_any().downcast_ref::<TimestampNanosecondArray>()?;
+            let (millis, nanos) = types::ns_to_millis_nanos(a.value(row));
+            Some(Value::TimestampNanos {
+                millis,
+                nanos_of_milli: nanos,
+            })
+        }
         DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields) 
=> {
             let s = array.as_any().downcast_ref::<StructArray>()?;
             let millis = s
@@ -438,6 +446,25 @@ fn read_fixed_value(buf: &[u8], pos: usize, dt: &DataType, 
width: i32) -> Value
                 buf[pos + 7],
             ]))
         }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            debug_assert_eq!(width, 12);
+            let millis = i64::from_be_bytes([
+                buf[pos],
+                buf[pos + 1],
+                buf[pos + 2],
+                buf[pos + 3],
+                buf[pos + 4],
+                buf[pos + 5],
+                buf[pos + 6],
+                buf[pos + 7],
+            ]);
+            let nanos =
+                i32::from_be_bytes([buf[pos + 8], buf[pos + 9], buf[pos + 10], 
buf[pos + 11]]);
+            Value::TimestampNanos {
+                millis,
+                nanos_of_milli: nanos,
+            }
+        }
         DataType::Struct(fields) if types::is_timestamp_nanos_struct(fields) 
=> {
             debug_assert_eq!(width, 12);
             let millis = i64::from_be_bytes([
diff --git a/core/src/types.rs b/core/src/types.rs
index 0652680..736c4c5 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -15,10 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::io;
+
 use arrow_schema::{DataType, Field, Fields, TimeUnit};
 
 use crate::varint;
 
+pub const NANOS_PER_MILLI: i64 = 1_000_000;
+
 pub fn fixed_width(dt: &DataType) -> i32 {
     match dt {
         DataType::Boolean | DataType::Int8 => 1,
@@ -26,12 +30,18 @@ pub fn fixed_width(dt: &DataType) -> i32 {
         DataType::Int32 | DataType::Date32 | DataType::Time32(_) | 
DataType::Float32 => 4,
         DataType::Int64 | DataType::Float64 => 8,
         DataType::Decimal128(p, _) if *p <= 18 => 8,
-        DataType::Timestamp(_, _) => 8,
+        DataType::Timestamp(TimeUnit::Millisecond | TimeUnit::Microsecond, _) 
=> 8,
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => 12,
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 12,
         _ => -1,
     }
 }
 
+pub fn is_timestamp_nanos(dt: &DataType) -> bool {
+    matches!(dt, DataType::Timestamp(TimeUnit::Nanosecond, _))
+        || matches!(dt, DataType::Struct(fields) if 
is_timestamp_nanos_struct(fields))
+}
+
 pub fn is_timestamp_nanos_struct(fields: &Fields) -> bool {
     fields.len() == 2
         && fields[0].name() == "millis"
@@ -40,6 +50,34 @@ pub fn is_timestamp_nanos_struct(fields: &Fields) -> bool {
         && *fields[1].data_type() == DataType::Int32
 }
 
+pub fn is_valid_nanos_of_milli(nanos: i32) -> bool {
+    (0..NANOS_PER_MILLI as i32).contains(&nanos)
+}
+
+pub fn ns_to_millis_nanos(ns: i64) -> (i64, i32) {
+    (
+        ns.div_euclid(NANOS_PER_MILLI),
+        ns.rem_euclid(NANOS_PER_MILLI) as i32,
+    )
+}
+
+pub fn millis_nanos_to_ns(millis: i64, nanos: i32) -> io::Result<i64> {
+    if !is_valid_nanos_of_milli(nanos) {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            format!("invalid nanos_of_milli: {}", nanos),
+        ));
+    }
+    let ns = millis as i128 * NANOS_PER_MILLI as i128 + nanos as i128;
+    if ns < i64::MIN as i128 || ns > i64::MAX as i128 {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "timestamp ns overflow",
+        ));
+    }
+    Ok(ns as i64)
+}
+
 pub fn validate_data_type(dt: &DataType) -> Result<(), String> {
     match dt {
         DataType::Boolean
@@ -61,7 +99,7 @@ pub fn validate_data_type(dt: &DataType) -> Result<(), 
String> {
             }
         }
         DataType::Timestamp(unit, _) => match unit {
-            TimeUnit::Millisecond | TimeUnit::Microsecond => Ok(()),
+            TimeUnit::Millisecond | TimeUnit::Microsecond | 
TimeUnit::Nanosecond => Ok(()),
             _ => Err(format!("unsupported Timestamp unit: {:?}", unit)),
         },
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 
Ok(()),
@@ -95,6 +133,7 @@ pub fn precision_of(dt: &DataType) -> u32 {
         DataType::Decimal128(p, _) => *p as u32,
         DataType::Timestamp(TimeUnit::Millisecond, _) => 3,
         DataType::Timestamp(TimeUnit::Microsecond, _) => 6,
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => 9,
         DataType::Struct(fields) if is_timestamp_nanos_struct(fields) => 9,
         DataType::Time32(TimeUnit::Millisecond) => 3,
         _ => 0,
@@ -125,6 +164,7 @@ pub fn serialize_field(field: &Field, buf: &mut Vec<u8>) {
             let p = match unit {
                 TimeUnit::Millisecond => 3u32,
                 TimeUnit::Microsecond => 6u32,
+                TimeUnit::Nanosecond => 9u32,
                 _ => 0,
             };
             varint::encode(buf, p);
@@ -190,13 +230,7 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut 
usize) -> Result<Fiel
             } else if precision <= 6 {
                 DataType::Timestamp(TimeUnit::Microsecond, None)
             } else {
-                DataType::Struct(
-                    vec![
-                        Field::new("millis", DataType::Int64, false),
-                        Field::new("nanos_of_milli", DataType::Int32, false),
-                    ]
-                    .into(),
-                )
+                DataType::Timestamp(TimeUnit::Nanosecond, None)
             }
         }
         17 => {
@@ -221,13 +255,7 @@ pub fn deserialize_field(name: &str, buf: &[u8], pos: &mut 
usize) -> Result<Fiel
             } else if precision <= 6 {
                 DataType::Timestamp(TimeUnit::Microsecond, Some(tz))
             } else {
-                DataType::Struct(
-                    vec![
-                        Field::new("millis", DataType::Int64, false),
-                        Field::new("nanos_of_milli", DataType::Int32, false),
-                    ]
-                    .into(),
-                )
+                DataType::Timestamp(TimeUnit::Nanosecond, Some(tz))
             }
         }
         _ => {
diff --git a/core/tests/robustness_test.rs b/core/tests/robustness_test.rs
index 7e2e15c..08e5a27 100644
--- a/core/tests/robustness_test.rs
+++ b/core/tests/robustness_test.rs
@@ -1653,7 +1653,7 @@ fn test_decimal128_extreme_values() {
     }
 }
 
-// ======================== TimestampNanos (Struct) Tests 
========================
+// ======================== TimestampNanos Tests ========================
 
 #[test]
 fn test_timestamp_nanos_basic_roundtrip() {
@@ -1707,19 +1707,22 @@ fn test_timestamp_nanos_basic_roundtrip() {
     let col = result[0]
         .column(0)
         .as_any()
-        .downcast_ref::<StructArray>()
+        .downcast_ref::<TimestampNanosecondArray>()
         .unwrap();
 
-    let millis_col = 
col.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
-    let nanos_col = 
col.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
-
     for i in 0..num_rows {
         if i % 11 == 0 {
             assert!(col.is_null(i), "expected null at row {}", i);
         } else {
             assert!(!col.is_null(i));
-            assert_eq!(millis_col.value(i), 1_700_000_000_000i64 + i as i64);
-            assert_eq!(nanos_col.value(i), (i % 1_000_000) as i32);
+            assert_eq!(
+                col.value(i),
+                mosaic_core::types::millis_nanos_to_ns(
+                    1_700_000_000_000i64 + i as i64,
+                    (i % 1_000_000) as i32,
+                )
+                .unwrap()
+            );
         }
     }
 }
@@ -1738,20 +1741,19 @@ fn test_timestamp_nanos_extreme_values() {
         false,
     )]);
 
-    // Extreme millisecond values and nanos boundary values
-    let millis_data: Vec<i64> = vec![
+    let ns_data: Vec<i64> = vec![
         0,
         i64::MAX,
         i64::MIN,
-        1_700_000_000_000,
-        -1_700_000_000_000,
+        mosaic_core::types::millis_nanos_to_ns(1_700_000_000_000, 
999_999).unwrap(),
+        mosaic_core::types::millis_nanos_to_ns(-1_700_000_000_000, 1).unwrap(),
         1,
         -1,
     ];
-    let nanos_data: Vec<i32> = vec![
-        0, 999_999, // max valid nanos_of_milli
-        0, 500_000, 999_999, 1, 0,
-    ];
+    let (millis_data, nanos_data): (Vec<i64>, Vec<i32>) = ns_data
+        .iter()
+        .map(|&ns| mosaic_core::types::ns_to_millis_nanos(ns))
+        .unzip();
 
     let millis_arr = Int64Array::from(millis_data.clone());
     let nanos_arr = Int32Array::from(nanos_data.clone());
@@ -1768,20 +1770,11 @@ fn test_timestamp_nanos_extreme_values() {
     let col = result[0]
         .column(0)
         .as_any()
-        .downcast_ref::<StructArray>()
+        .downcast_ref::<TimestampNanosecondArray>()
         .unwrap();
 
-    let millis_col = 
col.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
-    let nanos_col = 
col.column(1).as_any().downcast_ref::<Int32Array>().unwrap();
-
-    for i in 0..millis_data.len() {
-        assert_eq!(
-            millis_col.value(i),
-            millis_data[i],
-            "millis mismatch at {}",
-            i
-        );
-        assert_eq!(nanos_col.value(i), nanos_data[i], "nanos mismatch at {}", 
i);
+    for (i, expected) in ns_data.iter().enumerate() {
+        assert_eq!(col.value(i), *expected, "timestamp ns mismatch at {}", i);
     }
 }
 
diff --git a/core/tests/stress_test.rs b/core/tests/stress_test.rs
index 69e3204..37d0289 100644
--- a/core/tests/stress_test.rs
+++ b/core/tests/stress_test.rs
@@ -27,7 +27,7 @@ use std::io;
 use std::sync::Arc;
 
 use arrow_array::*;
-use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
 use mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
 
@@ -289,10 +289,6 @@ fn test_wide_table_200_columns() {
 #[test]
 fn test_all_types_at_scale() {
     let num_rows = 500_000;
-    let ts_nanos_fields = Fields::from(vec![
-        Field::new("millis", DataType::Int64, false),
-        Field::new("nanos_of_milli", DataType::Int32, false),
-    ]);
     let schema = Schema::new(vec![
         Field::new("bool_col", DataType::Boolean, true),
         Field::new("i8_col", DataType::Int8, true),
@@ -316,7 +312,11 @@ fn test_all_types_at_scale() {
             DataType::Timestamp(TimeUnit::Microsecond, None),
             true,
         ),
-        Field::new("ts_ns_col", DataType::Struct(ts_nanos_fields.clone()), 
true),
+        Field::new(
+            "ts_ns_col",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        ),
     ]);
 
     let batch_size = 50_000;
@@ -476,35 +476,18 @@ fn test_all_types_at_scale() {
             })
             .collect();
 
-        // Timestamp nanos as struct
-        let millis_vals: Vec<Option<i64>> = (0..count)
-            .map(|i| {
-                if (batch_start + i) % 21 == 0 {
-                    None
-                } else {
-                    Some(1_700_000_000_000i64 + (batch_start + i) as i64)
-                }
-            })
-            .collect();
-        let nanos_vals: Vec<Option<i32>> = (0..count)
+        let ts_ns: Vec<Option<i64>> = (0..count)
             .map(|i| {
                 if (batch_start + i) % 21 == 0 {
                     None
                 } else {
-                    Some(((batch_start + i) % 1_000_000) as i32)
+                    let millis = 1_700_000_000_000i64 + (batch_start + i) as 
i64;
+                    let nanos = ((batch_start + i) % 1_000_000) as i32;
+                    Some(mosaic_core::types::millis_nanos_to_ns(millis, 
nanos).unwrap())
                 }
             })
             .collect();
 
-        let millis_arr = Int64Array::from(millis_vals);
-        let nanos_arr = Int32Array::from(nanos_vals);
-        let null_buf = millis_arr.nulls().cloned();
-        let ts_ns_struct = StructArray::new(
-            ts_nanos_fields.clone(),
-            vec![Arc::new(millis_arr), Arc::new(nanos_arr)],
-            null_buf,
-        );
-
         let batch = RecordBatch::try_new(
             Arc::new(schema.clone()),
             vec![
@@ -526,7 +509,7 @@ fn test_all_types_at_scale() {
                 ),
                 Arc::new(TimestampMillisecondArray::from(ts_ms)),
                 Arc::new(TimestampMicrosecondArray::from(ts_us)),
-                Arc::new(ts_ns_struct),
+                Arc::new(TimestampNanosecondArray::from(ts_ns)),
             ],
         )
         .unwrap();
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 37e8c07..1d08012 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -246,6 +246,52 @@ static void test_all_types() {
     printf("  PASS test_all_types\n");
 }
 
+static void test_timestamp_ns_roundtrip() {
+    auto ts_ns_type = arrow::timestamp(arrow::TimeUnit::NANO);
+    auto ts_ns_tz_type = arrow::timestamp(arrow::TimeUnit::NANO, 
"Asia/Shanghai");
+    auto schema = arrow::schema({
+        arrow::field("ts_ns", ts_ns_type),
+        arrow::field("ts_ns_tz", ts_ns_tz_type),
+    });
+
+    const int64_t values[] = {1700000000000000123LL, -1LL};
+
+    arrow::TimestampBuilder ts_ns_b(ts_ns_type, arrow::default_memory_pool());
+    assert(ts_ns_b.Append(values[0]).ok());
+    assert(ts_ns_b.AppendNull().ok());
+    assert(ts_ns_b.Append(values[1]).ok());
+
+    arrow::TimestampBuilder ts_ns_tz_b(ts_ns_tz_type, 
arrow::default_memory_pool());
+    assert(ts_ns_tz_b.Append(values[0]).ok());
+    assert(ts_ns_tz_b.AppendNull().ok());
+    assert(ts_ns_tz_b.Append(values[1]).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        ts_ns_b.Finish().ValueUnsafe(),
+        ts_ns_tz_b.Finish().ValueUnsafe(),
+    });
+
+    auto data_vec = write_and_get(schema, batch);
+
+    MemBuffer buf;
+    buf.data = data_vec;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto rb = read_row_group(reader, 0);
+
+    ASSERT_TRUE(rb->schema()->field(0)->type()->Equals(ts_ns_type));
+    ASSERT_TRUE(rb->schema()->field(1)->type()->Equals(ts_ns_tz_type));
+
+    auto ts_ns = 
std::static_pointer_cast<arrow::TimestampArray>(rb->column(0));
+    auto ts_ns_tz = 
std::static_pointer_cast<arrow::TimestampArray>(rb->column(1));
+    ASSERT_EQ(ts_ns->Value(0), values[0]);
+    ASSERT_TRUE(ts_ns->IsNull(1));
+    ASSERT_EQ(ts_ns->Value(2), values[1]);
+    ASSERT_EQ(ts_ns_tz->Value(0), values[0]);
+    ASSERT_TRUE(ts_ns_tz->IsNull(1));
+    ASSERT_EQ(ts_ns_tz->Value(2), values[1]);
+    printf("  PASS test_timestamp_ns_roundtrip\n");
+}
+
 static void test_projection() {
     auto schema = arrow::schema({
         arrow::field("a", arrow::int32()),
@@ -783,6 +829,7 @@ int main() {
     test_basic_roundtrip();
     test_null_values();
     test_all_types();
+    test_timestamp_ns_roundtrip();
     test_projection();
     test_projection_empty();
     test_statistics();
@@ -794,6 +841,6 @@ int main() {
     test_writer_stats_all_null();
     test_writer_stats_matches_reader();
     test_stats_empty_string_min();
-    printf("All %d tests passed.\n", 14);
+    printf("All %d tests passed.\n", 15);
     return 0;
 }
diff --git a/docs/design.html b/docs/design.html
index b534836..af052e8 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -766,6 +766,15 @@ repeated numColumns times:
                     <tr><td>BINARY / VARBINARY / BYTES</td><td>varint length + 
raw bytes</td></tr>
                 </tbody>
             </table>
+            <p>
+                Readers expose TIMESTAMP precision &gt; 6 as Arrow 
Timestamp(Nanosecond, timezone).
+                This keeps the 12-byte physical encoding unchanged, but 
precisions 7 and 8 are
+                normalized to Arrow nanosecond units in the external schema 
because Arrow timestamp
+                units do not preserve decimal timestamp precision separately.
+                Existing 12-byte timestamp values outside Arrow's signed 
64-bit epoch-nanosecond
+                range cannot be represented through this Arrow API; readers 
return InvalidData for
+                those values, and legacy Struct timestamp writes reject them 
before writing.
+            </p>
 
             <!-- ============================================================ 
-->
             <h2>Varint Encoding</h2>
diff --git a/docs/java-api.html b/docs/java-api.html
index 0050ad0..d6cd7cb 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -108,8 +108,14 @@
     <span class="ty">Field</span>.nullable(<span class="str">"name"</span>, 
<span class="ty">ArrowType.Utf8</span>.INSTANCE),
     <span class="ty">Field</span>.nullable(<span class="str">"score"</span>, 
<span class="kw">new</span> <span 
class="ty">ArrowType.FloatingPoint</span>(FloatingPointPrecision.DOUBLE)),
     <span class="ty">Field</span>.nullable(<span class="str">"amount"</span>, 
<span class="kw">new</span> <span class="ty">ArrowType.Decimal</span>(<span 
class="num">10</span>, <span class="num">2</span>, <span 
class="num">128</span>)),
-    <span class="ty">Field</span>.nullable(<span class="str">"ts"</span>, 
<span class="kw">new</span> <span 
class="ty">ArrowType.Timestamp</span>(TimeUnit.MILLISECOND, <span 
class="kw">null</span>))
+    <span class="ty">Field</span>.nullable(<span class="str">"ts"</span>, 
<span class="kw">new</span> <span 
class="ty">ArrowType.Timestamp</span>(TimeUnit.MILLISECOND, <span 
class="kw">null</span>)),
+    <span class="ty">Field</span>.nullable(<span class="str">"ts_ns"</span>, 
<span class="kw">new</span> <span 
class="ty">ArrowType.Timestamp</span>(TimeUnit.NANOSECOND, <span 
class="str">"Asia/Shanghai"</span>))
 ));</code></pre>
+            <p>
+                Timestamp fields support Arrow millisecond, microsecond, and 
nanosecond units.
+                Nanosecond timestamps are exposed as standard Arrow 
<code>TimeUnit.NANOSECOND</code>
+                vectors while Mosaic keeps its 12-byte physical encoding 
internally.
+            </p>
 
             <h3>2. Create Writer and Write Batches</h3>
             <p>
diff --git a/docs/python-api.html b/docs/python-api.html
index 168d362..d1cd442 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -88,7 +88,13 @@ pa_schema = pa.schema([
     pa.field(<span class="str">"score"</span>, pa.float64()),
     pa.field(<span class="str">"amount"</span>, pa.decimal128(<span 
class="num">10</span>, <span class="num">2</span>)),
     pa.field(<span class="str">"ts"</span>, pa.timestamp(<span 
class="str">"ms"</span>)),
+    pa.field(<span class="str">"ts_ns"</span>, pa.timestamp(<span 
class="str">"ns"</span>, tz=<span class="str">"Asia/Shanghai"</span>)),
 ])</code></pre>
+            <p>
+                Timestamp fields support Arrow millisecond, microsecond, and 
nanosecond units.
+                Nanosecond timestamps are exposed as standard Arrow 
<code>timestamp("ns")</code>
+                arrays while Mosaic keeps its 12-byte physical encoding 
internally.
+            </p>
 
             <h3>2. Create Writer and Write Batches</h3>
             <p>
diff --git 
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java 
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 0fc3f88..5344a2d 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -33,11 +33,14 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -484,6 +487,52 @@ public class MosaicRoundtripTest {
         }
     }
 
+    @Test
+    public void testTimestampNsRoundtrip() {
+        ArrowType.Timestamp tsNsType = new 
ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
+        ArrowType.Timestamp tsNsTzType = new 
ArrowType.Timestamp(TimeUnit.NANOSECOND, "Asia/Shanghai");
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("ts_ns", tsNsType),
+                Field.nullable("ts_ns_tz", tsNsTzType)
+        ));
+
+        long[] values = {1700000000000000123L, -1L};
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, 
allocator)) {
+            TimeStampNanoVector tsNsVec = (TimeStampNanoVector) 
root.getVector("ts_ns");
+            TimeStampNanoTZVector tsNsTzVec = (TimeStampNanoTZVector) 
root.getVector("ts_ns_tz");
+            int n = 3;
+            tsNsVec.allocateNew(n);
+            tsNsTzVec.allocateNew(n);
+
+            tsNsVec.set(0, values[0]);
+            tsNsVec.setNull(1);
+            tsNsVec.set(2, values[1]);
+            tsNsTzVec.set(0, values[0]);
+            tsNsTzVec.setNull(1);
+            tsNsTzVec.set(2, values[1]);
+
+            root.setRowCount(n);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            assertEquals(tsNsType, 
reader.getSchema().findField("ts_ns").getType());
+            assertEquals(tsNsTzType, 
reader.getSchema().findField("ts_ns_tz").getType());
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                TimeStampNanoVector tsNs = (TimeStampNanoVector) 
batch.getVector("ts_ns");
+                TimeStampNanoTZVector tsNsTz = (TimeStampNanoTZVector) 
batch.getVector("ts_ns_tz");
+
+                assertEquals(values[0], tsNs.get(0));
+                assertTrue(tsNs.isNull(1));
+                assertEquals(values[1], tsNs.get(2));
+                assertEquals(values[0], tsNsTz.get(0));
+                assertTrue(tsNsTz.isNull(1));
+                assertEquals(values[1], tsNsTz.get(2));
+            }
+        }
+    }
+
     @Test
     public void testCompressionNone() {
         Schema arrowSchema = new Schema(Arrays.asList(
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
index 7816413..625369b 100644
--- a/python/mosaic/_ffi.py
+++ b/python/mosaic/_ffi.py
@@ -49,15 +49,15 @@ def _load_library():
     else:
         lib_name = "libmosaic_ffi.so"
 
+    env_path = os.environ.get("MOSAIC_LIB_PATH")
     search_paths = []
+    if env_path:
+        search_paths.append(env_path)
+
     pkg_dir = os.path.dirname(os.path.abspath(__file__))
     search_paths.append(pkg_dir)
     search_paths.append(os.path.join(pkg_dir, "..", ".."))
 
-    env_path = os.environ.get("MOSAIC_LIB_PATH")
-    if env_path:
-        search_paths.append(env_path)
-
     for rel in [
         os.path.join("..", "target", "release"),
         os.path.join("..", "target", "debug"),
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 50a5fee..b60f0f1 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -176,9 +176,12 @@ class TestRoundtrip:
                 pa.field("f_decimal", pa.decimal128(10, 2)),
                 pa.field("f_date", pa.date32()),
                 pa.field("f_timestamp", pa.timestamp("ms")),
+                pa.field("f_timestamp_ns", pa.timestamp("ns")),
+                pa.field("f_timestamp_ns_tz", pa.timestamp("ns", 
tz="Asia/Shanghai")),
             ]
         )
 
+        ts_ns_values = [1700000000000000123, -1]
         batch = pa.record_batch(
             [
                 pa.array([True, False]),
@@ -193,11 +196,14 @@ class TestRoundtrip:
                 pa.array([1234567, -9876543], type=pa.decimal128(10, 2)),
                 pa.array([19000, 0], type=pa.date32()),
                 pa.array([1700000000000, 0], type=pa.timestamp("ms")),
+                pa.array(ts_ns_values, type=pa.timestamp("ns")),
+                pa.array(ts_ns_values, type=pa.timestamp("ns", 
tz="Asia/Shanghai")),
             ],
             names=[
                 "f_bool", "f_int8", "f_int16", "f_int32", "f_int64",
                 "f_float32", "f_float64", "f_utf8", "f_binary",
                 "f_decimal", "f_date", "f_timestamp",
+                "f_timestamp_ns", "f_timestamp_ns_tz",
             ],
         )
 
@@ -223,6 +229,39 @@ class TestRoundtrip:
             assert abs(f64[0] - 2.718281828) < 1e-9
             assert abs(f64[1] - (-3.141592653)) < 1e-9
 
+            assert rb.schema.field("f_timestamp_ns").type == pa.timestamp("ns")
+            assert rb.column("f_timestamp_ns").cast(pa.int64()).to_pylist() == 
ts_ns_values
+            assert rb.schema.field("f_timestamp_ns_tz").type == pa.timestamp(
+                "ns", tz="Asia/Shanghai"
+            )
+            assert rb.column("f_timestamp_ns_tz").cast(pa.int64()).to_pylist() 
== ts_ns_values
+
+    def test_timestamp_nanos_mixed_batches_updates_dictionary(self):
+        pa_schema = pa.schema([pa.field("ts", pa.timestamp("ns"))])
+        first_values = [1, None, 2]
+        second_values = [3 + (i % 3) for i in range(120)]
+
+        buf = io.BytesIO()
+        with MosaicWriter(buf, pa_schema) as writer:
+            writer.write(
+                pa.record_batch(
+                    [pa.array(first_values, type=pa.timestamp("ns"))],
+                    names=["ts"],
+                )
+            )
+            writer.write(
+                pa.record_batch(
+                    [pa.array(second_values, type=pa.timestamp("ns"))],
+                    names=["ts"],
+                )
+            )
+
+        with _reader_from_bytes(buf.getvalue()) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.column("ts").cast(pa.int64()).to_pylist() == (
+                first_values + second_values
+            )
+
     def test_multiple_row_groups(self):
         pa_schema = pa.schema(
             [pa.field("id", pa.int32()), pa.field("data", pa.int64())]

Reply via email to